From d4701d54517503e087db95497fd95d8e83fec033 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Tue, 16 Jun 2026 21:49:45 +0200 Subject: [PATCH] refactor(physical-plan): externalize statistics traversal into StatisticsContext Decouple operator statistics propagation from tree traversal and caching, per #22958. - Add `StatisticsContext`, which owns the bottom-up walk and per-walk memoization cache. `compute(plan, args)` resolves each child before invoking the operator. - Add `ExecutionPlan::statistics_from_inputs(&self, input_stats, args)`: a stateless per-operator method that computes a node's statistics from pre-resolved child statistics. Default delegates to the deprecated `partition_statistics`. - Add `ExecutionPlan::child_stats_requests(&self, partition) -> Vec` letting a node declare, per child, which partition to request (`ChildStats::At`) or to skip entirely (`ChildStats::Skip`). Union skips non-owning inputs on partition-specific requests. Operators no longer call back into the framework to fetch child statistics; caching is shared across a single walk via the context. --- .../examples/relation_planner/table_sample.rs | 10 +- .../core/src/datasource/file_format/csv.rs | 9 +- .../core/src/datasource/file_format/json.rs | 9 +- .../src/datasource/file_format/parquet.rs | 16 +- .../core/src/datasource/listing/table.rs | 33 ++-- .../core/tests/custom_sources_cases/mod.rs | 6 +- .../tests/custom_sources_cases/statistics.rs | 23 ++- .../core/tests/parquet/file_statistics.rs | 33 ++-- .../physical_optimizer/join_selection.rs | 77 ++++---- .../partition_statistics.rs | 177 ++++++++++++------ .../tests/physical_optimizer/test_utils.rs | 6 +- datafusion/core/tests/sql/path_partition.rs | 10 +- .../datasource/src/file_scan_config/mod.rs | 9 +- datafusion/datasource/src/memory.rs | 4 +- datafusion/datasource/src/source.rs | 6 +- datafusion/ffi/src/execution_plan.rs | 30 ++- .../src/aggregate_statistics.rs | 7 +- .../enforce_distribution.rs | 6 +- .../physical-optimizer/src/join_selection.rs | 4 +- .../physical-optimizer/src/limit_pushdown.rs | 6 +- .../src/output_requirements.rs | 11 +- .../benches/compute_statistics.rs | 51 +++-- .../physical-plan/src/aggregates/mod.rs | 50 +++-- datafusion/physical-plan/src/buffer.rs | 8 +- .../physical-plan/src/coalesce_batches.rs | 10 +- .../physical-plan/src/coalesce_partitions.rs | 15 +- datafusion/physical-plan/src/coop.rs | 8 +- datafusion/physical-plan/src/display.rs | 16 +- datafusion/physical-plan/src/empty.rs | 6 +- .../physical-plan/src/execution_plan.rs | 53 ++++-- datafusion/physical-plan/src/filter.rs | 117 ++++++++---- .../physical-plan/src/joins/cross_join.rs | 23 ++- .../physical-plan/src/joins/hash_join/exec.rs | 79 +++----- .../src/joins/nested_loop_join.rs | 28 +-- .../src/joins/sort_merge_join/exec.rs | 14 +- .../src/joins/sort_merge_join/tests.rs | 9 +- datafusion/physical-plan/src/lib.rs | 2 +- datafusion/physical-plan/src/limit.rs | 34 ++-- .../src/operator_statistics/mod.rs | 16 +- .../physical-plan/src/placeholder_row.rs | 6 +- datafusion/physical-plan/src/projection.rs | 16 +- .../physical-plan/src/repartition/mod.rs | 33 ++-- .../physical-plan/src/scalar_subquery.rs | 8 +- .../physical-plan/src/sorts/partial_sort.rs | 8 +- datafusion/physical-plan/src/sorts/sort.rs | 19 +- .../src/sorts/sort_preserving_merge.rs | 14 +- datafusion/physical-plan/src/statistics.rs | 171 ++++++++++------- datafusion/physical-plan/src/test.rs | 6 +- datafusion/physical-plan/src/test/exec.rs | 18 +- datafusion/physical-plan/src/union.rs | 94 ++++++---- .../src/windows/bounded_window_agg_exec.rs | 10 +- .../src/windows/window_agg_exec.rs | 10 +- datafusion/physical-plan/src/work_table.rs | 6 +- .../library-user-guide/upgrading/55.0.0.md | 71 +++---- 54 files changed, 937 insertions(+), 584 deletions(-) diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index 6df1113e477e3..bf14ddd976883 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -722,10 +722,12 @@ impl ExecutionPlan for SampleExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let mut stats = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.input, args.partition())?, - ); + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let mut stats = input_stats[0].as_ref().clone(); let ratio = self.upper_bound - self.lower_bound; // Scale statistics by sampling ratio (inexact due to randomness) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 9392d6daecde9..3ee9ebc1d71ee 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -45,7 +45,7 @@ mod tests { use datafusion_datasource::file_format::FileFormat; use datafusion_datasource::write::BatchSerializer; use datafusion_expr::{col, lit}; - use datafusion_physical_plan::statistics::StatisticsArgs; + use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use datafusion_physical_plan::{ExecutionPlan, collect}; use arrow::array::{ @@ -217,11 +217,14 @@ mod tests { // test metadata assert_eq!( - exec.statistics_with_args(&StatisticsArgs::new())?.num_rows, + StatisticsContext::new() + .compute(exec.as_ref(), &StatisticsArgs::new())? + .num_rows, Precision::Absent ); assert_eq!( - exec.statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(exec.as_ref(), &StatisticsArgs::new())? .total_byte_size, Precision::Absent ); diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 5dd3817829478..7d7761c74cf21 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -36,7 +36,7 @@ mod tests { BatchDeserializer, DecoderDeserializer, DeserializerOutput, }; use datafusion_datasource::file_format::FileFormat; - use datafusion_physical_plan::statistics::StatisticsArgs; + use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use datafusion_physical_plan::{ExecutionPlan, collect}; use arrow::compute::concat_batches; @@ -119,11 +119,14 @@ mod tests { // test metadata assert_eq!( - exec.statistics_with_args(&StatisticsArgs::new())?.num_rows, + StatisticsContext::new() + .compute(exec.as_ref(), &StatisticsArgs::new())? + .num_rows, Precision::Absent ); assert_eq!( - exec.statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(exec.as_ref(), &StatisticsArgs::new())? .total_byte_size, Precision::Absent ); diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 5f7fc2eebf300..0f5db4a057d76 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -141,7 +141,7 @@ mod tests { use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::dml::InsertOp; - use datafusion_physical_plan::statistics::StatisticsArgs; + use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use datafusion_physical_plan::{ExecutionPlan, collect}; @@ -716,12 +716,15 @@ mod tests { // test metadata assert_eq!( - exec.statistics_with_args(&StatisticsArgs::new())?.num_rows, + StatisticsContext::new() + .compute(exec.as_ref(), &StatisticsArgs::new())? + .num_rows, Precision::Exact(8) ); // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 assert_eq!( - exec.statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(exec.as_ref(), &StatisticsArgs::new())? .total_byte_size, Precision::Absent, ); @@ -766,11 +769,14 @@ mod tests { // note: even if the limit is set, the executor rounds up to the batch size assert_eq!( - exec.statistics_with_args(&StatisticsArgs::new())?.num_rows, + StatisticsContext::new() + .compute(exec.as_ref(), &StatisticsArgs::new())? + .num_rows, Precision::Exact(8) ); assert_eq!( - exec.statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(exec.as_ref(), &StatisticsArgs::new())? .total_byte_size, Precision::Absent, ); diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 50b3855a0ab7c..3f10a5fb5676b 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -142,7 +142,7 @@ mod tests { use datafusion_physical_expr::expressions::binary; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::empty::EmptyExec; - use datafusion_physical_plan::statistics::StatisticsArgs; + use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use datafusion_physical_plan::{ExecutionPlanProperties, collect}; use std::collections::HashMap; use std::io::Write; @@ -246,11 +246,14 @@ mod tests { // test metadata assert_eq!( - exec.statistics_with_args(&StatisticsArgs::new())?.num_rows, + StatisticsContext::new() + .compute(exec.as_ref(), &StatisticsArgs::new())? + .num_rows, Precision::Exact(8) ); assert_eq!( - exec.statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(exec.as_ref(), &StatisticsArgs::new())? .total_byte_size, Precision::Absent, ); @@ -1362,16 +1365,16 @@ mod tests { let exec_default = table_default.scan(&state, None, &[], None).await?; assert_eq!( - exec_default - .statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(exec_default.as_ref(), &StatisticsArgs::new())? .num_rows, Precision::Exact(8) ); // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 assert_eq!( - exec_default - .statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(exec_default.as_ref(), &StatisticsArgs::new())? .total_byte_size, Precision::Absent ); @@ -1388,14 +1391,14 @@ mod tests { let exec_disabled = table_disabled.scan(&state, None, &[], None).await?; assert_eq!( - exec_disabled - .statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(exec_disabled.as_ref(), &StatisticsArgs::new())? .num_rows, Precision::Absent ); assert_eq!( - exec_disabled - .statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(exec_disabled.as_ref(), &StatisticsArgs::new())? .total_byte_size, Precision::Absent ); @@ -1412,15 +1415,15 @@ mod tests { let exec_enabled = table_enabled.scan(&state, None, &[], None).await?; assert_eq!( - exec_enabled - .statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(exec_enabled.as_ref(), &StatisticsArgs::new())? .num_rows, Precision::Exact(8) ); // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 assert_eq!( - exec_enabled - .statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(exec_enabled.as_ref(), &StatisticsArgs::new())? .total_byte_size, Precision::Absent, ); diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index 0b0df57e5a917..c70722cb2f2ff 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -179,7 +179,11 @@ impl ExecutionPlan for CustomExecutionPlan { Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 })) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { if args.partition().is_some() { return Ok(Arc::new(Statistics::new_unknown(&self.schema()))); } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 1ea2b202b1f9d..d289b5c348b3c 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -35,8 +35,8 @@ use datafusion::{ use datafusion_catalog::Session; use datafusion_common::{project_schema, stats::Precision}; use datafusion_physical_expr::EquivalenceProperties; -use datafusion_physical_plan::StatisticsArgs; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion_physical_plan::{StatisticsArgs, StatisticsContext}; use async_trait::async_trait; @@ -174,7 +174,11 @@ impl ExecutionPlan for StatisticsValidation { unimplemented!("This plan only serves for testing statistics") } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { if args.partition().is_some() { Ok(Arc::new(Statistics::new_unknown(&self.schema))) } else { @@ -233,7 +237,8 @@ async fn sql_basic() -> Result<()> { // the statistics should be those of the source assert_eq!( stats, - *physical_plan.statistics_with_args(&StatisticsArgs::new())? + *StatisticsContext::new() + .compute(physical_plan.as_ref(), &StatisticsArgs::new())? ); Ok(()) @@ -250,7 +255,8 @@ async fn sql_filter() -> Result<()> { .unwrap(); let physical_plan = df.create_physical_plan().await.unwrap(); - let stats = physical_plan.statistics_with_args(&StatisticsArgs::new())?; + let stats = StatisticsContext::new() + .compute(physical_plan.as_ref(), &StatisticsArgs::new())?; assert_eq!(stats.num_rows, Precision::Inexact(7)); Ok(()) @@ -265,7 +271,8 @@ async fn sql_limit() -> Result<()> { let physical_plan = df.create_physical_plan().await.unwrap(); // when the limit is smaller than the original number of lines we mark the statistics as inexact // and cap NDV at the new row count - let limit_stats = physical_plan.statistics_with_args(&StatisticsArgs::new())?; + let limit_stats = StatisticsContext::new() + .compute(physical_plan.as_ref(), &StatisticsArgs::new())?; assert_eq!(limit_stats.num_rows, Precision::Exact(5)); // c1: NDV=2 stays at 2 (already below limit of 5) assert_eq!( @@ -286,7 +293,8 @@ async fn sql_limit() -> Result<()> { // when the limit is larger than the original number of lines, statistics remain unchanged assert_eq!( stats, - *physical_plan.statistics_with_args(&StatisticsArgs::new())? + *StatisticsContext::new() + .compute(physical_plan.as_ref(), &StatisticsArgs::new())? ); Ok(()) @@ -304,7 +312,8 @@ async fn sql_window() -> Result<()> { let physical_plan = df.create_physical_plan().await.unwrap(); - let result = physical_plan.statistics_with_args(&StatisticsArgs::new())?; + let result = StatisticsContext::new() + .compute(physical_plan.as_ref(), &StatisticsArgs::new())?; assert_eq!(stats.num_rows, result.num_rows); let col_stats = &result.column_statistics; diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 45c0b66a6c5e4..11f0abb879e00 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -44,7 +44,7 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::filter::FilterExec; -use datafusion_physical_plan::statistics::StatisticsArgs; +use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use tempfile::tempdir; #[tokio::test] @@ -64,7 +64,8 @@ async fn check_stats_precision_with_filter_pushdown() { // Scan without filter, stats are exact let exec = table.scan(&state, None, &[], None).await.unwrap(); assert_eq!( - exec.statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(exec.as_ref(), &StatisticsArgs::new()) .unwrap() .num_rows, Precision::Exact(8), @@ -98,8 +99,8 @@ async fn check_stats_precision_with_filter_pushdown() { ); // Scan with filter pushdown, stats are inexact assert_eq!( - optimized_exec - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(optimized_exec.as_ref(), &StatisticsArgs::new()) .unwrap() .num_rows, Precision::Inexact(8), @@ -132,15 +133,15 @@ async fn load_table_stats_with_session_level_cache() { let exec1 = table1.scan(&state1, None, &[], None).await.unwrap(); assert_eq!( - exec1 - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(exec1.as_ref(), &StatisticsArgs::new()) .unwrap() .num_rows, Precision::Exact(8) ); assert_eq!( - exec1 - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(exec1.as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, // Byte size is absent because we cannot estimate the output size @@ -154,15 +155,15 @@ async fn load_table_stats_with_session_level_cache() { assert_eq!(get_static_cache_size(&state2), 0); let exec2 = table2.scan(&state2, None, &[], None).await.unwrap(); assert_eq!( - exec2 - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(exec2.as_ref(), &StatisticsArgs::new()) .unwrap() .num_rows, Precision::Exact(8) ); assert_eq!( - exec2 - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(exec2.as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, // Absent because the data contains variable length columns @@ -175,15 +176,15 @@ async fn load_table_stats_with_session_level_cache() { assert_eq!(get_static_cache_size(&state1), 1); let exec3 = table1.scan(&state1, None, &[], None).await.unwrap(); assert_eq!( - exec3 - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(exec3.as_ref(), &StatisticsArgs::new()) .unwrap() .num_rows, Precision::Exact(8) ); assert_eq!( - exec3 - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(exec3.as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, // Absent because the data contains variable length columns diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 80e0a3f23e736..2db9f18f31f7f 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -45,6 +45,7 @@ use datafusion_physical_plan::joins::{HashJoinExec, NestedLoopJoinExec, Partitio use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, StatisticsArgs, + StatisticsContext, execution_plan::{Boundedness, EmissionType}, }; @@ -248,17 +249,15 @@ async fn test_join_with_swap() { .expect("The type of the plan should not be changed"); assert_eq!( - swapped_join - .left() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.left().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join - .right() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.right().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(2097152) @@ -296,17 +295,15 @@ async fn test_left_join_no_swap() { .expect("The type of the plan should not be changed"); assert_eq!( - swapped_join - .left() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.left().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join - .right() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.right().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(2097152) @@ -347,17 +344,15 @@ async fn test_join_with_swap_semi() { assert_eq!(swapped_join.schema().fields().len(), 1); assert_eq!( - swapped_join - .left() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.left().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join - .right() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.right().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(2097152) @@ -400,17 +395,15 @@ async fn test_join_with_swap_mark() { assert_eq!(swapped_join.schema().fields().len(), 2); assert_eq!( - swapped_join - .left() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.left().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join - .right() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.right().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(2097152) @@ -528,17 +521,15 @@ async fn test_join_no_swap() { .expect("The type of the plan should not be changed"); assert_eq!( - swapped_join - .left() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.left().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join - .right() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.right().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(2097152) @@ -603,17 +594,15 @@ async fn test_nl_join_with_swap(join_type: JoinType) { ); assert_eq!( - swapped_join - .left() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.left().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join - .right() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.right().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(2097152) @@ -676,17 +665,15 @@ async fn test_nl_join_with_swap_no_proj(join_type: JoinType) { ); assert_eq!( - swapped_join - .left() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.left().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join - .right() - .statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new() + .compute(swapped_join.right().as_ref(), &StatisticsArgs::new()) .unwrap() .total_byte_size, Precision::Inexact(2097152) @@ -1152,7 +1139,11 @@ impl ExecutionPlan for StatisticsExec { unimplemented!("This plan only serves for testing statistics") } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { Ok(Arc::new(if args.partition().is_some() { Statistics::new_unknown(&self.schema) } else { diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 6a79c668bd52e..6cabcdb710393 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -55,7 +55,7 @@ mod test { use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort::SortExec; - use datafusion_physical_plan::statistics::StatisticsArgs; + use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use datafusion_physical_plan::union::{InterleaveExec, UnionExec}; use datafusion_physical_plan::windows::{WindowAggExec, create_window_expr}; use datafusion_physical_plan::{ @@ -240,7 +240,8 @@ mod test { let scan = create_scan_exec_with_statistics(None, Some(2)).await; let statistics = (0..scan.output_partitioning().partition_count()) .map(|idx| { - scan.statistics_with_args( + StatisticsContext::new().compute( + scan.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -288,7 +289,8 @@ mod test { Arc::new(ProjectionExec::try_new(exprs, scan)?); let statistics = (0..projection.output_partitioning().partition_count()) .map(|idx| { - projection.statistics_with_args( + StatisticsContext::new().compute( + projection.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -324,7 +326,8 @@ mod test { let sort_exec: Arc = Arc::new(sort); let statistics = (0..sort_exec.output_partitioning().partition_count()) .map(|idx| { - sort_exec.statistics_with_args( + StatisticsContext::new().compute( + sort_exec.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -367,7 +370,8 @@ mod test { ); let statistics = (0..sort_exec.output_partitioning().partition_count()) .map(|idx| { - sort_exec.statistics_with_args( + StatisticsContext::new().compute( + sort_exec.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -397,7 +401,8 @@ mod test { )?; let filter: Arc = Arc::new(FilterExec::try_new(predicate, scan)?); - let full_statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let full_statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; let expected_full_statistic = Statistics { num_rows: Precision::Inexact(0), total_byte_size: Precision::Inexact(0), @@ -424,7 +429,8 @@ mod test { let statistics = (0..filter.output_partitioning().partition_count()) .map(|idx| { - filter.statistics_with_args( + StatisticsContext::new().compute( + filter.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -464,7 +470,8 @@ mod test { UnionExec::try_new(vec![scan.clone(), scan])?; let statistics = (0..union_exec.output_partitioning().partition_count()) .map(|idx| { - union_exec.statistics_with_args( + StatisticsContext::new().compute( + union_exec.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -531,7 +538,8 @@ mod test { // Verify the result of partition statistics let stats = (0..interleave.output_partitioning().partition_count()) .map(|idx| { - interleave.statistics_with_args( + StatisticsContext::new().compute( + interleave.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -581,7 +589,8 @@ mod test { Arc::new(CrossJoinExec::new(left_scan, right_scan)); let statistics = (0..cross_join.output_partitioning().partition_count()) .map(|idx| { - cross_join.statistics_with_args( + StatisticsContext::new().compute( + cross_join.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -691,8 +700,8 @@ mod test { // Test partition_statistics(None) - returns overall statistics // For RightSemi join, output columns come from right side only - let full_statistics = - nested_loop_join.statistics_with_args(&StatisticsArgs::new())?; + let full_statistics = StatisticsContext::new() + .compute(nested_loop_join.as_ref(), &StatisticsArgs::new())?; // With empty join columns, estimate_join_statistics returns Inexact row count // based on the outer side (right side for RightSemi) let expected_full_statistics = create_partition_statistics( @@ -728,7 +737,8 @@ mod test { let statistics = (0..nested_loop_join.output_partitioning().partition_count()) .map(|idx| { - nested_loop_join.statistics_with_args( + StatisticsContext::new().compute( + nested_loop_join.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -762,7 +772,8 @@ mod test { ); let statistics = (0..coalesce_partitions.output_partitioning().partition_count()) .map(|idx| { - coalesce_partitions.statistics_with_args( + StatisticsContext::new().compute( + coalesce_partitions.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -783,7 +794,8 @@ mod test { Arc::new(LocalLimitExec::new(scan.clone(), 1)); let statistics = (0..local_limit.output_partitioning().partition_count()) .map(|idx| { - local_limit.statistics_with_args( + StatisticsContext::new().compute( + local_limit.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -814,7 +826,8 @@ mod test { Arc::new(GlobalLimitExec::new(scan.clone(), 0, Some(2))); let statistics = (0..global_limit.output_partitioning().partition_count()) .map(|idx| { - global_limit.statistics_with_args( + StatisticsContext::new().compute( + global_limit.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -876,8 +889,10 @@ mod test { @"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]" ); - let p0_statistics = aggregate_exec_partial - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(0)))?; + let p0_statistics = StatisticsContext::new().compute( + aggregate_exec_partial.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)), + )?; // Aggregate doesn't propagate num_rows and ColumnStatistics byte_size from input let expected_p0_statistics = Statistics { @@ -916,8 +931,10 @@ mod test { ], }; - let p1_statistics = aggregate_exec_partial - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(1)))?; + let p1_statistics = StatisticsContext::new().compute( + aggregate_exec_partial.as_ref(), + &StatisticsArgs::new().with_partition(Some(1)), + )?; assert_eq!(*p1_statistics, expected_p1_statistics); validate_statistics_with_data( @@ -939,12 +956,16 @@ mod test { aggregate_exec_partial.schema(), )?); - let p0_statistics = agg_final - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(0)))?; + let p0_statistics = StatisticsContext::new().compute( + agg_final.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)), + )?; assert_eq!(*p0_statistics, expected_p0_statistics); - let p1_statistics = agg_final - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(1)))?; + let p1_statistics = StatisticsContext::new().compute( + agg_final.as_ref(), + &StatisticsArgs::new().with_partition(Some(1)), + )?; assert_eq!(*p1_statistics, expected_p1_statistics); validate_statistics_with_data( @@ -991,13 +1012,17 @@ mod test { assert_eq!( empty_stat, - *agg_partial - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(0)))? + *StatisticsContext::new().compute( + agg_partial.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)) + )? ); assert_eq!( empty_stat, - *agg_partial - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(1)))? + *StatisticsContext::new().compute( + agg_partial.as_ref(), + &StatisticsArgs::new().with_partition(Some(1)) + )? ); validate_statistics_with_data( agg_partial.clone(), @@ -1026,13 +1051,17 @@ mod test { assert_eq!( empty_stat, - *agg_final - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(0)))? + *StatisticsContext::new().compute( + agg_final.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)) + )? ); assert_eq!( empty_stat, - *agg_final - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(1)))? + *StatisticsContext::new().compute( + agg_final.as_ref(), + &StatisticsArgs::new().with_partition(Some(1)) + )? ); validate_statistics_with_data( @@ -1059,13 +1088,17 @@ mod test { }; assert_eq!( expect_partial_stat, - *agg_partial - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(0)))? + *StatisticsContext::new().compute( + agg_partial.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)) + )? ); assert_eq!( expect_partial_stat, - *agg_partial - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(1)))? + *StatisticsContext::new().compute( + agg_partial.as_ref(), + &StatisticsArgs::new().with_partition(Some(1)) + )? ); let expect_partial_overall_stat = Statistics { @@ -1075,7 +1108,8 @@ mod test { }; assert_eq!( expect_partial_overall_stat, - *agg_partial.statistics_with_args(&StatisticsArgs::new())? + *StatisticsContext::new() + .compute(agg_partial.as_ref(), &StatisticsArgs::new())? ); // Verify that the partial aggregate emits one accumulator-state row per @@ -1110,8 +1144,10 @@ mod test { assert_eq!( expect_stat, - *agg_final - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(0)))? + *StatisticsContext::new().compute( + agg_final.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)) + )? ); // Verify that the aggregate final result has exactly one partition with one row @@ -1140,8 +1176,10 @@ mod test { let mut all_batches = vec![]; for (i, partition_stream) in partitions.into_iter().enumerate() { let batches: Vec = partition_stream.try_collect().await?; - let actual = plan - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(i)))?; + let actual = StatisticsContext::new().compute( + plan.as_ref(), + &StatisticsArgs::new().with_partition(Some(i)), + )?; let expected = compute_record_batch_statistics( std::slice::from_ref(&batches), &schema, @@ -1151,7 +1189,8 @@ mod test { all_batches.push(batches); } - let actual = plan.statistics_with_args(&StatisticsArgs::new())?; + let actual = + StatisticsContext::new().compute(plan.as_ref(), &StatisticsArgs::new())?; let expected = compute_record_batch_statistics(&all_batches, &schema, None); assert_eq!(*actual, expected); @@ -1169,7 +1208,8 @@ mod test { let statistics = (0..repartition.partitioning().partition_count()) .map(|idx| { - repartition.statistics_with_args( + StatisticsContext::new().compute( + repartition.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -1223,14 +1263,16 @@ mod test { Partitioning::RoundRobinBatch(2), )?); - let result = repartition - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(2))); + let result = StatisticsContext::new().compute( + repartition.as_ref(), + &StatisticsArgs::new().with_partition(Some(2)), + ); assert!(result.is_err()); let error = result.unwrap_err(); assert!( error .to_string() - .contains("RepartitionExec invalid partition 2 (expected less than 2)") + .contains("Invalid partition index: 2, the partition count is 2") ); let partitions = execute_stream_partitioned( @@ -1253,9 +1295,19 @@ mod test { Partitioning::RoundRobinBatch(0), )?); - let result = repartition - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(0)))?; - assert_eq!(*result, Statistics::new_unknown(&scan_schema)); + // Requesting a specific partition of a zero-partition plan is out of + // range, so the context rejects it. + let result = StatisticsContext::new().compute( + repartition.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)), + ); + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("Invalid partition index: 0, the partition count is 0") + ); // Verify that the result has exactly 0 partitions let partitions = execute_stream_partitioned( @@ -1282,7 +1334,8 @@ mod test { // Verify the result of partition statistics of repartition let stats = (0..repartition.partitioning().partition_count()) .map(|idx| { - repartition.statistics_with_args( + StatisticsContext::new().compute( + repartition.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -1344,7 +1397,8 @@ mod test { // Verify partition statistics are properly propagated (not unknown) let statistics = (0..window_agg.output_partitioning().partition_count()) .map(|idx| { - window_agg.statistics_with_args( + StatisticsContext::new().compute( + window_agg.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -1433,8 +1487,10 @@ mod test { // Try to test with single partition let empty_single = Arc::new(EmptyExec::new(Arc::clone(&schema))); - let stats = empty_single - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(0)))?; + let stats = StatisticsContext::new().compute( + empty_single.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)), + )?; assert_eq!(stats.num_rows, Precision::Exact(0)); assert_eq!(stats.total_byte_size, Precision::Exact(0)); assert_eq!(stats.column_statistics.len(), 2); @@ -1449,7 +1505,8 @@ mod test { assert_eq!(col_stat.byte_size, Precision::Exact(0)); } - let overall_stats = empty_single.statistics_with_args(&StatisticsArgs::new())?; + let overall_stats = StatisticsContext::new() + .compute(empty_single.as_ref(), &StatisticsArgs::new())?; assert_eq!(stats, overall_stats); validate_statistics_with_data(empty_single, vec![ExpectedStatistics::Empty], 0) @@ -1461,7 +1518,8 @@ mod test { let statistics = (0..empty_multi.output_partitioning().partition_count()) .map(|idx| { - empty_multi.statistics_with_args( + StatisticsContext::new().compute( + empty_multi.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -1525,7 +1583,8 @@ mod test { // Test partition statistics for CollectLeft mode let statistics = (0..collect_left_join.output_partitioning().partition_count()) .map(|idx| { - collect_left_join.statistics_with_args( + StatisticsContext::new().compute( + collect_left_join.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -1605,7 +1664,8 @@ mod test { // Test partition statistics for Partitioned mode let statistics = (0..partitioned_join.output_partitioning().partition_count()) .map(|idx| { - partitioned_join.statistics_with_args( + StatisticsContext::new().compute( + partitioned_join.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) @@ -1683,7 +1743,8 @@ mod test { // Test partition statistics for Auto mode let statistics = (0..auto_join.output_partitioning().partition_count()) .map(|idx| { - auto_join.statistics_with_args( + StatisticsContext::new().compute( + auto_join.as_ref(), &StatisticsArgs::new().with_partition(Some(idx)), ) }) diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index d71f9be5a2da5..c7ff04413e9bd 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -967,7 +967,11 @@ impl ExecutionPlan for TestScan { internal_err!("TestScan is for testing optimizer only, not for execution") } - fn statistics_with_args(&self, _args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { Ok(Arc::new(Statistics::new_unknown(&self.schema))) } diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index de6349d1295c5..82a15eb401fc4 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -38,7 +38,7 @@ use datafusion_common::ScalarValue; use datafusion_common::stats::Precision; use datafusion_common::test_util::batches_to_sort_string; use datafusion_execution::config::SessionConfig; -use datafusion_physical_plan::statistics::StatisticsArgs; +use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use async_trait::async_trait; use bytes::Bytes; @@ -462,8 +462,8 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 4); - let stat_cols = physical_plan - .statistics_with_args(&StatisticsArgs::new())? + let stat_cols = StatisticsContext::new() + .compute(physical_plan.as_ref(), &StatisticsArgs::new())? .column_statistics .clone(); assert_eq!(stat_cols.len(), 4); @@ -489,8 +489,8 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 2); - let stat_cols = physical_plan - .statistics_with_args(&StatisticsArgs::new())? + let stat_cols = StatisticsContext::new() + .compute(physical_plan.as_ref(), &StatisticsArgs::new())? .column_statistics .clone(); assert_eq!(stat_cols.len(), 2); diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index b1ba0584c96a0..83f16697c91c1 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -2276,7 +2276,7 @@ mod tests { // of just the projected ones. use crate::source::DataSourceExec; - use datafusion_physical_plan::statistics::StatisticsArgs; + use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; // Create a schema with 4 columns let schema = Arc::new(Schema::new(vec![ @@ -2330,8 +2330,11 @@ mod tests { let exec = DataSourceExec::from_data_source(config); // Get statistics for partition 0 - let partition_stats = exec - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(0))) + let partition_stats = StatisticsContext::new() + .compute( + exec.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)), + ) .unwrap(); // Verify that only 2 columns are in the statistics (the projected ones) diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index a4e30d7f0bd82..255dd76cbd6b4 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -853,7 +853,7 @@ mod tests { use datafusion_common::stats::{ColumnStatistics, Precision}; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::lit; - use datafusion_physical_plan::statistics::StatisticsArgs; + use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use datafusion_physical_plan::ExecutionPlan; @@ -986,7 +986,7 @@ mod tests { let values = MemorySourceConfig::try_new_as_values(schema, data)?; assert_eq!( - *values.statistics_with_args(&StatisticsArgs::new())?, + *StatisticsContext::new().compute(values.as_ref(), &StatisticsArgs::new())?, Statistics { num_rows: Precision::Exact(rows), total_byte_size: Precision::Exact(8), // not important diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index b7e920f53ff12..ce370bb59fe45 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -427,7 +427,11 @@ impl ExecutionPlan for DataSourceExec { Some(metrics) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { self.data_source.partition_statistics(args.partition()) } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 738f87fd610e1..087a351b697cc 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -25,6 +25,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr_common::metrics::MetricsSet; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, StatisticsArgs, + StatisticsContext, }; use stabby::string::String as SString; use stabby::vec::Vec as SVec; @@ -209,8 +210,11 @@ unsafe extern "C" fn partition_statistics_fn_wrapper( partition: FFI_Option, ) -> FFI_Result> { let partition: Option = partition.into(); - plan.inner() - .statistics_with_args(&StatisticsArgs::new().with_partition(partition)) + StatisticsContext::new() + .compute( + plan.inner().as_ref(), + &StatisticsArgs::new().with_partition(partition), + ) .map(|stats| SVec::from(serialize_statistics(stats.as_ref()).as_slice())) .into() } @@ -556,8 +560,9 @@ pub mod tests { self.metrics.clone() } - fn statistics_with_args( + fn statistics_from_inputs( &self, + _input_stats: &[Arc], _args: &StatisticsArgs, ) -> Result> { Ok(Arc::new(self.statistics.clone().unwrap_or_else(|| { @@ -745,15 +750,17 @@ pub mod tests { /// Same round trip as /// [`test_ffi_execution_plan_partition_statistics_round_trip`], but queried - /// through the **new** `statistics_with_args` entry point. + /// through the **new** `StatisticsContext::compute` entry point. #[test] - fn test_ffi_execution_plan_statistics_with_args_round_trip() -> Result<()> { + fn test_ffi_execution_plan_statistics_context_round_trip() -> Result<()> { let (schema, original_stats) = stats_round_trip_fixture(); // A plan without explicit statistics reports new_unknown. let bare = export_empty_exec_over_ffi(&schema, None)?; assert_eq!( - bare.statistics_with_args(&StatisticsArgs::new())?.as_ref(), + StatisticsContext::new() + .compute(bare.as_ref(), &StatisticsArgs::new())? + .as_ref(), &Statistics::new_unknown(&schema) ); @@ -761,14 +768,17 @@ pub mod tests { let with_stats = export_empty_exec_over_ffi(&schema, Some(original_stats.clone()))?; assert_eq!( - with_stats - .statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(with_stats.as_ref(), &StatisticsArgs::new())? .as_ref(), &original_stats ); assert_eq!( - with_stats - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(1)))? + StatisticsContext::new() + .compute( + with_stats.as_ref(), + &StatisticsArgs::new().with_partition(Some(1)), + )? .as_ref(), &original_stats ); diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index b83f4ed7305e4..43b1abb4b68a9 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -25,7 +25,7 @@ use datafusion_physical_plan::aggregates::{ }; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; -use datafusion_physical_plan::statistics::StatisticsArgs; +use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use datafusion_physical_plan::udaf::{ AggregateFunctionExpr, StatisticsArgs as PlanStatisticsArgs, }; @@ -58,9 +58,8 @@ impl PhysicalOptimizerRule for AggregateStatistics { let partial_agg_exec = partial_agg_exec .downcast_ref::() .expect("take_optimizable() ensures that this is a AggregateExec"); - let stats = partial_agg_exec - .input() - .statistics_with_args(&StatisticsArgs::new())?; + let stats = StatisticsContext::new() + .compute(partial_agg_exec.input().as_ref(), &StatisticsArgs::new())?; let mut projections = vec![]; for expr in partial_agg_exec.aggr_expr() { let field = expr.field(); diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs index 6d9550fa50072..b5abf75696ba9 100644 --- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs @@ -61,7 +61,7 @@ use datafusion_physical_plan::joins::{ use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; -use datafusion_physical_plan::statistics::StatisticsArgs; +use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use datafusion_physical_plan::tree_node::PlanContext; use datafusion_physical_plan::union::{InterleaveExec, UnionExec, can_interleave}; use datafusion_physical_plan::windows::WindowAggExec; @@ -1016,8 +1016,8 @@ fn get_repartition_requirement_status( { // Decide whether adding a round robin is beneficial depending on // the statistical information we have on the number of rows: - let roundrobin_beneficial_stats = match child - .statistics_with_args(&StatisticsArgs::new())? + let roundrobin_beneficial_stats = match StatisticsContext::new() + .compute(child.as_ref(), &StatisticsArgs::new())? .num_rows { Precision::Exact(n_rows) => n_rows > batch_size, diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index 82294825b60ea..42736f8205089 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -40,7 +40,7 @@ use datafusion_physical_plan::joins::{ StreamJoinPartitionMode, SymmetricHashJoinExec, }; use datafusion_physical_plan::operator_statistics::StatisticsRegistry; -use datafusion_physical_plan::statistics::StatisticsArgs; +use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use std::sync::Arc; @@ -66,7 +66,7 @@ fn get_stats( reg.compute(plan) .map(|s| Arc::::clone(s.base_arc())) } else { - plan.statistics_with_args(&StatisticsArgs::new()) + StatisticsContext::new().compute(plan, &StatisticsArgs::new()) } } diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 224084d576834..01a288f7f1632 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -76,7 +76,7 @@ use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; -use datafusion_physical_plan::statistics::StatisticsArgs; +use datafusion_physical_plan::statistics::{StatisticsArgs, StatisticsContext}; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; /// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from /// the parent to the child if applicable. @@ -352,8 +352,8 @@ fn limit_eliminable_exact_num_rows( } if matches!( - current - .statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(current.as_ref(), &StatisticsArgs::new())? .num_rows, Precision::Exact(0) ) { diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index fb91ae46a2a08..a69ca2565de02 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -32,7 +32,6 @@ use datafusion_common::{Result, Statistics}; use datafusion_execution::TaskContext; use datafusion_physical_expr::Distribution; use datafusion_physical_expr_common::sort_expr::OrderingRequirements; -use datafusion_physical_plan::StatisticsArgs; use datafusion_physical_plan::execution_plan::Boundedness; use datafusion_physical_plan::projection::{ ProjectionExec, make_with_child, update_expr, update_ordering_requirement, @@ -41,7 +40,7 @@ use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, - SendableRecordBatchStream, + SendableRecordBatchStream, StatisticsArgs, }; /// This rule either adds or removes [`OutputRequirements`]s to/from the physical @@ -243,8 +242,12 @@ impl ExecutionPlan for OutputRequirementExec { unreachable!(); } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - args.compute_child_statistics(&self.input, args.partition()) + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + Ok(Arc::clone(&input_stats[0])) } fn try_swapping_with_projection( diff --git a/datafusion/physical-plan/benches/compute_statistics.rs b/datafusion/physical-plan/benches/compute_statistics.rs index 04b5612563097..56a518c95292e 100644 --- a/datafusion/physical-plan/benches/compute_statistics.rs +++ b/datafusion/physical-plan/benches/compute_statistics.rs @@ -47,6 +47,7 @@ use datafusion_physical_plan::joins::CrossJoinExec; use datafusion_physical_plan::statistics::StatisticsArgs; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, Partitioning, SendableRecordBatchStream, + StatisticsContext, }; /// Minimal leaf node for benchmarking @@ -111,7 +112,11 @@ impl ExecutionPlan for BenchLeaf { unimplemented!() } - fn statistics_with_args(&self, _args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { Ok(Arc::new(Statistics::new_unknown(&self.schema))) } } @@ -175,10 +180,10 @@ fn build_mixed_chain(groups: usize) -> Arc { } /// Recursive walk without a shared cross-node cache, simulating pre-cache behavior. -/// Each operator's internal `compute_child_statistics` call triggers a fresh -/// subtree walk, resulting in O(n^2) total node visits for a chain of depth n. +/// Each node is computed with a fresh `StatisticsContext`, so every call triggers a +/// fresh subtree walk, resulting in O(n^2) total node visits for a chain of depth n. /// -/// Note: each `compute_child_statistics` re-walk still benefits from its own +/// Note: each `StatisticsContext::compute` re-walk still benefits from its own /// ephemeral cache; only the cross-node sharing is removed. fn compute_statistics_without_shared_cache( plan: &dyn ExecutionPlan, @@ -188,7 +193,7 @@ fn compute_statistics_without_shared_cache( compute_statistics_without_shared_cache(child.as_ref(), None)?; } let args = StatisticsArgs::new().with_partition(partition); - plan.statistics_with_args(&args) + StatisticsContext::new().compute(plan, &args) } fn bench_compute_statistics(c: &mut Criterion) { @@ -198,7 +203,11 @@ fn bench_compute_statistics(c: &mut Criterion) { for depth in [10, 20, 50] { let plan = build_coalesce_chain(depth); group.bench_with_input(BenchmarkId::new("cached", depth), &plan, |b, plan| { - b.iter(|| plan.statistics_with_args(&StatisticsArgs::new()).unwrap()); + b.iter(|| { + StatisticsContext::new() + .compute(plan.as_ref(), &StatisticsArgs::new()) + .unwrap() + }); }); group.bench_with_input( BenchmarkId::new("no_shared_cache", depth), @@ -215,7 +224,7 @@ fn bench_compute_statistics(c: &mut Criterion) { // --- Cross-join tree (balanced binary plan) --- // Binary trees arise from multi-way joins (e.g. physical_many_self_joins // in sql_planner.rs, see #19795). CrossJoinExec calls - // compute_child_statistics for per-partition stats, re-walking the left + // StatisticsContext::compute for per-partition stats, re-walking the left // subtree at each node. The gap between cached/uncached is smaller than // the linear chain because only the left child triggers a re-walk. let mut group = c.benchmark_group("compute_statistics_cross_join_tree"); @@ -225,7 +234,11 @@ fn bench_compute_statistics(c: &mut Criterion) { let label = format!("depth={depth}_leaves={}", 1usize << depth); group.bench_with_input(BenchmarkId::new("cached", &label), &plan, |b, plan| { b.iter(|| { - plan.statistics_with_args(&StatisticsArgs::new().with_partition(Some(0))) + StatisticsContext::new() + .compute( + plan.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)), + ) .unwrap() }); }); @@ -254,10 +267,12 @@ fn bench_compute_statistics(c: &mut Criterion) { &plan, |b, plan| { b.iter(|| { - plan.statistics_with_args( - &StatisticsArgs::new().with_partition(Some(0)), - ) - .unwrap() + StatisticsContext::new() + .compute( + plan.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)), + ) + .unwrap() }); }, ); @@ -265,7 +280,11 @@ fn bench_compute_statistics(c: &mut Criterion) { BenchmarkId::new("cached_overall", depth), &plan, |b, plan| { - b.iter(|| plan.statistics_with_args(&StatisticsArgs::new()).unwrap()); + b.iter(|| { + StatisticsContext::new() + .compute(plan.as_ref(), &StatisticsArgs::new()) + .unwrap() + }); }, ); group.bench_with_input( @@ -290,7 +309,11 @@ fn bench_compute_statistics(c: &mut Criterion) { let depth = groups * 3; // 2 filters + 1 coalesce per group group.bench_with_input(BenchmarkId::new("cached", depth), &plan, |b, plan| { b.iter(|| { - plan.statistics_with_args(&StatisticsArgs::new().with_partition(Some(0))) + StatisticsContext::new() + .compute( + plan.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)), + ) .unwrap() }); }); diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index e1c598e02dfff..33bba5585cd9b 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1765,9 +1765,12 @@ impl ExecutionPlan for AggregateExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let child_statistics = - args.compute_child_statistics(&self.input, args.partition())?; + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { + let child_statistics = Arc::clone(&input_stats[0]); Ok(Arc::new( self.statistics_inner(&child_statistics, args.partition())?, )) @@ -2419,7 +2422,7 @@ mod tests { use crate::execution_plan::Boundedness; use crate::expressions::col; use crate::metrics::MetricValue; - use crate::statistics::StatisticsArgs; + use crate::statistics::{StatisticsArgs, StatisticsContext}; use crate::test::TestMemoryExec; use crate::test::assert_is_pending; use crate::test::exec::{ @@ -2805,7 +2808,8 @@ mod tests { )?); // Verify statistics are preserved proportionally through aggregation - let final_stats = merged_aggregate.statistics_with_args(&StatisticsArgs::new())?; + let final_stats = StatisticsContext::new() + .compute(merged_aggregate.as_ref(), &StatisticsArgs::new())?; assert!(final_stats.total_byte_size.get_value().is_some()); let task_ctx = if spill { @@ -2940,7 +2944,11 @@ mod tests { Ok(Box::pin(stream)) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { if args.partition().is_some() { return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } @@ -4741,7 +4749,7 @@ mod tests { PhysicalGroupBy::default(), None, )?; - let stats = agg.statistics_with_args(&StatisticsArgs::new())?; + let stats = StatisticsContext::new().compute(&agg, &StatisticsArgs::new())?; assert_eq!(stats.total_byte_size, Precision::Absent); let zero_row_stats = Statistics { @@ -4758,7 +4766,8 @@ mod tests { PhysicalGroupBy::default(), None, )?; - let stats_zero = agg_zero.statistics_with_args(&StatisticsArgs::new())?; + let stats_zero = + StatisticsContext::new().compute(&agg_zero, &StatisticsArgs::new())?; assert_eq!(stats_zero.total_byte_size, Precision::Absent); let single_input = @@ -4779,7 +4788,7 @@ mod tests { 1 ); let single_stats_zero = - single_agg_zero.statistics_with_args(&StatisticsArgs::new())?; + StatisticsContext::new().compute(&single_agg_zero, &StatisticsArgs::new())?; assert_eq!(single_stats_zero.num_rows, Precision::Exact(1)); Ok(()) @@ -5152,7 +5161,7 @@ mod tests { let agg = build_test_aggregate(&schema, input_stats, group_by, case.limit_options)?; - let stats = agg.statistics_with_args(&StatisticsArgs::new())?; + let stats = StatisticsContext::new().compute(&agg, &StatisticsArgs::new())?; assert_eq!( stats.num_rows, case.expected_num_rows, "FAILED: '{}' — expected {:?}, got {:?}", @@ -5191,7 +5200,7 @@ mod tests { None, )?; - let stats = agg.statistics_with_args(&StatisticsArgs::new())?; + let stats = StatisticsContext::new().compute(&agg, &StatisticsArgs::new())?; assert_eq!( stats.column_statistics[0].distinct_count, Precision::Exact(100), @@ -5245,7 +5254,7 @@ mod tests { let agg = build_test_aggregate(&schema, input_stats, grouping_set, None)?; - let stats = agg.statistics_with_args(&StatisticsArgs::new())?; + let stats = StatisticsContext::new().compute(&agg, &StatisticsArgs::new())?; // Per-set NDV: (a,NULL)=100, (NULL,b)=50, (a,b)=100*50=5000 // Total = 100 + 50 + 5000 = 5150 assert_eq!( @@ -5275,8 +5284,8 @@ mod tests { Arc::clone(&schema), )?; assert_eq!( - single_agg - .statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(&single_agg, &StatisticsArgs::new())? .num_rows, Precision::Exact(2) ); @@ -5303,9 +5312,10 @@ mod tests { let task_ctx = Arc::new(TaskContext::default()); for partition in 0..2 { assert_eq!( - partial_agg - .statistics_with_args( - &StatisticsArgs::new().with_partition(Some(partition)) + StatisticsContext::new() + .compute( + partial_agg.as_ref(), + &StatisticsArgs::new().with_partition(Some(partition)), )? .num_rows, Precision::Exact(2) @@ -5316,8 +5326,8 @@ mod tests { } assert_eq!( - partial_agg - .statistics_with_args(&StatisticsArgs::new())? + StatisticsContext::new() + .compute(partial_agg.as_ref(), &StatisticsArgs::new())? .num_rows, Precision::Exact(4) ); @@ -5362,7 +5372,7 @@ mod tests { PhysicalGroupBy::new_single(vec![(expr_a_plus_b, "a+b".to_string())]); let agg = build_test_aggregate(&schema, input_stats, group_by, None)?; - let stats = agg.statistics_with_args(&StatisticsArgs::new())?; + let stats = StatisticsContext::new().compute(&agg, &StatisticsArgs::new())?; assert_eq!( stats.num_rows, Precision::Inexact(1_000_000), diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 871d3c4d3fc8a..3c2c1495ddfec 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -238,8 +238,12 @@ impl ExecutionPlan for BufferExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - args.compute_child_statistics(&self.input, args.partition()) + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + Ok(Arc::clone(&input_stats[0])) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 59b3138b55430..6856d34f12b79 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -216,10 +216,12 @@ impl ExecutionPlan for CoalesceBatchesExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let stats = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.input, args.partition())?, - ); + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let stats = input_stats[0].as_ref().clone(); Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 0a8c5f78882c5..03e8682042828 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -30,7 +30,7 @@ use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::projection::{ProjectionExec, make_with_child}; use crate::sort_pushdown::SortOrderPushdownResult; -use crate::statistics::StatisticsArgs; +use crate::statistics::{ChildStats, StatisticsArgs}; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; @@ -232,9 +232,16 @@ impl ExecutionPlan for CoalescePartitionsExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let stats = - Arc::unwrap_or_clone(args.compute_child_statistics(&self.input, None)?); + fn child_stats_requests(&self, _partition: Option) -> Vec { + vec![ChildStats::At(None)] + } + + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let stats = input_stats[0].as_ref().clone(); Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index 7bd84a3a6b392..1a39940472a58 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -299,8 +299,12 @@ impl ExecutionPlan for CooperativeExec { Ok(make_cooperative(child_stream)) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - args.compute_child_statistics(&self.input, args.partition()) + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + Ok(Arc::clone(&input_stats[0])) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 164637f760286..48be5ad53be78 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -32,7 +32,7 @@ use datafusion_physical_expr::LexOrdering; use crate::metrics::{MetricCategory, MetricType, MetricValue}; use crate::render_tree::RenderTree; -use crate::statistics::StatisticsArgs; +use crate::statistics::{StatisticsArgs, StatisticsContext}; use super::{ExecutionPlan, ExecutionPlanVisitor, accept}; @@ -581,8 +581,8 @@ impl ExecutionPlanVisitor for IndentVisitor<'_, '_> { } } if self.show_statistics { - let stats = plan - .statistics_with_args(&StatisticsArgs::default()) + let stats = StatisticsContext::new() + .compute(plan, &StatisticsArgs::new()) .map_err(|_e| fmt::Error)?; write!(self.f, ", statistics=[{stats}]")?; } @@ -679,8 +679,8 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> { }; let statistics = if self.show_statistics { - let stats = plan - .statistics_with_args(&StatisticsArgs::new()) + let stats = StatisticsContext::new() + .compute(plan, &StatisticsArgs::new()) .map_err(|_e| fmt::Error)?; format!("statistics=[{stats}]") } else { @@ -1501,7 +1501,11 @@ mod tests { todo!() } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { if args.partition().is_some() { return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index a8f4af5b3d34d..44a6f444dc4b5 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -152,7 +152,11 @@ impl ExecutionPlan for EmptyExec { )?)) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { if let Some(partition) = args.partition() { assert_or_internal_err!( partition < self.partitions, diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 76abf73e0ebbe..9182e7ceea0b4 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -47,7 +47,7 @@ use crate::metrics::MetricsSet; use crate::projection::ProjectionExec; use crate::repartition::RepartitionExec; use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; -use crate::statistics::StatisticsArgs; +use crate::statistics::{ChildStats, StatisticsArgs}; use crate::stream::RecordBatchStreamAdapter; use arrow::array::{Array, RecordBatch}; @@ -498,10 +498,10 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { /// Returns statistics for a specific partition of this `ExecutionPlan` node. /// - /// Deprecated: use [`Self::statistics_with_args`] instead, - /// which accepts a [`StatisticsArgs`] carrying pre-computed child - /// statistics. - #[deprecated(since = "55.0.0", note = "Use statistics_with_args instead")] + /// Deprecated: use [`StatisticsContext::compute`] instead. + /// + /// [`StatisticsContext::compute`]: crate::statistics::StatisticsContext::compute + #[deprecated(since = "55.0.0", note = "Use StatisticsContext::compute instead")] fn partition_statistics(&self, partition: Option) -> Result> { if let Some(idx) = partition { // Validate partition index @@ -516,21 +516,44 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } - /// Returns statistics for a specific partition of this `ExecutionPlan` node. + /// Returns statistics for a specific partition of this `ExecutionPlan` node, + /// given pre-computed child statistics. + /// /// If statistics are not available, should return [`Statistics::new_unknown`] /// (the default), not an error. - /// If `partition` is `None`, it returns statistics for all partitions. + /// If `args.partition()` is `None`, it returns statistics for all partitions. /// - /// [`StatisticsArgs`] carries the partition index and a shared cache. - /// Create one with [`StatisticsArgs::new`] and pass it to this method. + /// Implementations should not call [`StatisticsContext::compute`] from within + /// this method; child statistics are provided via `input_stats`. /// - /// [`StatisticsArgs`]: crate::statistics::StatisticsArgs - /// [`StatisticsArgs::new`]: crate::statistics::StatisticsArgs::new - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + /// Use [`StatisticsContext::compute`] to initiate a full plan-tree walk. + /// + /// [`StatisticsContext::compute`]: crate::statistics::StatisticsContext::compute + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { #[expect(deprecated)] self.partition_statistics(args.partition()) } + /// Returns the partition index to request from each child when computing + /// statistics for this node at `partition`. + /// + /// The returned `Vec` has one entry per child (in the same order as + /// [`Self::children`]). [`ChildStats::At`] with `None` requests the child's + /// overall (all-partitions) statistics, and [`ChildStats::Skip`] omits a child + /// whose statistics this node does not need. + /// + /// The default requests the same `partition` from every child. + fn child_stats_requests(&self, partition: Option) -> Vec { + self.children() + .iter() + .map(|_| ChildStats::At(partition)) + .collect() + } + /// Returns `true` if a limit can be safely pushed down through this /// `ExecutionPlan` node. /// @@ -1657,8 +1680,9 @@ mod tests { unimplemented!() } - fn statistics_with_args( + fn statistics_from_inputs( &self, + _input_stats: &[Arc], _args: &StatisticsArgs, ) -> Result> { unimplemented!() @@ -1719,8 +1743,9 @@ mod tests { unimplemented!() } - fn statistics_with_args( + fn statistics_from_inputs( &self, + _input_stats: &[Arc], _args: &StatisticsArgs, ) -> Result> { unimplemented!() diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index d23dd380423d1..c6083a28ef8cc 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -42,7 +42,7 @@ use crate::projection::{ EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child, try_embed_projection, update_expr, }; -use crate::statistics::StatisticsArgs; +use crate::statistics::{StatisticsArgs, StatisticsContext}; use crate::stream::EmptyRecordBatchStream; use crate::{ DisplayFormatType, ExecutionPlan, @@ -422,7 +422,10 @@ impl FilterExec { let schema = input.schema(); let stats = Self::statistics_helper( &schema, - Arc::unwrap_or_clone(input.statistics_with_args(&StatisticsArgs::new())?), + Arc::unwrap_or_clone( + StatisticsContext::new() + .compute(input.as_ref(), &StatisticsArgs::new())?, + ), predicate, default_selectivity, )?; @@ -591,10 +594,12 @@ impl ExecutionPlan for FilterExec { /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let input_stats = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.input, args.partition())?, - ); + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let input_stats = input_stats[0].as_ref().clone(); let stats = Self::statistics_helper( &self.input.schema(), input_stats, @@ -1268,7 +1273,7 @@ mod tests { use super::*; use crate::empty::EmptyExec; use crate::expressions::*; - use crate::statistics::StatisticsArgs; + use crate::statistics::{StatisticsArgs, StatisticsContext}; use crate::test; use crate::test::exec::StatisticsExec; use arrow::datatypes::{Field, Schema, UnionFields, UnionMode}; @@ -1345,7 +1350,8 @@ mod tests { let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Inexact(25)); assert_eq!( statistics.total_byte_size, @@ -1397,7 +1403,8 @@ mod tests { sub_filter, )?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Inexact(16)); assert_eq!( statistics.column_statistics, @@ -1459,7 +1466,8 @@ mod tests { binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?, b_gt_5, )?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; // On a uniform distribution, only fifteen rows will satisfy the // filter that 'a' proposed (a >= 10 AND a <= 25) (15/100) and only // 5 rows will satisfy the filter that 'b' proposed (b > 45) (5/50). @@ -1509,7 +1517,8 @@ mod tests { let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Absent); Ok(()) @@ -1582,7 +1591,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; // 0.5 (from a) * 0.333333... (from b) * 0.798387... (from c) ≈ 0.1330... // num_rows after ceil => 133.0... => 134 // total_byte_size after ceil => 532.0... => 533 @@ -1680,8 +1690,8 @@ mod tests { // The filter predicate passes all (non-null) entries, so min/max/NDV // are unchanged. `a < 200` and `1 <= b` are null-rejecting, though, so // both columns lose any nulls regardless of selectivity. - let mut expected = input - .statistics_with_args(&StatisticsArgs::new())? + let mut expected = StatisticsContext::new() + .compute(input.as_ref(), &StatisticsArgs::new())? .column_statistics .clone(); for col in &mut expected { @@ -1689,7 +1699,8 @@ mod tests { } let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Inexact(1000)); assert_eq!(statistics.total_byte_size, Precision::Inexact(4000)); @@ -1742,7 +1753,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Inexact(0)); assert_eq!(statistics.total_byte_size, Precision::Inexact(0)); @@ -1829,7 +1841,8 @@ mod tests { Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?); // Should succeed without error - let statistics = outer_filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = StatisticsContext::new() + .compute(outer_filter.as_ref(), &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Inexact(0)); Ok(()) @@ -1868,7 +1881,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Inexact(490)); assert_eq!(statistics.total_byte_size, Precision::Inexact(1960)); @@ -1922,7 +1936,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let filter_statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let filter_statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; let expected_filter_statistics = Statistics { num_rows: Precision::Absent, @@ -1959,7 +1974,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let filter_statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let filter_statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; // First column is "a", and it is a column with only one value after the filter. assert!(filter_statistics.column_statistics[0].is_singleton()); @@ -2006,11 +2022,13 @@ mod tests { Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))), )); let filter = FilterExec::try_new(predicate, input)?; - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(&filter, &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Inexact(200)); assert_eq!(statistics.total_byte_size, Precision::Inexact(800)); let filter = filter.with_default_selectivity(40)?; - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(&filter, &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Inexact(400)); assert_eq!(statistics.total_byte_size, Precision::Inexact(1600)); Ok(()) @@ -2045,7 +2063,9 @@ mod tests { Arc::new(EmptyExec::new(Arc::clone(&schema))), )?; - exec.statistics_with_args(&StatisticsArgs::new()).unwrap(); + StatisticsContext::new() + .compute(&exec, &StatisticsArgs::new()) + .unwrap(); Ok(()) } @@ -2201,8 +2221,10 @@ mod tests { assert_eq!(filter1.projection(), filter2.projection()); // Verify statistics are the same - let stats1 = filter1.statistics_with_args(&StatisticsArgs::new())?; - let stats2 = filter2.statistics_with_args(&StatisticsArgs::new())?; + let stats1 = + StatisticsContext::new().compute(&filter1, &StatisticsArgs::new())?; + let stats2 = + StatisticsContext::new().compute(&filter2, &StatisticsArgs::new())?; assert_eq!(stats1.num_rows, stats2.num_rows); assert_eq!(stats1.total_byte_size, stats2.total_byte_size); @@ -2255,7 +2277,8 @@ mod tests { .unwrap() .build()?; - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(&filter, &StatisticsArgs::new())?; // Verify statistics reflect both filtering and projection assert!(matches!(statistics.num_rows, Precision::Inexact(_))); @@ -2486,7 +2509,8 @@ mod tests { let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; let col_b_stats = &statistics.column_statistics[1]; assert_eq!(col_b_stats.min_value, Precision::Absent); assert_eq!(col_b_stats.max_value, Precision::Absent); @@ -2773,7 +2797,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = StatisticsContext::new() + .compute(filter.as_ref(), &StatisticsArgs::new())?; for (i, expected) in expected_ndvs.iter().enumerate() { assert_eq!( @@ -2834,7 +2859,8 @@ mod tests { let input = Arc::new(StatisticsExec::new(input_stats, schema)); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = StatisticsContext::new() + .compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!( statistics.num_rows, @@ -2911,7 +2937,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; // Equality predicates collapse NDV and reject nulls for their columns. assert_eq!( statistics.column_statistics[0].distinct_count, @@ -2964,7 +2991,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!( statistics.column_statistics[0].distinct_count, Precision::Exact(1) @@ -2997,7 +3025,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!( statistics.column_statistics[0].distinct_count, Precision::Exact(1) @@ -3030,7 +3059,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!( statistics.column_statistics[0].distinct_count, Precision::Exact(1) @@ -3063,7 +3093,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!( statistics.column_statistics[0].distinct_count, Precision::Exact(1) @@ -3097,7 +3128,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!( statistics.column_statistics[0].distinct_count, Precision::Exact(1) @@ -3143,7 +3175,8 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!( statistics.column_statistics[0].distinct_count, Precision::Exact(1) @@ -3445,7 +3478,8 @@ mod tests { let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; // Filter estimates ~10 rows (selectivity = 10/100) assert_eq!(statistics.num_rows, Precision::Inexact(10)); let ndv = &statistics.column_statistics[0].distinct_count; @@ -3491,7 +3525,8 @@ mod tests { let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Inexact(20)); assert_eq!( statistics.column_statistics[0].null_count, @@ -3534,7 +3569,8 @@ mod tests { let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Inexact(20)); assert_eq!( statistics.column_statistics[0].null_count, @@ -3575,7 +3611,8 @@ mod tests { let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics_with_args(&StatisticsArgs::new())?; + let statistics = + StatisticsContext::new().compute(filter.as_ref(), &StatisticsArgs::new())?; assert_eq!(statistics.num_rows, Precision::Inexact(20)); assert_eq!( statistics.column_statistics[0].null_count, diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 79295ba2fb556..e5cfff60e78a6 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -31,7 +31,7 @@ use crate::projection::{ ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children, physical_to_column_exprs, }; -use crate::statistics::StatisticsArgs; +use crate::statistics::{ChildStats, StatisticsArgs}; use crate::stream::EmptyRecordBatchStream; use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, @@ -372,14 +372,19 @@ impl ExecutionPlan for CrossJoinExec { } } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - // Left side is always broadcast, so it always needs overall stats - let left_stats = - Arc::unwrap_or_clone(args.compute_child_statistics(&self.left, None)?); - // Right side is partitioned, so it needs per-partition stats - let right_stats = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.right, args.partition())?, - ); + fn child_stats_requests(&self, partition: Option) -> Vec { + // Left side is always broadcast, so it always needs overall stats. + // Right side is partitioned, so it needs per-partition stats. + vec![ChildStats::At(None), ChildStats::At(partition)] + } + + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let left_stats = input_stats[0].as_ref().clone(); + let right_stats = input_stats[1].as_ref().clone(); Ok(Arc::new(stats_cartesian_product(left_stats, right_stats))) } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index bb9ebcd4e6191..0e9e8b73d2596 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -52,7 +52,7 @@ use crate::projection::{ try_pushdown_through_join, }; use crate::repartition::REPARTITION_RANDOM_STATE; -use crate::statistics::StatisticsArgs; +use crate::statistics::{ChildStats, StatisticsArgs}; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, @@ -1450,72 +1450,43 @@ impl ExecutionPlan for HashJoinExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let stats = match (args.partition(), self.mode) { + fn child_stats_requests(&self, partition: Option) -> Vec { + match (partition, self.mode) { // Left side is broadcast, so it always needs overall stats // Right side is partitioned, so it needs per-partition stats (Some(_), PartitionMode::CollectLeft) => { - let left_stats = args.compute_child_statistics(&self.left, None)?; - let right_stats = - args.compute_child_statistics(&self.right, args.partition())?; - - estimate_join_statistics( - Arc::unwrap_or_clone(left_stats), - Arc::unwrap_or_clone(right_stats), - &self.on, - self.null_equality, - &self.join_type, - &self.join_schema, - )? + vec![ChildStats::At(None), ChildStats::At(partition)] } - // For Partitioned mode, both sides are hash-partitioned symmetrically, // so each output partition uses the matching partition from both sides. (Some(_), PartitionMode::Partitioned) => { - let left_stats = - args.compute_child_statistics(&self.left, args.partition())?; - let right_stats = - args.compute_child_statistics(&self.right, args.partition())?; - - estimate_join_statistics( - Arc::unwrap_or_clone(left_stats), - Arc::unwrap_or_clone(right_stats), - &self.on, - self.null_equality, - &self.join_type, - &self.join_schema, - )? + vec![ChildStats::At(partition), ChildStats::At(partition)] } - // Overall stats requested, look up overall child stats. - (None, _) => { - let left_stats = args.compute_child_statistics(&self.left, None)?; - let right_stats = args.compute_child_statistics(&self.right, None)?; - estimate_join_statistics( - Arc::unwrap_or_clone(left_stats), - Arc::unwrap_or_clone(right_stats), - &self.on, - self.null_equality, - &self.join_type, - &self.join_schema, - )? - } - + (None, _) => vec![ChildStats::At(None), ChildStats::At(None)], // Auto mode hasn't decided partitioning yet, so it needs // overall stats from both sides. (Some(_), PartitionMode::Auto) => { - let left_stats = args.compute_child_statistics(&self.left, None)?; - let right_stats = args.compute_child_statistics(&self.right, None)?; - estimate_join_statistics( - Arc::unwrap_or_clone(left_stats), - Arc::unwrap_or_clone(right_stats), - &self.on, - self.null_equality, - &self.join_type, - &self.join_schema, - )? + vec![ChildStats::At(None), ChildStats::At(None)] } - }; + } + } + + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let left_stats = Arc::clone(&input_stats[0]); + let right_stats = Arc::clone(&input_stats[1]); + let stats = estimate_join_statistics( + Arc::unwrap_or_clone(left_stats), + Arc::unwrap_or_clone(right_stats), + &self.on, + self.null_equality, + &self.join_type, + &self.join_schema, + )?; // Project statistics if there is a projection let stats = stats.project(self.projection.as_ref()); // Apply fetch limit to statistics diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index db552fed96724..c4b1e04f3f55b 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -42,7 +42,7 @@ use crate::projection::{ EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection, try_pushdown_through_join, }; -use crate::statistics::StatisticsArgs; +use crate::statistics::{ChildStats, StatisticsArgs}; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, @@ -691,7 +691,17 @@ impl ExecutionPlan for NestedLoopJoinExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn child_stats_requests(&self, partition: Option) -> Vec { + // Left side is always broadcast, so it always needs overall stats. + // Right side is partitioned, so it needs per-partition stats. + vec![ChildStats::At(None), ChildStats::At(partition)] + } + + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { // NestedLoopJoinExec is designed for joins without equijoin keys in the // ON clause (e.g., `t1 JOIN t2 ON (t1.v1 + t2.v1) % 2 = 0`). Any join // predicates are stored in `self.filter`, but `estimate_join_statistics` @@ -701,13 +711,8 @@ impl ExecutionPlan for NestedLoopJoinExec { // unknown row counts. let join_columns = Vec::new(); - // Left side is always broadcast, so it always needs overall stats - let left_stats = - Arc::unwrap_or_clone(args.compute_child_statistics(&self.left, None)?); - // Right side is partitioned, so it needs per-partition stats - let right_stats = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.right, args.partition())?, - ); + let left_stats = input_stats[0].as_ref().clone(); + let right_stats = input_stats[1].as_ref().clone(); let stats = estimate_join_statistics( left_stats, @@ -3063,7 +3068,7 @@ fn build_unmatched_batch( #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::statistics::StatisticsArgs; + use crate::statistics::{StatisticsArgs, StatisticsContext}; use crate::test::{TestMemoryExec, assert_join_metrics}; use crate::{ common, expressions::Column, repartition::RepartitionExec, test::build_table_i32, @@ -3443,7 +3448,8 @@ pub(crate) mod tests { &JoinType::Left, Some(vec![1, 2]), )?; - let stats = nested_loop_join.statistics_with_args(&StatisticsArgs::new())?; + let stats = StatisticsContext::new() + .compute(&nested_loop_join, &StatisticsArgs::new())?; assert_eq!( nested_loop_join.schema().fields().len(), stats.column_statistics.len(), diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index a8d25fd002b76..7f012895cdd6c 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -564,7 +564,11 @@ impl ExecutionPlan for SortMergeJoinExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { // SortMergeJoinExec uses symmetric hash partitioning where both left and right // inputs are hash-partitioned on the join keys. This means partition `i` of the // left input is joined with partition `i` of the right input. @@ -572,12 +576,8 @@ impl ExecutionPlan for SortMergeJoinExec { // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` - let left_stats = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.left, args.partition())?, - ); - let right_stats = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.right, args.partition())?, - ); + let left_stats = input_stats[0].as_ref().clone(); + let right_stats = input_stats[1].as_ref().clone(); Ok(Arc::new(estimate_join_statistics( left_stats, right_stats, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs index 338c5111d223d..e215ce82c763f 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs @@ -3383,7 +3383,7 @@ async fn test_left_outer_join_filtered_mask() -> Result<()> { #[test] fn test_partition_statistics() -> Result<()> { - use crate::statistics::StatisticsArgs; + use crate::statistics::{StatisticsArgs, StatisticsContext}; use datafusion_common::stats::Precision; let left = build_table( @@ -3420,7 +3420,8 @@ fn test_partition_statistics() -> Result<()> { // Test aggregate statistics (partition = None) // Should return meaningful statistics computed from both inputs - let stats = join_exec.statistics_with_args(&StatisticsArgs::new())?; + let stats = + StatisticsContext::new().compute(&join_exec, &StatisticsArgs::new())?; assert_eq!( stats.column_statistics.len(), expected_cols, @@ -3438,8 +3439,8 @@ fn test_partition_statistics() -> Result<()> { // Since the child TestMemoryExec returns unknown stats for specific partitions, // the join output will also have Absent num_rows. This is expected behavior // as the statistics depend on what the children can provide. - let partition_stats = join_exec - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(0)))?; + let partition_stats = StatisticsContext::new() + .compute(&join_exec, &StatisticsArgs::new().with_partition(Some(0)))?; assert_eq!( partition_stats.column_statistics.len(), expected_cols, diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 6cc6e44c32cc3..336e8ae32df73 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -49,7 +49,7 @@ pub use crate::execution_plan::{ pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; pub use crate::sort_pushdown::SortOrderPushdownResult; -pub use crate::statistics::StatisticsArgs; +pub use crate::statistics::{ChildStats, StatisticsArgs, StatisticsContext}; pub use crate::stream::EmptyRecordBatchStream; pub use crate::topk::TopK; pub use crate::visitor::{ExecutionPlanVisitor, accept, visit_execution_plan}; diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 1e4b5e5bb6426..4f1a2ecaec93d 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -220,10 +220,12 @@ impl ExecutionPlan for GlobalLimitExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let stats = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.input, args.partition())?, - ); + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let stats = input_stats[0].as_ref().clone(); Ok(Arc::new(stats.with_fetch(self.fetch, self.skip, 1)?)) } @@ -385,10 +387,12 @@ impl ExecutionPlan for LocalLimitExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let stats = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.input, args.partition())?, - ); + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let stats = input_stats[0].as_ref().clone(); Ok(Arc::new(stats.with_fetch(Some(self.fetch), 0, 1)?)) } @@ -535,7 +539,7 @@ mod tests { use super::*; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::common::collect; - use crate::statistics::StatisticsArgs; + use crate::statistics::{StatisticsArgs, StatisticsContext}; use crate::test; use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; @@ -818,8 +822,8 @@ mod tests { let offset = GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch); - Ok(offset - .statistics_with_args(&StatisticsArgs::new())? + Ok(StatisticsContext::new() + .compute(&offset, &StatisticsArgs::new())? .num_rows) } @@ -860,8 +864,8 @@ mod tests { fetch, ); - Ok(offset - .statistics_with_args(&StatisticsArgs::new())? + Ok(StatisticsContext::new() + .compute(&offset, &StatisticsArgs::new())? .num_rows) } @@ -875,8 +879,8 @@ mod tests { let offset = LocalLimitExec::new(csv, fetch); - Ok(offset - .statistics_with_args(&StatisticsArgs::new())? + Ok(StatisticsContext::new() + .compute(&offset, &StatisticsArgs::new())? .num_rows) } diff --git a/datafusion/physical-plan/src/operator_statistics/mod.rs b/datafusion/physical-plan/src/operator_statistics/mod.rs index 990bb4a68249d..142768fcf49d2 100644 --- a/datafusion/physical-plan/src/operator_statistics/mod.rs +++ b/datafusion/physical-plan/src/operator_statistics/mod.rs @@ -94,7 +94,7 @@ use datafusion_common::stats::Precision; use datafusion_common::{Result, Statistics}; use crate::ExecutionPlan; -use crate::statistics::StatisticsArgs; +use crate::statistics::{StatisticsArgs, StatisticsContext}; // ============================================================================ // ExtendedStatistics: Statistics with type-safe extensions @@ -267,7 +267,7 @@ impl StatisticsProvider for DefaultStatisticsProvider { plan: &dyn ExecutionPlan, _child_stats: &[ExtendedStatistics], ) -> Result { - let base = plan.statistics_with_args(&StatisticsArgs::new())?; + let base = StatisticsContext::new().compute(plan, &StatisticsArgs::new())?; Ok(StatisticsResult::Computed(ExtendedStatistics::new_arc( base, ))) @@ -359,7 +359,7 @@ impl StatisticsRegistry { pub fn compute(&self, plan: &dyn ExecutionPlan) -> Result { // Fast path: no providers registered, skip the walk entirely if self.providers.is_empty() { - let base = plan.statistics_with_args(&StatisticsArgs::new())?; + let base = StatisticsContext::new().compute(plan, &StatisticsArgs::new())?; return Ok(ExtendedStatistics::new_arc(base)); } @@ -383,7 +383,7 @@ impl StatisticsRegistry { } } // Fallback: use plan's built-in stats - let base = plan.statistics_with_args(&StatisticsArgs::new())?; + let base = StatisticsContext::new().compute(plan, &StatisticsArgs::new())?; Ok(ExtendedStatistics::new_arc(base)) } @@ -506,8 +506,9 @@ fn computed_with_row_count( plan: &dyn ExecutionPlan, num_rows: Precision, ) -> Result { - let mut base = - Arc::unwrap_or_clone(plan.statistics_with_args(&StatisticsArgs::new())?); + let mut base = Arc::unwrap_or_clone( + StatisticsContext::new().compute(plan, &StatisticsArgs::new())?, + ); rescale_byte_size(&mut base, num_rows); Ok(StatisticsResult::Computed(ExtendedStatistics::new(base))) } @@ -1124,8 +1125,9 @@ mod tests { unimplemented!() } - fn statistics_with_args( + fn statistics_from_inputs( &self, + _input_stats: &[Arc], _args: &StatisticsArgs, ) -> Result> { Ok(Arc::new(self.stats.clone())) diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 64b192d58d238..20d267331b2aa 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -165,7 +165,11 @@ impl ExecutionPlan for PlaceholderRowExec { Ok(Box::pin(cooperative(ms))) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { let batches = self .data() .expect("Create single row placeholder RecordBatch should not fail"); diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 16b0a5ad7e4b5..f7a3279b71392 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -349,10 +349,12 @@ impl ExecutionPlan for ProjectionExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let input_stats = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.input, args.partition())?, - ); + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let input_stats = input_stats[0].as_ref().clone(); let output_schema = self.schema(); Ok(Arc::new( self.projector @@ -1186,7 +1188,7 @@ mod tests { use crate::common::collect; use crate::filter_pushdown::PushedDown; - use crate::statistics::StatisticsArgs; + use crate::statistics::{StatisticsArgs, StatisticsContext}; use crate::test; use crate::test::exec::StatisticsExec; @@ -1377,8 +1379,8 @@ mod tests { let projection = ProjectionExec::try_new(exprs, input).unwrap(); - let stats = projection - .statistics_with_args(&StatisticsArgs::new()) + let stats = StatisticsContext::new() + .compute(&projection, &StatisticsArgs::new()) .unwrap(); assert_eq!(stats.num_rows, Precision::Exact(10)); diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 2298183485f55..1c096c1408379 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -39,7 +39,7 @@ use crate::projection::{ProjectionExec, all_columns, make_with_child, update_exp use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::spill_manager::SpillManager; use crate::spill::spill_pool::{self, SpillPoolWriter}; -use crate::statistics::StatisticsArgs; +use crate::statistics::{ChildStats, StatisticsArgs}; use crate::stream::{EmptyRecordBatchStream, RecordBatchStreamAdapter}; use crate::{ DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics, @@ -1362,22 +1362,27 @@ impl ExecutionPlan for RepartitionExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - if let Some(partition) = args.partition() { - let partition_count = self.partitioning().partition_count(); - if partition_count == 0 { - return Ok(Arc::new(Statistics::new_unknown(&self.schema()))); - } + fn child_stats_requests(&self, _partition: Option) -> Vec { + vec![ChildStats::At(None)] + } + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { + if args.partition().is_some() { + let partition_count = self.partitioning().partition_count(); + // `StatisticsContext::compute` validates the partition index against + // this same count before calling, so it is non-zero here; guard + // defensively against a direct call so the division below cannot + // divide by zero assert_or_internal_err!( - partition < partition_count, - "RepartitionExec invalid partition {} (expected less than {})", - partition, - partition_count + partition_count > 0, + "RepartitionExec statistics requested for a partition but the partition count is 0" ); - let mut stats = - Arc::unwrap_or_clone(args.compute_child_statistics(&self.input, None)?); + let mut stats = input_stats[0].as_ref().clone(); // Distribute statistics across partitions stats.num_rows = stats @@ -1400,7 +1405,7 @@ impl ExecutionPlan for RepartitionExec { Ok(Arc::new(stats)) } else { - args.compute_child_statistics(&self.input, None) + Ok(Arc::clone(&input_stats[0])) } } diff --git a/datafusion/physical-plan/src/scalar_subquery.rs b/datafusion/physical-plan/src/scalar_subquery.rs index dd44d09c386c5..f0a95d00163de 100644 --- a/datafusion/physical-plan/src/scalar_subquery.rs +++ b/datafusion/physical-plan/src/scalar_subquery.rs @@ -236,8 +236,12 @@ impl ExecutionPlan for ScalarSubqueryExec { vec![false; self.subqueries.len() + 1] } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - args.compute_child_statistics(&self.input, args.partition()) + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + Ok(Arc::clone(&input_stats[0])) } fn cardinality_effect(&self) -> CardinalityEffect { diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index f7b403d94341f..1c49639addcbb 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -335,8 +335,12 @@ impl ExecutionPlan for PartialSortExec { Some(self.metrics_set.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - args.compute_child_statistics(&self.input, args.partition()) + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + Ok(Arc::clone(&input_stats[0])) } } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index ccc675c6ef4bb..e46acb926da1c 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -45,7 +45,7 @@ use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::get_record_batch_memory_size; use crate::spill::in_progress_spill_file::InProgressSpillFile; use crate::spill::spill_manager::{GetSlicedSize, SpillManager}; -use crate::statistics::StatisticsArgs; +use crate::statistics::{ChildStats, StatisticsArgs}; use crate::stream::RecordBatchStreamAdapter; use crate::stream::ReservationStream; use crate::topk::TopK; @@ -1284,14 +1284,21 @@ impl ExecutionPlan for SortExec { Some(self.metrics_set.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let partition = if self.preserve_partitioning() { - args.partition() + fn child_stats_requests(&self, partition: Option) -> Vec { + let child_partition = if self.preserve_partitioning() { + partition } else { None }; - let child_stats = args.compute_child_statistics(&self.input, partition)?; - let stats = Arc::unwrap_or_clone(child_stats); + vec![ChildStats::At(child_partition)] + } + + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let stats = input_stats[0].as_ref().clone(); Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index dcf3a7baad435..aa24cdd2268f3 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -24,7 +24,7 @@ use crate::limit::LimitStream; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::{ProjectionExec, make_with_child, update_ordering}; use crate::sorts::streaming_merge::StreamingMergeBuilder; -use crate::statistics::StatisticsArgs; +use crate::statistics::{ChildStats, StatisticsArgs}; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, @@ -381,8 +381,16 @@ impl ExecutionPlan for SortPreservingMergeExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - args.compute_child_statistics(&self.input, None) + fn child_stats_requests(&self, _partition: Option) -> Vec { + vec![ChildStats::At(None)] + } + + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + Ok(Arc::clone(&input_stats[0])) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/statistics.rs b/datafusion/physical-plan/src/statistics.rs index 5ed5558e28a5b..9246d7d9f5a9c 100644 --- a/datafusion/physical-plan/src/statistics.rs +++ b/datafusion/physical-plan/src/statistics.rs @@ -18,10 +18,12 @@ //! Statistics computation for physical plans. //! //! [`StatisticsArgs`] provides external context to -//! [`ExecutionPlan::statistics_with_args`]. +//! [`ExecutionPlan::statistics_from_inputs`]. use crate::ExecutionPlan; -use datafusion_common::{Result, Statistics, assert_or_internal_err}; +use datafusion_common::{ + Result, Statistics, assert_eq_or_internal_err, assert_or_internal_err, +}; use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; @@ -30,7 +32,7 @@ use std::sync::Arc; /// Per-call memoization cache for statistics computation. /// /// Keyed by `(plan node pointer address, partition)`. Shared across -/// a single statistics walk via [`StatisticsArgs`]. +/// a single statistics walk via [`StatisticsContext`]. /// /// The pointer-based key is safe within a single synchronous walk: /// all `Arc` nodes are held by the plan tree for @@ -65,18 +67,16 @@ impl StatsCache { } } -/// Arguments passed to [`ExecutionPlan::statistics_with_args`] carrying +/// Arguments passed to [`ExecutionPlan::statistics_from_inputs`] carrying /// external information that operators can use when computing their /// statistics. -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct StatisticsArgs { partition: Option, - /// Shared memoization cache for the current statistics walk. - cache: Rc>, } impl StatisticsArgs { - /// Creates new statistics arguments with a fresh cache. + /// Creates new statistics arguments. /// /// By default the partition is set to `None` (statistics should be computed /// for the entire plan). @@ -89,18 +89,8 @@ impl StatisticsArgs { /// * `None` means statistics should be computed for the entire plan. /// * `Some(idx)` means statistics should be computed for the specified /// partition index. - /// - /// Changing the partition starts a new statistics walk, so the - /// memoization cache is reset to avoid reusing entries computed for a - /// different partition. pub fn set_partition(&mut self, partition: Option) { - if self.partition != partition { - self.partition = partition; - // Drop the previous walk's cache: its entries are keyed by raw - // plan pointer and the prior partition, so they must not leak - // into the new walk. - self.cache = Rc::new(RefCell::new(StatsCache::default())); - } + self.partition = partition; } /// Builder Style API for [`Self::set_partition`] @@ -113,15 +103,60 @@ impl StatisticsArgs { pub fn partition(&self) -> Option { self.partition } +} + +/// Directive returned by [`ExecutionPlan::child_stats_requests`] describing +/// how the [`StatisticsContext`] should obtain each child's statistics. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ChildStats { + /// Compute the child's statistics at this partition (`None` = overall). + At(Option), + /// Skip this child; the parent does not need its statistics. A placeholder + /// [`Statistics::new_unknown`] is supplied in its slot. + Skip, +} + +/// Owns the bottom-up traversal and per-walk memoization cache for statistics +/// computation. Call [`StatisticsContext::compute`] to walk a plan tree. +pub struct StatisticsContext { + cache: Rc>, +} + +impl Default for StatisticsContext { + fn default() -> Self { + Self::new() + } +} + +impl StatisticsContext { + /// Creates a context with an empty cache. + pub fn new() -> Self { + Self { + cache: Rc::new(RefCell::new(StatsCache::default())), + } + } - /// Computes statistics for a child plan, using the shared cache - /// to avoid redundant subtree walks. - pub fn compute_child_statistics( + /// Clears the memoization cache. + /// + /// The cache is keyed by raw plan-node pointers, which are only stable + /// while the current plan tree is alive. Reset between optimizer passes + /// (which rewrite the plan) when reusing one context across them, so stale + /// pointer keys cannot collide. + pub fn reset_cache(&self) { + self.cache.borrow_mut().0.clear(); + } + + /// Computes statistics for `plan`, resolving children first and passing + /// the results to [`ExecutionPlan::statistics_from_inputs`]. + /// + /// When `args.partition()` is `Some(idx)`, `idx` is validated against the + /// plan's partition count. + pub fn compute( &self, - plan: impl AsRef, - partition: Option, + plan: &dyn ExecutionPlan, + args: &StatisticsArgs, ) -> Result> { - let plan = plan.as_ref(); + let partition = args.partition(); if let Some(idx) = partition { let partition_count = plan.properties().partitioning.partition_count(); @@ -137,12 +172,30 @@ impl StatisticsArgs { return Ok(Arc::clone(cached)); } - let child_args = StatisticsArgs { - partition, - cache: Rc::clone(&self.cache), - }; - let result = plan.statistics_with_args(&child_args)?; + let children = plan.children(); + let requests = plan.child_stats_requests(partition); + assert_eq_or_internal_err!( + requests.len(), + children.len(), + "{} child_stats_requests returned {} entries for {} children", + plan.name(), + requests.len(), + children.len() + ); + let child_stats = children + .iter() + .zip(requests) + .map(|(child, directive)| match directive { + ChildStats::At(p) => { + self.compute(child.as_ref(), &StatisticsArgs::new().with_partition(p)) + } + ChildStats::Skip => { + Ok(Arc::new(Statistics::new_unknown(child.schema().as_ref()))) + } + }) + .collect::>>()?; + let result = plan.statistics_from_inputs(&child_stats, args)?; self.cache .borrow_mut() .insert(plan, partition, Arc::clone(&result)); @@ -183,47 +236,39 @@ mod tests { let leaf = make_stats_leaf(100); let plan: Arc = Arc::new(CoalescePartitionsExec::new(leaf)); - let args = StatisticsArgs::new().with_partition(Some(0)); - let stats = plan.statistics_with_args(&args).unwrap(); + let ctx = StatisticsContext::new(); + let stats = ctx + .compute( + plan.as_ref(), + &StatisticsArgs::new().with_partition(Some(0)), + ) + .unwrap(); assert_eq!(stats.num_rows, Precision::Exact(100)); - let args_none = StatisticsArgs::new(); - let stats_none = plan.statistics_with_args(&args_none).unwrap(); + let stats_none = ctx.compute(plan.as_ref(), &StatisticsArgs::new()).unwrap(); assert_eq!(stats_none.num_rows, Precision::Exact(100)); } #[test] - fn changing_partition_resets_cache() { - let leaf = make_stats_leaf(100); + fn context_caches_within_walk() { + let leaf = make_stats_leaf(42); + let ctx = StatisticsContext::new(); + let args = StatisticsArgs::new(); - // Populate the memoization cache for an initial walk. - let mut args = StatisticsArgs::new(); - let _ = args - .compute_child_statistics(Arc::clone(&leaf), Some(0)) - .unwrap(); - assert!( - !args.cache.borrow().0.is_empty(), - "cache should be populated after a statistics walk" - ); + let s1 = ctx.compute(leaf.as_ref(), &args).unwrap(); + assert!(!ctx.cache.borrow().0.is_empty()); - // Changing the partition starts a new walk and must reset the cache - // so stale, pointer-keyed entries cannot leak across walks. - args.set_partition(Some(1)); - assert!( - args.cache.borrow().0.is_empty(), - "cache should be cleared when the partition changes" - ); + let s2 = ctx.compute(leaf.as_ref(), &args).unwrap(); + assert!(Arc::ptr_eq(&s1, &s2)); + } - // Setting the partition to its current value is a no-op and retains - // the cache (avoids needlessly discarding work mid-walk). - let _ = args - .compute_child_statistics(Arc::clone(&leaf), Some(0)) - .unwrap(); - assert!(!args.cache.borrow().0.is_empty()); - args.set_partition(Some(1)); - assert!( - !args.cache.borrow().0.is_empty(), - "cache should be retained when the partition is unchanged" - ); + #[test] + fn reset_cache_clears_entries() { + let leaf = make_stats_leaf(10); + let ctx = StatisticsContext::new(); + let _ = ctx.compute(leaf.as_ref(), &StatisticsArgs::new()).unwrap(); + assert!(!ctx.cache.borrow().0.is_empty()); + ctx.reset_cache(); + assert!(ctx.cache.borrow().0.is_empty()); } } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index 44aacfa87a31e..e8c775a786578 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -165,7 +165,11 @@ impl ExecutionPlan for TestMemoryExec { unimplemented!() } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { if args.partition().is_some() { Ok(Arc::new(Statistics::new_unknown(&self.schema))) } else { diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index 2bd19ccbeb738..b92008c6b219b 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -249,7 +249,11 @@ impl ExecutionPlan for MockExec { } // Panics if one of the batches is an error - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { if args.partition().is_some() { return Ok(Arc::new(Statistics::new_unknown(&self.schema))); } @@ -474,7 +478,11 @@ impl ExecutionPlan for BarrierExec { Ok(builder.build()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { if args.partition().is_some() { return Ok(Arc::new(Statistics::new_unknown(&self.schema))); } @@ -654,7 +662,11 @@ impl ExecutionPlan for StatisticsExec { unimplemented!("This plan only serves for testing statistics") } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { Ok(Arc::new(if args.partition().is_some() { Statistics::new_unknown(&self.schema) } else { diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 29624285325a5..42b3d3a80d825 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -43,7 +43,7 @@ use crate::filter_pushdown::{ }; use crate::metrics::BaselineMetrics; use crate::projection::{ProjectionExec, make_with_child}; -use crate::statistics::StatisticsArgs; +use crate::statistics::{ChildStats, StatisticsArgs}; use crate::stream::ObservedStream; use arrow::datatypes::{Field, Schema, SchemaRef}; @@ -165,6 +165,20 @@ impl UnionExec { &self.inputs } + /// Maps a global output partition index to the `(input index, local + /// partition index)` of the input that owns it, or `None` if out of range. + fn owning_input(&self, partition: usize) -> Option<(usize, usize)> { + let mut remaining = partition; + for (i, input) in self.inputs.iter().enumerate() { + let count = input.output_partitioning().partition_count(); + if remaining < count { + return Some((i, remaining)); + } + remaining -= count; + } + None + } + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties( inputs: &[Arc], @@ -319,30 +333,41 @@ impl ExecutionPlan for UnionExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + fn child_stats_requests(&self, partition: Option) -> Vec { + if let Some(partition_idx) = partition { + // For a specific partition, compute stats only for the input that + // owns it; the other inputs are not needed and are skipped. + let targeted = self.owning_input(partition_idx); + self.inputs + .iter() + .enumerate() + .map(|(i, _)| match targeted { + Some((target_i, target_partition)) if i == target_i => { + ChildStats::At(Some(target_partition)) + } + _ => ChildStats::Skip, + }) + .collect() + } else { + vec![ChildStats::At(None); self.inputs.len()] + } + } + + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + args: &StatisticsArgs, + ) -> Result> { if let Some(partition_idx) = args.partition() { // For a specific partition, find which input it belongs to - let mut remaining_idx = partition_idx; - for (i, input) in self.inputs.iter().enumerate() { - let input_partition_count = input.output_partitioning().partition_count(); - if remaining_idx < input_partition_count { - // This partition belongs to this input - compute stats - // for the specific child at the specific partition - let child = &self.inputs[i]; - return args.compute_child_statistics(child, Some(remaining_idx)); - } - remaining_idx -= input_partition_count; + if let Some((target_i, _)) = self.owning_input(partition_idx) { + // This partition belongs to this input - return its stats + return Ok(Arc::clone(&input_stats[target_i])); } // If we get here, the partition index is out of bounds Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } else { - // Collect overall stats for each input from the cache - let stats = self - .inputs - .iter() - .map(|input| args.compute_child_statistics(input, None)) - .collect::>>()?; - let stats_refs = stats.iter().map(|s| s.as_ref()).collect::>(); + let stats_refs = input_stats.iter().map(|s| s.as_ref()).collect::>(); Ok(Arc::new(Statistics::try_merge_iter_with_ndv_fallback( stats_refs, @@ -651,15 +676,15 @@ impl ExecutionPlan for InterleaveExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let stats = self - .inputs + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let stats = input_stats .iter() - .map(|input| { - args.compute_child_statistics(input, args.partition()) - .map(Arc::unwrap_or_clone) - }) - .collect::>>()?; + .map(|s| s.as_ref().clone()) + .collect::>(); Ok(Arc::new(Statistics::try_merge_iter_with_ndv_fallback( stats.iter(), @@ -829,7 +854,7 @@ mod tests { use super::*; use crate::collect; use crate::repartition::RepartitionExec; - use crate::statistics::StatisticsArgs; + use crate::statistics::{StatisticsArgs, StatisticsContext}; use crate::test::exec::StatisticsExec; use crate::test::{self, TestMemoryExec}; @@ -1020,7 +1045,8 @@ mod tests { Arc::new(StatisticsExec::new(right, schema.as_ref().clone())); let union = UnionExec::try_new(vec![left, right])?; - let stats = union.statistics_with_args(&StatisticsArgs::new())?; + let stats = + StatisticsContext::new().compute(union.as_ref(), &StatisticsArgs::new())?; assert_eq!(stats.as_ref(), &expected); Ok(()) @@ -1037,7 +1063,8 @@ mod tests { Arc::new(StatisticsExec::new(right, schema.as_ref().clone())); let union = UnionExec::try_new(vec![left, right])?; - let stats = union.statistics_with_args(&StatisticsArgs::new())?; + let stats = + StatisticsContext::new().compute(union.as_ref(), &StatisticsArgs::new())?; assert_eq!(stats.as_ref(), &expected); Ok(()) @@ -1058,7 +1085,8 @@ mod tests { )?); let interleave = InterleaveExec::try_new(vec![left, right])?; - let stats = interleave.statistics_with_args(&StatisticsArgs::new())?; + let stats = + StatisticsContext::new().compute(&interleave, &StatisticsArgs::new())?; assert_eq!(stats.as_ref(), &expected); Ok(()) @@ -1080,8 +1108,8 @@ mod tests { )?); let interleave = InterleaveExec::try_new(vec![left, right])?; - let stats = interleave - .statistics_with_args(&StatisticsArgs::new().with_partition(Some(0)))?; + let stats = StatisticsContext::new() + .compute(&interleave, &StatisticsArgs::new().with_partition(Some(0)))?; let expected = Statistics::default() .with_num_rows(Precision::Inexact(5)) diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index b0a0330441e94..2d79276b1c381 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -378,10 +378,12 @@ impl ExecutionPlan for BoundedWindowAggExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let input_stat = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.input, args.partition())?, - ); + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let input_stat = input_stats[0].as_ref().clone(); Ok(Arc::new(self.statistics_helper(input_stat)?)) } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index f4bc40cf35d5a..bc88e2ee7ebc6 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -281,10 +281,12 @@ impl ExecutionPlan for WindowAggExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { - let input_stat = Arc::unwrap_or_clone( - args.compute_child_statistics(&self.input, args.partition())?, - ); + fn statistics_from_inputs( + &self, + input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { + let input_stat = input_stats[0].as_ref().clone(); let win_cols = self.window_expr.len(); let input_cols = self.input.schema().fields().len(); // TODO stats: some windowing function will maintain invariants such as min, max... diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 9bf167aa73f55..c92face1e5404 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -228,7 +228,11 @@ impl ExecutionPlan for WorkTableExec { Some(self.metrics.clone_inner()) } - fn statistics_with_args(&self, _args: &StatisticsArgs) -> Result> { + fn statistics_from_inputs( + &self, + _input_stats: &[Arc], + _args: &StatisticsArgs, + ) -> Result> { Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } diff --git a/docs/source/library-user-guide/upgrading/55.0.0.md b/docs/source/library-user-guide/upgrading/55.0.0.md index 6d1f834abfac0..8784bf51e339c 100644 --- a/docs/source/library-user-guide/upgrading/55.0.0.md +++ b/docs/source/library-user-guide/upgrading/55.0.0.md @@ -154,37 +154,40 @@ as a supertrait: + pub trait QueryPlanner: Any + Debug ``` -### `ExecutionPlan::partition_statistics` deprecated in favor of `statistics_with_args` +### `ExecutionPlan::partition_statistics` deprecated in favor of `statistics_from_inputs` -`ExecutionPlan::partition_statistics` is deprecated. A new method -`statistics_with_args` accepts a `StatisticsArgs` parameter that carries -the partition index and a shared cache for memoized child statistics lookups. +`ExecutionPlan::partition_statistics` is deprecated. Statistics computation is +now split into two parts: + +- `StatisticsContext` owns the bottom-up plan-tree traversal and a per-walk + cache of memoized child statistics. Call `StatisticsContext::compute` to + obtain statistics for a plan. +- `ExecutionPlan::statistics_from_inputs` computes a node's statistics from its + children's already-resolved statistics, which the context passes in. The node + does not traverse the tree itself. Existing implementations of `partition_statistics` continue to work unchanged. -The default `statistics_with_args` delegates to the deprecated method, so no +The default `statistics_from_inputs` delegates to the deprecated method, so no migration is required until the deprecated method is removed. -> **Warning:** The delegation is **one-way**: the default `statistics_with_args` +> **Warning:** The delegation is **one-way**: the default `statistics_from_inputs` > calls `partition_statistics`, but the default `partition_statistics` does -> **not** call `statistics_with_args` — it returns `Statistics::new_unknown`. -> Nodes that override only `statistics_with_args` will silently return +> **not** call `statistics_from_inputs` — it returns `Statistics::new_unknown`. +> Nodes that override only `statistics_from_inputs` will silently return > `Statistics::new_unknown` to any caller still using the deprecated > `partition_statistics`. **Who is affected:** - Users who implement custom `ExecutionPlan` nodes (recommended to migrate) -- Users who call `partition_statistics` directly (recommended to switch to `statistics_with_args`) +- Users who call `partition_statistics` directly (recommended to switch to `StatisticsContext::compute`) **Migration guide:** -For **implementations**, override `statistics_with_args` instead of -`partition_statistics`. Leaf nodes that do not have children can ignore -the args. - -Child statistics are looked up via `args.compute_child_statistics(child, partition)`. -Use `args.partition()` for partition-preserving operators, or `None` for -partition-merging operators that always need overall stats: +For **implementations**, override `statistics_from_inputs` instead of +`partition_statistics`. Child statistics arrive pre-computed in `input_stats` +(one entry per child, in `children()` order); the node only expresses its local +propagation logic. Leaf nodes that have no children can ignore `input_stats`. ```rust,ignore // Before: @@ -193,36 +196,40 @@ fn partition_statistics(&self, partition: Option) -> Result Result> { - let child_stats = args.compute_child_statistics(&self.input, args.partition())?; - // ... transform child_stats ... -} - -// After (partition-merging): -fn statistics_with_args( +// After: +fn statistics_from_inputs( &self, + input_stats: &[Arc], args: &StatisticsArgs, ) -> Result> { - let child_stats = args.compute_child_statistics(&self.input, None)?; + let child_stats = Arc::clone(&input_stats[0]); // ... transform child_stats ... } ``` -For **callers**, create a `StatisticsArgs` and call `statistics_with_args` -directly. The cache is created automatically: +> **Note:** the default `child_stats_requests` requests each child at the +> parent's partition. A partition-merging or multi-child node must override it, +> or it will silently receive child statistics for the wrong partition: +> +> ```rust,ignore +> fn child_stats_requests(&self, _partition: Option) -> Vec { +> vec![ChildStats::At(None)] // request the child's overall stats +> } +> ``` +> +> See the `child_stats_requests` rustdoc for `ChildStats::Skip` and multi-child cases. + +For **callers**, walk a plan through `StatisticsContext::compute`. The cache is +created with the context: ```rust,ignore -use datafusion_physical_plan::StatisticsArgs; +use datafusion_physical_plan::{StatisticsArgs, StatisticsContext}; // Before: let stats = plan.partition_statistics(None)?; // After: -let stats = plan.statistics_with_args(&StatisticsArgs::new())?; +let stats = StatisticsContext::new().compute(plan.as_ref(), &StatisticsArgs::new())?; ``` ### `DdlStatement::CreateExternalTable` and `CreateFunction` are now boxed