Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bin/rustup-init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn main() -> Result<ExitCode> {
let process = Process::os();
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(process.io_thread_count()?)
.worker_threads(process.io_thread_count()?.into())
.build()
.unwrap();

Expand Down
30 changes: 28 additions & 2 deletions src/diskio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use std::{fmt::Debug, fs::OpenOptions};
use anyhow::Result;
use tracing::{error, trace, warn};

use crate::process::IoThreadCount;
use crate::utils::units::Size;

pub(crate) mod immediate;
Expand Down Expand Up @@ -445,15 +446,32 @@ pub(crate) fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
/// Get the executor for disk IO.
pub(crate) fn get_executor<'a>(
ram_budget: usize,
io_thread_count: usize,
thread_count: IoThreadCount,
) -> Box<dyn Executor + 'a> {
// If this gets lots of use, consider exposing via the config file.
match io_thread_count {
let threads = effective_thread_count(ram_budget, thread_count);
match threads {
0 | 1 => Box::new(immediate::ImmediateUnpacker::new()),
n => Box::new(threaded::Threaded::new(n, ram_budget)),
}
}

fn effective_thread_count(ram_budget: usize, thread_count: IoThreadCount) -> usize {
match thread_count {
IoThreadCount::Default(n) if n > 1 && ram_budget < LOW_MEMORY_THRESHOLD => {
warn!(
"using single-threaded unpacking due to low memory \
(ram budget: {} < {} threshold), \
set RUSTUP_IO_THREADS to override",
Size::new(ram_budget),
Size::new(LOW_MEMORY_THRESHOLD)
);
1
}
IoThreadCount::Default(n) | IoThreadCount::UserSpecified(n) => n,
}
}

pub(crate) fn unpack_ram(io_chunk_size: usize, budget: Option<usize>) -> usize {
const RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS: usize = 200 * 1024 * 1024;
let minimum_ram = io_chunk_size * 2;
Expand Down Expand Up @@ -505,3 +523,11 @@ pub(crate) fn unpack_ram(io_chunk_size: usize, budget: Option<usize>) -> usize {
}

static RAM_NOTICE_SHOWN: OnceLock<()> = OnceLock::new();

/// The Threaded executor uses substantially more memory than its ram_budget
/// accounts for (pool overhead, sharded_slab metadata, multiple in-flight
/// operations, thread stacks, allocator fragmentation). On systems where the
/// ram_budget is under this threshold, fall back to single-threaded unpacking
/// which peaks at ~110MB.
/// See https://github.com/rust-lang/rustup/issues/3125
const LOW_MEMORY_THRESHOLD: usize = 512 * 1024 * 1024;
43 changes: 43 additions & 0 deletions src/diskio/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,46 @@ fn test_complete_file_immediate() {
fn test_complete_file_threaded() {
test_complete_file("2").unwrap()
}

#[test]
fn test_effective_thread_count() {
use super::{LOW_MEMORY_THRESHOLD, effective_thread_count};
use crate::process::IoThreadCount::{Default, UserSpecified};

// Already single-threaded: no change regardless of budget
assert_eq!(
effective_thread_count(LOW_MEMORY_THRESHOLD / 16, Default(1)),
1
);
assert_eq!(
effective_thread_count(LOW_MEMORY_THRESHOLD / 16, Default(0)),
0
);

// Below threshold: forced to single-threaded
assert_eq!(
effective_thread_count(LOW_MEMORY_THRESHOLD / 2, Default(8)),
1
);
assert_eq!(
effective_thread_count(LOW_MEMORY_THRESHOLD / 2, Default(4)),
1
);

// At or above threshold: thread count unchanged
assert_eq!(effective_thread_count(LOW_MEMORY_THRESHOLD, Default(4)), 4);
assert_eq!(
effective_thread_count(LOW_MEMORY_THRESHOLD * 2, Default(8)),
8
);

// User-specified threads are always respected
assert_eq!(
effective_thread_count(LOW_MEMORY_THRESHOLD / 16, UserSpecified(4)),
4
);
assert_eq!(
effective_thread_count(LOW_MEMORY_THRESHOLD / 2, UserSpecified(8)),
8
);
}
2 changes: 1 addition & 1 deletion src/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl InstallMethod<'_, '_> {
// Initialize rayon for use by the remove_dir_all crate limiting the number of threads.
// This will error if rayon is already initialized but it's fine to ignore that.
let _ = rayon::ThreadPoolBuilder::new()
.num_threads(self.cfg().process.io_thread_count()?)
.num_threads(self.cfg().process.io_thread_count()?.into())
.build_global();
match &self {
InstallMethod::Copy { .. }
Expand Down
22 changes: 18 additions & 4 deletions src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,24 +69,25 @@ impl Process {
home::env::rustup_home_with_env(self).context("failed to determine rustup home dir")
}

pub fn io_thread_count(&self) -> Result<usize> {
pub fn io_thread_count(&self) -> Result<IoThreadCount> {
if let Ok(n) = self.var("RUSTUP_IO_THREADS") {
let threads = usize::from_str(&n).context(
"invalid value in RUSTUP_IO_THREADS -- must be a natural number greater than zero",
)?;
match threads {
0 => bail!("RUSTUP_IO_THREADS must be a natural number greater than zero"),
_ => return Ok(threads),
_ => return Ok(IoThreadCount::UserSpecified(threads)),
}
};

Ok(match thread::available_parallelism() {
let count = match thread::available_parallelism() {
// Don't spawn more than 8 I/O threads unless the user tells us to.
// Feel free to increase this value if it improves performance.
Ok(threads) => Ord::min(threads.get(), 8),
// Unknown for target platform or no permission to query.
Err(_) => 1,
})
};
Ok(IoThreadCount::Default(count))
}

pub(crate) fn unpack_ram(&self) -> Result<Option<usize>, env::VarError> {
Expand Down Expand Up @@ -235,6 +236,19 @@ impl Process {
}
}

pub enum IoThreadCount {
Default(usize),
UserSpecified(usize),
}

impl From<IoThreadCount> for usize {
fn from(c: IoThreadCount) -> Self {
match c {
IoThreadCount::Default(n) | IoThreadCount::UserSpecified(n) => n,
}
}
}

impl home::env::Env for Process {
fn home_dir(&self) -> Option<PathBuf> {
match self {
Expand Down