diff --git a/testsuite/tests/cli/jetsocat.rs b/testsuite/tests/cli/jetsocat.rs index 247d54912..c34343a95 100644 --- a/testsuite/tests/cli/jetsocat.rs +++ b/testsuite/tests/cli/jetsocat.rs @@ -1,4 +1,3 @@ -use std::io::Read; use std::sync::Arc; use std::time::Duration; @@ -6,20 +5,29 @@ use expect_test::expect; use rstest::rstest; use test_utils::find_unused_ports; use testsuite::cli::{ - assert_stderr_eq, jetsocat_assert_cmd, jetsocat_cmd, jetsocat_tokio_cmd, wait_for_port_bound, wait_for_tcp_port, + assert_stderr_eq, jetsocat_assert_cmd, jetsocat_tokio_cmd, wait_for_port_bound, wait_for_tcp_port, }; -// NOTE: Windows needs more time for listeners to be ready due to slower process startup. +#[cfg(windows)] +const WINDOWS_NAMED_PIPE_WAIT_DURATION: Duration = Duration::from_millis(600); #[cfg(not(windows))] -const LISTENER_WAIT_DURATION: Duration = Duration::from_millis(300); +const ASSERT_CMD_TIMEOUT: Duration = Duration::from_millis(600); #[cfg(windows)] -const LISTENER_WAIT_DURATION: Duration = Duration::from_millis(600); +const ASSERT_CMD_TIMEOUT: Duration = Duration::from_millis(1300); #[cfg(not(windows))] -const COMMAND_TIMEOUT: Duration = Duration::from_millis(600); +const MCP_REQUEST_SETTLE_DURATION: Duration = Duration::from_millis(600); +#[cfg(windows)] +const MCP_REQUEST_SETTLE_DURATION: Duration = Duration::from_millis(1300); + #[cfg(windows)] -const COMMAND_TIMEOUT: Duration = Duration::from_millis(1300); +async fn wait_for_windows_named_pipe_server() { + tokio::time::sleep(WINDOWS_NAMED_PIPE_WAIT_DURATION).await; +} + +#[cfg(not(windows))] +async fn wait_for_windows_named_pipe_server() {} #[test] fn no_args_shows_help() { @@ -62,7 +70,7 @@ fn all_subcommands() { #[case::env_force_color_1(&[], &[("FORCE_COLOR", "1"), ("TERM", "dumb")], true)] fn log_term_coloring(#[case] args: &[&str], #[case] envs: &[(&str, &str)], #[case] expect_ansi: bool) { let output = jetsocat_assert_cmd() - .timeout(COMMAND_TIMEOUT) + .timeout(ASSERT_CMD_TIMEOUT) .args(["forward", "-", "-", "--log-term"]) .args(args) .envs(envs.iter().copied()) @@ -95,7 +103,7 @@ fn log_file_coloring(#[case] args: &[&str], #[case] envs: &[(&str, &str)], #[cas let log_file_path = tempdir.path().join("jetsocat.log"); jetsocat_assert_cmd() - .timeout(COMMAND_TIMEOUT) + .timeout(ASSERT_CMD_TIMEOUT) .args(["forward", "-", "-", "--log-file", log_file_path.to_str().unwrap()]) .args(args) .envs(envs.iter().copied()) @@ -111,32 +119,32 @@ fn log_file_coloring(#[case] args: &[&str], #[case] envs: &[(&str, &str)], #[cas } } -#[test] -fn forward_hello_world() { +#[tokio::test] +async fn forward_hello_world() { // Find an available port. let port = find_unused_ports(1)[0]; // Start jetsocat listener in background using JETSOCAT_ARGS. - let mut listener = jetsocat_cmd() + let mut listener = jetsocat_tokio_cmd() .env( "JETSOCAT_ARGS", format!("forward tcp-listen://127.0.0.1:{port} 'cmd://echo hello world' --no-proxy"), ) + .kill_on_drop(true) .spawn() .expect("failed to start jetsocat listener"); - // Give the listener time to start. - std::thread::sleep(LISTENER_WAIT_DURATION); + wait_for_port_bound(port).await.expect("listener ready"); // Connect to the listener and read the output using assert_cmd. let client_output = jetsocat_assert_cmd() .env("JETSOCAT_ARGS", format!("forward - tcp://127.0.0.1:{port}")) - .timeout(COMMAND_TIMEOUT) + .timeout(ASSERT_CMD_TIMEOUT) .assert(); // Kill the listener. - let _ = listener.kill(); - let _ = listener.wait(); + let _ = listener.start_kill(); + let _ = listener.wait().await; // Check that we got the expected output. #[cfg(windows)] @@ -145,8 +153,8 @@ fn forward_hello_world() { client_output.success().stdout("hello world\n"); } -#[test] -fn jmux_proxy_read_hello_world() { +#[tokio::test] +async fn jmux_proxy_read_hello_world() { // Find 3 available ports at once to avoid conflicts. let ports = find_unused_ports(3); let echo_server_port = ports[0]; @@ -154,43 +162,45 @@ fn jmux_proxy_read_hello_world() { let proxy_listen_port = ports[2]; // Start echo server first. - let mut echo_server = jetsocat_cmd() + let mut echo_server = jetsocat_tokio_cmd() .env( "JETSOCAT_ARGS", format!("forward tcp-listen://127.0.0.1:{echo_server_port} 'cmd://echo hello world' --no-proxy"), ) + .kill_on_drop(true) .spawn() .expect("failed to start echo server"); - // Give the echo server time to start. - std::thread::sleep(LISTENER_WAIT_DURATION); + wait_for_port_bound(echo_server_port).await.expect("echo server ready"); // Start JMUX server that will accept JMUX connections. - let mut jmux_server = jetsocat_cmd() + let mut jmux_server = jetsocat_tokio_cmd() .env( "JETSOCAT_ARGS", format!("jmux-proxy tcp-listen://127.0.0.1:{jmux_server_port} --allow-all --no-proxy"), ) + .kill_on_drop(true) .spawn() .expect("failed to start JMUX server"); - // Give the JMUX server time to start. - std::thread::sleep(LISTENER_WAIT_DURATION); + wait_for_port_bound(jmux_server_port).await.expect("JMUX server ready"); // Start JMUX client proxy that connects to the JMUX server and provides a local TCP listener. // This creates a tunnel: client -> proxy_listen_port -> jmux_server_port -> echo_server_port - let mut jmux_client = jetsocat_cmd() + let mut jmux_client = jetsocat_tokio_cmd() .env( "JETSOCAT_ARGS", format!( "jmux-proxy tcp://127.0.0.1:{jmux_server_port} tcp-listen://127.0.0.1:{proxy_listen_port}/127.0.0.1:{echo_server_port} --no-proxy", ), ) + .kill_on_drop(true) .spawn() .expect("failed to start JMUX client"); - // Give the JMUX client time to establish connection and set up listener. - std::thread::sleep(LISTENER_WAIT_DURATION); + wait_for_port_bound(proxy_listen_port) + .await + .expect("JMUX client proxy ready"); // Connect to the JMUX client's local listener. let client_output = jetsocat_assert_cmd() @@ -198,16 +208,16 @@ fn jmux_proxy_read_hello_world() { "JETSOCAT_ARGS", format!("forward - tcp://127.0.0.1:{proxy_listen_port}"), ) - .timeout(COMMAND_TIMEOUT) + .timeout(ASSERT_CMD_TIMEOUT) .assert(); // Kill all processes. - let _ = jmux_client.kill(); - let _ = jmux_server.kill(); - let _ = echo_server.kill(); - let _ = jmux_client.wait(); - let _ = jmux_server.wait(); - let _ = echo_server.wait(); + let _ = jmux_client.start_kill(); + let _ = jmux_server.start_kill(); + let _ = echo_server.start_kill(); + let _ = jmux_client.wait().await; + let _ = jmux_server.wait().await; + let _ = echo_server.wait().await; // Check that we got the expected output through the JMUX proxy. #[cfg(windows)] @@ -216,8 +226,10 @@ fn jmux_proxy_read_hello_world() { client_output.success().stdout("hello world\n"); } -#[test] -fn jmux_proxy_write_hello_world() { +#[tokio::test] +async fn jmux_proxy_write_hello_world() { + use tokio::io::AsyncReadExt as _; + // Find 3 available ports at once to avoid conflicts. let ports = find_unused_ports(3); let read_server_port = ports[0]; @@ -225,68 +237,73 @@ fn jmux_proxy_write_hello_world() { let proxy_listen_port = ports[2]; // Start read server first. - let mut read_server = jetsocat_cmd() + let mut read_server = jetsocat_tokio_cmd() .env( "JETSOCAT_ARGS", format!("forward tcp-listen://127.0.0.1:{read_server_port} stdio --no-proxy"), ) + .stdin(std::process::Stdio::null()) .stdout(std::process::Stdio::piped()) + .kill_on_drop(true) .spawn() .expect("failed to start read server"); - // Give the read server time to start. - std::thread::sleep(LISTENER_WAIT_DURATION); + wait_for_port_bound(read_server_port).await.expect("read server ready"); // Start JMUX server that will accept JMUX connections. - let mut jmux_server = jetsocat_cmd() + let mut jmux_server = jetsocat_tokio_cmd() .env( "JETSOCAT_ARGS", format!("jmux-proxy tcp-listen://127.0.0.1:{jmux_server_port} --allow-all --no-proxy"), ) + .kill_on_drop(true) .spawn() .expect("failed to start JMUX server"); - // Give the JMUX server time to start. - std::thread::sleep(LISTENER_WAIT_DURATION); + wait_for_port_bound(jmux_server_port).await.expect("JMUX server ready"); // Start JMUX client proxy that connects to the JMUX server and provides a local TCP listener. - let mut jmux_client = jetsocat_cmd() + let mut jmux_client = jetsocat_tokio_cmd() .env( "JETSOCAT_ARGS", format!( "jmux-proxy tcp://127.0.0.1:{jmux_server_port} tcp-listen://127.0.0.1:{proxy_listen_port}/127.0.0.1:{read_server_port} --no-proxy", ), ) + .kill_on_drop(true) .spawn() .expect("failed to start JMUX client"); - // Give the JMUX client time to establish connection and set up listener. - std::thread::sleep(LISTENER_WAIT_DURATION); + wait_for_port_bound(proxy_listen_port) + .await + .expect("JMUX client proxy ready"); // Connect to the JMUX client's local listener. jetsocat_assert_cmd() .env( "JETSOCAT_ARGS", - format!("forward tcp://127.0.0.1:{proxy_listen_port} 'cmd://echo hello world'"), + format!("forward tcp://127.0.0.1:{proxy_listen_port} 'cmd://echo hello world' --no-proxy"), ) - .timeout(COMMAND_TIMEOUT) + .timeout(ASSERT_CMD_TIMEOUT) .assert() .success(); // Kill all processes. - let _ = jmux_client.kill(); - let _ = jmux_server.kill(); - let _ = read_server.kill(); - let _ = jmux_client.wait(); - let _ = jmux_server.wait(); - let _ = read_server.wait(); + let _ = jmux_client.start_kill(); + let _ = jmux_server.start_kill(); + let _ = read_server.start_kill(); + let _ = jmux_client.wait().await; + let _ = jmux_server.wait().await; + let _ = read_server.wait().await; // Check that the read server received the payload. let mut read_server_stdout = String::new(); read_server .stdout + .take() .unwrap() .read_to_string(&mut read_server_stdout) + .await .unwrap(); assert_eq!(read_server_stdout.trim(), "hello world"); } @@ -621,7 +638,7 @@ fn jetsocat_log_environment_variable() { outfile.display() ), ) - .timeout(COMMAND_TIMEOUT) + .timeout(ASSERT_CMD_TIMEOUT) .assert(); let stdout = std::str::from_utf8(&output.get_output().stdout).unwrap(); @@ -747,8 +764,9 @@ async fn mcp_proxy_smoke_test(#[values(true, false)] http_transport: bool) { let server = McpServer::new(transport); let _server_handle = server.start().expect("start MCP server"); - // Give the server time to start. - tokio::time::sleep(LISTENER_WAIT_DURATION).await; + if !http_transport { + wait_for_windows_named_pipe_server().await; + } // Start jetsocat mcp-proxy with stdio pipe and HTTP transport. let mut jetsocat_process = jetsocat_tokio_cmd() @@ -819,8 +837,9 @@ async fn mcp_proxy_with_tools(#[values(true, false)] http_transport: bool) { McpServer::new(transport).with_config(ServerConfig::new().with_tool(EchoTool).with_tool(CalculatorTool)); let _server_handle = server.start().expect("start MCP server"); - // Give the server time to start. - tokio::time::sleep(LISTENER_WAIT_DURATION).await; + if !http_transport { + wait_for_windows_named_pipe_server().await; + } // Start jetsocat mcp-proxy with stdio pipe and HTTP transport. let mut jetsocat_process = jetsocat_tokio_cmd() @@ -962,8 +981,9 @@ async fn mcp_proxy_notification(#[values(true, false)] http_transport: bool) { McpServer::new(transport).with_config(ServerConfig::new().with_notification_handler(notification_handler)); let _server_handle = server.start().expect("start MCP server"); - // Give the server time to start. - tokio::time::sleep(LISTENER_WAIT_DURATION).await; + if !http_transport { + wait_for_windows_named_pipe_server().await; + } // Start jetsocat mcp-proxy with stdio pipe and HTTP transport. let mut jetsocat_process = jetsocat_tokio_cmd() @@ -1010,9 +1030,6 @@ async fn execute_mcp_request(request: &str) -> String { let server = McpServer::new(DynMcpTransport::new_box(transport)); let server_handle = server.start().expect("start MCP server"); - // Give the server time to start. - tokio::time::sleep(LISTENER_WAIT_DURATION).await; - // Start jetsocat mcp-proxy with stdio pipe and HTTP transport. let mut jetsocat_process = jetsocat_tokio_cmd() .args(["mcp-proxy", "stdio", &server_url, "--log-term", "--color=never"]) @@ -1028,7 +1045,7 @@ async fn execute_mcp_request(request: &str) -> String { // Write the request. stdin.write_all(request.as_bytes()).await.unwrap(); - tokio::time::sleep(COMMAND_TIMEOUT).await; + tokio::time::sleep(MCP_REQUEST_SETTLE_DURATION).await; // Shutdown the MCP server. server_handle.shutdown(); @@ -1073,9 +1090,6 @@ async fn mcp_proxy_http_error() { let server = McpServer::new(DynMcpTransport::new_box(transport)); let _server_handle = server.start().expect("start MCP server"); - // Give the server time to start. - tokio::time::sleep(LISTENER_WAIT_DURATION).await; - // Start jetsocat mcp-proxy with stdio pipe and HTTP transport. let mut jetsocat_process = jetsocat_tokio_cmd() .args(["mcp-proxy", "stdio", &server_url]) @@ -1114,8 +1128,7 @@ async fn mcp_proxy_terminated_on_broken_pipe() { let server = McpServer::new(DynMcpTransport::new_box(np_transport)); let server_handle = server.start().expect("start MCP server"); - // Give the server time to start. - tokio::time::sleep(LISTENER_WAIT_DURATION).await; + wait_for_windows_named_pipe_server().await; // Start jetsocat mcp-proxy with stdio pipe. let mut jetsocat_process = jetsocat_tokio_cmd() @@ -1141,8 +1154,8 @@ async fn mcp_proxy_terminated_on_broken_pipe() { // Stop the MCP server. server_handle.shutdown(); - // Wait for server to shut down. - tokio::time::sleep(LISTENER_WAIT_DURATION).await; + // Wait for the named pipe instance to be torn down on Windows. + wait_for_windows_named_pipe_server().await; // Try to send a request - this should fail with a broken pipe error. // The proxy will detect this and send an error response, then close.