From b34533dde74a6ea536dd5f858d0a52176d39456b Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 26 Feb 2026 11:56:08 +0100 Subject: [PATCH 1/2] Arc stats --- .../examples/relation_planner/table_sample.rs | 6 +- .../core/tests/custom_sources_cases/mod.rs | 8 +- .../tests/custom_sources_cases/statistics.rs | 14 +-- .../physical_optimizer/join_selection.rs | 6 +- .../partition_statistics.rs | 88 +++++++++---------- .../tests/physical_optimizer/test_utils.rs | 4 +- datafusion/core/tests/sql/path_partition.rs | 4 +- datafusion/datasource/src/file_scan_config.rs | 12 +-- datafusion/datasource/src/memory.rs | 14 +-- datafusion/datasource/src/source.rs | 4 +- .../src/output_requirements.rs | 2 +- .../physical-plan/src/aggregates/mod.rs | 12 +-- datafusion/physical-plan/src/buffer.rs | 2 +- .../physical-plan/src/coalesce_batches.rs | 7 +- .../physical-plan/src/coalesce_partitions.rs | 7 +- datafusion/physical-plan/src/coop.rs | 2 +- datafusion/physical-plan/src/display.rs | 6 +- datafusion/physical-plan/src/empty.rs | 4 +- .../physical-plan/src/execution_plan.rs | 8 +- datafusion/physical-plan/src/filter.rs | 14 +-- .../physical-plan/src/joins/cross_join.rs | 8 +- .../physical-plan/src/joins/hash_join/exec.rs | 12 +-- .../src/joins/nested_loop_join.rs | 10 +-- .../src/joins/sort_merge_join/exec.rs | 12 +-- datafusion/physical-plan/src/limit.rs | 14 ++- .../physical-plan/src/placeholder_row.rs | 6 +- datafusion/physical-plan/src/projection.rs | 11 +-- .../physical-plan/src/repartition/mod.rs | 8 +- .../physical-plan/src/sorts/partial_sort.rs | 2 +- datafusion/physical-plan/src/sorts/sort.rs | 18 ++-- .../src/sorts/sort_preserving_merge.rs | 2 +- datafusion/physical-plan/src/test.rs | 6 +- datafusion/physical-plan/src/test/exec.rs | 22 ++--- datafusion/physical-plan/src/union.rs | 34 ++++--- .../src/windows/bounded_window_agg_exec.rs | 6 +- .../src/windows/window_agg_exec.rs | 8 +- datafusion/physical-plan/src/work_table.rs | 4 +- 37 files changed, 207 insertions(+), 200 deletions(-) diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index 657432ef31362..5686c8e7d27d8 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -727,8 +727,8 @@ impl ExecutionPlan for SampleExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let mut stats = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let mut stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let ratio = self.upper_bound - self.lower_bound; // Scale statistics by sampling ratio (inexact due to randomness) @@ -741,7 +741,7 @@ impl ExecutionPlan for SampleExec { .map(|n| (n as f64 * ratio) as usize) .to_inexact(); - Ok(stats) + Ok(Arc::new(stats)) } } diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index ec0b9e253d2ab..2b2cca298f301 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -180,12 +180,12 @@ impl ExecutionPlan for CustomExecutionPlan { Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 })) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema())); + return Ok(Arc::new(Statistics::new_unknown(&self.schema()))); } let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap(); - Ok(Statistics { + Ok(Arc::new(Statistics { num_rows: Precision::Exact(batch.num_rows()), total_byte_size: Precision::Absent, column_statistics: self @@ -204,7 +204,7 @@ impl ExecutionPlan for CustomExecutionPlan { ..Default::default() }) .collect(), - }) + })) } } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index e81cd9f6b81b1..684f34e988fdd 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -181,11 +181,11 @@ impl ExecutionPlan for StatisticsValidation { unimplemented!("This plan only serves for testing statistics") } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - Ok(Statistics::new_unknown(&self.schema)) + Ok(Arc::new(Statistics::new_unknown(&self.schema))) } else { - Ok(self.stats.clone()) + Ok(Arc::new(self.stats.clone())) } } } @@ -238,7 +238,7 @@ async fn sql_basic() -> Result<()> { let physical_plan = df.create_physical_plan().await.unwrap(); // the statistics should be those of the source - assert_eq!(stats, physical_plan.partition_statistics(None)?); + assert_eq!(stats, *physical_plan.partition_statistics(None)?); Ok(()) } @@ -278,7 +278,7 @@ async fn sql_limit() -> Result<()> { .collect(), total_byte_size: Precision::Absent }, - physical_plan.partition_statistics(None)? + *physical_plan.partition_statistics(None)? ); let df = ctx @@ -287,7 +287,7 @@ async fn sql_limit() -> Result<()> { .unwrap(); let physical_plan = df.create_physical_plan().await.unwrap(); // when the limit is larger than the original number of lines, statistics remain unchanged - assert_eq!(stats, physical_plan.partition_statistics(None)?); + assert_eq!(stats, *physical_plan.partition_statistics(None)?); Ok(()) } @@ -307,7 +307,7 @@ async fn sql_window() -> Result<()> { let result = physical_plan.partition_statistics(None)?; assert_eq!(stats.num_rows, result.num_rows); - let col_stats = result.column_statistics; + let col_stats = &result.column_statistics; assert_eq!(2, col_stats.len()); assert_eq!(stats.column_statistics[1], col_stats[0]); diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 567af64c6a366..dc68fe88a2c0f 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -1176,12 +1176,12 @@ impl ExecutionPlan for StatisticsExec { unimplemented!("This plan only serves for testing statistics") } - fn partition_statistics(&self, partition: Option) -> Result { - Ok(if partition.is_some() { + fn partition_statistics(&self, partition: Option) -> Result> { + Ok(Arc::new(if partition.is_some() { Statistics::new_unknown(&self.schema) } else { self.stats.clone() - }) + })) } } diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index fa021ed3dcce3..b4f4fb2ad9393 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -255,8 +255,8 @@ mod test { ); // Check the statistics of each partition assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -288,8 +288,8 @@ mod test { create_partition_statistics(2, 8, 1, 2, None); // Check the statistics of each partition assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -322,7 +322,7 @@ mod test { Some((DATE_2025_03_01, DATE_2025_03_04)), ); assert_eq!(statistics.len(), 1); - assert_eq!(statistics[0], expected_statistic_partition); + assert_eq!(*statistics[0], expected_statistic_partition); // Check the statistics_by_partition with real results let expected_stats = vec![ExpectedStatistics::NonEmpty(1, 4, 4)]; validate_statistics_with_data(sort_exec.clone(), expected_stats, 0).await?; @@ -353,8 +353,8 @@ mod test { .map(|idx| sort_exec.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -402,7 +402,7 @@ mod test { }, ], }; - assert_eq!(full_statistics, expected_full_statistic); + assert_eq!(*full_statistics, expected_full_statistic); let statistics = (0..filter.output_partitioning().partition_count()) .map(|idx| filter.partition_statistics(Some(idx))) @@ -431,8 +431,8 @@ mod test { }, ], }; - assert_eq!(statistics[0], expected_partition_statistic); - assert_eq!(statistics[1], expected_partition_statistic); + assert_eq!(*statistics[0], expected_partition_statistic); + assert_eq!(*statistics[1], expected_partition_statistic); Ok(()) } @@ -463,13 +463,13 @@ mod test { Some((DATE_2025_03_03, DATE_2025_03_04)), ); // Verify first partition (from first scan) - assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[0], expected_statistic_partition_1); // Verify second partition (from first scan) - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Verify third partition (from second scan - same as first partition) - assert_eq!(statistics[2], expected_statistic_partition_1); + assert_eq!(*statistics[2], expected_statistic_partition_1); // Verify fourth partition (from second scan - same as second partition) - assert_eq!(statistics[3], expected_statistic_partition_2); + assert_eq!(*statistics[3], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -518,8 +518,8 @@ mod test { ColumnStatistics::new_unknown(), ], }; - assert_eq!(stats[0], expected_stats); - assert_eq!(stats[1], expected_stats); + assert_eq!(*stats[0], expected_stats); + assert_eq!(*stats[1], expected_stats); // Verify the execution results let partitions = execute_stream_partitioned( @@ -625,8 +625,8 @@ mod test { }, ], }; - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -670,7 +670,7 @@ mod test { ); expected_full_statistics.num_rows = Precision::Inexact(4); expected_full_statistics.total_byte_size = Precision::Absent; - assert_eq!(full_statistics, expected_full_statistics); + assert_eq!(*full_statistics, expected_full_statistics); // Test partition_statistics(Some(idx)) - returns partition-specific statistics // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02] @@ -699,8 +699,8 @@ mod test { .map(|idx| nested_loop_join.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -729,7 +729,7 @@ mod test { .map(|idx| coalesce_partitions.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 1); - assert_eq!(statistics[0], expected_statistic_partition); + assert_eq!(*statistics[0], expected_statistic_partition); // Check the statistics_by_partition with real results let expected_stats = vec![ExpectedStatistics::NonEmpty(1, 4, 4)]; @@ -746,20 +746,20 @@ mod test { .map(|idx| local_limit.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 2); - let mut expected_0 = statistics[0].clone(); + let mut expected_0 = Statistics::clone(&statistics[0]); expected_0.column_statistics = expected_0 .column_statistics .into_iter() .map(|c| c.to_inexact()) .collect(); - let mut expected_1 = statistics[1].clone(); + let mut expected_1 = Statistics::clone(&statistics[1]); expected_1.column_statistics = expected_1 .column_statistics .into_iter() .map(|c| c.to_inexact()) .collect(); - assert_eq!(statistics[0], expected_0); - assert_eq!(statistics[1], expected_1); + assert_eq!(*statistics[0], expected_0); + assert_eq!(*statistics[1], expected_1); Ok(()) } @@ -781,7 +781,7 @@ mod test { 4, Some((DATE_2025_03_01, DATE_2025_03_02)), ); - assert_eq!(statistics[0], expected_statistic_partition); + assert_eq!(*statistics[0], expected_statistic_partition); Ok(()) } @@ -849,7 +849,7 @@ mod test { ], }; - assert_eq!(&p0_statistics, &expected_p0_statistics); + assert_eq!(*p0_statistics, expected_p0_statistics); let expected_p1_statistics = Statistics { num_rows: Precision::Inexact(2), @@ -869,7 +869,7 @@ mod test { }; let p1_statistics = aggregate_exec_partial.partition_statistics(Some(1))?; - assert_eq!(&p1_statistics, &expected_p1_statistics); + assert_eq!(*p1_statistics, expected_p1_statistics); validate_statistics_with_data( aggregate_exec_partial.clone(), @@ -891,10 +891,10 @@ mod test { )?); let p0_statistics = agg_final.partition_statistics(Some(0))?; - assert_eq!(&p0_statistics, &expected_p0_statistics); + assert_eq!(*p0_statistics, expected_p0_statistics); let p1_statistics = agg_final.partition_statistics(Some(1))?; - assert_eq!(&p1_statistics, &expected_p1_statistics); + assert_eq!(*p1_statistics, expected_p1_statistics); validate_statistics_with_data( agg_final.clone(), @@ -935,8 +935,8 @@ mod test { ], }; - assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(0))?); - assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(1))?); + assert_eq!(empty_stat, *agg_partial.partition_statistics(Some(0))?); + assert_eq!(empty_stat, *agg_partial.partition_statistics(Some(1))?); validate_statistics_with_data( agg_partial.clone(), vec![ExpectedStatistics::Empty, ExpectedStatistics::Empty], @@ -962,8 +962,8 @@ mod test { agg_partial.schema(), )?); - assert_eq!(&empty_stat, &agg_final.partition_statistics(Some(0))?); - assert_eq!(&empty_stat, &agg_final.partition_statistics(Some(1))?); + assert_eq!(empty_stat, *agg_final.partition_statistics(Some(0))?); + assert_eq!(empty_stat, *agg_final.partition_statistics(Some(1))?); validate_statistics_with_data( agg_final, @@ -999,7 +999,7 @@ mod test { column_statistics: vec![ColumnStatistics::new_unknown()], }; - assert_eq!(&expect_stat, &agg_final.partition_statistics(Some(0))?); + assert_eq!(expect_stat, *agg_final.partition_statistics(Some(0))?); // Verify that the aggregate final result has exactly one partition with one row let mut partitions = execute_stream_partitioned( @@ -1033,13 +1033,13 @@ mod test { &schema, None, ); - assert_eq!(actual, expected); + assert_eq!(*actual, expected); all_batches.push(batches); } let actual = plan.partition_statistics(None)?; let expected = compute_record_batch_statistics(&all_batches, &schema, None); - assert_eq!(actual, expected); + assert_eq!(*actual, expected); Ok(()) } @@ -1070,7 +1070,7 @@ mod test { // All partitions should have the same statistics for stat in statistics.iter() { - assert_eq!(stat, &expected_stats); + assert_eq!(**stat, expected_stats); } // Verify that the result has exactly 3 partitions @@ -1135,7 +1135,7 @@ mod test { )?); let result = repartition.partition_statistics(Some(0))?; - assert_eq!(result, Statistics::new_unknown(&scan_schema)); + assert_eq!(*result, Statistics::new_unknown(&scan_schema)); // Verify that the result has exactly 0 partitions let partitions = execute_stream_partitioned( @@ -1174,8 +1174,8 @@ mod test { ColumnStatistics::new_unknown(), ], }; - assert_eq!(stats[0], expected_stats); - assert_eq!(stats[1], expected_stats); + assert_eq!(*stats[0], expected_stats); + assert_eq!(*stats[1], expected_stats); // Verify the repartition execution results let partitions = @@ -1282,8 +1282,8 @@ mod test { ], }; - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Verify the statistics match actual execution results let expected_stats = vec![ diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index feac8190ffde4..a02b5d306642e 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -942,8 +942,8 @@ impl ExecutionPlan for TestScan { internal_err!("TestScan is for testing optimizer only, not for execution") } - fn partition_statistics(&self, _partition: Option) -> Result { - Ok(Statistics::new_unknown(&self.schema)) + fn partition_statistics(&self, _partition: Option) -> Result> { + Ok(Arc::new(Statistics::new_unknown(&self.schema))) } // This is the key method - implement sort pushdown diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index c6f920584dc2b..7683d9b2ee199 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -459,7 +459,7 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 4); - let stat_cols = physical_plan.partition_statistics(None)?.column_statistics; + let stat_cols = physical_plan.partition_statistics(None)?.column_statistics.clone(); assert_eq!(stat_cols.len(), 4); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(3)); @@ -483,7 +483,7 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 2); - let stat_cols = physical_plan.partition_statistics(None)?.column_statistics; + let stat_cols = physical_plan.partition_statistics(None)?.column_statistics.clone(); assert_eq!(stat_cols.len(), 2); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(1)); diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index c3e5cabce7bc2..073f915e58c50 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -777,7 +777,7 @@ impl DataSource for FileScanConfig { SchedulingType::Cooperative } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition) = partition { // Get statistics for a specific partition // Note: FileGroup statistics include partition columns (computed from partition_values) @@ -787,22 +787,22 @@ impl DataSource for FileScanConfig { // Project the statistics based on the projection let output_schema = self.projected_schema()?; return if let Some(projection) = self.file_source.projection() { - projection.project_statistics(stat.clone(), &output_schema) + Ok(Arc::new(projection.project_statistics(stat.clone(), &output_schema)?)) } else { - Ok(stat.clone()) + Ok(Arc::new(stat.clone())) }; } // If no statistics available for this partition, return unknown - Ok(Statistics::new_unknown(self.projected_schema()?.as_ref())) + Ok(Arc::new(Statistics::new_unknown(self.projected_schema()?.as_ref()))) } else { // Return aggregate statistics across all partitions let statistics = self.statistics(); let projection = self.file_source.projection(); let output_schema = self.projected_schema()?; if let Some(projection) = &projection { - projection.project_statistics(statistics.clone(), &output_schema) + Ok(Arc::new(projection.project_statistics(statistics.clone(), &output_schema)?)) } else { - Ok(statistics) + Ok(Arc::new(statistics)) } } } diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 1d12bb3200309..fda44120cd80a 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -196,26 +196,26 @@ impl DataSource for MemorySourceConfig { SchedulingType::Cooperative } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition) = partition { // Compute statistics for a specific partition if let Some(batches) = self.partitions.get(partition) { - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( from_ref(batches), &self.schema, self.projection.clone(), - )) + ))) } else { // Invalid partition index - Ok(Statistics::new_unknown(&self.projected_schema)) + Ok(Arc::new(Statistics::new_unknown(&self.projected_schema))) } } else { // Compute statistics across all partitions - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &self.partitions, &self.schema, self.projection.clone(), - )) + ))) } } @@ -953,7 +953,7 @@ mod tests { let values = MemorySourceConfig::try_new_as_values(schema, data)?; assert_eq!( - values.partition_statistics(None)?, + *values.partition_statistics(None)?, Statistics { num_rows: Precision::Exact(rows), total_byte_size: Precision::Exact(8), // not important diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index a4e27dac769af..58278f075cbd8 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -156,7 +156,7 @@ pub trait DataSource: Send + Sync + Debug { /// Returns statistics for a specific partition, or aggregate statistics /// across all partitions if `partition` is `None`. - fn partition_statistics(&self, partition: Option) -> Result; + fn partition_statistics(&self, partition: Option) -> Result>; /// Return a copy of this DataSource with a new fetch limit fn with_fetch(&self, _limit: Option) -> Option>; @@ -318,7 +318,7 @@ impl ExecutionPlan for DataSourceExec { Some(self.data_source.metrics().clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.data_source.partition_statistics(partition) } diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index afc0ee1a336dd..ad4826b65798e 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -244,7 +244,7 @@ impl ExecutionPlan for OutputRequirementExec { unreachable!(); } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 27eee0025aa60..5c438bba1a2e7 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1403,9 +1403,9 @@ impl ExecutionPlan for AggregateExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { let child_statistics = self.input().partition_statistics(partition)?; - self.statistics_inner(&child_statistics) + Ok(Arc::new(self.statistics_inner(&child_statistics)?)) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -2483,16 +2483,16 @@ mod tests { Ok(Box::pin(stream)) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(self.schema().as_ref())); + return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } let (_, batches) = some_data(); - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &[batches], &self.schema(), None, - )) + ))) } } diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 3b80f9924e311..7d18d96df4f64 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -226,7 +226,7 @@ impl ExecutionPlan for BufferExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 1356eca78329d..30bcc4e18a6c7 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -206,10 +206,9 @@ impl ExecutionPlan for CoalesceBatchesExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - self.input - .partition_statistics(partition)? - .with_fetch(self.fetch, 0, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index d1fc58837b0fa..02a93bbac8028 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -224,10 +224,9 @@ impl ExecutionPlan for CoalescePartitionsExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, _partition: Option) -> Result { - self.input - .partition_statistics(None)? - .with_fetch(self.fetch, 0, 1) + fn partition_statistics(&self, _partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(None)?); + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index ce54a451ac4d1..39c24a1d377ca 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -293,7 +293,7 @@ impl ExecutionPlan for CooperativeExec { Ok(make_cooperative(child_stream)) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 19698cd4ea78c..6a0c35b7ffb5f 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1176,14 +1176,14 @@ mod tests { todo!() } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(self.schema().as_ref())); + return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } match self { Self::Panic => panic!("expected panic"), Self::Error => Err(internal_datafusion_err!("expected error")), - Self::Ok => Ok(Statistics::new_unknown(self.schema().as_ref())), + Self::Ok => Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))), } } } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 64808bbc25167..3253a388de827 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -156,7 +156,7 @@ impl ExecutionPlan for EmptyExec { )?)) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition) = partition { assert_or_internal_err!( partition < self.partitions, @@ -183,7 +183,7 @@ impl ExecutionPlan for EmptyExec { }); } - Ok(stats) + Ok(Arc::new(stats)) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 2ce1e79601c52..52de03015bf8e 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -476,7 +476,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// If statistics are not available, should return [`Statistics::new_unknown`] /// (the default), not an error. /// If `partition` is `None`, it returns statistics for the entire plan. - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(idx) = partition { // Validate partition index let partition_count = self.properties().partitioning.partition_count(); @@ -487,7 +487,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { partition_count ); } - Ok(Statistics::new_unknown(&self.schema())) + Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } /// Returns `true` if a limit can be safely pushed down through this @@ -1497,7 +1497,7 @@ mod tests { unimplemented!() } - fn partition_statistics(&self, _partition: Option) -> Result { + fn partition_statistics(&self, _partition: Option) -> Result> { unimplemented!() } } @@ -1560,7 +1560,7 @@ mod tests { unimplemented!() } - fn partition_statistics(&self, _partition: Option) -> Result { + fn partition_statistics(&self, _partition: Option) -> Result> { unimplemented!() } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 2af0731fb7a63..6a31354a46ac9 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -387,7 +387,7 @@ impl FilterExec { let schema = input.schema(); let stats = Self::statistics_helper( &schema, - input.partition_statistics(None)?, + Arc::unwrap_or_clone(input.partition_statistics(None)?), predicate, default_selectivity, )?; @@ -542,15 +542,15 @@ impl ExecutionPlan for FilterExec { /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. - fn partition_statistics(&self, partition: Option) -> Result { - let input_stats = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let stats = Self::statistics_helper( &self.input.schema(), input_stats, self.predicate(), self.default_selectivity, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(Arc::new(stats.project(self.projection.as_ref()))) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -1323,7 +1323,7 @@ mod tests { ]; let _ = exp_col_stats .into_iter() - .zip(statistics.column_statistics) + .zip(statistics.column_statistics.clone()) .map(|(expected, actual)| { if let Some(val) = actual.min_value.get_value() { if val.data_type().is_floating() { @@ -1394,7 +1394,7 @@ mod tests { )), )); // Since filter predicate passes all entries, statistics after filter shouldn't change. - let expected = input.partition_statistics(None)?.column_statistics; + let expected = input.partition_statistics(None)?.column_statistics.clone(); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); let statistics = filter.partition_statistics(None)?; @@ -1577,7 +1577,7 @@ mod tests { }], }; - assert_eq!(filter_statistics, expected_filter_statistics); + assert_eq!(*filter_statistics, expected_filter_statistics); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index d5b540885efae..77899a7e52003 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -356,12 +356,12 @@ impl ExecutionPlan for CrossJoinExec { } } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { // Get the all partitions statistics of the left - let left_stats = self.left.partition_statistics(None)?; - let right_stats = self.right.partition_statistics(partition)?; + let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(None)?); + let right_stats = Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); - Ok(stats_cartesian_product(left_stats, right_stats)) + Ok(Arc::new(stats_cartesian_product(left_stats, right_stats))) } /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done, diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index f39208bcb78d0..2beb9434fce03 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1369,16 +1369,18 @@ impl ExecutionPlan for HashJoinExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema())); + return Ok(Arc::new(Statistics::new_unknown(&self.schema()))); } // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` + let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(None)?); + let right_stats = Arc::unwrap_or_clone(self.right.partition_statistics(None)?); let stats = estimate_join_statistics( - self.left.partition_statistics(None)?, - self.right.partition_statistics(None)?, + left_stats, + right_stats, &self.on, &self.join_type, &self.join_schema, @@ -1386,7 +1388,7 @@ impl ExecutionPlan for HashJoinExec { // Project statistics if there is a projection let stats = stats.project(self.projection.as_ref()); // Apply fetch limit to statistics - stats.with_fetch(self.fetch, 0, 1) + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } /// Tries to push `projection` down through `hash_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 5b2cebb360439..ec234a50641ec 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -611,7 +611,7 @@ impl ExecutionPlan for NestedLoopJoinExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { // NestedLoopJoinExec is designed for joins without equijoin keys in the // ON clause (e.g., `t1 JOIN t2 ON (t1.v1 + t2.v1) % 2 = 0`). Any join // predicates are stored in `self.filter`, but `estimate_join_statistics` @@ -625,11 +625,11 @@ impl ExecutionPlan for NestedLoopJoinExec { // so we always request overall stats with `None`. Right side can have // multiple partitions, so we forward the partition parameter to get // partition-specific statistics when requested. - let left_stats = self.left.partition_statistics(None)?; - let right_stats = match partition { + let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(None)?); + let right_stats = Arc::unwrap_or_clone(match partition { Some(partition) => self.right.partition_statistics(Some(partition))?, None => self.right.partition_statistics(None)?, - }; + }); let stats = estimate_join_statistics( left_stats, @@ -639,7 +639,7 @@ impl ExecutionPlan for NestedLoopJoinExec { &self.join_schema, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(Arc::new(stats.project(self.projection.as_ref()))) } /// Tries to push `projection` down through `nested_loop_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 8778e4154e60e..c1f1100676d5f 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -519,7 +519,7 @@ impl ExecutionPlan for SortMergeJoinExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { // SortMergeJoinExec uses symmetric hash partitioning where both left and right // inputs are hash-partitioned on the join keys. This means partition `i` of the // left input is joined with partition `i` of the right input. @@ -531,13 +531,15 @@ impl ExecutionPlan for SortMergeJoinExec { // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` - estimate_join_statistics( - self.left.partition_statistics(partition)?, - self.right.partition_statistics(partition)?, + let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(partition)?); + let right_stats = Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); + Ok(Arc::new(estimate_join_statistics( + left_stats, + right_stats, &self.on, &self.join_type, &self.schema, - ) + )?)) } /// Tries to swap the projection with its input [`SortMergeJoinExec`]. If it can be done, diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 9ce63a1c586a6..0ac54a19228a3 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -209,10 +209,9 @@ impl ExecutionPlan for GlobalLimitExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - self.input - .partition_statistics(partition)? - .with_fetch(self.fetch, self.skip, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(stats.with_fetch(self.fetch, self.skip, 1)?)) } fn fetch(&self) -> Option { @@ -365,10 +364,9 @@ impl ExecutionPlan for LocalLimitExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - self.input - .partition_statistics(partition)? - .with_fetch(Some(self.fetch), 0, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(stats.with_fetch(Some(self.fetch), 0, 1)?)) } fn fetch(&self) -> Option { diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index c91085965b07c..ed7e9e0fa8c5e 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -169,7 +169,7 @@ impl ExecutionPlan for PlaceholderRowExec { Ok(Box::pin(cooperative(ms))) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { let batches = self .data() .expect("Create single row placeholder RecordBatch should not fail"); @@ -180,11 +180,11 @@ impl ExecutionPlan for PlaceholderRowExec { None => vec![batches; self.partitions], }; - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &batches, &self.schema, None, - )) + ))) } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 55b4129223c24..4c4f9064bab37 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -342,12 +342,13 @@ impl ExecutionPlan for ProjectionExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let input_stats = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let output_schema = self.schema(); - self.projector - .projection() - .project_statistics(input_stats, &output_schema) + Ok(Arc::new(self.projector.projection().project_statistics( + input_stats, + &output_schema, + )?)) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 2b0c0ea31689b..20cc13c658dc7 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1070,11 +1070,11 @@ impl ExecutionPlan for RepartitionExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition) = partition { let partition_count = self.partitioning().partition_count(); if partition_count == 0 { - return Ok(Statistics::new_unknown(&self.schema())); + return Ok(Arc::new(Statistics::new_unknown(&self.schema()))); } assert_or_internal_err!( @@ -1084,7 +1084,7 @@ impl ExecutionPlan for RepartitionExec { partition_count ); - let mut stats = self.input.partition_statistics(None)?; + let mut stats = Arc::unwrap_or_clone(self.input.partition_statistics(None)?); // Distribute statistics across partitions stats.num_rows = stats @@ -1105,7 +1105,7 @@ impl ExecutionPlan for RepartitionExec { .map(|_| ColumnStatistics::new_unknown()) .collect(); - Ok(stats) + Ok(Arc::new(stats)) } else { self.input.partition_statistics(None) } diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 08bc73c92d4b3..e6e323ea9a752 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -329,7 +329,7 @@ impl ExecutionPlan for PartialSortExec { Some(self.metrics_set.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 55e1f460e1901..7d95f9aa428d5 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1353,16 +1353,14 @@ impl ExecutionPlan for SortExec { Some(self.metrics_set.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - if !self.preserve_partitioning() { - return self - .input - .partition_statistics(None)? - .with_fetch(self.fetch, 0, 1); - } - self.input - .partition_statistics(partition)? - .with_fetch(self.fetch, 0, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let p = if !self.preserve_partitioning() { + None + } else { + partition + }; + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(p)?); + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 6c1bb4883d1ad..3de6eec30963c 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -372,7 +372,7 @@ impl ExecutionPlan for SortPreservingMergeExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, _partition: Option) -> Result { + fn partition_statistics(&self, _partition: Option) -> Result> { self.input.partition_statistics(None) } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index a967d035bd387..81e8d526aecfa 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -169,11 +169,11 @@ impl ExecutionPlan for TestMemoryExec { unimplemented!() } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - Ok(Statistics::new_unknown(&self.schema)) + Ok(Arc::new(Statistics::new_unknown(&self.schema))) } else { - self.statistics_inner() + Ok(Arc::new(self.statistics_inner()?)) } } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index ebed84477a568..ef5bc44a31dfa 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -254,9 +254,9 @@ impl ExecutionPlan for MockExec { } // Panics if one of the batches is an error - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema)); + return Ok(Arc::new(Statistics::new_unknown(&self.schema))); } let data: Result> = self .data @@ -269,11 +269,11 @@ impl ExecutionPlan for MockExec { let data = data?; - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &[data], &self.schema, None, - )) + ))) } } @@ -406,15 +406,15 @@ impl ExecutionPlan for BarrierExec { Ok(builder.build()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema)); + return Ok(Arc::new(Statistics::new_unknown(&self.schema))); } - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &self.data, &self.schema, None, - )) + ))) } } @@ -592,12 +592,12 @@ impl ExecutionPlan for StatisticsExec { unimplemented!("This plan only serves for testing statistics") } - fn partition_statistics(&self, partition: Option) -> Result { - Ok(if partition.is_some() { + fn partition_statistics(&self, partition: Option) -> Result> { + Ok(Arc::new(if partition.is_some() { Statistics::new_unknown(&self.schema) } else { self.stats.clone() - }) + })) } } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 8174160dc9332..1b322d816e6a3 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -310,7 +310,7 @@ impl ExecutionPlan for UnionExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition_idx) = partition { // For a specific partition, find which input it belongs to let mut remaining_idx = partition_idx; @@ -323,19 +323,25 @@ impl ExecutionPlan for UnionExec { remaining_idx -= input_partition_count; } // If we get here, the partition index is out of bounds - Ok(Statistics::new_unknown(&self.schema())) + Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } else { // Collect statistics from all inputs let stats = self .inputs .iter() - .map(|input_exec| input_exec.partition_statistics(None)) + .map(|input_exec| { + input_exec + .partition_statistics(None) + .map(Arc::unwrap_or_clone) + }) .collect::>>()?; - Ok(stats - .into_iter() - .reduce(stats_union) - .unwrap_or_else(|| Statistics::new_unknown(&self.schema()))) + Ok(Arc::new( + stats + .into_iter() + .reduce(stats_union) + .unwrap_or_else(|| Statistics::new_unknown(&self.schema())), + )) } } @@ -624,17 +630,19 @@ impl ExecutionPlan for InterleaveExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { let stats = self .inputs .iter() - .map(|stat| stat.partition_statistics(partition)) + .map(|stat| stat.partition_statistics(partition).map(Arc::unwrap_or_clone)) .collect::>>()?; - Ok(stats - .into_iter() - .reduce(stats_union) - .unwrap_or_else(|| Statistics::new_unknown(&self.schema()))) + Ok(Arc::new( + stats + .into_iter() + .reduce(stats_union) + .unwrap_or_else(|| Statistics::new_unknown(&self.schema())), + )) } fn benefits_from_input_partitioning(&self) -> Vec { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 20d54303a94b4..33faf5447ed49 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -368,9 +368,9 @@ impl ExecutionPlan for BoundedWindowAggExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let input_stat = self.input.partition_statistics(partition)?; - self.statistics_helper(input_stat) + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stat = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(self.statistics_helper(input_stat)?)) } } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 0c73cf23523d5..cf0f489f77d46 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -272,8 +272,8 @@ impl ExecutionPlan for WindowAggExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let input_stat = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stat = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let win_cols = self.window_expr.len(); let input_cols = self.input.schema().fields().len(); // TODO stats: some windowing function will maintain invariants such as min, max... @@ -283,11 +283,11 @@ impl ExecutionPlan for WindowAggExec { for _ in 0..win_cols { column_statistics.push(ColumnStatistics::new_unknown()) } - Ok(Statistics { + Ok(Arc::new(Statistics { num_rows: input_stat.num_rows, column_statistics, total_byte_size: Precision::Absent, - }) + })) } } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 08390f87a2033..a43de47e42eb8 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -231,8 +231,8 @@ impl ExecutionPlan for WorkTableExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, _partition: Option) -> Result { - Ok(Statistics::new_unknown(&self.schema())) + fn partition_statistics(&self, _partition: Option) -> Result> { + Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } /// Injects run-time state into this `WorkTableExec`. From ab910da7bdc6555e917e1e6f4c0ff0a4e4de7398 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Sun, 1 Mar 2026 20:57:27 +0100 Subject: [PATCH 2/2] fi conflicts --- datafusion/core/tests/sql/path_partition.rs | 10 ++++++++-- datafusion/datasource/src/file_scan_config.rs | 12 +++++++++--- datafusion/physical-plan/src/aggregates/mod.rs | 5 ++++- datafusion/physical-plan/src/display.rs | 5 ++++- datafusion/physical-plan/src/execution_plan.rs | 10 ++++++++-- datafusion/physical-plan/src/filter.rs | 3 ++- datafusion/physical-plan/src/joins/cross_join.rs | 3 ++- .../physical-plan/src/joins/sort_merge_join/exec.rs | 3 ++- datafusion/physical-plan/src/projection.rs | 12 +++++++----- datafusion/physical-plan/src/union.rs | 5 ++++- .../src/windows/bounded_window_agg_exec.rs | 3 ++- .../physical-plan/src/windows/window_agg_exec.rs | 3 ++- 12 files changed, 54 insertions(+), 20 deletions(-) diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index 7683d9b2ee199..221e8933780d4 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -459,7 +459,10 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 4); - let stat_cols = physical_plan.partition_statistics(None)?.column_statistics.clone(); + let stat_cols = physical_plan + .partition_statistics(None)? + .column_statistics + .clone(); assert_eq!(stat_cols.len(), 4); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(3)); @@ -483,7 +486,10 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 2); - let stat_cols = physical_plan.partition_statistics(None)?.column_statistics.clone(); + let stat_cols = physical_plan + .partition_statistics(None)? + .column_statistics + .clone(); assert_eq!(stat_cols.len(), 2); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(1)); diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 073f915e58c50..821e39f14c297 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -787,20 +787,26 @@ impl DataSource for FileScanConfig { // Project the statistics based on the projection let output_schema = self.projected_schema()?; return if let Some(projection) = self.file_source.projection() { - Ok(Arc::new(projection.project_statistics(stat.clone(), &output_schema)?)) + Ok(Arc::new( + projection.project_statistics(stat.clone(), &output_schema)?, + )) } else { Ok(Arc::new(stat.clone())) }; } // If no statistics available for this partition, return unknown - Ok(Arc::new(Statistics::new_unknown(self.projected_schema()?.as_ref()))) + Ok(Arc::new(Statistics::new_unknown( + self.projected_schema()?.as_ref(), + ))) } else { // Return aggregate statistics across all partitions let statistics = self.statistics(); let projection = self.file_source.projection(); let output_schema = self.projected_schema()?; if let Some(projection) = &projection { - Ok(Arc::new(projection.project_statistics(statistics.clone(), &output_schema)?)) + Ok(Arc::new( + projection.project_statistics(statistics.clone(), &output_schema)?, + )) } else { Ok(Arc::new(statistics)) } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 5c438bba1a2e7..4dd1165289459 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -2483,7 +2483,10 @@ mod tests { Ok(Box::pin(stream)) } - fn partition_statistics(&self, partition: Option) -> Result> { + fn partition_statistics( + &self, + partition: Option, + ) -> Result> { if partition.is_some() { return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 6a0c35b7ffb5f..6bf5c3bf70346 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1176,7 +1176,10 @@ mod tests { todo!() } - fn partition_statistics(&self, partition: Option) -> Result> { + fn partition_statistics( + &self, + partition: Option, + ) -> Result> { if partition.is_some() { return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 52de03015bf8e..4eaebbca3e653 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -1497,7 +1497,10 @@ mod tests { unimplemented!() } - fn partition_statistics(&self, _partition: Option) -> Result> { + fn partition_statistics( + &self, + _partition: Option, + ) -> Result> { unimplemented!() } } @@ -1560,7 +1563,10 @@ mod tests { unimplemented!() } - fn partition_statistics(&self, _partition: Option) -> Result> { + fn partition_statistics( + &self, + _partition: Option, + ) -> Result> { unimplemented!() } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6a31354a46ac9..89d5c7d62604a 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -543,7 +543,8 @@ impl ExecutionPlan for FilterExec { /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. fn partition_statistics(&self, partition: Option) -> Result> { - let input_stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + let input_stats = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let stats = Self::statistics_helper( &self.input.schema(), input_stats, diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 77899a7e52003..045860a1810a8 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -359,7 +359,8 @@ impl ExecutionPlan for CrossJoinExec { fn partition_statistics(&self, partition: Option) -> Result> { // Get the all partitions statistics of the left let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(None)?); - let right_stats = Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); + let right_stats = + Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); Ok(Arc::new(stats_cartesian_product(left_stats, right_stats))) } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index c1f1100676d5f..bf947ddf24a59 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -532,7 +532,8 @@ impl ExecutionPlan for SortMergeJoinExec { // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(partition)?); - let right_stats = Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); + let right_stats = + Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); Ok(Arc::new(estimate_join_statistics( left_stats, right_stats, diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 4c4f9064bab37..93fc4c4fda9b1 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -343,12 +343,14 @@ impl ExecutionPlan for ProjectionExec { } fn partition_statistics(&self, partition: Option) -> Result> { - let input_stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + let input_stats = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let output_schema = self.schema(); - Ok(Arc::new(self.projector.projection().project_statistics( - input_stats, - &output_schema, - )?)) + Ok(Arc::new( + self.projector + .projection() + .project_statistics(input_stats, &output_schema)?, + )) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 1b322d816e6a3..9ea77b1d1ab7d 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -634,7 +634,10 @@ impl ExecutionPlan for InterleaveExec { let stats = self .inputs .iter() - .map(|stat| stat.partition_statistics(partition).map(Arc::unwrap_or_clone)) + .map(|stat| { + stat.partition_statistics(partition) + .map(Arc::unwrap_or_clone) + }) .collect::>>()?; Ok(Arc::new( diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 33faf5447ed49..486449585061d 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -369,7 +369,8 @@ impl ExecutionPlan for BoundedWindowAggExec { } fn partition_statistics(&self, partition: Option) -> Result> { - let input_stat = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + let input_stat = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); Ok(Arc::new(self.statistics_helper(input_stat)?)) } } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index cf0f489f77d46..5edeb60b7984c 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -273,7 +273,8 @@ impl ExecutionPlan for WindowAggExec { } fn partition_statistics(&self, partition: Option) -> Result> { - let input_stat = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + let input_stat = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let win_cols = self.window_expr.len(); let input_cols = self.input.schema().fields().len(); // TODO stats: some windowing function will maintain invariants such as min, max...