From 6b77a2c0750abc5387f98c4f8fc666df22f61b7e Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 10 Jun 2026 17:18:32 +0300 Subject: [PATCH 1/9] fix(sort): record output_batches, output_bytes and end_time for when not using merge sort --- datafusion/physical-plan/src/sorts/sort.rs | 53 ++++++++++++---------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 929ff4f7dfc85..02a4418112529 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -45,7 +45,7 @@ use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::get_record_batch_memory_size; use crate::spill::in_progress_spill_file::InProgressSpillFile; use crate::spill::spill_manager::{GetSlicedSize, SpillManager}; -use crate::stream::RecordBatchStreamAdapter; +use crate::stream::{ObservedStream, RecordBatchStreamAdapter}; use crate::stream::ReservationStream; use crate::topk::TopK; use crate::topk::TopKDynamicFilters; @@ -373,7 +373,7 @@ impl ExternalSorter { // allocation. Only needed for the non-spill path; the spill // path transfers the reservation to the merge stream instead. self.merge_reservation.free(); - self.in_mem_sort_stream(self.metrics.baseline.clone()) + self.in_mem_sort_stream(true) } } @@ -476,7 +476,7 @@ impl ExternalSorter { self.merge_reservation.free(); let mut sorted_stream = - self.in_mem_sort_stream(self.metrics.baseline.intermediate())?; + self.in_mem_sort_stream(false)?; // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken // to construct a globally sorted stream. assert_or_internal_err!( @@ -585,17 +585,20 @@ impl ExternalSorter { /// ``` fn in_mem_sort_stream( &mut self, - metrics: BaselineMetrics, + is_output_stream: bool, ) -> Result { if self.in_mem_batches.is_empty() { - return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( - &self.schema, - )))); + let empty_stream = Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema))); + if(is_output_stream) { + return Ok(Box::pin(ObservedStream::new(empty_stream, self.metrics.baseline.clone(), None))); + } else { + return Ok(empty_stream); + } } // The elapsed compute timer is updated when the value is dropped. // There is no need for an explicit call to drop. - let elapsed_compute = metrics.elapsed_compute().clone(); + let elapsed_compute = self.metrics.baseline.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); // Please pay attention that any operation inside of `in_mem_sort_stream` will @@ -607,7 +610,12 @@ impl ExternalSorter { if self.in_mem_batches.len() == 1 { let batch = self.in_mem_batches.swap_remove(0); let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, &metrics, reservation); + let sorted_stream = self.sort_batch_stream(batch, reservation)?; + if(is_output_stream) { + return Ok(Box::pin(ObservedStream::new(sorted_stream, self.metrics.baseline.clone(), None))); + } else { + return Ok(sorted_stream); + } } // If less than sort_in_place_threshold_bytes, concatenate and sort in place @@ -619,17 +627,21 @@ impl ExternalSorter { .try_resize(get_reserved_bytes_for_record_batch(&batch)?) .map_err(Self::err_with_oom_context)?; let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, &metrics, reservation); + let sorted_stream = self.sort_batch_stream(batch, reservation)?; + if(is_output_stream) { + return Ok(Box::pin(ObservedStream::new(sorted_stream, self.metrics.baseline.clone(), None))); + } else { + return Ok(sorted_stream); + } } let streams = std::mem::take(&mut self.in_mem_batches) .into_iter() .map(|batch| { - let metrics = self.metrics.baseline.intermediate(); let reservation = self .reservation .split(get_reserved_bytes_for_record_batch(&batch)?); - let input = self.sort_batch_stream(batch, &metrics, reservation)?; + let input = self.sort_batch_stream(batch, reservation)?; Ok(spawn_buffered(input, 1)) }) .collect::>()?; @@ -638,7 +650,11 @@ impl ExternalSorter { .with_streams(streams) .with_schema(Arc::clone(&self.schema)) .with_expressions(&self.expr.clone()) - .with_metrics(metrics) + .with_metrics(if is_output_stream { + self.metrics.baseline.clone() + } else { + self.metrics.baseline.intermediate() + }) .with_batch_size(self.batch_size) .with_fetch(None) .with_reservation(self.merge_reservation.new_empty()) @@ -657,7 +673,6 @@ impl ExternalSorter { fn sort_batch_stream( &self, batch: RecordBatch, - metrics: &BaselineMetrics, reservation: MemoryReservation, ) -> Result { assert_eq!( @@ -668,7 +683,6 @@ impl ExternalSorter { let schema = batch.schema(); let expressions = self.expr.clone(); let batch_size = self.batch_size; - let output_row_metrics = metrics.output_rows().clone(); let stream = futures::stream::once(async move { let schema = batch.schema(); @@ -698,14 +712,7 @@ impl ExternalSorter { reservation, )) as SendableRecordBatchStream) }) - .try_flatten() - .map(move |batch| match batch { - Ok(batch) => { - output_row_metrics.add(batch.num_rows()); - Ok(batch) - } - Err(e) => Err(e), - }); + .try_flatten(); Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) } From 99d20389ccb7390ecf2c6a2ae6b5806aa0fb3217 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 10 Jun 2026 17:35:47 +0300 Subject: [PATCH 2/9] fix in multi level sort as well for the last stream --- .../src/sorts/multi_level_merge.rs | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs index 8985e1d8c70ee..396cf1dd7b78e 100644 --- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs +++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs @@ -33,7 +33,7 @@ use datafusion_execution::memory_pool::MemoryReservation; use crate::sorts::builder::try_grow_reservation_to_at_least; use crate::sorts::sort::get_reserved_bytes_for_record_batch_size; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; -use crate::stream::RecordBatchStreamAdapter; +use crate::stream::{ObservedStream, RecordBatchStreamAdapter}; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use futures::TryStreamExt; @@ -225,25 +225,34 @@ impl MultiLevelMergeBuilder { ) -> Result { match (self.sorted_spill_files.len(), self.sorted_streams.len()) { // No data so empty batch - (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( + (0, 0) => { + let empty_stream = Box::pin(EmptyRecordBatchStream::new(Arc::clone( &self.schema, - )))), + ))); + Ok(Box::pin(ObservedStream::new(empty_stream, self.metrics.clone(), None))) + }, // Only in-memory stream, return that - (0, 1) => Ok(self.sorted_streams.remove(0)), + (0, 1) => { + let output_stream = self.sorted_streams.remove(0); + Ok(Box::pin(ObservedStream::new(output_stream, self.metrics.clone(), None))) + }, // Only single sorted spill file so return it (1, 0) => { let spill_file = self.sorted_spill_files.remove(0); // Not reserving any memory for this disk as we are not holding it in memory - self.spill_manager - .read_spill_as_stream(spill_file.file, None) + let output_stream = self.spill_manager + .read_spill_as_stream(spill_file.file, None)?; + + Ok(Box::pin(ObservedStream::new(output_stream, self.metrics.clone(), None))) } // Only in memory streams, so merge them all in a single pass (0, _) => { let sorted_stream = mem::take(&mut self.sorted_streams); + // No need to wrap with observed stream since merge sort will update the observed metrics self.create_new_merge_sort( sorted_stream, // If we have no sorted spill files left, this is the last run From 8c88f1c4810c854249128404642fb1196963a9c1 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 10 Jun 2026 17:52:26 +0300 Subject: [PATCH 3/9] format --- datafusion/physical-plan/src/sorts/sort.rs | 28 +++++++++++----------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 02a4418112529..b7ee2686b27d2 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -589,11 +589,11 @@ impl ExternalSorter { ) -> Result { if self.in_mem_batches.is_empty() { let empty_stream = Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema))); - if(is_output_stream) { - return Ok(Box::pin(ObservedStream::new(empty_stream, self.metrics.baseline.clone(), None))); + return Ok(if is_output_stream { + Box::pin(ObservedStream::new(empty_stream, self.metrics.baseline.clone(), None)) } else { - return Ok(empty_stream); - } + empty_stream + }); } // The elapsed compute timer is updated when the value is dropped. @@ -611,11 +611,11 @@ impl ExternalSorter { let batch = self.in_mem_batches.swap_remove(0); let reservation = self.reservation.take(); let sorted_stream = self.sort_batch_stream(batch, reservation)?; - if(is_output_stream) { - return Ok(Box::pin(ObservedStream::new(sorted_stream, self.metrics.baseline.clone(), None))); + return Ok(if is_output_stream { + Box::pin(ObservedStream::new(sorted_stream, self.metrics.baseline.clone(), None)) } else { - return Ok(sorted_stream); - } + sorted_stream + }) } // If less than sort_in_place_threshold_bytes, concatenate and sort in place @@ -628,11 +628,11 @@ impl ExternalSorter { .map_err(Self::err_with_oom_context)?; let reservation = self.reservation.take(); let sorted_stream = self.sort_batch_stream(batch, reservation)?; - if(is_output_stream) { - return Ok(Box::pin(ObservedStream::new(sorted_stream, self.metrics.baseline.clone(), None))); + return Ok(if is_output_stream { + Box::pin(ObservedStream::new(sorted_stream, self.metrics.baseline.clone(), None)) } else { - return Ok(sorted_stream); - } + sorted_stream + }) } let streams = std::mem::take(&mut self.in_mem_batches) @@ -822,8 +822,8 @@ pub fn sort_batch( fetch: Option, ) -> Result { let sort_columns = expressions - .iter() - .map(|expr| expr.evaluate_to_sort_column(batch)) + .iter() + .map(|expr| expr.evaluate_to_sort_column(batch)) .collect::>>()?; let indices = lexsort_to_indices(&sort_columns, fetch)?; From 007409c0579e90f8c157eb470459daecee37fe3e Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 10 Jun 2026 18:58:02 +0300 Subject: [PATCH 4/9] add tests --- datafusion/physical-plan/src/sorts/sort.rs | 148 ++++++++++++++++----- 1 file changed, 114 insertions(+), 34 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index b7ee2686b27d2..c5834be1b9d8c 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -45,8 +45,8 @@ use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::get_record_batch_memory_size; use crate::spill::in_progress_spill_file::InProgressSpillFile; use crate::spill::spill_manager::{GetSlicedSize, SpillManager}; -use crate::stream::{ObservedStream, RecordBatchStreamAdapter}; use crate::stream::ReservationStream; +use crate::stream::{ObservedStream, RecordBatchStreamAdapter}; use crate::topk::TopK; use crate::topk::TopKDynamicFilters; use crate::{ @@ -475,8 +475,7 @@ impl ExternalSorter { // reserved again for the next spill. self.merge_reservation.free(); - let mut sorted_stream = - self.in_mem_sort_stream(false)?; + let mut sorted_stream = self.in_mem_sort_stream(false)?; // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken // to construct a globally sorted stream. assert_or_internal_err!( @@ -588,9 +587,14 @@ impl ExternalSorter { is_output_stream: bool, ) -> Result { if self.in_mem_batches.is_empty() { - let empty_stream = Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema))); + let empty_stream = + Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema))); return Ok(if is_output_stream { - Box::pin(ObservedStream::new(empty_stream, self.metrics.baseline.clone(), None)) + Box::pin(ObservedStream::new( + empty_stream, + self.metrics.baseline.clone(), + None, + )) } else { empty_stream }); @@ -612,10 +616,14 @@ impl ExternalSorter { let reservation = self.reservation.take(); let sorted_stream = self.sort_batch_stream(batch, reservation)?; return Ok(if is_output_stream { - Box::pin(ObservedStream::new(sorted_stream, self.metrics.baseline.clone(), None)) + Box::pin(ObservedStream::new( + sorted_stream, + self.metrics.baseline.clone(), + None, + )) } else { sorted_stream - }) + }); } // If less than sort_in_place_threshold_bytes, concatenate and sort in place @@ -629,10 +637,14 @@ impl ExternalSorter { let reservation = self.reservation.take(); let sorted_stream = self.sort_batch_stream(batch, reservation)?; return Ok(if is_output_stream { - Box::pin(ObservedStream::new(sorted_stream, self.metrics.baseline.clone(), None)) + Box::pin(ObservedStream::new( + sorted_stream, + self.metrics.baseline.clone(), + None, + )) } else { sorted_stream - }) + }); } let streams = std::mem::take(&mut self.in_mem_batches) @@ -822,8 +834,8 @@ pub fn sort_batch( fetch: Option, ) -> Result { let sort_columns = expressions - .iter() - .map(|expr| expr.evaluate_to_sort_column(batch)) + .iter() + .map(|expr| expr.evaluate_to_sort_column(batch)) .collect::>>()?; let indices = lexsort_to_indices(&sort_columns, fetch)?; @@ -1460,6 +1472,7 @@ mod tests { use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr::expressions::{Column, Literal}; + use datafusion_physical_expr_common::metrics::MetricValue; use futures::{FutureExt, Stream, TryStreamExt}; use insta::assert_snapshot; @@ -2304,7 +2317,7 @@ mod tests { } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> { + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics() -> Result<()> { let batch_size = 100; let create_task_ctx = |_: &[RecordBatch]| { @@ -2316,19 +2329,19 @@ mod tests { }; // Smaller than batch size and require more than a single batch to get the requested batch size - test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size / 4, create_task_ctx).await?; // Not evenly divisible by batch size - test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size + 7, create_task_ctx).await?; // Evenly divisible by batch size and is larger than 2 output batches - test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size * 3, create_task_ctx).await?; Ok(()) } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place() + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics_when_sorting_in_place() -> Result<()> { let batch_size = 100; @@ -2343,7 +2356,7 @@ mod tests { // Smaller than batch size and require more than a single batch to get the requested batch size { let metrics = - test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size / 4, create_task_ctx).await?; assert_eq!( metrics.spill_count(), @@ -2355,7 +2368,7 @@ mod tests { // Not evenly divisible by batch size { let metrics = - test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size + 7, create_task_ctx).await?; assert_eq!( metrics.spill_count(), @@ -2367,7 +2380,7 @@ mod tests { // Evenly divisible by batch size and is larger than 2 output batches { let metrics = - test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size * 3, create_task_ctx).await?; assert_eq!( metrics.spill_count(), @@ -2380,7 +2393,7 @@ mod tests { } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch() + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics_when_having_a_single_batch() -> Result<()> { let batch_size = 100; @@ -2391,7 +2404,7 @@ mod tests { // Smaller than batch size and require more than a single batch to get the requested batch size { - let metrics = test_sort_output_batch_size( + let metrics = test_sort_output_batch_size_and_base_metrics( // Single batch 1, batch_size / 4, @@ -2408,7 +2421,7 @@ mod tests { // Not evenly divisible by batch size { - let metrics = test_sort_output_batch_size( + let metrics = test_sort_output_batch_size_and_base_metrics( // Single batch 1, batch_size + 7, @@ -2425,7 +2438,7 @@ mod tests { // Evenly divisible by batch size and is larger than 2 output batches { - let metrics = test_sort_output_batch_size( + let metrics = test_sort_output_batch_size_and_base_metrics( // Single batch 1, batch_size * 3, @@ -2444,7 +2457,7 @@ mod tests { } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill() + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics_when_having_to_spill() -> Result<()> { let batch_size = 100; @@ -2473,7 +2486,7 @@ mod tests { // Smaller than batch size and require more than a single batch to get the requested batch size { let metrics = - test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size / 4, create_task_ctx).await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } @@ -2481,7 +2494,7 @@ mod tests { // Not evenly divisible by batch size { let metrics = - test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size + 7, create_task_ctx).await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } @@ -2489,7 +2502,7 @@ mod tests { // Evenly divisible by batch size and is larger than 2 batches { let metrics = - test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size * 3, create_task_ctx).await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } @@ -2497,7 +2510,7 @@ mod tests { Ok(()) } - async fn test_sort_output_batch_size( + async fn test_sort_output_batch_size_and_base_metrics( number_of_batches: usize, batch_size_to_generate: usize, create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext, @@ -2507,10 +2520,13 @@ mod tests { .collect::>(); let task_ctx = create_task_ctx(batches.as_slice()); + let output_rows = batches.iter().map(|item| item.num_rows()).sum(); + let expected_batch_size = task_ctx.session_config().batch_size(); + let schema = batches[0].schema(); let (mut output_batches, metrics) = - run_sort_on_input(task_ctx, "i", batches).await?; + run_sort_on_input(task_ctx, "i", batches, schema).await?; let last_batch = output_batches.pop().unwrap(); @@ -2524,19 +2540,80 @@ mod tests { last_expected_batch_size = expected_batch_size; } assert_eq!(last_batch.num_rows(), last_expected_batch_size); + + assert_baseline_metrics_for_non_empty_output(&metrics, output_rows, expected_batch_size); Ok(metrics) } + + #[tokio::test] + async fn empty_sort_stream_should_report_end_time() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); + let task_ctx = TaskContext::default(); + + let (_, metrics) = run_sort_on_input(task_ctx, "i", vec![], schema).await?; + + let end_time = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::EndTimestamp(end) => Some(end), + _ => None, + }) + .expect("Must have end time metric since it exists in the baseline"); + assert_eq!(metrics.spill_count().unwrap_or_default(), 0, "expected to not have spills"); + assert_ne!(end_time.value(), None); + + Ok(()) + } + + fn assert_baseline_metrics_for_non_empty_output( + metrics: &MetricsSet, + output_rows: usize, + batch_size: usize, + ) { + let end_time = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::EndTimestamp(end) => Some(end), + _ => None, + }) + .expect("Must have end time metric since it exists in the baseline"); + + assert_ne!(end_time.value(), None); + + assert_eq!(metrics.output_rows(), Some(output_rows)); + + let output_bytes = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBytes(total) => Some(total), + _ => None, + }) + .expect("Must have output_bytes metric since it exists in the baseline"); + + assert_ne!(output_bytes.value(), 0_usize); + + let output_batches = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBatches(total) => Some(total), + _ => None, + }) + .expect("Must have output_batches metric since it exists in the baseline"); + + assert_eq!(output_batches.value(), output_rows.div_ceil(batch_size)); + } + async fn run_sort_on_input( task_ctx: TaskContext, order_by_col: &str, batches: Vec, + schema: SchemaRef, ) -> Result<(Vec, MetricsSet)> { let task_ctx = Arc::new(task_ctx); // let task_ctx = env. - let schema = batches[0].schema(); let ordering: LexOrdering = [PhysicalSortExpr { expr: col(order_by_col, &schema)?, options: SortOptions { @@ -2547,7 +2624,11 @@ mod tests { .into(); let sort_exec: Arc = Arc::new(SortExec::new( ordering.clone(), - TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?, + TestMemoryExec::try_new_exec( + std::slice::from_ref(&batches), + Arc::clone(&schema), + None, + )?, )); let sorted_batches = @@ -2557,11 +2638,10 @@ mod tests { // assert output { - let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?; + let input_batches_concat = concat_batches(&schema, &batches)?; let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?; - let sorted_batches_concat = - concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?; + let sorted_batches_concat = concat_batches(&schema, &sorted_batches)?; assert_eq!(sorted_input_batch, sorted_batches_concat); } From 9143cb809ffddda8e1b6b8bead0e2295a59fbd8f Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 10 Jun 2026 18:58:28 +0300 Subject: [PATCH 5/9] add tests for multi level merge sort --- ...spilling_fuzz_in_memory_constrained_env.rs | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs index d401557e966d6..4025bba95f6e2 100644 --- a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs +++ b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs @@ -589,9 +589,17 @@ async fn run_test( result_stream: SendableRecordBatchStream, ) -> Result { let number_of_record_batches = args.number_of_record_batches; + let output_batch_size = args.task_ctx.session_config().batch_size(); + let number_of_rows = number_of_record_batches * output_batch_size; consume_stream_and_simulate_other_running_memory_consumers(args, result_stream) .await?; + + assert_baseline_metrics_for_non_empty_output( + plan.metrics().expect("must have metrics"), + number_of_rows, + output_batch_size + ); let spill_count = assert_spill_count_metric(true, plan); @@ -656,3 +664,41 @@ async fn consume_stream_and_simulate_other_running_memory_consumers( Ok(()) } + +fn assert_baseline_metrics_for_non_empty_output( + metrics: &MetricsSet, + expected_output_rows: usize, + output_batch_size: usize, +) { + let end_time = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::EndTimestamp(end) => Some(end), + _ => None, + }) + .expect("Must have end time metric since it exists in the baseline"); + + assert_ne!(end_time.value(), None); + + assert_eq!(metrics.output_rows(), Some(expected_output_rows)); + + let output_bytes = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBytes(total) => Some(total), + _ => None, + }) + .expect("Must have output_bytes metric since it exists in the baseline"); + + assert_ne!(output_bytes.value(), 0_usize); + + let output_batches = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBatches(total) => Some(total), + _ => None, + }) + .expect("Must have output_batches metric since it exists in the baseline"); + + assert_eq!(output_batches.value(), expected_output_rows.div_ceil(output_batch_size)); +} From 17f4cf31f710e8c2da013c3d8959a36b31a4d541 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 10 Jun 2026 19:07:11 +0300 Subject: [PATCH 6/9] change to be only for sort --- ...spilling_fuzz_in_memory_constrained_env.rs | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs index 4025bba95f6e2..6f42ae2f0c5c7 100644 --- a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs +++ b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs @@ -45,6 +45,7 @@ use datafusion_physical_plan::aggregates::{ }; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use futures::StreamExt; +use datafusion_physical_expr_common::metrics::MetricsSet; #[tokio::test] async fn test_sort_with_limited_memory() -> Result<()> { @@ -69,7 +70,7 @@ async fn test_sort_with_limited_memory() -> Result<()> { // Basic test with a lot of groups that cannot all fit in memory and 1 record batch // from each spill file is too much memory - let spill_count = run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs { + let metrics = run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs { pool_size, task_ctx: Arc::new(task_ctx), number_of_record_batches: 100, @@ -78,7 +79,7 @@ async fn test_sort_with_limited_memory() -> Result<()> { }) .await?; - let total_spill_files_size = spill_count * record_batch_size; + let total_spill_files_size = metrics.spill_count().unwrap_or_default() * record_batch_size; assert!( total_spill_files_size > pool_size, "Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}", @@ -252,7 +253,7 @@ enum MemoryBehavior { async fn run_sort_test_with_limited_memory( mut args: RunTestWithLimitedMemoryArgs, -) -> Result { +) -> Result { let get_size_of_record_batch_to_generate = std::mem::replace( &mut args.get_size_of_record_batch_to_generate, Box::pin(move |_| unreachable!("should not be called after take")), @@ -312,7 +313,17 @@ async fn run_sort_test_with_limited_memory( let result = sort_exec.execute(0, Arc::clone(&args.task_ctx))?; - run_test(args, sort_exec, result).await + let number_of_record_batches = args.number_of_record_batches; + + let metrics = run_test(args, sort_exec, result).await?; + + assert_baseline_metrics_for_non_empty_output( + metrics, + number_of_record_batches * output_batch_size, + record_batch_size as usize + ); + + Ok(metrics) } fn grow_memory_as_much_as_possible( @@ -346,7 +357,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory() -> Result<() // Basic test with a lot of groups that cannot all fit in memory and 1 record batch // from each spill file is too much memory - let spill_count = + let metrics = run_test_aggregate_with_high_cardinality(RunTestWithLimitedMemoryArgs { pool_size, task_ctx: Arc::new(task_ctx), @@ -356,7 +367,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory() -> Result<() }) .await?; - let total_spill_files_size = spill_count * record_batch_size; + let total_spill_files_size = metrics.spill_count().unwrap_or_default() * record_batch_size; assert!( total_spill_files_size > pool_size, "Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}", @@ -498,7 +509,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory_and_large_reco async fn run_test_aggregate_with_high_cardinality( mut args: RunTestWithLimitedMemoryArgs, -) -> Result { +) -> Result { let get_size_of_record_batch_to_generate = std::mem::replace( &mut args.get_size_of_record_batch_to_generate, Box::pin(move |_| unreachable!("should not be called after take")), @@ -587,20 +598,13 @@ async fn run_test( args: RunTestWithLimitedMemoryArgs, plan: Arc, result_stream: SendableRecordBatchStream, -) -> Result { +) -> Result { let number_of_record_batches = args.number_of_record_batches; - let output_batch_size = args.task_ctx.session_config().batch_size(); - let number_of_rows = number_of_record_batches * output_batch_size; consume_stream_and_simulate_other_running_memory_consumers(args, result_stream) .await?; - - assert_baseline_metrics_for_non_empty_output( - plan.metrics().expect("must have metrics"), - number_of_rows, - output_batch_size - ); + let metrics = plan.metrics().expect("must have metrics"); let spill_count = assert_spill_count_metric(true, plan); assert!( @@ -608,7 +612,7 @@ async fn run_test( "Expected spill, but did not, number of record batches: {number_of_record_batches}", ); - Ok(spill_count) + Ok(metrics) } /// Consume the stream and change the amount of memory used while consuming it based on the [`MemoryBehavior`] provided From 777f89e4dea755631138a48bb7d70c722a5c9a88 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 10 Jun 2026 20:11:21 +0300 Subject: [PATCH 7/9] add import and use correct var --- ...spilling_fuzz_in_memory_constrained_env.rs | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs index 6f42ae2f0c5c7..3e6ea942d19e1 100644 --- a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs +++ b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs @@ -39,13 +39,14 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_functions_aggregate::array_agg::array_agg_udaf; use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::{Column, col}; +use datafusion_physical_expr_common::metrics::MetricsSet; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; +use datafusion_physical_plan::metrics::MetricValue; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use futures::StreamExt; -use datafusion_physical_expr_common::metrics::MetricsSet; #[tokio::test] async fn test_sort_with_limited_memory() -> Result<()> { @@ -79,7 +80,8 @@ async fn test_sort_with_limited_memory() -> Result<()> { }) .await?; - let total_spill_files_size = metrics.spill_count().unwrap_or_default() * record_batch_size; + let total_spill_files_size = + metrics.spill_count().unwrap_or_default() * record_batch_size; assert!( total_spill_files_size > pool_size, "Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}", @@ -318,9 +320,9 @@ async fn run_sort_test_with_limited_memory( let metrics = run_test(args, sort_exec, result).await?; assert_baseline_metrics_for_non_empty_output( - metrics, - number_of_record_batches * output_batch_size, - record_batch_size as usize + &metrics, + number_of_record_batches * record_batch_size, + record_batch_size as usize, ); Ok(metrics) @@ -367,7 +369,8 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory() -> Result<() }) .await?; - let total_spill_files_size = metrics.spill_count().unwrap_or_default() * record_batch_size; + let total_spill_files_size = + metrics.spill_count().unwrap_or_default() * record_batch_size; assert!( total_spill_files_size > pool_size, "Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}", @@ -704,5 +707,8 @@ fn assert_baseline_metrics_for_non_empty_output( }) .expect("Must have output_batches metric since it exists in the baseline"); - assert_eq!(output_batches.value(), expected_output_rows.div_ceil(output_batch_size)); + assert_eq!( + output_batches.value(), + expected_output_rows.div_ceil(output_batch_size) + ); } From 916b81bb021aaf7c9b56b7fe45d10b610f97593a Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 11 Jun 2026 00:10:35 +0300 Subject: [PATCH 8/9] fix --- .../tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs index 3e6ea942d19e1..2c7666a264226 100644 --- a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs +++ b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs @@ -321,7 +321,7 @@ async fn run_sort_test_with_limited_memory( assert_baseline_metrics_for_non_empty_output( &metrics, - number_of_record_batches * record_batch_size, + number_of_record_batches * record_batch_size as usize, record_batch_size as usize, ); From 8f5b2fc369b6b7f0da7afaf18c6df719345443ad Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 11 Jun 2026 11:13:00 +0300 Subject: [PATCH 9/9] format --- .../src/sorts/multi_level_merge.rs | 30 ++++--- datafusion/physical-plan/src/sorts/sort.rs | 80 ++++++++++++++----- 2 files changed, 79 insertions(+), 31 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs index 396cf1dd7b78e..8a223bb792ceb 100644 --- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs +++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs @@ -226,27 +226,39 @@ impl MultiLevelMergeBuilder { match (self.sorted_spill_files.len(), self.sorted_streams.len()) { // No data so empty batch (0, 0) => { - let empty_stream = Box::pin(EmptyRecordBatchStream::new(Arc::clone( - &self.schema, - ))); - Ok(Box::pin(ObservedStream::new(empty_stream, self.metrics.clone(), None))) - }, + let empty_stream = + Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema))); + Ok(Box::pin(ObservedStream::new( + empty_stream, + self.metrics.clone(), + None, + ))) + } // Only in-memory stream, return that (0, 1) => { let output_stream = self.sorted_streams.remove(0); - Ok(Box::pin(ObservedStream::new(output_stream, self.metrics.clone(), None))) - }, + Ok(Box::pin(ObservedStream::new( + output_stream, + self.metrics.clone(), + None, + ))) + } // Only single sorted spill file so return it (1, 0) => { let spill_file = self.sorted_spill_files.remove(0); // Not reserving any memory for this disk as we are not holding it in memory - let output_stream = self.spill_manager + let output_stream = self + .spill_manager .read_spill_as_stream(spill_file.file, None)?; - Ok(Box::pin(ObservedStream::new(output_stream, self.metrics.clone(), None))) + Ok(Box::pin(ObservedStream::new( + output_stream, + self.metrics.clone(), + None, + ))) } // Only in memory streams, so merge them all in a single pass diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index c5834be1b9d8c..fae42f85aca51 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -2317,7 +2317,8 @@ mod tests { } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics() -> Result<()> { + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics() + -> Result<()> { let batch_size = 100; let create_task_ctx = |_: &[RecordBatch]| { @@ -2329,13 +2330,16 @@ mod tests { }; // Smaller than batch size and require more than a single batch to get the requested batch size - test_sort_output_batch_size_and_base_metrics(10, batch_size / 4, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size / 4, create_task_ctx) + .await?; // Not evenly divisible by batch size - test_sort_output_batch_size_and_base_metrics(10, batch_size + 7, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size + 7, create_task_ctx) + .await?; // Evenly divisible by batch size and is larger than 2 output batches - test_sort_output_batch_size_and_base_metrics(10, batch_size * 3, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size * 3, create_task_ctx) + .await?; Ok(()) } @@ -2355,8 +2359,12 @@ mod tests { // Smaller than batch size and require more than a single batch to get the requested batch size { - let metrics = - test_sort_output_batch_size_and_base_metrics(10, batch_size / 4, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size / 4, + create_task_ctx, + ) + .await?; assert_eq!( metrics.spill_count(), @@ -2367,8 +2375,12 @@ mod tests { // Not evenly divisible by batch size { - let metrics = - test_sort_output_batch_size_and_base_metrics(10, batch_size + 7, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size + 7, + create_task_ctx, + ) + .await?; assert_eq!( metrics.spill_count(), @@ -2379,8 +2391,12 @@ mod tests { // Evenly divisible by batch size and is larger than 2 output batches { - let metrics = - test_sort_output_batch_size_and_base_metrics(10, batch_size * 3, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size * 3, + create_task_ctx, + ) + .await?; assert_eq!( metrics.spill_count(), @@ -2485,24 +2501,36 @@ mod tests { // Smaller than batch size and require more than a single batch to get the requested batch size { - let metrics = - test_sort_output_batch_size_and_base_metrics(10, batch_size / 4, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size / 4, + create_task_ctx, + ) + .await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } // Not evenly divisible by batch size { - let metrics = - test_sort_output_batch_size_and_base_metrics(10, batch_size + 7, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size + 7, + create_task_ctx, + ) + .await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } // Evenly divisible by batch size and is larger than 2 batches { - let metrics = - test_sort_output_batch_size_and_base_metrics(10, batch_size * 3, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size * 3, + create_task_ctx, + ) + .await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } @@ -2521,7 +2549,7 @@ mod tests { let task_ctx = create_task_ctx(batches.as_slice()); let output_rows = batches.iter().map(|item| item.num_rows()).sum(); - + let expected_batch_size = task_ctx.session_config().batch_size(); let schema = batches[0].schema(); @@ -2540,12 +2568,16 @@ mod tests { last_expected_batch_size = expected_batch_size; } assert_eq!(last_batch.num_rows(), last_expected_batch_size); - - assert_baseline_metrics_for_non_empty_output(&metrics, output_rows, expected_batch_size); + + assert_baseline_metrics_for_non_empty_output( + &metrics, + output_rows, + expected_batch_size, + ); Ok(metrics) } - + #[tokio::test] async fn empty_sort_stream_should_report_end_time() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); @@ -2561,7 +2593,11 @@ mod tests { }) .expect("Must have end time metric since it exists in the baseline"); - assert_eq!(metrics.spill_count().unwrap_or_default(), 0, "expected to not have spills"); + assert_eq!( + metrics.spill_count().unwrap_or_default(), + 0, + "expected to not have spills" + ); assert_ne!(end_time.value(), None); Ok(()) @@ -2604,7 +2640,7 @@ mod tests { assert_eq!(output_batches.value(), output_rows.div_ceil(batch_size)); } - + async fn run_sort_on_input( task_ctx: TaskContext, order_by_col: &str,