server and client rewrite, issues with receive macro

This commit is contained in:
Phuntsok Drak-pa 2018-03-18 21:07:20 +01:00
parent ce45fb60ad
commit 288764df1e
2 changed files with 215 additions and 141 deletions

View File

@ -1,10 +1,19 @@
extern crate bufstream; use std::io::*;
use std::net::TcpStream; use std::net::TcpStream;
use std::io::{stdin, stdout, Read, Write}; use std::thread;
// use std::sync::mpsc;
// use std::sync::mpsc::{Receiver, Sender}; static leave_msg: &str = "BYE";
use std::thread::spawn;
// use self::bufstream::BufStream; // macro_rules! send {
// ($line:expr) => ({
// try!(writeln!(writer, "{}", $line));
// try!(writer.flush());
// })
// }
fn send(writer: BufWriter<&TcpStream>,text: &str) {
}
fn get_entry() -> String { fn get_entry() -> String {
let mut buf = String::new(); let mut buf = String::new();
@ -13,81 +22,69 @@ fn get_entry() -> String {
buf.replace("\n", "").replace("\r", "") buf.replace("\n", "").replace("\r", "")
} }
fn read_from_server( fn write_to_server(stream: TcpStream) {
mut stream: TcpStream,
) { let mut writer = BufWriter::new(&stream);
let buff = &mut [0; 1024];
let stdout = stdout();
let mut io = stdout.lock();
loop { loop {
match stream.read(buff) { match &*get_entry() {
Ok(received) => { "/quit" => {
if received < 1 { println!("Disconnecting...");
// println!("Perte de connexion avec le serveur");
write!(io, "Perte de connexion avec le serveur\n").unwrap(); println!("Disconnected!");
io.flush().unwrap(); return ();
return; }
line => {
send(BufWriter::new(&stream), line);
} }
} }
Err(_) => {
// println!("Perte de connexion avec le serveur");
write!(io, "Perte de connexion avec le serveur\n").unwrap();
io.flush().unwrap();
return;
}
}
let reponse = String::from_utf8(buff.to_vec()).unwrap();
write!(io, "{}", reponse).unwrap();
io.flush().unwrap();
// println!("From server: {}", reponse);
} }
} }
fn exchange_with_server( fn exchange_with_server(stream: TcpStream) {
mut stream: TcpStream let server = stream.peer_addr().unwrap();
) { println!("Connected to {}", server);
let stdout = stdout(); // Buffered reading and writing
let mut io = stdout.lock(); let mut reader = BufReader::new(&stream);
let _buff = &mut [0; 1024]; let mut writer = BufWriter::new(&stream);
let stream_cpy = stream.try_clone().unwrap(); println!("Enter `/quit` when you want to leave");
spawn(move || {
// let stream_cpy = stream.try_clone().unwrap(); macro_rules! receive {
read_from_server(stream_cpy); () => ({
let mut line = String::new();
match reader.read_line(&mut line) {
Ok(len) => {
if len == 0 {
// Reader is at EOF.
return Err(Error::new(ErrorKind::Other, "unexpected EOF"));
}
line.pop();
}
Err(e) => {
return Err(e);
}
};
line
})
}
thread::spawn(move || {
write_to_server(stream.try_clone().unwrap());
}); });
println!("Enter `quit` or `exit` when you want to leave"); match(|| {
loop { loop {
write!(io, "> ").unwrap(); let input = receive!();
io.flush().unwrap(); println!("{}", input);
match &*get_entry() {
"quit" => {
println!("bye!");
return;
} }
"exit" => {
println!("bye!"); })() {
return; Ok(_) => {
} println!("Left?");
line => {
write!(stream, "{}\n", line).unwrap();
// match stream.read(buff) {
// Ok(received) => {
// if received < 1 {
// println!("Perte de la connexion avec le serveur");
// return;
// }
// }
// Err(_) => {
// println!("Perte de la connexion avec le serveur");
// return;
// }
// }
// println!("Réponse du serveur : {}", buf);
// let reponse = String::from_utf8(buf.to_vec()).unwrap();
// println!("Réponse du serveur : {}", reponse);
} }
Err(e) => {
println!("Disappeared? {}", e);
} }
} }
} }

View File

@ -1,92 +1,169 @@
extern crate bufstream; use std::io::*;
use std::io::{BufRead, Write};
use std::net::{SocketAddr, TcpListener, TcpStream}; use std::net::{SocketAddr, TcpListener, TcpStream};
use std::str::FromStr; use std::thread;
use std::sync::{mpsc, Arc, RwLock}; use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::mpsc::{Receiver, Sender}; use std::collections::HashMap;
use std::thread::spawn;
use self::bufstream::BufStream;
fn handle_connection( // Map for all connected clients containing their name and stream
stream: &mut BufStream<TcpStream>, type UserMapValue = (String, TcpStream);
chan: Sender<String>, type UserMap = HashMap<SocketAddr, UserMapValue>;
arc: Arc<RwLock<Vec<String>>>,
) {
stream.write(b"Welcome this server!\n").unwrap();
stream
.write(b"Please input your username (max. 20chars): ")
.unwrap();
stream.flush().unwrap();
let mut name = String::new(); fn distribute_message(msg: &str, not_to: &SocketAddr, lock: &mut MutexGuard<UserMap>) {
stream.read_line(&mut name).unwrap(); for (other_client, entry) in (*lock).iter() {
let name = name.trim_right(); if other_client != not_to {
stream let other_name = &entry.0;
.write_fmt(format_args!("Hello, {}!\n", name)) let other_stream = &entry.1;
.unwrap(); match (|| -> Result<()> {
stream.flush().unwrap(); let mut writer = BufWriter::new(other_stream);
try!(writeln!(writer, "{}", msg));
let mut pos = 0; try!(writer.flush());
loop { return Ok(());
})()
{ {
let lines = arc.read().unwrap(); Ok(_) => {}
for i in pos..lines.len() { Err(e) => {
stream.write_fmt(format_args!("{}", lines[i])).unwrap(); println!(
pos = lines.len(); "Client {} <{}> disappeared during message distribution: {}",
other_client, other_name, e
);
} }
} }
stream.write(b" > ").unwrap(); }
stream.flush().unwrap(); }
}
let mut reads = String::new(); fn disconnect_user(name: &str, client: &SocketAddr, lock: &mut MutexGuard<UserMap>) {
stream.read_line(&mut reads).unwrap(); (*lock).remove(&client);
if reads.trim().len() != 0 { distribute_message(&format!("{} left", name), client, lock);
chan.send(format!("[{}] said: {}", name, reads)).unwrap(); }
fn handle_client(stream: TcpStream, clients: Arc<Mutex<UserMap>>) {
// Get client IP and port
let client = stream.peer_addr().unwrap();
println!("New connection from {}", client);
// Buffered reading and writing
let mut reader = BufReader::new(&stream);
let mut writer = BufWriter::new(&stream);
// Write an entire line to the client
// Can fail on IO errors, du to try! macro
macro_rules! send {
($line:expr) => ({
try!(writeln!(writer, "{}", $line));
try!(writer.flush());
})
} }
// Read an entire line from the client
// Can fail on IO errors or when EOF is reached
macro_rules! receive {
() => ({
let mut line = String::new();
match reader.read_line(&mut line) {
Ok(len) => {
if len == 0 {
// Reader is at EOF.
return Err(Error::new(ErrorKind::Other, "unexpected EOF"));
}
line.pop();
}
Err(e) => {
return Err(e);
}
};
line
})
}
// Initialization: Ask user for his name
let name = match (|| {
send!("Welcome!");
send!("Please enter your name:");
let name = receive!();
println!("Client {} identified as {}", client, name);
send!("DEBUG: You can now type messages. Leave this chat with the request `BYE`.");
Ok(name)
})()
{
Ok(name) => name,
Err(e) => {
println!("Client {} disappeared during initialization: {}", client, e);
return ();
}
};
// Add user to global map. Lock will be released at the end of the scope
{
let mut lock = clients.lock().unwrap();
(*lock).insert(client, (name.clone(), stream.try_clone().unwrap()));
distribute_message(&format!("{} joined", name), &client, &mut lock);
}
// Chat loop: Receive messages from users
match (|| {
loop {
let input = receive!();
if input == "BYE" {
send!("Bye!");
return Ok(());
}
// Distribute message
println!("{} <{}>: {}", client, name, input);
{
let mut lock = clients.lock().unwrap();
distribute_message(&format!("<{}>: {}", name, input), &client, &mut lock);
}
}
})()
{
Ok(_) => {
println!("Client {} <{}> left", client, name);
}
Err(e) => {
println!(
"Client {} <{}> disappeared during chat: {}",
client, name, e
);
}
}
// Remove user from global map
{
let mut lock = clients.lock().unwrap();
disconnect_user(&name, &client, &mut lock);
} }
} }
pub fn serveur(addr: String) { pub fn serveur(addr: String) {
// Ouverture de la connexion sur socket // Manage UserMap in a mutex
let addr = SocketAddr::from_str(&addr).unwrap(); let clients = Arc::new(Mutex::new(HashMap::new()));
// Ajout dun listener Tcp sur le socket let serv_addr = addr.clone();
let listener = TcpListener::bind(addr).unwrap();
// création des receveurs et envoyeurs de strings asynchrones // Start a TCP Listener
let (sender, receiver): (Sender<String>, Receiver<String>) = mpsc::channel(); let listener = match TcpListener::bind(serv_addr.as_str()) {
let arc: Arc<RwLock<Vec<String>>> = Arc::new(RwLock::new(Vec::new())); Ok(listener) => listener,
let arc_w = arc.clone(); Err(e) => panic!("Could not read start TCP listener: {}", e),
};
// boucle infinie en parallèle pour recevoir des messages println!("Successfully started the server on {}", serv_addr);
spawn(move || {
loop {
// lit le message depuis le receveur
let msg = receiver.recv().unwrap();
print!("DEBUG: message {}", msg);
{
let mut arc_w = arc_w.write().unwrap();
arc_w.push(msg);
}
}
});
// Réception des clients
for stream in listener.incoming() { for stream in listener.incoming() {
match stream { match stream {
Err(e) => println!("Erreur écoute : {}", e), Ok(stream) => {
Ok(mut stream) => { let clients = clients.clone();
println!( thread::spawn(move || {
"Nouvelle connexion de {} vers {}", //connection succeeded
stream.peer_addr().unwrap(), handle_client(stream, clients)
stream.local_addr().unwrap()
);
let sender = sender.clone();
let arc = arc.clone();
spawn(move || {
let mut stream = BufStream::new(stream);
handle_connection(&mut stream, sender, arc);
}); });
} }
Err(e) => {
writeln!(stderr(), "Connection failed: {}", e).unwrap();
} }
} }
}
// close the socket server
drop(listener);
} }