diff --git a/src/bin/client.rs b/src/bin/client.rs index 0602bf7..a9b9ff4 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -3,12 +3,18 @@ use async_chat::{FromServer, utils}; use async_std::{io::BufReader, net, prelude::FutureExt, stream::StreamExt, task}; +/// Client binary for connecting to the async chat server. +/// +/// Expects one argument: the server address and port to connect to. +/// Example usage: `client 127.0.0.1:8080` fn main() -> anyhow::Result<()> { let address = std::env::args().nth(1).expect("Usage: client ADDRESS:PORT"); task::block_on(async { let socket = net::TcpStream::connect(address).await?; - socket.set_nodelay(true)?; + socket.set_nodelay(true)?; // Disable Nagle's algorithm for lower latency. + + // Race two futures: sending commands vs. receiving server. let to_server = send_commands(socket.clone()); let from_server = handle_replies(socket); @@ -17,11 +23,12 @@ fn main() -> anyhow::Result<()> { }) } +/// Reads user input (planned via `clap`) and sends commands to the server. async fn send_commands(_to_server: net::TcpStream) -> anyhow::Result<()> { // TODO: Implement use clap to parse command line arguments and print help message todo!() } - +/// Handles responses from the server and prints them to stdout as they arrive. async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> { let buffered = BufReader::new(from_server); let mut reply_stream = utils::receive_as_json(buffered); diff --git a/src/bin/server/connection.rs b/src/bin/server/connection.rs index b3b11ed..9f2c492 100644 --- a/src/bin/server/connection.rs +++ b/src/bin/server/connection.rs @@ -7,11 +7,27 @@ use async_std::prelude::*; use async_std::sync::Arc; use async_std::sync::Mutex; +/// Represents a thread-safe outbound connection to a client. +/// This struct wraps a `TcpStream` in a `Mutex` to provide a safe and exclusive way to send data to the client. pub struct Outbound(Mutex); impl Outbound { + /// Creates a new `Outbound` connection. + /// + /// # Arguments + /// + /// * `to_client` - The TCP stream to write to. pub fn new(to_client: TcpStream) -> Outbound { Outbound(Mutex::new(to_client)) } + /// Sends a message to the connected client in JSON format. + /// + /// # Arguments + /// + /// * `packet` - The message to send, wrapped in the `FromServer` enum. + /// + /// # Errors + /// + /// Returns an error if writing or flushing to the stream fails. pub async fn send(&self, packet: FromServer) -> anyhow::Result<()> { let mut guard = self.0.lock().await; utils::send_as_json(&mut *guard, &packet).await?; @@ -20,6 +36,19 @@ impl Outbound { } } +/// Serves a single client connection by reading messages and interacting with group state. +/// +/// # Arguments +/// +/// * `socket` - The TCP connection to the client. +/// * `groups` - A shared reference to the server's group table. +/// +/// # Errors +/// +/// Returns an error if: +/// - Reading from the socket fails +/// - Sending a message fails +/// - A user tries to post to a group that does not exist pub async fn serve(socket: TcpStream, groups: Arc) -> anyhow::Result<()> { // wrapping our connection in outbound so as to have exclusive access to it in the groups and avoid interference let outbound = Arc::new(Outbound::new(socket.clone())); @@ -45,7 +74,7 @@ pub async fn serve(socket: TcpStream, groups: Arc) -> anyhow::Result None => Err(format!("Group '{}' does not exist", group_name)), }, }; - // not a valid request + // If an error occurred, send an error message back to the client if let Err(message) = result { let report = FromServer::Error(message); // send error back to client diff --git a/src/bin/server/group.rs b/src/bin/server/group.rs index 0e001e7..0e93150 100644 --- a/src/bin/server/group.rs +++ b/src/bin/server/group.rs @@ -5,27 +5,48 @@ use async_std::task; use std::sync::Arc; use tokio::sync::broadcast; +/// A named group that broadcasts messages to all connected subscribers. pub struct Group { name: Arc, sender: broadcast::Sender>, } impl Group { + /// Creates a new `Group` with a given name. + /// + /// # Arguments + /// + /// * `name` - The name of the group. pub fn new(name: Arc) -> Group { - let (sender, _receiver) = broadcast::channel(1000); + let (sender, _receiver) = broadcast::channel(1000); // buffer size of 1000 messages Group { name, sender } } - + /// Adds a client connection to the group and starts sending messages to it. + /// + /// # Arguments + /// + /// * `outbound` - The client connection to receive messages. + /// + /// This function spawns a background task to handle receiving messages from the + /// broadcast channel and forwarding them to the client. A task is used so that + /// the message receiving loop can run asynchronously without blocking the caller. pub fn join(&self, outbound: Arc) { let receiver = self.sender.subscribe(); task::spawn(handle_subscriber(self.name.clone(), receiver, outbound)); } - + /// Posts a message to the group, broadcasting it to all subscribers. + /// + /// # Arguments + /// + /// * `message` - The message to broadcast. pub fn post(&self, message: Arc) { let _ = self.sender.send(message); // Ignoring the result to suppress warning } } +/// Handles the lifecycle of a subscriber: receiving messages and sending them over their connection. +/// +/// This is a stub — should be implemented to read from the `receiver` and forward messages to `outbound`. async fn handle_subscriber( _group_name: Arc, _receiver: broadcast::Receiver>, diff --git a/src/bin/server/group_table.rs b/src/bin/server/group_table.rs index 56f3725..f3d938a 100644 --- a/src/bin/server/group_table.rs +++ b/src/bin/server/group_table.rs @@ -4,13 +4,26 @@ use crate::group::Group; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +/// A thread-safe table that stores all active chat groups by name. +/// +/// Internally wraps a `HashMap, Arc>` in a `Mutex` for safe concurrent access. pub struct GroupTable(Mutex, Arc>>); impl GroupTable { + /// Creates a new, empty `GroupTable`. pub fn new() -> GroupTable { GroupTable(Mutex::new(HashMap::new())) } + /// Retrieves a group by name, if it exists. + /// + /// # Arguments + /// + /// * `name` - The name of the group to retrieve. + /// + /// # Returns + /// + /// An `Option` containing the group, or `None` if it doesn't exist. pub fn get(&self, name: &String) -> Option> { self.0.lock().unwrap().get(name).cloned() } diff --git a/src/bin/server/main.rs b/src/bin/server/main.rs index 1d85de9..048d6b5 100644 --- a/src/bin/server/main.rs +++ b/src/bin/server/main.rs @@ -9,15 +9,21 @@ use async_std::prelude::*; use async_std::task; use std::sync::Arc; +/// The main entry point for the async-chat server. +/// +/// Accepts incoming TCP connections and spawns a task to handle each. +/// Expects one argument: the address to bind to (e.g., `127.0.0.1:8080`) fn main() -> anyhow::Result<()> { let address = std::env::args().nth(1).expect( "Usage: server ADDRESS", ); + // A thread-safe table that stores all active chat groups by name. let chat_group_table = Arc::new(group_table::GroupTable::new()); async_std::task::block_on(async { let listener = TcpListener::bind(address).await?; let mut new_connections = listener.incoming(); + // Accept incoming connections and spawn an asynchronous task to handle each while let Some(socket_result) = new_connections.next().await { let socket = socket_result?; let groups = chat_group_table.clone(); @@ -29,6 +35,7 @@ fn main() -> anyhow::Result<()> { }) } +/// Logs errors from client handler tasks. fn log_error(result: anyhow::Result<()>) { if let Err(error) = result { eprintln!("Error: {}", error); diff --git a/src/lib.rs b/src/lib.rs index 4f1b1ae..87ec78a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,25 +1,33 @@ +//! # async-chat +//! +//! A simple async group chat system implemented in Rust, using `async-std` for concurrency. +//! This crate defines the message formats and utility functions used by both the client and server. + use std::sync::Arc; use serde::{Deserialize, Serialize}; pub mod utils; +/// Messages that clients can send to the server. #[derive(Debug, Deserialize, Serialize, PartialEq)] pub enum FromClient { - Join { - group_name: Arc, - }, + /// Join a group by name. + Join { group_name: Arc }, + /// Post a message to a group. Post { group_name: Arc, message: Arc, }, } - +/// Messages that the server sends back to clients. #[derive(Debug, Deserialize, Serialize)] pub enum FromServer { + /// A message has been posted to a group. Message { group_name: Arc, message: Arc, }, + /// The server encountered an error. Error(String), } diff --git a/src/utils.rs b/src/utils.rs index 3859aa5..dc05761 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,6 +1,11 @@ use async_std::prelude::*; use serde::de::DeserializeOwned; +/// Sends a serializable packet as a JSON-encoded line over a writable stream. +/// +/// # Arguments +/// * `outbound` - The writable stream to send through. +/// * `packet` - The serializable data to be sent. pub async fn send_as_json(outbound: &mut S, packet: &P) -> anyhow::Result<()> where S: async_std::io::Write + Unpin, @@ -12,6 +17,13 @@ where Ok(()) } +/// Returns a stream of deserialized packets from a buffered input stream. +/// +/// # Arguments +/// * `inbound` - A stream of lines containing JSON messages. +/// +/// # Returns +/// A stream of parsed packets of type `P`. pub fn receive_as_json(inbound: S) -> impl Stream> where S: async_std::io::BufRead + Unpin,