Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/uu/tee/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ rustix = { workspace = true, features = ["stdio", "fs"] }
tempfile = { workspace = true }
uucore = { workspace = true, features = ["benchmark"] }

[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
rustix = { workspace = true, features = ["pipe"] }
uucore = { workspace = true, features = ["pipes"] }

[[bin]]
name = "tee"
path = "src/main.rs"
Expand Down
90 changes: 87 additions & 3 deletions src/uu/tee/src/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ use uucore::signals::ensure_stdout_not_broken;
#[cfg(unix)]
use uucore::signals::{disable_pipe_errors, ignore_interrupts};

#[cfg(any(target_os = "linux", target_os = "android"))]
use rustix::fd::AsFd;
#[cfg(any(target_os = "linux", target_os = "android"))]
use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice, splice_exact};

#[uucore::main]
pub fn uumain(args: impl uucore::Args) -> UResult<()> {
let matches = uucore::clap_localization::handle_clap_result(uu_app(), args)?;
Expand Down Expand Up @@ -150,17 +155,86 @@ struct MultiWriter {
impl MultiWriter {
/// Copies all bytes from the input buffer to the output buffer
/// without buffering which is POSIX requirement.
pub fn copy_unbuffered<R: Read>(&mut self, mut input: R) -> Result<()> {
// todo: support splice() and tee() fast-path at here
pub fn copy_unbuffered(&mut self, mut input: NamedReader) -> Result<()> {
#[cfg(any(target_os = "linux", target_os = "android"))]
macro_rules! splice_or_fallback {
($pipe:expr, $writer:expr, $len:expr) => {
let fd = $writer.inner.as_fd();
if splice_exact($pipe, &fd, $len).is_err() {
debug_assert!($len <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
let mut drain = Vec::with_capacity($len);
let mut reader = ($pipe).take($len as u64);
let res = (|| {
reader.read_to_end(&mut drain)?;
$writer.inner.write_all(&drain)?;
$writer.inner.flush()
})();
if let Err(e) = res {
if let Err(e) = process_error(
self.output_error_mode,
e,
$writer,
&mut self.ignored_errors,
) {
self.aborted.get_or_insert(e);
}
$writer.name.clear(); //mark as exited
}
}
};
}
#[cfg(any(target_os = "linux", target_os = "android"))]
{
let (pipe_read, pipe_write) = pipe()?; // needed to duplicate input
let (pipe2_read, pipe2_write) = pipe()?; // force-tee() even output is not pipe
let input = input.inner.as_fd();
// improve throughput
let _ = rustix::pipe::fcntl_setpipe_size(
self.writers[0].inner.as_fd(),
MAX_ROOTLESS_PIPE_SIZE,
);
'splice: loop {
match splice(&input, &pipe_write, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(()),
Err(_) => break 'splice,
Ok(s) => {
if let Some((last, others)) = self.writers.split_last_mut() {
for other in others {
// do not consume input
assert_eq!(
uucore::pipes::tee(&pipe_read, &pipe2_write, s),
Ok(s),
"tee() between internal pipes should not be blocked"
);
splice_or_fallback!(&pipe2_read, other, s);
}
// last one consumes input
splice_or_fallback!(&pipe_read, last, s);
} else {
// all writers exited
return Err(Error::from(ErrorKind::Other));
}
}
}
self.writers.retain(|w| !w.name.is_empty());
if let Some(e) = self.aborted.take() {
return Err(e);
}
}
}
// The implementation for this function is adopted from the generic buffer copy implementation from
// the standard library:
// https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/io/copy.rs#L271-L297

// Use buffer size from std implementation
// https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/sys/io/mod.rs#L44
#[allow(clippy::items_after_statements)]
const BUF_SIZE: usize = 8 * 1024;
#[cfg(not(any(target_os = "linux", target_os = "android")))]
let mut buffer = [0u8; BUF_SIZE];
// fast-path for small input. needs 2+ read to catch end of file
// fast-path for small input on the platform missing splice
// needs 2+ read to catch end of file
#[cfg(not(any(target_os = "linux", target_os = "android")))]
for _ in 0..2 {
match input.read(&mut buffer) {
Ok(0) => return Ok(()), // end of file
Expand Down Expand Up @@ -290,3 +364,13 @@ impl Read for NamedReader {
})
}
}

#[cfg(any(target_os = "linux", target_os = "android"))]
impl AsFd for Writer {
fn as_fd(&self) -> rustix::fd::BorrowedFd<'_> {
match self {
Self::File(f) => f.as_fd(),
Self::Stdout(s) => s.as_fd(),
}
}
}
Loading