-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(p3): implement wasi:http
#11440
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
eaee3a1
127d1e7
66d87ce
d7b5ed6
0b34085
db4652c
989769c
f2ff970
22a5159
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,257 @@ | ||
| use crate::p3::bindings::http::types::{ErrorCode, Trailers}; | ||
| use crate::p3::{WasiHttp, WasiHttpCtxView}; | ||
| use anyhow::Context as _; | ||
| use bytes::Bytes; | ||
| use core::pin::Pin; | ||
| use core::task::{Context, Poll, ready}; | ||
| use http::HeaderMap; | ||
| use http_body_util::combinators::BoxBody; | ||
| use std::sync::Arc; | ||
| use tokio::sync::{mpsc, oneshot}; | ||
| use tokio_util::sync::PollSender; | ||
| use wasmtime::component::{ | ||
| Accessor, FutureConsumer, FutureReader, Resource, Source, StreamConsumer, StreamReader, | ||
| StreamResult, | ||
| }; | ||
| use wasmtime::{AsContextMut, StoreContextMut}; | ||
|
|
||
| /// The concrete type behind a `wasi:http/types/body` resource. | ||
| pub(crate) enum Body { | ||
| /// Body constructed by the guest | ||
| Guest { | ||
| /// The body stream | ||
| contents_rx: Option<StreamReader<u8>>, | ||
| /// Future, on which guest will write result and optional trailers | ||
| trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>, | ||
| /// Channel, on which transmission result will be written | ||
| result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>, | ||
| }, | ||
| /// Body constructed by the host. | ||
| Host { | ||
| body: BoxBody<Bytes, ErrorCode>, | ||
| /// Channel, on which transmission result will be written | ||
| result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>, | ||
| }, | ||
| /// Body is consumed. | ||
| Consumed, | ||
| } | ||
|
|
||
| pub(crate) struct GuestBodyConsumer { | ||
| pub(crate) tx: PollSender<Bytes>, | ||
| } | ||
|
|
||
| impl<D> StreamConsumer<D> for GuestBodyConsumer { | ||
| type Item = u8; | ||
|
|
||
| fn poll_consume( | ||
| mut self: Pin<&mut Self>, | ||
| cx: &mut Context<'_>, | ||
| store: StoreContextMut<D>, | ||
| src: Source<Self::Item>, | ||
| finish: bool, | ||
| ) -> Poll<wasmtime::Result<StreamResult>> { | ||
| match self.tx.poll_reserve(cx) { | ||
| Poll::Ready(Ok(())) => { | ||
| let mut src = src.as_direct(store); | ||
| let buf = Bytes::copy_from_slice(src.remaining()); | ||
| let n = buf.len(); | ||
| match self.tx.send_item(buf) { | ||
| Ok(()) => { | ||
| src.mark_read(n); | ||
| Poll::Ready(Ok(StreamResult::Completed)) | ||
| } | ||
| Err(..) => Poll::Ready(Ok(StreamResult::Dropped)), | ||
| } | ||
| } | ||
| Poll::Ready(Err(..)) => Poll::Ready(Ok(StreamResult::Dropped)), | ||
| Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)), | ||
| Poll::Pending => Poll::Pending, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| pub(crate) struct GuestBody { | ||
| pub(crate) contents_rx: Option<mpsc::Receiver<Bytes>>, | ||
| pub(crate) trailers_rx: | ||
| Option<oneshot::Receiver<Result<Option<Arc<http::HeaderMap>>, ErrorCode>>>, | ||
| } | ||
|
|
||
| impl GuestBody { | ||
| pub fn new<T: 'static>( | ||
| mut store: impl AsContextMut<Data = T>, | ||
| contents_rx: Option<StreamReader<u8>>, | ||
| trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>, | ||
| getter: for<'a> fn(&'a mut T) -> WasiHttpCtxView<'a>, | ||
| ) -> Self { | ||
| let (trailers_http_tx, trailers_http_rx) = oneshot::channel(); | ||
| trailers_rx.pipe( | ||
| &mut store, | ||
| GuestTrailerConsumer { | ||
| tx: trailers_http_tx, | ||
| getter, | ||
| }, | ||
| ); | ||
| let contents_rx = contents_rx.map(|rx| { | ||
| let (http_tx, http_rx) = mpsc::channel(1); | ||
| rx.pipe( | ||
| store, | ||
| GuestBodyConsumer { | ||
| tx: PollSender::new(http_tx), | ||
| }, | ||
| ); | ||
| http_rx | ||
| }); | ||
| Self { | ||
| trailers_rx: Some(trailers_http_rx), | ||
| contents_rx, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl http_body::Body for GuestBody { | ||
| type Data = Bytes; | ||
| type Error = ErrorCode; | ||
|
|
||
| fn poll_frame( | ||
| mut self: Pin<&mut Self>, | ||
| cx: &mut Context<'_>, | ||
| ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> { | ||
| if let Some(contents_rx) = self.contents_rx.as_mut() { | ||
| while let Some(buf) = ready!(contents_rx.poll_recv(cx)) { | ||
| return Poll::Ready(Some(Ok(http_body::Frame::data(buf)))); | ||
| } | ||
| self.contents_rx = None; | ||
| } | ||
|
|
||
| let Some(trailers_rx) = self.trailers_rx.as_mut() else { | ||
| return Poll::Ready(None); | ||
| }; | ||
|
|
||
| let res = ready!(Pin::new(trailers_rx).poll(cx)); | ||
| self.trailers_rx = None; | ||
| match res { | ||
| Ok(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers( | ||
| Arc::unwrap_or_clone(trailers), | ||
| )))), | ||
| Ok(Ok(None)) => Poll::Ready(None), | ||
| Ok(Err(err)) => Poll::Ready(Some(Err(err))), | ||
| Err(..) => Poll::Ready(None), | ||
| } | ||
| } | ||
|
|
||
| fn is_end_stream(&self) -> bool { | ||
| if let Some(contents_rx) = self.contents_rx.as_ref() { | ||
| if !contents_rx.is_empty() || !contents_rx.is_closed() { | ||
| return false; | ||
| } | ||
| } | ||
| if let Some(trailers_rx) = self.trailers_rx.as_ref() { | ||
| if !trailers_rx.is_terminated() { | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| fn size_hint(&self) -> http_body::SizeHint { | ||
| // TODO: use content-length | ||
| http_body::SizeHint::default() | ||
| } | ||
| } | ||
|
|
||
| pub(crate) struct ConsumedBody; | ||
|
|
||
| impl http_body::Body for ConsumedBody { | ||
| type Data = Bytes; | ||
| type Error = ErrorCode; | ||
|
|
||
| fn poll_frame( | ||
| self: Pin<&mut Self>, | ||
| _cx: &mut Context<'_>, | ||
| ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> { | ||
| Poll::Ready(Some(Err(ErrorCode::InternalError(Some( | ||
| "body consumed".into(), | ||
| ))))) | ||
| } | ||
|
|
||
| fn is_end_stream(&self) -> bool { | ||
| true | ||
| } | ||
|
|
||
| fn size_hint(&self) -> http_body::SizeHint { | ||
| http_body::SizeHint::with_exact(0) | ||
| } | ||
| } | ||
|
|
||
| pub(crate) struct GuestTrailerConsumer<T> { | ||
| pub(crate) tx: oneshot::Sender<Result<Option<Arc<HeaderMap>>, ErrorCode>>, | ||
| pub(crate) getter: for<'a> fn(&'a mut T) -> WasiHttpCtxView<'a>, | ||
|
rvolosatovs marked this conversation as resolved.
|
||
| } | ||
|
|
||
| impl<D> FutureConsumer<D> for GuestTrailerConsumer<D> | ||
| where | ||
| D: 'static, | ||
| { | ||
| type Item = Result<Option<Resource<Trailers>>, ErrorCode>; | ||
|
|
||
| async fn consume(self, store: &Accessor<D>, res: Self::Item) -> wasmtime::Result<()> { | ||
| match res { | ||
| Ok(Some(trailers)) => store | ||
| .with_getter::<WasiHttp>(self.getter) | ||
| .with(|mut store| { | ||
| let WasiHttpCtxView { table, .. } = store.get(); | ||
| let trailers = table | ||
| .delete(trailers) | ||
| .context("failed to delete trailers")?; | ||
| _ = self.tx.send(Ok(Some(Arc::from(trailers)))); | ||
| Ok(()) | ||
| }), | ||
| Ok(None) => { | ||
| _ = self.tx.send(Ok(None)); | ||
| Ok(()) | ||
| } | ||
| Err(err) => { | ||
| _ = self.tx.send(Err(err)); | ||
| Ok(()) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| pub(crate) struct IncomingResponseBody { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mind adding a small doc block here explaining what this is doing? The code is pretty self-explanatory but I still think it'd be good to have an intro |
||
| pub incoming: hyper::body::Incoming, | ||
| pub timeout: tokio::time::Interval, | ||
| } | ||
|
|
||
| impl http_body::Body for IncomingResponseBody { | ||
| type Data = <hyper::body::Incoming as http_body::Body>::Data; | ||
| type Error = ErrorCode; | ||
|
|
||
| fn poll_frame( | ||
| mut self: Pin<&mut Self>, | ||
| cx: &mut Context<'_>, | ||
| ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> { | ||
| match Pin::new(&mut self.as_mut().incoming).poll_frame(cx) { | ||
| Poll::Ready(None) => Poll::Ready(None), | ||
| Poll::Ready(Some(Err(err))) => { | ||
| Poll::Ready(Some(Err(ErrorCode::from_hyper_response_error(err)))) | ||
| } | ||
| Poll::Ready(Some(Ok(frame))) => { | ||
| self.timeout.reset(); | ||
| Poll::Ready(Some(Ok(frame))) | ||
| } | ||
| Poll::Pending => { | ||
| ready!(self.timeout.poll_tick(cx)); | ||
| Poll::Ready(Some(Err(ErrorCode::ConnectionReadTimeout))) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn is_end_stream(&self) -> bool { | ||
| self.incoming.is_end_stream() | ||
| } | ||
|
|
||
| fn size_hint(&self) -> http_body::SizeHint { | ||
| self.incoming.size_hint() | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mind leaving some comments here (and in
is_end_stream) about the state machine being managed here? Doesn't need to be too wordy but having headings to grasp on I think would be helpful