Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions crates/test-programs/src/bin/p3_cli_serve_exit_on_single.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/// Test program: handler calls `exit(ok)` after transmitting each response.
///
/// Each request is handled by a fresh instance because:
/// 1. `backpressure_inc` is called at the start of `handle`, preventing the CM
/// runtime from dispatching a second request to the same instance.
/// 2. After the response body is transmitted, a spawned task awaits `tx_result`
/// (the future returned by `response.new`) and then calls `exit(ok)`.
/// `tx_result` resolves once hyper has consumed all response body frames, so
/// the guest only exits after the response has been handed off.
///
/// The response body is the instance's random ID (decimal u64), stable for
/// the lifetime of the instance. The test verifies that N sequential requests
/// each receive a distinct instance ID, and that no "worker error" appears in
/// stderr (i.e. `exit(ok)` is treated as a clean shutdown, not an error).
///
/// Used by `p3_cli_serve_exit_on_single` in `tests/all/cli_tests.rs`.
use std::sync::OnceLock;
use test_programs::p3::wasi::cli::exit::exit;
use test_programs::p3::wasi::http::types::{ErrorCode, Fields, Request, Response};
use test_programs::p3::wasi::random::random::get_random_u64;
use test_programs::p3::{service, wit_future, wit_stream};

/// Stable identifier for this instance, generated once on first request.
static INSTANCE_ID: OnceLock<u64> = OnceLock::new();

fn instance_id() -> u64 {
*INSTANCE_ID.get_or_init(get_random_u64)
}

struct T;

service::export!(T);

impl service::exports::wasi::http::handler::Guest for T {
async fn handle(_request: Request) -> Result<Response, ErrorCode> {
// Prevent the CM runtime from dispatching another request to this
// instance while the current one is in flight.
wit_bindgen::backpressure_inc();

let body = format!("{}", instance_id()).into_bytes();
let (mut body_tx, body_rx) = wit_stream::new();
let (trailers_tx, trailers_rx) = wit_future::new(|| Ok(None));
drop(trailers_tx);

let (response, tx_result) = Response::new(Fields::new(), Some(body_rx), trailers_rx);

wit_bindgen::spawn(async move {
body_tx.write_all(body).await;
});

// Exit cleanly after the response body has been handed off to the
// HTTP layer. `tx_result` only resolves once hyper has consumed all
// response body frames, so the store is not torn down prematurely.
wit_bindgen::spawn(async move {
let _ = tx_result.await;
exit(Ok(()));
});

Ok(response)
}
}

fn main() {
unreachable!()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/// Test program: verifies that `tx_result` (the future returned by
/// `response.new`) resolves only after the response body has been fully handed
/// to the HTTP layer.
///
/// The handler writes a 64 KiB body as 64 separate 1 KiB chunks and exits via
/// `wasi:cli/exit` after `tx_result` resolves. Each chunk is written with an
/// explicit `.await`, which yields the CM scheduler between chunks. The
/// host-side mpsc channel feeding hyper has capacity 1, so after the first
/// chunk is queued a second chunk cannot be sent until hyper drains the
/// channel (a yield the exit task can preempt).
///
/// With incorrect `tx_result` semantics (resolves at request-resource cleanup
/// time, before the body has been handed off to hyper), the exit fires on the
/// first such yield after at most one chunk has been queued, truncating the
/// body and consistently failing the length check in the test.
///
/// With correct semantics (resolves after hyper has consumed all body frames),
/// the exit cannot fire until the store drain is complete, so the full 64 KiB
/// reaches the client.
///
/// Used by `p3_cli_serve_txresult_body_integrity` in `tests/all/cli_tests.rs`.
use test_programs::p3::wasi::cli::exit::exit;
use test_programs::p3::wasi::http::types::{ErrorCode, Fields, Request, Response};
use test_programs::p3::{service, wit_future, wit_stream};

/// Number of 1 KiB chunks that make up the response body.
const CHUNK_COUNT: usize = 64;
/// Size of each chunk in bytes.
const CHUNK_SIZE: usize = 1024;

struct T;

service::export!(T);

impl service::exports::wasi::http::handler::Guest for T {
async fn handle(_request: Request) -> Result<Response, ErrorCode> {
let (mut body_tx, body_rx) = wit_stream::new();
let (trailers_tx, trailers_rx) = wit_future::new(|| Ok(None));
drop(trailers_tx);

let (response, tx_result) = Response::new(Fields::new(), Some(body_rx), trailers_rx);

// Write the body in small, separate chunks. Each `.await` yields the
// CM scheduler, giving the exit task a chance to run. With incorrect
// tx_result timing the exit fires at the first such yield — after at
// most one chunk has reached the host-side channel — truncating the
// body. With correct timing it cannot fire until all chunks have been
// consumed by hyper.
wit_bindgen::spawn(async move {
for _ in 0..CHUNK_COUNT {
body_tx.write_all(vec![b'x'; CHUNK_SIZE]).await;
}
});

wit_bindgen::spawn(async move {
let _ = tx_result.await;
exit(Ok(()));
});

Ok(response)
}
}

fn main() {
unreachable!()
}
18 changes: 14 additions & 4 deletions crates/wasi-http/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#[cfg(feature = "p2")]
use crate::p2::bindings::http::types as p2_types;
#[cfg(feature = "p3")]
use crate::p3;
use crate::p3::{self, BodyExt as _};
use bytes::Bytes;
use futures::{
channel::oneshot,
Expand Down Expand Up @@ -1193,7 +1193,10 @@ async fn handle<T: Send>(
let (request, body) = request.into_parts();
let body = body.map_err(p3_types::ErrorCode::from);
let request = http::Request::from_parts(request, body);
let (request, request_io_result) = p3::Request::from_http(request);

// there isn't anything meaningful the host can do with i/o errors
// on the body read from the guest, so we just drop that future
let (request, _request_io_result) = p3::Request::from_http(request);

let request = accessor.with(|mut store| {
Ok::<_, wasmtime::Error>(view(store.data_mut()).table.push(request)?)
Expand All @@ -1205,16 +1208,23 @@ async fn handle<T: Send>(
.call_handle(accessor, request)
.await?;

// channel that notifies the guest that the response has been transmitted
let (notify_tx, notify_rx) = oneshot::channel::<()>();
let response = accessor.with(|mut store| {
let response = view(store.get()).table.delete(response?)?;
Ok::<_, wasmtime::Error>(response.into_http_with_getter(
&mut store,
request_io_result,
notify_rx.map(|_| Ok(())),
view,
)?)
})?;

Ok(response.map(move |body| body.map_err(wasmtime::Error::from).boxed_unsync()))
Ok(response.map(move |body| {
// on drop, notify_tx channel will be closed indicating
// to the guest that the response has been transmitted.
let body = body.map_err(wasmtime::Error::from);
body.with_state(notify_tx).boxed_unsync()
}))
});

// TODO: We should also use `oneshot::Sender::poll_close` to be
Expand Down
2 changes: 2 additions & 0 deletions crates/wasi-http/src/p3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ mod proxy;
mod request;
mod response;

pub(crate) use body::BodyExt;

#[cfg(feature = "default-send-request")]
pub use request::default_send_request;
pub use request::{Request, RequestOptions};
Expand Down
9 changes: 7 additions & 2 deletions src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use wasmtime::{
};
use wasmtime_cli_flags::opt::WasmtimeOptionValue;
use wasmtime_wasi::p2::{StreamError, StreamResult};
use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
use wasmtime_wasi::{I32Exit, WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
#[cfg(feature = "component-model-async")]
use wasmtime_wasi_http::handler::p2::bindings as p2;
use wasmtime_wasi_http::handler::{
Expand Down Expand Up @@ -862,7 +862,12 @@ impl WorkerState for HostWorkerState {

fn drop(&self, mut store: Store<Self::StoreData>, result: Result<(), wasmtime::Error>) {
if let Err(error) = result {
eprintln!("worker failed: {error:?}");
// A clean exit via `wasi:cli/exit(ok)` is a guest-controlled signal
// that the instance is done and should not be reused. Treat it as a
// graceful worker exit rather than an error.
if !error.downcast_ref::<I32Exit>().map_or(false, |e| e.0 == 0) {
eprintln!("worker failed: {error:?}");
}
}

if let Some(write_profile) = store.data_mut().write_profile.take() {
Expand Down
104 changes: 104 additions & 0 deletions tests/all/cli_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2982,6 +2982,110 @@ start a print 1234
assert!(stdout.contains("please see me"));
Ok(())
}

#[tokio::test]
#[cfg_attr(not(feature = "component-model-async"), ignore)]
async fn p3_cli_serve_exit_on_single() -> Result<()> {
// Send N sequential requests to a handler that calls `exit(ok)` after
// each response. Each request must be handled by a fresh instance
// because the handler raises backpressure permanently, preventing
// instance reuse.
//
// The response body is the instance's random ID (decimal u64). We
// assert that all N IDs are distinct (one per instance) and that no
// "worker error" appears in stderr — `exit(ok)` is a clean, guest-
// controlled shutdown and must not be treated as an error.
const N: usize = 5;
let server = WasmtimeServe::new(P3_CLI_SERVE_EXIT_ON_SINGLE_COMPONENT, |cmd| {
cmd.arg("-Wcomponent-model-async");
cmd.arg("-Sp3,cli");
})?;

let mut instance_ids = std::collections::HashSet::new();
for _ in 0..N {
let resp = server
.send_request(
hyper::Request::builder()
.uri("http://localhost/")
.body(String::new())
.context("failed to make request")?,
)
.await?;
assert!(resp.status().is_success());
let id: u64 = resp
.body()
.trim()
.parse::<u64>()
.context("response body should be a u64 instance ID")?;
instance_ids.insert(id);
}

assert_eq!(
instance_ids.len(),
N,
"expected {N} distinct instance IDs but got {}: {:?}",
instance_ids.len(),
instance_ids
);

let (_stdout, stderr) = server.finish()?;
assert!(
!stderr.contains("worker error"),
"unexpected worker error in stderr: {stderr}"
);
Ok(())
}

#[tokio::test]
#[cfg_attr(not(feature = "component-model-async"), ignore)]
async fn p3_cli_serve_txresult_body_integrity() -> Result<()> {
// Verify that `tx_result` (the future returned by `response.new`)
// resolves only after the response body has been fully handed to the
// HTTP layer, not prematurely at request-resource cleanup time.
//
// The guest writes a 64 KiB body as 64 separate 1 KiB chunks and
// calls `exit(ok)` after `tx_result` resolves. Each chunk requires
// the host-side mpsc channel (capacity 1) to be drained by hyper
// before the next can be queued, creating a scheduler yield between
// every chunk. With incorrect tx_result semantics the exit fires at
// the first such yield — after at most one chunk has been queued —
// consistently truncating the body. With correct semantics the exit
// fires only after all chunks have been consumed, so the full body
// reaches the client.
const CHUNK_SIZE: usize = 1024;
const CHUNK_COUNT: usize = 64;
const EXPECTED_LEN: usize = CHUNK_SIZE * CHUNK_COUNT;

let server = WasmtimeServe::new(P3_CLI_SERVE_TXRESULT_BODY_INTEGRITY_COMPONENT, |cmd| {
cmd.arg("-Wcomponent-model-async");
cmd.arg("-Sp3,cli");
})?;

let resp = server
.send_request(
hyper::Request::builder()
.uri("http://localhost/")
.body(String::new())
.context("failed to make request")?,
)
.await?;

assert!(resp.status().is_success());
assert_eq!(
resp.body().len(),
EXPECTED_LEN,
"response body truncated: expected {EXPECTED_LEN} bytes but got {} \
(tx_result fired before body handoff to hyper was complete)",
resp.body().len()
);

let (_stdout, stderr) = server.finish()?;
assert!(
!stderr.contains("worker error"),
"unexpected worker error in stderr: {stderr}"
);
Ok(())
}
}

#[test]
Expand Down
Loading