The combinators that poll a stream for exhaustion in a loop have a problem that's already been raised in #869: if the upstream consecutively returns Ready for a long time, the loop never breaks and the combinator's poll never returns for that long, starving other pending operations in the task from being polled.
To illustrate how this can be a problem for other code, consider this simple adapter for making futures cancellable:
use futures::channel::oneshot::{self, Canceled};
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::ready;
use pin_utils::unsafe_pinned;
struct CancelHandle(oneshot::Sender<()>);
#[derive(Debug)]
struct AlreadyDropped;
impl CancelHandle {
fn cancel(self) -> Result<(), AlreadyDropped> {
self.0.send(()).map_err(|()| AlreadyDropped)
}
}
struct Cancelable<F> {
op: F,
stop_rx: oneshot::Receiver<()>,
}
impl<F> Cancelable<F> {
unsafe_pinned!(op: F);
unsafe_pinned!(stop_rx: oneshot::Receiver<()>);
}
impl<F: Unpin> Unpin for Cancelable<F> {}
impl<F> Future for Cancelable<F>
where
F: Future,
{
type Output = Result<F::Output, Canceled>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().stop_rx().poll(cx) {
Poll::Pending => {
let output = ready!(self.as_mut().op().poll(cx));
Ok(output).into()
}
Poll::Ready(_res) => Err(Canceled).into(),
}
}
}
fn make_cancelable<F>(op: F) -> (Cancelable<F>, CancelHandle) {
let (stop_tx, stop_rx) = oneshot::channel();
let fut = Cancelable { op, stop_rx };
let handle = CancelHandle(stop_tx);
(fut, handle)
}
It looks rather useful and intuitive, but this contrived example hangs with a busy-looping thread rather than canceling the task:
fn main() {
let mut a = 0;
let (fut, stop_handle) = make_cancelable(
stream::repeat(1).for_each(move |n| {
a += n;
future::ready(())
})
);
let mut executor = ThreadPool::new().unwrap();
let res_handle = executor.spawn_with_handle(fut).unwrap();
thread::sleep(Duration::from_millis(1));
stop_handle.cancel().unwrap();
let res = executor.run(res_handle);
assert!(res.is_err());
}
In non-contrived usage with real streams, too, a ForEach with an always-ready processing closure will delay cancellation for as long as the stream yields items.
The combinators that poll a stream for exhaustion in a loop have a problem that's already been raised in #869: if the upstream consecutively returns
Readyfor a long time, the loop never breaks and the combinator'spollnever returns for that long, starving other pending operations in the task from being polled.To illustrate how this can be a problem for other code, consider this simple adapter for making futures cancellable:
It looks rather useful and intuitive, but this contrived example hangs with a busy-looping thread rather than canceling the task:
In non-contrived usage with real streams, too, a
ForEachwith an always-ready processing closure will delay cancellation for as long as the stream yields items.