diff --git a/Lucien/Rust/src/client.rs b/Lucien/Rust/src/client.rs index 1c5c328..207865a 100644 --- a/Lucien/Rust/src/client.rs +++ b/Lucien/Rust/src/client.rs @@ -1,10 +1,19 @@ -extern crate bufstream; +use std::io::*; use std::net::TcpStream; -use std::io::{stdin, stdout, Read, Write}; -// use std::sync::mpsc; -// use std::sync::mpsc::{Receiver, Sender}; -use std::thread::spawn; -// use self::bufstream::BufStream; +use std::thread; + +static leave_msg: &str = "BYE"; + +// macro_rules! send { +// ($line:expr) => ({ +// try!(writeln!(writer, "{}", $line)); +// try!(writer.flush()); +// }) +// } + +fn send(writer: BufWriter<&TcpStream>,text: &str) { + +} fn get_entry() -> String { let mut buf = String::new(); @@ -13,81 +22,69 @@ fn get_entry() -> String { buf.replace("\n", "").replace("\r", "") } -fn read_from_server( - mut stream: TcpStream, -) { - let buff = &mut [0; 1024]; - let stdout = stdout(); - let mut io = stdout.lock(); +fn write_to_server(stream: TcpStream) { + + let mut writer = BufWriter::new(&stream); + loop { - match stream.read(buff) { - Ok(received) => { - if received < 1 { - // println!("Perte de connexion avec le serveur"); - write!(io, "Perte de connexion avec le serveur\n").unwrap(); - io.flush().unwrap(); - return; - } + match &*get_entry() { + "/quit" => { + println!("Disconnecting..."); + + println!("Disconnected!"); + return (); } - Err(_) => { - // println!("Perte de connexion avec le serveur"); - write!(io, "Perte de connexion avec le serveur\n").unwrap(); - io.flush().unwrap(); - return; + line => { + send(BufWriter::new(&stream), line); } } - let reponse = String::from_utf8(buff.to_vec()).unwrap(); - write!(io, "{}", reponse).unwrap(); - io.flush().unwrap(); - // println!("From server: {}", reponse); } } -fn exchange_with_server( - mut stream: TcpStream -) { - let stdout = stdout(); - let mut io = stdout.lock(); - let _buff = &mut [0; 1024]; +fn exchange_with_server(stream: TcpStream) { + let server = stream.peer_addr().unwrap(); + println!("Connected to {}", server); + // Buffered reading and writing + let mut reader = BufReader::new(&stream); + let mut writer = BufWriter::new(&stream); - let stream_cpy = stream.try_clone().unwrap(); - spawn(move || { - // let stream_cpy = stream.try_clone().unwrap(); - read_from_server(stream_cpy); + println!("Enter `/quit` when you want to leave"); + + macro_rules! receive { + () => ({ + let mut line = String::new(); + match reader.read_line(&mut line) { + Ok(len) => { + if len == 0 { + // Reader is at EOF. + return Err(Error::new(ErrorKind::Other, "unexpected EOF")); + } + line.pop(); + } + Err(e) => { + return Err(e); + } + }; + line + }) + } + + thread::spawn(move || { + write_to_server(stream.try_clone().unwrap()); }); - println!("Enter `quit` or `exit` when you want to leave"); + match(|| { + loop { + let input = receive!(); + println!("{}", input); + } - loop { - write!(io, "> ").unwrap(); - io.flush().unwrap(); - match &*get_entry() { - "quit" => { - println!("bye!"); - return; - } - "exit" => { - println!("bye!"); - return; - } - line => { - write!(stream, "{}\n", line).unwrap(); - // match stream.read(buff) { - // Ok(received) => { - // if received < 1 { - // println!("Perte de la connexion avec le serveur"); - // return; - // } - // } - // Err(_) => { - // println!("Perte de la connexion avec le serveur"); - // return; - // } - // } - // println!("Réponse du serveur : {}", buf); - // let reponse = String::from_utf8(buf.to_vec()).unwrap(); - // println!("Réponse du serveur : {}", reponse); - } + })() { + Ok(_) => { + println!("Left?"); + } + Err(e) => { + println!("Disappeared? {}", e); } } } diff --git a/Lucien/Rust/src/server.rs b/Lucien/Rust/src/server.rs index 7e358f4..175d294 100644 --- a/Lucien/Rust/src/server.rs +++ b/Lucien/Rust/src/server.rs @@ -1,92 +1,169 @@ -extern crate bufstream; -use std::io::{BufRead, Write}; +use std::io::*; use std::net::{SocketAddr, TcpListener, TcpStream}; -use std::str::FromStr; -use std::sync::{mpsc, Arc, RwLock}; -use std::sync::mpsc::{Receiver, Sender}; -use std::thread::spawn; -use self::bufstream::BufStream; +use std::thread; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::collections::HashMap; -fn handle_connection( - stream: &mut BufStream, - chan: Sender, - arc: Arc>>, -) { - stream.write(b"Welcome this server!\n").unwrap(); - stream - .write(b"Please input your username (max. 20chars): ") - .unwrap(); - stream.flush().unwrap(); +// Map for all connected clients containing their name and stream +type UserMapValue = (String, TcpStream); +type UserMap = HashMap; - let mut name = String::new(); - stream.read_line(&mut name).unwrap(); - let name = name.trim_right(); - stream - .write_fmt(format_args!("Hello, {}!\n", name)) - .unwrap(); - stream.flush().unwrap(); - - let mut pos = 0; - loop { - { - let lines = arc.read().unwrap(); - for i in pos..lines.len() { - stream.write_fmt(format_args!("{}", lines[i])).unwrap(); - pos = lines.len(); +fn distribute_message(msg: &str, not_to: &SocketAddr, lock: &mut MutexGuard) { + for (other_client, entry) in (*lock).iter() { + if other_client != not_to { + let other_name = &entry.0; + let other_stream = &entry.1; + match (|| -> Result<()> { + let mut writer = BufWriter::new(other_stream); + try!(writeln!(writer, "{}", msg)); + try!(writer.flush()); + return Ok(()); + })() + { + Ok(_) => {} + Err(e) => { + println!( + "Client {} <{}> disappeared during message distribution: {}", + other_client, other_name, e + ); + } } } - stream.write(b" > ").unwrap(); - stream.flush().unwrap(); + } +} - let mut reads = String::new(); - stream.read_line(&mut reads).unwrap(); - if reads.trim().len() != 0 { - chan.send(format!("[{}] said: {}", name, reads)).unwrap(); +fn disconnect_user(name: &str, client: &SocketAddr, lock: &mut MutexGuard) { + (*lock).remove(&client); + distribute_message(&format!("{} left", name), client, lock); +} + +fn handle_client(stream: TcpStream, clients: Arc>) { + // Get client IP and port + let client = stream.peer_addr().unwrap(); + println!("New connection from {}", client); + + // Buffered reading and writing + let mut reader = BufReader::new(&stream); + let mut writer = BufWriter::new(&stream); + + // Write an entire line to the client + // Can fail on IO errors, du to try! macro + macro_rules! send { + ($line:expr) => ({ + try!(writeln!(writer, "{}", $line)); + try!(writer.flush()); + }) + } + + // Read an entire line from the client + // Can fail on IO errors or when EOF is reached + macro_rules! receive { + () => ({ + let mut line = String::new(); + match reader.read_line(&mut line) { + Ok(len) => { + if len == 0 { + // Reader is at EOF. + return Err(Error::new(ErrorKind::Other, "unexpected EOF")); + } + line.pop(); + } + Err(e) => { + return Err(e); + } + }; + line + }) + } + + // Initialization: Ask user for his name + let name = match (|| { + send!("Welcome!"); + send!("Please enter your name:"); + let name = receive!(); + println!("Client {} identified as {}", client, name); + send!("DEBUG: You can now type messages. Leave this chat with the request `BYE`."); + Ok(name) + })() + { + Ok(name) => name, + Err(e) => { + println!("Client {} disappeared during initialization: {}", client, e); + return (); } + }; + + // Add user to global map. Lock will be released at the end of the scope + { + let mut lock = clients.lock().unwrap(); + (*lock).insert(client, (name.clone(), stream.try_clone().unwrap())); + distribute_message(&format!("{} joined", name), &client, &mut lock); + } + + // Chat loop: Receive messages from users + match (|| { + loop { + let input = receive!(); + if input == "BYE" { + send!("Bye!"); + return Ok(()); + } + + // Distribute message + println!("{} <{}>: {}", client, name, input); + { + let mut lock = clients.lock().unwrap(); + distribute_message(&format!("<{}>: {}", name, input), &client, &mut lock); + } + } + })() + { + Ok(_) => { + println!("Client {} <{}> left", client, name); + } + Err(e) => { + println!( + "Client {} <{}> disappeared during chat: {}", + client, name, e + ); + } + } + + // Remove user from global map + { + let mut lock = clients.lock().unwrap(); + disconnect_user(&name, &client, &mut lock); } } pub fn serveur(addr: String) { - // Ouverture de la connexion sur socket - let addr = SocketAddr::from_str(&addr).unwrap(); - // Ajout d’un listener Tcp sur le socket - let listener = TcpListener::bind(addr).unwrap(); + // Manage UserMap in a mutex + let clients = Arc::new(Mutex::new(HashMap::new())); + let serv_addr = addr.clone(); - // création des receveurs et envoyeurs de strings asynchrones - let (sender, receiver): (Sender, Receiver) = mpsc::channel(); - let arc: Arc>> = Arc::new(RwLock::new(Vec::new())); - let arc_w = arc.clone(); + // Start a TCP Listener + let listener = match TcpListener::bind(serv_addr.as_str()) { + Ok(listener) => listener, + Err(e) => panic!("Could not read start TCP listener: {}", e), + }; - // boucle infinie en parallèle pour recevoir des messages - spawn(move || { - loop { - // lit le message depuis le receveur - let msg = receiver.recv().unwrap(); - print!("DEBUG: message {}", msg); - { - let mut arc_w = arc_w.write().unwrap(); - arc_w.push(msg); - } - } - }); + println!("Successfully started the server on {}", serv_addr); - // Réception des clients for stream in listener.incoming() { match stream { - Err(e) => println!("Erreur écoute : {}", e), - Ok(mut stream) => { - println!( - "Nouvelle connexion de {} vers {}", - stream.peer_addr().unwrap(), - stream.local_addr().unwrap() - ); - let sender = sender.clone(); - let arc = arc.clone(); - spawn(move || { - let mut stream = BufStream::new(stream); - handle_connection(&mut stream, sender, arc); + Ok(stream) => { + let clients = clients.clone(); + thread::spawn(move || { + //connection succeeded + handle_client(stream, clients) }); } + Err(e) => { + writeln!(stderr(), "Connection failed: {}", e).unwrap(); + } } } + + // close the socket server + drop(listener); }