diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 71828f1e3..ab31392a1 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -226,6 +226,7 @@ jobs: - run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features async-spawn - run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features async,macros - run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features inter-task-wakeup + - run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features futures-stream # Verity that documentation can be generated for the rust bindings crate. - run: rustup update nightly --no-self-update diff --git a/crates/guest-rust/Cargo.toml b/crates/guest-rust/Cargo.toml index c38a70e63..d7b7f80d1 100644 --- a/crates/guest-rust/Cargo.toml +++ b/crates/guest-rust/Cargo.toml @@ -35,6 +35,7 @@ std = [] async = ["std", "wit-bindgen-rust-macro?/async"] bitflags = ["dep:bitflags"] async-spawn = ['async', 'dep:futures'] +futures-stream = ['async', 'dep:futures'] macro-string = ["wit-bindgen-rust-macro?/macro-string"] # Unstable feature to support being a libstd dependency diff --git a/crates/guest-rust/src/rt/async_support.rs b/crates/guest-rust/src/rt/async_support.rs index 080919a8c..3d5a3f68a 100644 --- a/crates/guest-rust/src/rt/async_support.rs +++ b/crates/guest-rust/src/rt/async_support.rs @@ -61,6 +61,8 @@ mod abi_buffer; mod cabi; mod error_context; mod future_support; +#[cfg(feature = "futures-stream")] +mod futures_stream; #[cfg(feature = "inter-task-wakeup")] mod inter_task_wakeup; mod stream_support; @@ -79,6 +81,8 @@ use self::waitable_set::WaitableSet; pub use abi_buffer::*; pub use error_context::*; pub use future_support::*; +#[cfg(feature = "futures-stream")] +pub use futures_stream::*; pub use stream_support::*; #[doc(hidden)] pub use subtask::Subtask; diff --git a/crates/guest-rust/src/rt/async_support/futures_stream.rs b/crates/guest-rust/src/rt/async_support/futures_stream.rs new file mode 100644 index 000000000..893df8294 --- /dev/null +++ b/crates/guest-rust/src/rt/async_support/futures_stream.rs @@ -0,0 +1,100 @@ +use super::stream_support::{RawStreamReader, StreamOps, StreamVtable}; +use std::boxed::Box; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// A wrapper around [`RawStreamReader`] that implements [`futures::Stream`]. +/// +/// Obtain one via [`RawStreamReader::into_stream`] or +/// [`RawStreamReaderStream::new`]. +pub struct RawStreamReaderStream { + state: StreamAdapterState, +} + +// SAFETY: No field is structurally pinned. The inner `Pin>` +// is itself `Unpin`, and `RawStreamReader` is only stored when idle. +impl Unpin for RawStreamReaderStream {} + +/// Convenience alias for the common vtable-based case. +pub type StreamReaderStream = RawStreamReaderStream<&'static StreamVtable>; + +type ReadNextFut = + Pin, Option<::Payload>)>>>; + +enum StreamAdapterState { + /// The reader is idle and ready for the next read. + Idle(RawStreamReader), + /// A read is in progress. + Reading(ReadNextFut), + /// The stream has been exhausted. + Complete, +} + +impl RawStreamReaderStream { + /// Create a new [`futures::Stream`] wrapper from a [`RawStreamReader`]. + pub fn new(reader: RawStreamReader) -> Self { + Self { + state: StreamAdapterState::Idle(reader), + } + } + + /// Recover the underlying [`RawStreamReader`], if no read is in flight. + /// + /// Returns `None` when a read is currently in progress or the stream has + /// already finished. + pub fn into_inner(self) -> Option> { + match self.state { + StreamAdapterState::Idle(reader) => Some(reader), + _ => None, + } + } +} + +impl futures::stream::Stream for RawStreamReaderStream { + type Item = O::Payload; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // All variants of `StreamAdapterState` are `Unpin`, so `Pin<&mut Self>` + // can be freely projected. + loop { + match std::mem::replace(&mut self.state, StreamAdapterState::Complete) { + StreamAdapterState::Idle(mut reader) => { + let fut: ReadNextFut = Box::pin(async move { + let item = reader.next().await; + (reader, item) + }); + self.state = StreamAdapterState::Reading(fut); + // Loop to immediately poll the new future. + } + StreamAdapterState::Reading(mut fut) => match fut.as_mut().poll(cx) { + Poll::Pending => { + self.state = StreamAdapterState::Reading(fut); + return Poll::Pending; + } + Poll::Ready((reader, Some(item))) => { + self.state = StreamAdapterState::Idle(reader); + return Poll::Ready(Some(item)); + } + Poll::Ready((_reader, None)) => { + self.state = StreamAdapterState::Complete; + return Poll::Ready(None); + } + }, + StreamAdapterState::Complete => { + self.state = StreamAdapterState::Complete; + return Poll::Ready(None); + } + } + } + } +} + +impl RawStreamReader { + /// Convert this reader into a [`futures::Stream`]. + pub fn into_stream(self) -> RawStreamReaderStream { + RawStreamReaderStream::new(self) + } +} diff --git a/crates/test/src/rust.rs b/crates/test/src/rust.rs index a89447e4d..5b56e34f9 100644 --- a/crates/test/src/rust.rs +++ b/crates/test/src/rust.rs @@ -132,7 +132,7 @@ name = "tmp" [workspace] [dependencies] -wit-bindgen = {{ {wit_bindgen_dep}, features = ['async-spawn', 'inter-task-wakeup'] }} +wit-bindgen = {{ {wit_bindgen_dep}, features = ['async-spawn', 'inter-task-wakeup', 'futures-stream'] }} futures = "0.3.31" [lib] diff --git a/tests/runtime-async/async/stream-to-futures-stream/runner.rs b/tests/runtime-async/async/stream-to-futures-stream/runner.rs new file mode 100644 index 000000000..1ff63ff2a --- /dev/null +++ b/tests/runtime-async/async/stream-to-futures-stream/runner.rs @@ -0,0 +1,24 @@ +//@ wasmtime-flags = '-Wcomponent-model-async' + +include!(env!("BINDINGS")); + +use crate::my::test::i::*; +use wit_bindgen::StreamResult; + +struct Component; + +export!(Component); + +impl Guest for Component { + async fn run() { + let (mut tx, rx) = wit_stream::new(); + let test = async { + let (result, _ret) = tx.write(vec![10, 20, 30]).await; + assert_eq!(result, StreamResult::Complete(3)); + + // Drop the writer so the reader sees the end of the stream. + drop(tx); + }; + let ((), ()) = futures::join!(test, read_stream(rx)); + } +} diff --git a/tests/runtime-async/async/stream-to-futures-stream/test.rs b/tests/runtime-async/async/stream-to-futures-stream/test.rs new file mode 100644 index 000000000..9791ccfcf --- /dev/null +++ b/tests/runtime-async/async/stream-to-futures-stream/test.rs @@ -0,0 +1,29 @@ +use futures::stream::StreamExt; +use wit_bindgen::StreamReader; + +include!(env!("BINDINGS")); + +struct Component; + +export!(Component); + +impl crate::exports::my::test::i::Guest for Component { + async fn read_stream(x: StreamReader) { + // Convert the low-level StreamReader into a futures::Stream + let mut stream = x.into_stream(); + + // Read all items via StreamExt::next() + let first = stream.next().await; + assert_eq!(first, Some(10)); + + let second = stream.next().await; + assert_eq!(second, Some(20)); + + let third = stream.next().await; + assert_eq!(third, Some(30)); + + // Stream should be exhausted after the writer is dropped + let end = stream.next().await; + assert_eq!(end, None); + } +} diff --git a/tests/runtime-async/async/stream-to-futures-stream/test.wit b/tests/runtime-async/async/stream-to-futures-stream/test.wit new file mode 100644 index 000000000..b776823fd --- /dev/null +++ b/tests/runtime-async/async/stream-to-futures-stream/test.wit @@ -0,0 +1,15 @@ +package my:test; + +interface i { + read-stream: async func(x: stream); +} + +world test { + export i; +} + +world runner { + import i; + + export run: async func(); +}