Skip to content

Commit e222cff

Browse files
committed
Scaffold Compactor supervisor and pipeline
1 parent 2862e87 commit e222cff

File tree

5 files changed

+434
-2
lines changed

5 files changed

+434
-2
lines changed

quickwit/Cargo.lock

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-compaction/Cargo.toml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "quickwit-compaction"
3-
description = "Merge planner and merge worker services for split compaction"
3+
description = "Compactor implementation and CompactionService"
44

55
version.workspace = true
66
edition.workspace = true
@@ -12,4 +12,11 @@ license.workspace = true
1212

1313
[dependencies]
1414
async-trait = { workspace = true }
15-
quickwit-proto = { workspace = true }
15+
quickwit-actors = { workspace = true }
16+
quickwit-common = { workspace = true }
17+
quickwit-indexing = { workspace = true }
18+
quickwit-proto = { workspace = true }
19+
quickwit-storage = { workspace = true }
20+
serde = { workspace = true }
21+
tracing = { workspace = true }
22+
tokio = { workspace = true }
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use quickwit_actors::{ActorHandle, Health, Supervisable};
16+
use quickwit_common::KillSwitch;
17+
use quickwit_common::temp_dir::TempDirectory;
18+
use quickwit_indexing::actors::{
19+
MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader,
20+
};
21+
use tracing::{debug, error};
22+
23+
pub struct CompactionPipelineHandles {
24+
pub merge_split_downloader: ActorHandle<MergeSplitDownloader>,
25+
pub merge_executor: ActorHandle<MergeExecutor>,
26+
pub merge_packager: ActorHandle<Packager>,
27+
pub merge_uploader: ActorHandle<Uploader>,
28+
pub merge_publisher: ActorHandle<Publisher>,
29+
}
30+
31+
/// A single-use merge execution pipeline. Processes one merge task and
32+
/// terminates.
33+
///
34+
/// Owned by the `CompactorSupervisor`, which periodically calls
35+
/// `check_actor_health()` and acts on the result (retry, reap, etc.).
36+
pub struct CompactionPipeline {
37+
pub task_id: String,
38+
pub split_ids: Vec<String>,
39+
pub retry_count: usize,
40+
pub kill_switch: KillSwitch,
41+
pub scratch_directory: TempDirectory,
42+
pub handles: Option<CompactionPipelineHandles>,
43+
}
44+
45+
impl CompactionPipeline {
46+
pub fn new(task_id: String, split_ids: Vec<String>, scratch_directory: TempDirectory) -> Self {
47+
CompactionPipeline {
48+
task_id,
49+
split_ids,
50+
retry_count: 0,
51+
kill_switch: KillSwitch::default(),
52+
scratch_directory,
53+
handles: None,
54+
}
55+
}
56+
57+
fn supervisables(&self) -> Vec<&dyn Supervisable> {
58+
let Some(handles) = &self.handles else {
59+
return Vec::new();
60+
};
61+
vec![
62+
&handles.merge_split_downloader,
63+
&handles.merge_executor,
64+
&handles.merge_packager,
65+
&handles.merge_uploader,
66+
&handles.merge_publisher,
67+
]
68+
}
69+
70+
/// Checks child actor health.
71+
///
72+
/// `check_for_progress` controls whether stall detection is performed
73+
/// (actors that are alive but haven't recorded progress since last check).
74+
/// The supervisor controls the cadence of progress checks.
75+
///
76+
/// Returns:
77+
/// - `Success` when all actors have completed (merge published).
78+
/// - `FailureOrUnhealthy` when any actor has died or stalled.
79+
/// - `Healthy` when actors are running and making progress.
80+
pub fn check_actor_health(&self) -> Health {
81+
if self.handles.is_none() {
82+
return Health::Healthy;
83+
}
84+
85+
let mut healthy_actors: Vec<&str> = Vec::new();
86+
let mut failure_or_unhealthy_actors: Vec<&str> = Vec::new();
87+
let mut success_actors: Vec<&str> = Vec::new();
88+
89+
for supervisable in self.supervisables() {
90+
match supervisable.check_health(true) {
91+
Health::Healthy => {
92+
healthy_actors.push(supervisable.name());
93+
}
94+
Health::FailureOrUnhealthy => {
95+
failure_or_unhealthy_actors.push(supervisable.name());
96+
}
97+
Health::Success => {
98+
success_actors.push(supervisable.name());
99+
}
100+
}
101+
}
102+
103+
if !failure_or_unhealthy_actors.is_empty() {
104+
error!(
105+
task_id=%self.task_id,
106+
healthy_actors=?healthy_actors,
107+
failed_or_unhealthy_actors=?failure_or_unhealthy_actors,
108+
success_actors=?success_actors,
109+
"compaction pipeline actor failure detected"
110+
);
111+
return Health::FailureOrUnhealthy;
112+
}
113+
if healthy_actors.is_empty() {
114+
debug!(task_id=%self.task_id, "all compaction pipeline actors completed");
115+
return Health::Success;
116+
}
117+
Health::Healthy
118+
}
119+
120+
pub async fn terminate(&mut self) {
121+
self.kill_switch.kill();
122+
if let Some(handles) = self.handles.take() {
123+
tokio::join!(
124+
handles.merge_split_downloader.kill(),
125+
handles.merge_executor.kill(),
126+
handles.merge_packager.kill(),
127+
handles.merge_uploader.kill(),
128+
handles.merge_publisher.kill(),
129+
);
130+
}
131+
}
132+
133+
/// Terminates the current actor chain, increments retry count, and
134+
/// re-spawns. Downloaded splits remain on disk in the scratch directory.
135+
pub async fn restart(&mut self) {
136+
self.terminate().await;
137+
self.retry_count += 1;
138+
self.spawn_pipeline();
139+
}
140+
141+
/// Spawns the actor chain. Currently a no-op stub — actor chain
142+
/// construction will be implemented in a later PR.
143+
fn spawn_pipeline(&mut self) {
144+
// TODO: construct MergeSplitDownloader → MergeExecutor → Packager →
145+
// Uploader → Publisher actor chain and set self.handles.
146+
}
147+
}
148+
149+
#[cfg(test)]
mod tests {
    use quickwit_actors::Health;
    use quickwit_common::temp_dir::TempDirectory;

    use super::CompactionPipeline;

    /// Builds a never-spawned pipeline over two dummy splits, backed by a
    /// test scratch directory.
    fn test_pipeline() -> CompactionPipeline {
        let task_id = String::from("test-task");
        let split_ids = vec![String::from("split-1"), String::from("split-2")];
        CompactionPipeline::new(task_id, split_ids, TempDirectory::for_test())
    }

    /// A pipeline whose actors were never spawned reports `Healthy`.
    #[test]
    fn test_pipeline_no_handles_is_healthy() {
        let unspawned = test_pipeline();
        assert!(unspawned.handles.is_none());
        let health = unspawned.check_actor_health();
        assert_eq!(health, Health::Healthy);
    }

    /// Terminating a pipeline that has no handles must not panic.
    #[tokio::test]
    async fn test_pipeline_terminate_without_handles() {
        let mut unspawned = test_pipeline();
        unspawned.terminate().await;
        assert!(unspawned.handles.is_none());
    }

    /// Each restart bumps the retry counter by one.
    #[tokio::test]
    async fn test_pipeline_restart_increments_retry_count() {
        let mut pipeline = test_pipeline();
        let retries_before = pipeline.retry_count;
        assert_eq!(retries_before, 0);
        pipeline.restart().await;
        assert_eq!(pipeline.retry_count, retries_before + 1);
    }
}

0 commit comments

Comments
 (0)