From ce9919cb4bac010b31a20dc13b122efaa067255f Mon Sep 17 00:00:00 2001 From: Eeshan Bembi Date: Mon, 9 Feb 2026 16:12:54 +0530 Subject: [PATCH 1/2] fix: SanityCheckPlan error with window functions and NVL filter (#20194) `collect_columns_from_predicate_inner` was extracting equality pairs where neither side was a Column (e.g. `nvl(col, '0') = '0'`), creating equivalence classes between complex expressions and literals. `normalize_expr`'s deep traversal would then replace the literal inside unrelated sort/window expressions with the complex expression, corrupting the sort ordering and triggering SanityCheckPlan failures. Fix by restricting `collect_columns_from_predicate_inner` to only extract pairs where at least one side is a Column reference, matching the function's documented intent. Also update `extend_constants` to recognize Literal expressions as inherently constant, so constant propagation still works for `complex_expr = literal` predicates. Closes #20194 --- datafusion/physical-plan/src/filter.rs | 95 ++++++++++++++++--- datafusion/sqllogictest/test_files/window.slt | 46 +++++++++ 2 files changed, 130 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index ecea4e6ebe9f7..703c59eda074a 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -57,7 +57,7 @@ use datafusion_common::{ use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::ProjectionMapping; -use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit}; +use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit}; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; use datafusion_physical_expr::{ @@ -359,18 +359,37 @@ impl FilterExec { if let Some(binary) = conjunction.as_any().downcast_ref::() && binary.op() == &Operator::Eq { - // Filter evaluates to single value for all partitions - if input_eqs.is_expr_constant(binary.left()).is_some() { - let across = input_eqs - .is_expr_constant(binary.right()) - .unwrap_or_default(); + // Check if either side is constant — either already known + // constant from the input equivalence properties, or a literal + // value (which is inherently constant across all partitions). + let left_const = + input_eqs.is_expr_constant(binary.left()).or_else(|| { + binary + .left() + .as_any() + .downcast_ref::() + .map(|l| AcrossPartitions::Uniform(Some(l.value().clone()))) + }); + let right_const = + input_eqs.is_expr_constant(binary.right()).or_else(|| { + binary + .right() + .as_any() + .downcast_ref::() + .map(|l| AcrossPartitions::Uniform(Some(l.value().clone()))) + }); + + if let Some(left_across) = left_const { + // LEFT is constant, so RIGHT must also be constant. + // Use RIGHT's known across value if available, otherwise + // propagate LEFT's (e.g. Uniform from a literal). + let across = right_const.unwrap_or(left_across); res_constants .push(ConstExpr::new(Arc::clone(binary.right()), across)); - } else if input_eqs.is_expr_constant(binary.right()).is_some() { - let across = input_eqs - .is_expr_constant(binary.left()) - .unwrap_or_default(); - res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across)); + } else if let Some(right_across) = right_const { + // RIGHT is constant, so LEFT must also be constant. + res_constants + .push(ConstExpr::new(Arc::clone(binary.left()), right_across)); } } } @@ -979,6 +998,18 @@ fn collect_columns_from_predicate_inner( let predicates = split_conjunction(predicate); predicates.into_iter().for_each(|p| { if let Some(binary) = p.as_any().downcast_ref::() { + // Only extract pairs where at least one side is a Column reference. + // Pairs like `complex_expr = literal` should not create equivalence + // classes — the literal could appear in many unrelated expressions + // (e.g. sort keys), and normalize_expr's deep traversal would + // replace those occurrences with the complex expression, corrupting + // sort orderings. Constant propagation for such pairs is handled + // separately by `extend_constants`. + let has_column = binary.left().as_any().downcast_ref::().is_some() + || binary.right().as_any().downcast_ref::().is_some(); + if !has_column { + return; + } match binary.op() { Operator::Eq => { eq_predicate_columns.push((binary.left(), binary.right())) @@ -2066,4 +2097,46 @@ mod tests { Ok(()) } + + /// Regression test for https://github.com/apache/datafusion/issues/20194 + /// + /// `collect_columns_from_predicate_inner` should only extract equality + /// pairs where at least one side is a Column. Pairs like + /// `complex_expr = literal` must not create equivalence classes because + /// `normalize_expr`'s deep traversal would replace the literal inside + /// unrelated expressions (e.g. sort keys) with the complex expression. + #[tokio::test] + async fn test_collect_columns_skips_non_column_pairs() -> Result<()> { + let schema = test::aggr_test_schema(); + + // Simulate: nvl(c2, 0) = 0 → (c2 IS DISTINCT FROM 0) = 0 + // Neither side is a Column, so this should NOT be extracted. + let complex_expr: Arc = binary( + col("c2", &schema)?, + Operator::IsDistinctFrom, + lit(0u32), + &schema, + )?; + let predicate: Arc = + binary(complex_expr, Operator::Eq, lit(0u32), &schema)?; + + let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate); + assert_eq!( + 0, + equal_pairs.len(), + "Should not extract equality pairs where neither side is a Column" + ); + + // But col = literal should still be extracted + let predicate: Arc = + binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?; + let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate); + assert_eq!( + 1, + equal_pairs.len(), + "Should extract equality pairs where one side is a Column" + ); + + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index c3e6f39adbd68..9fc053d38cfeb 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -6081,3 +6081,49 @@ WHERE acctbal > ( ); ---- 1 + +# Regression test for https://github.com/apache/datafusion/issues/20194 +# Window function with CASE WHEN in ORDER BY combined with NVL filter +# should not trigger SanityCheckPlan error from equivalence normalization +# replacing literals in sort expressions with complex filter expressions. +statement ok +CREATE TABLE issue_20194_t1 ( + value_1_1 decimal(25) NULL, + value_1_2 int NULL, + value_1_3 bigint NULL +); + +statement ok +CREATE TABLE issue_20194_t2 ( + value_2_1 bigint NULL, + value_2_2 varchar(140) NULL, + value_2_3 varchar(140) NULL +); + +statement ok +INSERT INTO issue_20194_t1 (value_1_1, value_1_2, value_1_3) VALUES (6774502793, 10040029, 1120); + +statement ok +INSERT INTO issue_20194_t2 (value_2_1, value_2_2, value_2_3) VALUES (1120, '0', '0'); + +query RII +SELECT + t1.value_1_1, t1.value_1_2, + ROW_NUMBER() OVER ( + PARTITION BY t1.value_1_1, t1.value_1_2 + ORDER BY + CASE WHEN t2.value_2_2 = '0' THEN 1 ELSE 0 END ASC, + CASE WHEN t2.value_2_3 = '0' THEN 1 ELSE 0 END ASC + ) AS ord +FROM issue_20194_t1 t1 +INNER JOIN issue_20194_t2 t2 + ON t1.value_1_3 = t2.value_2_1 + AND nvl(t2.value_2_3, '0') = '0'; +---- +6774502793 10040029 1 + +statement ok +DROP TABLE issue_20194_t1; + +statement ok +DROP TABLE issue_20194_t2; From d86642cc7a7d2e710b2f1df0bdda20545a09132a Mon Sep 17 00:00:00 2001 From: Eeshan Bembi Date: Fri, 6 Mar 2026 18:06:30 +0530 Subject: [PATCH 2/2] address review comments: rename has_column, extract helper, sync test --- datafusion/physical-plan/src/filter.rs | 46 ++++++++++++++------------ 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 703c59eda074a..b7d2820137c8e 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -61,8 +61,8 @@ use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit}; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; use datafusion_physical_expr::{ - AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze, - conjunction, split_conjunction, + AcrossPartitions, AnalysisContext, ConstExpr, EquivalenceProperties, ExprBoundaries, + PhysicalExpr, analyze, conjunction, split_conjunction, }; use datafusion_physical_expr_common::physical_expr::fmt_sql; @@ -347,6 +347,20 @@ impl FilterExec { }) } + /// Returns the `AcrossPartitions` value for `expr` if it is constant: + /// either already known constant in `input_eqs`, or a `Literal` + /// (which is inherently constant across all partitions). + fn expr_constant_or_literal( + expr: &Arc, + input_eqs: &EquivalenceProperties, + ) -> Option { + input_eqs.is_expr_constant(expr).or_else(|| { + expr.as_any() + .downcast_ref::() + .map(|l| AcrossPartitions::Uniform(Some(l.value().clone()))) + }) + } + fn extend_constants( input: &Arc, predicate: &Arc, @@ -362,22 +376,9 @@ impl FilterExec { // Check if either side is constant — either already known // constant from the input equivalence properties, or a literal // value (which is inherently constant across all partitions). - let left_const = - input_eqs.is_expr_constant(binary.left()).or_else(|| { - binary - .left() - .as_any() - .downcast_ref::() - .map(|l| AcrossPartitions::Uniform(Some(l.value().clone()))) - }); + let left_const = Self::expr_constant_or_literal(binary.left(), input_eqs); let right_const = - input_eqs.is_expr_constant(binary.right()).or_else(|| { - binary - .right() - .as_any() - .downcast_ref::() - .map(|l| AcrossPartitions::Uniform(Some(l.value().clone()))) - }); + Self::expr_constant_or_literal(binary.right(), input_eqs); if let Some(left_across) = left_const { // LEFT is constant, so RIGHT must also be constant. @@ -1005,9 +1006,10 @@ fn collect_columns_from_predicate_inner( // replace those occurrences with the complex expression, corrupting // sort orderings. Constant propagation for such pairs is handled // separately by `extend_constants`. - let has_column = binary.left().as_any().downcast_ref::().is_some() - || binary.right().as_any().downcast_ref::().is_some(); - if !has_column { + let has_direct_column_operand = + binary.left().as_any().downcast_ref::().is_some() + || binary.right().as_any().downcast_ref::().is_some(); + if !has_direct_column_operand { return; } match binary.op() { @@ -2105,8 +2107,8 @@ mod tests { /// `complex_expr = literal` must not create equivalence classes because /// `normalize_expr`'s deep traversal would replace the literal inside /// unrelated expressions (e.g. sort keys) with the complex expression. - #[tokio::test] - async fn test_collect_columns_skips_non_column_pairs() -> Result<()> { + #[test] + fn test_collect_columns_skips_non_column_pairs() -> Result<()> { let schema = test::aggr_test_schema(); // Simulate: nvl(c2, 0) = 0 → (c2 IS DISTINCT FROM 0) = 0