From e222cff3811519bcc082f6b6492c8ea020019f3a Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Thu, 9 Apr 2026 14:38:42 -0400 Subject: [PATCH] Scaffold Compactor supervisor and pipeline --- quickwit/Cargo.lock | 9 + quickwit/quickwit-compaction/Cargo.toml | 11 +- .../src/compaction_pipeline.rs | 186 +++++++++++++++ .../src/compactor_supervisor.rs | 224 ++++++++++++++++++ quickwit/quickwit-compaction/src/lib.rs | 6 + 5 files changed, 434 insertions(+), 2 deletions(-) create mode 100644 quickwit/quickwit-compaction/src/compaction_pipeline.rs create mode 100644 quickwit/quickwit-compaction/src/compactor_supervisor.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 544b882d332..f97d39a633b 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7148,8 +7148,17 @@ dependencies = [ name = "quickwit-compaction" version = "0.8.0" dependencies = [ + "anyhow", "async-trait", + "quickwit-actors", + "quickwit-common", + "quickwit-indexing", + "quickwit-metastore", "quickwit-proto", + "quickwit-storage", + "serde", + "tokio", + "tracing", ] [[package]] diff --git a/quickwit/quickwit-compaction/Cargo.toml b/quickwit/quickwit-compaction/Cargo.toml index 07925537dce..9efa0e2ce94 100644 --- a/quickwit/quickwit-compaction/Cargo.toml +++ b/quickwit/quickwit-compaction/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "quickwit-compaction" -description = "Merge planner and merge worker services for split compaction" +description = "Compactor implementation and CompactionService" version.workspace = true edition.workspace = true @@ -12,4 +12,11 @@ license.workspace = true [dependencies] async-trait = { workspace = true } -quickwit-proto = { workspace = true } \ No newline at end of file +quickwit-actors = { workspace = true } +quickwit-common = { workspace = true } +quickwit-indexing = { workspace = true } +quickwit-proto = { workspace = true } +quickwit-storage = { workspace = true } +serde = { workspace = true } +tracing = { workspace = true } +tokio = { workspace = true } \ No newline at end of file diff --git a/quickwit/quickwit-compaction/src/compaction_pipeline.rs b/quickwit/quickwit-compaction/src/compaction_pipeline.rs new file mode 100644 index 00000000000..926a5a5eb93 --- /dev/null +++ b/quickwit/quickwit-compaction/src/compaction_pipeline.rs @@ -0,0 +1,186 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use quickwit_actors::{ActorHandle, Health, Supervisable}; +use quickwit_common::KillSwitch; +use quickwit_common::temp_dir::TempDirectory; +use quickwit_indexing::actors::{ + MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader, +}; +use tracing::{debug, error}; + +pub struct CompactionPipelineHandles { + pub merge_split_downloader: ActorHandle, + pub merge_executor: ActorHandle, + pub merge_packager: ActorHandle, + pub merge_uploader: ActorHandle, + pub merge_publisher: ActorHandle, +} + +/// A single-use merge execution pipeline. Processes one merge task and +/// terminates. +/// +/// Owned by the `CompactorSupervisor`, which periodically calls +/// `check_actor_health()` and acts on the result (retry, reap, etc.). +pub struct CompactionPipeline { + pub task_id: String, + pub split_ids: Vec, + pub retry_count: usize, + pub kill_switch: KillSwitch, + pub scratch_directory: TempDirectory, + pub handles: Option, +} + +impl CompactionPipeline { + pub fn new(task_id: String, split_ids: Vec, scratch_directory: TempDirectory) -> Self { + CompactionPipeline { + task_id, + split_ids, + retry_count: 0, + kill_switch: KillSwitch::default(), + scratch_directory, + handles: None, + } + } + + fn supervisables(&self) -> Vec<&dyn Supervisable> { + let Some(handles) = &self.handles else { + return Vec::new(); + }; + vec![ + &handles.merge_split_downloader, + &handles.merge_executor, + &handles.merge_packager, + &handles.merge_uploader, + &handles.merge_publisher, + ] + } + + /// Checks child actor health. + /// + /// `check_for_progress` controls whether stall detection is performed + /// (actors that are alive but haven't recorded progress since last check). + /// The supervisor controls the cadence of progress checks. + /// + /// Returns: + /// - `Success` when all actors have completed (merge published). + /// - `FailureOrUnhealthy` when any actor has died or stalled. + /// - `Healthy` when actors are running and making progress. + pub fn check_actor_health(&self) -> Health { + if self.handles.is_none() { + return Health::Healthy; + } + + let mut healthy_actors: Vec<&str> = Vec::new(); + let mut failure_or_unhealthy_actors: Vec<&str> = Vec::new(); + let mut success_actors: Vec<&str> = Vec::new(); + + for supervisable in self.supervisables() { + match supervisable.check_health(true) { + Health::Healthy => { + healthy_actors.push(supervisable.name()); + } + Health::FailureOrUnhealthy => { + failure_or_unhealthy_actors.push(supervisable.name()); + } + Health::Success => { + success_actors.push(supervisable.name()); + } + } + } + + if !failure_or_unhealthy_actors.is_empty() { + error!( + task_id=%self.task_id, + healthy_actors=?healthy_actors, + failed_or_unhealthy_actors=?failure_or_unhealthy_actors, + success_actors=?success_actors, + "compaction pipeline actor failure detected" + ); + return Health::FailureOrUnhealthy; + } + if healthy_actors.is_empty() { + debug!(task_id=%self.task_id, "all compaction pipeline actors completed"); + return Health::Success; + } + Health::Healthy + } + + pub async fn terminate(&mut self) { + self.kill_switch.kill(); + if let Some(handles) = self.handles.take() { + tokio::join!( + handles.merge_split_downloader.kill(), + handles.merge_executor.kill(), + handles.merge_packager.kill(), + handles.merge_uploader.kill(), + handles.merge_publisher.kill(), + ); + } + } + + /// Terminates the current actor chain, increments retry count, and + /// re-spawns. Downloaded splits remain on disk in the scratch directory. + pub async fn restart(&mut self) { + self.terminate().await; + self.retry_count += 1; + self.spawn_pipeline(); + } + + /// Spawns the actor chain. Currently a no-op stub — actor chain + /// construction will be implemented in a later PR. + fn spawn_pipeline(&mut self) { + // TODO: construct MergeSplitDownloader → MergeExecutor → Packager → + // Uploader → Publisher actor chain and set self.handles. + } +} + +#[cfg(test)] +mod tests { + use quickwit_actors::Health; + use quickwit_common::temp_dir::TempDirectory; + + use super::CompactionPipeline; + + fn test_pipeline() -> CompactionPipeline { + CompactionPipeline::new( + "test-task".to_string(), + vec!["split-1".to_string(), "split-2".to_string()], + TempDirectory::for_test(), + ) + } + + #[test] + fn test_pipeline_no_handles_is_healthy() { + let pipeline = test_pipeline(); + assert!(pipeline.handles.is_none()); + assert_eq!(pipeline.check_actor_health(), Health::Healthy); + } + + #[tokio::test] + async fn test_pipeline_terminate_without_handles() { + let mut pipeline = test_pipeline(); + // Should not panic when there are no handles. + pipeline.terminate().await; + assert!(pipeline.handles.is_none()); + } + + #[tokio::test] + async fn test_pipeline_restart_increments_retry_count() { + let mut pipeline = test_pipeline(); + assert_eq!(pipeline.retry_count, 0); + pipeline.restart().await; + assert_eq!(pipeline.retry_count, 1); + } +} diff --git a/quickwit/quickwit-compaction/src/compactor_supervisor.rs b/quickwit/quickwit-compaction/src/compactor_supervisor.rs new file mode 100644 index 00000000000..78e87c79df8 --- /dev/null +++ b/quickwit/quickwit-compaction/src/compactor_supervisor.rs @@ -0,0 +1,224 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use async_trait::async_trait; +use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Health}; +use quickwit_common::io::Limiter; +use quickwit_common::pubsub::EventBroker; +use quickwit_common::temp_dir::TempDirectory; +use quickwit_indexing::IndexingSplitStore; +use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_storage::StorageResolver; +use serde::Serialize; +use tracing::{error, info}; + +use crate::compaction_pipeline::CompactionPipeline; + +const SUPERVISE_LOOP_INTERVAL: Duration = Duration::from_secs(1); + +#[derive(Debug)] +struct SuperviseLoop; + +#[derive(Clone, Debug, Default, Serialize)] +pub struct CompactorSupervisorState { + pub num_pipeline_slots: usize, + pub num_occupied_slots: usize, + pub num_completed_tasks: usize, + pub num_failed_tasks: usize, +} + +/// Manages a pool of `CompactionPipeline`s, each executing a single merge task. +/// +/// Periodically checks pipeline health, handles retries on failure, and reaps +/// completed/failed pipelines. +pub struct CompactorSupervisor { + pipelines: Vec>, + + // Shared resources distributed to pipelines when spawning actor chains. + io_throughput_limiter: Option, + split_store: IndexingSplitStore, + metastore: MetastoreServiceClient, + storage_resolver: StorageResolver, + max_concurrent_split_uploads: usize, + event_broker: EventBroker, + + // Scratch directory root (/compaction/). + compaction_root_directory: TempDirectory, + + max_local_retries: usize, + + // dummy counters until we have real state + num_completed_tasks: usize, + num_failed_tasks: usize, +} + +impl CompactorSupervisor { + #[allow(clippy::too_many_arguments)] + pub fn new( + num_pipeline_slots: usize, + io_throughput_limiter: Option, + split_store: IndexingSplitStore, + metastore: MetastoreServiceClient, + storage_resolver: StorageResolver, + max_concurrent_split_uploads: usize, + event_broker: EventBroker, + compaction_root_directory: TempDirectory, + max_local_retries: usize, + ) -> Self { + let pipelines = (0..num_pipeline_slots).map(|_| None).collect(); + CompactorSupervisor { + pipelines, + io_throughput_limiter, + split_store, + metastore, + storage_resolver, + max_concurrent_split_uploads, + event_broker, + compaction_root_directory, + max_local_retries, + num_completed_tasks: 0, + num_failed_tasks: 0, + } + } + + async fn supervise(&mut self) { + for slot in &mut self.pipelines { + let Some(pipeline) = slot else { + continue; + }; + + match pipeline.check_actor_health() { + Health::Healthy => {} + Health::Success => { + info!(task_id=%pipeline.task_id, "compaction task completed"); + self.num_completed_tasks += 1; + *slot = None; + } + Health::FailureOrUnhealthy => { + if pipeline.retry_count < self.max_local_retries { + info!( + task_id=%pipeline.task_id, + retry_count=%pipeline.retry_count, + "retrying compaction pipeline" + ); + pipeline.restart().await; + } else { + error!( + task_id=%pipeline.task_id, + retry_count=%pipeline.retry_count, + "compaction pipeline exhausted retries" + ); + pipeline.terminate().await; + self.num_failed_tasks += 1; + *slot = None; + } + } + } + } + } +} + +#[async_trait] +impl Actor for CompactorSupervisor { + type ObservableState = (); + + fn name(&self) -> String { + "CompactorSupervisor".to_string() + } + + fn observable_state(&self) -> Self::ObservableState {} + + async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { + info!( + num_pipeline_slots=%self.pipelines.len(), + "compactor supervisor started" + ); + ctx.schedule_self_msg(SUPERVISE_LOOP_INTERVAL, SuperviseLoop); + Ok(()) + } +} + +#[async_trait] +impl Handler for CompactorSupervisor { + type Reply = (); + + async fn handle( + &mut self, + _msg: SuperviseLoop, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + self.supervise().await; + ctx.schedule_self_msg(SUPERVISE_LOOP_INTERVAL, SuperviseLoop); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use quickwit_actors::Universe; + use quickwit_common::temp_dir::TempDirectory; + use quickwit_proto::metastore::{MetastoreServiceClient, MockMetastoreService}; + use quickwit_storage::{RamStorage, StorageResolver}; + + use super::*; + use crate::compaction_pipeline::CompactionPipeline; + + fn test_supervisor(num_slots: usize) -> CompactorSupervisor { + let storage = Arc::new(RamStorage::default()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage); + let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new()); + CompactorSupervisor::new( + num_slots, + None, + split_store, + metastore, + StorageResolver::for_test(), + 2, + EventBroker::default(), + TempDirectory::for_test(), + 2, + ) + } + + #[tokio::test] + async fn test_supervisor_starts_with_empty_slots() { + let universe = Universe::with_accelerated_time(); + let supervisor = test_supervisor(4); + let (_mailbox, handle) = universe.spawn_builder().spawn(supervisor); + let obs = handle.process_pending_and_observe().await; + assert_eq!(obs.obs_type, quickwit_actors::ObservationType::Alive); + universe.assert_quit().await; + } + + #[tokio::test] + async fn test_supervisor_supervise_reaps_no_handle_pipelines() { + // A pipeline with no handles returns Healthy, so it stays in its slot. + let mut supervisor = test_supervisor(2); + let pipeline = CompactionPipeline::new( + "task-1".to_string(), + vec!["split-1".to_string()], + TempDirectory::for_test(), + ); + supervisor.pipelines[0] = Some(pipeline); + supervisor.supervise().await; + // Pipeline has no handles → Healthy → not reaped. + assert!(supervisor.pipelines[0].is_some()); + assert_eq!(supervisor.num_completed_tasks, 0); + assert_eq!(supervisor.num_failed_tasks, 0); + } +} diff --git a/quickwit/quickwit-compaction/src/lib.rs b/quickwit/quickwit-compaction/src/lib.rs index d59933e227e..b867eed1b86 100644 --- a/quickwit/quickwit-compaction/src/lib.rs +++ b/quickwit/quickwit-compaction/src/lib.rs @@ -14,4 +14,10 @@ #![deny(clippy::disallowed_methods)] +#[allow(dead_code)] +mod compaction_pipeline; +#[allow(dead_code)] +mod compactor_supervisor; pub mod planner; + +pub use compactor_supervisor::CompactorSupervisor;