diff --git a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs index d401557e966d6..2c7666a264226 100644 --- a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs +++ b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs @@ -39,10 +39,12 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_functions_aggregate::array_agg::array_agg_udaf; use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::{Column, col}; +use datafusion_physical_expr_common::metrics::MetricsSet; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; +use datafusion_physical_plan::metrics::MetricValue; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use futures::StreamExt; @@ -69,7 +71,7 @@ async fn test_sort_with_limited_memory() -> Result<()> { // Basic test with a lot of groups that cannot all fit in memory and 1 record batch // from each spill file is too much memory - let spill_count = run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs { + let metrics = run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs { pool_size, task_ctx: Arc::new(task_ctx), number_of_record_batches: 100, @@ -78,7 +80,8 @@ async fn test_sort_with_limited_memory() -> Result<()> { }) .await?; - let total_spill_files_size = spill_count * record_batch_size; + let total_spill_files_size = + metrics.spill_count().unwrap_or_default() * record_batch_size; assert!( total_spill_files_size > pool_size, "Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}", @@ -252,7 +255,7 @@ enum MemoryBehavior { async fn run_sort_test_with_limited_memory( mut args: RunTestWithLimitedMemoryArgs, -) -> Result { +) -> Result { let get_size_of_record_batch_to_generate = std::mem::replace( &mut args.get_size_of_record_batch_to_generate, Box::pin(move |_| unreachable!("should not be called after take")), @@ -312,7 +315,17 @@ async fn run_sort_test_with_limited_memory( let result = sort_exec.execute(0, Arc::clone(&args.task_ctx))?; - run_test(args, sort_exec, result).await + let number_of_record_batches = args.number_of_record_batches; + + let metrics = run_test(args, sort_exec, result).await?; + + assert_baseline_metrics_for_non_empty_output( + &metrics, + number_of_record_batches * record_batch_size as usize, + record_batch_size as usize, + ); + + Ok(metrics) } fn grow_memory_as_much_as_possible( @@ -346,7 +359,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory() -> Result<() // Basic test with a lot of groups that cannot all fit in memory and 1 record batch // from each spill file is too much memory - let spill_count = + let metrics = run_test_aggregate_with_high_cardinality(RunTestWithLimitedMemoryArgs { pool_size, task_ctx: Arc::new(task_ctx), @@ -356,7 +369,8 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory() -> Result<() }) .await?; - let total_spill_files_size = spill_count * record_batch_size; + let total_spill_files_size = + metrics.spill_count().unwrap_or_default() * record_batch_size; assert!( total_spill_files_size > pool_size, "Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}", @@ -498,7 +512,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory_and_large_reco async fn run_test_aggregate_with_high_cardinality( mut args: RunTestWithLimitedMemoryArgs, -) -> Result { +) -> Result { let get_size_of_record_batch_to_generate = std::mem::replace( &mut args.get_size_of_record_batch_to_generate, Box::pin(move |_| unreachable!("should not be called after take")), @@ -587,12 +601,13 @@ async fn run_test( args: RunTestWithLimitedMemoryArgs, plan: Arc, result_stream: SendableRecordBatchStream, -) -> Result { +) -> Result { let number_of_record_batches = args.number_of_record_batches; consume_stream_and_simulate_other_running_memory_consumers(args, result_stream) .await?; + let metrics = plan.metrics().expect("must have metrics"); let spill_count = assert_spill_count_metric(true, plan); assert!( @@ -600,7 +615,7 @@ async fn run_test( "Expected spill, but did not, number of record batches: {number_of_record_batches}", ); - Ok(spill_count) + Ok(metrics) } /// Consume the stream and change the amount of memory used while consuming it based on the [`MemoryBehavior`] provided @@ -656,3 +671,44 @@ async fn consume_stream_and_simulate_other_running_memory_consumers( Ok(()) } + +fn assert_baseline_metrics_for_non_empty_output( + metrics: &MetricsSet, + expected_output_rows: usize, + output_batch_size: usize, +) { + let end_time = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::EndTimestamp(end) => Some(end), + _ => None, + }) + .expect("Must have end time metric since it exists in the baseline"); + + assert_ne!(end_time.value(), None); + + assert_eq!(metrics.output_rows(), Some(expected_output_rows)); + + let output_bytes = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBytes(total) => Some(total), + _ => None, + }) + .expect("Must have output_bytes metric since it exists in the baseline"); + + assert_ne!(output_bytes.value(), 0_usize); + + let output_batches = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBatches(total) => Some(total), + _ => None, + }) + .expect("Must have output_batches metric since it exists in the baseline"); + + assert_eq!( + output_batches.value(), + expected_output_rows.div_ceil(output_batch_size) + ); +} diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs index 8985e1d8c70ee..8a223bb792ceb 100644 --- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs +++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs @@ -33,7 +33,7 @@ use datafusion_execution::memory_pool::MemoryReservation; use crate::sorts::builder::try_grow_reservation_to_at_least; use crate::sorts::sort::get_reserved_bytes_for_record_batch_size; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; -use crate::stream::RecordBatchStreamAdapter; +use crate::stream::{ObservedStream, RecordBatchStreamAdapter}; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use futures::TryStreamExt; @@ -225,25 +225,46 @@ impl MultiLevelMergeBuilder { ) -> Result { match (self.sorted_spill_files.len(), self.sorted_streams.len()) { // No data so empty batch - (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( - &self.schema, - )))), + (0, 0) => { + let empty_stream = + Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema))); + Ok(Box::pin(ObservedStream::new( + empty_stream, + self.metrics.clone(), + None, + ))) + } // Only in-memory stream, return that - (0, 1) => Ok(self.sorted_streams.remove(0)), + (0, 1) => { + let output_stream = self.sorted_streams.remove(0); + Ok(Box::pin(ObservedStream::new( + output_stream, + self.metrics.clone(), + None, + ))) + } // Only single sorted spill file so return it (1, 0) => { let spill_file = self.sorted_spill_files.remove(0); // Not reserving any memory for this disk as we are not holding it in memory - self.spill_manager - .read_spill_as_stream(spill_file.file, None) + let output_stream = self + .spill_manager + .read_spill_as_stream(spill_file.file, None)?; + + Ok(Box::pin(ObservedStream::new( + output_stream, + self.metrics.clone(), + None, + ))) } // Only in memory streams, so merge them all in a single pass (0, _) => { let sorted_stream = mem::take(&mut self.sorted_streams); + // No need to wrap with observed stream since merge sort will update the observed metrics self.create_new_merge_sort( sorted_stream, // If we have no sorted spill files left, this is the last run diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 929ff4f7dfc85..fae42f85aca51 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -45,8 +45,8 @@ use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::get_record_batch_memory_size; use crate::spill::in_progress_spill_file::InProgressSpillFile; use crate::spill::spill_manager::{GetSlicedSize, SpillManager}; -use crate::stream::RecordBatchStreamAdapter; use crate::stream::ReservationStream; +use crate::stream::{ObservedStream, RecordBatchStreamAdapter}; use crate::topk::TopK; use crate::topk::TopKDynamicFilters; use crate::{ @@ -373,7 +373,7 @@ impl ExternalSorter { // allocation. Only needed for the non-spill path; the spill // path transfers the reservation to the merge stream instead. self.merge_reservation.free(); - self.in_mem_sort_stream(self.metrics.baseline.clone()) + self.in_mem_sort_stream(true) } } @@ -475,8 +475,7 @@ impl ExternalSorter { // reserved again for the next spill. self.merge_reservation.free(); - let mut sorted_stream = - self.in_mem_sort_stream(self.metrics.baseline.intermediate())?; + let mut sorted_stream = self.in_mem_sort_stream(false)?; // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken // to construct a globally sorted stream. assert_or_internal_err!( @@ -585,17 +584,25 @@ impl ExternalSorter { /// ``` fn in_mem_sort_stream( &mut self, - metrics: BaselineMetrics, + is_output_stream: bool, ) -> Result { if self.in_mem_batches.is_empty() { - return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( - &self.schema, - )))); + let empty_stream = + Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema))); + return Ok(if is_output_stream { + Box::pin(ObservedStream::new( + empty_stream, + self.metrics.baseline.clone(), + None, + )) + } else { + empty_stream + }); } // The elapsed compute timer is updated when the value is dropped. // There is no need for an explicit call to drop. - let elapsed_compute = metrics.elapsed_compute().clone(); + let elapsed_compute = self.metrics.baseline.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); // Please pay attention that any operation inside of `in_mem_sort_stream` will @@ -607,7 +614,16 @@ impl ExternalSorter { if self.in_mem_batches.len() == 1 { let batch = self.in_mem_batches.swap_remove(0); let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, &metrics, reservation); + let sorted_stream = self.sort_batch_stream(batch, reservation)?; + return Ok(if is_output_stream { + Box::pin(ObservedStream::new( + sorted_stream, + self.metrics.baseline.clone(), + None, + )) + } else { + sorted_stream + }); } // If less than sort_in_place_threshold_bytes, concatenate and sort in place @@ -619,17 +635,25 @@ impl ExternalSorter { .try_resize(get_reserved_bytes_for_record_batch(&batch)?) .map_err(Self::err_with_oom_context)?; let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, &metrics, reservation); + let sorted_stream = self.sort_batch_stream(batch, reservation)?; + return Ok(if is_output_stream { + Box::pin(ObservedStream::new( + sorted_stream, + self.metrics.baseline.clone(), + None, + )) + } else { + sorted_stream + }); } let streams = std::mem::take(&mut self.in_mem_batches) .into_iter() .map(|batch| { - let metrics = self.metrics.baseline.intermediate(); let reservation = self .reservation .split(get_reserved_bytes_for_record_batch(&batch)?); - let input = self.sort_batch_stream(batch, &metrics, reservation)?; + let input = self.sort_batch_stream(batch, reservation)?; Ok(spawn_buffered(input, 1)) }) .collect::>()?; @@ -638,7 +662,11 @@ impl ExternalSorter { .with_streams(streams) .with_schema(Arc::clone(&self.schema)) .with_expressions(&self.expr.clone()) - .with_metrics(metrics) + .with_metrics(if is_output_stream { + self.metrics.baseline.clone() + } else { + self.metrics.baseline.intermediate() + }) .with_batch_size(self.batch_size) .with_fetch(None) .with_reservation(self.merge_reservation.new_empty()) @@ -657,7 +685,6 @@ impl ExternalSorter { fn sort_batch_stream( &self, batch: RecordBatch, - metrics: &BaselineMetrics, reservation: MemoryReservation, ) -> Result { assert_eq!( @@ -668,7 +695,6 @@ impl ExternalSorter { let schema = batch.schema(); let expressions = self.expr.clone(); let batch_size = self.batch_size; - let output_row_metrics = metrics.output_rows().clone(); let stream = futures::stream::once(async move { let schema = batch.schema(); @@ -698,14 +724,7 @@ impl ExternalSorter { reservation, )) as SendableRecordBatchStream) }) - .try_flatten() - .map(move |batch| match batch { - Ok(batch) => { - output_row_metrics.add(batch.num_rows()); - Ok(batch) - } - Err(e) => Err(e), - }); + .try_flatten(); Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) } @@ -1453,6 +1472,7 @@ mod tests { use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr::expressions::{Column, Literal}; + use datafusion_physical_expr_common::metrics::MetricValue; use futures::{FutureExt, Stream, TryStreamExt}; use insta::assert_snapshot; @@ -2297,7 +2317,8 @@ mod tests { } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> { + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics() + -> Result<()> { let batch_size = 100; let create_task_ctx = |_: &[RecordBatch]| { @@ -2309,19 +2330,22 @@ mod tests { }; // Smaller than batch size and require more than a single batch to get the requested batch size - test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size / 4, create_task_ctx) + .await?; // Not evenly divisible by batch size - test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size + 7, create_task_ctx) + .await?; // Evenly divisible by batch size and is larger than 2 output batches - test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size * 3, create_task_ctx) + .await?; Ok(()) } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place() + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics_when_sorting_in_place() -> Result<()> { let batch_size = 100; @@ -2335,8 +2359,12 @@ mod tests { // Smaller than batch size and require more than a single batch to get the requested batch size { - let metrics = - test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size / 4, + create_task_ctx, + ) + .await?; assert_eq!( metrics.spill_count(), @@ -2347,8 +2375,12 @@ mod tests { // Not evenly divisible by batch size { - let metrics = - test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size + 7, + create_task_ctx, + ) + .await?; assert_eq!( metrics.spill_count(), @@ -2359,8 +2391,12 @@ mod tests { // Evenly divisible by batch size and is larger than 2 output batches { - let metrics = - test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size * 3, + create_task_ctx, + ) + .await?; assert_eq!( metrics.spill_count(), @@ -2373,7 +2409,7 @@ mod tests { } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch() + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics_when_having_a_single_batch() -> Result<()> { let batch_size = 100; @@ -2384,7 +2420,7 @@ mod tests { // Smaller than batch size and require more than a single batch to get the requested batch size { - let metrics = test_sort_output_batch_size( + let metrics = test_sort_output_batch_size_and_base_metrics( // Single batch 1, batch_size / 4, @@ -2401,7 +2437,7 @@ mod tests { // Not evenly divisible by batch size { - let metrics = test_sort_output_batch_size( + let metrics = test_sort_output_batch_size_and_base_metrics( // Single batch 1, batch_size + 7, @@ -2418,7 +2454,7 @@ mod tests { // Evenly divisible by batch size and is larger than 2 output batches { - let metrics = test_sort_output_batch_size( + let metrics = test_sort_output_batch_size_and_base_metrics( // Single batch 1, batch_size * 3, @@ -2437,7 +2473,7 @@ mod tests { } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill() + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics_when_having_to_spill() -> Result<()> { let batch_size = 100; @@ -2465,24 +2501,36 @@ mod tests { // Smaller than batch size and require more than a single batch to get the requested batch size { - let metrics = - test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size / 4, + create_task_ctx, + ) + .await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } // Not evenly divisible by batch size { - let metrics = - test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size + 7, + create_task_ctx, + ) + .await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } // Evenly divisible by batch size and is larger than 2 batches { - let metrics = - test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size * 3, + create_task_ctx, + ) + .await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } @@ -2490,7 +2538,7 @@ mod tests { Ok(()) } - async fn test_sort_output_batch_size( + async fn test_sort_output_batch_size_and_base_metrics( number_of_batches: usize, batch_size_to_generate: usize, create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext, @@ -2500,10 +2548,13 @@ mod tests { .collect::>(); let task_ctx = create_task_ctx(batches.as_slice()); + let output_rows = batches.iter().map(|item| item.num_rows()).sum(); + let expected_batch_size = task_ctx.session_config().batch_size(); + let schema = batches[0].schema(); let (mut output_batches, metrics) = - run_sort_on_input(task_ctx, "i", batches).await?; + run_sort_on_input(task_ctx, "i", batches, schema).await?; let last_batch = output_batches.pop().unwrap(); @@ -2518,18 +2569,87 @@ mod tests { } assert_eq!(last_batch.num_rows(), last_expected_batch_size); + assert_baseline_metrics_for_non_empty_output( + &metrics, + output_rows, + expected_batch_size, + ); + Ok(metrics) } + #[tokio::test] + async fn empty_sort_stream_should_report_end_time() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); + let task_ctx = TaskContext::default(); + + let (_, metrics) = run_sort_on_input(task_ctx, "i", vec![], schema).await?; + + let end_time = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::EndTimestamp(end) => Some(end), + _ => None, + }) + .expect("Must have end time metric since it exists in the baseline"); + + assert_eq!( + metrics.spill_count().unwrap_or_default(), + 0, + "expected to not have spills" + ); + assert_ne!(end_time.value(), None); + + Ok(()) + } + + fn assert_baseline_metrics_for_non_empty_output( + metrics: &MetricsSet, + output_rows: usize, + batch_size: usize, + ) { + let end_time = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::EndTimestamp(end) => Some(end), + _ => None, + }) + .expect("Must have end time metric since it exists in the baseline"); + + assert_ne!(end_time.value(), None); + + assert_eq!(metrics.output_rows(), Some(output_rows)); + + let output_bytes = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBytes(total) => Some(total), + _ => None, + }) + .expect("Must have output_bytes metric since it exists in the baseline"); + + assert_ne!(output_bytes.value(), 0_usize); + + let output_batches = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBatches(total) => Some(total), + _ => None, + }) + .expect("Must have output_batches metric since it exists in the baseline"); + + assert_eq!(output_batches.value(), output_rows.div_ceil(batch_size)); + } + async fn run_sort_on_input( task_ctx: TaskContext, order_by_col: &str, batches: Vec, + schema: SchemaRef, ) -> Result<(Vec, MetricsSet)> { let task_ctx = Arc::new(task_ctx); // let task_ctx = env. - let schema = batches[0].schema(); let ordering: LexOrdering = [PhysicalSortExpr { expr: col(order_by_col, &schema)?, options: SortOptions { @@ -2540,7 +2660,11 @@ mod tests { .into(); let sort_exec: Arc = Arc::new(SortExec::new( ordering.clone(), - TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?, + TestMemoryExec::try_new_exec( + std::slice::from_ref(&batches), + Arc::clone(&schema), + None, + )?, )); let sorted_batches = @@ -2550,11 +2674,10 @@ mod tests { // assert output { - let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?; + let input_batches_concat = concat_batches(&schema, &batches)?; let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?; - let sorted_batches_concat = - concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?; + let sorted_batches_concat = concat_batches(&schema, &sorted_batches)?; assert_eq!(sorted_input_batch, sorted_batches_concat); }