Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit b23067a

Browse files
committed
feat(p3/http): implement wasmtime serve
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent d4c69cf commit b23067a

2 files changed

Lines changed: 70 additions & 48 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ disable-logging = ["log/max_level_off", "tracing/max_level_off"]
462462
# the internal mapping for what they enable in Wasmtime itself.
463463
wasi-nn = ["dep:wasmtime-wasi-nn"]
464464
wasi-threads = ["dep:wasmtime-wasi-threads", "threads"]
465-
wasi-http = ["component-model", "dep:wasmtime-wasi-http", "dep:tokio", "dep:hyper"]
465+
wasi-http = ["component-model", "dep:wasmtime-wasi-http", "wasmtime-wasi-http/p3", "dep:tokio", "dep:hyper"]
466466
wasi-config = ["dep:wasmtime-wasi-config"]
467467
wasi-keyvalue = ["dep:wasmtime-wasi-keyvalue"]
468468
pooling-allocator = ["wasmtime/pooling-allocator", "wasmtime-cli-flags/pooling-allocator"]

src/commands/serve.rs

Lines changed: 69 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::{
1111
},
1212
};
1313
use tokio::io::{stderr, stdin, stdout};
14-
use wasmtime::component::Linker;
14+
use wasmtime::{component::Linker, AsContextMut as _};
1515
use wasmtime::{Engine, Store, StoreLimits};
1616
use wasmtime_wasi::{IoView, StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
1717
use wasmtime_wasi_http::bindings::http::types::Scheme;
@@ -549,65 +549,87 @@ impl ServeCommand {
549549

550550
log::info!("Listening on {}", self.addr);
551551

552-
if let Ok(instance) = wasmtime_wasi_http::p3::bindings::ProxyPre::new(instance.clone()) {
553-
let next_id = AtomicU64::default();
552+
if let Ok(..) = wasmtime_wasi_http::p3::bindings::ProxyPre::new(instance.clone()) {
553+
let next_id = Arc::new(AtomicU64::default());
554+
let cmd = Arc::new(self);
554555
loop {
555556
let (stream, _) = listener.accept().await?;
557+
let engine = engine.clone();
558+
let cmd = Arc::clone(&cmd);
559+
let next_id = Arc::clone(&next_id);
560+
let instance = instance.clone();
556561
tokio::task::spawn(async {
562+
let service = hyper::service::service_fn(
563+
move |req: hyper::Request<hyper::body::Incoming>| {
564+
let req_id = next_id.fetch_add(1, Ordering::Relaxed);
565+
let engine = engine.clone();
566+
let cmd = Arc::clone(&cmd);
567+
let instance = instance.clone();
568+
async move {
569+
let mut store = cmd
570+
.new_store(&engine, req_id)
571+
.context("failed to create new store")?;
572+
let instance = instance.instantiate_async(&mut store).await?;
573+
let proxy = wasmtime_wasi_http::p3::bindings::Proxy::new(
574+
&mut store, &instance,
575+
)?;
576+
let (req, body) = req.into_parts();
577+
let body = body.map_err(wasmtime_wasi_http::p3::bindings::http::types::ErrorCode::from_hyper_request_error);
578+
let handle = proxy
579+
.handle(&mut store, http::Request::from_parts(req, body))
580+
.await
581+
.context("failed to call `handle`")?;
582+
let res = handle.get(&mut store).await??;
583+
let (res, tx, io) =
584+
wasmtime_wasi_http::p3::Response::resource_into_http(
585+
&mut store, &instance, res,
586+
)?;
587+
tokio::task::spawn(async move {
588+
if let Some(io) = io {
589+
let closure = io.get(&mut store).await?;
590+
closure(store.as_context_mut())?;
591+
}
592+
// TODO: Report transmit errors
593+
if let Some(tx) = tx {
594+
tx.write(Ok(())).get(&mut store).await?;
595+
}
596+
anyhow::Ok(())
597+
});
598+
anyhow::Ok(res.map(|body| body.map_err(|err| err.unwrap_or(wasmtime_wasi_http::p3::bindings::http::types::ErrorCode::InternalError(None)))))
599+
}
600+
},
601+
);
557602
if let Err(e) = http1::Builder::new()
558603
.keep_alive(true)
559-
.serve_connection(
560-
TokioIo::new(stream),
561-
hyper::service::service_fn(move |req| {
562-
let instance = instance.clone();
563-
let engine = engine.clone();
564-
async move {
565-
let req_id = next_id.fetch_add(1, Ordering::Relaxed);
566-
let mut store = self.new_store(&engine, req_id).context("failed")?;
567-
let proxy = instance.instantiate_async(&mut store).await?;
568-
let res = proxy
569-
.handle(
570-
store,
571-
req.map(|body: hyper::body::Incoming| {
572-
body.map_err(|err| {
573-
eprintln!("TODO: convert error {err:?}");
574-
wasmtime_wasi_http::p3::bindings::http::types::ErrorCode::InternalError(None)
575-
})
576-
}),
577-
)
578-
.await.context("failed")?.context("failed")?;
579-
anyhow::Ok(res.map(|body| body.map_err(|err| err.unwrap())))
580-
}
581-
}),
582-
)
604+
.serve_connection(TokioIo::new(stream), service)
583605
.await
584606
{
585607
eprintln!("error: {e:?}");
586608
}
587609
});
588610
}
589-
}
590-
591-
let instance = wasmtime_wasi_http::bindings::ProxyPre::new(instance)?;
611+
} else {
612+
let instance = wasmtime_wasi_http::bindings::ProxyPre::new(instance)?;
592613

593-
let handler = ProxyHandler::new(self, engine, instance);
614+
let handler = ProxyHandler::new(self, engine, instance);
594615

595-
loop {
596-
let (stream, _) = listener.accept().await?;
597-
let stream = TokioIo::new(stream);
598-
let h = handler.clone();
599-
tokio::task::spawn(async {
600-
if let Err(e) = http1::Builder::new()
601-
.keep_alive(true)
602-
.serve_connection(
603-
stream,
604-
hyper::service::service_fn(move |req| handle_request(h.clone(), req)),
605-
)
606-
.await
607-
{
608-
eprintln!("error: {e:?}");
609-
}
610-
});
616+
loop {
617+
let (stream, _) = listener.accept().await?;
618+
let stream = TokioIo::new(stream);
619+
let h = handler.clone();
620+
tokio::task::spawn(async {
621+
if let Err(e) = http1::Builder::new()
622+
.keep_alive(true)
623+
.serve_connection(
624+
stream,
625+
hyper::service::service_fn(move |req| handle_request(h.clone(), req)),
626+
)
627+
.await
628+
{
629+
eprintln!("error: {e:?}");
630+
}
631+
});
632+
}
611633
}
612634
}
613635
}

0 commit comments

Comments
 (0)