Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,12 @@ pub struct IngestSettings {
#[schema(default = 1, value_type = usize)]
#[serde(default = "IngestSettings::default_min_shards")]
pub min_shards: NonZeroUsize,
/// Configures the maximum number of shards the control plane may open for ingestion.
/// When absent, the number of shards is unbounded.
/// Must be >= `min_shards` when set.
#[schema(value_type = Option<usize>)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_shards: Option<NonZeroUsize>,
/// Whether to validate documents against the current doc mapping during ingestion.
/// Defaults to true. When false, documents will be written directly to the WAL without
/// validation, but might still be rejected during indexing when applying the doc mapping
Expand All @@ -294,6 +300,7 @@ impl Default for IngestSettings {
fn default() -> Self {
Self {
min_shards: Self::default_min_shards(),
max_shards: None,
validate_docs: true,
}
}
Expand Down Expand Up @@ -594,6 +601,7 @@ impl crate::TestableForRegression for IndexConfig {
};
let ingest_settings = IngestSettings {
min_shards: NonZeroUsize::new(12).unwrap(),
max_shards: None,
validate_docs: true,
};
let search_settings = SearchSettings {
Expand Down Expand Up @@ -1114,6 +1122,7 @@ mod tests {
fn test_ingest_settings_serde() {
let settings = IngestSettings {
min_shards: NonZeroUsize::MIN,
max_shards: None,
validate_docs: false,
};
let settings_yaml = serde_yaml::to_string(&settings).unwrap();
Expand All @@ -1124,6 +1133,7 @@ mod tests {

let settings = IngestSettings {
min_shards: NonZeroUsize::MIN,
max_shards: None,
validate_docs: true,
};
let settings_yaml = serde_yaml::to_string(&settings).unwrap();
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ impl IndexConfigForSerialization {
&index_config.search_settings,
&index_config.retention_policy_opt,
)?;
if let Some(max_shards) = index_config.ingest_settings.max_shards {
ensure!(
max_shards >= index_config.ingest_settings.min_shards,
"ingest_settings.max_shards ({}) must be >= ingest_settings.min_shards ({})",
max_shards.get(),
index_config.ingest_settings.min_shards.get()
);
}
Ok(index_config)
}
}
Expand Down
15 changes: 10 additions & 5 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,14 +396,19 @@ impl IngestController {
&local_shards_update.source_uid,
&local_shards_update.shard_infos,
);
let min_shards = model
let ingest_settings = &model
.index_metadata(&local_shards_update.source_uid.index_uid)
.expect("index should exist")
.index_config
.ingest_settings
.min_shards;

let Some(scaling_mode) = self.scaling_arbiter.should_scale(shard_stats, min_shards) else {
.ingest_settings;
let min_shards = ingest_settings.min_shards;
let max_shards = ingest_settings.max_shards.unwrap_or(NonZeroUsize::MAX);
let num_shards_range = min_shards..=max_shards;

let Some(scaling_mode) = self
.scaling_arbiter
.should_scale(shard_stats, num_shards_range)
else {
return Ok(());
};
match scaling_mode {
Expand Down
96 changes: 74 additions & 22 deletions quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::num::NonZeroUsize;
use std::ops::RangeInclusive;

use crate::model::{ScalingMode, ShardStats};

Expand Down Expand Up @@ -71,8 +72,10 @@ impl ScalingArbiter {
pub(crate) fn should_scale(
&self,
shard_stats: ShardStats,
min_shards: NonZeroUsize,
num_shards_range: RangeInclusive<NonZeroUsize>,
) -> Option<ScalingMode> {
let min_shards = *num_shards_range.start();
let max_shards = *num_shards_range.end();
// If ingest is idle, there is nothing to do. Idle shards are automatically closed by
// ingesters (see `quickwit_ingest::ingest_v2::idle::CloseIdleShardsTask`).
if shard_stats.num_open_shards == 0 || shard_stats.avg_long_term_ingestion_rate == 0.0 {
Expand All @@ -85,15 +88,19 @@ impl ScalingArbiter {
}
// Scale up based on the short term metric value while making sure that
// the long term value doesn't get near the scale down threshold.
// We skip the scale-up block entirely when already at the cap so that
// execution can fall through to the scale-down check below (relevant
// when max_shards was lowered after shards were already opened).
if shard_stats.avg_short_term_ingestion_rate
>= self.scale_up_shards_short_term_threshold_mib_per_sec
&& shard_stats.num_open_shards < max_shards.get()
{
let new_calculated_num_shards = usize::min(
self.long_term_scale_up_threshold_max_shards(shard_stats),
self.scale_up_factor_target_shards(shard_stats),
);

let target_num_shards = usize::max(min_shards.get(), new_calculated_num_shards);
let target_num_shards =
new_calculated_num_shards.clamp(min_shards.get(), max_shards.get());

if target_num_shards > shard_stats.num_open_shards {
let num_shards_to_open = target_num_shards - shard_stats.num_open_shards;
Expand Down Expand Up @@ -133,7 +140,7 @@ mod tests {
avg_short_term_ingestion_rate: 0.0,
avg_long_term_ingestion_rate: 0.0,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
None,
);
Expand All @@ -145,7 +152,7 @@ mod tests {
avg_short_term_ingestion_rate: 5.0,
avg_long_term_ingestion_rate: 6.0,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
None
);
Expand All @@ -157,7 +164,7 @@ mod tests {
avg_short_term_ingestion_rate: 8.1,
avg_long_term_ingestion_rate: 8.1,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
Some(ScalingMode::Up(1))
);
Expand All @@ -169,7 +176,7 @@ mod tests {
avg_short_term_ingestion_rate: 8.1,
avg_long_term_ingestion_rate: 8.1,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
Some(ScalingMode::Up(1))
);
Expand All @@ -181,7 +188,7 @@ mod tests {
avg_short_term_ingestion_rate: 3.0,
avg_long_term_ingestion_rate: 1.5,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
Some(ScalingMode::Down)
);
Expand All @@ -193,7 +200,7 @@ mod tests {
avg_short_term_ingestion_rate: 3.0,
avg_long_term_ingestion_rate: 1.5,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
None,
);
Expand All @@ -205,7 +212,7 @@ mod tests {
avg_short_term_ingestion_rate: 8.0,
avg_long_term_ingestion_rate: 3.0,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
None,
);
Expand All @@ -224,7 +231,7 @@ mod tests {
avg_short_term_ingestion_rate: 0.0,
avg_long_term_ingestion_rate: 0.0,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
None,
);
Expand All @@ -236,7 +243,7 @@ mod tests {
avg_short_term_ingestion_rate: 5.0,
avg_long_term_ingestion_rate: 6.0,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
None
);
Expand All @@ -248,7 +255,7 @@ mod tests {
avg_short_term_ingestion_rate: 8.1,
avg_long_term_ingestion_rate: 8.1,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
Some(ScalingMode::Up(1))
);
Expand All @@ -260,7 +267,7 @@ mod tests {
avg_short_term_ingestion_rate: 8.1,
avg_long_term_ingestion_rate: 8.1,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
Some(ScalingMode::Up(2))
);
Expand All @@ -272,7 +279,7 @@ mod tests {
avg_short_term_ingestion_rate: 3.0,
avg_long_term_ingestion_rate: 1.5,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
Some(ScalingMode::Down)
);
Expand All @@ -284,7 +291,7 @@ mod tests {
avg_short_term_ingestion_rate: 3.0,
avg_long_term_ingestion_rate: 1.5,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
None,
);
Expand All @@ -296,7 +303,7 @@ mod tests {
avg_short_term_ingestion_rate: 8.0,
avg_long_term_ingestion_rate: 3.1,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
None,
);
Expand All @@ -309,7 +316,7 @@ mod tests {
avg_short_term_ingestion_rate: 8.1,
avg_long_term_ingestion_rate: 5.,
},
NonZeroUsize::MIN
NonZeroUsize::MIN..=NonZeroUsize::MAX
),
Some(ScalingMode::Up(1)),
);
Expand Down Expand Up @@ -408,8 +415,8 @@ mod tests {
avg_short_term_ingestion_rate: 0.0,
avg_long_term_ingestion_rate: 0.0,
};
let min_shards = NonZeroUsize::MIN;
let scaling_mode = scaling_arbiter.should_scale(shard_stats, min_shards);
let scaling_mode =
scaling_arbiter.should_scale(shard_stats, NonZeroUsize::MIN..=NonZeroUsize::MAX);
assert!(scaling_mode.is_none());

let shard_stats = ShardStats {
Expand All @@ -419,7 +426,8 @@ mod tests {
avg_long_term_ingestion_rate: 0.0,
};
let min_shards = NonZeroUsize::new(2).unwrap();
let scaling_mode = scaling_arbiter.should_scale(shard_stats, min_shards);
let scaling_mode =
scaling_arbiter.should_scale(shard_stats, min_shards..=NonZeroUsize::MAX);
assert!(scaling_mode.is_none());
}

Expand All @@ -436,8 +444,52 @@ mod tests {
};
let min_shards = NonZeroUsize::new(5).unwrap();
let scaling_mode = scaling_arbiter
.should_scale(shard_stats, min_shards)
.should_scale(shard_stats, min_shards..=NonZeroUsize::MAX)
.unwrap();
assert_eq!(scaling_mode, ScalingMode::Up(4));
}

#[test]
fn test_scaling_arbiter_max_shards() {
let scaling_arbiter =
ScalingArbiter::with_max_shard_ingestion_throughput_mib_per_sec(10.0, 2.0);

// Already at max — scale-up should be suppressed even when load is high.
let shard_stats = ShardStats {
num_open_shards: 3,
num_closed_shards: 0,
avg_short_term_ingestion_rate: 9.0,
avg_long_term_ingestion_rate: 9.0,
};
let max_shards = NonZeroUsize::new(3).unwrap();
assert_eq!(
scaling_arbiter.should_scale(shard_stats, NonZeroUsize::MIN..=max_shards),
None,
);

// Below max — scale-up is allowed but capped at max.
let shard_stats = ShardStats {
num_open_shards: 2,
num_closed_shards: 0,
avg_short_term_ingestion_rate: 9.0,
avg_long_term_ingestion_rate: 9.0,
};
assert_eq!(
scaling_arbiter.should_scale(shard_stats, NonZeroUsize::MIN..=max_shards),
Some(ScalingMode::Up(1)),
);

// Above max (e.g. max_shards was lowered after shards were opened) with low load —
// scale-down should still trigger because execution falls through the scale-up block.
let shard_stats = ShardStats {
num_open_shards: 4,
num_closed_shards: 0,
avg_short_term_ingestion_rate: 9.0,
avg_long_term_ingestion_rate: 1.0,
};
assert_eq!(
scaling_arbiter.should_scale(shard_stats, NonZeroUsize::MIN..=max_shards),
Some(ScalingMode::Down),
);
}
}
Loading