diff --git a/src/uu/tee/Cargo.toml b/src/uu/tee/Cargo.toml index e52fe7bc022..b6cbc702933 100644 --- a/src/uu/tee/Cargo.toml +++ b/src/uu/tee/Cargo.toml @@ -30,6 +30,10 @@ rustix = { workspace = true, features = ["stdio", "fs"] } tempfile = { workspace = true } uucore = { workspace = true, features = ["benchmark"] } +[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] +rustix = { workspace = true, features = ["pipe"] } +uucore = { workspace = true, features = ["pipes"] } + [[bin]] name = "tee" path = "src/main.rs" diff --git a/src/uu/tee/src/tee.rs b/src/uu/tee/src/tee.rs index f4a5876ee4f..817e26bf981 100644 --- a/src/uu/tee/src/tee.rs +++ b/src/uu/tee/src/tee.rs @@ -22,6 +22,11 @@ use uucore::signals::ensure_stdout_not_broken; #[cfg(unix)] use uucore::signals::{disable_pipe_errors, ignore_interrupts}; +#[cfg(any(target_os = "linux", target_os = "android"))] +use rustix::fd::AsFd; +#[cfg(any(target_os = "linux", target_os = "android"))] +use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice, splice_exact}; + #[uucore::main] pub fn uumain(args: impl uucore::Args) -> UResult<()> { let matches = uucore::clap_localization::handle_clap_result(uu_app(), args)?; @@ -150,17 +155,86 @@ struct MultiWriter { impl MultiWriter { /// Copies all bytes from the input buffer to the output buffer /// without buffering which is POSIX requirement. - pub fn copy_unbuffered(&mut self, mut input: R) -> Result<()> { - // todo: support splice() and tee() fast-path at here + pub fn copy_unbuffered(&mut self, mut input: NamedReader) -> Result<()> { + #[cfg(any(target_os = "linux", target_os = "android"))] + macro_rules! splice_or_fallback { + ($pipe:expr, $writer:expr, $len:expr) => { + let fd = $writer.inner.as_fd(); + if splice_exact($pipe, &fd, $len).is_err() { + debug_assert!($len <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage"); + let mut drain = Vec::with_capacity($len); + let mut reader = ($pipe).take($len as u64); + let res = (|| { + reader.read_to_end(&mut drain)?; + $writer.inner.write_all(&drain)?; + $writer.inner.flush() + })(); + if let Err(e) = res { + if let Err(e) = process_error( + self.output_error_mode, + e, + $writer, + &mut self.ignored_errors, + ) { + self.aborted.get_or_insert(e); + } + $writer.name.clear(); //mark as exited + } + } + }; + } + #[cfg(any(target_os = "linux", target_os = "android"))] + { + let (pipe_read, pipe_write) = pipe()?; // needed to duplicate input + let (pipe2_read, pipe2_write) = pipe()?; // force-tee() even output is not pipe + let input = input.inner.as_fd(); + // improve throughput + let _ = rustix::pipe::fcntl_setpipe_size( + self.writers[0].inner.as_fd(), + MAX_ROOTLESS_PIPE_SIZE, + ); + 'splice: loop { + match splice(&input, &pipe_write, MAX_ROOTLESS_PIPE_SIZE) { + Ok(0) => return Ok(()), + Err(_) => break 'splice, + Ok(s) => { + if let Some((last, others)) = self.writers.split_last_mut() { + for other in others { + // do not consume input + assert_eq!( + uucore::pipes::tee(&pipe_read, &pipe2_write, s), + Ok(s), + "tee() between internal pipes should not be blocked" + ); + splice_or_fallback!(&pipe2_read, other, s); + } + // last one consumes input + splice_or_fallback!(&pipe_read, last, s); + } else { + // all writers exited + return Err(Error::from(ErrorKind::Other)); + } + } + } + self.writers.retain(|w| !w.name.is_empty()); + if let Some(e) = self.aborted.take() { + return Err(e); + } + } + } // The implementation for this function is adopted from the generic buffer copy implementation from // the standard library: // https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/io/copy.rs#L271-L297 // Use buffer size from std implementation // https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/sys/io/mod.rs#L44 + #[allow(clippy::items_after_statements)] const BUF_SIZE: usize = 8 * 1024; + #[cfg(not(any(target_os = "linux", target_os = "android")))] let mut buffer = [0u8; BUF_SIZE]; - // fast-path for small input. needs 2+ read to catch end of file + // fast-path for small input on the platform missing splice + // needs 2+ read to catch end of file + #[cfg(not(any(target_os = "linux", target_os = "android")))] for _ in 0..2 { match input.read(&mut buffer) { Ok(0) => return Ok(()), // end of file @@ -290,3 +364,13 @@ impl Read for NamedReader { }) } } + +#[cfg(any(target_os = "linux", target_os = "android"))] +impl AsFd for Writer { + fn as_fd(&self) -> rustix::fd::BorrowedFd<'_> { + match self { + Self::File(f) => f.as_fd(), + Self::Stdout(s) => s.as_fd(), + } + } +}