From 97819d94318324b98004f2c590830eaeae29cd56 Mon Sep 17 00:00:00 2001 From: Paul Osborne Date: Wed, 27 May 2026 15:32:35 -0500 Subject: [PATCH] wasi-http: fix issues with exit pattern and response completion futures When a guest calls wasi:cli/exit(ok) from within a wasi:http/handler instance, the host exit implementation returns Err(I32Exit(0)). This propagated as a worker error, causing wasmtime serve to log an error and close queued request channels rather than creating fresh instances. I32Exit(0) semantically means 'this instance is done cleanly'. Treat it as a graceful worker exit rather than an error in the Worker::run path, allowing guests to opt out of instance reuse via the exit+backpressure pattern described in WebAssembly/WASI#898. In addition to not logging an error, this patch also changes the signal used to indicate that the response has been transmitted (as returned to the guest by response.new) to be tied to the lifetime of the response. This avoids races that were previously present on the handoff of responses to the http stack and notifications to the guest of response transmission (which caused issues when exiting from this signal). Tests are added related to the single-request-per-instance pattern that had previously failed and was racy. --- .../src/bin/p3_cli_serve_exit_on_single.rs | 65 +++++++++++ .../p3_cli_serve_txresult_body_integrity.rs | 66 +++++++++++ crates/wasi-http/src/handler.rs | 18 ++- crates/wasi-http/src/p3/mod.rs | 2 + src/commands/serve.rs | 9 +- tests/all/cli_tests.rs | 104 ++++++++++++++++++ 6 files changed, 258 insertions(+), 6 deletions(-) create mode 100644 crates/test-programs/src/bin/p3_cli_serve_exit_on_single.rs create mode 100644 crates/test-programs/src/bin/p3_cli_serve_txresult_body_integrity.rs diff --git a/crates/test-programs/src/bin/p3_cli_serve_exit_on_single.rs b/crates/test-programs/src/bin/p3_cli_serve_exit_on_single.rs new file mode 100644 index 000000000000..b472bdd69475 --- /dev/null +++ b/crates/test-programs/src/bin/p3_cli_serve_exit_on_single.rs @@ -0,0 +1,65 @@ +/// Test program: handler calls `exit(ok)` after transmitting each response. +/// +/// Each request is handled by a fresh instance because: +/// 1. `backpressure_inc` is called at the start of `handle`, preventing the CM +/// runtime from dispatching a second request to the same instance. +/// 2. After the response body is transmitted, a spawned task awaits `tx_result` +/// (the future returned by `response.new`) and then calls `exit(ok)`. +/// `tx_result` resolves once hyper has consumed all response body frames, so +/// the guest only exits after the response has been handed off. +/// +/// The response body is the instance's random ID (decimal u64), stable for +/// the lifetime of the instance. The test verifies that N sequential requests +/// each receive a distinct instance ID, and that no "worker error" appears in +/// stderr (i.e. `exit(ok)` is treated as a clean shutdown, not an error). +/// +/// Used by `p3_cli_serve_exit_on_single` in `tests/all/cli_tests.rs`. +use std::sync::OnceLock; +use test_programs::p3::wasi::cli::exit::exit; +use test_programs::p3::wasi::http::types::{ErrorCode, Fields, Request, Response}; +use test_programs::p3::wasi::random::random::get_random_u64; +use test_programs::p3::{service, wit_future, wit_stream}; + +/// Stable identifier for this instance, generated once on first request. +static INSTANCE_ID: OnceLock = OnceLock::new(); + +fn instance_id() -> u64 { + *INSTANCE_ID.get_or_init(get_random_u64) +} + +struct T; + +service::export!(T); + +impl service::exports::wasi::http::handler::Guest for T { + async fn handle(_request: Request) -> Result { + // Prevent the CM runtime from dispatching another request to this + // instance while the current one is in flight. + wit_bindgen::backpressure_inc(); + + let body = format!("{}", instance_id()).into_bytes(); + let (mut body_tx, body_rx) = wit_stream::new(); + let (trailers_tx, trailers_rx) = wit_future::new(|| Ok(None)); + drop(trailers_tx); + + let (response, tx_result) = Response::new(Fields::new(), Some(body_rx), trailers_rx); + + wit_bindgen::spawn(async move { + body_tx.write_all(body).await; + }); + + // Exit cleanly after the response body has been handed off to the + // HTTP layer. `tx_result` only resolves once hyper has consumed all + // response body frames, so the store is not torn down prematurely. + wit_bindgen::spawn(async move { + let _ = tx_result.await; + exit(Ok(())); + }); + + Ok(response) + } +} + +fn main() { + unreachable!() +} diff --git a/crates/test-programs/src/bin/p3_cli_serve_txresult_body_integrity.rs b/crates/test-programs/src/bin/p3_cli_serve_txresult_body_integrity.rs new file mode 100644 index 000000000000..cd0cab68ca99 --- /dev/null +++ b/crates/test-programs/src/bin/p3_cli_serve_txresult_body_integrity.rs @@ -0,0 +1,66 @@ +/// Test program: verifies that `tx_result` (the future returned by +/// `response.new`) resolves only after the response body has been fully handed +/// to the HTTP layer. +/// +/// The handler writes a 64 KiB body as 64 separate 1 KiB chunks and exits via +/// `wasi:cli/exit` after `tx_result` resolves. Each chunk is written with an +/// explicit `.await`, which yields the CM scheduler between chunks. The +/// host-side mpsc channel feeding hyper has capacity 1, so after the first +/// chunk is queued a second chunk cannot be sent until hyper drains the +/// channel (a yield the exit task can preempt). +/// +/// With incorrect `tx_result` semantics (resolves at request-resource cleanup +/// time, before the body has been handed off to hyper), the exit fires on the +/// first such yield after at most one chunk has been queued, truncating the +/// body and consistently failing the length check in the test. +/// +/// With correct semantics (resolves after hyper has consumed all body frames), +/// the exit cannot fire until the store drain is complete, so the full 64 KiB +/// reaches the client. +/// +/// Used by `p3_cli_serve_txresult_body_integrity` in `tests/all/cli_tests.rs`. +use test_programs::p3::wasi::cli::exit::exit; +use test_programs::p3::wasi::http::types::{ErrorCode, Fields, Request, Response}; +use test_programs::p3::{service, wit_future, wit_stream}; + +/// Number of 1 KiB chunks that make up the response body. +const CHUNK_COUNT: usize = 64; +/// Size of each chunk in bytes. +const CHUNK_SIZE: usize = 1024; + +struct T; + +service::export!(T); + +impl service::exports::wasi::http::handler::Guest for T { + async fn handle(_request: Request) -> Result { + let (mut body_tx, body_rx) = wit_stream::new(); + let (trailers_tx, trailers_rx) = wit_future::new(|| Ok(None)); + drop(trailers_tx); + + let (response, tx_result) = Response::new(Fields::new(), Some(body_rx), trailers_rx); + + // Write the body in small, separate chunks. Each `.await` yields the + // CM scheduler, giving the exit task a chance to run. With incorrect + // tx_result timing the exit fires at the first such yield — after at + // most one chunk has reached the host-side channel — truncating the + // body. With correct timing it cannot fire until all chunks have been + // consumed by hyper. + wit_bindgen::spawn(async move { + for _ in 0..CHUNK_COUNT { + body_tx.write_all(vec![b'x'; CHUNK_SIZE]).await; + } + }); + + wit_bindgen::spawn(async move { + let _ = tx_result.await; + exit(Ok(())); + }); + + Ok(response) + } +} + +fn main() { + unreachable!() +} diff --git a/crates/wasi-http/src/handler.rs b/crates/wasi-http/src/handler.rs index 71f5a728caec..d95a27dcbc8c 100644 --- a/crates/wasi-http/src/handler.rs +++ b/crates/wasi-http/src/handler.rs @@ -4,7 +4,7 @@ #[cfg(feature = "p2")] use crate::p2::bindings::http::types as p2_types; #[cfg(feature = "p3")] -use crate::p3; +use crate::p3::{self, BodyExt as _}; use bytes::Bytes; use futures::{ channel::oneshot, @@ -1193,7 +1193,10 @@ async fn handle( let (request, body) = request.into_parts(); let body = body.map_err(p3_types::ErrorCode::from); let request = http::Request::from_parts(request, body); - let (request, request_io_result) = p3::Request::from_http(request); + + // there isn't anything meaningful the host can do with i/o errors + // on the body read from the guest, so we just drop that future + let (request, _request_io_result) = p3::Request::from_http(request); let request = accessor.with(|mut store| { Ok::<_, wasmtime::Error>(view(store.data_mut()).table.push(request)?) @@ -1205,16 +1208,23 @@ async fn handle( .call_handle(accessor, request) .await?; + // channel that notifies the guest that the response has been transmitted + let (notify_tx, notify_rx) = oneshot::channel::<()>(); let response = accessor.with(|mut store| { let response = view(store.get()).table.delete(response?)?; Ok::<_, wasmtime::Error>(response.into_http_with_getter( &mut store, - request_io_result, + notify_rx.map(|_| Ok(())), view, )?) })?; - Ok(response.map(move |body| body.map_err(wasmtime::Error::from).boxed_unsync())) + Ok(response.map(move |body| { + // on drop, notify_tx channel will be closed indicating + // to the guest that the response has been transmitted. + let body = body.map_err(wasmtime::Error::from); + body.with_state(notify_tx).boxed_unsync() + })) }); // TODO: We should also use `oneshot::Sender::poll_close` to be diff --git a/crates/wasi-http/src/p3/mod.rs b/crates/wasi-http/src/p3/mod.rs index 153956fe012c..408cab1c791c 100644 --- a/crates/wasi-http/src/p3/mod.rs +++ b/crates/wasi-http/src/p3/mod.rs @@ -16,6 +16,8 @@ mod proxy; mod request; mod response; +pub(crate) use body::BodyExt; + #[cfg(feature = "default-send-request")] pub use request::default_send_request; pub use request::{Request, RequestOptions}; diff --git a/src/commands/serve.rs b/src/commands/serve.rs index fd0c710f7804..24a9559a5706 100644 --- a/src/commands/serve.rs +++ b/src/commands/serve.rs @@ -27,7 +27,7 @@ use wasmtime::{ }; use wasmtime_cli_flags::opt::WasmtimeOptionValue; use wasmtime_wasi::p2::{StreamError, StreamResult}; -use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView}; +use wasmtime_wasi::{I32Exit, WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView}; #[cfg(feature = "component-model-async")] use wasmtime_wasi_http::handler::p2::bindings as p2; use wasmtime_wasi_http::handler::{ @@ -862,7 +862,12 @@ impl WorkerState for HostWorkerState { fn drop(&self, mut store: Store, result: Result<(), wasmtime::Error>) { if let Err(error) = result { - eprintln!("worker failed: {error:?}"); + // A clean exit via `wasi:cli/exit(ok)` is a guest-controlled signal + // that the instance is done and should not be reused. Treat it as a + // graceful worker exit rather than an error. + if !error.downcast_ref::().map_or(false, |e| e.0 == 0) { + eprintln!("worker failed: {error:?}"); + } } if let Some(write_profile) = store.data_mut().write_profile.take() { diff --git a/tests/all/cli_tests.rs b/tests/all/cli_tests.rs index 96d45dcebd0f..3c90f124eb3b 100644 --- a/tests/all/cli_tests.rs +++ b/tests/all/cli_tests.rs @@ -2982,6 +2982,110 @@ start a print 1234 assert!(stdout.contains("please see me")); Ok(()) } + + #[tokio::test] + #[cfg_attr(not(feature = "component-model-async"), ignore)] + async fn p3_cli_serve_exit_on_single() -> Result<()> { + // Send N sequential requests to a handler that calls `exit(ok)` after + // each response. Each request must be handled by a fresh instance + // because the handler raises backpressure permanently, preventing + // instance reuse. + // + // The response body is the instance's random ID (decimal u64). We + // assert that all N IDs are distinct (one per instance) and that no + // "worker error" appears in stderr — `exit(ok)` is a clean, guest- + // controlled shutdown and must not be treated as an error. + const N: usize = 5; + let server = WasmtimeServe::new(P3_CLI_SERVE_EXIT_ON_SINGLE_COMPONENT, |cmd| { + cmd.arg("-Wcomponent-model-async"); + cmd.arg("-Sp3,cli"); + })?; + + let mut instance_ids = std::collections::HashSet::new(); + for _ in 0..N { + let resp = server + .send_request( + hyper::Request::builder() + .uri("http://localhost/") + .body(String::new()) + .context("failed to make request")?, + ) + .await?; + assert!(resp.status().is_success()); + let id: u64 = resp + .body() + .trim() + .parse::() + .context("response body should be a u64 instance ID")?; + instance_ids.insert(id); + } + + assert_eq!( + instance_ids.len(), + N, + "expected {N} distinct instance IDs but got {}: {:?}", + instance_ids.len(), + instance_ids + ); + + let (_stdout, stderr) = server.finish()?; + assert!( + !stderr.contains("worker error"), + "unexpected worker error in stderr: {stderr}" + ); + Ok(()) + } + + #[tokio::test] + #[cfg_attr(not(feature = "component-model-async"), ignore)] + async fn p3_cli_serve_txresult_body_integrity() -> Result<()> { + // Verify that `tx_result` (the future returned by `response.new`) + // resolves only after the response body has been fully handed to the + // HTTP layer, not prematurely at request-resource cleanup time. + // + // The guest writes a 64 KiB body as 64 separate 1 KiB chunks and + // calls `exit(ok)` after `tx_result` resolves. Each chunk requires + // the host-side mpsc channel (capacity 1) to be drained by hyper + // before the next can be queued, creating a scheduler yield between + // every chunk. With incorrect tx_result semantics the exit fires at + // the first such yield — after at most one chunk has been queued — + // consistently truncating the body. With correct semantics the exit + // fires only after all chunks have been consumed, so the full body + // reaches the client. + const CHUNK_SIZE: usize = 1024; + const CHUNK_COUNT: usize = 64; + const EXPECTED_LEN: usize = CHUNK_SIZE * CHUNK_COUNT; + + let server = WasmtimeServe::new(P3_CLI_SERVE_TXRESULT_BODY_INTEGRITY_COMPONENT, |cmd| { + cmd.arg("-Wcomponent-model-async"); + cmd.arg("-Sp3,cli"); + })?; + + let resp = server + .send_request( + hyper::Request::builder() + .uri("http://localhost/") + .body(String::new()) + .context("failed to make request")?, + ) + .await?; + + assert!(resp.status().is_success()); + assert_eq!( + resp.body().len(), + EXPECTED_LEN, + "response body truncated: expected {EXPECTED_LEN} bytes but got {} \ + (tx_result fired before body handoff to hyper was complete)", + resp.body().len() + ); + + let (_stdout, stderr) = server.finish()?; + assert!( + !stderr.contains("worker error"), + "unexpected worker error in stderr: {stderr}" + ); + Ok(()) + } } #[test]