Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions datafusion/core/tests/physical_optimizer/partition_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,17 +386,17 @@ mod test {
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Null),
min_value: Precision::Exact(ScalarValue::Null),
sum_value: Precision::Exact(ScalarValue::Null),
max_value: Precision::Exact(ScalarValue::Int32(None)),
min_value: Precision::Exact(ScalarValue::Int32(None)),
sum_value: Precision::Exact(ScalarValue::Int32(None)),
distinct_count: Precision::Exact(0),
byte_size: Precision::Exact(16),
},
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Null),
min_value: Precision::Exact(ScalarValue::Null),
sum_value: Precision::Exact(ScalarValue::Null),
max_value: Precision::Exact(ScalarValue::Date32(None)),
min_value: Precision::Exact(ScalarValue::Date32(None)),
sum_value: Precision::Exact(ScalarValue::Date32(None)),
distinct_count: Precision::Exact(0),
byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32)
},
Expand All @@ -415,17 +415,17 @@ mod test {
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Null),
min_value: Precision::Exact(ScalarValue::Null),
sum_value: Precision::Exact(ScalarValue::Null),
max_value: Precision::Exact(ScalarValue::Int32(None)),
min_value: Precision::Exact(ScalarValue::Int32(None)),
sum_value: Precision::Exact(ScalarValue::Int32(None)),
distinct_count: Precision::Exact(0),
byte_size: Precision::Exact(8),
},
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Null),
min_value: Precision::Exact(ScalarValue::Null),
sum_value: Precision::Exact(ScalarValue::Null),
max_value: Precision::Exact(ScalarValue::Date32(None)),
min_value: Precision::Exact(ScalarValue::Date32(None)),
sum_value: Precision::Exact(ScalarValue::Date32(None)),
distinct_count: Precision::Exact(0),
byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32)
},
Expand Down
91 changes: 81 additions & 10 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ impl FilterExec {
let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);

let column_statistics = collect_new_statistics(
schema,
&input_stats.column_statistics,
analysis_ctx.boundaries,
);
Expand Down Expand Up @@ -765,6 +766,7 @@ impl EmbeddedProjection for FilterExec {
/// is adjusted by using the next/previous value for its data type to convert
/// it into a closed bound.
fn collect_new_statistics(
schema: &SchemaRef,
input_column_stats: &[ColumnStatistics],
analysis_boundaries: Vec<ExprBoundaries>,
) -> Vec<ColumnStatistics> {
Expand All @@ -781,12 +783,17 @@ fn collect_new_statistics(
},
)| {
let Some(interval) = interval else {
// If the interval is `None`, we can say that there are no rows:
// If the interval is `None`, we can say that there are no rows.
// Use a typed null to preserve the column's data type, so that
// downstream interval analysis can still intersect intervals
// of the same type.
let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
.unwrap_or(ScalarValue::Null);
return ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Null),
min_value: Precision::Exact(ScalarValue::Null),
sum_value: Precision::Exact(ScalarValue::Null),
max_value: Precision::Exact(typed_null.clone()),
min_value: Precision::Exact(typed_null.clone()),
sum_value: Precision::Exact(typed_null),
distinct_count: Precision::Exact(0),
byte_size: input_column_stats[idx].byte_size,
};
Expand Down Expand Up @@ -1479,17 +1486,17 @@ mod tests {
statistics.column_statistics,
vec![
ColumnStatistics {
min_value: Precision::Exact(ScalarValue::Null),
max_value: Precision::Exact(ScalarValue::Null),
sum_value: Precision::Exact(ScalarValue::Null),
min_value: Precision::Exact(ScalarValue::Int32(None)),
max_value: Precision::Exact(ScalarValue::Int32(None)),
sum_value: Precision::Exact(ScalarValue::Int32(None)),
distinct_count: Precision::Exact(0),
null_count: Precision::Exact(0),
byte_size: Precision::Absent,
},
ColumnStatistics {
min_value: Precision::Exact(ScalarValue::Null),
max_value: Precision::Exact(ScalarValue::Null),
sum_value: Precision::Exact(ScalarValue::Null),
min_value: Precision::Exact(ScalarValue::Int32(None)),
max_value: Precision::Exact(ScalarValue::Int32(None)),
sum_value: Precision::Exact(ScalarValue::Int32(None)),
distinct_count: Precision::Exact(0),
null_count: Precision::Exact(0),
byte_size: Precision::Absent,
Expand All @@ -1500,6 +1507,70 @@ mod tests {
Ok(())
}

/// Regression test: stacking two FilterExecs where the inner filter
/// proves zero selectivity should not panic with a type mismatch
/// during interval intersection.
///
/// Previously, when a filter proved no rows could match, the column
/// statistics used untyped `ScalarValue::Null` (data type `Null`).
/// If an outer FilterExec then tried to analyze its own predicate
/// against those statistics, `Interval::intersect` would fail with:
/// "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32"
#[tokio::test]
async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
    // Inner table: a: [1, 100], b: [1, 3]
    // Bounds are Inexact on purpose: the selectivity analysis only needs
    // the interval [1, 100] for `a` to prove the inner predicate matches
    // no rows.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(1000),
            total_byte_size: Precision::Inexact(4000),
            column_statistics: vec![
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
                    ..Default::default()
                },
            ],
        },
        schema,
    ));

    // Inner filter: a > 200 (impossible given a max=100 → zero selectivity)
    let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
    ));
    let inner_filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(inner_predicate, input)?);

    // Outer filter: a = 50
    // Before the fix, this would panic because the inner filter's
    // zero-selectivity statistics produced Null-typed intervals for
    // column `a`, which couldn't intersect with the Int32 literal.
    let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
    ));
    let outer_filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);

    // Should succeed without error; the zero-row estimate from the inner
    // filter must propagate through the outer filter's statistics.
    let statistics = outer_filter.partition_statistics(None)?;
    assert_eq!(statistics.num_rows, Precision::Inexact(0));

    Ok(())
}

#[tokio::test]
async fn test_filter_statistics_more_inputs() -> Result<()> {
let schema = Schema::new(vec![
Expand Down
Loading