Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ jobs:
- run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features async-spawn
- run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features async,macros
- run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features inter-task-wakeup
- run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features futures-stream

# Verity that documentation can be generated for the rust bindings crate.
- run: rustup update nightly --no-self-update
Expand Down
1 change: 1 addition & 0 deletions crates/guest-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ std = []
async = ["std", "wit-bindgen-rust-macro?/async"]
bitflags = ["dep:bitflags"]
async-spawn = ['async', 'dep:futures']
futures-stream = ['async', 'dep:futures']
macro-string = ["wit-bindgen-rust-macro?/macro-string"]

# Unstable feature to support being a libstd dependency
Expand Down
4 changes: 4 additions & 0 deletions crates/guest-rust/src/rt/async_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ mod abi_buffer;
mod cabi;
mod error_context;
mod future_support;
#[cfg(feature = "futures-stream")]
mod futures_stream;
#[cfg(feature = "inter-task-wakeup")]
mod inter_task_wakeup;
mod stream_support;
Expand All @@ -79,6 +81,8 @@ use self::waitable_set::WaitableSet;
pub use abi_buffer::*;
pub use error_context::*;
pub use future_support::*;
#[cfg(feature = "futures-stream")]
pub use futures_stream::*;
pub use stream_support::*;
#[doc(hidden)]
pub use subtask::Subtask;
Expand Down
100 changes: 100 additions & 0 deletions crates/guest-rust/src/rt/async_support/futures_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use super::stream_support::{RawStreamReader, StreamOps, StreamVtable};
use std::boxed::Box;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

/// A wrapper around [`RawStreamReader`] that implements [`futures::Stream`].
///
/// Obtain one via [`RawStreamReader::into_stream`] or
/// [`RawStreamReaderStream::new`].
pub struct RawStreamReaderStream<O: StreamOps + 'static> {
state: StreamAdapterState<O>,
}

// SAFETY: No field is structurally pinned. The inner `Pin<Box<dyn Future>>`
// is itself `Unpin`, and `RawStreamReader` is only stored when idle.
impl<O: StreamOps + 'static> Unpin for RawStreamReaderStream<O> {}

/// Convenience alias for the common vtable-based case.
pub type StreamReaderStream<T> = RawStreamReaderStream<&'static StreamVtable<T>>;

type ReadNextFut<O> =
Pin<Box<dyn Future<Output = (RawStreamReader<O>, Option<<O as StreamOps>::Payload>)>>>;

enum StreamAdapterState<O: StreamOps + 'static> {
/// The reader is idle and ready for the next read.
Idle(RawStreamReader<O>),
/// A read is in progress.
Reading(ReadNextFut<O>),
/// The stream has been exhausted.
Complete,
}

impl<O: StreamOps + 'static> RawStreamReaderStream<O> {
/// Create a new [`futures::Stream`] wrapper from a [`RawStreamReader`].
pub fn new(reader: RawStreamReader<O>) -> Self {
Self {
state: StreamAdapterState::Idle(reader),
}
}

/// Recover the underlying [`RawStreamReader`], if no read is in flight.
///
/// Returns `None` when a read is currently in progress or the stream has
/// already finished.
pub fn into_inner(self) -> Option<RawStreamReader<O>> {
match self.state {
StreamAdapterState::Idle(reader) => Some(reader),
_ => None,
}
}
}

impl<O: StreamOps + 'static> futures::stream::Stream for RawStreamReaderStream<O> {
type Item = O::Payload;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// All variants of `StreamAdapterState` are `Unpin`, so `Pin<&mut Self>`
// can be freely projected.
loop {
match std::mem::replace(&mut self.state, StreamAdapterState::Complete) {
StreamAdapterState::Idle(mut reader) => {
let fut: ReadNextFut<O> = Box::pin(async move {
let item = reader.next().await;
(reader, item)
});
self.state = StreamAdapterState::Reading(fut);
// Loop to immediately poll the new future.
}
StreamAdapterState::Reading(mut fut) => match fut.as_mut().poll(cx) {
Poll::Pending => {
self.state = StreamAdapterState::Reading(fut);
return Poll::Pending;
}
Poll::Ready((reader, Some(item))) => {
self.state = StreamAdapterState::Idle(reader);
return Poll::Ready(Some(item));
}
Poll::Ready((_reader, None)) => {
self.state = StreamAdapterState::Complete;
return Poll::Ready(None);
}
},
StreamAdapterState::Complete => {
self.state = StreamAdapterState::Complete;
return Poll::Ready(None);
}
}
}
}
}

impl<O: StreamOps + 'static> RawStreamReader<O> {
/// Convert this reader into a [`futures::Stream`].
pub fn into_stream(self) -> RawStreamReaderStream<O> {
RawStreamReaderStream::new(self)
}
}
2 changes: 1 addition & 1 deletion crates/test/src/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ name = "tmp"
[workspace]

[dependencies]
wit-bindgen = {{ {wit_bindgen_dep}, features = ['async-spawn', 'inter-task-wakeup'] }}
wit-bindgen = {{ {wit_bindgen_dep}, features = ['async-spawn', 'inter-task-wakeup', 'futures-stream'] }}
futures = "0.3.31"

[lib]
Expand Down
24 changes: 24 additions & 0 deletions tests/runtime-async/async/stream-to-futures-stream/runner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//@ wasmtime-flags = '-Wcomponent-model-async'

include!(env!("BINDINGS"));

use crate::my::test::i::*;
use wit_bindgen::StreamResult;

struct Component;

export!(Component);

impl Guest for Component {
async fn run() {
let (mut tx, rx) = wit_stream::new();
let test = async {
let (result, _ret) = tx.write(vec![10, 20, 30]).await;
assert_eq!(result, StreamResult::Complete(3));

// Drop the writer so the reader sees the end of the stream.
drop(tx);
};
let ((), ()) = futures::join!(test, read_stream(rx));
}
}
29 changes: 29 additions & 0 deletions tests/runtime-async/async/stream-to-futures-stream/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use futures::stream::StreamExt;
use wit_bindgen::StreamReader;

include!(env!("BINDINGS"));

struct Component;

export!(Component);

impl crate::exports::my::test::i::Guest for Component {
async fn read_stream(x: StreamReader<u8>) {
// Convert the low-level StreamReader into a futures::Stream
let mut stream = x.into_stream();

// Read all items via StreamExt::next()
let first = stream.next().await;
assert_eq!(first, Some(10));

let second = stream.next().await;
assert_eq!(second, Some(20));

let third = stream.next().await;
assert_eq!(third, Some(30));

// Stream should be exhausted after the writer is dropped
let end = stream.next().await;
assert_eq!(end, None);
}
}
15 changes: 15 additions & 0 deletions tests/runtime-async/async/stream-to-futures-stream/test.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package my:test;

interface i {
read-stream: async func(x: stream<u8>);
}

world test {
export i;
}

world runner {
import i;

export run: async func();
}
Loading