diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index a03792fae826..199638511571 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -390,17 +390,17 @@ mod test { column_statistics: vec![ ColumnStatistics { null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Int32(None)), + min_value: Precision::Exact(ScalarValue::Int32(None)), + sum_value: Precision::Exact(ScalarValue::Int32(None)), distinct_count: Precision::Exact(0), byte_size: Precision::Exact(16), }, ColumnStatistics { null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Date32(None)), + min_value: Precision::Exact(ScalarValue::Date32(None)), + sum_value: Precision::Exact(ScalarValue::Date32(None)), distinct_count: Precision::Exact(0), byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32) }, @@ -419,17 +419,17 @@ mod test { column_statistics: vec![ ColumnStatistics { null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Int32(None)), + min_value: Precision::Exact(ScalarValue::Int32(None)), + sum_value: Precision::Exact(ScalarValue::Int32(None)), distinct_count: Precision::Exact(0), byte_size: Precision::Exact(8), }, ColumnStatistics { null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: 
Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Date32(None)), + min_value: Precision::Exact(ScalarValue::Date32(None)), + sum_value: Precision::Exact(ScalarValue::Date32(None)), distinct_count: Precision::Exact(0), byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32) }, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 581e833f8c29..1212fab5f7ed 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -338,6 +338,7 @@ impl FilterExec { let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity); let column_statistics = collect_new_statistics( + schema, &input_stats.column_statistics, analysis_ctx.boundaries, ); @@ -769,6 +770,7 @@ impl EmbeddedProjection for FilterExec { /// is adjusted by using the next/previous value for its data type to convert /// it into a closed bound. fn collect_new_statistics( + schema: &SchemaRef, input_column_stats: &[ColumnStatistics], analysis_boundaries: Vec<ExprBoundaries>, ) -> Vec<ColumnStatistics> { @@ -785,12 +787,17 @@ fn collect_new_statistics( }, )| { let Some(interval) = interval else { - // If the interval is `None`, we can say that there are no rows: + // If the interval is `None`, we can say that there are no rows. + // Use a typed null to preserve the column's data type, so that + // downstream interval analysis can still intersect intervals + // of the same type. 
+ let typed_null = ScalarValue::try_from(schema.field(idx).data_type()) + .unwrap_or(ScalarValue::Null); return ColumnStatistics { null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(typed_null.clone()), + min_value: Precision::Exact(typed_null.clone()), + sum_value: Precision::Exact(typed_null), distinct_count: Precision::Exact(0), byte_size: input_column_stats[idx].byte_size, }; @@ -1483,17 +1490,17 @@ mod tests { statistics.column_statistics, vec![ ColumnStatistics { - min_value: Precision::Exact(ScalarValue::Null), - max_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + min_value: Precision::Exact(ScalarValue::Int32(None)), + max_value: Precision::Exact(ScalarValue::Int32(None)), + sum_value: Precision::Exact(ScalarValue::Int32(None)), distinct_count: Precision::Exact(0), null_count: Precision::Exact(0), byte_size: Precision::Absent, }, ColumnStatistics { - min_value: Precision::Exact(ScalarValue::Null), - max_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + min_value: Precision::Exact(ScalarValue::Int32(None)), + max_value: Precision::Exact(ScalarValue::Int32(None)), + sum_value: Precision::Exact(ScalarValue::Int32(None)), distinct_count: Precision::Exact(0), null_count: Precision::Exact(0), byte_size: Precision::Absent, @@ -1504,6 +1511,70 @@ mod tests { Ok(()) } + /// Regression test: stacking two FilterExecs where the inner filter + /// proves zero selectivity should not panic with a type mismatch + /// during interval intersection. + /// + /// Previously, when a filter proved no rows could match, the column + /// statistics used untyped `ScalarValue::Null` (data type `Null`). 
+ /// If an outer FilterExec then tried to analyze its own predicate + /// against those statistics, `Interval::intersect` would fail with: + /// "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32" + #[tokio::test] + async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> { + // Inner table: a: [1, 100], b: [1, 3] + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(4000), + column_statistics: vec![ + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(3))), + ..Default::default() + }, + ], + }, + schema, + )); + + // Inner filter: a > 200 (impossible given a max=100 → zero selectivity) + let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(200)))), + )); + let inner_filter: Arc<dyn ExecutionPlan> = + Arc::new(FilterExec::try_new(inner_predicate, input)?); + + // Outer filter: a = 50 + // Before the fix, this would panic because the inner filter's + // zero-selectivity statistics produced Null-typed intervals for + // column `a`, which couldn't intersect with the Int32 literal. 
+ let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(50)))), + )); + let outer_filter: Arc<dyn ExecutionPlan> = + Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?); + + // Should succeed without error + let statistics = outer_filter.partition_statistics(None)?; + assert_eq!(statistics.num_rows, Precision::Inexact(0)); + + Ok(()) + } + #[tokio::test] async fn test_filter_statistics_more_inputs() -> Result<()> { let schema = Schema::new(vec![