From be3ef4a9a35104f7b9fee700ee505971340625e0 Mon Sep 17 00:00:00 2001 From: Yosh Date: Wed, 8 Apr 2026 16:33:01 +0200 Subject: [PATCH 1/7] move p2 impl to sys/p2 --- Cargo.toml | 5 + src/http/body.rs | 610 +----------------- src/http/error.rs | 2 +- src/http/mod.rs | 12 +- src/http/request.rs | 110 +--- src/http/response.rs | 39 +- src/http/server.rs | 68 +- src/io/mod.rs | 6 +- src/lib.rs | 3 + src/net/mod.rs | 28 +- src/rand/mod.rs | 24 +- src/runtime/mod.rs | 25 +- src/sys/mod.rs | 11 + src/sys/p2/http/body.rs | 609 +++++++++++++++++ src/{ => sys/p2}/http/client.rs | 6 +- src/{ => sys/p2}/http/fields.rs | 3 +- src/{ => sys/p2}/http/method.rs | 0 src/sys/p2/http/mod.rs | 10 + src/sys/p2/http/request.rs | 108 ++++ src/sys/p2/http/response.rs | 29 + src/{ => sys/p2}/http/scheme.rs | 0 src/sys/p2/http/server.rs | 68 ++ src/{io/streams.rs => sys/p2/io.rs} | 10 +- src/sys/p2/mod.rs | 7 + src/sys/p2/net/mod.rs | 27 + src/{ => sys/p2}/net/tcp_listener.rs | 0 src/{ => sys/p2}/net/tcp_stream.rs | 0 src/sys/p2/random.rs | 23 + .../block_on.rs => sys/p2/runtime/mod.rs} | 25 +- src/{ => sys/p2}/runtime/reactor.rs | 0 src/{io => sys/p2}/stdio.rs | 2 +- src/sys/p2/time.rs | 46 ++ src/time/duration.rs | 3 +- src/time/instant.rs | 5 +- src/time/mod.rs | 31 +- 35 files changed, 1001 insertions(+), 954 deletions(-) create mode 100644 src/sys/mod.rs create mode 100644 src/sys/p2/http/body.rs rename src/{ => sys/p2}/http/client.rs (96%) rename src/{ => sys/p2}/http/fields.rs (94%) rename src/{ => sys/p2}/http/method.rs (100%) create mode 100644 src/sys/p2/http/mod.rs create mode 100644 src/sys/p2/http/request.rs create mode 100644 src/sys/p2/http/response.rs rename src/{ => sys/p2}/http/scheme.rs (100%) create mode 100644 src/sys/p2/http/server.rs rename src/{io/streams.rs => sys/p2/io.rs} (98%) create mode 100644 src/sys/p2/mod.rs create mode 100644 src/sys/p2/net/mod.rs rename src/{ => sys/p2}/net/tcp_listener.rs (100%) rename src/{ => sys/p2}/net/tcp_stream.rs (100%) create mode 100644 src/sys/p2/random.rs rename src/{runtime/block_on.rs => sys/p2/runtime/mod.rs} (77%) rename src/{ => sys/p2}/runtime/reactor.rs (100%) rename src/{io => sys/p2}/stdio.rs (98%) create mode 100644 src/sys/p2/time.rs diff --git a/Cargo.toml b/Cargo.toml index 4e248eb..92fe25b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,10 +16,14 @@ rust-version.workspace = true default = ["json"] json = ["dep:serde", "dep:serde_json"] +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(feature, values("wasip3"))'] } + [dependencies] anyhow.workspace = true async-task.workspace = true bytes.workspace = true +cfg-if.workspace = true futures-lite.workspace = true http-body-util.workspace = true http-body.workspace = true @@ -72,6 +76,7 @@ async-task = "4.7" axum = { version = "0.8.6", default-features = false } bytes = "1.10.1" cargo_metadata = "0.22" +cfg-if = "1" clap = { version = "4.5.26", features = ["derive"] } futures-core = "0.3.19" futures-lite = "1.12.0" diff --git a/src/http/body.rs b/src/http/body.rs index c23a3c0..b190a4b 100644 --- a/src/http/body.rs +++ b/src/http/body.rs @@ -1,609 +1 @@ -use crate::http::{ - Error, HeaderMap, - error::Context as _, - fields::{header_map_from_wasi, header_map_to_wasi}, -}; -use crate::io::{AsyncInputStream, AsyncOutputStream}; -use crate::runtime::{AsyncPollable, Reactor, WaitFor}; - -pub use ::http_body::{Body as HttpBody, Frame, SizeHint}; -pub use bytes::Bytes; - -use http::header::CONTENT_LENGTH; -use http_body_util::{BodyExt, combinators::UnsyncBoxBody}; -use std::fmt; -use std::future::{Future, poll_fn}; -use std::pin::{Pin, pin}; -use std::task::{Context, Poll}; -use wasip2::http::types::{ - FutureTrailers, IncomingBody as WasiIncomingBody, OutgoingBody as WasiOutgoingBody, -}; -use wasip2::io::streams::{InputStream as WasiInputStream, StreamError}; - -pub mod util { - pub use http_body_util::*; -} - -/// A HTTP Body. -/// -/// Construct this HTTP body using: -/// * `Body::empty` for the empty body, or `impl From<()> for Body` -/// * `From<&[u8]>` (which will make a clone) or `From>` or -/// `From` for a `Body` from bytes. -/// * `From<&str>` (which will make a clone) or `From` for a `Body` -/// from strings. -/// * `Body::from_json` for a `Body` from a `Serialize` (requires feature -/// `json`) -/// * `From` for a `Body` with contents given by the -/// contents of a WASI input-stream. -/// * `Body::from_stream` or `Body::from_try_stream` for a `Body` from a -/// `Stream` of `Into` -/// -/// Consume this HTTP body using: -/// * `Body::into_boxed_body` converts it to an `UnsyncBoxBody`. -/// This is a boxed representation of `http_body::Body` that is `Send` but not -/// `Sync`. The Unsync variant is required for compatibility with the `axum` -/// crate. -/// * `async fn Body::contents(&mut self) -> Result<&[u8], Error>` is ready -/// when all contents of the body have been collected, and gives them as a -/// byte slice. -/// * `async fn Body::str_contents(&mut self) -> Result<&str, Error>` is ready -/// when all contents of the body have been collected, and gives them as a str -/// slice. -/// * `async fn Body::json(&mut self) -> Result` gathers body -/// contents and then uses `T: serde::Deserialize` to deserialize to json -/// (requires feature `json`). -#[derive(Debug)] -pub struct Body(BodyInner); - -#[derive(Debug)] -enum BodyInner { - // a boxed http_body::Body impl - Boxed(UnsyncBoxBody), - // a body created from a wasi-http incoming-body (WasiIncomingBody) - Incoming(Incoming), - // a body in memory - Complete { - data: Bytes, - trailers: Option, - }, -} - -impl Body { - pub(crate) async fn send(self, outgoing_body: WasiOutgoingBody) -> Result<(), Error> { - match self.0 { - BodyInner::Incoming(incoming) => incoming.send(outgoing_body).await, - BodyInner::Boxed(box_body) => { - let out_stream = AsyncOutputStream::new( - outgoing_body - .write() - .expect("outgoing body already written"), - ); - let mut body = pin!(box_body); - let mut trailers = None; - loop { - match poll_fn(|cx| body.as_mut().poll_frame(cx)).await { - Some(Ok(frame)) if frame.is_data() => { - let data = frame.data_ref().unwrap(); - out_stream.write_all(data).await?; - } - Some(Ok(frame)) if frame.is_trailers() => { - trailers = - Some(header_map_to_wasi(frame.trailers_ref().unwrap()).map_err( - |e| Error::from(e).context("outoging trailers to wasi"), - )?); - } - Some(Err(err)) => break Err(err.context("sending outgoing body")), - None => { - drop(out_stream); - WasiOutgoingBody::finish(outgoing_body, trailers) - .map_err(|e| Error::from(e).context("finishing outgoing body"))?; - break Ok(()); - } - _ => unreachable!(), - } - } - } - BodyInner::Complete { data, trailers } => { - let out_stream = AsyncOutputStream::new( - outgoing_body - .write() - .expect("outgoing body already written"), - ); - out_stream.write_all(&data).await?; - drop(out_stream); - let trailers = trailers - .map(|t| header_map_to_wasi(&t).context("trailers")) - .transpose()?; - WasiOutgoingBody::finish(outgoing_body, trailers) - .map_err(|e| Error::from(e).context("finishing outgoing body"))?; - Ok(()) - } - } - } - - /// Convert this `Body` into an `UnsyncBoxBody`, which - /// exists to implement the `http_body::Body` trait. Consume the contents - /// using `http_body_utils::BodyExt`, or anywhere else an impl of - /// `http_body::Body` is accepted. - pub fn into_boxed_body(self) -> UnsyncBoxBody { - fn map_e(_: std::convert::Infallible) -> Error { - unreachable!() - } - match self.0 { - BodyInner::Incoming(i) => i.into_http_body().boxed_unsync(), - BodyInner::Complete { data, trailers } => http_body_util::Full::new(data) - .map_err(map_e) - .with_trailers(async move { Ok(trailers).transpose() }) - .boxed_unsync(), - BodyInner::Boxed(b) => b, - } - } - - /// Collect the entire contents of this `Body`, and expose them as a - /// byte slice. This async fn will be pending until the entire `Body` is - /// copied into memory, or an error occurs. - pub async fn contents(&mut self) -> Result<&[u8], Error> { - match &mut self.0 { - BodyInner::Complete { data, .. } => Ok(&*data), - inner => { - let mut prev = BodyInner::Complete { - data: Bytes::new(), - trailers: None, - }; - std::mem::swap(inner, &mut prev); - let boxed_body = match prev { - BodyInner::Incoming(i) => i.into_http_body().boxed_unsync(), - BodyInner::Boxed(b) => b, - BodyInner::Complete { .. } => unreachable!(), - }; - let collected = boxed_body.collect().await?; - let trailers = collected.trailers().cloned(); - *inner = BodyInner::Complete { - data: collected.to_bytes(), - trailers, - }; - Ok(match inner { - BodyInner::Complete { data, .. } => &*data, - _ => unreachable!(), - }) - } - } - } - - /// Get a value for the length of this `Body`'s content, in bytes, if - /// known. This value can come from either the Content-Length header - /// recieved in the incoming request or response assocated with the body, - /// or be provided by an exact `http_body::Body::size_hint` if the `Body` - /// is constructed from an `http_body::Body` impl. - pub fn content_length(&self) -> Option { - match &self.0 { - BodyInner::Boxed(b) => b.size_hint().exact(), - BodyInner::Complete { data, .. } => Some(data.len() as u64), - BodyInner::Incoming(i) => i.size_hint.content_length(), - } - } - - /// Construct an empty Body - pub fn empty() -> Self { - Body(BodyInner::Complete { - data: Bytes::new(), - trailers: None, - }) - } - - /// Collect the entire contents of this `Body`, and expose them as a - /// string slice. This async fn will be pending until the entire `Body` is - /// copied into memory, or an error occurs. Additonally errors if the - /// contents of the `Body` were not a utf-8 encoded string. - pub async fn str_contents(&mut self) -> Result<&str, Error> { - let bs = self.contents().await?; - std::str::from_utf8(bs).context("decoding body contents as string") - } - - /// Construct a `Body` by serializing a type to json. Can fail with a - /// `serde_json::Error` if serilization fails. - #[cfg(feature = "json")] - pub fn from_json(data: &T) -> Result { - Ok(Self::from(serde_json::to_vec(data)?)) - } - - /// Collect the entire contents of this `Body`, and deserialize them from - /// json. Can fail if the body contents are not utf-8 encoded, are not - /// valid json, or the json is not accepted by the `serde::Deserialize` impl. - #[cfg(feature = "json")] - pub async fn json serde::Deserialize<'a>>(&mut self) -> Result { - let str = self.str_contents().await?; - serde_json::from_str(str).context("decoding body contents as json") - } - - pub(crate) fn from_incoming(body: WasiIncomingBody, size_hint: BodyHint) -> Self { - Body(BodyInner::Incoming(Incoming { body, size_hint })) - } - - /// Construct a `Body` backed by a `futures_lite::Stream` impl. The stream - /// will be polled as the body is sent. - pub fn from_stream(stream: S) -> Self - where - S: futures_lite::Stream + Send + 'static, - ::Item: Into, - { - use futures_lite::StreamExt; - Self::from_http_body(http_body_util::StreamBody::new( - stream.map(|bs| Ok::<_, Error>(Frame::data(bs.into()))), - )) - } - - /// Construct a `Body` backed by a `futures_lite::Stream` impl. The stream - /// will be polled as the body is sent. If the stream gives an error, the - /// body will canceled, which closes the underlying connection. - pub fn from_try_stream(stream: S) -> Self - where - S: futures_lite::Stream> + Send + 'static, - D: Into, - E: std::error::Error + Send + Sync + 'static, - { - use futures_lite::StreamExt; - Self::from_http_body(http_body_util::StreamBody::new( - stream.map(|bs| Ok::<_, Error>(Frame::data(bs?.into()))), - )) - } - - /// Construct a `Body` backed by a `http_body::Body`. The http_body will - /// be polled as the body is sent. If the http_body poll gives an error, - /// the body will be canceled, which closes the underlying connection. - /// - /// Note, this is the only constructor which permits adding trailers to - /// the `Body`. - pub fn from_http_body(http_body: B) -> Self - where - B: HttpBody + Send + 'static, - ::Data: Into, - ::Error: Into, - { - use util::BodyExt; - Body(BodyInner::Boxed( - http_body - .map_frame(|f| f.map_data(Into::into)) - .map_err(Into::into) - .boxed_unsync(), - )) - } -} - -impl From<()> for Body { - fn from(_: ()) -> Body { - Body::empty() - } -} -impl From<&[u8]> for Body { - fn from(bytes: &[u8]) -> Body { - Body::from(bytes.to_owned()) - } -} -impl From> for Body { - fn from(bytes: Vec) -> Body { - Body::from(Bytes::from(bytes)) - } -} -impl From for Body { - fn from(data: Bytes) -> Body { - Body(BodyInner::Complete { - data, - trailers: None, - }) - } -} -impl From<&str> for Body { - fn from(data: &str) -> Body { - Body::from(data.as_bytes()) - } -} -impl From for Body { - fn from(data: String) -> Body { - Body::from(data.into_bytes()) - } -} - -impl From for Body { - fn from(r: crate::io::AsyncInputStream) -> Body { - // TODO: this is skipping the wstd::io::copy optimization. - // in future, with another BodyInner variant for a boxed AsyncRead for - // which as_input_stream is_some, this could allow for use of - // crate::io::copy. But, we probably need to redesign AsyncRead to be - // a poll_read func in order to make it possible to use from - // http_body::Body::poll_frame. - use futures_lite::stream::StreamExt; - Body(BodyInner::Boxed(http_body_util::BodyExt::boxed_unsync( - http_body_util::StreamBody::new(r.into_stream().map(|res| { - res.map(|bytevec| Frame::data(Bytes::from_owner(bytevec))) - .map_err(Into::into) - })), - ))) - } -} - -#[derive(Debug)] -struct Incoming { - body: WasiIncomingBody, - size_hint: BodyHint, -} - -impl Incoming { - fn into_http_body(self) -> IncomingBody { - IncomingBody::new(self.body, self.size_hint) - } - async fn send(self, outgoing_body: WasiOutgoingBody) -> Result<(), Error> { - let in_body = self.body; - let in_stream = - AsyncInputStream::new(in_body.stream().expect("incoming body already read")); - let out_stream = AsyncOutputStream::new( - outgoing_body - .write() - .expect("outgoing body already written"), - ); - in_stream.copy_to(&out_stream).await.map_err(|e| { - Error::from(e).context("copying incoming body stream to outgoing body stream") - })?; - drop(in_stream); - drop(out_stream); - let future_in_trailers = WasiIncomingBody::finish(in_body); - Reactor::current() - .schedule(future_in_trailers.subscribe()) - .wait_for() - .await; - let in_trailers: Option = future_in_trailers - .get() - .expect("pollable ready") - .expect("got once") - .map_err(|e| Error::from(e).context("recieving incoming trailers"))?; - WasiOutgoingBody::finish(outgoing_body, in_trailers) - .map_err(|e| Error::from(e).context("finishing outgoing body"))?; - Ok(()) - } -} - -#[derive(Clone, Copy, Debug)] -pub enum BodyHint { - ContentLength(u64), - Unknown, -} - -impl BodyHint { - pub fn from_headers(headers: &HeaderMap) -> Result { - if let Some(val) = headers.get(CONTENT_LENGTH) { - let len = std::str::from_utf8(val.as_ref()) - .map_err(|_| InvalidContentLength)? - .parse::() - .map_err(|_| InvalidContentLength)?; - Ok(BodyHint::ContentLength(len)) - } else { - Ok(BodyHint::Unknown) - } - } - fn content_length(&self) -> Option { - match self { - BodyHint::ContentLength(l) => Some(*l), - _ => None, - } - } -} -#[derive(Debug)] -pub struct InvalidContentLength; -impl fmt::Display for InvalidContentLength { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Invalid Content-Length header") - } -} -impl std::error::Error for InvalidContentLength {} - -#[derive(Debug)] -pub struct IncomingBody { - state: Option>>, - size_hint: BodyHint, -} - -impl IncomingBody { - fn new(body: WasiIncomingBody, size_hint: BodyHint) -> Self { - Self { - state: Some(Box::pin(IncomingBodyState::Body { - read_state: BodyState { - wait: None, - subscription: None, - stream: body - .stream() - .expect("wasi incoming-body stream should not yet be taken"), - }, - body: Some(body), - })), - size_hint, - } - } -} - -impl HttpBody for IncomingBody { - type Data = Bytes; - type Error = Error; - fn poll_frame( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>>> { - loop { - let state = self.as_mut().state.take(); - if state.is_none() { - return Poll::Ready(None); - } - let mut state = state.unwrap(); - match state.as_mut().project() { - IBSProj::Body { read_state, body } => match read_state.poll_frame(cx) { - Poll::Pending => { - self.as_mut().state = Some(state); - return Poll::Pending; - } - Poll::Ready(Some(r)) => { - self.as_mut().state = Some(state); - return Poll::Ready(Some(r)); - } - Poll::Ready(None) => { - // state contains children of the incoming-body. Must drop it - // in order to finish - let body = body.take().expect("finishing Body state"); - drop(state); - let trailers_state = TrailersState::new(WasiIncomingBody::finish(body)); - self.as_mut().state = - Some(Box::pin(IncomingBodyState::Trailers { trailers_state })); - continue; - } - }, - IBSProj::Trailers { trailers_state } => match trailers_state.poll_frame(cx) { - Poll::Pending => { - self.as_mut().state = Some(state); - return Poll::Pending; - } - Poll::Ready(r) => return Poll::Ready(r), - }, - } - } - } - fn is_end_stream(&self) -> bool { - self.state.is_none() - } - fn size_hint(&self) -> SizeHint { - match self.size_hint { - BodyHint::ContentLength(l) => SizeHint::with_exact(l), - _ => Default::default(), - } - } -} - -pin_project_lite::pin_project! { - #[project = IBSProj] - #[derive(Debug)] - enum IncomingBodyState { - Body { - #[pin] - read_state: BodyState, - // body is Some until we need to remove it from a projection - // during a state transition - body: Option - }, - Trailers { - #[pin] - trailers_state: TrailersState - }, - } -} - -#[derive(Debug)] -struct BodyState { - wait: Option>>, - subscription: Option, - stream: WasiInputStream, -} - -const MAX_FRAME_SIZE: u64 = 64 * 1024; - -impl BodyState { - fn poll_frame( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Error>>> { - loop { - match self.stream.read(MAX_FRAME_SIZE) { - Ok(bs) if !bs.is_empty() => { - return Poll::Ready(Some(Ok(Frame::data(Bytes::from(bs))))); - } - Err(StreamError::Closed) => return Poll::Ready(None), - Err(StreamError::LastOperationFailed(err)) => { - return Poll::Ready(Some(Err( - Error::msg(err.to_debug_string()).context("reading incoming body stream") - ))); - } - Ok(_empty) => { - if self.subscription.is_none() { - self.as_mut().subscription = - Some(Reactor::current().schedule(self.stream.subscribe())); - } - if self.wait.is_none() { - let wait = self.as_ref().subscription.as_ref().unwrap().wait_for(); - self.as_mut().wait = Some(Box::pin(wait)); - } - let mut taken_wait = self.as_mut().wait.take().unwrap(); - match taken_wait.as_mut().poll(cx) { - Poll::Pending => { - self.as_mut().wait = Some(taken_wait); - return Poll::Pending; - } - // Its possible that, after returning ready, the - // stream does not actually provide any input. This - // behavior should only occur once. - Poll::Ready(()) => { - continue; - } - } - } - } - } - } -} - -#[derive(Debug)] -struct TrailersState { - wait: Option>>, - subscription: Option, - future_trailers: FutureTrailers, -} - -impl TrailersState { - fn new(future_trailers: FutureTrailers) -> Self { - Self { - wait: None, - subscription: None, - future_trailers, - } - } - - fn poll_frame( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Error>>> { - loop { - if let Some(ready) = self.future_trailers.get() { - return match ready { - Ok(Ok(Some(trailers))) => match header_map_from_wasi(trailers) { - Ok(header_map) => Poll::Ready(Some(Ok(Frame::trailers(header_map)))), - Err(e) => { - Poll::Ready(Some(Err(e.context("decoding incoming body trailers")))) - } - }, - Ok(Ok(None)) => Poll::Ready(None), - Ok(Err(e)) => Poll::Ready(Some(Err( - Error::from(e).context("reading incoming body trailers") - ))), - Err(()) => unreachable!("future_trailers.get with some called at most once"), - }; - } - if self.subscription.is_none() { - self.as_mut().subscription = - Some(Reactor::current().schedule(self.future_trailers.subscribe())); - } - if self.wait.is_none() { - let wait = self.as_ref().subscription.as_ref().unwrap().wait_for(); - self.as_mut().wait = Some(Box::pin(wait)); - } - let mut taken_wait = self.as_mut().wait.take().unwrap(); - match taken_wait.as_mut().poll(cx) { - Poll::Pending => { - self.as_mut().wait = Some(taken_wait); - return Poll::Pending; - } - // Its possible that, after returning ready, the - // future_trailers.get() does not actually provide any input. This - // behavior should only occur once. - Poll::Ready(()) => { - continue; - } - } - } - } -} +pub use crate::sys::http::body::*; diff --git a/src/http/error.rs b/src/http/error.rs index a4f22b0..aaef852 100644 --- a/src/http/error.rs +++ b/src/http/error.rs @@ -6,7 +6,7 @@ pub use crate::http::body::InvalidContentLength; pub use anyhow::Context; pub use http::header::{InvalidHeaderName, InvalidHeaderValue}; pub use http::method::InvalidMethod; -pub use wasip2::http::types::{ErrorCode, HeaderError}; +pub use crate::sys::http::{ErrorCode, HeaderError}; pub type Error = anyhow::Error; /// The `http` result type. diff --git a/src/http/mod.rs b/src/http/mod.rs index 39f0a40..952a875 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -5,21 +5,17 @@ pub use http::uri::{Authority, PathAndQuery, Uri}; #[doc(inline)] pub use body::{Body, util::BodyExt}; -pub use client::Client; +pub use crate::sys::http::client::Client; pub use error::{Error, ErrorCode, Result}; -pub use fields::{HeaderMap, HeaderName, HeaderValue}; -pub use method::Method; +pub use crate::sys::http::fields::{HeaderMap, HeaderName, HeaderValue}; +pub use crate::sys::http::method::Method; pub use request::Request; pub use response::Response; -pub use scheme::{InvalidUri, Scheme}; +pub use crate::sys::http::scheme::{InvalidUri, Scheme}; pub mod body; -mod client; pub mod error; -mod fields; -mod method; pub mod request; pub mod response; -mod scheme; pub mod server; diff --git a/src/http/request.rs b/src/http/request.rs index 6694d03..cbe2b91 100644 --- a/src/http/request.rs +++ b/src/http/request.rs @@ -1,109 +1 @@ -use super::{ - Authority, HeaderMap, PathAndQuery, Uri, - body::{Body, BodyHint}, - error::{Context, Error, ErrorCode}, - fields::{header_map_from_wasi, header_map_to_wasi}, - method::{from_wasi_method, to_wasi_method}, - scheme::{from_wasi_scheme, to_wasi_scheme}, -}; -use wasip2::http::outgoing_handler::OutgoingRequest; -use wasip2::http::types::IncomingRequest; - -pub use http::request::{Builder, Request}; - -// TODO: go back and add json stuff??? - -pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingRequest, T), Error> { - let wasi_req = OutgoingRequest::new(header_map_to_wasi(request.headers())?); - - let (parts, body) = request.into_parts(); - - // Set the HTTP method - let method = to_wasi_method(parts.method); - wasi_req - .set_method(&method) - .map_err(|()| anyhow::anyhow!("method rejected by wasi-http: {method:?}"))?; - - // Set the url scheme - let scheme = parts - .uri - .scheme() - .map(to_wasi_scheme) - .unwrap_or(wasip2::http::types::Scheme::Https); - wasi_req - .set_scheme(Some(&scheme)) - .map_err(|()| anyhow::anyhow!("scheme rejected by wasi-http: {scheme:?}"))?; - - // Set authority - let authority = parts.uri.authority().map(Authority::as_str); - wasi_req - .set_authority(authority) - .map_err(|()| anyhow::anyhow!("authority rejected by wasi-http {authority:?}"))?; - - // Set the url path + query string - if let Some(p_and_q) = parts.uri.path_and_query() { - wasi_req - .set_path_with_query(Some(p_and_q.as_str())) - .map_err(|()| anyhow::anyhow!("path and query rejected by wasi-http {p_and_q:?}"))?; - } - - // All done; request is ready for send-off - Ok((wasi_req, body)) -} - -/// This is used by the `http_server` macro. -#[doc(hidden)] -pub fn try_from_incoming(incoming: IncomingRequest) -> Result, Error> { - let headers: HeaderMap = header_map_from_wasi(incoming.headers()) - .context("headers provided by wasi rejected by http::HeaderMap")?; - - let method = - from_wasi_method(incoming.method()).map_err(|_| ErrorCode::HttpRequestMethodInvalid)?; - let scheme = incoming - .scheme() - .map(|scheme| { - from_wasi_scheme(scheme).context("scheme provided by wasi rejected by http::Scheme") - }) - .transpose()?; - let authority = incoming - .authority() - .map(|authority| { - Authority::from_maybe_shared(authority) - .context("authority provided by wasi rejected by http::Authority") - }) - .transpose()?; - let path_and_query = incoming - .path_with_query() - .map(|path_and_query| { - PathAndQuery::from_maybe_shared(path_and_query) - .context("path and query provided by wasi rejected by http::PathAndQuery") - }) - .transpose()?; - - let hint = BodyHint::from_headers(&headers)?; - - // `body_stream` is a child of `incoming_body` which means we cannot - // drop the parent before we drop the child - let incoming_body = incoming - .consume() - .expect("`consume` should not have been called previously on this incoming-request"); - let body = Body::from_incoming(incoming_body, hint); - - let mut uri = Uri::builder(); - if let Some(scheme) = scheme { - uri = uri.scheme(scheme); - } - if let Some(authority) = authority { - uri = uri.authority(authority); - } - if let Some(path_and_query) = path_and_query { - uri = uri.path_and_query(path_and_query); - } - let uri = uri.build().context("building uri from wasi")?; - - let mut request = Request::builder().method(method).uri(uri); - if let Some(headers_mut) = request.headers_mut() { - *headers_mut = headers; - } - request.body(body).context("building request from wasi") -} +pub use crate::sys::http::request::*; diff --git a/src/http/response.rs b/src/http/response.rs index 2ab8d87..07dd215 100644 --- a/src/http/response.rs +++ b/src/http/response.rs @@ -1,38 +1 @@ -use http::StatusCode; -use wasip2::http::types::IncomingResponse; - -use crate::http::body::{Body, BodyHint}; -use crate::http::error::Error; -use crate::http::fields::{HeaderMap, header_map_from_wasi}; - -pub use http::response::{Builder, Response}; - -pub(crate) fn try_from_incoming(incoming: IncomingResponse) -> Result, Error> { - let headers: HeaderMap = header_map_from_wasi(incoming.headers())?; - // TODO: Does WASI guarantee that the incoming status is valid? - let status = StatusCode::from_u16(incoming.status()) - .map_err(|err| anyhow::anyhow!("wasi provided invalid status code ({err})"))?; - - let hint = BodyHint::from_headers(&headers)?; - // `body_stream` is a child of `incoming_body` which means we cannot - // drop the parent before we drop the child - let incoming_body = incoming - .consume() - .expect("cannot call `consume` twice on incoming response"); - let body = Body::from_incoming(incoming_body, hint); - - let mut builder = Response::builder().status(status); - // The [`http::response::Builder`] keeps internal state of whether the - // builder has errored, which is only reachable by passing - // [`Builder::header`] an erroring `TryInto` or - // `TryInto`. Since the `Builder::header` method is never - // used, we know `Builder::headers_mut` will never give the None case, nor - // will `Builder::body` give the error case. So, rather than treat those - // as control flow, we unwrap if this invariant is ever broken because - // that would only be possible due to some unrecoverable bug in wstd, - // rather than incorrect use or invalid input. - *builder.headers_mut().expect("builder has not errored") = headers; - Ok(builder - .body(body) - .expect("response builder should not error")) -} +pub use crate::sys::http::response::*; diff --git a/src/http/server.rs b/src/http/server.rs index 9fb6ff4..853743d 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -18,70 +18,4 @@ //! [`Response`]: crate::http::Response //! [`http_server`]: crate::http_server -use super::{Body, Error, Response, error::ErrorCode, fields::header_map_to_wasi}; -use http::header::CONTENT_LENGTH; -use wasip2::exports::http::incoming_handler::ResponseOutparam; -use wasip2::http::types::OutgoingResponse; - -/// For use by the [`http_server`] macro only. -/// -/// [`http_server`]: crate::http_server -#[doc(hidden)] -#[must_use] -pub struct Responder { - outparam: ResponseOutparam, -} - -impl Responder { - /// This is used by the `http_server` macro. - #[doc(hidden)] - pub async fn respond>(self, response: Response) -> Result<(), Error> { - let headers = response.headers(); - let status = response.status().as_u16(); - - let wasi_headers = header_map_to_wasi(headers).expect("header error"); - - // Consume the `response` and prepare to write the body. - let body = response.into_body().into(); - - // Automatically add a Content-Length header. - if let Some(len) = body.content_length() { - let mut buffer = itoa::Buffer::new(); - wasi_headers - .append(CONTENT_LENGTH.as_str(), buffer.format(len).as_bytes()) - .unwrap(); - } - - let wasi_response = OutgoingResponse::new(wasi_headers); - - // Unwrap because `StatusCode` has already validated the status. - wasi_response.set_status_code(status).unwrap(); - - // Unwrap because we can be sure we only call these once. - let wasi_body = wasi_response.body().unwrap(); - - // Set the outparam to the response, which allows wasi-http to send - // the response status and headers. - ResponseOutparam::set(self.outparam, Ok(wasi_response)); - - // Then send the body. The response will be fully sent once this - // future is ready. - body.send(wasi_body).await - } - - /// This is used by the `http_server` macro. - #[doc(hidden)] - pub fn new(outparam: ResponseOutparam) -> Self { - Self { outparam } - } - - /// This is used by the `http_server` macro. - #[doc(hidden)] - pub fn fail(self, err: Error) { - let e = match err.downcast_ref::() { - Some(e) => e.clone(), - None => ErrorCode::InternalError(Some(format!("{err:?}"))), - }; - ResponseOutparam::set(self.outparam, Err(e)); - } -} +pub use crate::sys::http::server::*; diff --git a/src/io/mod.rs b/src/io/mod.rs index 0f34b1b..898d4d7 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -5,18 +5,16 @@ mod cursor; mod empty; mod read; mod seek; -mod stdio; -mod streams; mod write; pub use crate::runtime::AsyncPollable; +pub use crate::sys::io::*; +pub use crate::sys::stdio::*; pub use copy::*; pub use cursor::*; pub use empty::*; pub use read::*; pub use seek::*; -pub use stdio::*; -pub use streams::*; pub use write::*; /// The error type for I/O operations. diff --git a/src/lib.rs b/src/lib.rs index ebc673d..e4edc55 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,6 +55,9 @@ //! These are unique capabilities provided by WASI 0.2, and because this library //! is specific to that are exposed from here. +#[allow(unreachable_pub)] +mod sys; + pub mod future; #[macro_use] pub mod http; diff --git a/src/net/mod.rs b/src/net/mod.rs index 1600edc..6ddff7d 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,29 +1,3 @@ //! Async network abstractions. -use std::io::{self, ErrorKind}; -use wasip2::sockets::network::ErrorCode; - -mod tcp_listener; -mod tcp_stream; - -pub use tcp_listener::*; -pub use tcp_stream::*; - -fn to_io_err(err: ErrorCode) -> io::Error { - match err { - ErrorCode::Unknown => ErrorKind::Other.into(), - ErrorCode::AccessDenied => ErrorKind::PermissionDenied.into(), - ErrorCode::NotSupported => ErrorKind::Unsupported.into(), - ErrorCode::InvalidArgument => ErrorKind::InvalidInput.into(), - ErrorCode::OutOfMemory => ErrorKind::OutOfMemory.into(), - ErrorCode::Timeout => ErrorKind::TimedOut.into(), - ErrorCode::WouldBlock => ErrorKind::WouldBlock.into(), - ErrorCode::InvalidState => ErrorKind::InvalidData.into(), - ErrorCode::AddressInUse => ErrorKind::AddrInUse.into(), - ErrorCode::ConnectionRefused => ErrorKind::ConnectionRefused.into(), - ErrorCode::ConnectionReset => ErrorKind::ConnectionReset.into(), - ErrorCode::ConnectionAborted => ErrorKind::ConnectionAborted.into(), - ErrorCode::ConcurrencyConflict => ErrorKind::AlreadyExists.into(), - _ => ErrorKind::Other.into(), - } -} +pub use crate::sys::net::*; diff --git a/src/rand/mod.rs b/src/rand/mod.rs index 8474b80..e79a70b 100644 --- a/src/rand/mod.rs +++ b/src/rand/mod.rs @@ -1,25 +1,3 @@ //! Random number generation. -use wasip2::random; - -/// Fill the slice with cryptographically secure random bytes. -pub fn get_random_bytes(buf: &mut [u8]) { - match buf.len() { - 0 => {} - _ => { - let output = random::random::get_random_bytes(buf.len() as u64); - buf.copy_from_slice(&output[..]); - } - } -} - -/// Fill the slice with insecure random bytes. -pub fn get_insecure_random_bytes(buf: &mut [u8]) { - match buf.len() { - 0 => {} - _ => { - let output = random::insecure::get_insecure_random_bytes(buf.len() as u64); - buf.copy_from_slice(&output[..]); - } - } -} +pub use crate::sys::random::{get_insecure_random_bytes, get_random_bytes}; diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 24b9fc2..19dd379 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -10,27 +10,4 @@ #![deny(missing_debug_implementations, nonstandard_style)] #![warn(missing_docs, unreachable_pub)] -mod block_on; -mod reactor; - -pub use ::async_task::Task; -pub use block_on::block_on; -pub use reactor::{AsyncPollable, Reactor, WaitFor}; -use std::cell::RefCell; - -// There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all -// use sites in the background. -std::thread_local! { -pub(crate) static REACTOR: RefCell> = const { RefCell::new(None) }; -} - -/// Spawn a `Future` as a `Task` on the current `Reactor`. -/// -/// Panics if called from outside `block_on`. -pub fn spawn(fut: F) -> Task -where - F: std::future::Future + 'static, - T: 'static, -{ - Reactor::current().spawn(fut) -} +pub use crate::sys::runtime::*; diff --git a/src/sys/mod.rs b/src/sys/mod.rs new file mode 100644 index 0000000..5d2f16a --- /dev/null +++ b/src/sys/mod.rs @@ -0,0 +1,11 @@ +cfg_if::cfg_if! { + if #[cfg(any(feature = "wasip3", all(target_os = "wasi", target_env = "p3")))] { + mod p3; + use p3 as backend; + } else { + mod p2; + use p2 as backend; + } +} + +pub use backend::*; diff --git a/src/sys/p2/http/body.rs b/src/sys/p2/http/body.rs new file mode 100644 index 0000000..b070a59 --- /dev/null +++ b/src/sys/p2/http/body.rs @@ -0,0 +1,609 @@ +use super::fields::{header_map_from_wasi, header_map_to_wasi}; +use crate::io::{AsyncInputStream, AsyncOutputStream}; +use crate::runtime::{AsyncPollable, Reactor, WaitFor}; + +pub use ::http_body::{Body as HttpBody, Frame, SizeHint}; +pub use bytes::Bytes; + +use anyhow::Context as _; +use http::header::CONTENT_LENGTH; +use http_body_util::{BodyExt, combinators::UnsyncBoxBody}; +use std::fmt; +use std::future::{Future, poll_fn}; +use std::pin::{Pin, pin}; +use std::task::{Context, Poll}; +use wasip2::http::types::{ + FutureTrailers, IncomingBody as WasiIncomingBody, OutgoingBody as WasiOutgoingBody, +}; +use wasip2::io::streams::{InputStream as WasiInputStream, StreamError}; + +type Error = anyhow::Error; +type HeaderMap = http::header::HeaderMap; + +pub mod util { + pub use http_body_util::*; +} + +/// A HTTP Body. +/// +/// Construct this HTTP body using: +/// * `Body::empty` for the empty body, or `impl From<()> for Body` +/// * `From<&[u8]>` (which will make a clone) or `From>` or +/// `From` for a `Body` from bytes. +/// * `From<&str>` (which will make a clone) or `From` for a `Body` +/// from strings. +/// * `Body::from_json` for a `Body` from a `Serialize` (requires feature +/// `json`) +/// * `From` for a `Body` with contents given by the +/// contents of a WASI input-stream. +/// * `Body::from_stream` or `Body::from_try_stream` for a `Body` from a +/// `Stream` of `Into` +/// +/// Consume this HTTP body using: +/// * `Body::into_boxed_body` converts it to an `UnsyncBoxBody`. +/// This is a boxed representation of `http_body::Body` that is `Send` but not +/// `Sync`. The Unsync variant is required for compatibility with the `axum` +/// crate. +/// * `async fn Body::contents(&mut self) -> Result<&[u8], Error>` is ready +/// when all contents of the body have been collected, and gives them as a +/// byte slice. +/// * `async fn Body::str_contents(&mut self) -> Result<&str, Error>` is ready +/// when all contents of the body have been collected, and gives them as a str +/// slice. +/// * `async fn Body::json(&mut self) -> Result` gathers body +/// contents and then uses `T: serde::Deserialize` to deserialize to json +/// (requires feature `json`). +#[derive(Debug)] +pub struct Body(BodyInner); + +#[derive(Debug)] +enum BodyInner { + // a boxed http_body::Body impl + Boxed(UnsyncBoxBody), + // a body created from a wasi-http incoming-body (WasiIncomingBody) + Incoming(Incoming), + // a body in memory + Complete { + data: Bytes, + trailers: Option, + }, +} + +impl Body { + pub(crate) async fn send(self, outgoing_body: WasiOutgoingBody) -> Result<(), Error> { + match self.0 { + BodyInner::Incoming(incoming) => incoming.send(outgoing_body).await, + BodyInner::Boxed(box_body) => { + let out_stream = AsyncOutputStream::new( + outgoing_body + .write() + .expect("outgoing body already written"), + ); + let mut body = pin!(box_body); + let mut trailers = None; + loop { + match poll_fn(|cx| body.as_mut().poll_frame(cx)).await { + Some(Ok(frame)) if frame.is_data() => { + let data = frame.data_ref().unwrap(); + out_stream.write_all(data).await?; + } + Some(Ok(frame)) if frame.is_trailers() => { + trailers = + Some(header_map_to_wasi(frame.trailers_ref().unwrap()).map_err( + |e| Error::from(e).context("outoging trailers to wasi"), + )?); + } + Some(Err(err)) => break Err(err.context("sending outgoing body")), + None => { + drop(out_stream); + WasiOutgoingBody::finish(outgoing_body, trailers) + .map_err(|e| Error::from(e).context("finishing outgoing body"))?; + break Ok(()); + } + _ => unreachable!(), + } + } + } + BodyInner::Complete { data, trailers } => { + let out_stream = AsyncOutputStream::new( + outgoing_body + .write() + .expect("outgoing body already written"), + ); + out_stream.write_all(&data).await?; + drop(out_stream); + let trailers = trailers + .map(|t| header_map_to_wasi(&t).context("trailers")) + .transpose()?; + WasiOutgoingBody::finish(outgoing_body, trailers) + .map_err(|e| Error::from(e).context("finishing outgoing body"))?; + Ok(()) + } + } + } + + /// Convert this `Body` into an `UnsyncBoxBody`, which + /// exists to implement the `http_body::Body` trait. Consume the contents + /// using `http_body_utils::BodyExt`, or anywhere else an impl of + /// `http_body::Body` is accepted. + pub fn into_boxed_body(self) -> UnsyncBoxBody { + fn map_e(_: std::convert::Infallible) -> Error { + unreachable!() + } + match self.0 { + BodyInner::Incoming(i) => i.into_http_body().boxed_unsync(), + BodyInner::Complete { data, trailers } => http_body_util::Full::new(data) + .map_err(map_e) + .with_trailers(async move { Ok(trailers).transpose() }) + .boxed_unsync(), + BodyInner::Boxed(b) => b, + } + } + + /// Collect the entire contents of this `Body`, and expose them as a + /// byte slice. This async fn will be pending until the entire `Body` is + /// copied into memory, or an error occurs. + pub async fn contents(&mut self) -> Result<&[u8], Error> { + match &mut self.0 { + BodyInner::Complete { data, .. } => Ok(&*data), + inner => { + let mut prev = BodyInner::Complete { + data: Bytes::new(), + trailers: None, + }; + std::mem::swap(inner, &mut prev); + let boxed_body = match prev { + BodyInner::Incoming(i) => i.into_http_body().boxed_unsync(), + BodyInner::Boxed(b) => b, + BodyInner::Complete { .. } => unreachable!(), + }; + let collected = boxed_body.collect().await?; + let trailers = collected.trailers().cloned(); + *inner = BodyInner::Complete { + data: collected.to_bytes(), + trailers, + }; + Ok(match inner { + BodyInner::Complete { data, .. } => &*data, + _ => unreachable!(), + }) + } + } + } + + /// Get a value for the length of this `Body`'s content, in bytes, if + /// known. This value can come from either the Content-Length header + /// recieved in the incoming request or response assocated with the body, + /// or be provided by an exact `http_body::Body::size_hint` if the `Body` + /// is constructed from an `http_body::Body` impl. + pub fn content_length(&self) -> Option { + match &self.0 { + BodyInner::Boxed(b) => b.size_hint().exact(), + BodyInner::Complete { data, .. } => Some(data.len() as u64), + BodyInner::Incoming(i) => i.size_hint.content_length(), + } + } + + /// Construct an empty Body + pub fn empty() -> Self { + Body(BodyInner::Complete { + data: Bytes::new(), + trailers: None, + }) + } + + /// Collect the entire contents of this `Body`, and expose them as a + /// string slice. This async fn will be pending until the entire `Body` is + /// copied into memory, or an error occurs. Additonally errors if the + /// contents of the `Body` were not a utf-8 encoded string. + pub async fn str_contents(&mut self) -> Result<&str, Error> { + let bs = self.contents().await?; + std::str::from_utf8(bs).context("decoding body contents as string") + } + + /// Construct a `Body` by serializing a type to json. Can fail with a + /// `serde_json::Error` if serilization fails. + #[cfg(feature = "json")] + pub fn from_json(data: &T) -> Result { + Ok(Self::from(serde_json::to_vec(data)?)) + } + + /// Collect the entire contents of this `Body`, and deserialize them from + /// json. Can fail if the body contents are not utf-8 encoded, are not + /// valid json, or the json is not accepted by the `serde::Deserialize` impl. + #[cfg(feature = "json")] + pub async fn json serde::Deserialize<'a>>(&mut self) -> Result { + let str = self.str_contents().await?; + serde_json::from_str(str).context("decoding body contents as json") + } + + pub(crate) fn from_incoming(body: WasiIncomingBody, size_hint: BodyHint) -> Self { + Body(BodyInner::Incoming(Incoming { body, size_hint })) + } + + /// Construct a `Body` backed by a `futures_lite::Stream` impl. The stream + /// will be polled as the body is sent. + pub fn from_stream(stream: S) -> Self + where + S: futures_lite::Stream + Send + 'static, + ::Item: Into, + { + use futures_lite::StreamExt; + Self::from_http_body(http_body_util::StreamBody::new( + stream.map(|bs| Ok::<_, Error>(Frame::data(bs.into()))), + )) + } + + /// Construct a `Body` backed by a `futures_lite::Stream` impl. The stream + /// will be polled as the body is sent. If the stream gives an error, the + /// body will canceled, which closes the underlying connection. + pub fn from_try_stream(stream: S) -> Self + where + S: futures_lite::Stream> + Send + 'static, + D: Into, + E: std::error::Error + Send + Sync + 'static, + { + use futures_lite::StreamExt; + Self::from_http_body(http_body_util::StreamBody::new( + stream.map(|bs| Ok::<_, Error>(Frame::data(bs?.into()))), + )) + } + + /// Construct a `Body` backed by a `http_body::Body`. The http_body will + /// be polled as the body is sent. If the http_body poll gives an error, + /// the body will be canceled, which closes the underlying connection. + /// + /// Note, this is the only constructor which permits adding trailers to + /// the `Body`. + pub fn from_http_body(http_body: B) -> Self + where + B: HttpBody + Send + 'static, + ::Data: Into, + ::Error: Into, + { + use util::BodyExt; + Body(BodyInner::Boxed( + http_body + .map_frame(|f| f.map_data(Into::into)) + .map_err(Into::into) + .boxed_unsync(), + )) + } +} + +impl From<()> for Body { + fn from(_: ()) -> Body { + Body::empty() + } +} +impl From<&[u8]> for Body { + fn from(bytes: &[u8]) -> Body { + Body::from(bytes.to_owned()) + } +} +impl From> for Body { + fn from(bytes: Vec) -> Body { + Body::from(Bytes::from(bytes)) + } +} +impl From for Body { + fn from(data: Bytes) -> Body { + Body(BodyInner::Complete { + data, + trailers: None, + }) + } +} +impl From<&str> for Body { + fn from(data: &str) -> Body { + Body::from(data.as_bytes()) + } +} +impl From for Body { + fn from(data: String) -> Body { + Body::from(data.into_bytes()) + } +} + +impl From for Body { + fn from(r: crate::io::AsyncInputStream) -> Body { + // TODO: this is skipping the wstd::io::copy optimization. + // in future, with another BodyInner variant for a boxed AsyncRead for + // which as_input_stream is_some, this could allow for use of + // crate::io::copy. But, we probably need to redesign AsyncRead to be + // a poll_read func in order to make it possible to use from + // http_body::Body::poll_frame. + use futures_lite::stream::StreamExt; + Body(BodyInner::Boxed(http_body_util::BodyExt::boxed_unsync( + http_body_util::StreamBody::new(r.into_stream().map(|res| { + res.map(|bytevec| Frame::data(Bytes::from_owner(bytevec))) + .map_err(Into::into) + })), + ))) + } +} + +#[derive(Debug)] +struct Incoming { + body: WasiIncomingBody, + size_hint: BodyHint, +} + +impl Incoming { + fn into_http_body(self) -> IncomingBody { + IncomingBody::new(self.body, self.size_hint) + } + async fn send(self, outgoing_body: WasiOutgoingBody) -> Result<(), Error> { + let in_body = self.body; + let in_stream = + AsyncInputStream::new(in_body.stream().expect("incoming body already read")); + let out_stream = AsyncOutputStream::new( + outgoing_body + .write() + .expect("outgoing body already written"), + ); + in_stream.copy_to(&out_stream).await.map_err(|e| { + Error::from(e).context("copying incoming body stream to outgoing body stream") + })?; + drop(in_stream); + drop(out_stream); + let future_in_trailers = WasiIncomingBody::finish(in_body); + Reactor::current() + .schedule(future_in_trailers.subscribe()) + .wait_for() + .await; + let in_trailers: Option = future_in_trailers + .get() + .expect("pollable ready") + .expect("got once") + .map_err(|e| Error::from(e).context("recieving incoming trailers"))?; + WasiOutgoingBody::finish(outgoing_body, in_trailers) + .map_err(|e| Error::from(e).context("finishing outgoing body"))?; + Ok(()) + } +} + +#[derive(Clone, Copy, Debug)] +pub enum BodyHint { + ContentLength(u64), + Unknown, +} + +impl BodyHint { + pub fn from_headers(headers: &HeaderMap) -> Result { + if let Some(val) = headers.get(CONTENT_LENGTH) { + let len = std::str::from_utf8(val.as_ref()) + .map_err(|_| InvalidContentLength)? + .parse::() + .map_err(|_| InvalidContentLength)?; + Ok(BodyHint::ContentLength(len)) + } else { + Ok(BodyHint::Unknown) + } + } + fn content_length(&self) -> Option { + match self { + BodyHint::ContentLength(l) => Some(*l), + _ => None, + } + } +} +#[derive(Debug)] +pub struct InvalidContentLength; +impl fmt::Display for InvalidContentLength { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Invalid Content-Length header") + } +} +impl std::error::Error for InvalidContentLength {} + +#[derive(Debug)] +pub struct IncomingBody { + state: Option>>, + size_hint: BodyHint, +} + +impl IncomingBody { + fn new(body: WasiIncomingBody, size_hint: BodyHint) -> Self { + Self { + state: Some(Box::pin(IncomingBodyState::Body { + read_state: BodyState { + wait: None, + subscription: None, + stream: body + .stream() + .expect("wasi incoming-body stream should not yet be taken"), + }, + body: Some(body), + })), + size_hint, + } + } +} + +impl HttpBody for IncomingBody { + type Data = Bytes; + type Error = Error; + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + loop { + let state = self.as_mut().state.take(); + if state.is_none() { + return Poll::Ready(None); + } + let mut state = state.unwrap(); + match state.as_mut().project() { + IBSProj::Body { read_state, body } => match read_state.poll_frame(cx) { + Poll::Pending => { + self.as_mut().state = Some(state); + return Poll::Pending; + } + Poll::Ready(Some(r)) => { + self.as_mut().state = Some(state); + return Poll::Ready(Some(r)); + } + Poll::Ready(None) => { + // state contains children of the incoming-body. Must drop it + // in order to finish + let body = body.take().expect("finishing Body state"); + drop(state); + let trailers_state = TrailersState::new(WasiIncomingBody::finish(body)); + self.as_mut().state = + Some(Box::pin(IncomingBodyState::Trailers { trailers_state })); + continue; + } + }, + IBSProj::Trailers { trailers_state } => match trailers_state.poll_frame(cx) { + Poll::Pending => { + self.as_mut().state = Some(state); + return Poll::Pending; + } + Poll::Ready(r) => return Poll::Ready(r), + }, + } + } + } + fn is_end_stream(&self) -> bool { + self.state.is_none() + } + fn size_hint(&self) -> SizeHint { + match self.size_hint { + BodyHint::ContentLength(l) => SizeHint::with_exact(l), + _ => Default::default(), + } + } +} + +pin_project_lite::pin_project! { + #[project = IBSProj] + #[derive(Debug)] + enum IncomingBodyState { + Body { + #[pin] + read_state: BodyState, + // body is Some until we need to remove it from a projection + // during a state transition + body: Option + }, + Trailers { + #[pin] + trailers_state: TrailersState + }, + } +} + +#[derive(Debug)] +struct BodyState { + wait: Option>>, + subscription: Option, + stream: WasiInputStream, +} + +const MAX_FRAME_SIZE: u64 = 64 * 1024; + +impl BodyState { + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Error>>> { + loop { + match self.stream.read(MAX_FRAME_SIZE) { + Ok(bs) if !bs.is_empty() => { + return Poll::Ready(Some(Ok(Frame::data(Bytes::from(bs))))); + } + Err(StreamError::Closed) => return Poll::Ready(None), + Err(StreamError::LastOperationFailed(err)) => { + return Poll::Ready(Some(Err( + Error::msg(err.to_debug_string()).context("reading incoming body stream") + ))); + } + Ok(_empty) => { + if self.subscription.is_none() { + self.as_mut().subscription = + Some(Reactor::current().schedule(self.stream.subscribe())); + } + if self.wait.is_none() { + let wait = self.as_ref().subscription.as_ref().unwrap().wait_for(); + self.as_mut().wait = Some(Box::pin(wait)); + } + let mut taken_wait = self.as_mut().wait.take().unwrap(); + match taken_wait.as_mut().poll(cx) { + Poll::Pending => { + self.as_mut().wait = Some(taken_wait); + return Poll::Pending; + } + // Its possible that, after returning ready, the + // stream does not actually provide any input. This + // behavior should only occur once. + Poll::Ready(()) => { + continue; + } + } + } + } + } + } +} + +#[derive(Debug)] +struct TrailersState { + wait: Option>>, + subscription: Option, + future_trailers: FutureTrailers, +} + +impl TrailersState { + fn new(future_trailers: FutureTrailers) -> Self { + Self { + wait: None, + subscription: None, + future_trailers, + } + } + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Error>>> { + loop { + if let Some(ready) = self.future_trailers.get() { + return match ready { + Ok(Ok(Some(trailers))) => match header_map_from_wasi(trailers) { + Ok(header_map) => Poll::Ready(Some(Ok(Frame::trailers(header_map)))), + Err(e) => { + Poll::Ready(Some(Err(e.context("decoding incoming body trailers")))) + } + }, + Ok(Ok(None)) => Poll::Ready(None), + Ok(Err(e)) => Poll::Ready(Some(Err( + Error::from(e).context("reading incoming body trailers") + ))), + Err(()) => unreachable!("future_trailers.get with some called at most once"), + }; + } + if self.subscription.is_none() { + self.as_mut().subscription = + Some(Reactor::current().schedule(self.future_trailers.subscribe())); + } + if self.wait.is_none() { + let wait = self.as_ref().subscription.as_ref().unwrap().wait_for(); + self.as_mut().wait = Some(Box::pin(wait)); + } + let mut taken_wait = self.as_mut().wait.take().unwrap(); + match taken_wait.as_mut().poll(cx) { + Poll::Pending => { + self.as_mut().wait = Some(taken_wait); + return Poll::Pending; + } + // Its possible that, after returning ready, the + // future_trailers.get() does not actually provide any input. This + // behavior should only occur once. + Poll::Ready(()) => { + continue; + } + } + } + } +} diff --git a/src/http/client.rs b/src/sys/p2/http/client.rs similarity index 96% rename from src/http/client.rs rename to src/sys/p2/http/client.rs index 3676fa8..6520a16 100644 --- a/src/http/client.rs +++ b/src/sys/p2/http/client.rs @@ -1,6 +1,6 @@ -use super::{Body, Error, Request, Response}; -use crate::http::request::try_into_outgoing; -use crate::http::response::try_from_incoming; +use crate::http::{Body, Error, Request, Response}; +use super::request::try_into_outgoing; +use super::response::try_from_incoming; use crate::io::AsyncPollable; use crate::time::Duration; use wasip2::http::types::RequestOptions as WasiRequestOptions; diff --git a/src/http/fields.rs b/src/sys/p2/http/fields.rs similarity index 94% rename from src/http/fields.rs rename to src/sys/p2/http/fields.rs index de6df16..5c40d50 100644 --- a/src/http/fields.rs +++ b/src/sys/p2/http/fields.rs @@ -1,6 +1,7 @@ pub use http::header::{HeaderMap, HeaderName, HeaderValue}; -use super::{Error, error::Context}; +use crate::http::Error; +use crate::http::error::Context; use wasip2::http::types::Fields; pub(crate) fn header_map_from_wasi(wasi_fields: Fields) -> Result { diff --git a/src/http/method.rs b/src/sys/p2/http/method.rs similarity index 100% rename from src/http/method.rs rename to src/sys/p2/http/method.rs diff --git a/src/sys/p2/http/mod.rs b/src/sys/p2/http/mod.rs new file mode 100644 index 0000000..e62627a --- /dev/null +++ b/src/sys/p2/http/mod.rs @@ -0,0 +1,10 @@ +pub mod body; +pub(crate) mod client; +pub(crate) mod fields; +pub(crate) mod method; +pub mod request; +pub mod response; +pub(crate) mod scheme; +pub mod server; + +pub use wasip2::http::types::{ErrorCode, HeaderError}; diff --git a/src/sys/p2/http/request.rs b/src/sys/p2/http/request.rs new file mode 100644 index 0000000..efde40d --- /dev/null +++ b/src/sys/p2/http/request.rs @@ -0,0 +1,108 @@ +use crate::http::{ + Authority, HeaderMap, PathAndQuery, Uri, + body::{Body, BodyHint}, + error::{Context, Error, ErrorCode}, +}; +use super::fields::{header_map_from_wasi, header_map_to_wasi}; +use super::method::{from_wasi_method, to_wasi_method}; +use super::scheme::{from_wasi_scheme, to_wasi_scheme}; + +use wasip2::http::outgoing_handler::OutgoingRequest; +use wasip2::http::types::IncomingRequest; + +pub use http::request::{Builder, Request}; + +pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingRequest, T), Error> { + let wasi_req = OutgoingRequest::new(header_map_to_wasi(request.headers())?); + + let (parts, body) = request.into_parts(); + + // Set the HTTP method + let method = to_wasi_method(parts.method); + wasi_req + .set_method(&method) + .map_err(|()| anyhow::anyhow!("method rejected by wasi-http: {method:?}"))?; + + // Set the url scheme + let scheme = parts + .uri + .scheme() + .map(to_wasi_scheme) + .unwrap_or(wasip2::http::types::Scheme::Https); + wasi_req + .set_scheme(Some(&scheme)) + .map_err(|()| anyhow::anyhow!("scheme rejected by wasi-http: {scheme:?}"))?; + + // Set authority + let authority = parts.uri.authority().map(Authority::as_str); + wasi_req + .set_authority(authority) + .map_err(|()| anyhow::anyhow!("authority rejected by wasi-http {authority:?}"))?; + + // Set the url path + query string + if let Some(p_and_q) = parts.uri.path_and_query() { + wasi_req + .set_path_with_query(Some(p_and_q.as_str())) + .map_err(|()| anyhow::anyhow!("path and query rejected by wasi-http {p_and_q:?}"))?; + } + + // All done; request is ready for send-off + Ok((wasi_req, body)) +} + +/// This is used by the `http_server` macro. +#[doc(hidden)] +pub fn try_from_incoming(incoming: IncomingRequest) -> Result, Error> { + let headers: HeaderMap = header_map_from_wasi(incoming.headers()) + .context("headers provided by wasi rejected by http::HeaderMap")?; + + let method = + from_wasi_method(incoming.method()).map_err(|_| ErrorCode::HttpRequestMethodInvalid)?; + let scheme = incoming + .scheme() + .map(|scheme| { + from_wasi_scheme(scheme).context("scheme provided by wasi rejected by http::Scheme") + }) + .transpose()?; + let authority = incoming + .authority() + .map(|authority| { + Authority::from_maybe_shared(authority) + .context("authority provided by wasi rejected by http::Authority") + }) + .transpose()?; + let path_and_query = incoming + .path_with_query() + .map(|path_and_query| { + PathAndQuery::from_maybe_shared(path_and_query) + .context("path and query provided by wasi rejected by http::PathAndQuery") + }) + .transpose()?; + + let hint = BodyHint::from_headers(&headers)?; + + // `body_stream` is a child of `incoming_body` which means we cannot + // drop the parent before we drop the child + let incoming_body = incoming + .consume() + .expect("`consume` should not have been called previously on this incoming-request"); + let body = Body::from_incoming(incoming_body, hint); + + let mut uri = Uri::builder(); + if let Some(scheme) = scheme { + uri = uri.scheme(scheme); + } + if let Some(authority) = authority { + uri = uri.authority(authority); + } + if let Some(path_and_query) = path_and_query { + uri = uri.path_and_query(path_and_query); + } + let uri = uri.build().context("building uri from wasi")?; + + let mut request = Request::builder().method(method).uri(uri); + if let Some(headers_mut) = request.headers_mut() { + *headers_mut = headers; + } + request.body(body).context("building request from wasi") +} diff --git a/src/sys/p2/http/response.rs b/src/sys/p2/http/response.rs new file mode 100644 index 0000000..c25693a --- /dev/null +++ b/src/sys/p2/http/response.rs @@ -0,0 +1,29 @@ +use http::StatusCode; +use wasip2::http::types::IncomingResponse; + +use crate::http::body::{Body, BodyHint}; +use crate::http::error::Error; +use super::fields::{HeaderMap, header_map_from_wasi}; + +pub use http::response::{Builder, Response}; + +pub(crate) fn try_from_incoming(incoming: IncomingResponse) -> Result, Error> { + let headers: HeaderMap = header_map_from_wasi(incoming.headers())?; + // TODO: Does WASI guarantee that the incoming status is valid? + let status = StatusCode::from_u16(incoming.status()) + .map_err(|err| anyhow::anyhow!("wasi provided invalid status code ({err})"))?; + + let hint = BodyHint::from_headers(&headers)?; + // `body_stream` is a child of `incoming_body` which means we cannot + // drop the parent before we drop the child + let incoming_body = incoming + .consume() + .expect("cannot call `consume` twice on incoming response"); + let body = Body::from_incoming(incoming_body, hint); + + let mut builder = Response::builder().status(status); + *builder.headers_mut().expect("builder has not errored") = headers; + Ok(builder + .body(body) + .expect("response builder should not error")) +} diff --git a/src/http/scheme.rs b/src/sys/p2/http/scheme.rs similarity index 100% rename from src/http/scheme.rs rename to src/sys/p2/http/scheme.rs diff --git a/src/sys/p2/http/server.rs b/src/sys/p2/http/server.rs new file mode 100644 index 0000000..36d9f94 --- /dev/null +++ b/src/sys/p2/http/server.rs @@ -0,0 +1,68 @@ +use crate::http::{Body, Error, Response, error::ErrorCode}; +use super::fields::header_map_to_wasi; +use http::header::CONTENT_LENGTH; +use wasip2::exports::http::incoming_handler::ResponseOutparam; +use wasip2::http::types::OutgoingResponse; + +/// For use by the [`http_server`] macro only. +/// +/// [`http_server`]: crate::http_server +#[doc(hidden)] +#[must_use] +pub struct Responder { + outparam: ResponseOutparam, +} + +impl Responder { + /// This is used by the `http_server` macro. + #[doc(hidden)] + pub async fn respond>(self, response: Response) -> Result<(), Error> { + let headers = response.headers(); + let status = response.status().as_u16(); + + let wasi_headers = header_map_to_wasi(headers).expect("header error"); + + // Consume the `response` and prepare to write the body. + let body = response.into_body().into(); + + // Automatically add a Content-Length header. + if let Some(len) = body.content_length() { + let mut buffer = itoa::Buffer::new(); + wasi_headers + .append(CONTENT_LENGTH.as_str(), buffer.format(len).as_bytes()) + .unwrap(); + } + + let wasi_response = OutgoingResponse::new(wasi_headers); + + // Unwrap because `StatusCode` has already validated the status. + wasi_response.set_status_code(status).unwrap(); + + // Unwrap because we can be sure we only call these once. + let wasi_body = wasi_response.body().unwrap(); + + // Set the outparam to the response, which allows wasi-http to send + // the response status and headers. + ResponseOutparam::set(self.outparam, Ok(wasi_response)); + + // Then send the body. The response will be fully sent once this + // future is ready. + body.send(wasi_body).await + } + + /// This is used by the `http_server` macro. + #[doc(hidden)] + pub fn new(outparam: ResponseOutparam) -> Self { + Self { outparam } + } + + /// This is used by the `http_server` macro. + #[doc(hidden)] + pub fn fail(self, err: Error) { + let e = match err.downcast_ref::() { + Some(e) => e.clone(), + None => ErrorCode::InternalError(Some(format!("{err:?}"))), + }; + ResponseOutparam::set(self.outparam, Err(e)); + } +} diff --git a/src/io/streams.rs b/src/sys/p2/io.rs similarity index 98% rename from src/io/streams.rs rename to src/sys/p2/io.rs index 3676d21..f5fe7af 100644 --- a/src/io/streams.rs +++ b/src/sys/p2/io.rs @@ -1,5 +1,5 @@ -use super::{AsyncPollable, AsyncRead, AsyncWrite}; -use crate::runtime::WaitFor; +use crate::io::{AsyncRead, AsyncWrite}; +use crate::runtime::{AsyncPollable, WaitFor}; use std::future::{Future, poll_fn}; use std::pin::Pin; use std::sync::{Mutex, OnceLock}; @@ -26,7 +26,7 @@ impl AsyncInputStream { stream, } } - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> { + pub(crate) fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> { // Lazily initialize the AsyncPollable let subscription = self .subscription @@ -43,7 +43,7 @@ impl AsyncInputStream { } } /// Await for read readiness. - async fn ready(&self) { + pub(crate) async fn ready(&self) { poll_fn(|cx| self.poll_ready(cx)).await } /// Asynchronously read from the input stream. @@ -233,7 +233,7 @@ impl AsyncOutputStream { } } /// Await write readiness. - async fn ready(&self) { + pub(crate) async fn ready(&self) { // Lazily initialize the AsyncPollable let subscription = self .subscription diff --git a/src/sys/p2/mod.rs b/src/sys/p2/mod.rs new file mode 100644 index 0000000..b8a7c47 --- /dev/null +++ b/src/sys/p2/mod.rs @@ -0,0 +1,7 @@ +pub mod http; +pub mod io; +pub mod net; +pub mod random; +pub mod runtime; +pub mod stdio; +pub mod time; diff --git a/src/sys/p2/net/mod.rs b/src/sys/p2/net/mod.rs new file mode 100644 index 0000000..dfd36ad --- /dev/null +++ b/src/sys/p2/net/mod.rs @@ -0,0 +1,27 @@ +use std::io::{self, ErrorKind}; +use wasip2::sockets::network::ErrorCode; + +mod tcp_listener; +mod tcp_stream; + +pub use tcp_listener::*; +pub use tcp_stream::*; + +pub(crate) fn to_io_err(err: ErrorCode) -> io::Error { + match err { + ErrorCode::Unknown => ErrorKind::Other.into(), + ErrorCode::AccessDenied => ErrorKind::PermissionDenied.into(), + ErrorCode::NotSupported => ErrorKind::Unsupported.into(), + ErrorCode::InvalidArgument => ErrorKind::InvalidInput.into(), + ErrorCode::OutOfMemory => ErrorKind::OutOfMemory.into(), + ErrorCode::Timeout => ErrorKind::TimedOut.into(), + ErrorCode::WouldBlock => ErrorKind::WouldBlock.into(), + ErrorCode::InvalidState => ErrorKind::InvalidData.into(), + ErrorCode::AddressInUse => ErrorKind::AddrInUse.into(), + ErrorCode::ConnectionRefused => ErrorKind::ConnectionRefused.into(), + ErrorCode::ConnectionReset => ErrorKind::ConnectionReset.into(), + ErrorCode::ConnectionAborted => ErrorKind::ConnectionAborted.into(), + ErrorCode::ConcurrencyConflict => ErrorKind::AlreadyExists.into(), + _ => ErrorKind::Other.into(), + } +} diff --git a/src/net/tcp_listener.rs b/src/sys/p2/net/tcp_listener.rs similarity index 100% rename from src/net/tcp_listener.rs rename to src/sys/p2/net/tcp_listener.rs diff --git a/src/net/tcp_stream.rs b/src/sys/p2/net/tcp_stream.rs similarity index 100% rename from src/net/tcp_stream.rs rename to src/sys/p2/net/tcp_stream.rs diff --git a/src/sys/p2/random.rs b/src/sys/p2/random.rs new file mode 100644 index 0000000..c477026 --- /dev/null +++ b/src/sys/p2/random.rs @@ -0,0 +1,23 @@ +use wasip2::random; + +/// Fill the slice with cryptographically secure random bytes. +pub fn get_random_bytes(buf: &mut [u8]) { + match buf.len() { + 0 => {} + _ => { + let output = random::random::get_random_bytes(buf.len() as u64); + buf.copy_from_slice(&output[..]); + } + } +} + +/// Fill the slice with insecure random bytes. +pub fn get_insecure_random_bytes(buf: &mut [u8]) { + match buf.len() { + 0 => {} + _ => { + let output = random::insecure::get_insecure_random_bytes(buf.len() as u64); + buf.copy_from_slice(&output[..]); + } + } +} diff --git a/src/runtime/block_on.rs b/src/sys/p2/runtime/mod.rs similarity index 77% rename from src/runtime/block_on.rs rename to src/sys/p2/runtime/mod.rs index c7bbd31..4a1595a 100644 --- a/src/runtime/block_on.rs +++ b/src/sys/p2/runtime/mod.rs @@ -1,10 +1,20 @@ -use super::{REACTOR, Reactor}; +mod reactor; + +pub use ::async_task::Task; +pub use reactor::{AsyncPollable, Reactor, WaitFor}; +use std::cell::RefCell; + +// There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all +// use sites in the background. +std::thread_local! { +pub(crate) static REACTOR: RefCell> = const { RefCell::new(None) }; +} use std::future::Future; use std::pin::pin; use std::task::{Context, Poll, Waker}; -/// Start the event loop. Blocks until the future +/// Start the event loop. Blocks until the future completes. pub fn block_on(fut: F) -> F::Output where F: Future, @@ -62,3 +72,14 @@ where } } } + +/// Spawn a `Future` as a `Task` on the current `Reactor`. +/// +/// Panics if called from outside `block_on`. +pub fn spawn(fut: F) -> Task +where + F: std::future::Future + 'static, + T: 'static, +{ + Reactor::current().spawn(fut) +} diff --git a/src/runtime/reactor.rs b/src/sys/p2/runtime/reactor.rs similarity index 100% rename from src/runtime/reactor.rs rename to src/sys/p2/runtime/reactor.rs diff --git a/src/io/stdio.rs b/src/sys/p2/stdio.rs similarity index 98% rename from src/io/stdio.rs rename to src/sys/p2/stdio.rs index b2ac153..fa183a3 100644 --- a/src/io/stdio.rs +++ b/src/sys/p2/stdio.rs @@ -1,4 +1,4 @@ -use super::{AsyncInputStream, AsyncOutputStream, AsyncRead, AsyncWrite, Result}; +use crate::io::{AsyncInputStream, AsyncOutputStream, AsyncRead, AsyncWrite, Result}; use std::cell::LazyCell; use wasip2::cli::terminal_input::TerminalInput; use wasip2::cli::terminal_output::TerminalOutput; diff --git a/src/sys/p2/time.rs b/src/sys/p2/time.rs new file mode 100644 index 0000000..a6ec660 --- /dev/null +++ b/src/sys/p2/time.rs @@ -0,0 +1,46 @@ +use wasip2::clocks::{ + monotonic_clock::{self, subscribe_duration, subscribe_instant}, + wall_clock, +}; + +use crate::runtime::{AsyncPollable, Reactor}; + +/// A measurement of a monotonically nondecreasing clock. Opaque and useful only +/// with Duration. +pub type MonotonicInstant = monotonic_clock::Instant; + +/// A duration from the monotonic clock, in nanoseconds. +pub type MonotonicDuration = monotonic_clock::Duration; + +/// Return the current monotonic clock instant. +pub fn now() -> MonotonicInstant { + monotonic_clock::now() +} + +/// A measurement of the system clock. +#[derive(Debug, Clone, Copy)] +pub struct SystemTime(wall_clock::Datetime); + +impl SystemTime { + pub fn now() -> Self { + Self(wall_clock::now()) + } +} + +impl From for std::time::SystemTime { + fn from(st: SystemTime) -> Self { + std::time::SystemTime::UNIX_EPOCH + + std::time::Duration::from_secs(st.0.seconds) + + std::time::Duration::from_nanos(st.0.nanoseconds.into()) + } +} + +/// Create a timer that fires at a specific monotonic clock instant. +pub fn subscribe_at(instant: MonotonicInstant) -> AsyncPollable { + Reactor::current().schedule(subscribe_instant(instant)) +} + +/// Create a timer that fires after a monotonic clock duration. +pub fn subscribe_after(duration: MonotonicDuration) -> AsyncPollable { + Reactor::current().schedule(subscribe_duration(duration)) +} diff --git a/src/time/duration.rs b/src/time/duration.rs index 7f67ceb..0533f44 100644 --- a/src/time/duration.rs +++ b/src/time/duration.rs @@ -1,7 +1,6 @@ use super::{Instant, Wait}; use std::future::IntoFuture; use std::ops::{Add, AddAssign, Sub, SubAssign}; -use wasip2::clocks::monotonic_clock; /// A Duration type to represent a span of time, typically used for system /// timeouts. @@ -10,7 +9,7 @@ use wasip2::clocks::monotonic_clock; /// without coherence issues, just like if we were implementing this in the /// stdlib. #[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Clone, Copy)] -pub struct Duration(pub(crate) monotonic_clock::Duration); +pub struct Duration(pub(crate) crate::sys::time::MonotonicDuration); impl Duration { /// Creates a new `Duration` from the specified number of whole seconds and /// additional nanoseconds. diff --git a/src/time/instant.rs b/src/time/instant.rs index 6e9cf97..79910aa 100644 --- a/src/time/instant.rs +++ b/src/time/instant.rs @@ -1,7 +1,6 @@ use super::{Duration, Wait}; use std::future::IntoFuture; use std::ops::{Add, AddAssign, Sub, SubAssign}; -use wasip2::clocks::monotonic_clock; /// A measurement of a monotonically nondecreasing clock. Opaque and useful only /// with Duration. @@ -10,7 +9,7 @@ use wasip2::clocks::monotonic_clock; /// without coherence issues, just like if we were implementing this in the /// stdlib. #[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Clone, Copy)] -pub struct Instant(pub(crate) monotonic_clock::Instant); +pub struct Instant(pub(crate) crate::sys::time::MonotonicInstant); impl Instant { /// Returns an instant corresponding to "now". @@ -24,7 +23,7 @@ impl Instant { /// ``` #[must_use] pub fn now() -> Self { - Instant(wasip2::clocks::monotonic_clock::now()) + Instant(crate::sys::time::now()) } /// Returns the amount of time elapsed from another instant to this one, or zero duration if diff --git a/src/time/mod.rs b/src/time/mod.rs index db0e1b3..7a2a821 100644 --- a/src/time/mod.rs +++ b/src/time/mod.rs @@ -11,36 +11,13 @@ use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use wasip2::clocks::{ - monotonic_clock::{subscribe_duration, subscribe_instant}, - wall_clock, -}; use crate::{ iter::AsyncIterator, - runtime::{AsyncPollable, Reactor}, + runtime::AsyncPollable, }; -/// A measurement of the system clock, useful for talking to external entities -/// like the file system or other processes. May be converted losslessly to a -/// more useful `std::time::SystemTime` to provide more methods. -#[derive(Debug, Clone, Copy)] -#[allow(dead_code)] -pub struct SystemTime(wall_clock::Datetime); - -impl SystemTime { - pub fn now() -> Self { - Self(wall_clock::now()) - } -} - -impl From for std::time::SystemTime { - fn from(st: SystemTime) -> Self { - std::time::SystemTime::UNIX_EPOCH - + std::time::Duration::from_secs(st.0.seconds) - + std::time::Duration::from_nanos(st.0.nanoseconds.into()) - } -} +pub use crate::sys::time::SystemTime; /// An async iterator representing notifications at fixed interval. pub fn interval(duration: Duration) -> Interval { @@ -70,11 +47,11 @@ impl Timer { Timer(None) } pub fn at(deadline: Instant) -> Timer { - let pollable = Reactor::current().schedule(subscribe_instant(deadline.0)); + let pollable = crate::sys::time::subscribe_at(deadline.0); Timer(Some(pollable)) } pub fn after(duration: Duration) -> Timer { - let pollable = Reactor::current().schedule(subscribe_duration(duration.0)); + let pollable = crate::sys::time::subscribe_after(duration.0); Timer(Some(pollable)) } pub fn set_after(&mut self, duration: Duration) { From b09df479a47934fb824a2d9681c223bfd618a16e Mon Sep 17 00:00:00 2001 From: Yosh Date: Wed, 8 Apr 2026 16:35:48 +0200 Subject: [PATCH 2/7] format cargo.toml --- Cargo.toml | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 92fe25b..4501978 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,9 @@ default = ["json"] json = ["dep:serde", "dep:serde_json"] [lints.rust] -unexpected_cfgs = { level = "warn", check-cfg = ['cfg(feature, values("wasip3"))'] } +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(feature, values("wasip3"))', +] } [dependencies] anyhow.workspace = true @@ -48,12 +50,7 @@ serde = { workspace = true, features = ["derive"] } serde_json.workspace = true [workspace] -members = [ - "axum", - "axum/macro", - "macro", - "test-programs", -] +members = ["axum", "axum/macro", "macro", "test-programs"] resolver = "2" [workspace.package] @@ -89,7 +86,7 @@ http-body-util = "0.1.3" itoa = "1" pin-project-lite = "0.2.8" quote = "1.0" -serde= "1" +serde = "1" serde_json = "1" serde_qs = "0.15" sync_wrapper = "1" @@ -107,6 +104,4 @@ wstd-macro = { path = "./macro", version = "=0.6.6" } [package.metadata.docs.rs] all-features = true -targets = [ - "wasm32-wasip2" -] +targets = ["wasm32-wasip2"] From 4e76cc2ca15c3f85460f2cb106ab66db655574fb Mon Sep 17 00:00:00 2001 From: Yosh Date: Wed, 8 Apr 2026 16:44:52 +0200 Subject: [PATCH 3/7] flatten top-level stubs --- src/future.rs | 233 +++++++++++++++++++++++++++++ src/future/delay.rs | 63 -------- src/future/future_ext.rs | 76 ---------- src/future/mod.rs | 35 ----- src/future/timeout.rs | 60 -------- src/http.rs | 69 +++++++++ src/http/body.rs | 1 - src/http/error.rs | 13 -- src/http/mod.rs | 21 --- src/http/request.rs | 1 - src/http/response.rs | 1 - src/http/server.rs | 21 --- src/{iter/mod.rs => iter.rs} | 0 src/{net/mod.rs => net.rs} | 0 src/{rand/mod.rs => rand.rs} | 0 src/{runtime/mod.rs => runtime.rs} | 0 src/{time/duration.rs => time.rs} | 203 ++++++++++++++++++++++++- src/time/instant.rs | 90 ----------- src/time/mod.rs | 115 -------------- src/time/utils.rs | 5 - 20 files changed, 502 insertions(+), 505 deletions(-) create mode 100644 src/future.rs delete mode 100644 src/future/delay.rs delete mode 100644 src/future/future_ext.rs delete mode 100644 src/future/mod.rs delete mode 100644 src/future/timeout.rs create mode 100644 src/http.rs delete mode 100644 src/http/body.rs delete mode 100644 src/http/error.rs delete mode 100644 src/http/mod.rs delete mode 100644 src/http/request.rs delete mode 100644 src/http/response.rs delete mode 100644 src/http/server.rs rename src/{iter/mod.rs => iter.rs} (100%) rename src/{net/mod.rs => net.rs} (100%) rename src/{rand/mod.rs => rand.rs} (100%) rename src/{runtime/mod.rs => runtime.rs} (100%) rename src/{time/duration.rs => time.rs} (55%) delete mode 100644 src/time/instant.rs delete mode 100644 src/time/mod.rs delete mode 100644 src/time/utils.rs diff --git a/src/future.rs b/src/future.rs new file mode 100644 index 0000000..02c3ce8 --- /dev/null +++ b/src/future.rs @@ -0,0 +1,233 @@ +//! Asynchronous values. +//! +//! # Cancellation +//! +//! Futures can be cancelled by dropping them before they finish executing. This +//! is useful when we're no longer interested in the result of an operation, as +//! it allows us to stop doing needless work. This also means that a future may cancel at any `.await` point, and so just +//! like with `?` we have to be careful to roll back local state if our future +//! halts there. +//! +//! +//! ```no_run +//! use futures_lite::prelude::*; +//! use wstd::prelude::*; +//! use wstd::time::Duration; +//! +//! #[wstd::main] +//! async fn main() { +//! let mut counter = 0; +//! let value = async { "meow" } +//! .delay(Duration::from_millis(100)) +//! .timeout(Duration::from_millis(200)) +//! .await; +//! +//! assert_eq!(value.unwrap(), "meow"); +//! } +//! ``` + +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll, ready}; + +use pin_project_lite::pin_project; + +use crate::time::utils::timeout_err; + +pub use self::future_ext::FutureExt; + +// ---- Delay ---- + +pin_project! { + /// Suspends a future until the specified deadline. + /// + /// This `struct` is created by the [`delay`] method on [`FutureExt`]. See its + /// documentation for more. + /// + /// [`delay`]: crate::future::FutureExt::delay + /// [`FutureExt`]: crate::future::futureExt + #[must_use = "futures do nothing unless polled or .awaited"] + pub struct Delay { + #[pin] + future: F, + #[pin] + deadline: D, + state: DelayState, + } +} + +/// The internal state +#[derive(Debug)] +enum DelayState { + Started, + PollFuture, + Completed, +} + +impl Delay { + fn new(future: F, deadline: D) -> Self { + Self { + future, + deadline, + state: DelayState::Started, + } + } +} + +impl Future for Delay { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + match this.state { + DelayState::Started => { + ready!(this.deadline.as_mut().poll(cx)); + *this.state = DelayState::PollFuture; + } + DelayState::PollFuture => { + let value = ready!(this.future.as_mut().poll(cx)); + *this.state = DelayState::Completed; + return Poll::Ready(value); + } + DelayState::Completed => panic!("future polled after completing"), + } + } + } +} + +// ---- Timeout ---- + +pin_project! { + /// A future that times out after a duration of time. + /// + /// This `struct` is created by the [`timeout`] method on [`FutureExt`]. See its + /// documentation for more. + /// + /// [`timeout`]: crate::future::FutureExt::timeout + /// [`FutureExt`]: crate::future::futureExt + #[must_use = "futures do nothing unless polled or .awaited"] + pub struct Timeout { + #[pin] + future: F, + #[pin] + deadline: D, + completed: bool, + } +} + +impl Timeout { + fn new(future: F, deadline: D) -> Self { + Self { + future, + deadline, + completed: false, + } + } +} + +impl Future for Timeout { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + assert!(!*this.completed, "future polled after completing"); + + match this.future.poll(cx) { + Poll::Ready(v) => { + *this.completed = true; + Poll::Ready(Ok(v)) + } + Poll::Pending => match this.deadline.poll(cx) { + Poll::Ready(_) => { + *this.completed = true; + Poll::Ready(Err(timeout_err("future timed out"))) + } + Poll::Pending => Poll::Pending, + }, + } + } +} + +// ---- FutureExt ---- + +mod future_ext { + use super::{Delay, Timeout}; + use std::future::{Future, IntoFuture}; + + /// Extend `Future` with time-based operations. + pub trait FutureExt: Future { + /// Return an error if a future does not complete within a given time span. + /// + /// Typically timeouts are, as the name implies, based on _time_. However + /// this method can time out based on any future. This can be useful in + /// combination with channels, as it allows (long-lived) futures to be + /// cancelled based on some external event. + /// + /// When a timeout is returned, the future will be dropped and destructors + /// will be run. + /// + /// # Example + /// + /// ```no_run + /// use wstd::prelude::*; + /// use wstd::time::{Instant, Duration}; + /// use std::io; + /// + /// #[wstd::main] + /// async fn main() { + /// let res = async { "meow" } + /// .delay(Duration::from_millis(100)) // longer delay + /// .timeout(Duration::from_millis(50)) // shorter timeout + /// .await; + /// assert_eq!(res.unwrap_err().kind(), io::ErrorKind::TimedOut); // error + /// + /// let res = async { "meow" } + /// .delay(Duration::from_millis(50)) // shorter delay + /// .timeout(Duration::from_millis(100)) // longer timeout + /// .await; + /// assert_eq!(res.unwrap(), "meow"); // success + /// } + /// ``` + fn timeout(self, deadline: D) -> Timeout + where + Self: Sized, + D: IntoFuture, + { + Timeout::new(self, deadline.into_future()) + } + + /// Delay resolving the future until the given deadline. + /// + /// The underlying future will not be polled until the deadline has expired. In addition + /// to using a time source as a deadline, any future can be used as a + /// deadline too. When used in combination with a multi-consumer channel, + /// this method can be used to synchronize the start of multiple futures and streams. + /// + /// # Example + /// + /// ```no_run + /// use wstd::prelude::*; + /// use wstd::time::{Instant, Duration}; + /// + /// #[wstd::main] + /// async fn main() { + /// let now = Instant::now(); + /// let delay = Duration::from_millis(100); + /// let _ = async { "meow" }.delay(delay).await; + /// assert!(now.elapsed() >= delay); + /// } + /// ``` + fn delay(self, deadline: D) -> Delay + where + Self: Sized, + D: IntoFuture, + { + Delay::new(self, deadline.into_future()) + } + } + + impl FutureExt for T where T: Future {} +} diff --git a/src/future/delay.rs b/src/future/delay.rs deleted file mode 100644 index 20d6753..0000000 --- a/src/future/delay.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll, ready}; - -use pin_project_lite::pin_project; - -pin_project! { - /// Suspends a future until the specified deadline. - /// - /// This `struct` is created by the [`delay`] method on [`FutureExt`]. See its - /// documentation for more. - /// - /// [`delay`]: crate::future::FutureExt::delay - /// [`FutureExt`]: crate::future::futureExt - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct Delay { - #[pin] - future: F, - #[pin] - deadline: D, - state: State, - } -} - -/// The internal state -#[derive(Debug)] -enum State { - Started, - PollFuture, - Completed, -} - -impl Delay { - pub(super) fn new(future: F, deadline: D) -> Self { - Self { - future, - deadline, - state: State::Started, - } - } -} - -impl Future for Delay { - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - loop { - match this.state { - State::Started => { - ready!(this.deadline.as_mut().poll(cx)); - *this.state = State::PollFuture; - } - State::PollFuture => { - let value = ready!(this.future.as_mut().poll(cx)); - *this.state = State::Completed; - return Poll::Ready(value); - } - State::Completed => panic!("future polled after completing"), - } - } - } -} diff --git a/src/future/future_ext.rs b/src/future/future_ext.rs deleted file mode 100644 index 2835f4b..0000000 --- a/src/future/future_ext.rs +++ /dev/null @@ -1,76 +0,0 @@ -use super::{Delay, Timeout}; -use std::future::{Future, IntoFuture}; - -/// Extend `Future` with time-based operations. -pub trait FutureExt: Future { - /// Return an error if a future does not complete within a given time span. - /// - /// Typically timeouts are, as the name implies, based on _time_. However - /// this method can time out based on any future. This can be useful in - /// combination with channels, as it allows (long-lived) futures to be - /// cancelled based on some external event. - /// - /// When a timeout is returned, the future will be dropped and destructors - /// will be run. - /// - /// # Example - /// - /// ```no_run - /// use wstd::prelude::*; - /// use wstd::time::{Instant, Duration}; - /// use std::io; - /// - /// #[wstd::main] - /// async fn main() { - /// let res = async { "meow" } - /// .delay(Duration::from_millis(100)) // longer delay - /// .timeout(Duration::from_millis(50)) // shorter timeout - /// .await; - /// assert_eq!(res.unwrap_err().kind(), io::ErrorKind::TimedOut); // error - /// - /// let res = async { "meow" } - /// .delay(Duration::from_millis(50)) // shorter delay - /// .timeout(Duration::from_millis(100)) // longer timeout - /// .await; - /// assert_eq!(res.unwrap(), "meow"); // success - /// } - /// ``` - fn timeout(self, deadline: D) -> Timeout - where - Self: Sized, - D: IntoFuture, - { - Timeout::new(self, deadline.into_future()) - } - - /// Delay resolving the future until the given deadline. - /// - /// The underlying future will not be polled until the deadline has expired. In addition - /// to using a time source as a deadline, any future can be used as a - /// deadline too. When used in combination with a multi-consumer channel, - /// this method can be used to synchronize the start of multiple futures and streams. - /// - /// # Example - /// - /// ```no_run - /// use wstd::prelude::*; - /// use wstd::time::{Instant, Duration}; - /// - /// #[wstd::main] - /// async fn main() { - /// let now = Instant::now(); - /// let delay = Duration::from_millis(100); - /// let _ = async { "meow" }.delay(delay).await; - /// assert!(now.elapsed() >= delay); - /// } - /// ``` - fn delay(self, deadline: D) -> Delay - where - Self: Sized, - D: IntoFuture, - { - Delay::new(self, deadline.into_future()) - } -} - -impl FutureExt for T where T: Future {} diff --git a/src/future/mod.rs b/src/future/mod.rs deleted file mode 100644 index a359afd..0000000 --- a/src/future/mod.rs +++ /dev/null @@ -1,35 +0,0 @@ -//! Asynchronous values. -//! -//! # Cancellation -//! -//! Futures can be cancelled by dropping them before they finish executing. This -//! is useful when we're no longer interested in the result of an operation, as -//! it allows us to stop doing needless work. This also means that a future may cancel at any `.await` point, and so just -//! like with `?` we have to be careful to roll back local state if our future -//! halts there. -//! -//! -//! ```no_run -//! use futures_lite::prelude::*; -//! use wstd::prelude::*; -//! use wstd::time::Duration; -//! -//! #[wstd::main] -//! async fn main() { -//! let mut counter = 0; -//! let value = async { "meow" } -//! .delay(Duration::from_millis(100)) -//! .timeout(Duration::from_millis(200)) -//! .await; -//! -//! assert_eq!(value.unwrap(), "meow"); -//! } -//! ``` - -mod delay; -mod future_ext; -mod timeout; - -pub use delay::Delay; -pub use future_ext::FutureExt; -pub use timeout::Timeout; diff --git a/src/future/timeout.rs b/src/future/timeout.rs deleted file mode 100644 index 9b00e1b..0000000 --- a/src/future/timeout.rs +++ /dev/null @@ -1,60 +0,0 @@ -use crate::time::utils::timeout_err; - -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use pin_project_lite::pin_project; - -pin_project! { - /// A future that times out after a duration of time. - /// - /// This `struct` is created by the [`timeout`] method on [`FutureExt`]. See its - /// documentation for more. - /// - /// [`timeout`]: crate::future::FutureExt::timeout - /// [`FutureExt`]: crate::future::futureExt - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct Timeout { - #[pin] - future: F, - #[pin] - deadline: D, - completed: bool, - } -} - -impl Timeout { - pub(super) fn new(future: F, deadline: D) -> Self { - Self { - future, - deadline, - completed: false, - } - } -} - -impl Future for Timeout { - type Output = io::Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - assert!(!*this.completed, "future polled after completing"); - - match this.future.poll(cx) { - Poll::Ready(v) => { - *this.completed = true; - Poll::Ready(Ok(v)) - } - Poll::Pending => match this.deadline.poll(cx) { - Poll::Ready(_) => { - *this.completed = true; - Poll::Ready(Err(timeout_err("future timed out"))) - } - Poll::Pending => Poll::Pending, - }, - } - } -} diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 0000000..7180da7 --- /dev/null +++ b/src/http.rs @@ -0,0 +1,69 @@ +//! HTTP networking support + +pub use http::status::StatusCode; +pub use http::uri::{Authority, PathAndQuery, Uri}; + +#[doc(inline)] +pub use body::{Body, util::BodyExt}; +pub use crate::sys::http::client::Client; +pub use error::{Error, ErrorCode, Result}; +pub use crate::sys::http::fields::{HeaderMap, HeaderName, HeaderValue}; +pub use crate::sys::http::method::Method; +pub use request::Request; +pub use response::Response; +pub use crate::sys::http::scheme::{InvalidUri, Scheme}; + +pub mod body { + //! HTTP body types. + pub use crate::sys::http::body::*; +} + +pub mod error { + //! The http portion of wstd uses `anyhow::Error` as its `Error` type. + //! + //! There are various concrete error types + + pub use crate::http::body::InvalidContentLength; + pub use anyhow::Context; + pub use http::header::{InvalidHeaderName, InvalidHeaderValue}; + pub use http::method::InvalidMethod; + pub use crate::sys::http::{ErrorCode, HeaderError}; + + pub type Error = anyhow::Error; + /// The `http` result type. + pub type Result = std::result::Result; +} + +pub mod request { + //! HTTP request types. + pub use crate::sys::http::request::*; +} + +pub mod response { + //! HTTP response types. + pub use crate::sys::http::response::*; +} + +pub mod server { + //! HTTP servers + //! + //! The WASI HTTP server uses the [typed main] idiom, with a `main` function + //! that takes a [`Request`] and succeeds with a [`Response`], using the + //! [`http_server`] macro: + //! + //! ```no_run + //! use wstd::http::{Request, Response, Body, Error}; + //! #[wstd::http_server] + //! async fn main(_request: Request) -> Result, Error> { + //! Ok(Response::new("Hello!\n".into())) + //! } + //! ``` + //! + //! [typed main]: https://sunfishcode.github.io/typed-main-wasi-presentation/chapter_1.html + //! [`Request`]: crate::http::Request + //! [`Responder`]: crate::http::server::Responder + //! [`Response`]: crate::http::Response + //! [`http_server`]: crate::http_server + + pub use crate::sys::http::server::*; +} diff --git a/src/http/body.rs b/src/http/body.rs deleted file mode 100644 index b190a4b..0000000 --- a/src/http/body.rs +++ /dev/null @@ -1 +0,0 @@ -pub use crate::sys::http::body::*; diff --git a/src/http/error.rs b/src/http/error.rs deleted file mode 100644 index aaef852..0000000 --- a/src/http/error.rs +++ /dev/null @@ -1,13 +0,0 @@ -//! The http portion of wstd uses `anyhow::Error` as its `Error` type. -//! -//! There are various concrete error types - -pub use crate::http::body::InvalidContentLength; -pub use anyhow::Context; -pub use http::header::{InvalidHeaderName, InvalidHeaderValue}; -pub use http::method::InvalidMethod; -pub use crate::sys::http::{ErrorCode, HeaderError}; - -pub type Error = anyhow::Error; -/// The `http` result type. -pub type Result = std::result::Result; diff --git a/src/http/mod.rs b/src/http/mod.rs deleted file mode 100644 index 952a875..0000000 --- a/src/http/mod.rs +++ /dev/null @@ -1,21 +0,0 @@ -//! HTTP networking support -//! -pub use http::status::StatusCode; -pub use http::uri::{Authority, PathAndQuery, Uri}; - -#[doc(inline)] -pub use body::{Body, util::BodyExt}; -pub use crate::sys::http::client::Client; -pub use error::{Error, ErrorCode, Result}; -pub use crate::sys::http::fields::{HeaderMap, HeaderName, HeaderValue}; -pub use crate::sys::http::method::Method; -pub use request::Request; -pub use response::Response; -pub use crate::sys::http::scheme::{InvalidUri, Scheme}; - -pub mod body; - -pub mod error; -pub mod request; -pub mod response; -pub mod server; diff --git a/src/http/request.rs b/src/http/request.rs deleted file mode 100644 index cbe2b91..0000000 --- a/src/http/request.rs +++ /dev/null @@ -1 +0,0 @@ -pub use crate::sys::http::request::*; diff --git a/src/http/response.rs b/src/http/response.rs deleted file mode 100644 index 07dd215..0000000 --- a/src/http/response.rs +++ /dev/null @@ -1 +0,0 @@ -pub use crate::sys::http::response::*; diff --git a/src/http/server.rs b/src/http/server.rs deleted file mode 100644 index 853743d..0000000 --- a/src/http/server.rs +++ /dev/null @@ -1,21 +0,0 @@ -//! HTTP servers -//! -//! The WASI HTTP server uses the [typed main] idiom, with a `main` function -//! that takes a [`Request`] and succeeds with a [`Response`], using the -//! [`http_server`] macro: -//! -//! ```no_run -//! use wstd::http::{Request, Response, Body, Error}; -//! #[wstd::http_server] -//! async fn main(_request: Request) -> Result, Error> { -//! Ok(Response::new("Hello!\n".into())) -//! } -//! ``` -//! -//! [typed main]: https://sunfishcode.github.io/typed-main-wasi-presentation/chapter_1.html -//! [`Request`]: crate::http::Request -//! [`Responder`]: crate::http::server::Responder -//! [`Response`]: crate::http::Response -//! [`http_server`]: crate::http_server - -pub use crate::sys::http::server::*; diff --git a/src/iter/mod.rs b/src/iter.rs similarity index 100% rename from src/iter/mod.rs rename to src/iter.rs diff --git a/src/net/mod.rs b/src/net.rs similarity index 100% rename from src/net/mod.rs rename to src/net.rs diff --git a/src/rand/mod.rs b/src/rand.rs similarity index 100% rename from src/rand/mod.rs rename to src/rand.rs diff --git a/src/runtime/mod.rs b/src/runtime.rs similarity index 100% rename from src/runtime/mod.rs rename to src/runtime.rs diff --git a/src/time/duration.rs b/src/time.rs similarity index 55% rename from src/time/duration.rs rename to src/time.rs index 0533f44..125194b 100644 --- a/src/time/duration.rs +++ b/src/time.rs @@ -1,6 +1,27 @@ -use super::{Instant, Wait}; -use std::future::IntoFuture; +//! Async time interfaces. + +pub(crate) mod utils { + use std::io; + + pub(crate) fn timeout_err(msg: &'static str) -> io::Error { + io::Error::new(io::ErrorKind::TimedOut, msg) + } +} + +use pin_project_lite::pin_project; +use std::future::{Future, IntoFuture}; use std::ops::{Add, AddAssign, Sub, SubAssign}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::{ + iter::AsyncIterator, + runtime::AsyncPollable, +}; + +pub use crate::sys::time::SystemTime; + +// ---- Duration ---- /// A Duration type to represent a span of time, typically used for system /// timeouts. @@ -161,10 +182,186 @@ impl IntoFuture for Duration { } } +// ---- Instant ---- + +/// A measurement of a monotonically nondecreasing clock. Opaque and useful only +/// with Duration. +/// +/// This type wraps `std::time::Duration` so we can implement traits on it +/// without coherence issues, just like if we were implementing this in the +/// stdlib. +#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Clone, Copy)] +pub struct Instant(pub(crate) crate::sys::time::MonotonicInstant); + +impl Instant { + /// Returns an instant corresponding to "now". + /// + /// # Examples + /// + /// ```no_run + /// use wstd::time::Instant; + /// + /// let now = Instant::now(); + /// ``` + #[must_use] + pub fn now() -> Self { + Instant(crate::sys::time::now()) + } + + /// Returns the amount of time elapsed from another instant to this one, or zero duration if + /// that instant is later than this one. + pub fn duration_since(&self, earlier: Instant) -> Duration { + Duration::from_nanos(self.0.saturating_sub(earlier.0)) + } + + /// Returns the amount of time elapsed since this instant. + pub fn elapsed(&self) -> Duration { + Instant::now().duration_since(*self) + } +} + +impl Add for Instant { + type Output = Self; + + fn add(self, rhs: Duration) -> Self::Output { + Self(self.0 + rhs.0) + } +} + +impl AddAssign for Instant { + fn add_assign(&mut self, rhs: Duration) { + *self = Self(self.0 + rhs.0) + } +} + +impl Sub for Instant { + type Output = Self; + + fn sub(self, rhs: Duration) -> Self::Output { + Self(self.0 - rhs.0) + } +} + +impl SubAssign for Instant { + fn sub_assign(&mut self, rhs: Duration) { + *self = Self(self.0 - rhs.0) + } +} + +impl IntoFuture for Instant { + type Output = Instant; + + type IntoFuture = Wait; + + fn into_future(self) -> Self::IntoFuture { + crate::task::sleep_until(self) + } +} + +// ---- Timer / Interval ---- + +/// An async iterator representing notifications at fixed interval. +pub fn interval(duration: Duration) -> Interval { + Interval { duration } +} + +/// An async iterator representing notifications at fixed interval. +/// +/// See the [`interval`] function for more. +#[derive(Debug)] +pub struct Interval { + duration: Duration, +} +impl AsyncIterator for Interval { + type Item = Instant; + + async fn next(&mut self) -> Option { + Some(Timer::after(self.duration).wait().await) + } +} + +#[derive(Debug)] +pub struct Timer(Option); + +impl Timer { + pub fn never() -> Timer { + Timer(None) + } + pub fn at(deadline: Instant) -> Timer { + let pollable = crate::sys::time::subscribe_at(deadline.0); + Timer(Some(pollable)) + } + pub fn after(duration: Duration) -> Timer { + let pollable = crate::sys::time::subscribe_after(duration.0); + Timer(Some(pollable)) + } + pub fn set_after(&mut self, duration: Duration) { + *self = Self::after(duration); + } + pub fn wait(&self) -> Wait { + let wait_for = self.0.as_ref().map(AsyncPollable::wait_for); + Wait { wait_for } + } +} + +pin_project! { + /// Future created by [`Timer::wait`] + #[must_use = "futures do nothing unless polled or .awaited"] + pub struct Wait { + #[pin] + wait_for: Option + } +} + +impl Future for Wait { + type Output = Instant; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.wait_for.as_pin_mut() { + None => Poll::Pending, + Some(f) => match f.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => Poll::Ready(Instant::now()), + }, + } + } +} + #[cfg(test)] -mod tests { +mod test { use super::*; + async fn debug_duration(what: &str, f: impl Future) { + let start = Instant::now(); + let now = f.await; + let d = now.duration_since(start); + let d: std::time::Duration = d.into(); + println!("{what} awaited for {} s", d.as_secs_f32()); + } + + #[test] + fn timer_now() { + crate::runtime::block_on(debug_duration("timer_now", async { + Timer::at(Instant::now()).wait().await + })); + } + + #[test] + fn timer_after_100_milliseconds() { + crate::runtime::block_on(debug_duration("timer_after_100_milliseconds", async { + Timer::after(Duration::from_millis(100)).wait().await + })); + } + + #[test] + fn test_duration_since() { + let x = Instant::now(); + let d = Duration::new(456, 789); + let y = x + d; + assert_eq!(y.duration_since(x), d); + } + #[test] fn test_new_from_as() { assert_eq!(Duration::new(456, 864209753).as_secs(), 456); diff --git a/src/time/instant.rs b/src/time/instant.rs deleted file mode 100644 index 79910aa..0000000 --- a/src/time/instant.rs +++ /dev/null @@ -1,90 +0,0 @@ -use super::{Duration, Wait}; -use std::future::IntoFuture; -use std::ops::{Add, AddAssign, Sub, SubAssign}; - -/// A measurement of a monotonically nondecreasing clock. Opaque and useful only -/// with Duration. -/// -/// This type wraps `std::time::Duration` so we can implement traits on it -/// without coherence issues, just like if we were implementing this in the -/// stdlib. -#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Clone, Copy)] -pub struct Instant(pub(crate) crate::sys::time::MonotonicInstant); - -impl Instant { - /// Returns an instant corresponding to "now". - /// - /// # Examples - /// - /// ```no_run - /// use wstd::time::Instant; - /// - /// let now = Instant::now(); - /// ``` - #[must_use] - pub fn now() -> Self { - Instant(crate::sys::time::now()) - } - - /// Returns the amount of time elapsed from another instant to this one, or zero duration if - /// that instant is later than this one. - pub fn duration_since(&self, earlier: Instant) -> Duration { - Duration::from_nanos(self.0.saturating_sub(earlier.0)) - } - - /// Returns the amount of time elapsed since this instant. - pub fn elapsed(&self) -> Duration { - Instant::now().duration_since(*self) - } -} - -impl Add for Instant { - type Output = Self; - - fn add(self, rhs: Duration) -> Self::Output { - Self(self.0 + rhs.0) - } -} - -impl AddAssign for Instant { - fn add_assign(&mut self, rhs: Duration) { - *self = Self(self.0 + rhs.0) - } -} - -impl Sub for Instant { - type Output = Self; - - fn sub(self, rhs: Duration) -> Self::Output { - Self(self.0 - rhs.0) - } -} - -impl SubAssign for Instant { - fn sub_assign(&mut self, rhs: Duration) { - *self = Self(self.0 - rhs.0) - } -} - -impl IntoFuture for Instant { - type Output = Instant; - - type IntoFuture = Wait; - - fn into_future(self) -> Self::IntoFuture { - crate::task::sleep_until(self) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_duration_since() { - let x = Instant::now(); - let d = Duration::new(456, 789); - let y = x + d; - assert_eq!(y.duration_since(x), d); - } -} diff --git a/src/time/mod.rs b/src/time/mod.rs deleted file mode 100644 index 7a2a821..0000000 --- a/src/time/mod.rs +++ /dev/null @@ -1,115 +0,0 @@ -//! Async time interfaces. - -pub(crate) mod utils; - -mod duration; -mod instant; -pub use duration::Duration; -pub use instant::Instant; - -use pin_project_lite::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use crate::{ - iter::AsyncIterator, - runtime::AsyncPollable, -}; - -pub use crate::sys::time::SystemTime; - -/// An async iterator representing notifications at fixed interval. -pub fn interval(duration: Duration) -> Interval { - Interval { duration } -} - -/// An async iterator representing notifications at fixed interval. -/// -/// See the [`interval`] function for more. -#[derive(Debug)] -pub struct Interval { - duration: Duration, -} -impl AsyncIterator for Interval { - type Item = Instant; - - async fn next(&mut self) -> Option { - Some(Timer::after(self.duration).wait().await) - } -} - -#[derive(Debug)] -pub struct Timer(Option); - -impl Timer { - pub fn never() -> Timer { - Timer(None) - } - pub fn at(deadline: Instant) -> Timer { - let pollable = crate::sys::time::subscribe_at(deadline.0); - Timer(Some(pollable)) - } - pub fn after(duration: Duration) -> Timer { - let pollable = crate::sys::time::subscribe_after(duration.0); - Timer(Some(pollable)) - } - pub fn set_after(&mut self, duration: Duration) { - *self = Self::after(duration); - } - pub fn wait(&self) -> Wait { - let wait_for = self.0.as_ref().map(AsyncPollable::wait_for); - Wait { wait_for } - } -} - -pin_project! { - /// Future created by [`Timer::wait`] - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct Wait { - #[pin] - wait_for: Option - } -} - -impl Future for Wait { - type Output = Instant; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.wait_for.as_pin_mut() { - None => Poll::Pending, - Some(f) => match f.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(()) => Poll::Ready(Instant::now()), - }, - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - async fn debug_duration(what: &str, f: impl Future) { - let start = Instant::now(); - let now = f.await; - let d = now.duration_since(start); - let d: std::time::Duration = d.into(); - println!("{what} awaited for {} s", d.as_secs_f32()); - } - - #[test] - fn timer_now() { - crate::runtime::block_on(debug_duration("timer_now", async { - Timer::at(Instant::now()).wait().await - })); - } - - #[test] - fn timer_after_100_milliseconds() { - crate::runtime::block_on(debug_duration("timer_after_100_milliseconds", async { - Timer::after(Duration::from_millis(100)).wait().await - })); - } -} diff --git a/src/time/utils.rs b/src/time/utils.rs deleted file mode 100644 index e6e3993..0000000 --- a/src/time/utils.rs +++ /dev/null @@ -1,5 +0,0 @@ -use std::io; - -pub(crate) fn timeout_err(msg: &'static str) -> io::Error { - io::Error::new(io::ErrorKind::TimedOut, msg) -} From 2c72f72e352141d57a9c3923c95d6cd5be37ebd6 Mon Sep 17 00:00:00 2001 From: Yosh Date: Wed, 8 Apr 2026 17:00:45 +0200 Subject: [PATCH 4/7] format --- src/http.rs | 10 +++++----- src/time.rs | 5 +---- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/http.rs b/src/http.rs index 7180da7..12309be 100644 --- a/src/http.rs +++ b/src/http.rs @@ -3,15 +3,15 @@ pub use http::status::StatusCode; pub use http::uri::{Authority, PathAndQuery, Uri}; -#[doc(inline)] -pub use body::{Body, util::BodyExt}; pub use crate::sys::http::client::Client; -pub use error::{Error, ErrorCode, Result}; pub use crate::sys::http::fields::{HeaderMap, HeaderName, HeaderValue}; pub use crate::sys::http::method::Method; +pub use crate::sys::http::scheme::{InvalidUri, Scheme}; +#[doc(inline)] +pub use body::{Body, util::BodyExt}; +pub use error::{Error, ErrorCode, Result}; pub use request::Request; pub use response::Response; -pub use crate::sys::http::scheme::{InvalidUri, Scheme}; pub mod body { //! HTTP body types. @@ -24,10 +24,10 @@ pub mod error { //! There are various concrete error types pub use crate::http::body::InvalidContentLength; + pub use crate::sys::http::{ErrorCode, HeaderError}; pub use anyhow::Context; pub use http::header::{InvalidHeaderName, InvalidHeaderValue}; pub use http::method::InvalidMethod; - pub use crate::sys::http::{ErrorCode, HeaderError}; pub type Error = anyhow::Error; /// The `http` result type. diff --git a/src/time.rs b/src/time.rs index 125194b..c9a9495 100644 --- a/src/time.rs +++ b/src/time.rs @@ -14,10 +14,7 @@ use std::ops::{Add, AddAssign, Sub, SubAssign}; use std::pin::Pin; use std::task::{Context, Poll}; -use crate::{ - iter::AsyncIterator, - runtime::AsyncPollable, -}; +use crate::{iter::AsyncIterator, runtime::AsyncPollable}; pub use crate::sys::time::SystemTime; From 0da08c43a58f084464430ac0348816f1462f2ff4 Mon Sep 17 00:00:00 2001 From: Yosh Date: Wed, 8 Apr 2026 17:39:24 +0200 Subject: [PATCH 5/7] undo migration changes --- src/future.rs | 16 ++++++++-------- src/sys/p2/http/request.rs | 2 ++ src/sys/p2/http/response.rs | 9 +++++++++ src/sys/p2/http/server.rs | 20 ++++++++++++++++++++ src/sys/p2/io.rs | 6 +++--- src/sys/p2/net/mod.rs | 4 +++- src/sys/p2/random.rs | 2 ++ src/sys/p2/runtime/mod.rs | 2 +- src/sys/p2/time.rs | 5 ++++- 9 files changed, 52 insertions(+), 14 deletions(-) diff --git a/src/future.rs b/src/future.rs index 02c3ce8..b4762e0 100644 --- a/src/future.rs +++ b/src/future.rs @@ -53,13 +53,13 @@ pin_project! { future: F, #[pin] deadline: D, - state: DelayState, + state: State, } } /// The internal state #[derive(Debug)] -enum DelayState { +enum State { Started, PollFuture, Completed, @@ -70,7 +70,7 @@ impl Delay { Self { future, deadline, - state: DelayState::Started, + state: State::Started, } } } @@ -82,16 +82,16 @@ impl Future for Delay { let mut this = self.project(); loop { match this.state { - DelayState::Started => { + State::Started => { ready!(this.deadline.as_mut().poll(cx)); - *this.state = DelayState::PollFuture; + *this.state = State::PollFuture; } - DelayState::PollFuture => { + State::PollFuture => { let value = ready!(this.future.as_mut().poll(cx)); - *this.state = DelayState::Completed; + *this.state = State::Completed; return Poll::Ready(value); } - DelayState::Completed => panic!("future polled after completing"), + State::Completed => panic!("future polled after completing"), } } } diff --git a/src/sys/p2/http/request.rs b/src/sys/p2/http/request.rs index efde40d..6a429ce 100644 --- a/src/sys/p2/http/request.rs +++ b/src/sys/p2/http/request.rs @@ -12,6 +12,8 @@ use wasip2::http::types::IncomingRequest; pub use http::request::{Builder, Request}; +// TODO: go back and add json stuff??? + pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingRequest, T), Error> { let wasi_req = OutgoingRequest::new(header_map_to_wasi(request.headers())?); diff --git a/src/sys/p2/http/response.rs b/src/sys/p2/http/response.rs index c25693a..db09136 100644 --- a/src/sys/p2/http/response.rs +++ b/src/sys/p2/http/response.rs @@ -21,6 +21,15 @@ pub(crate) fn try_from_incoming(incoming: IncomingResponse) -> Result` or + // `TryInto`. Since the `Builder::header` method is never + // used, we know `Builder::headers_mut` will never give the None case, nor + // will `Builder::body` give the error case. So, rather than treat those + // as control flow, we unwrap if this invariant is ever broken because + // that would only be possible due to some unrecoverable bug in wstd, + // rather than incorrect use or invalid input. let mut builder = Response::builder().status(status); *builder.headers_mut().expect("builder has not errored") = headers; Ok(builder diff --git a/src/sys/p2/http/server.rs b/src/sys/p2/http/server.rs index 36d9f94..b87cdc8 100644 --- a/src/sys/p2/http/server.rs +++ b/src/sys/p2/http/server.rs @@ -1,3 +1,23 @@ +//! HTTP servers +//! +//! The WASI HTTP server uses the [typed main] idiom, with a `main` function +//! that takes a [`Request`] and succeeds with a [`Response`], using the +//! [`http_server`] macro: +//! +//! ```no_run +//! use wstd::http::{Request, Response, Body, Error}; +//! #[wstd::http_server] +//! async fn main(_request: Request) -> Result, Error> { +//! Ok(Response::new("Hello!\n".into())) +//! } +//! ``` +//! +//! [typed main]: https://sunfishcode.github.io/typed-main-wasi-presentation/chapter_1.html +//! [`Request`]: crate::http::Request +//! [`Responder`]: crate::http::server::Responder +//! [`Response`]: crate::http::Response +//! [`http_server`]: crate::http_server + use crate::http::{Body, Error, Response, error::ErrorCode}; use super::fields::header_map_to_wasi; use http::header::CONTENT_LENGTH; diff --git a/src/sys/p2/io.rs b/src/sys/p2/io.rs index f5fe7af..5f95ca5 100644 --- a/src/sys/p2/io.rs +++ b/src/sys/p2/io.rs @@ -26,7 +26,7 @@ impl AsyncInputStream { stream, } } - pub(crate) fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> { + fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> { // Lazily initialize the AsyncPollable let subscription = self .subscription @@ -43,7 +43,7 @@ impl AsyncInputStream { } } /// Await for read readiness. - pub(crate) async fn ready(&self) { + async fn ready(&self) { poll_fn(|cx| self.poll_ready(cx)).await } /// Asynchronously read from the input stream. @@ -233,7 +233,7 @@ impl AsyncOutputStream { } } /// Await write readiness. - pub(crate) async fn ready(&self) { + async fn ready(&self) { // Lazily initialize the AsyncPollable let subscription = self .subscription diff --git a/src/sys/p2/net/mod.rs b/src/sys/p2/net/mod.rs index dfd36ad..1600edc 100644 --- a/src/sys/p2/net/mod.rs +++ b/src/sys/p2/net/mod.rs @@ -1,3 +1,5 @@ +//! Async network abstractions. + use std::io::{self, ErrorKind}; use wasip2::sockets::network::ErrorCode; @@ -7,7 +9,7 @@ mod tcp_stream; pub use tcp_listener::*; pub use tcp_stream::*; -pub(crate) fn to_io_err(err: ErrorCode) -> io::Error { +fn to_io_err(err: ErrorCode) -> io::Error { match err { ErrorCode::Unknown => ErrorKind::Other.into(), ErrorCode::AccessDenied => ErrorKind::PermissionDenied.into(), diff --git a/src/sys/p2/random.rs b/src/sys/p2/random.rs index c477026..8474b80 100644 --- a/src/sys/p2/random.rs +++ b/src/sys/p2/random.rs @@ -1,3 +1,5 @@ +//! Random number generation. + use wasip2::random; /// Fill the slice with cryptographically secure random bytes. diff --git a/src/sys/p2/runtime/mod.rs b/src/sys/p2/runtime/mod.rs index 4a1595a..713dc6f 100644 --- a/src/sys/p2/runtime/mod.rs +++ b/src/sys/p2/runtime/mod.rs @@ -14,7 +14,7 @@ use std::future::Future; use std::pin::pin; use std::task::{Context, Poll, Waker}; -/// Start the event loop. Blocks until the future completes. +/// Start the event loop. Blocks until the future pub fn block_on(fut: F) -> F::Output where F: Future, diff --git a/src/sys/p2/time.rs b/src/sys/p2/time.rs index a6ec660..238a218 100644 --- a/src/sys/p2/time.rs +++ b/src/sys/p2/time.rs @@ -17,8 +17,11 @@ pub fn now() -> MonotonicInstant { monotonic_clock::now() } -/// A measurement of the system clock. +/// A measurement of the system clock, useful for talking to external entities +/// like the file system or other processes. May be converted losslessly to a +/// more useful `std::time::SystemTime` to provide more methods. #[derive(Debug, Clone, Copy)] +#[allow(dead_code)] pub struct SystemTime(wall_clock::Datetime); impl SystemTime { From 1231184343a8bb6922279c56064f54f1d58efd6f Mon Sep 17 00:00:00 2001 From: Yosh Date: Wed, 8 Apr 2026 17:43:23 +0200 Subject: [PATCH 6/7] fix cfg --- src/sys/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 5d2f16a..aaa8e41 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -1,10 +1,9 @@ cfg_if::cfg_if! { - if #[cfg(any(feature = "wasip3", all(target_os = "wasi", target_env = "p3")))] { - mod p3; - use p3 as backend; - } else { + if #[cfg(all(target_os = "wasi", target_env = "p2"))] { mod p2; use p2 as backend; + } else { + compile_error!("unsupported target: wstd only compiles on `wasm32-wasip2`"); } } From d876500b24a27ffe6acd54fdcc77201687a37109 Mon Sep 17 00:00:00 2001 From: Yosh Date: Wed, 8 Apr 2026 17:43:35 +0200 Subject: [PATCH 7/7] fmt --- src/sys/p2/http/client.rs | 2 +- src/sys/p2/http/request.rs | 6 +++--- src/sys/p2/http/response.rs | 2 +- src/sys/p2/http/server.rs | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/sys/p2/http/client.rs b/src/sys/p2/http/client.rs index 6520a16..4957f72 100644 --- a/src/sys/p2/http/client.rs +++ b/src/sys/p2/http/client.rs @@ -1,6 +1,6 @@ -use crate::http::{Body, Error, Request, Response}; use super::request::try_into_outgoing; use super::response::try_from_incoming; +use crate::http::{Body, Error, Request, Response}; use crate::io::AsyncPollable; use crate::time::Duration; use wasip2::http::types::RequestOptions as WasiRequestOptions; diff --git a/src/sys/p2/http/request.rs b/src/sys/p2/http/request.rs index 6a429ce..a2aa9f8 100644 --- a/src/sys/p2/http/request.rs +++ b/src/sys/p2/http/request.rs @@ -1,11 +1,11 @@ +use super::fields::{header_map_from_wasi, header_map_to_wasi}; +use super::method::{from_wasi_method, to_wasi_method}; +use super::scheme::{from_wasi_scheme, to_wasi_scheme}; use crate::http::{ Authority, HeaderMap, PathAndQuery, Uri, body::{Body, BodyHint}, error::{Context, Error, ErrorCode}, }; -use super::fields::{header_map_from_wasi, header_map_to_wasi}; -use super::method::{from_wasi_method, to_wasi_method}; -use super::scheme::{from_wasi_scheme, to_wasi_scheme}; use wasip2::http::outgoing_handler::OutgoingRequest; use wasip2::http::types::IncomingRequest; diff --git a/src/sys/p2/http/response.rs b/src/sys/p2/http/response.rs index db09136..c3cae53 100644 --- a/src/sys/p2/http/response.rs +++ b/src/sys/p2/http/response.rs @@ -1,9 +1,9 @@ use http::StatusCode; use wasip2::http::types::IncomingResponse; +use super::fields::{HeaderMap, header_map_from_wasi}; use crate::http::body::{Body, BodyHint}; use crate::http::error::Error; -use super::fields::{HeaderMap, header_map_from_wasi}; pub use http::response::{Builder, Response}; diff --git a/src/sys/p2/http/server.rs b/src/sys/p2/http/server.rs index b87cdc8..2673200 100644 --- a/src/sys/p2/http/server.rs +++ b/src/sys/p2/http/server.rs @@ -18,8 +18,8 @@ //! [`Response`]: crate::http::Response //! [`http_server`]: crate::http_server -use crate::http::{Body, Error, Response, error::ErrorCode}; use super::fields::header_map_to_wasi; +use crate::http::{Body, Error, Response, error::ErrorCode}; use http::header::CONTENT_LENGTH; use wasip2::exports::http::incoming_handler::ResponseOutparam; use wasip2::http::types::OutgoingResponse;