diff --git a/docs/block_on.md b/docs/block_on.md new file mode 100644 index 0000000..4b1e902 --- /dev/null +++ b/docs/block_on.md @@ -0,0 +1,42 @@ +# Overview of block_on function + +## What is block_on + +The block_on function is a synchronous function that produces a final value of an asynchronous function, +you can think of it as an adapter from the asynchronous world to the synchronous world. The block_on +function is part of the tokio or async-std crates, not part of the standard library. + +## Why block_on + +In a sense, asynchronous functions just pass the buck. This buck is simply due to the fact that when executing a synchronous function, the caller only resumes when the operation is completed. What if we want our thread to do something else while the operating system is doing its work? We will need to use a new I/O library that provides an asynchronous version of this function. Rust's approach to supporting asynchronous operations is by introducing a trait: std::future::Future. + +A Future represents an operation you can test for completion. So with Future, you can always know the state of the current thread in order to do other jobs, but using futures seems challenging because you keep on polling other jobs while a future is still pending, keeping track of previous futures that are pending and what should be done once they are finished and poll it again, and this somehow ruins the simplicity of the function. Good news: asynchronous functions are there! This buck is solved using the .await expression which pauses the execution of this async function until the awaited value is ready before resuming its execution. It's true that it's easy to get the value of an async function: just await it. But async functions themselves return a future, so it's now the caller's job to do the polling somehow, thus someone has to wait for the value and in this case block_on is our waiter. + +Consider the example below: +```sh +use async_std::io::prelude::*; +use async_std::net; +async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result { + let mut socket = net::TcpStream::connect((host, port)).await?; + let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host); + socket.write_all(request.as_bytes()).await?; + socket.shutdown(net::Shutdown::Write)?; + let mut response = String::new(); + socket.read_to_string(&mut response).await?; + Ok(response) +} +``` + +```sh +fn main() -> std::io::Result<()> { + use async_std::task; + // `block_on` is used here to run an async function cheapo_request in a synchronous context. + let response = task::block_on(cheapo_request("example.com", 80, "/"))?; + println!("{}", response); + Ok(()) +} +``` + +We can call the function cheapo_request from an ordinary, synchronous function (like main, for example), using async_std's task::block_on function, which takes a future and polls it until it returns a value as seen above. + +So in summary, the block_on function is used to execute asynchronous blocks synchronously in Rust. diff --git a/src/bin/client.rs b/src/bin/client.rs index a9b9ff4..e979fcf 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -1,7 +1,14 @@ #![allow(dead_code, unused_variables, unused_mut)] // Suppresses warnings -use async_chat::{FromServer, utils}; -use async_std::{io::BufReader, net, prelude::FutureExt, stream::StreamExt, task}; +use async_chat::{FromClient, FromServer, utils}; +use async_std::{ + io::{BufReadExt, BufReader, stdin}, + net, + prelude::FutureExt, + stream::StreamExt, + task, +}; +use std::sync::Arc; /// Client binary for connecting to the async chat server. /// @@ -23,11 +30,107 @@ fn main() -> anyhow::Result<()> { }) } -/// Reads user input (planned via `clap`) and sends commands to the server. -async fn send_commands(_to_server: net::TcpStream) -> anyhow::Result<()> { - // TODO: Implement use clap to parse command line arguments and print help message - todo!() +/// Reads user input and sends commands to the server. +/// +/// Commands: +/// - `/join ` - Join a chat group +/// - `/post ` - Post a message to a group +/// - `/help` - Show help message +/// - `/quit` - Exit the client +async fn send_commands(to_server: net::TcpStream) -> anyhow::Result<()> { + let mut to_server = to_server; + println!("Welcome to Async Chat!"); + println!("Commands:"); + println!(" /join - Join a chat group"); + println!(" /post - Post a message to a group"); + println!(" /help - Show this help message"); + println!(" /quit - Exit the client"); + println!(); + + let stdin = BufReader::new(stdin()); + let mut lines = stdin.lines(); + + while let Some(line_result) = lines.next().await { + let line = line_result?; + let line = line.trim(); + + if line.is_empty() { + continue; + } + + if line == "/quit" { + println!("Goodbye!"); + break; + } + + if line == "/help" { + println!("Commands:"); + println!(" /join - Join a chat group"); + println!(" /post - Post a message to a group"); + println!(" /help - Show this help message"); + println!(" /quit - Exit the client"); + continue; + } + + let command = parse_command(line); + match command { + Ok(from_client) => { + if let Err(e) = utils::send_as_json(&mut to_server, &from_client).await { + eprintln!("Failed to send command: {}", e); + break; + } + } + Err(e) => { + eprintln!("Error: {}", e); + eprintln!("Type /help for available commands."); + } + } + } + + Ok(()) } + +/// Parses a command line input into a FromClient message. +/// +/// # Arguments +/// * `input` - The user input string to parse +/// +/// # Returns +/// A Result containing either a FromClient message or an error string +fn parse_command(input: &str) -> Result { + let parts: Vec<&str> = input.splitn(3, ' ').collect(); + + match parts.as_slice() { + ["/join", group_name] => { + if group_name.is_empty() { + return Err("Group name cannot be empty".to_string()); + } + Ok(FromClient::Join { + group_name: Arc::new(group_name.to_string()), + }) + } + ["/post", group_name, message] => { + if group_name.is_empty() { + return Err("Group name cannot be empty".to_string()); + } + if message.is_empty() { + return Err("Message cannot be empty".to_string()); + } + Ok(FromClient::Post { + group_name: Arc::new(group_name.to_string()), + message: Arc::new(message.to_string()), + }) + } + ["/join"] => Err("Usage: /join ".to_string()), + ["/post"] => Err("Usage: /post ".to_string()), + ["/post", _] => Err("Usage: /post ".to_string()), + _ => Err(format!( + "Unknown command: '{}'. Type /help for available commands.", + input + )), + } +} + /// Handles responses from the server and prints them to stdout as they arrive. async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> { let buffered = BufReader::new(from_server); @@ -50,3 +153,64 @@ async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> { Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_join_command() { + let result = parse_command("/join general"); + assert!(result.is_ok()); + match result.unwrap() { + FromClient::Join { group_name } => { + assert_eq!(*group_name, "general".to_string()); + } + _ => panic!("Expected Join command"), + } + } + + #[test] + fn test_parse_post_command() { + let result = parse_command("/post general Hello world!"); + assert!(result.is_ok()); + match result.unwrap() { + FromClient::Post { + group_name, + message, + } => { + assert_eq!(*group_name, "general".to_string()); + assert_eq!(*message, "Hello world!".to_string()); + } + _ => panic!("Expected Post command"), + } + } + + #[test] + fn test_parse_invalid_command() { + let result = parse_command("/invalid"); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("Unknown command")); + } + + #[test] + fn test_parse_join_without_group() { + let result = parse_command("/join"); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "Usage: /join "); + } + + #[test] + fn test_parse_post_without_message() { + let result = parse_command("/post general"); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "Usage: /post "); + } + + #[test] + fn test_parse_post_without_group() { + let result = parse_command("/post"); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "Usage: /post "); + } +} diff --git a/src/bin/server/group.rs b/src/bin/server/group.rs index 0e93150..eb5b5fa 100644 --- a/src/bin/server/group.rs +++ b/src/bin/server/group.rs @@ -46,11 +46,59 @@ impl Group { /// Handles the lifecycle of a subscriber: receiving messages and sending them over their connection. /// -/// This is a stub — should be implemented to read from the `receiver` and forward messages to `outbound`. +/// Receives messages from the broadcast channel and forwards them to the client connection. +/// Exits when the client disconnects or an error occurs. async fn handle_subscriber( - _group_name: Arc, - _receiver: broadcast::Receiver>, - _outbound: Arc, + group_name: Arc, + mut receiver: broadcast::Receiver>, + outbound: Arc, ) { - todo!() + use async_chat::FromServer; + + loop { + match receiver.recv().await { + Ok(message) => { + let server_message = FromServer::Message { + group_name: group_name.clone(), + message, + }; + + // Send the message to the client + if let Err(e) = outbound.send(server_message).await { + eprintln!( + "Failed to send message to client in group '{}': {}", + group_name, e + ); + break; // Exit the loop if we can't send to the client + } + } + Err(broadcast::error::RecvError::Lagged(skipped)) => { + // Client was too slow, some messages were skipped + eprintln!( + "Client in group '{}' lagged behind, skipped {} messages", + group_name, skipped + ); + + let error_message = FromServer::Error(format!( + "You were lagging behind and missed {} messages", + skipped + )); + + if let Err(e) = outbound.send(error_message).await { + eprintln!( + "Failed to send lag error to client in group '{}': {}", + group_name, e + ); + break; + } + } + Err(broadcast::error::RecvError::Closed) => { + // The broadcast channel was closed (group was deleted) + eprintln!("Broadcast channel for group '{}' was closed", group_name); + break; + } + } + } + + eprintln!("Subscriber handler for group '{}' exited", group_name); } diff --git a/src/bin/server/main.rs b/src/bin/server/main.rs index 048d6b5..c02208c 100644 --- a/src/bin/server/main.rs +++ b/src/bin/server/main.rs @@ -38,6 +38,6 @@ fn main() -> anyhow::Result<()> { /// Logs errors from client handler tasks. fn log_error(result: anyhow::Result<()>) { if let Err(error) = result { - eprintln!("Error: {}", error); + eprintln!("Error: {:?}", error); } }