From 684f901e73b14431f1625af06d5aa47d2b1b873e Mon Sep 17 00:00:00 2001 From: Alex Metelli Date: Fri, 19 Jun 2026 18:00:47 +0800 Subject: [PATCH 01/10] Support nullary aggregate expression validation --- datafusion/expr-common/src/type_coercion/aggregates.rs | 8 ++++++++ datafusion/physical-expr/src/aggregate.rs | 6 +----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/datafusion/expr-common/src/type_coercion/aggregates.rs b/datafusion/expr-common/src/type_coercion/aggregates.rs index ada0bd26b8d06..235343fdc69b2 100644 --- a/datafusion/expr-common/src/type_coercion/aggregates.rs +++ b/datafusion/expr-common/src/type_coercion/aggregates.rs @@ -75,6 +75,14 @@ pub fn check_arg_count( ); } } + TypeSignature::Nullary => { + if !input_fields.is_empty() { + return plan_err!( + "The function {func_name} expects zero arguments, but {} were provided", + input_fields.len() + ); + } + } TypeSignature::OneOf(variants) => { let ok = variants .iter() diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index e5d55aba4f51c..5e3bc74ee2a5e 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -44,9 +44,7 @@ use crate::planner::{create_physical_expr, create_physical_exprs}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef}; use datafusion_common::metadata::FieldMetadata; -use datafusion_common::{ - DFSchema, Result, ScalarValue, assert_or_internal_err, internal_err, not_impl_err, -}; +use datafusion_common::{DFSchema, Result, ScalarValue, internal_err, not_impl_err}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::{ AggregateFunction, AggregateFunctionParams, NullTreatment, physical_name, @@ -261,8 +259,6 @@ impl AggregateExprBuilder { is_distinct, is_reversed, } = self; - assert_or_internal_err!(!args.is_empty(), "args should not be empty"); - let ordering_types = order_bys .iter() .map(|e| e.expr.data_type(&schema)) From 3e81a3286e35fc824c851f440921132e42e264f9 Mon Sep 17 00:00:00 2001 From: Alex Metelli Date: Fri, 19 Jun 2026 18:02:05 +0800 Subject: [PATCH 02/10] Add planner coverage for nullary aggregate UDFs --- datafusion/sql/tests/sql_integration.rs | 30 ++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 88b7b43eb73f6..67d1184eff1d4 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -31,7 +31,7 @@ use datafusion_expr::{ ColumnarValue, CreateIndex, DdlStatement, Expr, HigherOrderFunctionArgs, HigherOrderReturnFieldArgs, HigherOrderSignature, HigherOrderUDF, HigherOrderUDFImpl, LambdaParametersProgress, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, - ValueOrLambda, Volatility, col, + ValueOrLambda, Volatility, col, create_udaf, expr::{HigherOrderFunction, LambdaVariable, ScalarFunction}, lambda, logical_plan::LogicalPlan, @@ -5135,6 +5135,34 @@ fn test_error_message_invalid_aggregate_function_signature() { assert!(error_message("select max(9, 3)").starts_with(r"Error during planning: Execution error: Function 'max' user-defined coercion failed with: Execution error: min/max was called with 2 arguments. It requires only 1")); } +#[test] +fn plan_zero_argument_aggregate_udf() { + let state = mock_session_state().with_aggregate_function(Arc::new(create_udaf( + "window_start", + vec![], + Arc::new(DataType::Utf8), + Volatility::Immutable, + Arc::new(|_| panic!("dummy - not implemented")), + Arc::new(vec![DataType::Utf8]), + ))); + let plan = logical_plan_from_state( + "SELECT window_start()", + &GenericDialect {}, + ParserOptions::default(), + state, + ) + .unwrap(); + + assert_snapshot!( + plan, + @r#" + Projection: window_start() + Aggregate: groupBy=[[]], aggr=[[window_start()]] + EmptyRelation: rows=1 + "# + ); +} + #[test] fn test_error_message_invalid_window_function_signature() { assert!(error_message("select rank(1) over()").starts_with(r"Error during planning: The function 'rank' expected zero argument but received 1")); From 52bfa73fae2c3d760decd3ffea235ea1de9dac13 Mon Sep 17 00:00:00 2001 From: Alex Metelli Date: Fri, 19 Jun 2026 18:04:55 +0800 Subject: [PATCH 03/10] Add execution coverage for nullary aggregate UDFs --- .../user_defined/user_defined_aggregates.rs | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index b895cb9c7ce2c..cde4c0fe63e5a 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -103,6 +103,24 @@ async fn test_udaf() { assert!(!test_state.retract_batch()); } +#[tokio::test] +async fn test_zero_argument_udaf() { + let TestContext { mut ctx, .. } = TestContext::new(); + NullaryAccumulator::register(&mut ctx, "window_start"); + + let actual = execute(&ctx, "SELECT window_start() from t") + .await + .unwrap(); + + insta::assert_snapshot!(batches_to_string(&actual), @r" + +----------------+ + | window_start() | + +----------------+ + | 7 | + +----------------+ + "); +} + /// User defined aggregate used as a window function #[tokio::test] async fn test_udaf_as_window() { @@ -559,6 +577,55 @@ impl TestState { } } +#[derive(Debug, Default)] +struct NullaryAccumulator { + value: Option, +} + +impl NullaryAccumulator { + fn register(ctx: &mut SessionContext, name: &str) { + let accumulator: AccumulatorFactoryFunction = + Arc::new(|_| Ok(Box::::default())); + + let udaf = AggregateUDF::from(SimpleAggregateUDF::new( + name, + vec![], + DataType::Int64, + Volatility::Immutable, + accumulator, + vec![Field::new("value", DataType::Int64, true).into()], + )); + + ctx.register_udaf(udaf) + } +} + +impl Accumulator for NullaryAccumulator { + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + assert!(values.is_empty()); + self.value = Some(7); + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + assert_eq!(states.len(), 1); + self.value = Some(7); + Ok(()) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::Int64(self.value)) + } + + fn size(&self) -> usize { + size_of_val(self) + } +} + /// Models a user defined aggregate function that computes the a sum /// of timestamps (not a quantity that has much real world meaning) #[derive(Debug)] From b2cdbfc58265a6dc170f25e224634b616a7e0956 Mon Sep 17 00:00:00 2001 From: Alex Metelli Date: Fri, 19 Jun 2026 18:05:18 +0800 Subject: [PATCH 04/10] Format nullary aggregate UDF test --- datafusion/core/tests/user_defined/user_defined_aggregates.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index cde4c0fe63e5a..d5358ca095ee5 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -108,9 +108,7 @@ async fn test_zero_argument_udaf() { let TestContext { mut ctx, .. } = TestContext::new(); NullaryAccumulator::register(&mut ctx, "window_start"); - let actual = execute(&ctx, "SELECT window_start() from t") - .await - .unwrap(); + let actual = execute(&ctx, "SELECT window_start() from t").await.unwrap(); insta::assert_snapshot!(batches_to_string(&actual), @r" +----------------+ From 18c38c75048e9e5c885376d375e95f84beb2c2bf Mon Sep 17 00:00:00 2001 From: Alex Metelli Date: Fri, 19 Jun 2026 18:06:34 +0800 Subject: [PATCH 05/10] Use nullary signatures in aggregate UDF regressions --- .../user_defined/user_defined_aggregates.rs | 5 ++-- datafusion/sql/tests/sql_integration.rs | 26 ++++++++++--------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index d5358ca095ee5..8df17d5d286d9 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -585,11 +585,10 @@ impl NullaryAccumulator { let accumulator: AccumulatorFactoryFunction = Arc::new(|_| Ok(Box::::default())); - let udaf = AggregateUDF::from(SimpleAggregateUDF::new( + let udaf = AggregateUDF::from(SimpleAggregateUDF::new_with_signature( name, - vec![], + Signature::nullary(Volatility::Immutable), DataType::Int64, - Volatility::Immutable, accumulator, vec![Field::new("value", DataType::Int64, true).into()], )); diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 67d1184eff1d4..2cfb046a1d81f 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -28,10 +28,11 @@ use arrow::datatypes::{TimeUnit::Nanosecond, *}; use common::MockContextProvider; use datafusion_common::{DFSchema, DataFusionError, Result, assert_contains}; use datafusion_expr::{ - ColumnarValue, CreateIndex, DdlStatement, Expr, HigherOrderFunctionArgs, - HigherOrderReturnFieldArgs, HigherOrderSignature, HigherOrderUDF, HigherOrderUDFImpl, - LambdaParametersProgress, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, - ValueOrLambda, Volatility, col, create_udaf, + AggregateUDF, ColumnarValue, CreateIndex, DdlStatement, Expr, + HigherOrderFunctionArgs, HigherOrderReturnFieldArgs, HigherOrderSignature, + HigherOrderUDF, HigherOrderUDFImpl, LambdaParametersProgress, ScalarFunctionArgs, + ScalarUDF, ScalarUDFImpl, Signature, SimpleAggregateUDF, ValueOrLambda, Volatility, + col, expr::{HigherOrderFunction, LambdaVariable, ScalarFunction}, lambda, logical_plan::LogicalPlan, @@ -5137,14 +5138,15 @@ fn test_error_message_invalid_aggregate_function_signature() { #[test] fn plan_zero_argument_aggregate_udf() { - let state = mock_session_state().with_aggregate_function(Arc::new(create_udaf( - "window_start", - vec![], - Arc::new(DataType::Utf8), - Volatility::Immutable, - Arc::new(|_| panic!("dummy - not implemented")), - Arc::new(vec![DataType::Utf8]), - ))); + let state = mock_session_state().with_aggregate_function(Arc::new( + AggregateUDF::from(SimpleAggregateUDF::new_with_signature( + "window_start", + Signature::nullary(Volatility::Immutable), + DataType::Utf8, + Arc::new(|_| panic!("dummy - not implemented")), + vec![Field::new("value", DataType::Utf8, true).into()], + )), + )); let plan = logical_plan_from_state( "SELECT window_start()", &GenericDialect {}, From 0b805ed243b5e720076d707e5fd73c99db106d27 Mon Sep 17 00:00:00 2001 From: Alex Metelli Date: Fri, 19 Jun 2026 18:22:09 +0800 Subject: [PATCH 06/10] Reject empty physical count aggregates --- datafusion/physical-expr/src/aggregate.rs | 10 +++++++++- datafusion/physical-plan/src/aggregates/mod.rs | 15 ++++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 5e3bc74ee2a5e..66f731663b61e 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -44,7 +44,9 @@ use crate::planner::{create_physical_expr, create_physical_exprs}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef}; use datafusion_common::metadata::FieldMetadata; -use datafusion_common::{DFSchema, Result, ScalarValue, internal_err, not_impl_err}; +use datafusion_common::{ + DFSchema, Result, ScalarValue, internal_err, not_impl_err, plan_err, +}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::{ AggregateFunction, AggregateFunctionParams, NullTreatment, physical_name, @@ -271,6 +273,12 @@ impl AggregateExprBuilder { .map(|arg| arg.return_field(&schema)) .collect::>>()?; + if args.is_empty() && fun.name() == "count" { + return plan_err!( + "Physical count aggregate requires an argument; use COUNT_STAR_EXPANSION for count(*)" + ); + } + check_arg_count( fun.name(), &input_exprs_fields, diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index e1c598e02dfff..003f107db5e5a 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -2433,7 +2433,7 @@ mod tests { use arrow::compute::{SortOptions, concat_batches}; use arrow::datatypes::Int32Type; use datafusion_common::test_util::{batches_to_sort_string, batches_to_string}; - use datafusion_common::{DataFusionError, internal_err}; + use datafusion_common::{DataFusionError, assert_contains, internal_err}; use datafusion_execution::config::SessionConfig; use datafusion_execution::memory_pool::FairSpillPool; use datafusion_execution::runtime_env::RuntimeEnvBuilder; @@ -2469,6 +2469,19 @@ mod tests { Ok(schema) } + #[test] + fn count_requires_physical_argument() { + let err = AggregateExprBuilder::new(count_udaf(), vec![]) + .alias("count") + .build() + .expect_err("empty-argument physical count should fail"); + + assert_contains!( + err.to_string(), + "Physical count aggregate requires an argument" + ); + } + /// some mock data to aggregates fn some_data() -> (Arc, Vec) { // define a schema. From 3eed5a181cd2dc47a3f25dd9e3f5f67a8da7ebed Mon Sep 17 00:00:00 2001 From: Alex Metelli Date: Fri, 19 Jun 2026 18:29:17 +0800 Subject: [PATCH 07/10] Support grouped nullary aggregate UDFs --- .../user_defined/user_defined_aggregates.rs | 18 +++++ .../src/aggregate/groups_accumulator.rs | 81 ++++++++++++++----- .../physical-plan/src/aggregates/row_hash.rs | 8 +- 3 files changed, 87 insertions(+), 20 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index 8df17d5d286d9..1763647ce9b31 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -117,6 +117,24 @@ async fn test_zero_argument_udaf() { | 7 | +----------------+ "); + + let actual = execute( + &ctx, + "SELECT value, window_start() FROM t GROUP BY value ORDER BY value", + ) + .await + .unwrap(); + + insta::assert_snapshot!(batches_to_string(&actual), @r" + +-------+----------------+ + | value | window_start() | + +-------+----------------+ + | 1.0 | 7 | + | 2.0 | 7 | + | 3.0 | 7 | + | 5.0 | 7 | + +-------+----------------+ + "); } /// User defined aggregate used as a window function diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs index b412b4ffe09f2..6597331e6d518 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs @@ -27,7 +27,7 @@ use std::mem::{size_of, size_of_val}; use arrow::array::new_empty_array; use arrow::{ - array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}, + array::{Array, ArrayRef, AsArray, BooleanArray, PrimitiveArray}, compute, compute::take_arrays, datatypes::UInt32Type, @@ -100,6 +100,12 @@ pub struct GroupsAccumulatorAdapter { /// bottleneck in earlier implementations when there were many /// distinct groups. allocation_bytes: usize, + + /// Whether this adapter can convert raw input rows directly to aggregate + /// state. Nullary aggregates do not carry a value array with the input row + /// cardinality, so this optimization cannot safely reconstruct one state + /// per input row for them. + supports_convert_to_state: bool, } struct AccumulatorState { @@ -137,6 +143,24 @@ impl GroupsAccumulatorAdapter { factory: Box::new(factory), states: vec![], allocation_bytes: 0, + supports_convert_to_state: true, + } + } + + /// Create a new adapter with explicit control over whether row-to-state + /// conversion is supported. + pub fn new_with_convert_to_state( + factory: F, + supports_convert_to_state: bool, + ) -> Self + where + F: Fn() -> Result> + Send + 'static, + { + Self { + factory: Box::new(factory), + states: vec![], + allocation_bytes: 0, + supports_convert_to_state, } } @@ -196,13 +220,24 @@ impl GroupsAccumulatorAdapter { { self.make_accumulators_if_needed(total_num_groups)?; - assert_eq!(values[0].len(), group_indices.len()); + if values.is_empty() { + for (idx, group_index) in group_indices.iter().enumerate() { + if opt_filter + .is_some_and(|filter| !filter.is_valid(idx) || !filter.value(idx)) + { + continue; + } + self.states[*group_index].indices.push(idx as u32); + } + } else { + assert_eq!(values[0].len(), group_indices.len()); - // figure out which input rows correspond to which groups. - // Note that self.state.indices starts empty for all groups - // (it is cleared out below) - for (idx, group_index) in group_indices.iter().enumerate() { - self.states[*group_index].indices.push(idx as u32); + // figure out which input rows correspond to which groups. + // Note that self.state.indices starts empty for all groups + // (it is cleared out below) + for (idx, group_index) in group_indices.iter().enumerate() { + self.states[*group_index].indices.push(idx as u32); + } } // groups_with_rows holds a list of group indexes that have @@ -230,13 +265,18 @@ impl GroupsAccumulatorAdapter { offset_so_far += indices.len(); offsets.push(offset_so_far); } - let batch_indices = batch_indices.into(); + let values_and_filter = if values.is_empty() { + None + } else { + let batch_indices = batch_indices.into(); - // reorder the values and opt_filter by batch_indices so that - // all values for each group are contiguous, then invoke the - // accumulator once per group with values - let values = take_arrays(values, &batch_indices, None)?; - let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?; + // reorder the values and opt_filter by batch_indices so that + // all values for each group are contiguous, then invoke the + // accumulator once per group with values + let values = take_arrays(values, &batch_indices, None)?; + let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?; + Some((values, opt_filter)) + }; // invoke each accumulator with the appropriate rows, first // pulling the input arguments for this group into their own @@ -249,11 +289,14 @@ impl GroupsAccumulatorAdapter { let state = &mut self.states[group_idx]; sizes_pre += state.size(); - let values_to_accumulate = slice_and_maybe_filter( - &values, - opt_filter.as_ref().map(|f| f.as_boolean()), - offsets, - )?; + let values_to_accumulate = match &values_and_filter { + Some((values, opt_filter)) => slice_and_maybe_filter( + values, + opt_filter.as_ref().map(|f| f.as_boolean()), + offsets, + )?, + None => vec![], + }; f(state.accumulator.as_mut(), &values_to_accumulate)?; // clear out the state so they are empty for next @@ -443,7 +486,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter { } fn supports_convert_to_state(&self) -> bool { - true + self.supports_convert_to_state } } diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index d46faf9acc14a..4f6570864ac32 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -621,8 +621,14 @@ pub(crate) fn create_group_accumulator( agg_expr.name() ); let agg_expr_captured = Arc::clone(agg_expr); + let supports_convert_to_state = !agg_expr.all_expressions().args.is_empty(); let factory = move || agg_expr_captured.create_accumulator(); - Ok(Box::new(GroupsAccumulatorAdapter::new(factory))) + Ok(Box::new( + GroupsAccumulatorAdapter::new_with_convert_to_state( + factory, + supports_convert_to_state, + ), + )) } } From ccbfdca4840c9ac2920b48fe49e5552bb73df8fb Mon Sep 17 00:00:00 2001 From: Alex Metelli Date: Fri, 19 Jun 2026 18:37:30 +0800 Subject: [PATCH 08/10] Skip empty filtered nullary aggregates --- .../tests/user_defined/user_defined_aggregates.rs | 15 +++++++++++++++ .../physical-plan/src/aggregates/no_grouping.rs | 7 +++++++ 2 files changed, 22 insertions(+) diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index 1763647ce9b31..a123ed7d4420d 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -118,6 +118,21 @@ async fn test_zero_argument_udaf() { +----------------+ "); + let actual = execute( + &ctx, + "SELECT window_start() FILTER (WHERE value < 0.0) FROM t", + ) + .await + .unwrap(); + + insta::assert_snapshot!(batches_to_string(&actual), @r" + +----------------------------------------------------+ + | window_start() FILTER (WHERE t.value < Float64(0)) | + +----------------------------------------------------+ + | | + +----------------------------------------------------+ + "); + let actual = execute( &ctx, "SELECT value, window_start() FROM t GROUP BY value ORDER BY value", diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index ac7727b459300..1a3cabfbcb592 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -463,6 +463,13 @@ fn aggregate_batch( // 1.3 let values = evaluate_expressions_to_arrays(expr, batch.as_ref())?; + if values.is_empty() + && batch.num_rows() == 0 + && mode.input_mode() == AggregateInputMode::Raw + { + return Ok(()); + } + // 1.4 let size_pre = accum.size(); let res = match mode.input_mode() { From 822a6f07047d178b2267d26dfeef96d2e201bff8 Mon Sep 17 00:00:00 2001 From: Alex Metelli Date: Fri, 19 Jun 2026 19:22:21 +0800 Subject: [PATCH 09/10] Skip hash value test under forced collisions --- datafusion/common/src/hash_utils.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index e9c4c26e37482..1fcc005123bd8 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -1852,6 +1852,8 @@ mod tests { assert_eq!(hashes1, hashes2); } + // Tests actual values of hashes, which are different if forcing collisions + #[cfg(not(feature = "force_hash_collisions"))] #[test] fn test_create_hashes_with_quality_hash_state() { let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4])); From a83f8e949d23806493bc85733e4ae698b0e686bd Mon Sep 17 00:00:00 2001 From: Alex Metelli Date: Sat, 20 Jun 2026 04:16:44 +0800 Subject: [PATCH 10/10] Fix approx_distinct tests with forced hash collisions --- .../src/approx_distinct.rs | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 04ae5c1b35a5e..0c1e62295139e 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -850,6 +850,12 @@ mod tests { hll.count() as u64 } + fn array_hashes(array: &ArrayRef) -> Vec { + let mut hashes = vec![0; array.len()]; + create_hashes([array.as_ref()], &HLL_HASH_STATE, &mut hashes).unwrap(); + hashes + } + fn serialize(g: &mut GroupHll) -> Vec { let mut buf = Vec::new(); g.serialize(&mut buf); @@ -992,17 +998,20 @@ mod tests { BooleanArray::from(vec![Some(true), None, Some(false), None, Some(true)]); let mut acc = HllGroupsAccumulator::new(); - // put all rows in group 0 - let group_indices = vec![0usize; 5]; - acc.update_batch(&[values], &group_indices, Some(&filter), 1) + // Put true rows in group 0, and the null/false filter rows in group 1. + // The collision CI job forces every hash to 0, so group 1 is what proves + // filtered-out rows are not counted there. + let group_indices = vec![0usize, 1, 1, 1, 0]; + let hashes = array_hashes(&values); + acc.update_batch(&[values], &group_indices, Some(&filter), 2) .unwrap(); // Only rows 0 and 4 (values 1 and 5) should be counted. let result = acc.evaluate(EmitTo::All).unwrap(); let counts = result.as_any().downcast_ref::().unwrap(); - // reference: hash 1 and 5 into a dense sketch - let expected = reference_count(&[h(1), h(5)]); + let expected = reference_count(&[hashes[0], hashes[4]]); assert_eq!(counts.value(0), expected); + assert_eq!(counts.value(1), 0); } /// Regression: a short (≤ 12-byte) Utf8View string must hash identically @@ -1020,16 +1029,21 @@ mod tests { assert!(!batch2.as_string_view().data_buffers().is_empty()); let group_indices = vec![0usize, 0]; + let batch1_hashes = array_hashes(&batch1); + let batch2_hashes = array_hashes(&batch2); + let expected = + reference_count(&[batch1_hashes[0], batch1_hashes[1], batch2_hashes[1]]); let mut acc = HllGroupsAccumulator::new(); acc.update_batch(&[batch1], &group_indices, None, 1) .unwrap(); acc.update_batch(&[batch2], &group_indices, None, 1) .unwrap(); - // True distinct values: {"aaa", "bbb", LONG} == 3. + // Under normal hashing the true distinct values are {"aaa", "bbb", LONG}; + // under collision testing they intentionally collapse to fewer hashes. let result = acc.evaluate(EmitTo::All).unwrap(); let counts = result.as_any().downcast_ref::().unwrap(); - assert_eq!(counts.value(0), 3); + assert_eq!(counts.value(0), expected); } /// Regression: a short (≤ 12-byte) Utf8View string must hash identically @@ -1039,6 +1053,7 @@ mod tests { // Multiset: {"aaa" x2, "bbb", LONG}, so 3 distinct values. let mixed: ArrayRef = Arc::new(StringViewArray::from(vec!["aaa", "bbb", LONG, "aaa"])); + let expected = reference_count(&array_hashes(&mixed)[0..3]); let mut acc_single = HLLAccumulator::new(); acc_single.update_batch(&[mixed]).unwrap(); @@ -1057,6 +1072,6 @@ mod tests { distinct_count(&mut acc_single), distinct_count(&mut acc_split) ); - assert_eq!(distinct_count(&mut acc_single), 3); + assert_eq!(distinct_count(&mut acc_single), expected); } }