diff --git a/crates/test-programs/src/bin/p3_cli_serve_exit_on_single.rs b/crates/test-programs/src/bin/p3_cli_serve_exit_on_single.rs new file mode 100644 index 000000000000..b472bdd69475 --- /dev/null +++ b/crates/test-programs/src/bin/p3_cli_serve_exit_on_single.rs @@ -0,0 +1,65 @@ +/// Test program: handler calls `exit(ok)` after transmitting each response. +/// +/// Each request is handled by a fresh instance because: +/// 1. `backpressure_inc` is called at the start of `handle`, preventing the CM +/// runtime from dispatching a second request to the same instance. +/// 2. After the response body is transmitted, a spawned task awaits `tx_result` +/// (the future returned by `response.new`) and then calls `exit(ok)`. +/// `tx_result` resolves once hyper has consumed all response body frames, so +/// the guest only exits after the response has been handed off. +/// +/// The response body is the instance's random ID (decimal u64), stable for +/// the lifetime of the instance. The test verifies that N sequential requests +/// each receive a distinct instance ID, and that no "worker error" appears in +/// stderr (i.e. `exit(ok)` is treated as a clean shutdown, not an error). +/// +/// Used by `p3_cli_serve_exit_on_single` in `tests/all/cli_tests.rs`. +use std::sync::OnceLock; +use test_programs::p3::wasi::cli::exit::exit; +use test_programs::p3::wasi::http::types::{ErrorCode, Fields, Request, Response}; +use test_programs::p3::wasi::random::random::get_random_u64; +use test_programs::p3::{service, wit_future, wit_stream}; + +/// Stable identifier for this instance, generated once on first request. +static INSTANCE_ID: OnceLock = OnceLock::new(); + +fn instance_id() -> u64 { + *INSTANCE_ID.get_or_init(get_random_u64) +} + +struct T; + +service::export!(T); + +impl service::exports::wasi::http::handler::Guest for T { + async fn handle(_request: Request) -> Result { + // Prevent the CM runtime from dispatching another request to this + // instance while the current one is in flight. + wit_bindgen::backpressure_inc(); + + let body = format!("{}", instance_id()).into_bytes(); + let (mut body_tx, body_rx) = wit_stream::new(); + let (trailers_tx, trailers_rx) = wit_future::new(|| Ok(None)); + drop(trailers_tx); + + let (response, tx_result) = Response::new(Fields::new(), Some(body_rx), trailers_rx); + + wit_bindgen::spawn(async move { + body_tx.write_all(body).await; + }); + + // Exit cleanly after the response body has been handed off to the + // HTTP layer. `tx_result` only resolves once hyper has consumed all + // response body frames, so the store is not torn down prematurely. + wit_bindgen::spawn(async move { + let _ = tx_result.await; + exit(Ok(())); + }); + + Ok(response) + } +} + +fn main() { + unreachable!() +} diff --git a/crates/test-programs/src/bin/p3_cli_serve_txresult_body_integrity.rs b/crates/test-programs/src/bin/p3_cli_serve_txresult_body_integrity.rs new file mode 100644 index 000000000000..cd0cab68ca99 --- /dev/null +++ b/crates/test-programs/src/bin/p3_cli_serve_txresult_body_integrity.rs @@ -0,0 +1,66 @@ +/// Test program: verifies that `tx_result` (the future returned by +/// `response.new`) resolves only after the response body has been fully handed +/// to the HTTP layer. +/// +/// The handler writes a 64 KiB body as 64 separate 1 KiB chunks and exits via +/// `wasi:cli/exit` after `tx_result` resolves. Each chunk is written with an +/// explicit `.await`, which yields the CM scheduler between chunks. The +/// host-side mpsc channel feeding hyper has capacity 1, so after the first +/// chunk is queued a second chunk cannot be sent until hyper drains the +/// channel (a yield the exit task can preempt). +/// +/// With incorrect `tx_result` semantics (resolves at request-resource cleanup +/// time, before the body has been handed off to hyper), the exit fires on the +/// first such yield after at most one chunk has been queued, truncating the +/// body and consistently failing the length check in the test. +/// +/// With correct semantics (resolves after hyper has consumed all body frames), +/// the exit cannot fire until the store drain is complete, so the full 64 KiB +/// reaches the client. +/// +/// Used by `p3_cli_serve_txresult_body_integrity` in `tests/all/cli_tests.rs`. +use test_programs::p3::wasi::cli::exit::exit; +use test_programs::p3::wasi::http::types::{ErrorCode, Fields, Request, Response}; +use test_programs::p3::{service, wit_future, wit_stream}; + +/// Number of 1 KiB chunks that make up the response body. +const CHUNK_COUNT: usize = 64; +/// Size of each chunk in bytes. +const CHUNK_SIZE: usize = 1024; + +struct T; + +service::export!(T); + +impl service::exports::wasi::http::handler::Guest for T { + async fn handle(_request: Request) -> Result { + let (mut body_tx, body_rx) = wit_stream::new(); + let (trailers_tx, trailers_rx) = wit_future::new(|| Ok(None)); + drop(trailers_tx); + + let (response, tx_result) = Response::new(Fields::new(), Some(body_rx), trailers_rx); + + // Write the body in small, separate chunks. Each `.await` yields the + // CM scheduler, giving the exit task a chance to run. With incorrect + // tx_result timing the exit fires at the first such yield — after at + // most one chunk has reached the host-side channel — truncating the + // body. With correct timing it cannot fire until all chunks have been + // consumed by hyper. + wit_bindgen::spawn(async move { + for _ in 0..CHUNK_COUNT { + body_tx.write_all(vec![b'x'; CHUNK_SIZE]).await; + } + }); + + wit_bindgen::spawn(async move { + let _ = tx_result.await; + exit(Ok(())); + }); + + Ok(response) + } +} + +fn main() { + unreachable!() +} diff --git a/crates/wasi-http/src/handler.rs b/crates/wasi-http/src/handler.rs index 71f5a728caec..d95a27dcbc8c 100644 --- a/crates/wasi-http/src/handler.rs +++ b/crates/wasi-http/src/handler.rs @@ -4,7 +4,7 @@ #[cfg(feature = "p2")] use crate::p2::bindings::http::types as p2_types; #[cfg(feature = "p3")] -use crate::p3; +use crate::p3::{self, BodyExt as _}; use bytes::Bytes; use futures::{ channel::oneshot, @@ -1193,7 +1193,10 @@ async fn handle( let (request, body) = request.into_parts(); let body = body.map_err(p3_types::ErrorCode::from); let request = http::Request::from_parts(request, body); - let (request, request_io_result) = p3::Request::from_http(request); + + // there isn't anything meaningful the host can do with i/o errors + // on the body read from the guest, so we just drop that future + let (request, _request_io_result) = p3::Request::from_http(request); let request = accessor.with(|mut store| { Ok::<_, wasmtime::Error>(view(store.data_mut()).table.push(request)?) @@ -1205,16 +1208,23 @@ async fn handle( .call_handle(accessor, request) .await?; + // channel that notifies the guest that the response has been transmitted + let (notify_tx, notify_rx) = oneshot::channel::<()>(); let response = accessor.with(|mut store| { let response = view(store.get()).table.delete(response?)?; Ok::<_, wasmtime::Error>(response.into_http_with_getter( &mut store, - request_io_result, + notify_rx.map(|_| Ok(())), view, )?) })?; - Ok(response.map(move |body| body.map_err(wasmtime::Error::from).boxed_unsync())) + Ok(response.map(move |body| { + // on drop, notify_tx channel will be closed indicating + // to the guest that the response has been transmitted. + let body = body.map_err(wasmtime::Error::from); + body.with_state(notify_tx).boxed_unsync() + })) }); // TODO: We should also use `oneshot::Sender::poll_close` to be diff --git a/crates/wasi-http/src/p3/mod.rs b/crates/wasi-http/src/p3/mod.rs index 153956fe012c..408cab1c791c 100644 --- a/crates/wasi-http/src/p3/mod.rs +++ b/crates/wasi-http/src/p3/mod.rs @@ -16,6 +16,8 @@ mod proxy; mod request; mod response; +pub(crate) use body::BodyExt; + #[cfg(feature = "default-send-request")] pub use request::default_send_request; pub use request::{Request, RequestOptions}; diff --git a/src/commands/serve.rs b/src/commands/serve.rs index fd0c710f7804..24a9559a5706 100644 --- a/src/commands/serve.rs +++ b/src/commands/serve.rs @@ -27,7 +27,7 @@ use wasmtime::{ }; use wasmtime_cli_flags::opt::WasmtimeOptionValue; use wasmtime_wasi::p2::{StreamError, StreamResult}; -use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView}; +use wasmtime_wasi::{I32Exit, WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView}; #[cfg(feature = "component-model-async")] use wasmtime_wasi_http::handler::p2::bindings as p2; use wasmtime_wasi_http::handler::{ @@ -862,7 +862,12 @@ impl WorkerState for HostWorkerState { fn drop(&self, mut store: Store, result: Result<(), wasmtime::Error>) { if let Err(error) = result { - eprintln!("worker failed: {error:?}"); + // A clean exit via `wasi:cli/exit(ok)` is a guest-controlled signal + // that the instance is done and should not be reused. Treat it as a + // graceful worker exit rather than an error. + if !error.downcast_ref::().map_or(false, |e| e.0 == 0) { + eprintln!("worker failed: {error:?}"); + } } if let Some(write_profile) = store.data_mut().write_profile.take() { diff --git a/tests/all/cli_tests.rs b/tests/all/cli_tests.rs index 96d45dcebd0f..3c90f124eb3b 100644 --- a/tests/all/cli_tests.rs +++ b/tests/all/cli_tests.rs @@ -2982,6 +2982,110 @@ start a print 1234 assert!(stdout.contains("please see me")); Ok(()) } + + #[tokio::test] + #[cfg_attr(not(feature = "component-model-async"), ignore)] + async fn p3_cli_serve_exit_on_single() -> Result<()> { + // Send N sequential requests to a handler that calls `exit(ok)` after + // each response. Each request must be handled by a fresh instance + // because the handler raises backpressure permanently, preventing + // instance reuse. + // + // The response body is the instance's random ID (decimal u64). We + // assert that all N IDs are distinct (one per instance) and that no + // "worker error" appears in stderr — `exit(ok)` is a clean, guest- + // controlled shutdown and must not be treated as an error. + const N: usize = 5; + let server = WasmtimeServe::new(P3_CLI_SERVE_EXIT_ON_SINGLE_COMPONENT, |cmd| { + cmd.arg("-Wcomponent-model-async"); + cmd.arg("-Sp3,cli"); + })?; + + let mut instance_ids = std::collections::HashSet::new(); + for _ in 0..N { + let resp = server + .send_request( + hyper::Request::builder() + .uri("http://localhost/") + .body(String::new()) + .context("failed to make request")?, + ) + .await?; + assert!(resp.status().is_success()); + let id: u64 = resp + .body() + .trim() + .parse::() + .context("response body should be a u64 instance ID")?; + instance_ids.insert(id); + } + + assert_eq!( + instance_ids.len(), + N, + "expected {N} distinct instance IDs but got {}: {:?}", + instance_ids.len(), + instance_ids + ); + + let (_stdout, stderr) = server.finish()?; + assert!( + !stderr.contains("worker error"), + "unexpected worker error in stderr: {stderr}" + ); + Ok(()) + } + + #[tokio::test] + #[cfg_attr(not(feature = "component-model-async"), ignore)] + async fn p3_cli_serve_txresult_body_integrity() -> Result<()> { + // Verify that `tx_result` (the future returned by `response.new`) + // resolves only after the response body has been fully handed to the + // HTTP layer, not prematurely at request-resource cleanup time. + // + // The guest writes a 64 KiB body as 64 separate 1 KiB chunks and + // calls `exit(ok)` after `tx_result` resolves. Each chunk requires + // the host-side mpsc channel (capacity 1) to be drained by hyper + // before the next can be queued, creating a scheduler yield between + // every chunk. With incorrect tx_result semantics the exit fires at + // the first such yield — after at most one chunk has been queued — + // consistently truncating the body. With correct semantics the exit + // fires only after all chunks have been consumed, so the full body + // reaches the client. + const CHUNK_SIZE: usize = 1024; + const CHUNK_COUNT: usize = 64; + const EXPECTED_LEN: usize = CHUNK_SIZE * CHUNK_COUNT; + + let server = WasmtimeServe::new(P3_CLI_SERVE_TXRESULT_BODY_INTEGRITY_COMPONENT, |cmd| { + cmd.arg("-Wcomponent-model-async"); + cmd.arg("-Sp3,cli"); + })?; + + let resp = server + .send_request( + hyper::Request::builder() + .uri("http://localhost/") + .body(String::new()) + .context("failed to make request")?, + ) + .await?; + + assert!(resp.status().is_success()); + assert_eq!( + resp.body().len(), + EXPECTED_LEN, + "response body truncated: expected {EXPECTED_LEN} bytes but got {} \ + (tx_result fired before body handoff to hyper was complete)", + resp.body().len() + ); + + let (_stdout, stderr) = server.finish()?; + assert!( + !stderr.contains("worker error"), + "unexpected worker error in stderr: {stderr}" + ); + Ok(()) + } } #[test]