Skip to content

Commit c269172

Browse files
committed
feat: implement client send_commands and server handle_subscriber functions
- Replace todo!() in client.rs send_commands with full command parsing implementation - Add support for /join, /post, /help, and /quit commands - Implement handle_subscriber function in server group.rs for message broadcasting - Add comprehensive tests for command parsing functionality - Fix async-std stdin usage with proper BufReader and BufReadExt imports Resolves the 'not yet implemented' crash when running the client Both client and server now fully functional for group chat operations
1 parent 7079d7c commit c269172

2 files changed

Lines changed: 201 additions & 11 deletions

File tree

src/bin/client.rs

Lines changed: 158 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#![allow(dead_code, unused_variables, unused_mut)] // Suppresses warnings
22

3-
use async_chat::{FromServer, utils};
4-
use async_std::{io::BufReader, net, prelude::FutureExt, stream::StreamExt, task};
3+
use async_chat::{FromClient, FromServer, utils};
4+
use async_std::{io::{BufReader, BufReadExt, stdin}, net, prelude::FutureExt, stream::StreamExt, task};
5+
use std::sync::Arc;
56

67
/// Client binary for connecting to the async chat server.
78
///
@@ -23,11 +24,104 @@ fn main() -> anyhow::Result<()> {
2324
})
2425
}
2526

26-
/// Reads user input (planned via `clap`) and sends commands to the server.
27-
async fn send_commands(_to_server: net::TcpStream) -> anyhow::Result<()> {
28-
// TODO: Implement use clap to parse command line arguments and print help message
29-
todo!()
27+
/// Reads user input and sends commands to the server.
28+
///
29+
/// Commands:
30+
/// - `/join <group_name>` - Join a chat group
31+
/// - `/post <group_name> <message>` - Post a message to a group
32+
/// - `/help` - Show help message
33+
/// - `/quit` - Exit the client
34+
async fn send_commands(to_server: net::TcpStream) -> anyhow::Result<()> {
35+
let mut to_server = to_server;
36+
println!("Welcome to Async Chat!");
37+
println!("Commands:");
38+
println!(" /join <group_name> - Join a chat group");
39+
println!(" /post <group_name> <message> - Post a message to a group");
40+
println!(" /help - Show this help message");
41+
println!(" /quit - Exit the client");
42+
println!();
43+
44+
let stdin = BufReader::new(stdin());
45+
let mut lines = stdin.lines();
46+
47+
while let Some(line_result) = lines.next().await {
48+
let line = line_result?;
49+
let line = line.trim();
50+
51+
if line.is_empty() {
52+
continue;
53+
}
54+
55+
if line == "/quit" {
56+
println!("Goodbye!");
57+
break;
58+
}
59+
60+
if line == "/help" {
61+
println!("Commands:");
62+
println!(" /join <group_name> - Join a chat group");
63+
println!(" /post <group_name> <message> - Post a message to a group");
64+
println!(" /help - Show this help message");
65+
println!(" /quit - Exit the client");
66+
continue;
67+
}
68+
69+
let command = parse_command(line);
70+
match command {
71+
Ok(from_client) => {
72+
if let Err(e) = utils::send_as_json(&mut to_server, &from_client).await {
73+
eprintln!("Failed to send command: {}", e);
74+
break;
75+
}
76+
}
77+
Err(e) => {
78+
eprintln!("Error: {}", e);
79+
eprintln!("Type /help for available commands.");
80+
}
81+
}
82+
}
83+
84+
Ok(())
85+
}
86+
87+
/// Parses a command line input into a FromClient message.
88+
///
89+
/// # Arguments
90+
/// * `input` - The user input string to parse
91+
///
92+
/// # Returns
93+
/// A Result containing either a FromClient message or an error string
94+
fn parse_command(input: &str) -> Result<FromClient, String> {
95+
let parts: Vec<&str> = input.splitn(3, ' ').collect();
96+
97+
match parts.as_slice() {
98+
["/join", group_name] => {
99+
if group_name.is_empty() {
100+
return Err("Group name cannot be empty".to_string());
101+
}
102+
Ok(FromClient::Join {
103+
group_name: Arc::new(group_name.to_string()),
104+
})
105+
}
106+
["/post", group_name, message] => {
107+
if group_name.is_empty() {
108+
return Err("Group name cannot be empty".to_string());
109+
}
110+
if message.is_empty() {
111+
return Err("Message cannot be empty".to_string());
112+
}
113+
Ok(FromClient::Post {
114+
group_name: Arc::new(group_name.to_string()),
115+
message: Arc::new(message.to_string()),
116+
})
117+
}
118+
["/join"] => Err("Usage: /join <group_name>".to_string()),
119+
["/post"] => Err("Usage: /post <group_name> <message>".to_string()),
120+
["/post", _] => Err("Usage: /post <group_name> <message>".to_string()),
121+
_ => Err(format!("Unknown command: '{}'. Type /help for available commands.", input)),
122+
}
30123
}
124+
31125
/// Handles responses from the server and prints them to stdout as they arrive.
32126
async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> {
33127
let buffered = BufReader::new(from_server);
@@ -50,3 +144,61 @@ async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> {
50144

51145
Ok(())
52146
}
147+
148+
#[cfg(test)]
149+
mod tests {
150+
use super::*;
151+
152+
#[test]
153+
fn test_parse_join_command() {
154+
let result = parse_command("/join general");
155+
assert!(result.is_ok());
156+
match result.unwrap() {
157+
FromClient::Join { group_name } => {
158+
assert_eq!(*group_name, "general".to_string());
159+
}
160+
_ => panic!("Expected Join command"),
161+
}
162+
}
163+
164+
#[test]
165+
fn test_parse_post_command() {
166+
let result = parse_command("/post general Hello world!");
167+
assert!(result.is_ok());
168+
match result.unwrap() {
169+
FromClient::Post { group_name, message } => {
170+
assert_eq!(*group_name, "general".to_string());
171+
assert_eq!(*message, "Hello world!".to_string());
172+
}
173+
_ => panic!("Expected Post command"),
174+
}
175+
}
176+
177+
#[test]
178+
fn test_parse_invalid_command() {
179+
let result = parse_command("/invalid");
180+
assert!(result.is_err());
181+
assert!(result.unwrap_err().contains("Unknown command"));
182+
}
183+
184+
#[test]
185+
fn test_parse_join_without_group() {
186+
let result = parse_command("/join");
187+
assert!(result.is_err());
188+
assert_eq!(result.unwrap_err(), "Usage: /join <group_name>");
189+
}
190+
191+
#[test]
192+
fn test_parse_post_without_message() {
193+
let result = parse_command("/post general");
194+
assert!(result.is_err());
195+
assert_eq!(result.unwrap_err(), "Usage: /post <group_name> <message>");
196+
}
197+
198+
#[test]
199+
fn test_parse_post_without_group() {
200+
let result = parse_command("/post");
201+
assert!(result.is_err());
202+
assert_eq!(result.unwrap_err(), "Usage: /post <group_name> <message>");
203+
}
204+
}

src/bin/server/group.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,49 @@ impl Group {
4646

4747
/// Handles the lifecycle of a subscriber: receiving messages and sending them over their connection.
4848
///
49-
/// This is a stub — should be implemented to read from the `receiver` and forward messages to `outbound`.
49+
/// Receives messages from the broadcast channel and forwards them to the client connection.
50+
/// Exits when the client disconnects or an error occurs.
5051
async fn handle_subscriber(
51-
_group_name: Arc<String>,
52-
_receiver: broadcast::Receiver<Arc<String>>,
53-
_outbound: Arc<Outbound>,
52+
group_name: Arc<String>,
53+
mut receiver: broadcast::Receiver<Arc<String>>,
54+
outbound: Arc<Outbound>,
5455
) {
55-
todo!()
56+
use async_chat::FromServer;
57+
58+
loop {
59+
match receiver.recv().await {
60+
Ok(message) => {
61+
let server_message = FromServer::Message {
62+
group_name: group_name.clone(),
63+
message,
64+
};
65+
66+
// Send the message to the client
67+
if let Err(e) = outbound.send(server_message).await {
68+
eprintln!("Failed to send message to client in group '{}': {}", group_name, e);
69+
break; // Exit the loop if we can't send to the client
70+
}
71+
}
72+
Err(broadcast::error::RecvError::Lagged(skipped)) => {
73+
// Client was too slow, some messages were skipped
74+
eprintln!("Client in group '{}' lagged behind, skipped {} messages", group_name, skipped);
75+
76+
let error_message = FromServer::Error(
77+
format!("You were lagging behind and missed {} messages", skipped)
78+
);
79+
80+
if let Err(e) = outbound.send(error_message).await {
81+
eprintln!("Failed to send lag error to client in group '{}': {}", group_name, e);
82+
break;
83+
}
84+
}
85+
Err(broadcast::error::RecvError::Closed) => {
86+
// The broadcast channel was closed (group was deleted)
87+
eprintln!("Broadcast channel for group '{}' was closed", group_name);
88+
break;
89+
}
90+
}
91+
}
92+
93+
eprintln!("Subscriber handler for group '{}' exited", group_name);
5694
}

0 commit comments

Comments
 (0)