Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_functions_aggregate::array_agg::array_agg_udaf;
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::expressions::{Column, col};
use datafusion_physical_expr_common::metrics::MetricsSet;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use datafusion_physical_plan::metrics::MetricValue;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::StreamExt;

Expand All @@ -69,7 +71,7 @@ async fn test_sort_with_limited_memory() -> Result<()> {

// Basic test with a lot of groups that cannot all fit in memory and 1 record batch
// from each spill file is too much memory
let spill_count = run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs {
let metrics = run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs {
pool_size,
task_ctx: Arc::new(task_ctx),
number_of_record_batches: 100,
Expand All @@ -78,7 +80,8 @@ async fn test_sort_with_limited_memory() -> Result<()> {
})
.await?;

let total_spill_files_size = spill_count * record_batch_size;
let total_spill_files_size =
metrics.spill_count().unwrap_or_default() * record_batch_size;
assert!(
total_spill_files_size > pool_size,
"Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}",
Expand Down Expand Up @@ -252,7 +255,7 @@ enum MemoryBehavior {

async fn run_sort_test_with_limited_memory(
mut args: RunTestWithLimitedMemoryArgs,
) -> Result<usize> {
) -> Result<MetricsSet> {
let get_size_of_record_batch_to_generate = std::mem::replace(
&mut args.get_size_of_record_batch_to_generate,
Box::pin(move |_| unreachable!("should not be called after take")),
Expand Down Expand Up @@ -312,7 +315,17 @@ async fn run_sort_test_with_limited_memory(

let result = sort_exec.execute(0, Arc::clone(&args.task_ctx))?;

run_test(args, sort_exec, result).await
let number_of_record_batches = args.number_of_record_batches;

let metrics = run_test(args, sort_exec, result).await?;

assert_baseline_metrics_for_non_empty_output(
&metrics,
number_of_record_batches * record_batch_size as usize,
record_batch_size as usize,
);

Ok(metrics)
}

fn grow_memory_as_much_as_possible(
Expand Down Expand Up @@ -346,7 +359,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory() -> Result<()

// Basic test with a lot of groups that cannot all fit in memory and 1 record batch
// from each spill file is too much memory
let spill_count =
let metrics =
run_test_aggregate_with_high_cardinality(RunTestWithLimitedMemoryArgs {
pool_size,
task_ctx: Arc::new(task_ctx),
Expand All @@ -356,7 +369,8 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory() -> Result<()
})
.await?;

let total_spill_files_size = spill_count * record_batch_size;
let total_spill_files_size =
metrics.spill_count().unwrap_or_default() * record_batch_size;
assert!(
total_spill_files_size > pool_size,
"Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}",
Expand Down Expand Up @@ -498,7 +512,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory_and_large_reco

async fn run_test_aggregate_with_high_cardinality(
mut args: RunTestWithLimitedMemoryArgs,
) -> Result<usize> {
) -> Result<MetricsSet> {
let get_size_of_record_batch_to_generate = std::mem::replace(
&mut args.get_size_of_record_batch_to_generate,
Box::pin(move |_| unreachable!("should not be called after take")),
Expand Down Expand Up @@ -587,20 +601,21 @@ async fn run_test(
args: RunTestWithLimitedMemoryArgs,
plan: Arc<dyn ExecutionPlan>,
result_stream: SendableRecordBatchStream,
) -> Result<usize> {
) -> Result<MetricsSet> {
let number_of_record_batches = args.number_of_record_batches;

consume_stream_and_simulate_other_running_memory_consumers(args, result_stream)
.await?;

let metrics = plan.metrics().expect("must have metrics");
let spill_count = assert_spill_count_metric(true, plan);

assert!(
spill_count > 0,
"Expected spill, but did not, number of record batches: {number_of_record_batches}",
);

Ok(spill_count)
Ok(metrics)
}

/// Consume the stream and change the amount of memory used while consuming it based on the [`MemoryBehavior`] provided
Expand Down Expand Up @@ -656,3 +671,44 @@ async fn consume_stream_and_simulate_other_running_memory_consumers(

Ok(())
}

/// Asserts that `metrics` carries sane baseline values for a run that produced output.
///
/// The following checks are performed, in order:
/// 1. an `EndTimestamp` metric exists and its value is set (not `None`),
/// 2. `metrics.output_rows()` equals `Some(expected_output_rows)`,
/// 3. an `OutputBytes` metric exists and its value is non-zero,
/// 4. an `OutputBatches` metric exists and its value equals
///    `expected_output_rows` divided by `output_batch_size`, rounded up.
///
/// For the timestamp, bytes and batches checks only the first metric of the
/// matching kind found while iterating `metrics` is inspected.
///
/// # Panics
///
/// Panics when any of the checks above fails, or when `output_batch_size` is `0`
/// (the rounded-up division has no defined result in that case).
fn assert_baseline_metrics_for_non_empty_output(
    metrics: &MetricsSet,
    expected_output_rows: usize,
    output_batch_size: usize,
) {
    // 1. The end timestamp must have been recorded.
    let end_timestamp = metrics
        .iter()
        .find_map(|metric| {
            if let MetricValue::EndTimestamp(timestamp) = metric.value() {
                Some(timestamp)
            } else {
                None
            }
        })
        .expect("Must have end time metric since it exists in the baseline");
    assert_ne!(end_timestamp.value(), None);

    // 2. Every expected row must have been reported as output.
    assert_eq!(metrics.output_rows(), Some(expected_output_rows));

    // 3. A non-empty output must account for at least some bytes.
    let bytes_counter = metrics
        .iter()
        .find_map(|metric| {
            if let MetricValue::OutputBytes(counter) = metric.value() {
                Some(counter)
            } else {
                None
            }
        })
        .expect("Must have output_bytes metric since it exists in the baseline");
    assert_ne!(bytes_counter.value(), 0_usize);

    // 4. The batch count must match the row count split into batches of
    //    `output_batch_size`, where a trailing partial batch counts as one.
    let batches_counter = metrics
        .iter()
        .find_map(|metric| {
            if let MetricValue::OutputBatches(counter) = metric.value() {
                Some(counter)
            } else {
                None
            }
        })
        .expect("Must have output_batches metric since it exists in the baseline");
    let expected_output_batches = expected_output_rows.div_ceil(output_batch_size);
    assert_eq!(batches_counter.value(), expected_output_batches);
}
35 changes: 28 additions & 7 deletions datafusion/physical-plan/src/sorts/multi_level_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion_execution::memory_pool::MemoryReservation;
use crate::sorts::builder::try_grow_reservation_to_at_least;
use crate::sorts::sort::get_reserved_bytes_for_record_batch_size;
use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
use crate::stream::RecordBatchStreamAdapter;
use crate::stream::{ObservedStream, RecordBatchStreamAdapter};
use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use futures::TryStreamExt;
Expand Down Expand Up @@ -225,25 +225,46 @@ impl MultiLevelMergeBuilder {
) -> Result<SendableRecordBatchStream> {
match (self.sorted_spill_files.len(), self.sorted_streams.len()) {
// No data so empty batch
(0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
&self.schema,
)))),
(0, 0) => {
let empty_stream =
Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema)));
Ok(Box::pin(ObservedStream::new(
empty_stream,
self.metrics.clone(),
None,
)))
}

// Only in-memory stream, return that
(0, 1) => Ok(self.sorted_streams.remove(0)),
(0, 1) => {
let output_stream = self.sorted_streams.remove(0);
Ok(Box::pin(ObservedStream::new(
output_stream,
self.metrics.clone(),
None,
)))
}

// Only single sorted spill file so return it
(1, 0) => {
let spill_file = self.sorted_spill_files.remove(0);

// Not reserving any memory for this disk as we are not holding it in memory
self.spill_manager
.read_spill_as_stream(spill_file.file, None)
let output_stream = self
.spill_manager
.read_spill_as_stream(spill_file.file, None)?;

Ok(Box::pin(ObservedStream::new(
output_stream,
self.metrics.clone(),
None,
)))
}

// Only in memory streams, so merge them all in a single pass
(0, _) => {
let sorted_stream = mem::take(&mut self.sorted_streams);
// No need to wrap with observed stream since merge sort will update the observed metrics
self.create_new_merge_sort(
sorted_stream,
// If we have no sorted spill files left, this is the last run
Expand Down
Loading