Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ itertools = "0.14.0"
base64 = "0.22.1"
termcolor = "1.4.1"
flate2 = "1.0.30"
tokio-util = "0.7.4"

# =============================================================================
#
Expand Down
3 changes: 2 additions & 1 deletion crates/wasi-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ workspace = true
[features]
default = ["default-send-request"]
default-send-request = ["dep:tokio-rustls", "dep:rustls", "dep:webpki-roots"]
p3 = ["wasmtime-wasi/p3"]
p3 = ["wasmtime-wasi/p3", "dep:tokio-util"]

[dependencies]
anyhow = { workspace = true }
Expand All @@ -27,6 +27,7 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"time",
] }
tokio-util = { workspace = true, optional = true }
http = { workspace = true }
http-body = { workspace = true }
http-body-util = { workspace = true }
Expand Down
19 changes: 19 additions & 0 deletions crates/wasi-http/src/p3/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,31 @@ mod generated {
world: "wasi:http/proxy",
imports: {
"wasi:http/handler/[async]handle": async | store | trappable | tracing,
"wasi:http/types/[method]request.consume-body": async | store | trappable | tracing,
"wasi:http/types/[method]response.consume-body": async | store | trappable | tracing,
"wasi:http/types/[static]request.new": async | store | trappable | tracing,
"wasi:http/types/[static]response.new": async | store | trappable | tracing,
default: trappable | tracing,
},
exports: { default: async | store },
with: {
"wasi:http/types/fields": with::Fields,
"wasi:http/types/request": crate::p3::Request,
"wasi:http/types/request-options": with::RequestOptions,
"wasi:http/types/response": crate::p3::Response,
},
trappable_error_type: {
"wasi:http/types/error-code" => crate::p3::HttpError,
},
});

mod with {
/// The concrete type behind a `wasi:http/types/fields` resource.
pub type Fields = crate::p3::MaybeMutable<http::HeaderMap>;

/// The concrete type behind a `wasi:http/types/request-options` resource.
pub type RequestOptions = crate::p3::MaybeMutable<crate::p3::RequestOptions>;
}
}

pub use self::generated::wasi::*;
Expand Down
257 changes: 257 additions & 0 deletions crates/wasi-http/src/p3/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
use crate::p3::bindings::http::types::{ErrorCode, Trailers};
use crate::p3::{WasiHttp, WasiHttpCtxView};
use anyhow::Context as _;
use bytes::Bytes;
use core::pin::Pin;
use core::task::{Context, Poll, ready};
use http::HeaderMap;
use http_body_util::combinators::BoxBody;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::PollSender;
use wasmtime::component::{
Accessor, FutureConsumer, FutureReader, Resource, Source, StreamConsumer, StreamReader,
StreamResult,
};
use wasmtime::{AsContextMut, StoreContextMut};

/// The concrete type behind a `wasi:http/types/body` resource.
pub(crate) enum Body {
/// Body constructed by the guest
Guest {
/// The body stream
contents_rx: Option<StreamReader<u8>>,
/// Future, on which guest will write result and optional trailers
trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
/// Channel, on which transmission result will be written
result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,
},
/// Body constructed by the host.
Host {
body: BoxBody<Bytes, ErrorCode>,
/// Channel, on which transmission result will be written
result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,
},
/// Body is consumed.
Consumed,
}

pub(crate) struct GuestBodyConsumer {
pub(crate) tx: PollSender<Bytes>,
}

impl<D> StreamConsumer<D> for GuestBodyConsumer {
type Item = u8;

fn poll_consume(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
store: StoreContextMut<D>,
src: Source<Self::Item>,
finish: bool,
) -> Poll<wasmtime::Result<StreamResult>> {
match self.tx.poll_reserve(cx) {
Poll::Ready(Ok(())) => {
let mut src = src.as_direct(store);
let buf = Bytes::copy_from_slice(src.remaining());
let n = buf.len();
match self.tx.send_item(buf) {
Ok(()) => {
src.mark_read(n);
Poll::Ready(Ok(StreamResult::Completed))
}
Err(..) => Poll::Ready(Ok(StreamResult::Dropped)),
}
}
Poll::Ready(Err(..)) => Poll::Ready(Ok(StreamResult::Dropped)),
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
Poll::Pending => Poll::Pending,
}
}
}

pub(crate) struct GuestBody {
pub(crate) contents_rx: Option<mpsc::Receiver<Bytes>>,
pub(crate) trailers_rx:
Option<oneshot::Receiver<Result<Option<Arc<http::HeaderMap>>, ErrorCode>>>,
}

impl GuestBody {
pub fn new<T: 'static>(
mut store: impl AsContextMut<Data = T>,
contents_rx: Option<StreamReader<u8>>,
trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
getter: for<'a> fn(&'a mut T) -> WasiHttpCtxView<'a>,
) -> Self {
let (trailers_http_tx, trailers_http_rx) = oneshot::channel();
trailers_rx.pipe(
&mut store,
GuestTrailerConsumer {
tx: trailers_http_tx,
getter,
},
);
let contents_rx = contents_rx.map(|rx| {
let (http_tx, http_rx) = mpsc::channel(1);
rx.pipe(
store,
GuestBodyConsumer {
tx: PollSender::new(http_tx),
},
);
http_rx
});
Self {
trailers_rx: Some(trailers_http_rx),
contents_rx,
}
}
}

impl http_body::Body for GuestBody {
type Data = Bytes;
type Error = ErrorCode;

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
if let Some(contents_rx) = self.contents_rx.as_mut() {
while let Some(buf) = ready!(contents_rx.poll_recv(cx)) {
return Poll::Ready(Some(Ok(http_body::Frame::data(buf))));
}
self.contents_rx = None;
}

let Some(trailers_rx) = self.trailers_rx.as_mut() else {
return Poll::Ready(None);
};

let res = ready!(Pin::new(trailers_rx).poll(cx));
self.trailers_rx = None;
match res {
Ok(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers(
Arc::unwrap_or_clone(trailers),
)))),
Ok(Ok(None)) => Poll::Ready(None),
Ok(Err(err)) => Poll::Ready(Some(Err(err))),
Err(..) => Poll::Ready(None),
}
}
Comment on lines +115 to +140
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind leaving some comments here (and in is_end_stream) about the state machine being managed here? Doesn't need to be too wordy but having headings to grasp on I think would be helpful


fn is_end_stream(&self) -> bool {
if let Some(contents_rx) = self.contents_rx.as_ref() {
if !contents_rx.is_empty() || !contents_rx.is_closed() {
return false;
}
}
if let Some(trailers_rx) = self.trailers_rx.as_ref() {
if !trailers_rx.is_terminated() {
return false;
}
}
return true;
}

fn size_hint(&self) -> http_body::SizeHint {
// TODO: use content-length
http_body::SizeHint::default()
}
}

pub(crate) struct ConsumedBody;

impl http_body::Body for ConsumedBody {
type Data = Bytes;
type Error = ErrorCode;

fn poll_frame(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
Poll::Ready(Some(Err(ErrorCode::InternalError(Some(
"body consumed".into(),
)))))
}

fn is_end_stream(&self) -> bool {
true
}

fn size_hint(&self) -> http_body::SizeHint {
http_body::SizeHint::with_exact(0)
}
}

pub(crate) struct GuestTrailerConsumer<T> {
pub(crate) tx: oneshot::Sender<Result<Option<Arc<HeaderMap>>, ErrorCode>>,
pub(crate) getter: for<'a> fn(&'a mut T) -> WasiHttpCtxView<'a>,
Comment thread
rvolosatovs marked this conversation as resolved.
}

impl<D> FutureConsumer<D> for GuestTrailerConsumer<D>
where
D: 'static,
{
type Item = Result<Option<Resource<Trailers>>, ErrorCode>;

async fn consume(self, store: &Accessor<D>, res: Self::Item) -> wasmtime::Result<()> {
match res {
Ok(Some(trailers)) => store
.with_getter::<WasiHttp>(self.getter)
.with(|mut store| {
let WasiHttpCtxView { table, .. } = store.get();
let trailers = table
.delete(trailers)
.context("failed to delete trailers")?;
_ = self.tx.send(Ok(Some(Arc::from(trailers))));
Ok(())
}),
Ok(None) => {
_ = self.tx.send(Ok(None));
Ok(())
}
Err(err) => {
_ = self.tx.send(Err(err));
Ok(())
}
}
}
}

pub(crate) struct IncomingResponseBody {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind adding a small doc block here explaining what this is doing? The code is pretty self-explanatory but I still think it'd be good to have an intro

pub incoming: hyper::body::Incoming,
pub timeout: tokio::time::Interval,
}

impl http_body::Body for IncomingResponseBody {
type Data = <hyper::body::Incoming as http_body::Body>::Data;
type Error = ErrorCode;

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
match Pin::new(&mut self.as_mut().incoming).poll_frame(cx) {
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Err(err))) => {
Poll::Ready(Some(Err(ErrorCode::from_hyper_response_error(err))))
}
Poll::Ready(Some(Ok(frame))) => {
self.timeout.reset();
Poll::Ready(Some(Ok(frame)))
}
Poll::Pending => {
ready!(self.timeout.poll_tick(cx));
Poll::Ready(Some(Err(ErrorCode::ConnectionReadTimeout)))
}
}
}

fn is_end_stream(&self) -> bool {
self.incoming.is_end_stream()
}

fn size_hint(&self) -> http_body::SizeHint {
self.incoming.size_hint()
}
}
36 changes: 36 additions & 0 deletions crates/wasi-http/src/p3/conv.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,48 @@
use crate::p3::bindings::http::types::{ErrorCode, Method, Scheme};
use core::convert::Infallible;
use core::error::Error as _;
use tracing::warn;

impl From<Infallible> for ErrorCode {
fn from(x: Infallible) -> Self {
match x {}
}
}

impl ErrorCode {
/// Translate a [`hyper::Error`] to a wasi-http [ErrorCode] in the context of a request.
pub fn from_hyper_request_error(err: hyper::Error) -> Self {
// If there's a source, we might be able to extract a wasi-http error from it.
if let Some(cause) = err.source() {
if let Some(err) = cause.downcast_ref::<Self>() {
return err.clone();
}
}

warn!("hyper request error: {err:?}");

Self::HttpProtocolError
}

/// Translate a [`hyper::Error`] to a wasi-http [ErrorCode] in the context of a response.
pub fn from_hyper_response_error(err: hyper::Error) -> Self {
if err.is_timeout() {
return ErrorCode::HttpResponseTimeout;
}

// If there's a source, we might be able to extract a wasi-http error from it.
if let Some(cause) = err.source() {
if let Some(err) = cause.downcast_ref::<Self>() {
return err.clone();
}
}

warn!("hyper response error: {err:?}");

ErrorCode::HttpProtocolError
}
}

impl From<http::Method> for Method {
fn from(method: http::Method) -> Self {
Self::from(&method)
Expand Down
Loading