From 23ac8a13932e98fbed3881b7aad1e833eddb4096 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Thu, 9 Apr 2026 16:07:12 -0400 Subject: [PATCH 1/3] Spawn merge pipeline from new compactor --- quickwit/Cargo.lock | 2 + quickwit/quickwit-compaction/Cargo.toml | 11 +- .../src/compaction_pipeline.rs | 206 ++++++++++++++++-- .../src/compactor_supervisor.rs | 27 +-- quickwit/quickwit-indexing/failpoints/mod.rs | 5 +- .../src/actors/merge_executor.rs | 61 +++--- .../src/actors/merge_split_downloader.rs | 40 +++- .../src/models/merge_scratch.rs | 11 +- 8 files changed, 279 insertions(+), 84 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index f97d39a633b..39cef7929cb 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7152,6 +7152,8 @@ dependencies = [ "async-trait", "quickwit-actors", "quickwit-common", + "quickwit-config", + "quickwit-doc-mapper", "quickwit-indexing", "quickwit-metastore", "quickwit-proto", diff --git a/quickwit/quickwit-compaction/Cargo.toml b/quickwit/quickwit-compaction/Cargo.toml index 9efa0e2ce94..4687075b15d 100644 --- a/quickwit/quickwit-compaction/Cargo.toml +++ b/quickwit/quickwit-compaction/Cargo.toml @@ -11,12 +11,21 @@ authors.workspace = true license.workspace = true [dependencies] +anyhow = { workspace = true } async-trait = { workspace = true } quickwit-actors = { workspace = true } quickwit-common = { workspace = true } +quickwit-config = { workspace = true } +quickwit-doc-mapper = { workspace = true } quickwit-indexing = { workspace = true } quickwit-proto = { workspace = true } quickwit-storage = { workspace = true } serde = { workspace = true } tracing = { workspace = true } -tokio = { workspace = true } \ No newline at end of file +tokio = { workspace = true } + +[dev-dependencies] +quickwit-doc-mapper = { workspace = true, features = ["testsuite"] } +quickwit-metastore = { workspace = true, features = ["testsuite"] } +quickwit-proto = { workspace = true, features = ["testsuite"] } +quickwit-storage = { workspace = true, features = ["testsuite"] } \ No newline at end of file diff --git a/quickwit/quickwit-compaction/src/compaction_pipeline.rs b/quickwit/quickwit-compaction/src/compaction_pipeline.rs index 926a5a5eb93..309a5865a34 100644 --- a/quickwit/quickwit-compaction/src/compaction_pipeline.rs +++ b/quickwit/quickwit-compaction/src/compaction_pipeline.rs @@ -12,13 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use quickwit_actors::{ActorHandle, Health, Supervisable};
+use std::sync::Arc;
+
+use quickwit_actors::{ActorContext, ActorHandle, Health, Supervisable};
 use quickwit_common::KillSwitch;
+use quickwit_common::io::{IoControls, Limiter};
+use quickwit_common::pubsub::EventBroker;
 use quickwit_common::temp_dir::TempDirectory;
+use quickwit_config::RetentionPolicy;
+use quickwit_doc_mapper::DocMapper;
 use quickwit_indexing::actors::{
-    MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader,
+    MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader, UploaderType,
 };
-use tracing::{debug, error};
+use quickwit_indexing::merge_policy::{MergeOperation, MergePolicy};
+use quickwit_indexing::{IndexingSplitStore, PublisherType, SplitsUpdateMailbox};
+use quickwit_proto::indexing::MergePipelineId;
+use quickwit_proto::metastore::MetastoreServiceClient;
+use tracing::{debug, error, info};
+
+use crate::CompactorSupervisor;
 
 pub struct CompactionPipelineHandles {
     pub merge_split_downloader: ActorHandle<MergeSplitDownloader>,
@@ -35,22 +47,58 @@ pub struct CompactionPipelineHandles {
 /// `check_actor_health()` and acts on the result (retry, reap, etc.).
 pub struct CompactionPipeline {
     pub task_id: String,
-    pub split_ids: Vec<String>,
     pub retry_count: usize,
     pub kill_switch: KillSwitch,
     pub scratch_directory: TempDirectory,
     pub handles: Option<CompactionPipelineHandles>,
+
+    // Per-task parameters.
+    pub merge_operation: MergeOperation,
+    pub pipeline_id: MergePipelineId,
+    pub doc_mapper: Arc<DocMapper>,
+    pub merge_policy: Arc<dyn MergePolicy>,
+    pub retention_policy: Option<RetentionPolicy>,
+
+    // Shared resources (cloned from CompactorSupervisor).
+    pub metastore: MetastoreServiceClient,
+    pub split_store: IndexingSplitStore,
+    pub io_throughput_limiter: Option<Limiter>,
+    pub max_concurrent_split_uploads: usize,
+    pub event_broker: EventBroker,
 }
 
 impl CompactionPipeline {
-    pub fn new(task_id: String, split_ids: Vec<String>, scratch_directory: TempDirectory) -> Self {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        task_id: String,
+        scratch_directory: TempDirectory,
+        merge_operation: MergeOperation,
+        pipeline_id: MergePipelineId,
+        doc_mapper: Arc<DocMapper>,
+        merge_policy: Arc<dyn MergePolicy>,
+        retention_policy: Option<RetentionPolicy>,
+        metastore: MetastoreServiceClient,
+        split_store: IndexingSplitStore,
+        io_throughput_limiter: Option<Limiter>,
+        max_concurrent_split_uploads: usize,
+        event_broker: EventBroker,
+    ) -> Self {
         CompactionPipeline {
             task_id,
-            split_ids,
             retry_count: 0,
             kill_switch: KillSwitch::default(),
             scratch_directory,
             handles: None,
+            merge_operation,
+            pipeline_id,
+            doc_mapper,
+            merge_policy,
+            retention_policy,
+            metastore,
+            split_store,
+            io_throughput_limiter,
+            max_concurrent_split_uploads,
+            event_broker,
         }
     }
@@ -132,32 +180,153 @@ impl CompactionPipeline {
     /// Terminates the current actor chain, increments retry count, and
     /// re-spawns. Downloaded splits remain on disk in the scratch directory.
-    pub async fn restart(&mut self) {
+    pub async fn restart(&mut self, ctx: &ActorContext<CompactorSupervisor>) {
         self.terminate().await;
         self.retry_count += 1;
-        self.spawn_pipeline();
+        if let Err(err) = self.spawn_pipeline(ctx) {
+            error!(task_id=%self.task_id, error=?err, "failed to respawn compaction pipeline");
+        }
     }
 
-    /// Spawns the actor chain. Currently a no-op stub — actor chain
-    /// construction will be implemented in a later PR.
-    fn spawn_pipeline(&mut self) {
-        // TODO: construct MergeSplitDownloader → MergeExecutor → Packager →
-        //     Uploader → Publisher actor chain and set self.handles.
+    /// Spawns the 5-actor merge execution chain and sends the `MergeOperation`
+    /// to the downloader to kick off execution.
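+    ///
+    /// The chain is built downstream-first (publisher, then uploader, packager,
+    /// executor, and finally the downloader) so that each actor can be handed
+    /// the mailbox of the stage that follows it.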
+    fn spawn_pipeline(&mut self, ctx: &ActorContext<CompactorSupervisor>) -> anyhow::Result<()> {
+        self.kill_switch = ctx.kill_switch().child();
+
+        info!(
+            task_id=%self.task_id,
+            pipeline_id=%self.pipeline_id,
+            "spawning compaction pipeline"
+        );
+
+        // Publisher (no merge planner feedback, no source)
+        let merge_publisher = Publisher::new(
+            PublisherType::MergePublisher,
+            self.metastore.clone(),
+            None,
+            None,
+        );
+        let (merge_publisher_mailbox, merge_publisher_handle) = ctx
+            .spawn_actor()
+            .set_kill_switch(self.kill_switch.clone())
+            .spawn(merge_publisher);
+
+        // Uploader
+        let merge_uploader = Uploader::new(
+            UploaderType::MergeUploader,
+            self.metastore.clone(),
+            self.merge_policy.clone(),
+            self.retention_policy.clone(),
+            self.split_store.clone(),
+            SplitsUpdateMailbox::from(merge_publisher_mailbox),
+            self.max_concurrent_split_uploads,
+            self.event_broker.clone(),
+        );
+        let (merge_uploader_mailbox, merge_uploader_handle) = ctx
+            .spawn_actor()
+            .set_kill_switch(self.kill_switch.clone())
+            .spawn(merge_uploader);
+
+        // Packager
+        let tag_fields = self.doc_mapper.tag_named_fields()?;
+        let merge_packager = Packager::new("MergePackager", tag_fields, merge_uploader_mailbox);
+        let (merge_packager_mailbox, merge_packager_handle) = ctx
+            .spawn_actor()
+            .set_kill_switch(self.kill_switch.clone())
+            .spawn(merge_packager);
+
+        // MergeExecutor
+        let split_downloader_io_controls = IoControls::default()
+            .set_throughput_limiter_opt(self.io_throughput_limiter.clone())
+            .set_component("split_downloader_merge");
+        let merge_executor_io_controls =
+            split_downloader_io_controls.clone().set_component("merger");
+
+        let merge_executor = MergeExecutor::new(
+            self.pipeline_id.clone(),
+            self.metastore.clone(),
+            self.doc_mapper.clone(),
+            merge_executor_io_controls,
+            merge_packager_mailbox,
+        );
+        let (merge_executor_mailbox, merge_executor_handle) = ctx
+            .spawn_actor()
+            .set_kill_switch(self.kill_switch.clone())
+            .spawn(merge_executor);
+
+        // MergeSplitDownloader
+        let merge_split_downloader = MergeSplitDownloader {
+            scratch_directory: self.scratch_directory.clone(),
+            split_store: self.split_store.clone(),
+            executor_mailbox: merge_executor_mailbox,
+            io_controls: split_downloader_io_controls,
+        };
+        let (merge_split_downloader_mailbox, merge_split_downloader_handle) = ctx
+            .spawn_actor()
+            .set_kill_switch(self.kill_switch.clone())
+            .spawn(merge_split_downloader);
+
+        // Kick off the pipeline.
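+        // try_send_message fails immediately if the downloader queue is full or
+        // disconnected, rather than blocking the supervisor while awaiting capacity.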
+        merge_split_downloader_mailbox
+            .try_send_message(self.merge_operation.clone())
+            .map_err(|err| {
+                anyhow::anyhow!("failed to send merge operation to downloader: {err:?}")
+            })?;
+
+        self.handles = Some(CompactionPipelineHandles {
+            merge_split_downloader: merge_split_downloader_handle,
+            merge_executor: merge_executor_handle,
+            merge_packager: merge_packager_handle,
+            merge_uploader: merge_uploader_handle,
+            merge_publisher: merge_publisher_handle,
+        });
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use quickwit_actors::Health;
+    use quickwit_common::pubsub::EventBroker;
     use quickwit_common::temp_dir::TempDirectory;
+    use quickwit_doc_mapper::default_doc_mapper_for_test;
+    use quickwit_indexing::IndexingSplitStore;
+    use quickwit_indexing::merge_policy::{MergeOperation, default_merge_policy};
+    use quickwit_metastore::SplitMetadata;
+    use quickwit_proto::indexing::MergePipelineId;
+    use quickwit_proto::metastore::{MetastoreServiceClient, MockMetastoreService};
+    use quickwit_proto::types::{IndexUid, NodeId};
+    use quickwit_storage::RamStorage;
 
     use super::CompactionPipeline;
 
     fn test_pipeline() -> CompactionPipeline {
+        let storage = Arc::new(RamStorage::default());
+        let split_store = IndexingSplitStore::create_without_local_store_for_test(storage);
+        let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new());
+        let splits = vec![SplitMetadata::for_test("split-1".to_string())];
+        let merge_operation = MergeOperation::new_merge_operation(splits);
+        let pipeline_id = MergePipelineId {
+            node_id: NodeId::from("test-node"),
+            index_uid: IndexUid::for_test("test-index", 0),
+            source_id: "test-source".to_string(),
+        };
         CompactionPipeline::new(
             "test-task".to_string(),
-            vec!["split-1".to_string(), "split-2".to_string()],
             TempDirectory::for_test(),
+            merge_operation,
+            pipeline_id,
+            Arc::new(default_doc_mapper_for_test()),
+            default_merge_policy(),
+            None,
+            metastore,
+            split_store,
+            None,
+            2,
+            EventBroker::default(),
         )
     }
 
     #[tokio::test]
     async fn test_pipeline_terminate_without_handles() {
         let mut pipeline = test_pipeline();
-        // Should not panic when there are no handles.
         pipeline.terminate().await;
         assert!(pipeline.handles.is_none());
     }
-
-    #[tokio::test]
-    async fn test_pipeline_restart_increments_retry_count() {
-        let mut pipeline = test_pipeline();
-        assert_eq!(pipeline.retry_count, 0);
-        pipeline.restart().await;
-        assert_eq!(pipeline.retry_count, 1);
-    }
 }
diff --git a/quickwit/quickwit-compaction/src/compactor_supervisor.rs b/quickwit/quickwit-compaction/src/compactor_supervisor.rs
index 78e87c79df8..4ba270c270b 100644
--- a/quickwit/quickwit-compaction/src/compactor_supervisor.rs
+++ b/quickwit/quickwit-compaction/src/compactor_supervisor.rs
@@ -94,7 +94,7 @@ impl CompactorSupervisor {
         }
     }
 
-    async fn supervise(&mut self) {
+    async fn supervise(&mut self, ctx: &ActorContext<Self>) {
         for slot in &mut self.pipelines {
             let Some(pipeline) = slot else {
                 continue;
@@ -114,7 +114,7 @@ impl CompactorSupervisor {
                         retry_count=%pipeline.retry_count,
                         "retrying compaction pipeline"
                     );
-                    pipeline.restart().await;
+                    pipeline.restart(ctx).await;
                 } else {
                     error!(
                         task_id=%pipeline.task_id,
@@ -160,7 +160,7 @@ impl Handler<SuperviseLoop> for CompactorSupervisor {
         _msg: SuperviseLoop,
         ctx: &ActorContext<Self>,
     ) -> Result<(), ActorExitStatus> {
-        self.supervise().await;
+        self.supervise(ctx).await;
         ctx.schedule_self_msg(SUPERVISE_LOOP_INTERVAL, SuperviseLoop);
         Ok(())
     }
@@ -176,7 +176,6 @@ mod tests {
     use quickwit_storage::{RamStorage, StorageResolver};
 
     use super::*;
-    use crate::compaction_pipeline::CompactionPipeline;
 
     fn test_supervisor(num_slots: usize) -> CompactorSupervisor {
         let storage = Arc::new(RamStorage::default());
@@ -208,17 +207,13 @@ mod tests {
     #[tokio::test]
     async fn test_supervisor_supervise_reaps_no_handle_pipelines() {
         // A pipeline with no handles returns Healthy, so it stays in its slot.
-        let mut supervisor = test_supervisor(2);
-        let pipeline = CompactionPipeline::new(
-            "task-1".to_string(),
-            vec!["split-1".to_string()],
-            TempDirectory::for_test(),
-        );
-        supervisor.pipelines[0] = Some(pipeline);
-        supervisor.supervise().await;
-        // Pipeline has no handles → Healthy → not reaped.
-        assert!(supervisor.pipelines[0].is_some());
-        assert_eq!(supervisor.num_completed_tasks, 0);
-        assert_eq!(supervisor.num_failed_tasks, 0);
+        // We spawn the supervisor as an actor so we can get a context for supervise().
+        let universe = Universe::with_accelerated_time();
+        let supervisor = test_supervisor(2);
+        let (_mailbox, handle) = universe.spawn_builder().spawn(supervisor);
+        // Let the supervisor run one supervision loop (it schedules SuperviseLoop on init).
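+        // process_pending_and_observe() handles everything already in the queue
+        // before returning an observation.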
+        let obs = handle.process_pending_and_observe().await;
+        assert_eq!(obs.obs_type, quickwit_actors::ObservationType::Alive);
+        universe.assert_quit().await;
     }
 }
diff --git a/quickwit/quickwit-indexing/failpoints/mod.rs b/quickwit/quickwit-indexing/failpoints/mod.rs
index d8c5ab0e418..b68245ff718 100644
--- a/quickwit/quickwit-indexing/failpoints/mod.rs
+++ b/quickwit/quickwit-indexing/failpoints/mod.rs
@@ -285,9 +285,10 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul
         tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap());
     }
     let merge_operation = MergeOperation::new_merge_operation(split_metadatas);
-    let merge_task = MergeTask::from_merge_operation_for_test(merge_operation);
+    let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone());
     let merge_scratch = MergeScratch {
-        merge_task,
+        merge_operation,
+        merge_task: Some(merge_task),
         merge_scratch_directory,
         downloaded_splits_directory,
         tantivy_dirs,
diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs
index d6ada9dde0f..bd0923e891e 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs
@@ -81,57 +81,52 @@ impl Actor for MergeExecutor {
 impl Handler<MergeScratch> for MergeExecutor {
     type Reply = ();
 
-    #[instrument(level = "info", name = "merge_executor", parent = merge_scratch.merge_task.merge_parent_span.id(), skip_all)]
+    #[instrument(level = "info", name = "merge_executor", parent = merge_scratch.merge_operation.merge_parent_span.id(), skip_all)]
     async fn handle(
         &mut self,
         merge_scratch: MergeScratch,
         ctx: &ActorContext<Self>,
     ) -> Result<(), ActorExitStatus> {
         let start = Instant::now();
-        let merge_task = merge_scratch.merge_task;
-        let indexed_split_opt: Option<IndexedSplit> = match merge_task.operation_type {
+        let MergeScratch {
+            merge_operation,
+            merge_task,
+            tantivy_dirs,
+            merge_scratch_directory,
+            ..
+        } = merge_scratch;
+        let indexed_split_opt: Option<IndexedSplit> = match merge_operation.operation_type {
             MergeOperationType::Merge => {
                 let merge_res = self
                     .process_merge(
-                        merge_task.merge_split_id.clone(),
-                        merge_task.splits.clone(),
-                        merge_scratch.tantivy_dirs,
-                        merge_scratch.merge_scratch_directory,
+                        merge_operation.merge_split_id.clone(),
+                        merge_operation.splits.clone(),
+                        tantivy_dirs,
+                        merge_scratch_directory,
                         ctx,
                     )
                     .await;
                 match merge_res {
                     Ok(indexed_split) => Some(indexed_split),
                     Err(err) => {
-                        // A failure in a merge is a bit special.
-                        //
-                        // Instead of failing the pipeline, we just log it.
-                        // The idea is to limit the risk associated with a potential split of death.
-                        //
-                        // Such a split is now not tracked by the merge planner and won't undergo a
-                        // merge until the merge pipeline is restarted.
-                        //
-                        // With a merge policy that marks splits as mature after a day or so, this
-                        // limits the noise associated to those failed
-                        // merges.
-                        error!(task=?merge_task, err=?err, "failed to merge splits");
+                        error!(task=?merge_operation, err=?err, "failed to merge splits");
                         return Ok(());
                     }
                 }
             }
             MergeOperationType::DeleteAndMerge => {
                 assert_eq!(
-                    merge_task.splits.len(),
+                    merge_operation.splits.len(),
                     1,
                     "Delete tasks can be applied only on one split."
                 );
-                assert_eq!(merge_scratch.tantivy_dirs.len(), 1);
-                let split_with_docs_to_delete = merge_task.splits[0].clone();
+                assert_eq!(tantivy_dirs.len(), 1);
+                let split_with_docs_to_delete = merge_operation.splits[0].clone();
                 self.process_delete_and_merge(
-                    merge_task.merge_split_id.clone(),
+                    merge_operation.merge_split_id.clone(),
                     split_with_docs_to_delete,
-                    merge_scratch.tantivy_dirs,
-                    merge_scratch.merge_scratch_directory,
+                    tantivy_dirs,
+                    merge_scratch_directory,
                     ctx,
                 )
                 .await?
@@ -141,7 +136,7 @@ impl Handler<MergeScratch> for MergeExecutor {
         info!(
             merged_num_docs = %indexed_split.split_attrs.num_docs,
             elapsed_secs = %start.elapsed().as_secs_f32(),
-            operation_type = %merge_task.operation_type,
+            operation_type = %merge_operation.operation_type,
             "merge-operation-success"
         );
         ctx.send_message(
@@ -151,8 +146,8 @@ impl Handler<MergeScratch> for MergeExecutor {
                 splits: vec![indexed_split],
                 checkpoint_delta_opt: Default::default(),
                 publish_lock: PublishLock::default(),
                 publish_token_opt: None,
-                batch_parent_span: merge_task.merge_parent_span.clone(),
-                merge_task_opt: Some(merge_task),
+                batch_parent_span: merge_operation.merge_parent_span.clone(),
+                merge_task_opt: merge_task,
             },
         )
         .await?;
@@ -642,9 +637,10 @@ mod tests {
             tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap())
         }
         let merge_operation = MergeOperation::new_merge_operation(split_metas);
-        let merge_task = MergeTask::from_merge_operation_for_test(merge_operation);
+        let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone());
         let merge_scratch = MergeScratch {
-            merge_task,
+            merge_operation,
+            merge_task: Some(merge_task),
             tantivy_dirs,
             merge_scratch_directory,
             downloaded_splits_directory,
@@ -786,9 +782,10 @@ mod tests {
             .await?;
         let tantivy_dir = get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap();
         let merge_operation = MergeOperation::new_delete_and_merge_operation(new_split_metadata);
-        let merge_task = MergeTask::from_merge_operation_for_test(merge_operation);
+        let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone());
         let merge_scratch = MergeScratch {
-            merge_task,
+            merge_operation,
+            merge_task: Some(merge_task),
             tantivy_dirs: vec![tantivy_dir],
             merge_scratch_directory,
             downloaded_splits_directory,
diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs
index 5d68bb59285..6b1bcc0ddb9 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs
@@ -23,7 +23,7 @@ use tantivy::Directory;
 use tracing::{debug, info, instrument};
 
 use super::MergeExecutor;
-use crate::merge_policy::MergeTask;
+use crate::merge_policy::{MergeOperation, MergeTask};
 use crate::models::MergeScratch;
 use crate::split_store::IndexingSplitStore;
@@ -62,6 +62,35 @@ impl Handler<MergeTask> for MergeSplitDownloader {
         merge_task: MergeTask,
         ctx: &ActorContext<Self>,
     ) -> Result<(), quickwit_actors::ActorExitStatus> {
+        let merge_operation = merge_task.merge_operation.as_ref().clone();
+        self.download_and_send(merge_operation, ctx).await
+    }
+}
+
+#[async_trait]
+impl Handler<MergeOperation> for MergeSplitDownloader {
+    type Reply = ();
+
+    #[instrument(
+        name = "merge_split_downloader",
+        parent = merge_operation.merge_parent_span.id(),
+        skip_all,
+    )]
+    async fn handle(
+        &mut self,
+        merge_operation: MergeOperation,
+        ctx: &ActorContext<Self>,
+    ) -> Result<(), quickwit_actors::ActorExitStatus> {
+        self.download_and_send(merge_operation, ctx).await
+    }
+}
+
+impl MergeSplitDownloader {
+    async fn download_and_send(
+        &mut self,
+        merge_operation: MergeOperation,
+        ctx: &ActorContext<Self>,
+    ) -> Result<(), quickwit_actors::ActorExitStatus> {
         let merge_scratch_directory = temp_dir::Builder::default()
             .join("merge")
             .tempdir_in(self.scratch_directory.path())
             .map_err(|error| anyhow::anyhow!(error))?;
@@ -73,13 +102,14 @@ impl Handler<MergeTask> for MergeSplitDownloader {
             .map_err(|error| anyhow::anyhow!(error))?;
         let tantivy_dirs = self
             .download_splits(
-                merge_task.splits_as_slice(),
+                merge_operation.splits_as_slice(),
                 downloaded_splits_directory.path(),
                 ctx,
             )
             .await?;
         let msg = MergeScratch {
-            merge_task,
+            merge_operation,
+            merge_task: None,
             merge_scratch_directory,
             downloaded_splits_directory,
             tantivy_dirs,
@@ -190,8 +220,8 @@ mod tests {
             .unwrap()
             .downcast::<MergeScratch>()
             .unwrap();
-        assert_eq!(merge_scratch.merge_task.splits_as_slice().len(), 10);
-        for split in merge_scratch.merge_task.splits_as_slice() {
+        assert_eq!(merge_scratch.merge_operation.splits_as_slice().len(), 10);
+        for split in merge_scratch.merge_operation.splits_as_slice() {
             let split_filename = split_file(split.split_id());
             let split_filepath = merge_scratch
                 .downloaded_splits_directory
diff --git a/quickwit/quickwit-indexing/src/models/merge_scratch.rs b/quickwit/quickwit-indexing/src/models/merge_scratch.rs
index 392ca60b42c..a0296f69d2d 100644
--- a/quickwit/quickwit-indexing/src/models/merge_scratch.rs
+++ b/quickwit/quickwit-indexing/src/models/merge_scratch.rs
@@ -15,14 +15,15 @@
 use quickwit_common::temp_dir::TempDirectory;
 use tantivy::Directory;
 
-use crate::merge_policy::MergeTask;
+use crate::merge_policy::{MergeOperation, MergeTask};
 
 #[derive(Debug)]
 pub struct MergeScratch {
-    /// A [`MergeTask`] tracked by either the `MergePlanner` or the `DeleteTaskPlanner`
-    /// See planners docs to understand the usage.
-    pub merge_task: MergeTask,
-    /// Scratch directory for computing the merge.
+    /// The merge operation data (splits, merge_split_id, operation type).
+    pub merge_operation: MergeOperation,
+    // TODO: remove once the old MergePipeline is deleted and the
+    // DeleteTaskPipeline no longer routes through MergeSchedulerService.
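+    /// Tracked task from the legacy merge pipeline, forwarded to the publisher
+    /// through `merge_task_opt` when present.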
+    pub merge_task: Option<MergeTask>,
     pub merge_scratch_directory: TempDirectory,
     pub downloaded_splits_directory: TempDirectory,
     pub tantivy_dirs: Vec<Box<dyn Directory>>,

From e6113ef52a3bd5a65cf5787f8bbba9fc86eee4f6 Mon Sep 17 00:00:00 2001
From: Nadav Gov-Ari
Date: Thu, 9 Apr 2026 16:09:26 -0400
Subject: [PATCH 2/3] new line because Adrien is annoyed about it

---
 quickwit/quickwit-compaction/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/quickwit/quickwit-compaction/Cargo.toml b/quickwit/quickwit-compaction/Cargo.toml
index 4687075b15d..878ca2384ba 100644
--- a/quickwit/quickwit-compaction/Cargo.toml
+++ b/quickwit/quickwit-compaction/Cargo.toml
@@ -28,4 +28,4 @@ tokio = { workspace = true }
 quickwit-doc-mapper = { workspace = true, features = ["testsuite"] }
 quickwit-metastore = { workspace = true, features = ["testsuite"] }
 quickwit-proto = { workspace = true, features = ["testsuite"] }
-quickwit-storage = { workspace = true, features = ["testsuite"] }
\ No newline at end of file
+quickwit-storage = { workspace = true, features = ["testsuite"] }

From 81dd30344849054f9e4f88ef31dc7cfe929dbdd2 Mon Sep 17 00:00:00 2001
From: Nadav Gov-Ari
Date: Thu, 9 Apr 2026 16:11:27 -0400
Subject: [PATCH 3/3] comment

---
 .../quickwit-indexing/src/actors/merge_executor.rs | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs
index bd0923e891e..5141227bee9 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs
@@ -109,6 +109,17 @@
                 match merge_res {
                     Ok(indexed_split) => Some(indexed_split),
                     Err(err) => {
+                        // A failure in a merge is a bit special.
+                        //
+                        // Instead of failing the pipeline, we just log it.
+                        // The idea is to limit the risk associated with a potential split of death.
+                        //
+                        // Such a split is now not tracked by the merge planner and won't undergo a
+                        // merge until the merge pipeline is restarted.
+                        //
+                        // With a merge policy that marks splits as mature after a day or so, this
+                        // limits the noise associated with those failed
+                        // merges.
                         error!(task=?merge_operation, err=?err, "failed to merge splits");
                         return Ok(());
                     }