From cc7cf83b75bed29f164acc8605c3ff4214e17936 Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Thu, 3 Apr 2025 00:24:26 +0100 Subject: [PATCH 01/15] feat(group): implement message handling for subscribers in Group --- src/bin/server/group.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/bin/server/group.rs b/src/bin/server/group.rs index 786bbec..9fed660 100644 --- a/src/bin/server/group.rs +++ b/src/bin/server/group.rs @@ -1,11 +1,14 @@ use crate::connection::Outbound; +use async_chat::FromServer; use async_std::task; use std::sync::Arc; use tokio::sync::broadcast; + pub struct Group { name: Arc, sender: broadcast::Sender>, } + impl Group { pub fn new(name: Arc) -> Group { let (sender, _receiver) = broadcast::channel(1000); @@ -25,5 +28,16 @@ async fn handle_subscriber( mut receiver: broadcast::Receiver>, outbound: Arc, ) { - todo!() + while let Ok(message) = receiver.recv().await { + let response = FromServer::Message { + group_name: group_name.clone(), + message: message.clone(), + }; + + if let Err(e) = outbound.send(response).await { + eprintln!("Error sending message to client: {}", e); + // If we can't send to this client, we should probably remove them from the group + break; + } + } } From 84e3b85905859965b6fb797bd7d3917d9402c635 Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Thu, 3 Apr 2025 00:31:28 +0100 Subject: [PATCH 02/15] Feat(user): integrate user management into server connection handling --- src/bin/server/main.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/bin/server/main.rs b/src/bin/server/main.rs index 304432a..d4c0836 100644 --- a/src/bin/server/main.rs +++ b/src/bin/server/main.rs @@ -1,6 +1,7 @@ pub mod connection; pub mod group_table; pub mod group; +pub mod user; use connection::serve; @@ -16,14 +17,17 @@ fn main() -> anyhow::Result<()> { ADDRESS", ); let chat_group_table = Arc::new(group_table::GroupTable::new()); + let user_manager = Arc::new(user::UserManager::new()); + async_std::task::block_on(async { let listener = TcpListener::bind(address).await?; let mut new_connections = listener.incoming(); while let Some(socket_result) = new_connections.next().await { let socket = socket_result?; let groups = chat_group_table.clone(); + let users = user_manager.clone(); task::spawn(async { - log_error(serve(socket, groups).await); + log_error(serve(socket, groups, users).await); }); } Ok(()) From 80a00b7612ef117102325ed2be1f972e2d36275e Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Thu, 3 Apr 2025 00:32:01 +0100 Subject: [PATCH 03/15] Feat(user): implement UserManager for user registration, login, and authentication --- src/bin/server/user.rs | 62 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 src/bin/server/user.rs diff --git a/src/bin/server/user.rs b/src/bin/server/user.rs new file mode 100644 index 0000000..3b5ea83 --- /dev/null +++ b/src/bin/server/user.rs @@ -0,0 +1,62 @@ +use async_chat::User; +use async_std::sync::Mutex; +use std::collections::HashMap; +use std::sync::Arc; +use uuid::Uuid; + +pub struct UserManager { + users: Mutex, User>>, + active_users: Mutex, Arc>>, // username -> session_id +} + +impl UserManager { + pub fn new() -> Self { + UserManager { + users: Mutex::new(HashMap::new()), + active_users: Mutex::new(HashMap::new()), + } + } + + pub async fn register(&self, username: Arc) -> anyhow::Result { + let mut users = self.users.lock().await; + if users.contains_key(&username) { + return Err(anyhow::anyhow!("Username already exists")); + } + + let user = User { + username: username.clone(), + id: Arc::new(Uuid::new_v4().to_string()), + }; + + users.insert(username, user.clone()); + Ok(user) + } + + pub async fn login(&self, username: Arc) -> anyhow::Result { + let users = self.users.lock().await; + let user = users.get(&username) + .cloned() + .ok_or_else(|| anyhow::anyhow!("User not found"))?; + + let mut active_users = self.active_users.lock().await; + active_users.insert(username, user.id.clone()); + + Ok(user) + } + + pub async fn logout(&self, username: Arc) -> anyhow::Result<()> { + let mut active_users = self.active_users.lock().await; + active_users.remove(&username); + Ok(()) + } + + pub async fn get_user(&self, username: &Arc) -> Option { + let users = self.users.lock().await; + users.get(username).cloned() + } + + pub async fn is_authenticated(&self, username: &Arc) -> bool { + let active_users = self.active_users.lock().await; + active_users.contains_key(username) + } +} \ No newline at end of file From 0d9249099c9e6c4188b65f5fe707358183e621ef Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Thu, 3 Apr 2025 00:32:16 +0100 Subject: [PATCH 04/15] Feat(user): extend user functionality with registration, login, and authentication responses --- src/lib.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 034bb18..b5aee4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,11 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; pub mod utils; - +#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)] +pub struct User { + pub username: Arc, + pub id: Arc, +} #[derive(Debug, Deserialize, Serialize, PartialEq)] pub enum FromClient { @@ -14,15 +18,29 @@ pub enum FromClient { group_name: Arc, message: Arc, }, + Register { + username: Arc, + password: Arc, + }, + Login { + username: Arc, + password: Arc, + }, + Logout, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub enum FromServer { Message { group_name: Arc, message: Arc, + sender: User, }, Error(String), + AuthSuccess { + user: User, + }, + AuthError(String), } #[cfg(test)] From 5ca1ffb3b33448a82ec64862de550b6eecc411a9 Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Thu, 3 Apr 2025 00:32:32 +0100 Subject: [PATCH 05/15] Feat(connection): enhance connection handling with user authentication and session management --- src/bin/server/connection.rs | 104 ++++++++++++++++++++++++++++------- 1 file changed, 84 insertions(+), 20 deletions(-) diff --git a/src/bin/server/connection.rs b/src/bin/server/connection.rs index 96deb66..f4a14a5 100644 --- a/src/bin/server/connection.rs +++ b/src/bin/server/connection.rs @@ -1,6 +1,7 @@ use crate::group_table::GroupTable; +use crate::user::UserManager; use async_chat::utils::{self}; -use async_chat::{FromClient, FromServer}; +use async_chat::{FromClient, FromServer, User}; use async_std::io::BufReader; use async_std::net::TcpStream; use async_std::prelude::*; @@ -8,6 +9,7 @@ use async_std::sync::Mutex; use async_std::sync::Arc; pub struct Outbound(Mutex); + impl Outbound { pub fn new(to_client: TcpStream) -> Outbound { Outbound(Mutex::new(to_client)) @@ -20,36 +22,98 @@ impl Outbound { } } -pub async fn serve(socket: TcpStream, groups: Arc) -> anyhow::Result<()> { - // wrapping our connection in outbound so as to have exclusive access to it in the groups and avoid interference +pub struct Connection { + outbound: Arc, + user: Option, +} + +impl Connection { + pub fn new(outbound: Arc) -> Self { + Connection { + outbound, + user: None, + } + } + + pub fn set_user(&mut self, user: User) { + self.user = Some(user); + } + + pub fn get_user(&self) -> Option<&User> { + self.user.as_ref() + } + + pub fn clear_user(&mut self) { + self.user = None; + } +} + +pub async fn serve( + socket: TcpStream, + groups: Arc, + user_manager: Arc, +) -> anyhow::Result<()> { let outbound = Arc::new(Outbound::new(socket.clone())); + let mut connection = Connection::new(outbound.clone()); let buffered = BufReader::new(socket); - // receive data from clients let mut from_client = utils::receive_as_json(buffered); + while let Some(request_result) = from_client.next().await { let request = request_result?; let result = match request { - FromClient::Join { group_name } => { - let group = groups.get_or_create(group_name); - group.join(outbound.clone()); + FromClient::Register { username, password: _ } => { + match user_manager.register(username.clone()).await { + Ok(user) => { + connection.set_user(user.clone()); + outbound.send(FromServer::AuthSuccess { user }).await + } + Err(e) => outbound.send(FromServer::AuthError(e.to_string())).await, + } + } + FromClient::Login { username, password: _ } => { + match user_manager.login(username.clone()).await { + Ok(user) => { + connection.set_user(user.clone()); + outbound.send(FromServer::AuthSuccess { user }).await + } + Err(e) => outbound.send(FromServer::AuthError(e.to_string())).await, + } + } + FromClient::Logout => { + if let Some(user) = connection.get_user() { + if let Err(e) = user_manager.logout(user.username.clone()).await { + return Err(anyhow::anyhow!("Logout error: {}", e)); + } + } + connection.clear_user(); Ok(()) } - FromClient::Post { - group_name, - message, - } => match groups.get(&group_name) { - Some(group) => { - group.post(message); + FromClient::Join { group_name } => { + if connection.get_user().is_none() { + outbound.send(FromServer::Error("Not authenticated".to_string())).await + } else { + let group = groups.get_or_create(group_name); + group.join(outbound.clone()); Ok(()) } - None => Err(format!("Group '{}' does not exist", group_name)), - }, + } + FromClient::Post { group_name, message } => { + if let Some(user) = connection.get_user() { + match groups.get(&group_name) { + Some(group) => { + group.post(message); + Ok(()) + } + None => outbound.send(FromServer::Error(format!("Group '{}' does not exist", group_name))).await, + } + } else { + outbound.send(FromServer::Error("Not authenticated".to_string())).await + } + } }; - // not a valid request - if let Err(message) = result { - let report = FromServer::Error(message); - // send error back to client - outbound.send(report).await?; + + if let Err(e) = result { + eprintln!("Error handling request: {}", e); } } Ok(()) From 6c7834b4f11d4d152f04790bf63ee9476340d25f Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Mon, 7 Apr 2025 10:21:03 +0100 Subject: [PATCH 06/15] Implement interactive chat mode with user authentication #1 --- .gitignore | 1 + Cargo.lock | 167 +++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + src/bin/client.rs | 161 +++++++++++++++++++++++++++++---- src/bin/server/connection.rs | 62 ++++++++----- src/bin/server/group.rs | 14 +-- 6 files changed, 363 insertions(+), 44 deletions(-) diff --git a/.gitignore b/.gitignore index ea8c4bf..907f4fc 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +.qodo diff --git a/Cargo.lock b/Cargo.lock index 7a0c5d9..bca9735 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,56 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "anstream" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + +[[package]] +name = "anstyle-parse" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" +dependencies = [ + "anstyle", + "once_cell", + "windows-sys", +] + [[package]] name = "anyhow" version = "1.0.97" @@ -52,9 +102,11 @@ version = "0.1.0" dependencies = [ "anyhow", "async-std", + "clap", "serde", "serde_json", "tokio", + "uuid", ] [[package]] @@ -243,6 +295,52 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "clap" +version = "4.5.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8aa86934b44c19c50f87cc2790e19f54f7a67aedb64101c2e1a2e5ecfb73944" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2414dbb2dd0695280da6ea9261e327479e9d37b0630f6b53ba2a11c60c679fd9" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" + +[[package]] +name = "colorchoice" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -335,6 +433,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "getrandom" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi", +] + [[package]] name = "gimli" version = "0.31.1" @@ -353,12 +463,24 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itoa" version = "1.0.15" @@ -497,6 +619,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -578,6 +706,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "syn" version = "2.0.100" @@ -621,12 +755,36 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "uuid" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" +dependencies = [ + "getrandom", +] + [[package]] name = "value-bag" version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5" +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -780,3 +938,12 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags", +] diff --git a/Cargo.toml b/Cargo.toml index 8b2a424..0fc83e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,5 @@ tokio = { version = "1.0", features = ["sync"] } serde = { version = "1.0", features = ["derive", "rc"] } serde_json = "1.0" anyhow = "1.0.97" +clap = { version = "4.4", features = ["derive"] } +uuid = { version = "1.7", features = ["v4"] } diff --git a/src/bin/client.rs b/src/bin/client.rs index 7f5afbb..0f33dc2 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -1,25 +1,142 @@ -use async_chat::{FromServer, utils}; -use async_std::{io::BufReader, net, prelude::FutureExt, stream::StreamExt, task}; +use async_chat::{FromClient, FromServer, utils}; +use async_std::{io::BufReader, net, prelude::*, stream::StreamExt, task}; +use clap::{Parser, Subcommand}; +use std::io::{self, Write}; +use std::sync::Arc; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + /// Server address (e.g., 127.0.0.1:8080) + address: String, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Join a chat group + Join { + /// Name of the group to join + #[arg(short, long)] + group: String, + }, + /// Post a message to a group + Post { + /// Name of the group to post to + #[arg(short, long)] + group: String, + /// Message to post + #[arg(short, long)] + message: String, + }, + /// Start interactive chat mode + Chat { + /// Name of the group to chat in + #[arg(short, long)] + group: String, + /// Username to use + #[arg(short, long)] + username: String, + }, +} + fn main() -> anyhow::Result<()> { - let address = std::env::args().nth(1).expect("Usage: client ADDRESS:PORT"); + let cli = Cli::parse(); task::block_on(async { - let socket = net::TcpStream::connect(address).await?; + let mut socket = net::TcpStream::connect(&cli.address).await?; socket.set_nodelay(true)?; - let to_server = send_commands(socket.clone()); - let from_server = handle_replies(socket); - // waits for client to finish before server starts handling replies - from_server.race(to_server).await?; + + match cli.command { + Commands::Chat { group, username } => { + // First register/login the user + let register_command = FromClient::Register { + username: Arc::new(username.clone()), + password: Arc::new("password".to_string()), // TODO: Add proper password handling + }; + utils::send_as_json(&mut socket, ®ister_command).await?; + socket.flush().await?; + + // Then join the group + let join_command = FromClient::Join { + group_name: Arc::new(group.clone()), + }; + utils::send_as_json(&mut socket, &join_command).await?; + socket.flush().await?; + + // Then start interactive mode + interactive_chat(socket, group, username).await?; + } + _ => { + let to_server = send_commands(socket.clone(), cli.command); + let from_server = handle_replies(socket); + from_server.race(to_server).await?; + } + } Ok(()) }) } -async fn send_commands(mut to_server: net::TcpStream) -> anyhow::Result<()> { - // Todo: Implement use clap to parse command line arguments and print help message +async fn interactive_chat( + mut socket: net::TcpStream, + group: String, + _username: String, +) -> anyhow::Result<()> { + let stdin = io::stdin(); + let mut stdout = io::stdout(); + let socket_clone = socket.clone(); - // send_as_json(&mut to_server, &result).await?; + // Spawn a task to handle incoming messages + let handle_replies = task::spawn(async move { handle_replies(socket_clone).await }); - todo!() + // Main loop for sending messages + loop { + print!("> "); + stdout.flush().unwrap(); + + let mut input = String::new(); + stdin.read_line(&mut input)?; + let input = input.trim(); + + if input.is_empty() { + continue; + } + + if input == "/quit" { + break; + } + + let command = FromClient::Post { + group_name: Arc::new(group.clone()), + message: Arc::new(input.to_string()), + }; + + utils::send_as_json(&mut socket, &command).await?; + socket.flush().await?; + } + + // Wait for the reply handler to finish + handle_replies.await?; + Ok(()) +} + +async fn send_commands(mut to_server: net::TcpStream, command: Commands) -> anyhow::Result<()> { + let command = match command { + Commands::Join { group } => FromClient::Join { + group_name: Arc::new(group), + }, + Commands::Post { group, message } => FromClient::Post { + group_name: Arc::new(group), + message: Arc::new(message), + }, + Commands::Chat { .. } => unreachable!(), // Handled separately + }; + + utils::send_as_json(&mut to_server, &command).await?; + to_server.flush().await?; + Ok(()) } async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> { @@ -31,14 +148,28 @@ async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> { FromServer::Message { group_name, message, + sender, } => { - println!("message posted to {}: {}", group_name, message); + println!("\n[{}] {}: {}", group_name, sender.username, message); + print!("> "); + io::stdout().flush().unwrap(); } FromServer::Error(error) => { - eprintln!("Error: {}", error); + eprintln!("\nError: {}", error); + print!("> "); + io::stdout().flush().unwrap(); + } + FromServer::AuthSuccess { user } => { + println!("\nSuccessfully authenticated as {}", user.username); + print!("> "); + io::stdout().flush().unwrap(); + } + FromServer::AuthError(error) => { + eprintln!("\nAuthentication error: {}", error); + print!("> "); + io::stdout().flush().unwrap(); } } } - Ok(()) } diff --git a/src/bin/server/connection.rs b/src/bin/server/connection.rs index f4a14a5..6d14f85 100644 --- a/src/bin/server/connection.rs +++ b/src/bin/server/connection.rs @@ -5,10 +5,10 @@ use async_chat::{FromClient, FromServer, User}; use async_std::io::BufReader; use async_std::net::TcpStream; use async_std::prelude::*; -use async_std::sync::Mutex; use async_std::sync::Arc; +use async_std::sync::Mutex; -pub struct Outbound(Mutex); +pub struct Outbound(Mutex); impl Outbound { pub fn new(to_client: TcpStream) -> Outbound { @@ -61,24 +61,26 @@ pub async fn serve( while let Some(request_result) = from_client.next().await { let request = request_result?; let result = match request { - FromClient::Register { username, password: _ } => { - match user_manager.register(username.clone()).await { - Ok(user) => { - connection.set_user(user.clone()); - outbound.send(FromServer::AuthSuccess { user }).await - } - Err(e) => outbound.send(FromServer::AuthError(e.to_string())).await, + FromClient::Register { + username, + password: _, + } => match user_manager.register(username.clone()).await { + Ok(user) => { + connection.set_user(user.clone()); + outbound.send(FromServer::AuthSuccess { user }).await } - } - FromClient::Login { username, password: _ } => { - match user_manager.login(username.clone()).await { - Ok(user) => { - connection.set_user(user.clone()); - outbound.send(FromServer::AuthSuccess { user }).await - } - Err(e) => outbound.send(FromServer::AuthError(e.to_string())).await, + Err(e) => outbound.send(FromServer::AuthError(e.to_string())).await, + }, + FromClient::Login { + username, + password: _, + } => match user_manager.login(username.clone()).await { + Ok(user) => { + connection.set_user(user.clone()); + outbound.send(FromServer::AuthSuccess { user }).await } - } + Err(e) => outbound.send(FromServer::AuthError(e.to_string())).await, + }, FromClient::Logout => { if let Some(user) = connection.get_user() { if let Err(e) = user_manager.logout(user.username.clone()).await { @@ -90,24 +92,38 @@ pub async fn serve( } FromClient::Join { group_name } => { if connection.get_user().is_none() { - outbound.send(FromServer::Error("Not authenticated".to_string())).await + outbound + .send(FromServer::Error("Not authenticated".to_string())) + .await } else { let group = groups.get_or_create(group_name); group.join(outbound.clone()); Ok(()) } } - FromClient::Post { group_name, message } => { + FromClient::Post { + group_name, + message, + } => { if let Some(user) = connection.get_user() { match groups.get(&group_name) { Some(group) => { - group.post(message); + group.post(message, user.clone()); Ok(()) } - None => outbound.send(FromServer::Error(format!("Group '{}' does not exist", group_name))).await, + None => { + outbound + .send(FromServer::Error(format!( + "Group '{}' does not exist", + group_name + ))) + .await + } } } else { - outbound.send(FromServer::Error("Not authenticated".to_string())).await + outbound + .send(FromServer::Error("Not authenticated".to_string())) + .await } } }; diff --git a/src/bin/server/group.rs b/src/bin/server/group.rs index 9fed660..2b41053 100644 --- a/src/bin/server/group.rs +++ b/src/bin/server/group.rs @@ -1,12 +1,13 @@ use crate::connection::Outbound; use async_chat::FromServer; +use async_chat::User; use async_std::task; use std::sync::Arc; use tokio::sync::broadcast; pub struct Group { name: Arc, - sender: broadcast::Sender>, + sender: broadcast::Sender<(Arc, User)>, } impl Group { @@ -18,22 +19,23 @@ impl Group { let receiver = self.sender.subscribe(); task::spawn(handle_subscriber(self.name.clone(), receiver, outbound)); } - pub fn post(&self, message: Arc) { - let _ignored = self.sender.send(message); + pub fn post(&self, message: Arc, sender: User) { + let _ignored = self.sender.send((message, sender)); } } async fn handle_subscriber( group_name: Arc, - mut receiver: broadcast::Receiver>, + mut receiver: broadcast::Receiver<(Arc, User)>, outbound: Arc, ) { - while let Ok(message) = receiver.recv().await { + while let Ok((message, sender)) = receiver.recv().await { let response = FromServer::Message { group_name: group_name.clone(), message: message.clone(), + sender, }; - + if let Err(e) = outbound.send(response).await { eprintln!("Error sending message to client: {}", e); // If we can't send to this client, we should probably remove them from the group From 8f91f07d5b82468c584f4be139d15c85c9b8c2cc Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Mon, 7 Apr 2025 17:28:22 +0100 Subject: [PATCH 07/15] fix: resolve merge conflicts in client.rs --- .gitignore | 4 +++- src/bin/client.rs | 21 +-------------------- src/bin/server/main.rs | 10 ++++------ 3 files changed, 8 insertions(+), 27 deletions(-) diff --git a/.gitignore b/.gitignore index 907f4fc..2860bdd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ -/target +/target/ +**/*.rs.bk +Cargo.lock .qodo diff --git a/src/bin/client.rs b/src/bin/client.rs index aa580dc..cb4198e 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -1,4 +1,3 @@ -feature/interactive-chat use async_chat::{FromClient, FromServer, utils}; use async_std::{io::BufReader, net, prelude::*, stream::StreamExt, task}; use clap::{Parser, Subcommand}; @@ -43,19 +42,12 @@ enum Commands { }, } -#![allow(dead_code, unused_variables, unused_mut)] // Suppresses warnings - -use async_chat::{FromServer, utils}; -use async_std::{io::BufReader, net, prelude::FutureExt, stream::StreamExt, task}; -main - fn main() -> anyhow::Result<()> { let cli = Cli::parse(); task::block_on(async { let mut socket = net::TcpStream::connect(&cli.address).await?; socket.set_nodelay(true)?; - feature/interactive-chat match cli.command { Commands::Chat { group, username } => { @@ -83,16 +75,10 @@ fn main() -> anyhow::Result<()> { from_server.race(to_server).await?; } } - - let to_server = send_commands(socket.clone()); - let from_server = handle_replies(socket); - - from_server.race(to_server).await?; - main Ok(()) }) } - feature/interactive-chat + async fn interactive_chat( mut socket: net::TcpStream, group: String, @@ -151,11 +137,6 @@ async fn send_commands(mut to_server: net::TcpStream, command: Commands) -> anyh utils::send_as_json(&mut to_server, &command).await?; to_server.flush().await?; Ok(()) - -async fn send_commands(_to_server: net::TcpStream) -> anyhow::Result<()> { - // TODO: Implement use clap to parse command line arguments and print help message - todo!() - main } async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> { diff --git a/src/bin/server/main.rs b/src/bin/server/main.rs index 1f79355..3960c82 100644 --- a/src/bin/server/main.rs +++ b/src/bin/server/main.rs @@ -1,18 +1,16 @@ pub mod connection; pub mod group; - feature/interactive-chat -pub mod user; - pub mod group_table; - main - -use connection::serve; +pub mod user; use async_std::net::TcpListener; use async_std::prelude::*; use async_std::task; +use connection::serve; use std::sync::Arc; +use async_std::prelude::*; + fn main() -> anyhow::Result<()> { let address = std::env::args().nth(1).expect( "Usage: server From 9cee217762ec01c7369677dce47ddf7a1e44ea42 Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Mon, 7 Apr 2025 21:00:57 +0100 Subject: [PATCH 08/15] Fix formatting issues and merge conflicts #1 --- src/bin/server/connection.rs | 5 ----- src/bin/server/group.rs | 13 +------------ src/bin/server/user.rs | 25 +++++++++++-------------- src/lib.rs | 2 -- 4 files changed, 12 insertions(+), 33 deletions(-) diff --git a/src/bin/server/connection.rs b/src/bin/server/connection.rs index da9b22f..6d14f85 100644 --- a/src/bin/server/connection.rs +++ b/src/bin/server/connection.rs @@ -7,14 +7,9 @@ use async_std::net::TcpStream; use async_std::prelude::*; use async_std::sync::Arc; use async_std::sync::Mutex; - feature/interactive-chat pub struct Outbound(Mutex); - - -pub struct Outbound(Mutex); -main impl Outbound { pub fn new(to_client: TcpStream) -> Outbound { Outbound(Mutex::new(to_client)) diff --git a/src/bin/server/group.rs b/src/bin/server/group.rs index 1200f74..ba3c13e 100644 --- a/src/bin/server/group.rs +++ b/src/bin/server/group.rs @@ -22,27 +22,16 @@ impl Group { let receiver = self.sender.subscribe(); task::spawn(handle_subscriber(self.name.clone(), receiver, outbound)); } -feature/interactive-chat + pub fn post(&self, message: Arc, sender: User) { let _ignored = self.sender.send((message, sender)); - - - pub fn post(&self, message: Arc) { - let _ = self.sender.send(message); // Ignoring the result to suppress warning - main } } async fn handle_subscriber( - feature/interactive-chat group_name: Arc, mut receiver: broadcast::Receiver<(Arc, User)>, outbound: Arc, - - _group_name: Arc, - _receiver: broadcast::Receiver>, - _outbound: Arc, - main ) { while let Ok((message, sender)) = receiver.recv().await { let response = FromServer::Message { diff --git a/src/bin/server/user.rs b/src/bin/server/user.rs index 3b5ea83..5ff55ac 100644 --- a/src/bin/server/user.rs +++ b/src/bin/server/user.rs @@ -1,12 +1,11 @@ use async_chat::User; -use async_std::sync::Mutex; +use async_std::sync::{Arc, Mutex}; use std::collections::HashMap; -use std::sync::Arc; use uuid::Uuid; pub struct UserManager { users: Mutex, User>>, - active_users: Mutex, Arc>>, // username -> session_id + active_users: Mutex, Arc>>, } impl UserManager { @@ -28,35 +27,33 @@ impl UserManager { id: Arc::new(Uuid::new_v4().to_string()), }; - users.insert(username, user.clone()); + users.insert(username.clone(), user.clone()); Ok(user) } pub async fn login(&self, username: Arc) -> anyhow::Result { let users = self.users.lock().await; - let user = users.get(&username) + let user = users + .get(&username) .cloned() .ok_or_else(|| anyhow::anyhow!("User not found"))?; let mut active_users = self.active_users.lock().await; active_users.insert(username, user.id.clone()); - + Ok(user) } pub async fn logout(&self, username: Arc) -> anyhow::Result<()> { let mut active_users = self.active_users.lock().await; - active_users.remove(&username); + active_users + .remove(&username) + .ok_or_else(|| anyhow::anyhow!("User not found"))?; Ok(()) } - pub async fn get_user(&self, username: &Arc) -> Option { - let users = self.users.lock().await; - users.get(username).cloned() - } - - pub async fn is_authenticated(&self, username: &Arc) -> bool { + pub async fn is_active(&self, username: &Arc) -> bool { let active_users = self.active_users.lock().await; active_users.contains_key(username) } -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index 57a8075..b5aee4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,14 +3,12 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; pub mod utils; -feature/interactive-chat #[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)] pub struct User { pub username: Arc, pub id: Arc, } -main #[derive(Debug, Deserialize, Serialize, PartialEq)] pub enum FromClient { Join { From b1f72e39edaf6f7c59559ff53d65ff2935ba5456 Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Mon, 7 Apr 2025 21:17:24 +0100 Subject: [PATCH 09/15] refactor: improve server connection handling and default address configuration --- src/bin/server/main.rs | 64 ++++++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/src/bin/server/main.rs b/src/bin/server/main.rs index 3960c82..a738c1d 100644 --- a/src/bin/server/main.rs +++ b/src/bin/server/main.rs @@ -1,41 +1,45 @@ -pub mod connection; -pub mod group; -pub mod group_table; -pub mod user; - use async_std::net::TcpListener; -use async_std::prelude::*; +use async_std::stream::StreamExt; +use async_std::sync::Arc; use async_std::task; -use connection::serve; -use std::sync::Arc; -use async_std::prelude::*; +mod connection; +mod group; +mod group_table; +mod user; + +use connection::serve; +use group_table::GroupTable; +use user::UserManager; fn main() -> anyhow::Result<()> { - let address = std::env::args().nth(1).expect( - "Usage: server - ADDRESS", - ); - let chat_group_table = Arc::new(group_table::GroupTable::new()); - let user_manager = Arc::new(user::UserManager::new()); + let address = std::env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:8080".to_string()); + task::block_on(async { + let listener = TcpListener::bind(&address).await?; + println!("Server listening on {}", address); + + let groups = Arc::new(GroupTable::new()); + let user_manager = Arc::new(UserManager::new()); - async_std::task::block_on(async { - let listener = TcpListener::bind(address).await?; - let mut new_connections = listener.incoming(); - while let Some(socket_result) = new_connections.next().await { - let socket = socket_result?; - let groups = chat_group_table.clone(); - let users = user_manager.clone(); - task::spawn(async { - log_error(serve(socket, groups, users).await); + let mut handles = vec![]; + while let Some(stream) = listener.incoming().next().await { + let stream = stream?; + let groups = groups.clone(); + let user_manager = user_manager.clone(); + + let handle = task::spawn(async move { + if let Err(e) = serve(stream, groups, user_manager).await { + eprintln!("Error serving connection: {}", e); + } }); + handles.push(handle); + } + + for handle in handles { + handle.await; } Ok(()) }) } - -fn log_error(result: anyhow::Result<()>) { - if let Err(error) = result { - eprintln!("Error: {}", error); - } -} From 68560da2a7b2cbca3c9099d7a738cb8c76e43808 Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Mon, 7 Apr 2025 21:18:00 +0100 Subject: [PATCH 10/15] refactor: simplify Connection initialization and update group handling to be asynchronous --- src/bin/server/connection.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/bin/server/connection.rs b/src/bin/server/connection.rs index 6d14f85..d9b47a0 100644 --- a/src/bin/server/connection.rs +++ b/src/bin/server/connection.rs @@ -23,16 +23,12 @@ impl Outbound { } pub struct Connection { - outbound: Arc, user: Option, } impl Connection { - pub fn new(outbound: Arc) -> Self { - Connection { - outbound, - user: None, - } + pub fn new() -> Self { + Connection { user: None } } pub fn set_user(&mut self, user: User) { @@ -54,7 +50,7 @@ pub async fn serve( user_manager: Arc, ) -> anyhow::Result<()> { let outbound = Arc::new(Outbound::new(socket.clone())); - let mut connection = Connection::new(outbound.clone()); + let mut connection = Connection::new(); let buffered = BufReader::new(socket); let mut from_client = utils::receive_as_json(buffered); @@ -96,7 +92,7 @@ pub async fn serve( .send(FromServer::Error("Not authenticated".to_string())) .await } else { - let group = groups.get_or_create(group_name); + let group = groups.get_or_create(group_name).await; group.join(outbound.clone()); Ok(()) } @@ -106,7 +102,7 @@ pub async fn serve( message, } => { if let Some(user) = connection.get_user() { - match groups.get(&group_name) { + match groups.get(&group_name).await { Some(group) => { group.post(message, user.clone()); Ok(()) From 950ded63e3f7ac9d58d29b2c807f5186fea776eb Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Mon, 7 Apr 2025 21:18:08 +0100 Subject: [PATCH 11/15] refactor: restructure GroupTable to use named fields and enable asynchronous access --- src/bin/server/group_table.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/bin/server/group_table.rs b/src/bin/server/group_table.rs index 56f3725..0867011 100644 --- a/src/bin/server/group_table.rs +++ b/src/bin/server/group_table.rs @@ -1,27 +1,32 @@ #![allow(clippy::new_without_default)] // Suppresses Clippy warning use crate::group::Group; +use async_std::sync::{Arc, Mutex}; use std::collections::HashMap; -use std::sync::{Arc, Mutex}; -pub struct GroupTable(Mutex, Arc>>); +pub struct GroupTable { + groups: Mutex, Arc>>, +} impl GroupTable { pub fn new() -> GroupTable { - GroupTable(Mutex::new(HashMap::new())) + GroupTable { + groups: Mutex::new(HashMap::new()), + } } - pub fn get(&self, name: &String) -> Option> { - self.0.lock().unwrap().get(name).cloned() + pub async fn get(&self, name: &Arc) -> Option> { + let groups = self.groups.lock().await; + groups.get(name).cloned() } - pub fn get_or_create(&self, name: Arc) -> Arc { - self.0 - .lock() - .unwrap() - .entry(name.clone()) - .or_insert_with(|| Arc::new(Group::new(name))) - .clone() + pub async fn get_or_create(&self, name: Arc) -> Arc { + let mut groups = self.groups.lock().await; + groups.get(&name).cloned().unwrap_or_else(|| { + let group = Arc::new(Group::new(name.clone())); + groups.insert(name, group.clone()); + group + }) } } From 8e74370122c0434d0da825283fe328fa45428544 Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Mon, 7 Apr 2025 21:20:20 +0100 Subject: [PATCH 12/15] Add #[allow(dead_code)] to is_active method --- src/bin/server/user.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/bin/server/user.rs b/src/bin/server/user.rs index 5ff55ac..df747cb 100644 --- a/src/bin/server/user.rs +++ b/src/bin/server/user.rs @@ -52,6 +52,7 @@ impl UserManager { Ok(()) } + #[allow(dead_code)] pub async fn is_active(&self, username: &Arc) -> bool { let active_users = self.active_users.lock().await; active_users.contains_key(username) From 7bf49a3767c8ff0631886fb6a9c29e522fe4a5a2 Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Mon, 7 Apr 2025 21:37:53 +0100 Subject: [PATCH 13/15] chore: update remove unnecessary entry --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 2860bdd..68f412d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ /target/ **/*.rs.bk Cargo.lock -.qodo + From a4a214c7de629894c005b14abf7f26be5a094036 Mon Sep 17 00:00:00 2001 From: chojuninengu Date: Mon, 7 Apr 2025 22:18:37 +0100 Subject: [PATCH 14/15] ci: add --force flag to cargo-audit installation --- .github/workflows/ci.yml | 95 ++++++++++++++++++++-------------------- 1 file changed, 47 insertions(+), 48 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e67b8b9..4352067 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,56 +7,55 @@ env: RUSTFLAGS: -D warnings jobs: - ci: name: Build and test runs-on: ubuntu-latest steps: - - name: Checkout code - uses: actions/checkout@v4 - - - name: Cache dependencies - uses: actions/cache@v4 - with: - path: | - ~/.cargo/bin - ~/.cargo/registry/index/ - ~/.cargo/registry/cache/ - ~/.cargo/git/db/ - target/ - key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} - ${{ runner.os }}-cargo - - - name: Set up Rust - uses: actions-rust-lang/setup-rust-toolchain@v1 - with: - toolchain: stable - rustflags: - - - name: Install Nextest test runner - uses: taiki-e/install-action@nextest - - - name: Check Formatting - run: cargo fmt --all -- --check - - - name: Build Project - run: cargo build --workspace --all-features - - - name: Run Tests - run: cargo nextest run --workspace --all-targets --all-features --no-fail-fast || echo "No tests found, skipping..." - - - name: Lints checks - run: | - cargo clippy --workspace --all-targets --all-features -- -D warnings - - - name: Check API documentation - run: cargo doc --workspace --all-features --no-deps - - - name: Install cargo-audit - run: cargo install cargo-audit - - - name: Run Security Audit - run: cargo audit + - name: Checkout code + uses: actions/checkout@v4 + + - name: Cache dependencies + uses: actions/cache@v4 + with: + path: | + ~/.cargo/bin + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + target/ + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + ${{ runner.os }}-cargo + + - name: Set up Rust + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: stable + rustflags: + + - name: Install Nextest test runner + uses: taiki-e/install-action@nextest + + - name: Check Formatting + run: cargo fmt --all -- --check + + - name: Build Project + run: cargo build --workspace --all-features + + - name: Run Tests + run: cargo nextest run --workspace --all-targets --all-features --no-fail-fast || echo "No tests found, skipping..." + + - name: Lints checks + run: | + cargo clippy --workspace --all-targets --all-features -- -D warnings + + - name: Check API documentation + run: cargo doc --workspace --all-features --no-deps + + - name: Install cargo-audit + run: cargo install --force cargo-audit + + - name: Run Security Audit + run: cargo audit From 31f038a11f162422df240782fd16d316ca635e3a Mon Sep 17 00:00:00 2001 From: JU-NINE NGU CHO Date: Tue, 8 Apr 2025 09:18:36 +0100 Subject: [PATCH 15/15] Update ci.yml that runs without the "--force to install cargo-audit" --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4352067..7a4832f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,7 +55,7 @@ jobs: run: cargo doc --workspace --all-features --no-deps - name: Install cargo-audit - run: cargo install --force cargo-audit + run: cargo install cargo-audit - name: Run Security Audit run: cargo audit