Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 84 additions & 11 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use datafusion_common::{
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
use datafusion_physical_expr::{
Expand Down Expand Up @@ -359,18 +359,37 @@ impl FilterExec {
if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
&& binary.op() == &Operator::Eq
{
// Filter evaluates to single value for all partitions
if input_eqs.is_expr_constant(binary.left()).is_some() {
let across = input_eqs
.is_expr_constant(binary.right())
.unwrap_or_default();
// Check if either side is constant — either already known
// constant from the input equivalence properties, or a literal
// value (which is inherently constant across all partitions).
let left_const =
input_eqs.is_expr_constant(binary.left()).or_else(|| {
binary
.left()
.as_any()
.downcast_ref::<Literal>()
.map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
});
let right_const =
input_eqs.is_expr_constant(binary.right()).or_else(|| {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current logic repeats nearly identical `is_expr_constant(...).or_else(|| /* literal check */)` blocks for the left and right operands.

Could we extract an `expr_constant_or_literal(expr, input_eqs)` helper here? It would remove the duplication and centralize the literal/constant semantics used by filter equivalence inference.

binary
.right()
.as_any()
.downcast_ref::<Literal>()
.map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
});

if let Some(left_across) = left_const {
// LEFT is constant, so RIGHT must also be constant.
// Use RIGHT's known across value if available, otherwise
// propagate LEFT's (e.g. Uniform from a literal).
let across = right_const.unwrap_or(left_across);
res_constants
.push(ConstExpr::new(Arc::clone(binary.right()), across));
} else if input_eqs.is_expr_constant(binary.right()).is_some() {
let across = input_eqs
.is_expr_constant(binary.left())
.unwrap_or_default();
res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
} else if let Some(right_across) = right_const {
// RIGHT is constant, so LEFT must also be constant.
res_constants
.push(ConstExpr::new(Arc::clone(binary.left()), right_across));
}
}
}
Expand Down Expand Up @@ -979,6 +998,18 @@ fn collect_columns_from_predicate_inner(
let predicates = split_conjunction(predicate);
predicates.into_iter().for_each(|p| {
if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
// Only extract pairs where at least one side is a Column reference.
// Pairs like `complex_expr = literal` should not create equivalence
// classes — the literal could appear in many unrelated expressions
// (e.g. sort keys), and normalize_expr's deep traversal would
// replace those occurrences with the complex expression, corrupting
// sort orderings. Constant propagation for such pairs is handled
// separately by `extend_constants`.
let has_column = binary.left().as_any().downcast_ref::<Column>().is_some()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`has_column` currently means "one side is directly a `Column` expression," not "the expression contains a column somewhere."

Would `has_direct_column_operand` (or similar) be clearer here? It matches the intentional exclusion of `complex_expr = literal` pairs and avoids this being misread as recursive column detection.

|| binary.right().as_any().downcast_ref::<Column>().is_some();
if !has_column {
return;
}
match binary.op() {
Operator::Eq => {
eq_predicate_columns.push((binary.left(), binary.right()))
Expand Down Expand Up @@ -2066,4 +2097,46 @@ mod tests {

Ok(())
}

/// Regression test for https://github.com/apache/datafusion/issues/20194
///
/// `collect_columns_from_predicate_inner` should only extract equality
/// pairs where at least one side is a Column. Pairs like
/// `complex_expr = literal` must not create equivalence classes because
/// `normalize_expr`'s deep traversal would replace the literal inside
/// unrelated expressions (e.g. sort keys) with the complex expression.
#[tokio::test]
async fn test_collect_columns_skips_non_column_pairs() -> Result<()> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`test_collect_columns_skips_non_column_pairs` never awaits anything, so it does not need an async runtime.

Could this be a plain `#[test]` instead of `#[tokio::test]`? Keeping it synchronous makes the scope a bit clearer, since no async execution is involved.

let schema = test::aggr_test_schema();

// Simulate: nvl(c2, 0) = 0 → (c2 IS DISTINCT FROM 0) = 0
// Neither side is a Column, so this should NOT be extracted.
let complex_expr: Arc<dyn PhysicalExpr> = binary(
col("c2", &schema)?,
Operator::IsDistinctFrom,
lit(0u32),
&schema,
)?;
let predicate: Arc<dyn PhysicalExpr> =
binary(complex_expr, Operator::Eq, lit(0u32), &schema)?;

let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
assert_eq!(
0,
equal_pairs.len(),
"Should not extract equality pairs where neither side is a Column"
);

// But col = literal should still be extracted
let predicate: Arc<dyn PhysicalExpr> =
binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?;
let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
assert_eq!(
1,
equal_pairs.len(),
"Should extract equality pairs where one side is a Column"
);

Ok(())
}
}
46 changes: 46 additions & 0 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6081,3 +6081,49 @@ WHERE acctbal > (
);
----
1

# Regression test for https://github.com/apache/datafusion/issues/20194
# Window function with CASE WHEN in ORDER BY combined with NVL filter
# should not trigger SanityCheckPlan error from equivalence normalization
# replacing literals in sort expressions with complex filter expressions.
statement ok
CREATE TABLE issue_20194_t1 (
value_1_1 decimal(25) NULL,
value_1_2 int NULL,
value_1_3 bigint NULL
);

statement ok
CREATE TABLE issue_20194_t2 (
value_2_1 bigint NULL,
value_2_2 varchar(140) NULL,
value_2_3 varchar(140) NULL
);

statement ok
INSERT INTO issue_20194_t1 (value_1_1, value_1_2, value_1_3) VALUES (6774502793, 10040029, 1120);

statement ok
INSERT INTO issue_20194_t2 (value_2_1, value_2_2, value_2_3) VALUES (1120, '0', '0');

query RII
SELECT
t1.value_1_1, t1.value_1_2,
ROW_NUMBER() OVER (
PARTITION BY t1.value_1_1, t1.value_1_2
ORDER BY
CASE WHEN t2.value_2_2 = '0' THEN 1 ELSE 0 END ASC,
CASE WHEN t2.value_2_3 = '0' THEN 1 ELSE 0 END ASC
) AS ord
FROM issue_20194_t1 t1
INNER JOIN issue_20194_t2 t2
ON t1.value_1_3 = t2.value_2_1
AND nvl(t2.value_2_3, '0') = '0';
----
6774502793 10040029 1

statement ok
DROP TABLE issue_20194_t1;

statement ok
DROP TABLE issue_20194_t2;
Loading