Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/guest-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ std = []
async = ["std", "wit-bindgen-rust-macro?/async"]
bitflags = ["dep:bitflags"]
async-spawn = ['async', 'dep:futures']
futures-stream = ['async', 'dep:futures']
macro-string = ["wit-bindgen-rust-macro?/macro-string"]

# Unstable feature to support being a libstd dependency
Expand Down
103 changes: 103 additions & 0 deletions crates/guest-rust/src/rt/async_support/stream_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
use crate::rt::async_support::{AbiBuffer, DROPPED, ReturnCode};
#[cfg(feature = "futures-stream")]
use std::boxed::Box;
use {
crate::rt::Cleanup,
std::{
Expand Down Expand Up @@ -742,3 +744,104 @@ where
self.pin_project().cancel()
}
}

/// A wrapper around [`RawStreamReader`] that implements [`futures::Stream`].
///
/// Obtain one via [`RawStreamReader::into_stream`] or
/// [`RawStreamReaderStream::new`].
#[cfg(feature = "futures-stream")]
pub struct RawStreamReaderStream<O: StreamOps + 'static> {
state: StreamAdapterState<O>,
}

// SAFETY: No field is structurally pinned. The inner `Pin<Box<dyn Future>>`
// is itself `Unpin`, and `RawStreamReader` is only stored when idle.
#[cfg(feature = "futures-stream")]
impl<O: StreamOps + 'static> Unpin for RawStreamReaderStream<O> {}

/// Convenience alias for the common vtable-based case.
#[cfg(feature = "futures-stream")]
pub type StreamReaderStream<T> = RawStreamReaderStream<&'static StreamVtable<T>>;

#[cfg(feature = "futures-stream")]
type ReadNextFut<O> =
Pin<Box<dyn Future<Output = (RawStreamReader<O>, Option<<O as StreamOps>::Payload>)>>>;

#[cfg(feature = "futures-stream")]
enum StreamAdapterState<O: StreamOps + 'static> {
/// The reader is idle and ready for the next read.
Idle(RawStreamReader<O>),
/// A read is in progress.
Reading(ReadNextFut<O>),
/// The stream has been exhausted.
Complete,
}

#[cfg(feature = "futures-stream")]
impl<O: StreamOps + 'static> RawStreamReaderStream<O> {
/// Create a new [`futures::Stream`] wrapper from a [`RawStreamReader`].
pub fn new(reader: RawStreamReader<O>) -> Self {
Self {
state: StreamAdapterState::Idle(reader),
}
}

/// Recover the underlying [`RawStreamReader`], if no read is in flight.
///
/// Returns `None` when a read is currently in progress or the stream has
/// already finished.
pub fn into_inner(self) -> Option<RawStreamReader<O>> {
match self.state {
StreamAdapterState::Idle(reader) => Some(reader),
_ => None,
}
}
}

#[cfg(feature = "futures-stream")]
impl<O: StreamOps + 'static> futures::stream::Stream for RawStreamReaderStream<O> {
type Item = O::Payload;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// All variants of `StreamAdapterState` are `Unpin`, so `Pin<&mut Self>`
// can be freely projected.
loop {
match std::mem::replace(&mut self.state, StreamAdapterState::Complete) {
StreamAdapterState::Idle(mut reader) => {
let fut: ReadNextFut<O> = Box::pin(async move {
let item = reader.next().await;
(reader, item)
});
self.state = StreamAdapterState::Reading(fut);
// Loop to immediately poll the new future.
}
StreamAdapterState::Reading(mut fut) => match fut.as_mut().poll(cx) {
Poll::Pending => {
self.state = StreamAdapterState::Reading(fut);
return Poll::Pending;
}
Poll::Ready((reader, Some(item))) => {
self.state = StreamAdapterState::Idle(reader);
return Poll::Ready(Some(item));
}
Poll::Ready((_reader, None)) => {
self.state = StreamAdapterState::Complete;
return Poll::Ready(None);
}
},
StreamAdapterState::Complete => {
self.state = StreamAdapterState::Complete;
return Poll::Ready(None);
}
}
}
}
}

#[cfg(feature = "futures-stream")]
impl<O: StreamOps + 'static> RawStreamReader<O> {
/// Convert this reader into a [`futures::Stream`].
pub fn into_stream(self) -> RawStreamReaderStream<O> {
RawStreamReaderStream::new(self)
}
}
Loading