diff --git a/src/bin/rustup-init.rs b/src/bin/rustup-init.rs index 5c74f48e2a..e844ddc19e 100644 --- a/src/bin/rustup-init.rs +++ b/src/bin/rustup-init.rs @@ -40,7 +40,7 @@ fn main() -> Result { let process = Process::os(); let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() - .worker_threads(process.io_thread_count()?) + .worker_threads(process.io_thread_count()?.into()) .build() .unwrap(); diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index 549101e0dc..f2400ed118 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -62,6 +62,7 @@ use std::{fmt::Debug, fs::OpenOptions}; use anyhow::Result; use tracing::{error, trace, warn}; +use crate::process::IoThreadCount; use crate::utils::units::Size; pub(crate) mod immediate; @@ -445,15 +446,32 @@ pub(crate) fn create_dir>(path: P) -> io::Result<()> { /// Get the executor for disk IO. pub(crate) fn get_executor<'a>( ram_budget: usize, - io_thread_count: usize, + thread_count: IoThreadCount, ) -> Box { // If this gets lots of use, consider exposing via the config file. - match io_thread_count { + let threads = effective_thread_count(ram_budget, thread_count); + match threads { 0 | 1 => Box::new(immediate::ImmediateUnpacker::new()), n => Box::new(threaded::Threaded::new(n, ram_budget)), } } +fn effective_thread_count(ram_budget: usize, thread_count: IoThreadCount) -> usize { + match thread_count { + IoThreadCount::Default(n) if n > 1 && ram_budget < LOW_MEMORY_THRESHOLD => { + warn!( + "using single-threaded unpacking due to low memory \ + (ram budget: {} < {} threshold), \ + set RUSTUP_IO_THREADS to override", + Size::new(ram_budget), + Size::new(LOW_MEMORY_THRESHOLD) + ); + 1 + } + IoThreadCount::Default(n) | IoThreadCount::UserSpecified(n) => n, + } +} + pub(crate) fn unpack_ram(io_chunk_size: usize, budget: Option) -> usize { const RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS: usize = 200 * 1024 * 1024; let minimum_ram = io_chunk_size * 2; @@ -505,3 +523,11 @@ pub(crate) fn unpack_ram(io_chunk_size: usize, budget: Option) -> usize { } static RAM_NOTICE_SHOWN: OnceLock<()> = OnceLock::new(); + +/// The Threaded executor uses substantially more memory than its ram_budget +/// accounts for (pool overhead, sharded_slab metadata, multiple in-flight +/// operations, thread stacks, allocator fragmentation). On systems where the +/// ram_budget is under this threshold, fall back to single-threaded unpacking +/// which peaks at ~110MB. +/// See https://github.com/rust-lang/rustup/issues/3125 +const LOW_MEMORY_THRESHOLD: usize = 512 * 1024 * 1024; diff --git a/src/diskio/test.rs b/src/diskio/test.rs index 0a893162f9..d82fbf6ca8 100644 --- a/src/diskio/test.rs +++ b/src/diskio/test.rs @@ -163,3 +163,46 @@ fn test_complete_file_immediate() { fn test_complete_file_threaded() { test_complete_file("2").unwrap() } + +#[test] +fn test_effective_thread_count() { + use super::{LOW_MEMORY_THRESHOLD, effective_thread_count}; + use crate::process::IoThreadCount::{Default, UserSpecified}; + + // Already single-threaded: no change regardless of budget + assert_eq!( + effective_thread_count(LOW_MEMORY_THRESHOLD / 16, Default(1)), + 1 + ); + assert_eq!( + effective_thread_count(LOW_MEMORY_THRESHOLD / 16, Default(0)), + 0 + ); + + // Below threshold: forced to single-threaded + assert_eq!( + effective_thread_count(LOW_MEMORY_THRESHOLD / 2, Default(8)), + 1 + ); + assert_eq!( + effective_thread_count(LOW_MEMORY_THRESHOLD / 2, Default(4)), + 1 + ); + + // At or above threshold: thread count unchanged + assert_eq!(effective_thread_count(LOW_MEMORY_THRESHOLD, Default(4)), 4); + assert_eq!( + effective_thread_count(LOW_MEMORY_THRESHOLD * 2, Default(8)), + 8 + ); + + // User-specified threads are always respected + assert_eq!( + effective_thread_count(LOW_MEMORY_THRESHOLD / 16, UserSpecified(4)), + 4 + ); + assert_eq!( + effective_thread_count(LOW_MEMORY_THRESHOLD / 2, UserSpecified(8)), + 8 + ); +} diff --git a/src/install.rs b/src/install.rs index 061f3696df..74b0f9bd16 100644 --- a/src/install.rs +++ b/src/install.rs @@ -41,7 +41,7 @@ impl InstallMethod<'_, '_> { // Initialize rayon for use by the remove_dir_all crate limiting the number of threads. // This will error if rayon is already initialized but it's fine to ignore that. let _ = rayon::ThreadPoolBuilder::new() - .num_threads(self.cfg().process.io_thread_count()?) + .num_threads(self.cfg().process.io_thread_count()?.into()) .build_global(); match &self { InstallMethod::Copy { .. } diff --git a/src/process.rs b/src/process.rs index b0f0c14ee5..2c209fc529 100644 --- a/src/process.rs +++ b/src/process.rs @@ -69,24 +69,25 @@ impl Process { home::env::rustup_home_with_env(self).context("failed to determine rustup home dir") } - pub fn io_thread_count(&self) -> Result { + pub fn io_thread_count(&self) -> Result { if let Ok(n) = self.var("RUSTUP_IO_THREADS") { let threads = usize::from_str(&n).context( "invalid value in RUSTUP_IO_THREADS -- must be a natural number greater than zero", )?; match threads { 0 => bail!("RUSTUP_IO_THREADS must be a natural number greater than zero"), - _ => return Ok(threads), + _ => return Ok(IoThreadCount::UserSpecified(threads)), } }; - Ok(match thread::available_parallelism() { + let count = match thread::available_parallelism() { // Don't spawn more than 8 I/O threads unless the user tells us to. // Feel free to increase this value if it improves performance. Ok(threads) => Ord::min(threads.get(), 8), // Unknown for target platform or no permission to query. Err(_) => 1, - }) + }; + Ok(IoThreadCount::Default(count)) } pub(crate) fn unpack_ram(&self) -> Result, env::VarError> { @@ -235,6 +236,19 @@ impl Process { } } +pub enum IoThreadCount { + Default(usize), + UserSpecified(usize), +} + +impl From for usize { + fn from(c: IoThreadCount) -> Self { + match c { + IoThreadCount::Default(n) | IoThreadCount::UserSpecified(n) => n, + } + } +} + impl home::env::Env for Process { fn home_dir(&self) -> Option { match self {