From 2d66d645252a148397d226a8b616d2d9fe516ede Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 28 May 2026 11:02:38 +0200 Subject: [PATCH] feat(ingest): add max_shards cap to IngestSettings and ScalingArbiter Adds `max_shards: Option` to `IngestSettings` so operators can bound the number of shards the control plane will open for a source. The scaling arbiter now accepts a `RangeInclusive` and clamps scale-up decisions within [min_shards, max_shards]; the scale-down path is unblocked when shards exceed the cap (e.g. after a config change). --- .../quickwit-config/src/index_config/mod.rs | 10 ++ .../src/index_config/serialize.rs | 8 ++ .../src/ingest/ingest_controller.rs | 15 ++- .../src/ingest/scaling_arbiter.rs | 96 ++++++++++++++----- 4 files changed, 102 insertions(+), 27 deletions(-) diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index d536e4edab5..eb209860704 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -272,6 +272,12 @@ pub struct IngestSettings { #[schema(default = 1, value_type = usize)] #[serde(default = "IngestSettings::default_min_shards")] pub min_shards: NonZeroUsize, + /// Configures the maximum number of shards the control plane may open for ingestion. + /// When absent, the number of shards is unbounded. + /// Must be >= `min_shards` when set. + #[schema(value_type = Option)] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub max_shards: Option, /// Whether to validate documents against the current doc mapping during ingestion. /// Defaults to true. When false, documents will be written directly to the WAL without /// validation, but might still be rejected during indexing when applying the doc mapping @@ -294,6 +300,7 @@ impl Default for IngestSettings { fn default() -> Self { Self { min_shards: Self::default_min_shards(), + max_shards: None, validate_docs: true, } } @@ -594,6 +601,7 @@ impl crate::TestableForRegression for IndexConfig { }; let ingest_settings = IngestSettings { min_shards: NonZeroUsize::new(12).unwrap(), + max_shards: None, validate_docs: true, }; let search_settings = SearchSettings { @@ -1114,6 +1122,7 @@ mod tests { fn test_ingest_settings_serde() { let settings = IngestSettings { min_shards: NonZeroUsize::MIN, + max_shards: None, validate_docs: false, }; let settings_yaml = serde_yaml::to_string(&settings).unwrap(); @@ -1124,6 +1133,7 @@ mod tests { let settings = IngestSettings { min_shards: NonZeroUsize::MIN, + max_shards: None, validate_docs: true, }; let settings_yaml = serde_yaml::to_string(&settings).unwrap(); diff --git a/quickwit/quickwit-config/src/index_config/serialize.rs b/quickwit/quickwit-config/src/index_config/serialize.rs index 24fcc6d1ac2..1c175e5c6b0 100644 --- a/quickwit/quickwit-config/src/index_config/serialize.rs +++ b/quickwit/quickwit-config/src/index_config/serialize.rs @@ -141,6 +141,14 @@ impl IndexConfigForSerialization { &index_config.search_settings, &index_config.retention_policy_opt, )?; + if let Some(max_shards) = index_config.ingest_settings.max_shards { + ensure!( + max_shards >= index_config.ingest_settings.min_shards, + "ingest_settings.max_shards ({}) must be >= ingest_settings.min_shards ({})", + max_shards.get(), + index_config.ingest_settings.min_shards.get() + ); + } Ok(index_config) } } diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index c991b52947d..0286250f83f 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -396,14 +396,19 @@ impl IngestController { &local_shards_update.source_uid, &local_shards_update.shard_infos, ); - let min_shards = model + let ingest_settings = &model .index_metadata(&local_shards_update.source_uid.index_uid) .expect("index should exist") .index_config - .ingest_settings - .min_shards; - - let Some(scaling_mode) = self.scaling_arbiter.should_scale(shard_stats, min_shards) else { + .ingest_settings; + let min_shards = ingest_settings.min_shards; + let max_shards = ingest_settings.max_shards.unwrap_or(NonZeroUsize::MAX); + let num_shards_range = min_shards..=max_shards; + + let Some(scaling_mode) = self + .scaling_arbiter + .should_scale(shard_stats, num_shards_range) + else { return Ok(()); }; match scaling_mode { diff --git a/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs b/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs index 7294b9f9176..6ca922c91f8 100644 --- a/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs +++ b/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::num::NonZeroUsize; +use std::ops::RangeInclusive; use crate::model::{ScalingMode, ShardStats}; @@ -71,8 +72,10 @@ impl ScalingArbiter { pub(crate) fn should_scale( &self, shard_stats: ShardStats, - min_shards: NonZeroUsize, + num_shards_range: RangeInclusive, ) -> Option { + let min_shards = *num_shards_range.start(); + let max_shards = *num_shards_range.end(); // If ingest is idle, there is nothing to do. Idle shards are automatically closed by // ingesters (see `quickwit_ingest::ingest_v2::idle::CloseIdleShardsTask`). if shard_stats.num_open_shards == 0 || shard_stats.avg_long_term_ingestion_rate == 0.0 { @@ -85,15 +88,19 @@ impl ScalingArbiter { } // Scale up based on the short term metric value while making sure that // the long term value doesn't get near the scale down threshold. + // We skip the scale-up block entirely when already at the cap so that + // execution can fall through to the scale-down check below (relevant + // when max_shards was lowered after shards were already opened). if shard_stats.avg_short_term_ingestion_rate >= self.scale_up_shards_short_term_threshold_mib_per_sec + && shard_stats.num_open_shards < max_shards.get() { let new_calculated_num_shards = usize::min( self.long_term_scale_up_threshold_max_shards(shard_stats), self.scale_up_factor_target_shards(shard_stats), ); - - let target_num_shards = usize::max(min_shards.get(), new_calculated_num_shards); + let target_num_shards = + new_calculated_num_shards.clamp(min_shards.get(), max_shards.get()); if target_num_shards > shard_stats.num_open_shards { let num_shards_to_open = target_num_shards - shard_stats.num_open_shards; @@ -133,7 +140,7 @@ mod tests { avg_short_term_ingestion_rate: 0.0, avg_long_term_ingestion_rate: 0.0, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), None, ); @@ -145,7 +152,7 @@ mod tests { avg_short_term_ingestion_rate: 5.0, avg_long_term_ingestion_rate: 6.0, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), None ); @@ -157,7 +164,7 @@ mod tests { avg_short_term_ingestion_rate: 8.1, avg_long_term_ingestion_rate: 8.1, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), Some(ScalingMode::Up(1)) ); @@ -169,7 +176,7 @@ mod tests { avg_short_term_ingestion_rate: 8.1, avg_long_term_ingestion_rate: 8.1, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), Some(ScalingMode::Up(1)) ); @@ -181,7 +188,7 @@ mod tests { avg_short_term_ingestion_rate: 3.0, avg_long_term_ingestion_rate: 1.5, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), Some(ScalingMode::Down) ); @@ -193,7 +200,7 @@ mod tests { avg_short_term_ingestion_rate: 3.0, avg_long_term_ingestion_rate: 1.5, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), None, ); @@ -205,7 +212,7 @@ mod tests { avg_short_term_ingestion_rate: 8.0, avg_long_term_ingestion_rate: 3.0, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), None, ); @@ -224,7 +231,7 @@ mod tests { avg_short_term_ingestion_rate: 0.0, avg_long_term_ingestion_rate: 0.0, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), None, ); @@ -236,7 +243,7 @@ mod tests { avg_short_term_ingestion_rate: 5.0, avg_long_term_ingestion_rate: 6.0, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), None ); @@ -248,7 +255,7 @@ mod tests { avg_short_term_ingestion_rate: 8.1, avg_long_term_ingestion_rate: 8.1, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), Some(ScalingMode::Up(1)) ); @@ -260,7 +267,7 @@ mod tests { avg_short_term_ingestion_rate: 8.1, avg_long_term_ingestion_rate: 8.1, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), Some(ScalingMode::Up(2)) ); @@ -272,7 +279,7 @@ mod tests { avg_short_term_ingestion_rate: 3.0, avg_long_term_ingestion_rate: 1.5, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), Some(ScalingMode::Down) ); @@ -284,7 +291,7 @@ mod tests { avg_short_term_ingestion_rate: 3.0, avg_long_term_ingestion_rate: 1.5, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), None, ); @@ -296,7 +303,7 @@ mod tests { avg_short_term_ingestion_rate: 8.0, avg_long_term_ingestion_rate: 3.1, }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), None, ); @@ -309,7 +316,7 @@ mod tests { avg_short_term_ingestion_rate: 8.1, avg_long_term_ingestion_rate: 5., }, - NonZeroUsize::MIN + NonZeroUsize::MIN..=NonZeroUsize::MAX ), Some(ScalingMode::Up(1)), ); @@ -408,8 +415,8 @@ mod tests { avg_short_term_ingestion_rate: 0.0, avg_long_term_ingestion_rate: 0.0, }; - let min_shards = NonZeroUsize::MIN; - let scaling_mode = scaling_arbiter.should_scale(shard_stats, min_shards); + let scaling_mode = + scaling_arbiter.should_scale(shard_stats, NonZeroUsize::MIN..=NonZeroUsize::MAX); assert!(scaling_mode.is_none()); let shard_stats = ShardStats { @@ -419,7 +426,8 @@ mod tests { avg_long_term_ingestion_rate: 0.0, }; let min_shards = NonZeroUsize::new(2).unwrap(); - let scaling_mode = scaling_arbiter.should_scale(shard_stats, min_shards); + let scaling_mode = + scaling_arbiter.should_scale(shard_stats, min_shards..=NonZeroUsize::MAX); assert!(scaling_mode.is_none()); } @@ -436,8 +444,52 @@ mod tests { }; let min_shards = NonZeroUsize::new(5).unwrap(); let scaling_mode = scaling_arbiter - .should_scale(shard_stats, min_shards) + .should_scale(shard_stats, min_shards..=NonZeroUsize::MAX) .unwrap(); assert_eq!(scaling_mode, ScalingMode::Up(4)); } + + #[test] + fn test_scaling_arbiter_max_shards() { + let scaling_arbiter = + ScalingArbiter::with_max_shard_ingestion_throughput_mib_per_sec(10.0, 2.0); + + // Already at max — scale-up should be suppressed even when load is high. + let shard_stats = ShardStats { + num_open_shards: 3, + num_closed_shards: 0, + avg_short_term_ingestion_rate: 9.0, + avg_long_term_ingestion_rate: 9.0, + }; + let max_shards = NonZeroUsize::new(3).unwrap(); + assert_eq!( + scaling_arbiter.should_scale(shard_stats, NonZeroUsize::MIN..=max_shards), + None, + ); + + // Below max — scale-up is allowed but capped at max. + let shard_stats = ShardStats { + num_open_shards: 2, + num_closed_shards: 0, + avg_short_term_ingestion_rate: 9.0, + avg_long_term_ingestion_rate: 9.0, + }; + assert_eq!( + scaling_arbiter.should_scale(shard_stats, NonZeroUsize::MIN..=max_shards), + Some(ScalingMode::Up(1)), + ); + + // Above max (e.g. max_shards was lowered after shards were opened) with low load — + // scale-down should still trigger because execution falls through the scale-up block. + let shard_stats = ShardStats { + num_open_shards: 4, + num_closed_shards: 0, + avg_short_term_ingestion_rate: 9.0, + avg_long_term_ingestion_rate: 1.0, + }; + assert_eq!( + scaling_arbiter.should_scale(shard_stats, NonZeroUsize::MIN..=max_shards), + Some(ScalingMode::Down), + ); + } }