Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
use async_chat::{FromServer, utils};
use async_std::{io::BufReader, net, prelude::FutureExt, stream::StreamExt, task};

/// Client binary for connecting to the async chat server.
///
/// Expects one argument: the server address and port to connect to.
/// Example usage: `client 127.0.0.1:8080`
fn main() -> anyhow::Result<()> {
let address = std::env::args().nth(1).expect("Usage: client ADDRESS:PORT");

task::block_on(async {
let socket = net::TcpStream::connect(address).await?;
socket.set_nodelay(true)?;
socket.set_nodelay(true)?; // Disable Nagle's algorithm for lower latency.

// Race two futures: sending commands vs. receiving server.
let to_server = send_commands(socket.clone());
let from_server = handle_replies(socket);

Expand All @@ -17,11 +23,12 @@ fn main() -> anyhow::Result<()> {
})
}

/// Reads user input (planned via `clap`) and sends commands to the server.
async fn send_commands(_to_server: net::TcpStream) -> anyhow::Result<()> {
// TODO: Implement use clap to parse command line arguments and print help message
todo!()
}

/// Handles responses from the server and prints them to stdout as they arrive.
async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> {
let buffered = BufReader::new(from_server);
let mut reply_stream = utils::receive_as_json(buffered);
Expand Down
31 changes: 30 additions & 1 deletion src/bin/server/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,27 @@ use async_std::prelude::*;
use async_std::sync::Arc;
use async_std::sync::Mutex;

/// Represents a thread-safe outbound connection to a client.
/// This struct wraps a `TcpStream` in a `Mutex` to provide a safe and exclusive way to send data to the client.
pub struct Outbound(Mutex<TcpStream>);
impl Outbound {
/// Creates a new `Outbound` connection.
///
/// # Arguments
///
/// * `to_client` - The TCP stream to write to.
pub fn new(to_client: TcpStream) -> Outbound {
Outbound(Mutex::new(to_client))
}
/// Sends a message to the connected client in JSON format.
///
/// # Arguments
///
/// * `packet` - The message to send, wrapped in the `FromServer` enum.
///
/// # Errors
///
/// Returns an error if writing or flushing to the stream fails.
pub async fn send(&self, packet: FromServer) -> anyhow::Result<()> {
let mut guard = self.0.lock().await;
utils::send_as_json(&mut *guard, &packet).await?;
Expand All @@ -20,6 +36,19 @@ impl Outbound {
}
}

/// Serves a single client connection by reading messages and interacting with group state.
///
/// # Arguments
///
/// * `socket` - The TCP connection to the client.
/// * `groups` - A shared reference to the server's group table.
///
/// # Errors
///
/// Returns an error if:
/// - Reading from the socket fails
/// - Sending a message fails
/// - A user tries to post to a group that does not exist
pub async fn serve(socket: TcpStream, groups: Arc<GroupTable>) -> anyhow::Result<()> {
// wrapping our connection in outbound so as to have exclusive access to it in the groups and avoid interference
let outbound = Arc::new(Outbound::new(socket.clone()));
Expand All @@ -45,7 +74,7 @@ pub async fn serve(socket: TcpStream, groups: Arc<GroupTable>) -> anyhow::Result
None => Err(format!("Group '{}' does not exist", group_name)),
},
};
// not a valid request
// If an error occurred, send an error message back to the client
if let Err(message) = result {
let report = FromServer::Error(message);
// send error back to client
Expand Down
27 changes: 24 additions & 3 deletions src/bin/server/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,48 @@ use async_std::task;
use std::sync::Arc;
use tokio::sync::broadcast;

/// A named group that broadcasts messages to all connected subscribers.
pub struct Group {
name: Arc<String>,
sender: broadcast::Sender<Arc<String>>,
}

impl Group {
/// Creates a new `Group` with a given name.
///
/// # Arguments
///
/// * `name` - The name of the group.
pub fn new(name: Arc<String>) -> Group {
let (sender, _receiver) = broadcast::channel(1000);
let (sender, _receiver) = broadcast::channel(1000); // buffer size of 1000 messages
Group { name, sender }
}

/// Adds a client connection to the group and starts sending messages to it.
///
/// # Arguments
///
/// * `outbound` - The client connection to receive messages.
///
/// This function spawns a background task to handle receiving messages from the
/// broadcast channel and forwarding them to the client. A task is used so that
/// the message receiving loop can run asynchronously without blocking the caller.
pub fn join(&self, outbound: Arc<Outbound>) {
let receiver = self.sender.subscribe();
task::spawn(handle_subscriber(self.name.clone(), receiver, outbound));
}

/// Posts a message to the group, broadcasting it to all subscribers.
///
/// # Arguments
///
/// * `message` - The message to broadcast.
pub fn post(&self, message: Arc<String>) {
let _ = self.sender.send(message); // Ignoring the result to suppress warning
}
}

/// Handles the lifecycle of a subscriber: receiving messages and sending them over their connection.
///
/// This is a stub — should be implemented to read from the `receiver` and forward messages to `outbound`.
async fn handle_subscriber(
_group_name: Arc<String>,
_receiver: broadcast::Receiver<Arc<String>>,
Expand Down
13 changes: 13 additions & 0 deletions src/bin/server/group_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,26 @@ use crate::group::Group;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// A thread-safe table that stores all active chat groups by name.
///
/// Internally wraps a `HashMap<Arc<String>, Arc<Group>>` in a `Mutex` for safe concurrent access.
pub struct GroupTable(Mutex<HashMap<Arc<String>, Arc<Group>>>);

impl GroupTable {
/// Creates a new, empty `GroupTable`.
pub fn new() -> GroupTable {
GroupTable(Mutex::new(HashMap::new()))
}

/// Retrieves a group by name, if it exists.
///
/// # Arguments
///
/// * `name` - The name of the group to retrieve.
///
/// # Returns
///
/// An `Option` containing the group, or `None` if it doesn't exist.
pub fn get(&self, name: &String) -> Option<Arc<Group>> {
self.0.lock().unwrap().get(name).cloned()
}
Expand Down
7 changes: 7 additions & 0 deletions src/bin/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@ use async_std::prelude::*;
use async_std::task;
use std::sync::Arc;

/// The main entry point for the async-chat server.
///
/// Accepts incoming TCP connections and spawns a task to handle each.
/// Expects one argument: the address to bind to (e.g., `127.0.0.1:8080`)
fn main() -> anyhow::Result<()> {
let address = std::env::args().nth(1).expect(
"Usage: server
ADDRESS",
);
// A thread-safe table that stores all active chat groups by name.
let chat_group_table = Arc::new(group_table::GroupTable::new());
async_std::task::block_on(async {
let listener = TcpListener::bind(address).await?;
let mut new_connections = listener.incoming();
// Accept incoming connections and spawn an asynchronous task to handle each
while let Some(socket_result) = new_connections.next().await {
let socket = socket_result?;
let groups = chat_group_table.clone();
Expand All @@ -29,6 +35,7 @@ fn main() -> anyhow::Result<()> {
})
}

/// Logs errors from client handler tasks.
fn log_error(result: anyhow::Result<()>) {
if let Err(error) = result {
eprintln!("Error: {}", error);
Expand Down
16 changes: 12 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
//! # async-chat
//!
//! A simple async group chat system implemented in Rust, using `async-std` for concurrency.
//! This crate defines the message formats and utility functions used by both the client and server.

use std::sync::Arc;

use serde::{Deserialize, Serialize};
pub mod utils;

/// Messages that clients can send to the server.
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub enum FromClient {
Join {
group_name: Arc<String>,
},
/// Join a group by name.
Join { group_name: Arc<String> },
/// Post a message to a group.
Post {
group_name: Arc<String>,
message: Arc<String>,
},
}

/// Messages that the server sends back to clients.
#[derive(Debug, Deserialize, Serialize)]
pub enum FromServer {
/// A message has been posted to a group.
Message {
group_name: Arc<String>,
message: Arc<String>,
},
/// The server encountered an error.
Error(String),
}

Expand Down
12 changes: 12 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use async_std::prelude::*;
use serde::de::DeserializeOwned;

/// Sends a serializable packet as a JSON-encoded line over a writable stream.
///
/// # Arguments
/// * `outbound` - The writable stream to send through.
/// * `packet` - The serializable data to be sent.
pub async fn send_as_json<S, P>(outbound: &mut S, packet: &P) -> anyhow::Result<()>
where
S: async_std::io::Write + Unpin,
Expand All @@ -12,6 +17,13 @@ where
Ok(())
}

/// Returns a stream of deserialized packets from a buffered input stream.
///
/// # Arguments
/// * `inbound` - A stream of lines containing JSON messages.
///
/// # Returns
/// A stream of parsed packets of type `P`.
pub fn receive_as_json<S, P>(inbound: S) -> impl Stream<Item = anyhow::Result<P>>
where
S: async_std::io::BufRead + Unpin,
Expand Down