Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/block_on.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Overview of block_on function

## What is block_on

The block_on function is a synchronous function that produces a final value of an asynchronous function,
you can think of it as an adapter from the asynchronous world to the synchronous world. The block_on
function is part of the tokio or async-std crates, not part of the standard library.

## Why block_on

In a sense, asynchronous functions just pass the buck. This buck is simply due to the fact that when executing a synchronous function, the caller only resumes when the operation is completed. What if we want our thread to do something else while the operating system is doing its work? We will need to use a new I/O library that provides an asynchronous version of this function. Rust's approach to supporting asynchronous operations is by introducing a trait: std::future::Future.

A Future represents an operation you can test for completion. So with Future, you can always know the state of the current thread in order to do other jobs, but using futures seems challenging because you keep on polling other jobs while a future is still pending, keeping track of previous futures that are pending and what should be done once they are finished and poll it again, and this somehow ruins the simplicity of the function. Good news: asynchronous functions are there! This buck is solved using the .await expression which pauses the execution of this async function until the awaited value is ready before resuming its execution. It's true that it's easy to get the value of an async function: just await it. But async functions themselves return a future, so it's now the caller's job to do the polling somehow, thus someone has to wait for the value and in this case block_on is our waiter.

Consider the example below:
```sh
use async_std::io::prelude::*;
use async_std::net;
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String> {
let mut socket = net::TcpStream::connect((host, port)).await?;
let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
socket.write_all(request.as_bytes()).await?;
socket.shutdown(net::Shutdown::Write)?;
let mut response = String::new();
socket.read_to_string(&mut response).await?;
Ok(response)
}
```

```sh
fn main() -> std::io::Result<()> {
use async_std::task;
// `block_on` is used here to run an async function cheapo_request in a synchronous context.
let response = task::block_on(cheapo_request("example.com", 80, "/"))?;
println!("{}", response);
Ok(())
}
```

We can call the function cheapo_request from an ordinary, synchronous function (like main, for example), using async_std's task::block_on function, which takes a future and polls it until it returns a value as seen above.

So in summary, the block_on function is used to execute asynchronous blocks synchronously in Rust.
176 changes: 170 additions & 6 deletions src/bin/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
#![allow(dead_code, unused_variables, unused_mut)] // Suppresses warnings

use async_chat::{FromServer, utils};
use async_std::{io::BufReader, net, prelude::FutureExt, stream::StreamExt, task};
use async_chat::{FromClient, FromServer, utils};
use async_std::{
io::{BufReadExt, BufReader, stdin},
net,
prelude::FutureExt,
stream::StreamExt,
task,
};
use std::sync::Arc;

/// Client binary for connecting to the async chat server.
///
Expand All @@ -23,11 +30,107 @@ fn main() -> anyhow::Result<()> {
})
}

/// Reads user input (planned via `clap`) and sends commands to the server.
async fn send_commands(_to_server: net::TcpStream) -> anyhow::Result<()> {
// TODO: Implement use clap to parse command line arguments and print help message
todo!()
/// Reads user input and sends commands to the server.
///
/// Commands:
/// - `/join <group_name>` - Join a chat group
/// - `/post <group_name> <message>` - Post a message to a group
/// - `/help` - Show help message
/// - `/quit` - Exit the client
async fn send_commands(to_server: net::TcpStream) -> anyhow::Result<()> {
let mut to_server = to_server;
println!("Welcome to Async Chat!");
println!("Commands:");
println!(" /join <group_name> - Join a chat group");
println!(" /post <group_name> <message> - Post a message to a group");
println!(" /help - Show this help message");
println!(" /quit - Exit the client");
println!();

let stdin = BufReader::new(stdin());
let mut lines = stdin.lines();

while let Some(line_result) = lines.next().await {
let line = line_result?;
let line = line.trim();

if line.is_empty() {
continue;
}

if line == "/quit" {
println!("Goodbye!");
break;
}

if line == "/help" {
println!("Commands:");
println!(" /join <group_name> - Join a chat group");
println!(" /post <group_name> <message> - Post a message to a group");
println!(" /help - Show this help message");
println!(" /quit - Exit the client");
continue;
}

let command = parse_command(line);
match command {
Ok(from_client) => {
if let Err(e) = utils::send_as_json(&mut to_server, &from_client).await {
eprintln!("Failed to send command: {}", e);
break;
}
}
Err(e) => {
eprintln!("Error: {}", e);
eprintln!("Type /help for available commands.");
}
}
}

Ok(())
}

/// Parses a command line input into a FromClient message.
///
/// # Arguments
/// * `input` - The user input string to parse
///
/// # Returns
/// A Result containing either a FromClient message or an error string
fn parse_command(input: &str) -> Result<FromClient, String> {
let parts: Vec<&str> = input.splitn(3, ' ').collect();

match parts.as_slice() {
["/join", group_name] => {
if group_name.is_empty() {
return Err("Group name cannot be empty".to_string());
}
Ok(FromClient::Join {
group_name: Arc::new(group_name.to_string()),
})
}
["/post", group_name, message] => {
if group_name.is_empty() {
return Err("Group name cannot be empty".to_string());
}
if message.is_empty() {
return Err("Message cannot be empty".to_string());
}
Ok(FromClient::Post {
group_name: Arc::new(group_name.to_string()),
message: Arc::new(message.to_string()),
})
}
["/join"] => Err("Usage: /join <group_name>".to_string()),
["/post"] => Err("Usage: /post <group_name> <message>".to_string()),
["/post", _] => Err("Usage: /post <group_name> <message>".to_string()),
_ => Err(format!(
"Unknown command: '{}'. Type /help for available commands.",
input
)),
}
}

/// Handles responses from the server and prints them to stdout as they arrive.
async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> {
let buffered = BufReader::new(from_server);
Expand All @@ -50,3 +153,64 @@ async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> {

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_parse_join_command() {
let result = parse_command("/join general");
assert!(result.is_ok());
match result.unwrap() {
FromClient::Join { group_name } => {
assert_eq!(*group_name, "general".to_string());
}
_ => panic!("Expected Join command"),
}
}

#[test]
fn test_parse_post_command() {
let result = parse_command("/post general Hello world!");
assert!(result.is_ok());
match result.unwrap() {
FromClient::Post {
group_name,
message,
} => {
assert_eq!(*group_name, "general".to_string());
assert_eq!(*message, "Hello world!".to_string());
}
_ => panic!("Expected Post command"),
}
}

#[test]
fn test_parse_invalid_command() {
let result = parse_command("/invalid");
assert!(result.is_err());
assert!(result.unwrap_err().contains("Unknown command"));
}

#[test]
fn test_parse_join_without_group() {
let result = parse_command("/join");
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "Usage: /join <group_name>");
}

#[test]
fn test_parse_post_without_message() {
let result = parse_command("/post general");
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "Usage: /post <group_name> <message>");
}

#[test]
fn test_parse_post_without_group() {
let result = parse_command("/post");
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "Usage: /post <group_name> <message>");
}
}
58 changes: 53 additions & 5 deletions src/bin/server/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,59 @@ impl Group {

/// Handles the lifecycle of a subscriber: receiving messages and sending them over their connection.
///
/// This is a stub — should be implemented to read from the `receiver` and forward messages to `outbound`.
/// Receives messages from the broadcast channel and forwards them to the client connection.
/// Exits when the client disconnects or an error occurs.
async fn handle_subscriber(
_group_name: Arc<String>,
_receiver: broadcast::Receiver<Arc<String>>,
_outbound: Arc<Outbound>,
group_name: Arc<String>,
mut receiver: broadcast::Receiver<Arc<String>>,
outbound: Arc<Outbound>,
) {
todo!()
use async_chat::FromServer;

loop {
match receiver.recv().await {
Ok(message) => {
let server_message = FromServer::Message {
group_name: group_name.clone(),
message,
};

// Send the message to the client
if let Err(e) = outbound.send(server_message).await {
eprintln!(
"Failed to send message to client in group '{}': {}",
group_name, e
);
break; // Exit the loop if we can't send to the client
}
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
// Client was too slow, some messages were skipped
eprintln!(
"Client in group '{}' lagged behind, skipped {} messages",
group_name, skipped
);

let error_message = FromServer::Error(format!(
"You were lagging behind and missed {} messages",
skipped
));

if let Err(e) = outbound.send(error_message).await {
eprintln!(
"Failed to send lag error to client in group '{}': {}",
group_name, e
);
break;
}
}
Err(broadcast::error::RecvError::Closed) => {
// The broadcast channel was closed (group was deleted)
eprintln!("Broadcast channel for group '{}' was closed", group_name);
break;
}
}
}

eprintln!("Subscriber handler for group '{}' exited", group_name);
}
2 changes: 1 addition & 1 deletion src/bin/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ fn main() -> anyhow::Result<()> {
/// Logs errors from client handler tasks.
fn log_error(result: anyhow::Result<()>) {
if let Err(error) = result {
eprintln!("Error: {}", error);
eprintln!("Error: {:?}", error);
}
}