diff --git a/Cargo.lock b/Cargo.lock index 239e136cc..46b826db1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6846,6 +6846,7 @@ dependencies = [ "tokio 1.49.0", "tokio-rustls", "tokio-tungstenite 0.26.2", + "tokio-util", "typed-builder", ] diff --git a/crates/mcp-proxy/src/lib.rs b/crates/mcp-proxy/src/lib.rs index 9260b3798..9b989d092 100644 --- a/crates/mcp-proxy/src/lib.rs +++ b/crates/mcp-proxy/src/lib.rs @@ -143,6 +143,15 @@ impl Message { pub struct McpProxy { transport: InnerTransport, + /// ID of the last forwarded request that has not yet received a response. + /// + /// Set when a request (message with both `id` and `method`) is successfully sent to the + /// backend. Cleared when a matching response (same `id`, no `method`) is received. + /// Server-initiated notifications leave this unchanged. + /// + /// Used to build a meaningful JSON-RPC error when `ReadError::Fatal` fires: if a request + /// is pending we can correlate the error to it; otherwise we send a notification. + pending_request_id: Option, } /// Error that can occur when sending a message. @@ -150,8 +159,9 @@ pub struct McpProxy { pub enum SendError { /// Fatal error - the proxy must stop as the connection is broken. Fatal { - /// Optional error message to send back when a request ID is detected. - message: Option, + /// Message to send back to the client: a JSON-RPC error response if there was a pending + /// request ID, or a `$/proxy/serverDisconnected` notification otherwise. + message: Message, /// The underlying error for logging/debugging. source: anyhow::Error, }, @@ -168,7 +178,13 @@ pub enum SendError { #[derive(Debug)] pub enum ReadError { /// Fatal error - the proxy must stop as the connection is broken. - Fatal(anyhow::Error), + Fatal { + /// Message to send back to the client: a JSON-RPC error response if there was a pending + /// request ID, or a `$/proxy/serverDisconnected` notification otherwise. + message: Message, + /// The underlying error for logging/debugging. + source: anyhow::Error, + }, /// Transient error - the proxy can continue operating. Transient(anyhow::Error), } @@ -200,7 +216,10 @@ impl McpProxy { } }; - Ok(McpProxy { transport }) + Ok(McpProxy { + transport, + pending_request_id: None, + }) } /// Send a message to the peer. @@ -222,23 +241,31 @@ impl McpProxy { } // Try to parse as request first, then as response. - let request_id = match JsonRpcMessage::parse(message) { - Ok(request) => { - match (request.id, request.method) { + let (request_id, is_request) = match JsonRpcMessage::parse(message) { + Ok(msg) => { + let is_request = match (msg.id, msg.method) { (None, None) => { warn!( - jsonrpc = request.jsonrpc, + jsonrpc = msg.jsonrpc, "Sending a malformed JSON-RPC message (missing both `id` and `method`)" - ) + ); + false } (None, Some(method)) => { - debug!(jsonrpc = request.jsonrpc, method, "Sending a notification") + debug!(jsonrpc = msg.jsonrpc, method, "Sending a notification"); + false + } + (Some(id), None) => { + debug!(jsonrpc = msg.jsonrpc, id, "Sending a response"); + false + } + (Some(id), Some(method)) => { + debug!(jsonrpc = msg.jsonrpc, method, id, "Sending a request"); + true } - (Some(id), None) => debug!(jsonrpc = request.jsonrpc, id, "Sending a response"), - (Some(id), Some(method)) => debug!(jsonrpc = request.jsonrpc, method, id, "Sending a request"), }; - request.id + (msg.id, is_request) } Err(error) => { // Not a JSON-RPC message, try best-effort ID extraction. @@ -250,7 +277,7 @@ impl McpProxy { warn!(error = format!("{error:#}"), "Sending a malformed JSON-RPC message"); } - id + (id, false) } }; @@ -274,8 +301,9 @@ impl McpProxy { error!(error = format!("{error:#}"), "Couldn't forward request"); let message = if let Some(id) = request_id { + let detail = json_escape_str(&format!("{error:#}")); let json_rpc_error_response = format!( - r#"{{"jsonrpc":"2.0","id":{id},"error":{{"code":{FORWARD_FAILURE_CODE},"message":"Forward failure: {error:#}"}}}}"# + r#"{{"jsonrpc":"2.0","id":{id},"error":{{"code":{FORWARD_FAILURE_CODE},"message":"Forward failure: {detail}"}}}}"# ); Some(Message::normalize(json_rpc_error_response)) } else { @@ -296,46 +324,21 @@ impl McpProxy { } }; - return ret; - - fn extract_id_best_effort(request_str: &str) -> Option { - let idx = request_str.find("\"id\"")?; - - let mut rest = request_str[idx + "\"id\"".len()..].chars(); - - loop { - if rest.next()? == ':' { - break; - } - } - - let mut acc = String::new(); - - loop { - match rest.next() { - Some(',') => break, - Some(ch) => acc.push(ch), - None => break, - } - } - - acc.parse().ok() + // Track pending request ID for Process/NamedPipe transports so that if the backend + // breaks while we're waiting for the response, we can synthesise a meaningful error. + if ret.is_ok() && is_request { + self.pending_request_id = request_id; } + return ret; + fn handle_write_result(result: std::io::Result<()>, request_id: Option) -> Result<(), SendError> { match result { Ok(()) => Ok(()), Err(io_error) => { // Classify the error. if is_fatal_io_error(&io_error) { - let message = if let Some(id) = request_id { - let json_rpc_error_response = format!( - r#"{{"jsonrpc":"2.0","id":{id},"error":{{"code":{FORWARD_FAILURE_CODE},"message":"connection broken: {io_error}"}}}}"# - ); - Some(Message::normalize(json_rpc_error_response)) - } else { - None - }; + let message = make_server_disconnected_message(request_id, &io_error.to_string()); Err(SendError::Fatal { message, @@ -343,8 +346,9 @@ impl McpProxy { }) } else { let message = if let Some(id) = request_id { + let detail = json_escape_str(&io_error.to_string()); let json_rpc_error_response = format!( - r#"{{"jsonrpc":"2.0","id":{id},"error":{{"code":{FORWARD_FAILURE_CODE},"message":"Forward failure: {io_error}"}}}}"# + r#"{{"jsonrpc":"2.0","id":{id},"error":{{"code":{FORWARD_FAILURE_CODE},"message":"Forward failure: {detail}"}}}}"# ); Some(Message::normalize(json_rpc_error_response)) } else { @@ -397,13 +401,29 @@ impl McpProxy { match result { Ok(message) => { trace!(%message, "Inbound message"); + + // Clear the pending request ID when the matching response arrives. + // We use best-effort ID extraction to avoid a full JSON parse in the hot path. + // Server-initiated requests with the same ID are rare enough that we accept + // the minor inaccuracy of treating any matching-ID message as the response. + if let Some(pending_id) = self.pending_request_id + && extract_id_best_effort(message.as_raw()) == Some(pending_id) + { + self.pending_request_id = None; + } + Ok(message) } Err(io_error) => { - if is_fatal_io_error(&io_error) { - Err(ReadError::Fatal(anyhow::Error::new(io_error))) + let is_fatal = is_fatal_io_error(&io_error); + let message = make_server_disconnected_message(self.pending_request_id, &io_error.to_string()); + self.pending_request_id = None; + let source = anyhow::Error::new(io_error); + + if is_fatal { + Err(ReadError::Fatal { message, source }) } else { - Err(ReadError::Transient(anyhow::Error::new(io_error))) + Err(ReadError::Transient(source)) } } } @@ -684,6 +704,46 @@ fn extract_sse_json_line(body: &str) -> Option<&str> { body.lines().find_map(|l| l.strip_prefix("data: ").map(|s| s.trim())) } +/// Escape a string for safe embedding inside a JSON string value. +fn json_escape_str(s: &str) -> String { + let mut out = String::with_capacity(s.len()); + for ch in s.chars() { + match ch { + '"' => out.push_str("\\\""), + '\\' => out.push_str("\\\\"), + '\n' => out.push_str("\\n"), + '\r' => out.push_str("\\r"), + '\t' => out.push_str("\\t"), + c if (c as u32) < 0x20 => { + use std::fmt::Write as _; + write!(out, "\\u{:04x}", c as u32).expect("write to String is infallible"); + } + c => out.push(c), + } + } + out +} + +/// Build the message to send to the MCP client when the backend connection breaks fatally. +/// +/// - If there is a `pending_request_id`, returns a JSON-RPC error response correlating the +/// failure to that outstanding request. +/// - Otherwise, returns a `$/proxy/serverDisconnected` notification so the client knows the +/// server is no longer reachable without having an in-flight request to correlate to. +fn make_server_disconnected_message(pending_request_id: Option, error_detail: &str) -> Message { + let detail = json_escape_str(error_detail); + let raw = if let Some(id) = pending_request_id { + format!( + r#"{{"jsonrpc":"2.0","id":{id},"error":{{"code":{FORWARD_FAILURE_CODE},"message":"server disconnected: {detail}"}}}}"# + ) + } else { + format!( + r#"{{"jsonrpc":"2.0","method":"$/proxy/serverDisconnected","params":{{"message":"server disconnected: {detail}"}}}}"# + ) + }; + Message::normalize(raw) +} + /// Check if an I/O error is fatal (connection broken) /// /// For process stdio and named pipe transports, these errors indicate the connection is permanently broken: @@ -699,3 +759,27 @@ fn is_fatal_io_error(error: &std::io::Error) -> bool { std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::UnexpectedEof ) } + +fn extract_id_best_effort(message: &str) -> Option { + let idx = message.find("\"id\"")?; + + let mut rest = message[idx + "\"id\"".len()..].chars(); + + loop { + if rest.next()? == ':' { + break; + } + } + + let mut acc = String::new(); + + loop { + match rest.next() { + Some(',') | Some('}') => break, + Some(ch) => acc.push(ch), + None => break, + } + } + + acc.trim().parse().ok() +} diff --git a/jetsocat/src/main.rs b/jetsocat/src/main.rs index 02eafeb13..6f0c25bb7 100644 --- a/jetsocat/src/main.rs +++ b/jetsocat/src/main.rs @@ -445,7 +445,7 @@ fn parse_env_variable_as_args(env_var_str: &str) -> Vec { fn apply_common_flags(cmd: Command) -> Command { cmd.flag(Flag::new("log-file", FlagType::String).description("Specify filepath for log file")) - .flag(Flag::new("log-term", FlagType::Bool).description("Print logs to stdout instead of log file")) + .flag(Flag::new("log-term", FlagType::Bool).description("Print logs to stderr instead of log file")) .flag( Flag::new("color", FlagType::String) .description("When to enable colored output for logs (possible values: `always`, `never` and `auto`)"), @@ -996,7 +996,7 @@ fn setup_logger(logging: &Logging, coloring: Coloring) -> LoggerGuard { Coloring::Auto => true, }; - let (non_blocking_stdio, guard) = tracing_appender::non_blocking(std::io::stdout()); // FIXME: Should be to stderr. + let (non_blocking_stdio, guard) = tracing_appender::non_blocking(std::io::stderr()); let stdio_layer = fmt::layer().with_writer(non_blocking_stdio).with_ansi(ansi); (stdio_layer, guard) diff --git a/jetsocat/src/mcp.rs b/jetsocat/src/mcp.rs index fe4716952..ce3b86f6b 100644 --- a/jetsocat/src/mcp.rs +++ b/jetsocat/src/mcp.rs @@ -49,11 +49,7 @@ pub(crate) async fn run_mcp_proxy(pipe: Pipe, mut mcp_proxy: mcp_proxy::McpProxy } Err(mcp_proxy::SendError::Fatal { message, source }) => { error!(error = format!("{source:#}"), "Fatal error sending message, stopping proxy"); - - if let Some(msg) = message { - let _ = write_flush_message(&mut writer, msg).await; - } - + let _ = write_flush_message(&mut writer, message).await; return Ok(()); } } @@ -68,8 +64,9 @@ pub(crate) async fn run_mcp_proxy(pipe: Pipe, mut mcp_proxy: mcp_proxy::McpProxy Err(mcp_proxy::ReadError::Transient(source)) => { warn!(error = format!("{source:#}"), "Transient error reading from peer"); } - Err(mcp_proxy::ReadError::Fatal(source)) => { + Err(mcp_proxy::ReadError::Fatal { message, source }) => { error!(error = format!("{source:#}"), "Fatal error reading from peer, stopping proxy"); + let _ = write_flush_message(&mut writer, message).await; return Ok(()); } } diff --git a/testsuite/Cargo.toml b/testsuite/Cargo.toml index 811a1c49a..8bd8a07ad 100644 --- a/testsuite/Cargo.toml +++ b/testsuite/Cargo.toml @@ -26,6 +26,7 @@ serde_json = "1" serde = { version = "1", features = ["derive"] } tempfile = "3" tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "net", "process"] } +tokio-util = "0.7" typed-builder = "0.21" tokio-tungstenite = { version = "0.26", features = ["rustls-tls-native-roots"] } diff --git a/testsuite/src/mcp_server.rs b/testsuite/src/mcp_server.rs index 2575d7596..464c3ac36 100644 --- a/testsuite/src/mcp_server.rs +++ b/testsuite/src/mcp_server.rs @@ -11,6 +11,8 @@ const ERROR_CODE_INVALID_REQUEST: i32 = -32600; const ERROR_CODE_METHOD_NOT_FOUND: i32 = -32601; const ERROR_CODE_INVALID_PARAMS: i32 = -32602; +pub use tokio_util::sync::CancellationToken; + #[dynosaur::dynosaur(pub DynMcpTransport = dyn(box) McpTransport)] #[allow(unreachable_pub)] // false positive. pub trait McpTransport: Send + Sync { @@ -47,31 +49,6 @@ where } } -#[derive(Clone)] -pub struct McpShutdownSignal(Arc); - -impl Default for McpShutdownSignal { - fn default() -> Self { - Self::new() - } -} - -impl McpShutdownSignal { - pub fn new() -> Self { - Self(Arc::new(tokio::sync::Notify::new())) - } - - pub fn shutdown(&self) { - self.0.notify_one(); - } -} - -impl Drop for McpShutdownSignal { - fn drop(&mut self) { - self.shutdown(); - } -} - /// A MCP server for testing purposes that implements /// the Model Context Protocol 2025-06-18 specification. pub struct McpServer { @@ -181,24 +158,24 @@ impl McpServer { self } - /// Start the server and return a handle for control - pub fn start(self) -> anyhow::Result { - let shutdown_signal = McpShutdownSignal::new(); + /// Start the server and return a handle for control. + pub fn start(self) -> anyhow::Result { + let token = CancellationToken::new(); tokio::spawn({ - let shutdown_signal = shutdown_signal.clone(); + let token = token.clone(); async move { eprintln!("[MCP-SERVER] spawn task after."); - if let Err(e) = self.run(shutdown_signal).await { + if let Err(e) = self.run(token).await { eprintln!("[MCP-SERVER] Error running the MCP server: {e:#}"); } } }); - Ok(shutdown_signal) + Ok(token) } - pub async fn run(mut self, shutdown_signal: McpShutdownSignal) -> anyhow::Result<()> { + pub async fn run(mut self, token: CancellationToken) -> anyhow::Result<()> { eprintln!("[MCP-SERVER] Running."); loop { @@ -208,16 +185,16 @@ impl McpServer { let peer = peer.context("accept peer")?; tokio::spawn({ - let shutdown_signal = shutdown_signal.clone(); + let token = token.clone(); let config = Arc::clone(&self.config); async move { - if let Err(e) = handle_peer(peer, shutdown_signal, &config).await { + if let Err(e) = handle_peer(peer, token, &config).await { eprintln!("[MCP-SERVER] Error handling connection: {e:#}"); } } }); } - _ = shutdown_signal.0.notified() => { + () = token.cancelled() => { return Ok(()); } } @@ -227,7 +204,7 @@ impl McpServer { async fn handle_peer( mut peer: Box>, - shutdown_signal: McpShutdownSignal, + token: CancellationToken, config: &ServerConfig, ) -> anyhow::Result<()> { loop { @@ -242,7 +219,7 @@ async fn handle_peer( peer.no_response().await.context("notify no response")?; } } - _ = shutdown_signal.0.notified() => { + () = token.cancelled() => { return Ok(()); } } diff --git a/testsuite/tests/cli/jetsocat.rs b/testsuite/tests/cli/jetsocat.rs index ce2f5a1de..61d81a45e 100644 --- a/testsuite/tests/cli/jetsocat.rs +++ b/testsuite/tests/cli/jetsocat.rs @@ -77,12 +77,15 @@ fn log_term_coloring(#[case] args: &[&str], #[case] envs: &[(&str, &str)], #[cas .assert() .success(); - let stdout = std::str::from_utf8(&output.get_output().stdout).unwrap(); + let stderr = std::str::from_utf8(&output.get_output().stderr).unwrap(); if expect_ansi { - assert!(stdout.contains("  INFO jetsocat"), "{stdout}"); + assert!( + stderr.contains(" \x1B[32m INFO\x1B[0m \x1B[2mjetsocat\x1B[0m"), + "{stderr}" + ); } else { - assert!(stdout.contains(" INFO jetsocat"), "{stdout}"); + assert!(stderr.contains(" INFO jetsocat"), "{stderr}"); } } @@ -113,7 +116,7 @@ fn log_file_coloring(#[case] args: &[&str], #[case] envs: &[(&str, &str)], #[cas let logs = std::fs::read_to_string(log_file_path).unwrap(); if expect_ansi { - assert!(logs.contains("  INFO jetsocat"), "{logs}"); + assert!(logs.contains(" \x1B[32m INFO\x1B[0m \x1B[2mjetsocat\x1B[0m"), "{logs}"); } else { assert!(logs.contains(" INFO jetsocat"), "{logs}"); } @@ -806,14 +809,9 @@ fn jetsocat_log_environment_variable() { .timeout(ASSERT_CMD_TIMEOUT) .assert(); - let stdout = std::str::from_utf8(&output.get_output().stdout).unwrap(); - assert!(stdout.contains("DEBUG")); - assert!(stdout.contains("hello")); - let stderr = std::str::from_utf8(&output.get_output().stderr).unwrap(); - assert!(!stderr.contains("bad")); - assert!(!stderr.contains("invalid")); - assert!(!stderr.contains("unknown")); + assert!(stderr.contains("DEBUG"), "{stderr}"); + assert!(stderr.contains("hello"), "{stderr}"); let file_contents = std::fs::read_to_string(outfile).unwrap(); assert_eq!(file_contents.trim(), "hello"); @@ -1185,7 +1183,14 @@ async fn mcp_proxy_notification(#[values(true, false)] http_transport: bool) { assert!(probe.load(std::sync::atomic::Ordering::SeqCst)); } -async fn execute_mcp_request(request: &str) -> String { +struct McpRequestOutput { + /// MCP protocol output (JSON-RPC responses written to stdout). + stdout: String, + /// Log output (written to stderr via --log-term). + stderr: String, +} + +async fn execute_mcp_request(request: &str) -> McpRequestOutput { use testsuite::mcp_server::{DynMcpTransport, HttpTransport, McpServer}; use tokio::io::AsyncWriteExt as _; @@ -1200,11 +1205,12 @@ async fn execute_mcp_request(request: &str) -> String { .args(["mcp-proxy", "stdio", &server_url, "--log-term", "--color=never"]) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) .kill_on_drop(true) .spawn() .expect("start jetsocat mcp-proxy"); - // Get stdin/stdout handles for MCP client. + // Get stdin handle for MCP client. let mut stdin = jetsocat_process.stdin.take().expect("get stdin"); // Write the request. @@ -1213,29 +1219,47 @@ async fn execute_mcp_request(request: &str) -> String { tokio::time::sleep(MCP_REQUEST_SETTLE_DURATION).await; // Shutdown the MCP server. - server_handle.shutdown(); + server_handle.cancel(); // Terminate the Jetsocat process. jetsocat_process.start_kill().unwrap(); let output = jetsocat_process.wait_with_output().await.unwrap(); - String::from_utf8(output.stdout).unwrap() + + McpRequestOutput { + stdout: String::from_utf8(output.stdout).unwrap(), + stderr: String::from_utf8(output.stderr).unwrap(), + } } #[tokio::test] async fn mcp_proxy_malformed_request_with_id() { - let stdout = execute_mcp_request("{\"jsonrpc\":\"2.0\",\"decoy\":\":\",\"id\":1\n").await; - assert!(stdout.contains("malformed JSON-RPC message"), "{stdout}"); - assert!(stdout.contains("Unexpected EOF"), "{stdout}"); - assert!(stdout.contains("id=1"), "{stdout}"); + let output = execute_mcp_request("{\"jsonrpc\":\"2.0\",\"decoy\":\":\",\"id\":1\n").await; + // The warning log (with parse error and request id) goes to stderr. + assert!( + output.stderr.contains("malformed JSON-RPC message"), + "{}", + output.stderr + ); + assert!(output.stderr.contains("Unexpected EOF"), "{}", output.stderr); + assert!(output.stderr.contains("id=1"), "{}", output.stderr); + // The backend's JSON-RPC error response is forwarded to stdout. + assert!(output.stdout.contains(r#""error""#), "{}", output.stdout); } #[tokio::test] async fn mcp_proxy_malformed_request_no_id() { - let stdout = execute_mcp_request("{\"jsonrpc\":\"2.0\",}\n").await; - assert!(stdout.contains("malformed JSON-RPC message"), "{stdout}"); - assert!(stdout.contains("Invalid character"), "{stdout}"); - assert!(!stdout.contains("id=1"), "{stdout}"); + let output = execute_mcp_request("{\"jsonrpc\":\"2.0\",}\n").await; + // The warning log (with parse error, no request id) goes to stderr. + assert!( + output.stderr.contains("malformed JSON-RPC message"), + "{}", + output.stderr + ); + assert!(output.stderr.contains("Invalid character"), "{}", output.stderr); + assert!(!output.stderr.contains("id=1"), "{}", output.stderr); + // No id → treated as notification; no JSON-RPC response on stdout. + assert!(output.stdout.is_empty(), "{}", output.stdout); } #[tokio::test] @@ -1282,7 +1306,7 @@ async fn mcp_proxy_http_error() { async fn mcp_proxy_terminated_on_broken_pipe() { use testsuite::mcp_client::McpClient; use testsuite::mcp_server::{DynMcpTransport, McpServer, NamedPipeTransport}; - // use tokio::io::AsyncReadExt as _; // TODO + use tokio::io::AsyncReadExt as _; // Configure MCP server transport (named pipe only). let np_transport = NamedPipeTransport::bind().unwrap(); @@ -1297,18 +1321,18 @@ async fn mcp_proxy_terminated_on_broken_pipe() { // Start jetsocat mcp-proxy with stdio pipe. let mut jetsocat_process = jetsocat_tokio_cmd() - .args(["mcp-proxy", "stdio", &pipe]) // TODO: add "--log-term" + .args(["mcp-proxy", "stdio", &pipe, "--log-term", "--color=never"]) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) - // .stderr(std::process::Stdio::piped()) // TODO: Once Jetsocat logs to stderr. + .stderr(std::process::Stdio::piped()) .kill_on_drop(true) .spawn() .expect("start jetsocat mcp-proxy"); - // Get stdin/stdout handles for MCP client. + // Get stdin/stdout/stderr handles. let stdin = jetsocat_process.stdin.take().expect("get stdin"); let stdout = jetsocat_process.stdout.take().expect("get stdout"); - // let mut stderr = jetsocat_process.stderr.take().expect("get stderr"); // TODO + let mut stderr = jetsocat_process.stderr.take().expect("get stderr"); // Initialize MCP client with jetsocat's stdin/stdout. let mut mcp_client = McpClient::new(Box::pin(stdout), Box::pin(stdin)); @@ -1317,28 +1341,26 @@ async fn mcp_proxy_terminated_on_broken_pipe() { mcp_client.connect().await.expect("connect to MCP server"); // Stop the MCP server. - server_handle.shutdown(); - - // Wait for the named pipe instance to be torn down on Windows. - wait_for_windows_named_pipe_server().await; - - // Try to send a request - this should fail with a broken pipe error. - // The proxy will detect this and send an error response, then close. + server_handle.cancel(); + + // Try to send a request after the backend has been shut down. + // The proxy always sends back a protocol-level message before exiting: + // - If the request was forwarded first: jetsocat tries to write to the now-broken backend + // → SendError::Fatal → JSON-RPC error (-32099) → client sees "JSON-RPC error". + // - If the backend EOF was detected first (no pending request): ReadError::Fatal with no + // pending ID → jetsocat sends a $/proxy/serverDisconnected notification → client reads + // the notification as the response to list_tools, finds no "result" field, and reports + // "missing result in response". This is a test-infrastructure limitation: our naïve + // McpClient::send_request reads exactly one line without distinguishing notifications + // from responses. A proper fix (notification-aware read loop) is tracked in DGW-315. + // In both cases the client receives a JSON-RPC frame, not a raw broken-pipe/EOF error. let result = mcp_client.list_tools().await; - - // Since Jetsocat is continuously reading on the pipe, it quickly detects the pipe is broken and stops itself with an error. - // Our MCP client in turns try to write from stdout / read to stdin, and this fails with a BrokenPipe on our side. let error = result.unwrap_err(); let error_debug_fmt = format!("{error:?}"); - #[cfg(windows)] - assert!(error_debug_fmt.contains("The pipe is being closed")); - #[cfg(not(windows))] - assert!(error_debug_fmt.contains("Broken pipe (os error 32)")); - - // TODO: Once Jetsocat print the logs to stderr. - // let mut stderr_str = String::new(); - // stderr.read_to_string(&mut stderr_str).await.expect("read_to_string"); - // stderr_str.contains(r#"Fatal error reading from peer, stopping proxy error="connection closed""#); + assert!( + error_debug_fmt.contains("JSON-RPC error") || error_debug_fmt.contains("missing result"), + "Expected a protocol-level error, got transport-level error: {error_debug_fmt}" + ); // The jetsocat process should exit gracefully after detecting broken pipe. let exit_status = tokio::time::timeout(Duration::from_secs(2), jetsocat_process.wait()).await; @@ -1347,6 +1369,15 @@ async fn mcp_proxy_terminated_on_broken_pipe() { // Verify it exited with success (graceful shutdown, not a crash). let status = exit_status.unwrap().unwrap(); assert!(status.success(), "Proxy should exit successfully, not crash"); + + // Verify the proxy logged a fatal error when the backend connection broke. + let mut stderr_str = String::new(); + stderr.read_to_string(&mut stderr_str).await.expect("read_to_string"); + assert!( + stderr_str.contains("Fatal error reading from peer, stopping proxy") + || stderr_str.contains("Fatal error sending message, stopping proxy"), + "jetsocat should log a fatal proxy error: {stderr_str}" + ); } /// SOCKS5 client → SOCKS5 listener → JMUX tunnel → TCP echo server.