Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 90 additions & 38 deletions src/common/rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,53 +85,105 @@ where
}
}

/*
#[cfg(test)]
mod tests {
use super::Rewind;
use bytes::Bytes;
use tokio::io::AsyncReadExt;

#[cfg(not(miri))]
#[tokio::test]
async fn partial_rewind() {
let underlying = [104, 101, 108, 108, 111];

let mock = tokio_test::io::Builder::new().read(&underlying).build();

let mut stream = Rewind::new(mock);

// Read off some bytes, ensure we filled o1
let mut buf = [0; 2];
stream.read_exact(&mut buf).await.expect("read1");

// Rewind the stream so that it is as if we never read in the first place.
stream.rewind(Bytes::copy_from_slice(&buf[..]));

let mut buf = [0; 5];
stream.read_exact(&mut buf).await.expect("read1");

// At this point we should have read everything that was in the MockStream
assert_eq!(&buf, &underlying);
use std::cmp;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use hyper::rt::{Read, ReadBuf, ReadBufCursor};

struct MockIo(&'static [u8]);

impl Read for MockIo {
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
mut buf: ReadBufCursor<'_>,
) -> Poll<io::Result<()>> {
let len = cmp::min(self.0.len(), buf.remaining());
buf.put_slice(&self.0[..len]);
self.0 = &self.0[len..];
Poll::Ready(Ok(()))
}
}

#[cfg(not(miri))]
#[tokio::test]
async fn full_rewind() {
let underlying = [104, 101, 108, 108, 111];

let mock = tokio_test::io::Builder::new().read(&underlying).build();
fn noop_cx() -> Context<'static> {
static VTABLE: RawWakerVTable = RawWakerVTable::new(
|_| RawWaker::new(std::ptr::null(), &VTABLE),
|_| {},
|_| {},
|_| {},
);
let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) };
Context::from_waker(&waker)
}

let mut stream = Rewind::new(mock);
#[test]
fn read_full_from_prebuf() {
let mut rewind = Rewind {
pre: Some(Bytes::from_static(b"hello")),
inner: MockIo(b"world"),
};

let mut buf = [0u8; 5];
let mut read_buf = ReadBuf::new(&mut buf);
let res = Pin::new(&mut rewind).poll_read(&mut noop_cx(), read_buf.unfilled());
assert!(res.is_ready());
assert_eq!(read_buf.filled(), b"hello");
assert!(rewind.pre.is_none());
}

let mut buf = [0; 5];
stream.read_exact(&mut buf).await.expect("read1");
#[test]
fn read_partial_from_prebuf_leaves_remainder() {
let mut rewind = Rewind {
pre: Some(Bytes::from_static(b"hello")),
inner: MockIo(b"world"),
};

let mut buf = [0u8; 3];
let mut read_buf = ReadBuf::new(&mut buf);
let res = Pin::new(&mut rewind).poll_read(&mut noop_cx(), read_buf.unfilled());
assert!(res.is_ready());
assert_eq!(read_buf.filled(), b"hel");
assert_eq!(rewind.pre.as_ref().map(|b| &b[..]), Some(&b"lo"[..]));
}

// Rewind the stream so that it is as if we never read in the first place.
stream.rewind(Bytes::copy_from_slice(&buf[..]));
#[test]
fn read_exhausts_prebuf_then_reads_inner() {
let mut rewind = Rewind {
pre: Some(Bytes::from_static(b"hi")),
inner: MockIo(b"world"),
};

let mut buf = [0u8; 2];
let mut read_buf = ReadBuf::new(&mut buf);
let res = Pin::new(&mut rewind).poll_read(&mut noop_cx(), read_buf.unfilled());
assert!(res.is_ready());
assert_eq!(read_buf.filled(), b"hi");
assert!(rewind.pre.is_none());

let mut buf = [0u8; 5];
let mut read_buf = ReadBuf::new(&mut buf);
let res = Pin::new(&mut rewind).poll_read(&mut noop_cx(), read_buf.unfilled());
assert!(res.is_ready());
assert_eq!(read_buf.filled(), b"world");
}

let mut buf = [0; 5];
stream.read_exact(&mut buf).await.expect("read1");
#[test]
fn read_with_empty_prebuf_uses_inner() {
let mut rewind = Rewind {
pre: Some(Bytes::from_static(b"")),
inner: MockIo(b"world"),
};

let mut buf = [0u8; 5];
let mut read_buf = ReadBuf::new(&mut buf);
let res = Pin::new(&mut rewind).poll_read(&mut noop_cx(), read_buf.unfilled());
assert!(res.is_ready());
assert_eq!(read_buf.filled(), b"world");
assert!(rewind.pre.is_none());
}
}
*/