diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index a765d7f27a51e..2cec2a0433d54 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -518,10 +518,23 @@ fn rewrite_plan_in_place( } } - // Recurse into children using Arc::make_mut (zero-cost when refcount == 1) - changed |= map_children_mut(plan, |child| { - rewrite_plan_in_place(child, apply_order, rule, config) + let mut child_schema_changed = false; + let children_changed = map_children_mut(plan, |child| { + let old_schema = Arc::clone(child.schema()); + let child_changed = rewrite_plan_in_place(child, apply_order, rule, config)?; + if child_changed && old_schema.as_ref() != child.schema().as_ref() { + child_schema_changed = true; + } + Ok(child_changed) })?; + changed |= children_changed; + + if child_schema_changed { + // Child rewrites can change their output schemas. Recompute the current + // node before later rules use positional requirements from that schema. + let owned = std::mem::take(plan); + *plan = owned.recompute_schema()?; + } // f_up phase if apply_order == ApplyOrder::BottomUp { @@ -604,15 +617,14 @@ impl Optimizer { while i < options.optimizer.max_passes { log_plan(&format!("Optimizer input (pass {i})"), &new_plan); - // Check once per pass whether the plan contains subquery - // expressions. When there are no subqueries, we use the - // cheaper `rewrite` traversal instead of - // `rewrite_with_subqueries`, avoiding the per-node - // map_subqueries call that walks all expression trees - // via ownership-based transform_down. - let has_subqueries = plan_has_subqueries(&new_plan); - for rule in &self.rules { + // Re-check for each rule: early rules can remove subquery + // expressions, allowing later rules to use the cheaper in-place + // traversal in the same optimizer pass. This is also + // correctness-sensitive: the in-place path refreshes parent + // schemas after child schemas change. 
+ let has_subqueries = plan_has_subqueries(&new_plan); + // If skipping failed rules, copy plan before attempting to rewrite // as rewriting is destructive let prev_plan = options @@ -773,13 +785,15 @@ mod tests { use datafusion_common::tree_node::Transformed; use datafusion_common::{ - DFSchema, DFSchemaRef, DataFusionError, Result, assert_contains, plan_err, + Column, DFSchema, DFSchemaRef, DataFusionError, Result, assert_contains, plan_err, }; use datafusion_expr::logical_plan::EmptyRelation; - use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, Projection, col, lit}; + use datafusion_expr::{ + Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Projection, col, lit, + }; use crate::optimizer::Optimizer; - use crate::test::test_table_scan; + use crate::test::{test_table_scan, test_table_scan_with_name}; use crate::{OptimizerConfig, OptimizerContext, OptimizerRule}; use super::ApplyOrder; @@ -863,6 +877,34 @@ mod tests { Ok(()) } + #[test] + fn in_place_rewrite_recomputes_parent_schema_when_child_schema_changes() -> Result<()> + { + let left = LogicalPlanBuilder::from(test_table_scan_with_name("left")?) + .project(vec![col("left.a"), col("left.b"), col("left.c")])? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("right")?) + .project(vec![col("right.a"), col("right.b"), col("right.c")])? + .build()?; + let mut plan = LogicalPlanBuilder::from(left) + .join_on(right, JoinType::Inner, [col("left.a").eq(col("right.a"))])? 
+ .build()?; + + assert_eq!(plan.schema().fields().len(), 6); + + let changed = super::rewrite_plan_in_place( + &mut plan, + ApplyOrder::TopDown, + &KeepOnlyAProjectionRule {}, + &OptimizerContext::new(), + )?; + + assert!(changed); + assert_eq!(plan.schema().fields().len(), 2); + assert!(plan.schema().has_column_with_unqualified_name("a")); + Ok(()) + } + #[test] fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> { // Run a goofy optimizer, which rotates projection columns @@ -980,6 +1022,40 @@ mod tests { } } + #[derive(Default, Debug)] + struct KeepOnlyAProjectionRule {} + + impl OptimizerRule for KeepOnlyAProjectionRule { + fn name(&self) -> &str { + "keep_only_a_projection" + } + + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::TopDown) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>> { + let projection = match plan { + LogicalPlan::Projection(p) => p, + _ => return Ok(Transformed::no(plan)), + }; + + let expr = Expr::from(Column::from(projection.schema.qualified_field(0))); + + Ok(Transformed::yes(LogicalPlan::Projection( + Projection::try_new(vec![expr], Arc::clone(&projection.input))?, + ))) + } + } + /// A goofy rule doing rotation of columns in all projections. /// /// Useful to test cycle detection. 
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index 6fad39dc33d9f..7a3065f68a527 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -275,13 +275,12 @@ fn intersect() -> Result<()> { format!("{plan}"), @r" LeftSemi Join: left.col_int32 = test.col_int32, left.col_utf8 = test.col_utf8 - Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]] - LeftSemi Join: left.col_int32 = right.col_int32, left.col_utf8 = right.col_utf8 - Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]] - SubqueryAlias: left - TableScan: test projection=[col_int32, col_utf8] - SubqueryAlias: right + LeftSemi Join: left.col_int32 = right.col_int32, left.col_utf8 = right.col_utf8 + Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]] + SubqueryAlias: left TableScan: test projection=[col_int32, col_utf8] + SubqueryAlias: right + TableScan: test projection=[col_int32, col_utf8] TableScan: test projection=[col_int32, col_utf8] " ); diff --git a/datafusion/sqllogictest/test_files/projection_pushdown.slt b/datafusion/sqllogictest/test_files/projection_pushdown.slt index 344aef1f92cf9..2171451e997c9 100644 --- a/datafusion/sqllogictest/test_files/projection_pushdown.slt +++ b/datafusion/sqllogictest/test_files/projection_pushdown.slt @@ -2068,6 +2068,42 @@ SELECT s, id FROM simple_struct WHERE s['value'] > 100 AND id < 4; {value: 200, label: beta} 2 {value: 150, label: gamma} 3 +##################### +# Section 9: Join key extraction with pruned outputs +##################### + +statement ok +CREATE TABLE issue_22895_rt2 AS SELECT * FROM (VALUES + (named_struct('msg','user auth failed','sid','a'), 1, 'svc1'), + (named_struct('msg','login token','sid','b'), 2, 'svc2') +) v(attributes, id, name); + +query IT +SELECT a.id, b.name +FROM issue_22895_rt2 a JOIN issue_22895_rt2 b + ON a.attributes['sid'] = b.attributes['sid'] +WHERE 
a.attributes['msg'] LIKE '%auth%' +ORDER BY a.id, b.name; +---- +1 svc1 + +statement ok +CREATE TABLE issue_22895_rt AS SELECT * FROM (VALUES + (named_struct('uid','u1','t','t1'), TIMESTAMP '2026-06-08T10:00:00', 'a'), + (named_struct('uid','u2','t','t2'), TIMESTAMP '2026-06-08T11:00:00', 'b') +) v(attributes, start_timestamp, span_name); + +query P +SELECT r.start_timestamp +FROM issue_22895_rt r +JOIN (SELECT attributes['uid'] AS uid FROM issue_22895_rt) f + ON f.uid = r.attributes['uid'] +WHERE r.attributes['t'] IN (SELECT attributes['t'] FROM issue_22895_rt) +ORDER BY r.start_timestamp; +---- +2026-06-08T10:00:00 +2026-06-08T11:00:00 + # Config reset # The SLT runner sets `target_partitions` to 4 instead of using the default, so diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index a48ede604968b..40e4fb5c50ecd 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -341,6 +341,14 @@ physical_plan 05)--------FilterExec: id@0 = 1 OR id@0 = 2 06)----------DataSourceExec: partitions=1, partition_sizes=[1] +# Regression: schema recomputation must preserve the unqualified UNION +# output labels while unions_to_filter is enabled. +query IT rowsort +SELECT id, name FROM t1 WHERE id = 1 UNION SELECT id, name FROM t1 WHERE id = 2 +---- +1 Alex +2 Bob + statement ok set datafusion.optimizer.enable_unions_to_filter = false;