Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 85 additions & 72 deletions testsuite/tests/cli/jetsocat.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
use std::io::Read;
use std::sync::Arc;
use std::time::Duration;

use expect_test::expect;
use rstest::rstest;
use test_utils::find_unused_ports;
use testsuite::cli::{
assert_stderr_eq, jetsocat_assert_cmd, jetsocat_cmd, jetsocat_tokio_cmd, wait_for_port_bound, wait_for_tcp_port,
assert_stderr_eq, jetsocat_assert_cmd, jetsocat_tokio_cmd, wait_for_port_bound, wait_for_tcp_port,
};

// NOTE: Windows needs more time for listeners to be ready due to slower process startup.
#[cfg(windows)]
const WINDOWS_NAMED_PIPE_WAIT_DURATION: Duration = Duration::from_millis(600);

#[cfg(not(windows))]
const LISTENER_WAIT_DURATION: Duration = Duration::from_millis(300);
const ASSERT_CMD_TIMEOUT: Duration = Duration::from_millis(600);
#[cfg(windows)]
const LISTENER_WAIT_DURATION: Duration = Duration::from_millis(600);
const ASSERT_CMD_TIMEOUT: Duration = Duration::from_millis(1300);

#[cfg(not(windows))]
const COMMAND_TIMEOUT: Duration = Duration::from_millis(600);
const MCP_REQUEST_SETTLE_DURATION: Duration = Duration::from_millis(600);
#[cfg(windows)]
const MCP_REQUEST_SETTLE_DURATION: Duration = Duration::from_millis(1300);

#[cfg(windows)]
const COMMAND_TIMEOUT: Duration = Duration::from_millis(1300);
async fn wait_for_windows_named_pipe_server() {
tokio::time::sleep(WINDOWS_NAMED_PIPE_WAIT_DURATION).await;
}

#[cfg(not(windows))]
async fn wait_for_windows_named_pipe_server() {}

#[test]
fn no_args_shows_help() {
Expand Down Expand Up @@ -62,7 +70,7 @@ fn all_subcommands() {
#[case::env_force_color_1(&[], &[("FORCE_COLOR", "1"), ("TERM", "dumb")], true)]
fn log_term_coloring(#[case] args: &[&str], #[case] envs: &[(&str, &str)], #[case] expect_ansi: bool) {
let output = jetsocat_assert_cmd()
.timeout(COMMAND_TIMEOUT)
.timeout(ASSERT_CMD_TIMEOUT)
.args(["forward", "-", "-", "--log-term"])
.args(args)
.envs(envs.iter().copied())
Expand Down Expand Up @@ -95,7 +103,7 @@ fn log_file_coloring(#[case] args: &[&str], #[case] envs: &[(&str, &str)], #[cas
let log_file_path = tempdir.path().join("jetsocat.log");

jetsocat_assert_cmd()
.timeout(COMMAND_TIMEOUT)
.timeout(ASSERT_CMD_TIMEOUT)
.args(["forward", "-", "-", "--log-file", log_file_path.to_str().unwrap()])
.args(args)
.envs(envs.iter().copied())
Expand All @@ -111,32 +119,32 @@ fn log_file_coloring(#[case] args: &[&str], #[case] envs: &[(&str, &str)], #[cas
}
}

#[test]
fn forward_hello_world() {
#[tokio::test]
async fn forward_hello_world() {
// Find an available port.
let port = find_unused_ports(1)[0];

// Start jetsocat listener in background using JETSOCAT_ARGS.
let mut listener = jetsocat_cmd()
let mut listener = jetsocat_tokio_cmd()
.env(
"JETSOCAT_ARGS",
format!("forward tcp-listen://127.0.0.1:{port} 'cmd://echo hello world' --no-proxy"),
)
.kill_on_drop(true)
.spawn()
.expect("failed to start jetsocat listener");

// Give the listener time to start.
std::thread::sleep(LISTENER_WAIT_DURATION);
wait_for_port_bound(port).await.expect("listener ready");

// Connect to the listener and read the output using assert_cmd.
let client_output = jetsocat_assert_cmd()
.env("JETSOCAT_ARGS", format!("forward - tcp://127.0.0.1:{port}"))
.timeout(COMMAND_TIMEOUT)
.timeout(ASSERT_CMD_TIMEOUT)
.assert();

// Kill the listener.
let _ = listener.kill();
let _ = listener.wait();
let _ = listener.start_kill();
let _ = listener.wait().await;

// Check that we got the expected output.
#[cfg(windows)]
Expand All @@ -145,69 +153,71 @@ fn forward_hello_world() {
client_output.success().stdout("hello world\n");
}

#[test]
fn jmux_proxy_read_hello_world() {
#[tokio::test]
async fn jmux_proxy_read_hello_world() {
// Find 3 available ports at once to avoid conflicts.
let ports = find_unused_ports(3);
let echo_server_port = ports[0];
let jmux_server_port = ports[1];
let proxy_listen_port = ports[2];

// Start echo server first.
let mut echo_server = jetsocat_cmd()
let mut echo_server = jetsocat_tokio_cmd()
.env(
"JETSOCAT_ARGS",
format!("forward tcp-listen://127.0.0.1:{echo_server_port} 'cmd://echo hello world' --no-proxy"),
)
.kill_on_drop(true)
.spawn()
.expect("failed to start echo server");

// Give the echo server time to start.
std::thread::sleep(LISTENER_WAIT_DURATION);
wait_for_port_bound(echo_server_port).await.expect("echo server ready");

// Start JMUX server that will accept JMUX connections.
let mut jmux_server = jetsocat_cmd()
let mut jmux_server = jetsocat_tokio_cmd()
.env(
"JETSOCAT_ARGS",
format!("jmux-proxy tcp-listen://127.0.0.1:{jmux_server_port} --allow-all --no-proxy"),
)
.kill_on_drop(true)
.spawn()
.expect("failed to start JMUX server");

// Give the JMUX server time to start.
std::thread::sleep(LISTENER_WAIT_DURATION);
wait_for_port_bound(jmux_server_port).await.expect("JMUX server ready");

// Start JMUX client proxy that connects to the JMUX server and provides a local TCP listener.
// This creates a tunnel: client -> proxy_listen_port -> jmux_server_port -> echo_server_port
let mut jmux_client = jetsocat_cmd()
let mut jmux_client = jetsocat_tokio_cmd()
.env(
"JETSOCAT_ARGS",
format!(
"jmux-proxy tcp://127.0.0.1:{jmux_server_port} tcp-listen://127.0.0.1:{proxy_listen_port}/127.0.0.1:{echo_server_port} --no-proxy",
),
)
.kill_on_drop(true)
.spawn()
.expect("failed to start JMUX client");

// Give the JMUX client time to establish connection and set up listener.
std::thread::sleep(LISTENER_WAIT_DURATION);
wait_for_port_bound(proxy_listen_port)
.await
.expect("JMUX client proxy ready");

// Connect to the JMUX client's local listener.
let client_output = jetsocat_assert_cmd()
.env(
"JETSOCAT_ARGS",
format!("forward - tcp://127.0.0.1:{proxy_listen_port}"),
)
.timeout(COMMAND_TIMEOUT)
.timeout(ASSERT_CMD_TIMEOUT)
.assert();

// Kill all processes.
let _ = jmux_client.kill();
let _ = jmux_server.kill();
let _ = echo_server.kill();
let _ = jmux_client.wait();
let _ = jmux_server.wait();
let _ = echo_server.wait();
let _ = jmux_client.start_kill();
let _ = jmux_server.start_kill();
let _ = echo_server.start_kill();
let _ = jmux_client.wait().await;
let _ = jmux_server.wait().await;
let _ = echo_server.wait().await;

// Check that we got the expected output through the JMUX proxy.
#[cfg(windows)]
Expand All @@ -216,77 +226,84 @@ fn jmux_proxy_read_hello_world() {
client_output.success().stdout("hello world\n");
}

#[test]
fn jmux_proxy_write_hello_world() {
#[tokio::test]
async fn jmux_proxy_write_hello_world() {
use tokio::io::AsyncReadExt as _;

// Find 3 available ports at once to avoid conflicts.
let ports = find_unused_ports(3);
let read_server_port = ports[0];
let jmux_server_port = ports[1];
let proxy_listen_port = ports[2];

// Start read server first.
let mut read_server = jetsocat_cmd()
let mut read_server = jetsocat_tokio_cmd()
.env(
"JETSOCAT_ARGS",
format!("forward tcp-listen://127.0.0.1:{read_server_port} stdio --no-proxy"),
)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.kill_on_drop(true)
.spawn()
.expect("failed to start read server");

// Give the read server time to start.
std::thread::sleep(LISTENER_WAIT_DURATION);
wait_for_port_bound(read_server_port).await.expect("read server ready");

// Start JMUX server that will accept JMUX connections.
let mut jmux_server = jetsocat_cmd()
let mut jmux_server = jetsocat_tokio_cmd()
.env(
"JETSOCAT_ARGS",
format!("jmux-proxy tcp-listen://127.0.0.1:{jmux_server_port} --allow-all --no-proxy"),
)
.kill_on_drop(true)
.spawn()
.expect("failed to start JMUX server");

// Give the JMUX server time to start.
std::thread::sleep(LISTENER_WAIT_DURATION);
wait_for_port_bound(jmux_server_port).await.expect("JMUX server ready");

// Start JMUX client proxy that connects to the JMUX server and provides a local TCP listener.
let mut jmux_client = jetsocat_cmd()
let mut jmux_client = jetsocat_tokio_cmd()
.env(
"JETSOCAT_ARGS",
format!(
"jmux-proxy tcp://127.0.0.1:{jmux_server_port} tcp-listen://127.0.0.1:{proxy_listen_port}/127.0.0.1:{read_server_port} --no-proxy",
),
)
.kill_on_drop(true)
.spawn()
.expect("failed to start JMUX client");

// Give the JMUX client time to establish connection and set up listener.
std::thread::sleep(LISTENER_WAIT_DURATION);
wait_for_port_bound(proxy_listen_port)
.await
.expect("JMUX client proxy ready");

// Connect to the JMUX client's local listener.
jetsocat_assert_cmd()
.env(
"JETSOCAT_ARGS",
format!("forward tcp://127.0.0.1:{proxy_listen_port} 'cmd://echo hello world'"),
format!("forward tcp://127.0.0.1:{proxy_listen_port} 'cmd://echo hello world' --no-proxy"),
)
.timeout(COMMAND_TIMEOUT)
.timeout(ASSERT_CMD_TIMEOUT)
.assert()
.success();

// Kill all processes.
let _ = jmux_client.kill();
let _ = jmux_server.kill();
let _ = read_server.kill();
let _ = jmux_client.wait();
let _ = jmux_server.wait();
let _ = read_server.wait();
let _ = jmux_client.start_kill();
let _ = jmux_server.start_kill();
let _ = read_server.start_kill();
let _ = jmux_client.wait().await;
let _ = jmux_server.wait().await;
let _ = read_server.wait().await;

// Check that the read server received the payload.
let mut read_server_stdout = String::new();
read_server
.stdout
.take()
.unwrap()
.read_to_string(&mut read_server_stdout)
.await
.unwrap();
assert_eq!(read_server_stdout.trim(), "hello world");
}
Expand Down Expand Up @@ -621,7 +638,7 @@ fn jetsocat_log_environment_variable() {
outfile.display()
),
)
.timeout(COMMAND_TIMEOUT)
.timeout(ASSERT_CMD_TIMEOUT)
.assert();

let stdout = std::str::from_utf8(&output.get_output().stdout).unwrap();
Expand Down Expand Up @@ -747,8 +764,9 @@ async fn mcp_proxy_smoke_test(#[values(true, false)] http_transport: bool) {
let server = McpServer::new(transport);
let _server_handle = server.start().expect("start MCP server");

// Give the server time to start.
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
if !http_transport {
wait_for_windows_named_pipe_server().await;
}

// Start jetsocat mcp-proxy with stdio pipe and HTTP transport.
let mut jetsocat_process = jetsocat_tokio_cmd()
Expand Down Expand Up @@ -819,8 +837,9 @@ async fn mcp_proxy_with_tools(#[values(true, false)] http_transport: bool) {
McpServer::new(transport).with_config(ServerConfig::new().with_tool(EchoTool).with_tool(CalculatorTool));
let _server_handle = server.start().expect("start MCP server");

// Give the server time to start.
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
if !http_transport {
wait_for_windows_named_pipe_server().await;
}

// Start jetsocat mcp-proxy with stdio pipe and HTTP transport.
let mut jetsocat_process = jetsocat_tokio_cmd()
Expand Down Expand Up @@ -962,8 +981,9 @@ async fn mcp_proxy_notification(#[values(true, false)] http_transport: bool) {
McpServer::new(transport).with_config(ServerConfig::new().with_notification_handler(notification_handler));
let _server_handle = server.start().expect("start MCP server");

// Give the server time to start.
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
if !http_transport {
wait_for_windows_named_pipe_server().await;
}

// Start jetsocat mcp-proxy with stdio pipe and HTTP transport.
let mut jetsocat_process = jetsocat_tokio_cmd()
Expand Down Expand Up @@ -1010,9 +1030,6 @@ async fn execute_mcp_request(request: &str) -> String {
let server = McpServer::new(DynMcpTransport::new_box(transport));
let server_handle = server.start().expect("start MCP server");

// Give the server time to start.
tokio::time::sleep(LISTENER_WAIT_DURATION).await;

// Start jetsocat mcp-proxy with stdio pipe and HTTP transport.
let mut jetsocat_process = jetsocat_tokio_cmd()
.args(["mcp-proxy", "stdio", &server_url, "--log-term", "--color=never"])
Expand All @@ -1028,7 +1045,7 @@ async fn execute_mcp_request(request: &str) -> String {
// Write the request.
stdin.write_all(request.as_bytes()).await.unwrap();

tokio::time::sleep(COMMAND_TIMEOUT).await;
tokio::time::sleep(MCP_REQUEST_SETTLE_DURATION).await;

// Shutdown the MCP server.
server_handle.shutdown();
Expand Down Expand Up @@ -1073,9 +1090,6 @@ async fn mcp_proxy_http_error() {
let server = McpServer::new(DynMcpTransport::new_box(transport));
let _server_handle = server.start().expect("start MCP server");

// Give the server time to start.
tokio::time::sleep(LISTENER_WAIT_DURATION).await;

// Start jetsocat mcp-proxy with stdio pipe and HTTP transport.
let mut jetsocat_process = jetsocat_tokio_cmd()
.args(["mcp-proxy", "stdio", &server_url])
Expand Down Expand Up @@ -1114,8 +1128,7 @@ async fn mcp_proxy_terminated_on_broken_pipe() {
let server = McpServer::new(DynMcpTransport::new_box(np_transport));
let server_handle = server.start().expect("start MCP server");

// Give the server time to start.
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
wait_for_windows_named_pipe_server().await;

// Start jetsocat mcp-proxy with stdio pipe.
let mut jetsocat_process = jetsocat_tokio_cmd()
Expand All @@ -1141,8 +1154,8 @@ async fn mcp_proxy_terminated_on_broken_pipe() {
// Stop the MCP server.
server_handle.shutdown();

// Wait for server to shut down.
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
// Wait for the named pipe instance to be torn down on Windows.
wait_for_windows_named_pipe_server().await;

// Try to send a request - this should fail with a broken pipe error.
// The proxy will detect this and send an error response, then close.
Expand Down
Loading