Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 90 additions & 14 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,10 +518,23 @@ fn rewrite_plan_in_place(
}
}

// Recurse into children using Arc::make_mut (zero-cost when refcount == 1)
changed |= map_children_mut(plan, |child| {
rewrite_plan_in_place(child, apply_order, rule, config)
let mut child_schema_changed = false;
let children_changed = map_children_mut(plan, |child| {
let old_schema = Arc::clone(child.schema());
let child_changed = rewrite_plan_in_place(child, apply_order, rule, config)?;
if child_changed && old_schema.as_ref() != child.schema().as_ref() {
child_schema_changed = true;
}
Ok(child_changed)
})?;
changed |= children_changed;

if child_schema_changed {
// Child rewrites can change their output schemas. Recompute the current
// node before later rules use positional requirements from that schema.
let owned = std::mem::take(plan);
*plan = owned.recompute_schema()?;
}

// f_up phase
if apply_order == ApplyOrder::BottomUp {
Expand Down Expand Up @@ -604,15 +617,14 @@ impl Optimizer {
while i < options.optimizer.max_passes {
log_plan(&format!("Optimizer input (pass {i})"), &new_plan);

// Check once per pass whether the plan contains subquery
// expressions. When there are no subqueries, we use the
// cheaper `rewrite` traversal instead of
// `rewrite_with_subqueries`, avoiding the per-node
// map_subqueries call that walks all expression trees
// via ownership-based transform_down.
let has_subqueries = plan_has_subqueries(&new_plan);

for rule in &self.rules {
// Re-check for each rule: early rules can remove subquery
// expressions, allowing later rules to use the cheaper in-place
// traversal in the same optimizer pass. This is also
// correctness-sensitive: the in-place path refreshes parent
// schemas after child schemas change.
let has_subqueries = plan_has_subqueries(&new_plan);

// If skipping failed rules, copy plan before attempting to rewrite
// as rewriting is destructive
let prev_plan = options
Expand Down Expand Up @@ -773,13 +785,15 @@ mod tests {

use datafusion_common::tree_node::Transformed;
use datafusion_common::{
DFSchema, DFSchemaRef, DataFusionError, Result, assert_contains, plan_err,
Column, DFSchema, DFSchemaRef, DataFusionError, Result, assert_contains, plan_err,
};
use datafusion_expr::logical_plan::EmptyRelation;
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, Projection, col, lit};
use datafusion_expr::{
Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Projection, col, lit,
};

use crate::optimizer::Optimizer;
use crate::test::test_table_scan;
use crate::test::{test_table_scan, test_table_scan_with_name};
use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

use super::ApplyOrder;
Expand Down Expand Up @@ -863,6 +877,34 @@ mod tests {
Ok(())
}

#[test]
fn in_place_rewrite_recomputes_parent_schema_when_child_schema_changes() -> Result<()>
{
let left = LogicalPlanBuilder::from(test_table_scan_with_name("left")?)
.project(vec![col("left.a"), col("left.b"), col("left.c")])?
.build()?;
let right = LogicalPlanBuilder::from(test_table_scan_with_name("right")?)
.project(vec![col("right.a"), col("right.b"), col("right.c")])?
.build()?;
let mut plan = LogicalPlanBuilder::from(left)
.join_on(right, JoinType::Inner, [col("left.a").eq(col("right.a"))])?
.build()?;

assert_eq!(plan.schema().fields().len(), 6);

let changed = super::rewrite_plan_in_place(
&mut plan,
ApplyOrder::TopDown,
&KeepOnlyAProjectionRule {},
&OptimizerContext::new(),
)?;

assert!(changed);
assert_eq!(plan.schema().fields().len(), 2);
assert!(plan.schema().has_column_with_unqualified_name("a"));
Ok(())
}

#[test]
fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
// Run a goofy optimizer, which rotates projection columns
Expand Down Expand Up @@ -980,6 +1022,40 @@ mod tests {
}
}

#[derive(Default, Debug)]
struct KeepOnlyAProjectionRule {}

impl OptimizerRule for KeepOnlyAProjectionRule {
fn name(&self) -> &str {
"keep_only_a_projection"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}

fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let projection = match plan {
LogicalPlan::Projection(p) => p,
_ => return Ok(Transformed::no(plan)),
};

let expr = Expr::from(Column::from(projection.schema.qualified_field(0)));

Ok(Transformed::yes(LogicalPlan::Projection(
Projection::try_new(vec![expr], Arc::clone(&projection.input))?,
)))
}
}

/// A goofy rule doing rotation of columns in all projections.
///
/// Useful to test cycle detection.
Expand Down
11 changes: 5 additions & 6 deletions datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,12 @@ fn intersect() -> Result<()> {
format!("{plan}"),
@r"
LeftSemi Join: left.col_int32 = test.col_int32, left.col_utf8 = test.col_utf8
Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]]
LeftSemi Join: left.col_int32 = right.col_int32, left.col_utf8 = right.col_utf8
Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]]
SubqueryAlias: left
TableScan: test projection=[col_int32, col_utf8]
SubqueryAlias: right
LeftSemi Join: left.col_int32 = right.col_int32, left.col_utf8 = right.col_utf8
Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]]
SubqueryAlias: left
TableScan: test projection=[col_int32, col_utf8]
SubqueryAlias: right
TableScan: test projection=[col_int32, col_utf8]
TableScan: test projection=[col_int32, col_utf8]
"
);
Expand Down
36 changes: 36 additions & 0 deletions datafusion/sqllogictest/test_files/projection_pushdown.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2068,6 +2068,42 @@ SELECT s, id FROM simple_struct WHERE s['value'] > 100 AND id < 4;
{value: 200, label: beta} 2
{value: 150, label: gamma} 3

#####################
# Section 9: Join key extraction with pruned outputs
#####################

statement ok
CREATE TABLE issue_22895_rt2 AS SELECT * FROM (VALUES
(named_struct('msg','user auth failed','sid','a'), 1, 'svc1'),
(named_struct('msg','login token','sid','b'), 2, 'svc2')
) v(attributes, id, name);

query IT
SELECT a.id, b.name
FROM issue_22895_rt2 a JOIN issue_22895_rt2 b
ON a.attributes['sid'] = b.attributes['sid']
WHERE a.attributes['msg'] LIKE '%auth%'
ORDER BY a.id, b.name;
----
1 svc1

statement ok
CREATE TABLE issue_22895_rt AS SELECT * FROM (VALUES
(named_struct('uid','u1','t','t1'), TIMESTAMP '2026-06-08T10:00:00', 'a'),
(named_struct('uid','u2','t','t2'), TIMESTAMP '2026-06-08T11:00:00', 'b')
) v(attributes, start_timestamp, span_name);

query P
SELECT r.start_timestamp
FROM issue_22895_rt r
JOIN (SELECT attributes['uid'] AS uid FROM issue_22895_rt) f
ON f.uid = r.attributes['uid']
WHERE r.attributes['t'] IN (SELECT attributes['t'] FROM issue_22895_rt)
ORDER BY r.start_timestamp;
----
2026-06-08T10:00:00
2026-06-08T11:00:00

# Config reset

# The SLT runner sets `target_partitions` to 4 instead of using the default, so
Expand Down
8 changes: 8 additions & 0 deletions datafusion/sqllogictest/test_files/union.slt
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,14 @@ physical_plan
05)--------FilterExec: id@0 = 1 OR id@0 = 2
06)----------DataSourceExec: partitions=1, partition_sizes=[1]

# Regression: schema recomputation must preserve the unqualified UNION
# output labels while unions_to_filter is enabled.
query IT rowsort
SELECT id, name FROM t1 WHERE id = 1 UNION SELECT id, name FROM t1 WHERE id = 2
----
1 Alex
2 Bob

statement ok
set datafusion.optimizer.enable_unions_to_filter = false;

Expand Down
Loading