diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 759ebfe67a812..0bdfe2d2ce372 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -22,6 +22,7 @@ use std::fmt::{self, Debug, Display}; use crate::{Result, ScalarValue}; use crate::error::_plan_err; +use crate::utils::aggregate::precision_add; use arrow::datatypes::{DataType, Schema}; /// Represents a value with a degree of certainty. `Precision` is used to @@ -576,23 +577,6 @@ impl Statistics { /// If not, maybe you can call `SchemaMapper::map_column_statistics` to make them consistent. /// /// Returns an error if the statistics do not match the specified schemas. - pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics> - where - I: IntoIterator<Item = &'a Statistics>, - { - let mut items = items.into_iter(); - - let Some(init) = items.next() else { - return Ok(Statistics::new_unknown(schema)); - }; - items.try_fold(init.clone(), |acc: Statistics, item_stats: &Statistics| { - acc.try_merge(item_stats) - }) - } - - /// Merge this Statistics value with another Statistics value. - /// - /// Returns an error if the statistics do not match (different schemas). 
/// /// # Example /// ``` @@ -600,68 +584,100 @@ impl Statistics { /// # use arrow::datatypes::{Field, Schema, DataType}; /// # use datafusion_common::stats::Precision; /// let stats1 = Statistics::default() - /// .with_num_rows(Precision::Exact(1)) - /// .with_total_byte_size(Precision::Exact(2)) + /// .with_num_rows(Precision::Exact(10)) /// .add_column_statistics( /// ColumnStatistics::new_unknown() - /// .with_null_count(Precision::Exact(3)) - /// .with_min_value(Precision::Exact(ScalarValue::from(4))) - /// .with_max_value(Precision::Exact(ScalarValue::from(5))), + /// .with_min_value(Precision::Exact(ScalarValue::from(1))) + /// .with_max_value(Precision::Exact(ScalarValue::from(100))) + /// .with_sum_value(Precision::Exact(ScalarValue::from(500))), /// ); /// /// let stats2 = Statistics::default() - /// .with_num_rows(Precision::Exact(10)) - /// .with_total_byte_size(Precision::Inexact(20)) + /// .with_num_rows(Precision::Exact(20)) /// .add_column_statistics( /// ColumnStatistics::new_unknown() - /// // absent null count - /// .with_min_value(Precision::Exact(ScalarValue::from(40))) - /// .with_max_value(Precision::Exact(ScalarValue::from(50))), + /// .with_min_value(Precision::Exact(ScalarValue::from(5))) + /// .with_max_value(Precision::Exact(ScalarValue::from(200))) + /// .with_sum_value(Precision::Exact(ScalarValue::from(1000))), /// ); /// - /// let merged_stats = stats1.try_merge(&stats2).unwrap(); - /// let expected_stats = Statistics::default() - /// .with_num_rows(Precision::Exact(11)) - /// .with_total_byte_size(Precision::Inexact(22)) // inexact in stats2 --> inexact - /// .add_column_statistics( - /// ColumnStatistics::new_unknown() - /// .with_null_count(Precision::Absent) // missing from stats2 --> absent - /// .with_min_value(Precision::Exact(ScalarValue::from(4))) - /// .with_max_value(Precision::Exact(ScalarValue::from(50))), - /// ); + /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + /// let merged = 
Statistics::try_merge_iter( + /// &[stats1, stats2], + /// &schema, + /// ).unwrap(); /// - /// assert_eq!(merged_stats, expected_stats) + /// assert_eq!(merged.num_rows, Precision::Exact(30)); + /// assert_eq!(merged.column_statistics[0].min_value, + /// Precision::Exact(ScalarValue::from(1))); + /// assert_eq!(merged.column_statistics[0].max_value, + /// Precision::Exact(ScalarValue::from(200))); + /// assert_eq!(merged.column_statistics[0].sum_value, + /// Precision::Exact(ScalarValue::from(1500))); /// ``` - pub fn try_merge(self, other: &Statistics) -> Result<Self> { - let Self { - mut num_rows, - mut total_byte_size, - mut column_statistics, - } = self; - - // Accumulate statistics for subsequent items - num_rows = num_rows.add(&other.num_rows); - total_byte_size = total_byte_size.add(&other.total_byte_size); - - if column_statistics.len() != other.column_statistics.len() { - return _plan_err!( - "Cannot merge statistics with different number of columns: {} vs {}", - column_statistics.len(), - other.column_statistics.len() - ); + pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics> + where + I: IntoIterator<Item = &'a Statistics>, + { + let items: Vec<&Statistics> = items.into_iter().collect(); + + if items.is_empty() { + return Ok(Statistics::new_unknown(schema)); + } + if items.len() == 1 { + return Ok(items[0].clone()); + } + + let num_cols = items[0].column_statistics.len(); + // Validate all items have the same number of columns + for (i, stat) in items.iter().enumerate().skip(1) { + if stat.column_statistics.len() != num_cols { + return _plan_err!( + "Cannot merge statistics with different number of columns: {} vs {} (item {})", + num_cols, + stat.column_statistics.len(), + i + ); + } + } + + // Aggregate usize fields (cheap arithmetic) + let mut num_rows = Precision::Exact(0usize); + let mut total_byte_size = Precision::Exact(0usize); + for stat in &items { + num_rows = num_rows.add(&stat.num_rows); + total_byte_size = total_byte_size.add(&stat.total_byte_size); } - 
for (item_col_stats, col_stats) in other + let first = items[0]; + let mut column_statistics: Vec<ColumnStatistics> = first .column_statistics .iter() - .zip(column_statistics.iter_mut()) - { - col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count); - col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value); - col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value); - col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value); - col_stats.distinct_count = Precision::Absent; - col_stats.byte_size = col_stats.byte_size.add(&item_col_stats.byte_size); + .map(|cs| ColumnStatistics { + null_count: cs.null_count, + max_value: cs.max_value.clone(), + min_value: cs.min_value.clone(), + sum_value: cs.sum_value.clone(), + distinct_count: Precision::Absent, + byte_size: cs.byte_size, + }) + .collect(); + + // Accumulate all statistics in a single pass. + // Uses precision_add for sum (avoids the expensive + // ScalarValue::add round-trip through Arrow arrays), and + // Precision::min/max which use cheap PartialOrd comparison. 
+ for stat in items.iter().skip(1) { + for (col_idx, col_stats) in column_statistics.iter_mut().enumerate() { + let item_cs = &stat.column_statistics[col_idx]; + + col_stats.null_count = col_stats.null_count.add(&item_cs.null_count); + col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size); + col_stats.sum_value = + precision_add(&col_stats.sum_value, &item_cs.sum_value); + col_stats.min_value = col_stats.min_value.min(&item_cs.min_value); + col_stats.max_value = col_stats.max_value.max(&item_cs.max_value); + } } Ok(Statistics { @@ -1141,7 +1157,7 @@ mod tests { } #[test] - fn test_try_merge_basic() { + fn test_try_merge() { // Create a schema with two columns let schema = Arc::new(Schema::new(vec![ Field::new("col1", DataType::Int32, false), @@ -1338,52 +1354,6 @@ mod tests { ); } - #[test] - fn test_try_merge_distinct_count_absent() { - // Create statistics with known distinct counts - let stats1 = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .with_total_byte_size(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_null_count(Precision::Exact(0)) - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(1)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10)))) - .with_distinct_count(Precision::Exact(5)), - ); - - let stats2 = Statistics::default() - .with_num_rows(Precision::Exact(15)) - .with_total_byte_size(Precision::Exact(150)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_null_count(Precision::Exact(0)) - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(20)))) - .with_distinct_count(Precision::Exact(7)), - ); - - // Merge statistics - let merged_stats = stats1.try_merge(&stats2).unwrap(); - - // Verify the results - assert_eq!(merged_stats.num_rows, Precision::Exact(25)); - assert_eq!(merged_stats.total_byte_size, Precision::Exact(250)); - - let col_stats = 
&merged_stats.column_statistics[0]; - assert_eq!(col_stats.null_count, Precision::Exact(0)); - assert_eq!( - col_stats.min_value, - Precision::Exact(ScalarValue::Int32(Some(1))) - ); - assert_eq!( - col_stats.max_value, - Precision::Exact(ScalarValue::Int32(Some(20))) - ); - // Distinct count should be Absent after merge - assert_eq!(col_stats.distinct_count, Precision::Absent); - } - #[test] fn test_with_fetch_basic_preservation() { // Test that column statistics and byte size are preserved (as inexact) when applying fetch @@ -1650,44 +1620,6 @@ mod tests { assert_eq!(result_col_stats.distinct_count, Precision::Inexact(789)); } - #[test] - fn test_byte_size_try_merge() { - // Test that byte_size is summed correctly in try_merge - let col_stats1 = ColumnStatistics { - null_count: Precision::Exact(10), - max_value: Precision::Absent, - min_value: Precision::Absent, - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - byte_size: Precision::Exact(1000), - }; - let col_stats2 = ColumnStatistics { - null_count: Precision::Exact(20), - max_value: Precision::Absent, - min_value: Precision::Absent, - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - byte_size: Precision::Exact(2000), - }; - - let stats1 = Statistics { - num_rows: Precision::Exact(50), - total_byte_size: Precision::Exact(1000), - column_statistics: vec![col_stats1], - }; - let stats2 = Statistics { - num_rows: Precision::Exact(100), - total_byte_size: Precision::Exact(2000), - column_statistics: vec![col_stats2], - }; - - let merged = stats1.try_merge(&stats2).unwrap(); - assert_eq!( - merged.column_statistics[0].byte_size, - Precision::Exact(3000) // 1000 + 2000 - ); - } - #[test] fn test_byte_size_to_inexact() { let col_stats = ColumnStatistics { @@ -1785,4 +1717,442 @@ mod tests { // total_byte_size should fall back to scaling: 8000 * 0.1 = 800 assert_eq!(result.total_byte_size, Precision::Inexact(800)); } + + #[test] + fn test_try_merge_iter_basic() { + let schema = 
Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, false), + Field::new("col2", DataType::Int32, false), + ])); + + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(40), + }, + ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int32(Some(200))), + min_value: Precision::Exact(ScalarValue::Int32(Some(10))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(1000))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(40), + }, + ], + }; + + let stats2 = Statistics { + num_rows: Precision::Exact(15), + total_byte_size: Precision::Exact(150), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int32(Some(120))), + min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(600))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(60), + }, + ColumnStatistics { + null_count: Precision::Exact(3), + max_value: Precision::Exact(ScalarValue::Int32(Some(180))), + min_value: Precision::Exact(ScalarValue::Int32(Some(5))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(1200))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(60), + }, + ], + }; + + let items = vec![&stats1, &stats2]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + assert_eq!(summary_stats.num_rows, Precision::Exact(25)); + assert_eq!(summary_stats.total_byte_size, Precision::Exact(250)); + + let col1_stats = &summary_stats.column_statistics[0]; + 
assert_eq!(col1_stats.null_count, Precision::Exact(3)); + assert_eq!( + col1_stats.max_value, + Precision::Exact(ScalarValue::Int32(Some(120))) + ); + assert_eq!( + col1_stats.min_value, + Precision::Exact(ScalarValue::Int32(Some(-10))) + ); + assert_eq!( + col1_stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(1100))) + ); + + let col2_stats = &summary_stats.column_statistics[1]; + assert_eq!(col2_stats.null_count, Precision::Exact(5)); + assert_eq!( + col2_stats.max_value, + Precision::Exact(ScalarValue::Int32(Some(200))) + ); + assert_eq!( + col2_stats.min_value, + Precision::Exact(ScalarValue::Int32(Some(5))) + ); + assert_eq!( + col2_stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(2200))) + ); + } + + #[test] + fn test_try_merge_iter_mixed_precision() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Inexact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(40), + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Inexact(15), + total_byte_size: Precision::Exact(150), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(2), + max_value: Precision::Inexact(ScalarValue::Int32(Some(120))), + min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Inexact(60), + }], + }; + + let items = vec![&stats1, &stats2]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + assert_eq!(summary_stats.num_rows, Precision::Inexact(25)); + 
assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250)); + + let col_stats = &summary_stats.column_statistics[0]; + assert_eq!(col_stats.null_count, Precision::Inexact(3)); + assert_eq!( + col_stats.max_value, + Precision::Inexact(ScalarValue::Int32(Some(120))) + ); + assert_eq!( + col_stats.min_value, + Precision::Inexact(ScalarValue::Int32(Some(-10))) + ); + // sum_value becomes Absent because stats2 has Absent sum + assert_eq!(col_stats.sum_value, Precision::Absent); + } + + #[test] + fn test_try_merge_iter_empty() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + let items: Vec<&Statistics> = vec![]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + assert_eq!(summary_stats.num_rows, Precision::Absent); + assert_eq!(summary_stats.total_byte_size, Precision::Absent); + assert_eq!(summary_stats.column_statistics.len(), 1); + assert_eq!( + summary_stats.column_statistics[0].null_count, + Precision::Absent + ); + } + + #[test] + fn test_try_merge_iter_single_item() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + let stats = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Exact(10), + byte_size: Precision::Exact(40), + }], + }; + + let items = vec![&stats]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + assert_eq!(summary_stats, stats); + } + + #[test] + fn test_try_merge_iter_mismatched_columns() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + let stats1 = Statistics::default(); + let stats2 = + 
Statistics::default().add_column_statistics(ColumnStatistics::new_unknown()); + + let items = vec![&stats1, &stats2]; + let e = Statistics::try_merge_iter(items, &schema).unwrap_err(); + assert_contains!( + e.to_string(), + "Cannot merge statistics with different number of columns: 0 vs 1" + ); + } + + #[test] + fn test_try_merge_iter_three_items() { + // Verify that merging three items works correctly + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int64, + false, + )])); + + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int64(Some(100))), + min_value: Precision::Exact(ScalarValue::Int64(Some(10))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(500))), + distinct_count: Precision::Exact(8), + byte_size: Precision::Exact(80), + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Exact(20), + total_byte_size: Precision::Exact(200), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int64(Some(200))), + min_value: Precision::Exact(ScalarValue::Int64(Some(5))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(1000))), + distinct_count: Precision::Exact(15), + byte_size: Precision::Exact(160), + }], + }; + + let stats3 = Statistics { + num_rows: Precision::Exact(30), + total_byte_size: Precision::Exact(300), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(3), + max_value: Precision::Exact(ScalarValue::Int64(Some(150))), + min_value: Precision::Exact(ScalarValue::Int64(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(2000))), + distinct_count: Precision::Exact(25), + byte_size: Precision::Exact(240), + }], + }; + + let items = vec![&stats1, &stats2, &stats3]; + let summary_stats = Statistics::try_merge_iter(items, 
&schema).unwrap(); + + assert_eq!(summary_stats.num_rows, Precision::Exact(60)); + assert_eq!(summary_stats.total_byte_size, Precision::Exact(600)); + + let col_stats = &summary_stats.column_statistics[0]; + assert_eq!(col_stats.null_count, Precision::Exact(6)); + assert_eq!( + col_stats.max_value, + Precision::Exact(ScalarValue::Int64(Some(200))) + ); + assert_eq!( + col_stats.min_value, + Precision::Exact(ScalarValue::Int64(Some(1))) + ); + assert_eq!( + col_stats.sum_value, + Precision::Exact(ScalarValue::Int64(Some(3500))) + ); + assert_eq!(col_stats.byte_size, Precision::Exact(480)); + // distinct_count is always Absent after merge (can't accurately merge NDV) + assert_eq!(col_stats.distinct_count, Precision::Absent); + } + + #[test] + fn test_try_merge_iter_float_types() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Float64, + false, + )])); + + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(80), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Float64(Some(99.9))), + min_value: Precision::Exact(ScalarValue::Float64(Some(1.1))), + sum_value: Precision::Exact(ScalarValue::Float64(Some(500.5))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(80), + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(80), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Float64(Some(200.0))), + min_value: Precision::Exact(ScalarValue::Float64(Some(0.5))), + sum_value: Precision::Exact(ScalarValue::Float64(Some(1000.0))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(80), + }], + }; + + let items = vec![&stats1, &stats2]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + let col_stats = &summary_stats.column_statistics[0]; + 
assert_eq!( + col_stats.max_value, + Precision::Exact(ScalarValue::Float64(Some(200.0))) + ); + assert_eq!( + col_stats.min_value, + Precision::Exact(ScalarValue::Float64(Some(0.5))) + ); + assert_eq!( + col_stats.sum_value, + Precision::Exact(ScalarValue::Float64(Some(1500.5))) + ); + } + + #[test] + fn test_try_merge_iter_string_types() { + let schema = + Arc::new(Schema::new(vec![Field::new("col1", DataType::Utf8, false)])); + + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Utf8(Some("dog".to_string()))), + min_value: Precision::Exact(ScalarValue::Utf8(Some("ant".to_string()))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Exact(100), + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Utf8(Some("zebra".to_string()))), + min_value: Precision::Exact(ScalarValue::Utf8(Some("bat".to_string()))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Exact(100), + }], + }; + + let items = vec![&stats1, &stats2]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + let col_stats = &summary_stats.column_statistics[0]; + assert_eq!( + col_stats.max_value, + Precision::Exact(ScalarValue::Utf8(Some("zebra".to_string()))) + ); + assert_eq!( + col_stats.min_value, + Precision::Exact(ScalarValue::Utf8(Some("ant".to_string()))) + ); + assert_eq!(col_stats.sum_value, Precision::Absent); + } + + #[test] + fn test_try_merge_iter_all_inexact() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + let stats1 = Statistics { + num_rows: Precision::Inexact(10), + 
total_byte_size: Precision::Inexact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(1), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Inexact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Absent, + byte_size: Precision::Inexact(40), + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Inexact(20), + total_byte_size: Precision::Inexact(200), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(2), + max_value: Precision::Inexact(ScalarValue::Int32(Some(200))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(-5))), + sum_value: Precision::Inexact(ScalarValue::Int32(Some(1000))), + distinct_count: Precision::Absent, + byte_size: Precision::Inexact(60), + }], + }; + + let items = vec![&stats1, &stats2]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + assert_eq!(summary_stats.num_rows, Precision::Inexact(30)); + assert_eq!(summary_stats.total_byte_size, Precision::Inexact(300)); + + let col_stats = &summary_stats.column_statistics[0]; + assert_eq!(col_stats.null_count, Precision::Inexact(3)); + assert_eq!( + col_stats.max_value, + Precision::Inexact(ScalarValue::Int32(Some(200))) + ); + assert_eq!( + col_stats.min_value, + Precision::Inexact(ScalarValue::Int32(Some(-5))) + ); + assert_eq!( + col_stats.sum_value, + Precision::Inexact(ScalarValue::Int32(Some(1500))) + ); + } } diff --git a/datafusion/common/src/utils/aggregate.rs b/datafusion/common/src/utils/aggregate.rs new file mode 100644 index 0000000000000..16cc3bc033197 --- /dev/null +++ b/datafusion/common/src/utils/aggregate.rs @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Scalar-level aggregation utilities for statistics merging. +//! +//! Provides a cheap pairwise [`ScalarValue`] addition that directly +//! extracts inner primitive values, avoiding the expensive +//! `ScalarValue::add` path (which round-trips through Arrow arrays). + +use crate::stats::Precision; +use crate::{Result, ScalarValue}; + +/// Add two [`ScalarValue`]s by directly extracting and adding their +/// inner primitive values. +/// +/// This avoids `ScalarValue::add` which converts both operands to +/// single-element Arrow arrays, runs the `add_wrapping` kernel, and +/// converts the result back — 3 heap allocations per call. +/// +/// For non-primitive types, falls back to `ScalarValue::add`. +pub(crate) fn scalar_add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> { + macro_rules! add_wrapping { + ($lhs:expr, $rhs:expr, $VARIANT:ident) => { + match ($lhs, $rhs) { + (ScalarValue::$VARIANT(Some(a)), ScalarValue::$VARIANT(Some(b))) => { + Ok(ScalarValue::$VARIANT(Some(a.wrapping_add(*b)))) + } + (ScalarValue::$VARIANT(None), other) + | (other, ScalarValue::$VARIANT(None)) => Ok(other.clone()), + _ => unreachable!(), + } + }; + } + + macro_rules! 
add_decimal { + ($lhs:expr, $rhs:expr, $VARIANT:ident) => { + match ($lhs, $rhs) { + ( + ScalarValue::$VARIANT(Some(a), p, s), + ScalarValue::$VARIANT(Some(b), _, _), + ) => Ok(ScalarValue::$VARIANT(Some(a.wrapping_add(*b)), *p, *s)), + (ScalarValue::$VARIANT(None, _, _), other) + | (other, ScalarValue::$VARIANT(None, _, _)) => Ok(other.clone()), + _ => unreachable!(), + } + }; + } + + macro_rules! add_float { + ($lhs:expr, $rhs:expr, $VARIANT:ident) => { + match ($lhs, $rhs) { + (ScalarValue::$VARIANT(Some(a)), ScalarValue::$VARIANT(Some(b))) => { + Ok(ScalarValue::$VARIANT(Some(*a + *b))) + } + (ScalarValue::$VARIANT(None), other) + | (other, ScalarValue::$VARIANT(None)) => Ok(other.clone()), + _ => unreachable!(), + } + }; + } + + match lhs { + ScalarValue::Int8(_) => add_wrapping!(lhs, rhs, Int8), + ScalarValue::Int16(_) => add_wrapping!(lhs, rhs, Int16), + ScalarValue::Int32(_) => add_wrapping!(lhs, rhs, Int32), + ScalarValue::Int64(_) => add_wrapping!(lhs, rhs, Int64), + ScalarValue::UInt8(_) => add_wrapping!(lhs, rhs, UInt8), + ScalarValue::UInt16(_) => add_wrapping!(lhs, rhs, UInt16), + ScalarValue::UInt32(_) => add_wrapping!(lhs, rhs, UInt32), + ScalarValue::UInt64(_) => add_wrapping!(lhs, rhs, UInt64), + ScalarValue::Float16(_) => add_float!(lhs, rhs, Float16), + ScalarValue::Float32(_) => add_float!(lhs, rhs, Float32), + ScalarValue::Float64(_) => add_float!(lhs, rhs, Float64), + ScalarValue::Decimal32(_, _, _) => add_decimal!(lhs, rhs, Decimal32), + ScalarValue::Decimal64(_, _, _) => add_decimal!(lhs, rhs, Decimal64), + ScalarValue::Decimal128(_, _, _) => add_decimal!(lhs, rhs, Decimal128), + ScalarValue::Decimal256(_, _, _) => add_decimal!(lhs, rhs, Decimal256), + // Fallback: use the existing ScalarValue::add + _ => lhs.add(rhs), + } +} + +/// [`Precision`]-aware sum of two [`ScalarValue`] precisions using +/// cheap direct addition via [`scalar_add`]. 
+/// +/// Mirrors the semantics of `Precision::add` but avoids +/// the expensive `ScalarValue::add` round-trip through Arrow arrays. +pub(crate) fn precision_add( + lhs: &Precision<ScalarValue>, + rhs: &Precision<ScalarValue>, +) -> Precision<ScalarValue> { + match (lhs, rhs) { + (Precision::Exact(a), Precision::Exact(b)) => scalar_add(a, b) + .map(Precision::Exact) + .unwrap_or(Precision::Absent), + (Precision::Inexact(a), Precision::Exact(b)) + | (Precision::Exact(a), Precision::Inexact(b)) + | (Precision::Inexact(a), Precision::Inexact(b)) => scalar_add(a, b) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent), + (_, _) => Precision::Absent, + } +} diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 7f2d78d57970e..075a189c371dc 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -17,6 +17,7 @@ //! This module provides the bisect function, which implements binary search. +pub(crate) mod aggregate; pub mod expr; pub mod memory; pub mod proxy;