From 7b7910055a9a765db1d2c903aecb91f51e22ab63 Mon Sep 17 00:00:00 2001 From: Brian Hardock Date: Tue, 31 Mar 2026 11:02:35 -0600 Subject: [PATCH 1/3] Adapter to impl futures::Stream Signed-off-by: Brian Hardock --- crates/guest-rust/Cargo.toml | 1 + .../src/rt/async_support/stream_support.rs | 103 ++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/crates/guest-rust/Cargo.toml b/crates/guest-rust/Cargo.toml index c38a70e63..d7b7f80d1 100644 --- a/crates/guest-rust/Cargo.toml +++ b/crates/guest-rust/Cargo.toml @@ -35,6 +35,7 @@ std = [] async = ["std", "wit-bindgen-rust-macro?/async"] bitflags = ["dep:bitflags"] async-spawn = ['async', 'dep:futures'] +futures-stream = ['async', 'dep:futures'] macro-string = ["wit-bindgen-rust-macro?/macro-string"] # Unstable feature to support being a libstd dependency diff --git a/crates/guest-rust/src/rt/async_support/stream_support.rs b/crates/guest-rust/src/rt/async_support/stream_support.rs index 6e4771827..37993afc1 100644 --- a/crates/guest-rust/src/rt/async_support/stream_support.rs +++ b/crates/guest-rust/src/rt/async_support/stream_support.rs @@ -3,6 +3,8 @@ use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation}; use crate::rt::async_support::{AbiBuffer, DROPPED, ReturnCode}; +#[cfg(feature = "futures-stream")] +use std::boxed::Box; use { crate::rt::Cleanup, std::{ @@ -742,3 +744,104 @@ where self.pin_project().cancel() } } + +/// A wrapper around [`RawStreamReader`] that implements [`futures::Stream`]. +/// +/// Obtain one via [`RawStreamReader::into_stream`] or +/// [`RawStreamReaderStream::new`]. +#[cfg(feature = "futures-stream")] +pub struct RawStreamReaderStream { + state: StreamAdapterState, +} + +// SAFETY: No field is structurally pinned. The inner `Pin>` +// is itself `Unpin`, and `RawStreamReader` is only stored when idle. +#[cfg(feature = "futures-stream")] +impl Unpin for RawStreamReaderStream {} + +/// Convenience alias for the common vtable-based case. +#[cfg(feature = "futures-stream")] +pub type StreamReaderStream = RawStreamReaderStream<&'static StreamVtable>; + +#[cfg(feature = "futures-stream")] +type ReadNextFut = + Pin, Option<::Payload>)>>>; + +#[cfg(feature = "futures-stream")] +enum StreamAdapterState { + /// The reader is idle and ready for the next read. + Idle(RawStreamReader), + /// A read is in progress. + Reading(ReadNextFut), + /// The stream has been exhausted. + Complete, +} + +#[cfg(feature = "futures-stream")] +impl RawStreamReaderStream { + /// Create a new [`futures::Stream`] wrapper from a [`RawStreamReader`]. + pub fn new(reader: RawStreamReader) -> Self { + Self { + state: StreamAdapterState::Idle(reader), + } + } + + /// Recover the underlying [`RawStreamReader`], if no read is in flight. + /// + /// Returns `None` when a read is currently in progress or the stream has + /// already finished. + pub fn into_inner(self) -> Option> { + match self.state { + StreamAdapterState::Idle(reader) => Some(reader), + _ => None, + } + } +} + +#[cfg(feature = "futures-stream")] +impl futures::stream::Stream for RawStreamReaderStream { + type Item = O::Payload; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // All variants of `StreamAdapterState` are `Unpin`, so `Pin<&mut Self>` + // can be freely projected. + loop { + match std::mem::replace(&mut self.state, StreamAdapterState::Complete) { + StreamAdapterState::Idle(mut reader) => { + let fut: ReadNextFut = Box::pin(async move { + let item = reader.next().await; + (reader, item) + }); + self.state = StreamAdapterState::Reading(fut); + // Loop to immediately poll the new future. + } + StreamAdapterState::Reading(mut fut) => match fut.as_mut().poll(cx) { + Poll::Pending => { + self.state = StreamAdapterState::Reading(fut); + return Poll::Pending; + } + Poll::Ready((reader, Some(item))) => { + self.state = StreamAdapterState::Idle(reader); + return Poll::Ready(Some(item)); + } + Poll::Ready((_reader, None)) => { + self.state = StreamAdapterState::Complete; + return Poll::Ready(None); + } + }, + StreamAdapterState::Complete => { + self.state = StreamAdapterState::Complete; + return Poll::Ready(None); + } + } + } + } +} + +#[cfg(feature = "futures-stream")] +impl RawStreamReader { + /// Convert this reader into a [`futures::Stream`]. + pub fn into_stream(self) -> RawStreamReaderStream { + RawStreamReaderStream::new(self) + } +} From b49cc7b1af1e64e86f4c82727d010ef3e771e9b6 Mon Sep 17 00:00:00 2001 From: Brian Hardock Date: Tue, 31 Mar 2026 14:43:12 -0600 Subject: [PATCH 2/3] Move to futures_stream mod Signed-off-by: Brian Hardock --- crates/guest-rust/src/rt/async_support.rs | 4 + .../src/rt/async_support/futures_stream.rs | 100 +++++++++++++++++ .../src/rt/async_support/stream_support.rs | 103 ------------------ 3 files changed, 104 insertions(+), 103 deletions(-) create mode 100644 crates/guest-rust/src/rt/async_support/futures_stream.rs diff --git a/crates/guest-rust/src/rt/async_support.rs b/crates/guest-rust/src/rt/async_support.rs index 080919a8c..3d5a3f68a 100644 --- a/crates/guest-rust/src/rt/async_support.rs +++ b/crates/guest-rust/src/rt/async_support.rs @@ -61,6 +61,8 @@ mod abi_buffer; mod cabi; mod error_context; mod future_support; +#[cfg(feature = "futures-stream")] +mod futures_stream; #[cfg(feature = "inter-task-wakeup")] mod inter_task_wakeup; mod stream_support; @@ -79,6 +81,8 @@ use self::waitable_set::WaitableSet; pub use abi_buffer::*; pub use error_context::*; pub use future_support::*; +#[cfg(feature = "futures-stream")] +pub use futures_stream::*; pub use stream_support::*; #[doc(hidden)] pub use subtask::Subtask; diff --git a/crates/guest-rust/src/rt/async_support/futures_stream.rs b/crates/guest-rust/src/rt/async_support/futures_stream.rs new file mode 100644 index 000000000..893df8294 --- /dev/null +++ b/crates/guest-rust/src/rt/async_support/futures_stream.rs @@ -0,0 +1,100 @@ +use super::stream_support::{RawStreamReader, StreamOps, StreamVtable}; +use std::boxed::Box; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// A wrapper around [`RawStreamReader`] that implements [`futures::Stream`]. +/// +/// Obtain one via [`RawStreamReader::into_stream`] or +/// [`RawStreamReaderStream::new`]. +pub struct RawStreamReaderStream { + state: StreamAdapterState, +} + +// SAFETY: No field is structurally pinned. The inner `Pin>` +// is itself `Unpin`, and `RawStreamReader` is only stored when idle. +impl Unpin for RawStreamReaderStream {} + +/// Convenience alias for the common vtable-based case. +pub type StreamReaderStream = RawStreamReaderStream<&'static StreamVtable>; + +type ReadNextFut = + Pin, Option<::Payload>)>>>; + +enum StreamAdapterState { + /// The reader is idle and ready for the next read. + Idle(RawStreamReader), + /// A read is in progress. + Reading(ReadNextFut), + /// The stream has been exhausted. + Complete, +} + +impl RawStreamReaderStream { + /// Create a new [`futures::Stream`] wrapper from a [`RawStreamReader`]. + pub fn new(reader: RawStreamReader) -> Self { + Self { + state: StreamAdapterState::Idle(reader), + } + } + + /// Recover the underlying [`RawStreamReader`], if no read is in flight. + /// + /// Returns `None` when a read is currently in progress or the stream has + /// already finished. + pub fn into_inner(self) -> Option> { + match self.state { + StreamAdapterState::Idle(reader) => Some(reader), + _ => None, + } + } +} + +impl futures::stream::Stream for RawStreamReaderStream { + type Item = O::Payload; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // All variants of `StreamAdapterState` are `Unpin`, so `Pin<&mut Self>` + // can be freely projected. + loop { + match std::mem::replace(&mut self.state, StreamAdapterState::Complete) { + StreamAdapterState::Idle(mut reader) => { + let fut: ReadNextFut = Box::pin(async move { + let item = reader.next().await; + (reader, item) + }); + self.state = StreamAdapterState::Reading(fut); + // Loop to immediately poll the new future. + } + StreamAdapterState::Reading(mut fut) => match fut.as_mut().poll(cx) { + Poll::Pending => { + self.state = StreamAdapterState::Reading(fut); + return Poll::Pending; + } + Poll::Ready((reader, Some(item))) => { + self.state = StreamAdapterState::Idle(reader); + return Poll::Ready(Some(item)); + } + Poll::Ready((_reader, None)) => { + self.state = StreamAdapterState::Complete; + return Poll::Ready(None); + } + }, + StreamAdapterState::Complete => { + self.state = StreamAdapterState::Complete; + return Poll::Ready(None); + } + } + } + } +} + +impl RawStreamReader { + /// Convert this reader into a [`futures::Stream`]. + pub fn into_stream(self) -> RawStreamReaderStream { + RawStreamReaderStream::new(self) + } +} diff --git a/crates/guest-rust/src/rt/async_support/stream_support.rs b/crates/guest-rust/src/rt/async_support/stream_support.rs index 37993afc1..6e4771827 100644 --- a/crates/guest-rust/src/rt/async_support/stream_support.rs +++ b/crates/guest-rust/src/rt/async_support/stream_support.rs @@ -3,8 +3,6 @@ use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation}; use crate::rt::async_support::{AbiBuffer, DROPPED, ReturnCode}; -#[cfg(feature = "futures-stream")] -use std::boxed::Box; use { crate::rt::Cleanup, std::{ @@ -744,104 +742,3 @@ where self.pin_project().cancel() } } - -/// A wrapper around [`RawStreamReader`] that implements [`futures::Stream`]. -/// -/// Obtain one via [`RawStreamReader::into_stream`] or -/// [`RawStreamReaderStream::new`]. -#[cfg(feature = "futures-stream")] -pub struct RawStreamReaderStream { - state: StreamAdapterState, -} - -// SAFETY: No field is structurally pinned. The inner `Pin>` -// is itself `Unpin`, and `RawStreamReader` is only stored when idle. -#[cfg(feature = "futures-stream")] -impl Unpin for RawStreamReaderStream {} - -/// Convenience alias for the common vtable-based case. -#[cfg(feature = "futures-stream")] -pub type StreamReaderStream = RawStreamReaderStream<&'static StreamVtable>; - -#[cfg(feature = "futures-stream")] -type ReadNextFut = - Pin, Option<::Payload>)>>>; - -#[cfg(feature = "futures-stream")] -enum StreamAdapterState { - /// The reader is idle and ready for the next read. - Idle(RawStreamReader), - /// A read is in progress. - Reading(ReadNextFut), - /// The stream has been exhausted. - Complete, -} - -#[cfg(feature = "futures-stream")] -impl RawStreamReaderStream { - /// Create a new [`futures::Stream`] wrapper from a [`RawStreamReader`]. - pub fn new(reader: RawStreamReader) -> Self { - Self { - state: StreamAdapterState::Idle(reader), - } - } - - /// Recover the underlying [`RawStreamReader`], if no read is in flight. - /// - /// Returns `None` when a read is currently in progress or the stream has - /// already finished. - pub fn into_inner(self) -> Option> { - match self.state { - StreamAdapterState::Idle(reader) => Some(reader), - _ => None, - } - } -} - -#[cfg(feature = "futures-stream")] -impl futures::stream::Stream for RawStreamReaderStream { - type Item = O::Payload; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // All variants of `StreamAdapterState` are `Unpin`, so `Pin<&mut Self>` - // can be freely projected. - loop { - match std::mem::replace(&mut self.state, StreamAdapterState::Complete) { - StreamAdapterState::Idle(mut reader) => { - let fut: ReadNextFut = Box::pin(async move { - let item = reader.next().await; - (reader, item) - }); - self.state = StreamAdapterState::Reading(fut); - // Loop to immediately poll the new future. - } - StreamAdapterState::Reading(mut fut) => match fut.as_mut().poll(cx) { - Poll::Pending => { - self.state = StreamAdapterState::Reading(fut); - return Poll::Pending; - } - Poll::Ready((reader, Some(item))) => { - self.state = StreamAdapterState::Idle(reader); - return Poll::Ready(Some(item)); - } - Poll::Ready((_reader, None)) => { - self.state = StreamAdapterState::Complete; - return Poll::Ready(None); - } - }, - StreamAdapterState::Complete => { - self.state = StreamAdapterState::Complete; - return Poll::Ready(None); - } - } - } - } -} - -#[cfg(feature = "futures-stream")] -impl RawStreamReader { - /// Convert this reader into a [`futures::Stream`]. - pub fn into_stream(self) -> RawStreamReaderStream { - RawStreamReaderStream::new(self) - } -} From d7c55b8d844284d34bbfbadced5fbaf1c822aaa8 Mon Sep 17 00:00:00 2001 From: Brian Hardock Date: Tue, 31 Mar 2026 15:01:04 -0600 Subject: [PATCH 3/3] Add tests and build check Signed-off-by: Brian Hardock --- .github/workflows/main.yml | 1 + crates/test/src/rust.rs | 2 +- .../async/stream-to-futures-stream/runner.rs | 24 +++++++++++++++ .../async/stream-to-futures-stream/test.rs | 29 +++++++++++++++++++ .../async/stream-to-futures-stream/test.wit | 15 ++++++++++ 5 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 tests/runtime-async/async/stream-to-futures-stream/runner.rs create mode 100644 tests/runtime-async/async/stream-to-futures-stream/test.rs create mode 100644 tests/runtime-async/async/stream-to-futures-stream/test.wit diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 71828f1e3..ab31392a1 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -226,6 +226,7 @@ jobs: - run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features async-spawn - run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features async,macros - run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features inter-task-wakeup + - run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features futures-stream # Verity that documentation can be generated for the rust bindings crate. - run: rustup update nightly --no-self-update diff --git a/crates/test/src/rust.rs b/crates/test/src/rust.rs index a89447e4d..5b56e34f9 100644 --- a/crates/test/src/rust.rs +++ b/crates/test/src/rust.rs @@ -132,7 +132,7 @@ name = "tmp" [workspace] [dependencies] -wit-bindgen = {{ {wit_bindgen_dep}, features = ['async-spawn', 'inter-task-wakeup'] }} +wit-bindgen = {{ {wit_bindgen_dep}, features = ['async-spawn', 'inter-task-wakeup', 'futures-stream'] }} futures = "0.3.31" [lib] diff --git a/tests/runtime-async/async/stream-to-futures-stream/runner.rs b/tests/runtime-async/async/stream-to-futures-stream/runner.rs new file mode 100644 index 000000000..1ff63ff2a --- /dev/null +++ b/tests/runtime-async/async/stream-to-futures-stream/runner.rs @@ -0,0 +1,24 @@ +//@ wasmtime-flags = '-Wcomponent-model-async' + +include!(env!("BINDINGS")); + +use crate::my::test::i::*; +use wit_bindgen::StreamResult; + +struct Component; + +export!(Component); + +impl Guest for Component { + async fn run() { + let (mut tx, rx) = wit_stream::new(); + let test = async { + let (result, _ret) = tx.write(vec![10, 20, 30]).await; + assert_eq!(result, StreamResult::Complete(3)); + + // Drop the writer so the reader sees the end of the stream. + drop(tx); + }; + let ((), ()) = futures::join!(test, read_stream(rx)); + } +} diff --git a/tests/runtime-async/async/stream-to-futures-stream/test.rs b/tests/runtime-async/async/stream-to-futures-stream/test.rs new file mode 100644 index 000000000..9791ccfcf --- /dev/null +++ b/tests/runtime-async/async/stream-to-futures-stream/test.rs @@ -0,0 +1,29 @@ +use futures::stream::StreamExt; +use wit_bindgen::StreamReader; + +include!(env!("BINDINGS")); + +struct Component; + +export!(Component); + +impl crate::exports::my::test::i::Guest for Component { + async fn read_stream(x: StreamReader) { + // Convert the low-level StreamReader into a futures::Stream + let mut stream = x.into_stream(); + + // Read all items via StreamExt::next() + let first = stream.next().await; + assert_eq!(first, Some(10)); + + let second = stream.next().await; + assert_eq!(second, Some(20)); + + let third = stream.next().await; + assert_eq!(third, Some(30)); + + // Stream should be exhausted after the writer is dropped + let end = stream.next().await; + assert_eq!(end, None); + } +} diff --git a/tests/runtime-async/async/stream-to-futures-stream/test.wit b/tests/runtime-async/async/stream-to-futures-stream/test.wit new file mode 100644 index 000000000..b776823fd --- /dev/null +++ b/tests/runtime-async/async/stream-to-futures-stream/test.wit @@ -0,0 +1,15 @@ +package my:test; + +interface i { + read-stream: async func(x: stream); +} + +world test { + export i; +} + +world runner { + import i; + + export run: async func(); +}