From 053b7699c04dd0bd41a3c7966e743393143eec2b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 3 Mar 2026 19:06:41 +0800 Subject: [PATCH 01/17] feat: enhance projection optimization with volatile expression handling - Added handling for volatile expressions to impact the optimization process within the `optimize_projections` function. - Introduced checks for volatile expressions in both plan and ancestor nodes to adjust required indices accordingly. - Updated `RequiredIndices` struct to track whether it encounters volatile expressions and to handle multiplicity sensitivity. - Implemented new utility functions to streamline the processing of child requirements and eliminate unnecessary unnesting when certain conditions are met. - Added unit tests to validate the new functionality related to unnesting and aggregation on volatile expression scenarios. --- .../optimizer/src/optimize_projections/mod.rs | 232 ++++++++++++++++-- .../optimize_projections/required_indices.rs | 53 +++- 2 files changed, 264 insertions(+), 21 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 93df300bb50b4..de1d0fd547a83 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -132,6 +132,8 @@ fn optimize_projections( config: &dyn OptimizerConfig, indices: RequiredIndices, ) -> Result> { + let volatile_in_plan = plan.expressions().iter().any(Expr::is_volatile); + // Recursively rewrite any nodes that may be able to avoid computation given // their parents' required indices. 
match plan { @@ -141,6 +143,7 @@ fn optimize_projections( }); } LogicalPlan::Aggregate(aggregate) => { + let has_volatile_ancestor = indices.has_volatile_ancestor(); // Split parent requirements to GROUP BY and aggregate sections: let n_group_exprs = aggregate.group_expr_len()?; // Offset aggregate indices so that they point to valid indices at @@ -188,6 +191,14 @@ fn optimize_projections( let necessary_indices = RequiredIndices::new().with_exprs(schema, all_exprs_iter); let necessary_exprs = necessary_indices.get_required_exprs(schema); + let mut necessary_indices = if new_aggr_expr.is_empty() { + necessary_indices.with_multiplicity_insensitive() + } else { + necessary_indices.with_multiplicity_sensitive() + }; + if has_volatile_ancestor || volatile_in_plan { + necessary_indices = necessary_indices.with_volatile_ancestor(); + } return optimize_projections( Arc::unwrap_or_clone(aggregate.input), @@ -213,6 +224,7 @@ fn optimize_projections( }); } LogicalPlan::Window(window) => { + let has_volatile_ancestor = indices.has_volatile_ancestor(); let input_schema = Arc::clone(window.input.schema()); // Split parent requirements to child and window expression sections: let n_input_fields = input_schema.fields().len(); @@ -227,6 +239,14 @@ fn optimize_projections( // Get all the required column indices at the input, either by the // parent or window expression requirements. 
let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr); + let mut required_indices = if new_window_expr.is_empty() { + required_indices.with_multiplicity_insensitive() + } else { + required_indices.with_multiplicity_sensitive() + }; + if has_volatile_ancestor || volatile_in_plan { + required_indices = required_indices.with_volatile_ancestor(); + } return optimize_projections( Arc::unwrap_or_clone(window.input), @@ -293,10 +313,14 @@ fn optimize_projections( plan.inputs() .into_iter() .map(|input| { - indices + let mut required = indices .clone() .with_projection_beneficial() - .with_plan_exprs(&plan, input.schema()) + .with_plan_exprs(&plan, input.schema())?; + if volatile_in_plan { + required = required.with_volatile_ancestor(); + } + Ok(required) }) .collect::>()? } @@ -307,7 +331,14 @@ fn optimize_projections( // flag is `false`. plan.inputs() .into_iter() - .map(|input| indices.clone().with_plan_exprs(&plan, input.schema())) + .map(|input| { + let mut required = + indices.clone().with_plan_exprs(&plan, input.schema())?; + if volatile_in_plan { + required = required.with_volatile_ancestor(); + } + Ok(required) + }) .collect::>()? } LogicalPlan::Copy(_) @@ -316,8 +347,7 @@ fn optimize_projections( | LogicalPlan::Explain(_) | LogicalPlan::Analyze(_) | LogicalPlan::Subquery(_) - | LogicalPlan::Statement(_) - | LogicalPlan::Distinct(Distinct::All(_)) => { + | LogicalPlan::Statement(_) => { // These plans require all their fields, and their children should // be treated as final plans -- otherwise, we may have schema a // mismatch. @@ -325,8 +355,14 @@ fn optimize_projections( // EXISTS expression), we may not need to require all indices. plan.inputs() .into_iter() - .map(RequiredIndices::new_for_all_exprs) - .collect() + .map(|input| { + let mut required = RequiredIndices::new_for_all_exprs(input); + if volatile_in_plan { + required = required.with_volatile_ancestor(); + } + Ok(required) + }) + .collect::>()? 
} LogicalPlan::Extension(extension) => { let Some(necessary_children_indices) = @@ -348,8 +384,13 @@ fn optimize_projections( .into_iter() .zip(necessary_children_indices) .map(|(child, necessary_indices)| { - RequiredIndices::new_from_indices(necessary_indices) - .with_plan_exprs(&plan, child.schema()) + let mut required = + RequiredIndices::new_from_indices(necessary_indices) + .with_plan_exprs(&plan, child.schema())?; + if volatile_in_plan { + required = required.with_volatile_ancestor(); + } + Ok(required) }) .collect::>>()? } @@ -376,10 +417,14 @@ fn optimize_projections( plan.inputs() .into_iter() .map(|input| { - indices + let mut required = indices .clone() .with_projection_beneficial() - .with_plan_exprs(&plan, input.schema()) + .with_plan_exprs(&plan, input.schema())?; + if volatile_in_plan { + required = required.with_volatile_ancestor(); + } + Ok(required) }) .collect::>>()? } @@ -391,6 +436,12 @@ fn optimize_projections( left_req_indices.with_plan_exprs(&plan, join.left.schema())?; let right_indices = right_req_indices.with_plan_exprs(&plan, join.right.schema())?; + let mut left_indices = left_indices.with_multiplicity_sensitive(); + let mut right_indices = right_indices.with_multiplicity_sensitive(); + if volatile_in_plan { + left_indices = left_indices.with_volatile_ancestor(); + right_indices = right_indices.with_volatile_ancestor(); + } // Joins benefit from "small" input tables (lower memory usage). 
// Therefore, each child benefits from projection: vec![ @@ -398,6 +449,18 @@ fn optimize_projections( right_indices.with_projection_beneficial(), ] } + LogicalPlan::Distinct(Distinct::All(_)) => plan + .inputs() + .into_iter() + .map(|input| { + let mut required = RequiredIndices::new_for_all_exprs(input) + .with_multiplicity_insensitive(); + if volatile_in_plan { + required = required.with_volatile_ancestor(); + } + Ok(required) + }) + .collect::>()?, // these nodes are explicitly rewritten in the match statement above LogicalPlan::Projection(_) | LogicalPlan::Aggregate(_) @@ -407,19 +470,29 @@ fn optimize_projections( "OptimizeProjection: should have handled in the match statement above" ); } - LogicalPlan::Unnest(Unnest { - input, - dependency_indices, - .. - }) => { + LogicalPlan::Unnest(unnest) => { + if can_eliminate_unnest(unnest, &indices) { + let child_required_indices = + build_unnest_child_requirements(unnest, &indices); + let transformed_input = optimize_projections( + Arc::unwrap_or_clone(Arc::clone(&unnest.input)), + config, + child_required_indices, + )?; + return Ok(Transformed::yes(transformed_input.data)); + } // at least provide the indices for the exec-columns as a starting point - let required_indices = - RequiredIndices::new().with_plan_exprs(&plan, input.schema())?; + let mut required_indices = + RequiredIndices::new().with_plan_exprs(&plan, unnest.input.schema())?; + required_indices = required_indices.with_multiplicity_sensitive(); + if volatile_in_plan || indices.has_volatile_ancestor() { + required_indices = required_indices.with_volatile_ancestor(); + } // Add additional required indices from the parent let mut additional_necessary_child_indices = Vec::new(); indices.indices().iter().for_each(|idx| { - if let Some(index) = dependency_indices.get(*idx) { + if let Some(index) = unnest.dependency_indices.get(*idx) { additional_necessary_child_indices.push(*index); } }); @@ -909,6 +982,56 @@ fn plan_contains_other_subqueries(plan: 
&LogicalPlan, cte_name: &str) -> bool { .any(|child| plan_contains_other_subqueries(child, cte_name)) } +fn can_eliminate_unnest(unnest: &Unnest, indices: &RequiredIndices) -> bool { + if indices.multiplicity_sensitive() || indices.has_volatile_ancestor() { + return false; + } + + if !unnest.options.preserve_nulls || !unnest.struct_type_columns.is_empty() { + return false; + } + + indices + .indices() + .iter() + .all(|&output_idx| unnest_output_is_passthrough(unnest, output_idx)) +} + +fn unnest_output_is_passthrough(unnest: &Unnest, output_idx: usize) -> bool { + let Some(&dependency_idx) = unnest.dependency_indices.get(output_idx) else { + return false; + }; + + if dependency_idx >= unnest.input.schema().fields().len() { + return false; + } + + unnest.schema.qualified_field(output_idx) + == unnest.input.schema().qualified_field(dependency_idx) +} + +fn build_unnest_child_requirements( + unnest: &Unnest, + indices: &RequiredIndices, +) -> RequiredIndices { + let child_indices = indices + .indices() + .iter() + .filter_map(|&output_idx| unnest.dependency_indices.get(output_idx).copied()) + .collect::>(); + let mut child_required_indices = RequiredIndices::new_from_indices(child_indices); + if indices.projection_beneficial() { + child_required_indices = child_required_indices.with_projection_beneficial(); + } + if indices.has_volatile_ancestor() { + child_required_indices = child_required_indices.with_volatile_ancestor(); + } + if !indices.multiplicity_sensitive() { + child_required_indices = child_required_indices.with_multiplicity_insensitive(); + } + child_required_indices +} + fn expr_contains_subquery(expr: &Expr) -> bool { expr.exists(|e| match e { Expr::ScalarSubquery(_) | Expr::Exists(_) | Expr::InSubquery(_) => Ok(true), @@ -953,7 +1076,7 @@ mod tests { use crate::{OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{ - Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference, + Column, DFSchema, 
DFSchemaRef, JoinType, Result, TableReference, UnnestOptions, }; use datafusion_expr::ExprFunctionExt; use datafusion_expr::{ @@ -2274,6 +2397,75 @@ mod tests { ) } + #[test] + fn eliminate_unnest_when_only_group_keys_are_required() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + Field::new( + "vals", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + ]); + let plan = scan_empty(Some("test"), &schema, None)? + .unnest_column("vals")? + .aggregate(vec![col("id")], Vec::::new())? + .project(vec![col("id")])? + .build()?; + + let optimized = optimize(plan)?; + let formatted = format!("{}", optimized.display_indent()); + assert!(!formatted.contains("Unnest:")); + Ok(()) + } + + #[test] + fn keep_unnest_when_count_depends_on_row_multiplicity() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + Field::new( + "vals", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + ]); + let plan = scan_empty(Some("test"), &schema, None)? + .unnest_column("vals")? + .aggregate(vec![col("id")], vec![count(lit(1)).alias("cnt")])? + .project(vec![col("id"), col("cnt")])? + .build()?; + + let optimized = optimize(plan)?; + let formatted = format!("{}", optimized.display_indent()); + assert!(formatted.contains("Unnest:")); + Ok(()) + } + + #[test] + fn keep_unnest_when_preserve_nulls_is_disabled() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + Field::new( + "vals", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + ]); + let plan = scan_empty(Some("test"), &schema, None)? + .unnest_column_with_options( + "vals", + UnnestOptions::new().with_preserve_nulls(false), + )? + .aggregate(vec![col("id")], Vec::::new())? + .project(vec![col("id")])? 
+ .build()?; + + let optimized = optimize(plan)?; + let formatted = format!("{}", optimized.display_indent()); + assert!(formatted.contains("Unnest:")); + Ok(()) + } + #[test] fn test_window() -> Result<()> { let table_scan = test_table_scan()?; diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index c1e0885c9b5f2..12263657d0a4b 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -34,13 +34,28 @@ use datafusion_expr::{Expr, LogicalPlan}; /// Indices are always in order and without duplicates. For example, if these /// indices were added `[3, 2, 4, 3, 6, 1]`, the instance would be represented /// by `[1, 2, 3, 4, 6]`. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub(super) struct RequiredIndices { /// The indices of the required columns in the indices: Vec, /// If putting a projection above children is beneficial for the parent. /// Defaults to false. projection_beneficial: bool, + /// Whether ancestors can observe row multiplicity changes. + multiplicity_sensitive: bool, + /// Whether any ancestor expression is volatile. + has_volatile_ancestor: bool, +} + +impl Default for RequiredIndices { + fn default() -> Self { + Self { + indices: Vec::new(), + projection_beneficial: false, + multiplicity_sensitive: true, + has_volatile_ancestor: false, + } + } } impl RequiredIndices { @@ -54,6 +69,8 @@ impl RequiredIndices { Self { indices: (0..plan.schema().fields().len()).collect(), projection_beneficial: false, + multiplicity_sensitive: true, + has_volatile_ancestor: false, } } @@ -62,6 +79,8 @@ impl RequiredIndices { Self { indices, projection_beneficial: false, + multiplicity_sensitive: true, + has_volatile_ancestor: false, } .compact() } @@ -77,6 +96,34 @@ impl RequiredIndices { self } + /// Mark this requirement as multiplicity-insensitive. 
+ pub fn with_multiplicity_insensitive(mut self) -> Self { + self.multiplicity_sensitive = false; + self + } + + /// Mark this requirement as multiplicity-sensitive. + pub fn with_multiplicity_sensitive(mut self) -> Self { + self.multiplicity_sensitive = true; + self + } + + /// Return whether ancestors can observe multiplicity changes. + pub fn multiplicity_sensitive(&self) -> bool { + self.multiplicity_sensitive + } + + /// Mark this requirement as having volatile ancestors. + pub fn with_volatile_ancestor(mut self) -> Self { + self.has_volatile_ancestor = true; + self + } + + /// Return whether a volatile expression exists in the ancestor chain. + pub fn has_volatile_ancestor(&self) -> bool { + self.has_volatile_ancestor + } + /// Return the value of projection beneficial flag pub fn projection_beneficial(&self) -> bool { self.projection_beneficial @@ -173,10 +220,14 @@ impl RequiredIndices { Self { indices: l, projection_beneficial, + multiplicity_sensitive: self.multiplicity_sensitive, + has_volatile_ancestor: self.has_volatile_ancestor, }, Self { indices: r, projection_beneficial, + multiplicity_sensitive: self.multiplicity_sensitive, + has_volatile_ancestor: self.has_volatile_ancestor, }, ) } From 6864757a711e368254dcad49f75528a4327af6e5 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 3 Mar 2026 19:14:21 +0800 Subject: [PATCH 02/17] feat: update unnest optimization logic and enhance tests - Allow elimination of unnest operation for empty lists while preserving nulls. - Modify the `eliminate_unnest_when_only_group_keys_are_required` test case to specify struct unnest conditions. - Introduce a new test case `keep_list_unnest_when_group_keys_are_only_required_outputs` to verify unnest behavior when only group keys are required. - Ensure that the optimization logic correctly handles different unnest scenarios based on list and struct types. 
--- .../optimizer/src/optimize_projections/mod.rs | 40 +++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index de1d0fd547a83..86f66fc24b8f7 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -987,7 +987,13 @@ fn can_eliminate_unnest(unnest: &Unnest, indices: &RequiredIndices) -> bool { return false; } - if !unnest.options.preserve_nulls || !unnest.struct_type_columns.is_empty() { + // List unnest can drop rows for empty lists even when preserve_nulls=true. + // Without proving non-empty cardinality, keep UNNEST conservatively. + if !unnest.list_type_columns.is_empty() { + return false; + } + + if !unnest.options.preserve_nulls { return false; } @@ -2398,7 +2404,35 @@ mod tests { } #[test] - fn eliminate_unnest_when_only_group_keys_are_required() -> Result<()> { + fn eliminate_struct_unnest_when_only_group_keys_are_required() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + Field::new( + "user", + DataType::Struct( + vec![ + Field::new("name", DataType::Utf8, true), + Field::new("score", DataType::Int32, true), + ] + .into(), + ), + true, + ), + ]); + let plan = scan_empty(Some("test"), &schema, None)? + .unnest_column("user")? + .aggregate(vec![col("id")], Vec::::new())? + .project(vec![col("id")])? 
+ .build()?; + + let optimized = optimize(plan)?; + let formatted = format!("{}", optimized.display_indent()); + assert!(!formatted.contains("Unnest:")); + Ok(()) + } + + #[test] + fn keep_list_unnest_when_group_keys_are_only_required_outputs() -> Result<()> { let schema = Schema::new(vec![ Field::new("id", DataType::UInt32, false), Field::new( @@ -2415,7 +2449,7 @@ mod tests { let optimized = optimize(plan)?; let formatted = format!("{}", optimized.display_indent()); - assert!(!formatted.contains("Unnest:")); + assert!(formatted.contains("Unnest:")); Ok(()) } From 5cfbd35a46660fb3e8b8f7eb174ede957f93a5c5 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 3 Mar 2026 19:21:10 +0800 Subject: [PATCH 03/17] feat(tests): add optimizer unnest prune safety tests - Introduced new SQL Logic Tests to validate unnest pruning behavior in DataFusion. - Tests include scenarios with empty lists and null values to ensure correct handling of cardinality-sensitive cases. - Added explanations for expected logical plans for both aggregation and selection queries. --- .../test_files/optimizer_unnest_prune.slt | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt diff --git a/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt b/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt new file mode 100644 index 0000000000000..1c25211789eb7 --- /dev/null +++ b/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +############################### +# Unnest Pruning Safety Tests # +############################### + +statement ok +CREATE TABLE unnest_prune_t +AS VALUES + (1, [1, 2]), + (2, []), + (3, [3]), + (4, null) +; + +statement ok +set datafusion.explain.logical_plan_only = true; + +# Empty-list/null semantics are cardinality-sensitive even if unnested column is dead. +# Unnest must remain. +query TT +EXPLAIN SELECT id +FROM ( + SELECT column1 AS id, unnest(column2) AS elem + FROM unnest_prune_t +) q +GROUP BY id; +---- +logical_plan +01)Aggregate: groupBy=[[q.id]], aggr=[[]] +02)--SubqueryAlias: q +03)----Projection: id +04)------Unnest: lists[__unnest_placeholder(unnest_prune_t.column2)|depth=1] structs[] +05)--------Projection: unnest_prune_t.column1 AS id, unnest_prune_t.column2 AS __unnest_placeholder(unnest_prune_t.column2) +06)----------TableScan: unnest_prune_t projection=[column1, column2] + +# Count(*) is explicitly multiplicity-sensitive. Unnest must remain. 
+query TT +EXPLAIN SELECT id, count(*) AS cnt +FROM ( + SELECT column1 AS id, unnest(column2) AS elem + FROM unnest_prune_t +) q +GROUP BY id; +---- +logical_plan +01)Projection: q.id, count(Int64(1)) AS count(*) AS cnt +02)--Aggregate: groupBy=[[q.id]], aggr=[[count(Int64(1))]] +03)----SubqueryAlias: q +04)------Projection: id +05)--------Unnest: lists[__unnest_placeholder(unnest_prune_t.column2)|depth=1] structs[] +06)----------Projection: unnest_prune_t.column1 AS id, unnest_prune_t.column2 AS __unnest_placeholder(unnest_prune_t.column2) +07)------------TableScan: unnest_prune_t projection=[column1, column2] + +statement ok +set datafusion.explain.logical_plan_only = false; + +# Correctness check for empty-list/null behavior +query I +SELECT id +FROM ( + SELECT column1 AS id, unnest(column2) AS elem + FROM unnest_prune_t +) q +GROUP BY id +ORDER BY id; +---- +1 +3 + +# Correctness check for multiplicity-sensitive count path +query II +SELECT id, count(*) AS cnt +FROM ( + SELECT column1 AS id, unnest(column2) AS elem + FROM unnest_prune_t +) q +GROUP BY id +ORDER BY id; +---- +1 2 +3 1 From 16b7090a28a7fbdbec0fe268dfc65e8e44192ab1 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 3 Mar 2026 19:23:36 +0800 Subject: [PATCH 04/17] refactor(optimizer): simplify volatile ancestor handling in optimize_projections - Removed repetitive code for handling volatile ancestors across different input plans. - Introduced a new helper function `with_volatile_if_needed` to encapsulate the logic of conditionally adding a volatile ancestor. - Improved code readability and maintainability by reducing duplication in `optimize_projections` method. 
--- .../optimizer/src/optimize_projections/mod.rs | 70 +++++++++---------- 1 file changed, 32 insertions(+), 38 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 86f66fc24b8f7..5858d2de158e6 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -313,14 +313,11 @@ fn optimize_projections( plan.inputs() .into_iter() .map(|input| { - let mut required = indices + let required = indices .clone() .with_projection_beneficial() .with_plan_exprs(&plan, input.schema())?; - if volatile_in_plan { - required = required.with_volatile_ancestor(); - } - Ok(required) + Ok(with_volatile_if_needed(required, volatile_in_plan)) }) .collect::>()? } @@ -332,12 +329,9 @@ fn optimize_projections( plan.inputs() .into_iter() .map(|input| { - let mut required = + let required = indices.clone().with_plan_exprs(&plan, input.schema())?; - if volatile_in_plan { - required = required.with_volatile_ancestor(); - } - Ok(required) + Ok(with_volatile_if_needed(required, volatile_in_plan)) }) .collect::>()? } @@ -356,11 +350,8 @@ fn optimize_projections( plan.inputs() .into_iter() .map(|input| { - let mut required = RequiredIndices::new_for_all_exprs(input); - if volatile_in_plan { - required = required.with_volatile_ancestor(); - } - Ok(required) + let required = RequiredIndices::new_for_all_exprs(input); + Ok(with_volatile_if_needed(required, volatile_in_plan)) }) .collect::>()? 
} @@ -384,13 +375,9 @@ fn optimize_projections( .into_iter() .zip(necessary_children_indices) .map(|(child, necessary_indices)| { - let mut required = - RequiredIndices::new_from_indices(necessary_indices) - .with_plan_exprs(&plan, child.schema())?; - if volatile_in_plan { - required = required.with_volatile_ancestor(); - } - Ok(required) + let required = RequiredIndices::new_from_indices(necessary_indices) + .with_plan_exprs(&plan, child.schema())?; + Ok(with_volatile_if_needed(required, volatile_in_plan)) }) .collect::>>()? } @@ -417,14 +404,11 @@ fn optimize_projections( plan.inputs() .into_iter() .map(|input| { - let mut required = indices + let required = indices .clone() .with_projection_beneficial() .with_plan_exprs(&plan, input.schema())?; - if volatile_in_plan { - required = required.with_volatile_ancestor(); - } - Ok(required) + Ok(with_volatile_if_needed(required, volatile_in_plan)) }) .collect::>>()? } @@ -436,12 +420,14 @@ fn optimize_projections( left_req_indices.with_plan_exprs(&plan, join.left.schema())?; let right_indices = right_req_indices.with_plan_exprs(&plan, join.right.schema())?; - let mut left_indices = left_indices.with_multiplicity_sensitive(); - let mut right_indices = right_indices.with_multiplicity_sensitive(); - if volatile_in_plan { - left_indices = left_indices.with_volatile_ancestor(); - right_indices = right_indices.with_volatile_ancestor(); - } + let left_indices = with_volatile_if_needed( + left_indices.with_multiplicity_sensitive(), + volatile_in_plan, + ); + let right_indices = with_volatile_if_needed( + right_indices.with_multiplicity_sensitive(), + volatile_in_plan, + ); // Joins benefit from "small" input tables (lower memory usage). 
// Therefore, each child benefits from projection: vec![ @@ -453,12 +439,9 @@ fn optimize_projections( .inputs() .into_iter() .map(|input| { - let mut required = RequiredIndices::new_for_all_exprs(input) + let required = RequiredIndices::new_for_all_exprs(input) .with_multiplicity_insensitive(); - if volatile_in_plan { - required = required.with_volatile_ancestor(); - } - Ok(required) + Ok(with_volatile_if_needed(required, volatile_in_plan)) }) .collect::>()?, // these nodes are explicitly rewritten in the match statement above @@ -539,6 +522,17 @@ fn optimize_projections( } } +fn with_volatile_if_needed( + required: RequiredIndices, + volatile_in_plan: bool, +) -> RequiredIndices { + if volatile_in_plan { + required.with_volatile_ancestor() + } else { + required + } +} + /// Merges consecutive projections. /// /// Given a projection `proj`, this function attempts to merge it with a previous From ad56c14990bcf5751672608093a8dc18e7fd460d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 3 Mar 2026 19:25:17 +0800 Subject: [PATCH 05/17] feat: enhance required indices handling with new methods for multiplicity and volatility - Introduced methods `for_multiplicity_sensitive_child` and `for_multiplicity_insensitive_child` for better handling of child requirements in `RequiredIndices`. - Replaced usage of `with_volatile_if_needed` with `with_plan_volatile` and `with_volatile_ancestor_if` for clearer logic when managing volatile context. - Updated `optimize_projections` function to use new methods, improving code readability and maintainability. 
--- .../optimizer/src/optimize_projections/mod.rs | 70 ++++++++----------- .../optimize_projections/required_indices.rs | 28 ++++++++ 2 files changed, 57 insertions(+), 41 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 5858d2de158e6..02b53d324f7db 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -192,13 +192,13 @@ fn optimize_projections( RequiredIndices::new().with_exprs(schema, all_exprs_iter); let necessary_exprs = necessary_indices.get_required_exprs(schema); let mut necessary_indices = if new_aggr_expr.is_empty() { - necessary_indices.with_multiplicity_insensitive() + necessary_indices.for_multiplicity_insensitive_child() } else { - necessary_indices.with_multiplicity_sensitive() + necessary_indices.for_multiplicity_sensitive_child() }; - if has_volatile_ancestor || volatile_in_plan { - necessary_indices = necessary_indices.with_volatile_ancestor(); - } + necessary_indices = necessary_indices + .with_volatile_ancestor_if(has_volatile_ancestor) + .with_plan_volatile(volatile_in_plan); return optimize_projections( Arc::unwrap_or_clone(aggregate.input), @@ -240,13 +240,13 @@ fn optimize_projections( // parent or window expression requirements. 
let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr); let mut required_indices = if new_window_expr.is_empty() { - required_indices.with_multiplicity_insensitive() + required_indices.for_multiplicity_insensitive_child() } else { - required_indices.with_multiplicity_sensitive() + required_indices.for_multiplicity_sensitive_child() }; - if has_volatile_ancestor || volatile_in_plan { - required_indices = required_indices.with_volatile_ancestor(); - } + required_indices = required_indices + .with_volatile_ancestor_if(has_volatile_ancestor) + .with_plan_volatile(volatile_in_plan); return optimize_projections( Arc::unwrap_or_clone(window.input), @@ -317,7 +317,7 @@ fn optimize_projections( .clone() .with_projection_beneficial() .with_plan_exprs(&plan, input.schema())?; - Ok(with_volatile_if_needed(required, volatile_in_plan)) + Ok(required.with_plan_volatile(volatile_in_plan)) }) .collect::>()? } @@ -331,7 +331,7 @@ fn optimize_projections( .map(|input| { let required = indices.clone().with_plan_exprs(&plan, input.schema())?; - Ok(with_volatile_if_needed(required, volatile_in_plan)) + Ok(required.with_plan_volatile(volatile_in_plan)) }) .collect::>()? } @@ -351,7 +351,7 @@ fn optimize_projections( .into_iter() .map(|input| { let required = RequiredIndices::new_for_all_exprs(input); - Ok(with_volatile_if_needed(required, volatile_in_plan)) + Ok(required.with_plan_volatile(volatile_in_plan)) }) .collect::>()? } @@ -377,7 +377,7 @@ fn optimize_projections( .map(|(child, necessary_indices)| { let required = RequiredIndices::new_from_indices(necessary_indices) .with_plan_exprs(&plan, child.schema())?; - Ok(with_volatile_if_needed(required, volatile_in_plan)) + Ok(required.with_plan_volatile(volatile_in_plan)) }) .collect::>>()? 
} @@ -408,7 +408,7 @@ fn optimize_projections( .clone() .with_projection_beneficial() .with_plan_exprs(&plan, input.schema())?; - Ok(with_volatile_if_needed(required, volatile_in_plan)) + Ok(required.with_plan_volatile(volatile_in_plan)) }) .collect::>>()? } @@ -420,14 +420,12 @@ fn optimize_projections( left_req_indices.with_plan_exprs(&plan, join.left.schema())?; let right_indices = right_req_indices.with_plan_exprs(&plan, join.right.schema())?; - let left_indices = with_volatile_if_needed( - left_indices.with_multiplicity_sensitive(), - volatile_in_plan, - ); - let right_indices = with_volatile_if_needed( - right_indices.with_multiplicity_sensitive(), - volatile_in_plan, - ); + let left_indices = left_indices + .for_multiplicity_sensitive_child() + .with_plan_volatile(volatile_in_plan); + let right_indices = right_indices + .for_multiplicity_sensitive_child() + .with_plan_volatile(volatile_in_plan); // Joins benefit from "small" input tables (lower memory usage). // Therefore, each child benefits from projection: vec![ @@ -440,8 +438,8 @@ fn optimize_projections( .into_iter() .map(|input| { let required = RequiredIndices::new_for_all_exprs(input) - .with_multiplicity_insensitive(); - Ok(with_volatile_if_needed(required, volatile_in_plan)) + .for_multiplicity_insensitive_child(); + Ok(required.with_plan_volatile(volatile_in_plan)) }) .collect::>()?, // these nodes are explicitly rewritten in the match statement above @@ -467,10 +465,10 @@ fn optimize_projections( // at least provide the indices for the exec-columns as a starting point let mut required_indices = RequiredIndices::new().with_plan_exprs(&plan, unnest.input.schema())?; - required_indices = required_indices.with_multiplicity_sensitive(); - if volatile_in_plan || indices.has_volatile_ancestor() { - required_indices = required_indices.with_volatile_ancestor(); - } + required_indices = required_indices + .for_multiplicity_sensitive_child() + .with_volatile_ancestor_if(indices.has_volatile_ancestor()) + 
.with_plan_volatile(volatile_in_plan); // Add additional required indices from the parent let mut additional_necessary_child_indices = Vec::new(); @@ -522,17 +520,6 @@ fn optimize_projections( } } -fn with_volatile_if_needed( - required: RequiredIndices, - volatile_in_plan: bool, -) -> RequiredIndices { - if volatile_in_plan { - required.with_volatile_ancestor() - } else { - required - } -} - /// Merges consecutive projections. /// /// Given a projection `proj`, this function attempts to merge it with a previous @@ -1027,7 +1014,8 @@ fn build_unnest_child_requirements( child_required_indices = child_required_indices.with_volatile_ancestor(); } if !indices.multiplicity_sensitive() { - child_required_indices = child_required_indices.with_multiplicity_insensitive(); + child_required_indices = + child_required_indices.for_multiplicity_insensitive_child(); } child_required_indices } diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index 12263657d0a4b..7053266d0e658 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -119,6 +119,34 @@ impl RequiredIndices { self } + /// Conditionally mark this requirement as having volatile ancestors. + pub fn with_volatile_ancestor_if(mut self, value: bool) -> Self { + if value { + self.has_volatile_ancestor = true; + } + self + } + + /// Propagate volatile-plan context into this requirement. + /// + /// This keeps call sites declarative and centralizes state-transition logic. + pub fn with_plan_volatile(mut self, volatile_in_plan: bool) -> Self { + if volatile_in_plan { + self.has_volatile_ancestor = true; + } + self + } + + /// Transition this requirement for a multiplicity-sensitive child. 
+ pub fn for_multiplicity_sensitive_child(self) -> Self { + self.with_multiplicity_sensitive() + } + + /// Transition this requirement for a multiplicity-insensitive child. + pub fn for_multiplicity_insensitive_child(self) -> Self { + self.with_multiplicity_insensitive() + } + /// Return whether a volatile expression exists in the ancestor chain. pub fn has_volatile_ancestor(&self) -> bool { self.has_volatile_ancestor From ab7738e14e5d9897a502f6a61f0f39a4898dd075 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 3 Mar 2026 19:32:18 +0800 Subject: [PATCH 06/17] feat(optimizer): enhance projection requirements handling and add tests for unnest pruning - Updated the `rewrite_projection_given_requirements` function to enhance handling of projection requirements based on additional conditions such as projected benefit, multiplicity sensitivity, and volatile ancestors. - Added a new SQL logic test to validate the pruning of struct unnest in cases where it is cardinality-preserving and outputs are irrelevant. - Improved comments for clarity on unnest semantics regarding null preservation. 
--- .../optimizer/src/optimize_projections/mod.rs | 16 +++++++++---- .../test_files/optimizer_unnest_prune.slt | 23 +++++++++++++++++++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 02b53d324f7db..94603bb9b766c 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -891,8 +891,17 @@ fn rewrite_projection_given_requirements( let exprs_used = indices.get_at_indices(&expr); - let required_indices = + let mut required_indices = RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter()); + if indices.projection_beneficial() { + required_indices = required_indices.with_projection_beneficial(); + } + if !indices.multiplicity_sensitive() { + required_indices = required_indices.for_multiplicity_insensitive_child(); + } + if indices.has_volatile_ancestor() { + required_indices = required_indices.with_volatile_ancestor(); + } // rewrite the children projection, and if they are changed rewrite the // projection down @@ -974,9 +983,8 @@ fn can_eliminate_unnest(unnest: &Unnest, indices: &RequiredIndices) -> bool { return false; } - if !unnest.options.preserve_nulls { - return false; - } + // preserve_nulls only affects list unnest semantics. For struct-only unnest, + // row cardinality is unchanged and this option is not semantically relevant. indices .indices() diff --git a/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt b/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt index 1c25211789eb7..3a44090b2c090 100644 --- a/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt +++ b/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt @@ -31,6 +31,29 @@ AS VALUES statement ok set datafusion.explain.logical_plan_only = true; +# Safe case: struct unnest is cardinality-preserving and unnested outputs are dead. 
+# Unnest should be removed. +statement ok +CREATE TABLE unnest_prune_struct_t +AS VALUES + (1, struct('a', 10)), + (2, struct('b', 20)) +; + +query TT +EXPLAIN SELECT id +FROM ( + SELECT column1 AS id, unnest(column2) + FROM unnest_prune_struct_t +) q +GROUP BY id; +---- +logical_plan +01)Aggregate: groupBy=[[q.id]], aggr=[[]] +02)--SubqueryAlias: q +03)----Projection: unnest_prune_struct_t.column1 AS id +04)------TableScan: unnest_prune_struct_t projection=[column1] + # Empty-list/null semantics are cardinality-sensitive even if unnested column is dead. # Unnest must remain. query TT From e240cd1e26dfca54711d564d916bd5400dd60731 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 3 Mar 2026 19:53:34 +0800 Subject: [PATCH 07/17] feat(optimizer): remove unnecessary projection benefit check in rewrite_projection_given_requirements function This change simplifies the logic in the rewrite_projection_given_requirements function by removing the check for projection benefit, which was deemed unnecessary. This helps streamline the code and improve readability. 
--- datafusion/optimizer/src/optimize_projections/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 94603bb9b766c..8b123dd3ef79a 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -893,9 +893,6 @@ fn rewrite_projection_given_requirements( let mut required_indices = RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter()); - if indices.projection_beneficial() { - required_indices = required_indices.with_projection_beneficial(); - } if !indices.multiplicity_sensitive() { required_indices = required_indices.for_multiplicity_insensitive_child(); } From cca917461dbae604b3493e68c4ee43dabd787aab Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 5 Mar 2026 14:16:29 +0800 Subject: [PATCH 08/17] add comment on necessary_indices' multiplicity transition --- datafusion/optimizer/src/optimize_projections/mod.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 8b123dd3ef79a..5b22da47addbb 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -192,8 +192,15 @@ fn optimize_projections( RequiredIndices::new().with_exprs(schema, all_exprs_iter); let necessary_exprs = necessary_indices.get_required_exprs(schema); let mut necessary_indices = if new_aggr_expr.is_empty() { + // no aggregate functions – the aggregation is just a GROUP BY + // (possibly global). In that case the output row count is always + // ≤1 per input group, and nothing upstream can tell how many input + // rows we had, so the child is *multiplicity‑insensitive*. necessary_indices.for_multiplicity_insensitive_child() } else { + // there is at least one aggregate function (COUNT, SUM, …). 
+ those functions generally depend on how many input rows hit each + // group, so the child must be treated as *multiplicity‑sensitive*. necessary_indices.for_multiplicity_sensitive_child() }; necessary_indices = necessary_indices From 1fb4c932299aa437082cdb7b2ebd5d74303ee3d0 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 5 Mar 2026 14:21:08 +0800 Subject: [PATCH 09/17] add comment on required_indices' multiplicity transition --- .../optimizer/src/optimize_projections/mod.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 5b22da47addbb..e4fe3f0b8fab2 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -192,8 +192,8 @@ fn optimize_projections( RequiredIndices::new().with_exprs(schema, all_exprs_iter); let necessary_exprs = necessary_indices.get_required_exprs(schema); let mut necessary_indices = if new_aggr_expr.is_empty() { - // no aggregate functions – the aggregation is just a GROUP BY. - // In that case the output row count is always + // no aggregate functions – the aggregation is just a GROUP BY - // (possibly global). In that case the output row count is always + // In that case the output row count is always @@ -247,8 +247,17 @@ fn optimize_projections( // parent or window expression requirements. let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr); let mut required_indices = if new_window_expr.is_empty() { + // There are no window functions that the parent cares about. + // A window operator without any window expressions doesn’t change the + // number of rows coming from its child – the output is just the input. 
+ // Hence the child is multiplicity‑insensitive: upstream nodes can’t + // observe how many rows the child produced. required_indices.for_multiplicity_insensitive_child() } else { + // At least one window expression remains; e.g. `row_number()` or + // `lag()` etc. These depend on the ordering of rows coming from the + // child, so the number of input rows matters. Treat the child as + // multiplicity‑sensitive. required_indices.for_multiplicity_sensitive_child() }; required_indices = required_indices From 2390b6ab22e1f77e452dc0d239f2726e1eb46b99 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 5 Mar 2026 14:30:46 +0800 Subject: [PATCH 10/17] feat(optimizer): refactor projection optimization logic and introduce dedicated functions for aggregates, windows, and table scans --- .../optimizer/src/optimize_projections/mod.rs | 383 +++++++++--------- 1 file changed, 183 insertions(+), 200 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index e4fe3f0b8fab2..22ef5ab854552 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -143,171 +143,23 @@ fn optimize_projections( }); } LogicalPlan::Aggregate(aggregate) => { - let has_volatile_ancestor = indices.has_volatile_ancestor(); - // Split parent requirements to GROUP BY and aggregate sections: - let n_group_exprs = aggregate.group_expr_len()?; - // Offset aggregate indices so that they point to valid indices at - // `aggregate.aggr_expr`: - let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs); - - // Get absolutely necessary GROUP BY fields: - let group_by_expr_existing = aggregate - .group_expr - .iter() - .map(|group_by_expr| group_by_expr.schema_name().to_string()) - .collect::>(); - - let new_group_bys = if let Some(simplest_groupby_indices) = - get_required_group_by_exprs_indices( - aggregate.input.schema(), - &group_by_expr_existing, - ) 
{ - // Some of the fields in the GROUP BY may be required by the - // parent even if these fields are unnecessary in terms of - // functional dependency. - group_by_reqs - .append(&simplest_groupby_indices) - .get_at_indices(&aggregate.group_expr) - } else { - aggregate.group_expr - }; - - // Only use the absolutely necessary aggregate expressions required - // by the parent: - let new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr); - - if new_group_bys.is_empty() && new_aggr_expr.is_empty() { - // Global aggregation with no aggregate functions always produces 1 row and no columns. - return Ok(Transformed::yes(LogicalPlan::EmptyRelation( - EmptyRelation { - produce_one_row: true, - schema: Arc::new(DFSchema::empty()), - }, - ))); - } - - let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); - let schema = aggregate.input.schema(); - let necessary_indices = - RequiredIndices::new().with_exprs(schema, all_exprs_iter); - let necessary_exprs = necessary_indices.get_required_exprs(schema); - let mut necessary_indices = if new_aggr_expr.is_empty() { - // no aggregate functions – the aggregation is just a GROUP BY. - // In that case the output row count is always - // ≤1 per input group, and nothing upstream can tell how many input - // rows we had, so the child is *multiplicity‑insensitive*. - necessary_indices.for_multiplicity_insensitive_child() - } else { - // there is at least one aggregate function (COUNT, SUM, …). - // those functions generally depend on how many input rows hit each - // group, so the child must be treated as *multiplicity‑sensitive*. - necessary_indices.for_multiplicity_sensitive_child() - }; - necessary_indices = necessary_indices - .with_volatile_ancestor_if(has_volatile_ancestor) - .with_plan_volatile(volatile_in_plan); - - return optimize_projections( - Arc::unwrap_or_clone(aggregate.input), + return optimize_aggregate_projections( + aggregate, config, - necessary_indices, - )? 
- .transform_data(|aggregate_input| { - // Simplify the input of the aggregation by adding a projection so - // that its input only contains absolutely necessary columns for - // the aggregate expressions. Note that necessary_indices refer to - // fields in `aggregate.input.schema()`. - add_projection_on_top_if_helpful(aggregate_input, necessary_exprs) - })? - .map_data(|aggregate_input| { - // Create a new aggregate plan with the updated input and only the - // absolutely necessary fields: - Aggregate::try_new( - Arc::new(aggregate_input), - new_group_bys, - new_aggr_expr, - ) - .map(LogicalPlan::Aggregate) - }); + indices, + volatile_in_plan, + ); } LogicalPlan::Window(window) => { - let has_volatile_ancestor = indices.has_volatile_ancestor(); - let input_schema = Arc::clone(window.input.schema()); - // Split parent requirements to child and window expression sections: - let n_input_fields = input_schema.fields().len(); - // Offset window expression indices so that they point to valid - // indices at `window.window_expr`: - let (child_reqs, window_reqs) = indices.split_off(n_input_fields); - - // Only use window expressions that are absolutely necessary according - // to parent requirements: - let new_window_expr = window_reqs.get_at_indices(&window.window_expr); - - // Get all the required column indices at the input, either by the - // parent or window expression requirements. - let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr); - let mut required_indices = if new_window_expr.is_empty() { - // There are no window functions that the parent cares about. - // A window operator without any window expressions doesn’t change the - // number of rows coming from its child – the output is just the input. - // Hence the child is multiplicity‑insensitive: upstream nodes can’t - // observe how many rows the child produced. - required_indices.for_multiplicity_insensitive_child() - } else { - // At least one window expression remains; e.g. 
`row_number()` or - // `lag()` etc. These depend on the ordering of rows coming from the - // child, so the number of input rows matters. Treat the child as - // multiplicity‑sensitive. - required_indices.for_multiplicity_sensitive_child() - }; - required_indices = required_indices - .with_volatile_ancestor_if(has_volatile_ancestor) - .with_plan_volatile(volatile_in_plan); - - return optimize_projections( - Arc::unwrap_or_clone(window.input), + return optimize_window_projections( + window, config, - required_indices.clone(), - )? - .transform_data(|window_child| { - if new_window_expr.is_empty() { - // When no window expression is necessary, use the input directly: - Ok(Transformed::no(window_child)) - } else { - // Calculate required expressions at the input of the window. - // Please note that we use `input_schema`, because `required_indices` - // refers to that schema - let required_exprs = - required_indices.get_required_exprs(&input_schema); - let window_child = - add_projection_on_top_if_helpful(window_child, required_exprs)? - .data; - Window::try_new(new_window_expr, Arc::new(window_child)) - .map(LogicalPlan::Window) - .map(Transformed::yes) - } - }); + indices, + volatile_in_plan, + ); } LogicalPlan::TableScan(table_scan) => { - let TableScan { - table_name, - source, - projection, - filters, - fetch, - projected_schema: _, - } = table_scan; - - // Get indices referred to in the original (schema with all fields) - // given projected indices. 
- let projection = match &projection { - Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), - None => indices.into_inner(), - }; - let new_scan = - TableScan::try_new(table_name, source, Some(projection), filters, fetch)?; - - return Ok(Transformed::yes(LogicalPlan::TableScan(new_scan))); + return optimize_table_scan_projections(table_scan, indices); } // Other node types are handled below _ => {} @@ -315,7 +167,7 @@ fn optimize_projections( // For other plan node types, calculate indices for columns they use and // try to rewrite their children - let mut child_required_indices: Vec = match &plan { + let child_required_indices: Vec = match &plan { LogicalPlan::Sort(_) | LogicalPlan::Filter(_) | LogicalPlan::Repartition(_) @@ -326,30 +178,14 @@ fn optimize_projections( // that appear in this plan's expressions to its child. All these // operators benefit from "small" inputs, so the projection_beneficial // flag is `true`. - plan.inputs() - .into_iter() - .map(|input| { - let required = indices - .clone() - .with_projection_beneficial() - .with_plan_exprs(&plan, input.schema())?; - Ok(required.with_plan_volatile(volatile_in_plan)) - }) - .collect::>()? + build_plan_input_requirements(&plan, &indices, volatile_in_plan, true)? } LogicalPlan::Limit(_) => { // Pass index requirements from the parent as well as column indices // that appear in this plan's expressions to its child. These operators // do not benefit from "small" inputs, so the projection_beneficial // flag is `false`. - plan.inputs() - .into_iter() - .map(|input| { - let required = - indices.clone().with_plan_exprs(&plan, input.schema())?; - Ok(required.with_plan_volatile(volatile_in_plan)) - }) - .collect::>()? + build_plan_input_requirements(&plan, &indices, volatile_in_plan, false)? 
} LogicalPlan::Copy(_) | LogicalPlan::Ddl(_) @@ -417,16 +253,7 @@ fn optimize_projections( return Ok(Transformed::no(plan)); } - plan.inputs() - .into_iter() - .map(|input| { - let required = indices - .clone() - .with_projection_beneficial() - .with_plan_exprs(&plan, input.schema())?; - Ok(required.with_plan_volatile(volatile_in_plan)) - }) - .collect::>>()? + build_plan_input_requirements(&plan, &indices, volatile_in_plan, true)? } LogicalPlan::Join(join) => { let left_len = join.left.schema().fields().len(); @@ -497,8 +324,172 @@ fn optimize_projections( } }; - // Required indices are currently ordered (child0, child1, ...) - // but the loop pops off the last element, so we need to reverse the order + let transformed_plan = rewrite_plan_children(plan, config, child_required_indices)?; + + // If any of the children are transformed, we need to potentially update the plan's schema + if transformed_plan.transformed { + transformed_plan.map_data(|plan| plan.recompute_schema()) + } else { + Ok(transformed_plan) + } +} + +fn optimize_aggregate_projections( + aggregate: Aggregate, + config: &dyn OptimizerConfig, + indices: RequiredIndices, + volatile_in_plan: bool, +) -> Result> { + let has_volatile_ancestor = indices.has_volatile_ancestor(); + let n_group_exprs = aggregate.group_expr_len()?; + let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs); + + let group_by_expr_existing = aggregate + .group_expr + .iter() + .map(|group_by_expr| group_by_expr.schema_name().to_string()) + .collect::>(); + + let new_group_bys = if let Some(simplest_groupby_indices) = + get_required_group_by_exprs_indices( + aggregate.input.schema(), + &group_by_expr_existing, + ) { + group_by_reqs + .append(&simplest_groupby_indices) + .get_at_indices(&aggregate.group_expr) + } else { + aggregate.group_expr + }; + + let new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr); + + if new_group_bys.is_empty() && new_aggr_expr.is_empty() { + return 
Ok(Transformed::yes(LogicalPlan::EmptyRelation( + EmptyRelation { + produce_one_row: true, + schema: Arc::new(DFSchema::empty()), + }, + ))); + } + + let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); + let schema = aggregate.input.schema(); + let necessary_indices = RequiredIndices::new().with_exprs(schema, all_exprs_iter); + let necessary_exprs = necessary_indices.get_required_exprs(schema); + let mut necessary_indices = if new_aggr_expr.is_empty() { + necessary_indices.for_multiplicity_insensitive_child() + } else { + necessary_indices.for_multiplicity_sensitive_child() + }; + necessary_indices = necessary_indices + .with_volatile_ancestor_if(has_volatile_ancestor) + .with_plan_volatile(volatile_in_plan); + + optimize_projections( + Arc::unwrap_or_clone(aggregate.input), + config, + necessary_indices, + )? + .transform_data(|aggregate_input| { + add_projection_on_top_if_helpful(aggregate_input, necessary_exprs) + })? + .map_data(|aggregate_input| { + Aggregate::try_new(Arc::new(aggregate_input), new_group_bys, new_aggr_expr) + .map(LogicalPlan::Aggregate) + }) +} + +fn optimize_window_projections( + window: Window, + config: &dyn OptimizerConfig, + indices: RequiredIndices, + volatile_in_plan: bool, +) -> Result> { + let has_volatile_ancestor = indices.has_volatile_ancestor(); + let input_schema = Arc::clone(window.input.schema()); + let n_input_fields = input_schema.fields().len(); + let (child_reqs, window_reqs) = indices.split_off(n_input_fields); + + let new_window_expr = window_reqs.get_at_indices(&window.window_expr); + + let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr); + let mut required_indices = if new_window_expr.is_empty() { + required_indices.for_multiplicity_insensitive_child() + } else { + required_indices.for_multiplicity_sensitive_child() + }; + required_indices = required_indices + .with_volatile_ancestor_if(has_volatile_ancestor) + .with_plan_volatile(volatile_in_plan); + + optimize_projections( 
+ Arc::unwrap_or_clone(window.input), + config, + required_indices.clone(), + )? + .transform_data(|window_child| { + if new_window_expr.is_empty() { + Ok(Transformed::no(window_child)) + } else { + let required_exprs = required_indices.get_required_exprs(&input_schema); + let window_child = + add_projection_on_top_if_helpful(window_child, required_exprs)?.data; + Window::try_new(new_window_expr, Arc::new(window_child)) + .map(LogicalPlan::Window) + .map(Transformed::yes) + } + }) +} + +fn optimize_table_scan_projections( + table_scan: TableScan, + indices: RequiredIndices, +) -> Result> { + let TableScan { + table_name, + source, + projection, + filters, + fetch, + projected_schema: _, + } = table_scan; + + let projection = match &projection { + Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), + None => indices.into_inner(), + }; + let new_scan = + TableScan::try_new(table_name, source, Some(projection), filters, fetch)?; + + Ok(Transformed::yes(LogicalPlan::TableScan(new_scan))) +} + +fn build_plan_input_requirements( + plan: &LogicalPlan, + indices: &RequiredIndices, + volatile_in_plan: bool, + projection_beneficial: bool, +) -> Result> { + plan.inputs() + .into_iter() + .map(|input| { + let required = if projection_beneficial { + indices.clone().with_projection_beneficial() + } else { + indices.clone() + }; + let required = required.with_plan_exprs(plan, input.schema())?; + Ok(required.with_plan_volatile(volatile_in_plan)) + }) + .collect::>() +} + +fn rewrite_plan_children( + plan: LogicalPlan, + config: &dyn OptimizerConfig, + mut child_required_indices: Vec, +) -> Result> { child_required_indices.reverse(); assert_eq_or_internal_err!( child_required_indices.len(), @@ -506,8 +497,7 @@ fn optimize_projections( "OptimizeProjection: child_required_indices length mismatch with plan inputs" ); - // Rewrite children of the plan - let transformed_plan = plan.map_children(|child| { + plan.map_children(|child| { let required_indices = 
child_required_indices.pop().ok_or_else(|| { internal_datafusion_err!( "Unexpected number of required_indices in OptimizeProjections rule" @@ -526,14 +516,7 @@ fn optimize_projections( } }, ) - })?; - - // If any of the children are transformed, we need to potentially update the plan's schema - if transformed_plan.transformed { - transformed_plan.map_data(|plan| plan.recompute_schema()) - } else { - Ok(transformed_plan) - } + }) } /// Merges consecutive projections. From dfb53359783a893e3068784d5a95c04276a62338 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 5 Mar 2026 14:57:52 +0800 Subject: [PATCH 11/17] Refactor requirement construction into helper functions Extract repeated child-requirement construction logic into dedicated helper functions to improve code clarity and maintainability. Introduce `build_all_expr_input_requirements`, `build_extension_input_requirements`, and `build_unnest_fallback_requirements` for streamlined requirement handling in various components. --- .../optimizer/src/optimize_projections/mod.rs | 196 +++++++++++------- 1 file changed, 122 insertions(+), 74 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 22ef5ab854552..1e8b028ac27cf 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -30,8 +30,8 @@ use datafusion_common::{ }; use datafusion_expr::expr::Alias; use datafusion_expr::{ - Aggregate, Distinct, EmptyRelation, Expr, Projection, TableScan, Unnest, Window, - logical_plan::LogicalPlan, + Aggregate, Distinct, EmptyRelation, Expr, Extension, Projection, TableScan, Unnest, + Window, logical_plan::LogicalPlan, }; use crate::optimize_projections::required_indices::RequiredIndices; @@ -199,39 +199,20 @@ fn optimize_projections( // mismatch. // TODO: For some subquery variants (e.g. 
a subquery arising from an // EXISTS expression), we may not need to require all indices. - plan.inputs() - .into_iter() - .map(|input| { - let required = RequiredIndices::new_for_all_exprs(input); - Ok(required.with_plan_volatile(volatile_in_plan)) - }) - .collect::>()? + build_all_expr_input_requirements(&plan, volatile_in_plan, true)? } LogicalPlan::Extension(extension) => { - let Some(necessary_children_indices) = - extension.node.necessary_children_exprs(indices.indices()) + let Some(child_requirements) = build_extension_input_requirements( + &plan, + extension, + &indices, + volatile_in_plan, + )? else { // Requirements from parent cannot be routed down to user defined logical plan safely return Ok(Transformed::no(plan)); }; - let children = extension.node.inputs(); - assert_eq_or_internal_err!( - children.len(), - necessary_children_indices.len(), - "Inconsistent length between children and necessary children indices. \ - Make sure `.necessary_children_exprs` implementation of the \ - `UserDefinedLogicalNode` is consistent with actual children length \ - for the node." - ); - children - .into_iter() - .zip(necessary_children_indices) - .map(|(child, necessary_indices)| { - let required = RequiredIndices::new_from_indices(necessary_indices) - .with_plan_exprs(&plan, child.schema())?; - Ok(required.with_plan_volatile(volatile_in_plan)) - }) - .collect::>>()? + child_requirements } LogicalPlan::EmptyRelation(_) | LogicalPlan::Values(_) @@ -255,36 +236,17 @@ fn optimize_projections( build_plan_input_requirements(&plan, &indices, volatile_in_plan, true)? 
} - LogicalPlan::Join(join) => { - let left_len = join.left.schema().fields().len(); - let (left_req_indices, right_req_indices) = - split_join_requirements(left_len, indices, &join.join_type); - let left_indices = - left_req_indices.with_plan_exprs(&plan, join.left.schema())?; - let right_indices = - right_req_indices.with_plan_exprs(&plan, join.right.schema())?; - let left_indices = left_indices - .for_multiplicity_sensitive_child() - .with_plan_volatile(volatile_in_plan); - let right_indices = right_indices - .for_multiplicity_sensitive_child() - .with_plan_volatile(volatile_in_plan); - // Joins benefit from "small" input tables (lower memory usage). - // Therefore, each child benefits from projection: - vec![ - left_indices.with_projection_beneficial(), - right_indices.with_projection_beneficial(), - ] + LogicalPlan::Join(join) => build_join_input_requirements( + &plan, + join.left.schema(), + join.right.schema(), + &join.join_type, + indices, + volatile_in_plan, + )?, + LogicalPlan::Distinct(Distinct::All(_)) => { + build_all_expr_input_requirements(&plan, volatile_in_plan, false)? 
} - LogicalPlan::Distinct(Distinct::All(_)) => plan - .inputs() - .into_iter() - .map(|input| { - let required = RequiredIndices::new_for_all_exprs(input) - .for_multiplicity_insensitive_child(); - Ok(required.with_plan_volatile(volatile_in_plan)) - }) - .collect::>()?, // these nodes are explicitly rewritten in the match statement above LogicalPlan::Projection(_) | LogicalPlan::Aggregate(_) @@ -305,22 +267,7 @@ fn optimize_projections( )?; return Ok(Transformed::yes(transformed_input.data)); } - // at least provide the indices for the exec-columns as a starting point - let mut required_indices = - RequiredIndices::new().with_plan_exprs(&plan, unnest.input.schema())?; - required_indices = required_indices - .for_multiplicity_sensitive_child() - .with_volatile_ancestor_if(indices.has_volatile_ancestor()) - .with_plan_volatile(volatile_in_plan); - - // Add additional required indices from the parent - let mut additional_necessary_child_indices = Vec::new(); - indices.indices().iter().for_each(|idx| { - if let Some(index) = unnest.dependency_indices.get(*idx) { - additional_necessary_child_indices.push(*index); - } - }); - vec![required_indices.append(&additional_necessary_child_indices)] + build_unnest_fallback_requirements(&plan, unnest, &indices, volatile_in_plan)? 
} }; @@ -485,6 +432,107 @@ fn build_plan_input_requirements( .collect::>() } +fn build_all_expr_input_requirements( + plan: &LogicalPlan, + volatile_in_plan: bool, + multiplicity_sensitive: bool, +) -> Result> { + plan.inputs() + .into_iter() + .map(|input| { + let mut required = RequiredIndices::new_for_all_exprs(input); + if !multiplicity_sensitive { + required = required.for_multiplicity_insensitive_child(); + } + Ok(required.with_plan_volatile(volatile_in_plan)) + }) + .collect::>() +} + +fn build_extension_input_requirements( + plan: &LogicalPlan, + extension: &Extension, + indices: &RequiredIndices, + volatile_in_plan: bool, +) -> Result>> { + let Some(necessary_children_indices) = + extension.node.necessary_children_exprs(indices.indices()) + else { + return Ok(None); + }; + + let children = extension.node.inputs(); + assert_eq_or_internal_err!( + children.len(), + necessary_children_indices.len(), + "Inconsistent length between children and necessary children indices. \ + Make sure `.necessary_children_exprs` implementation of the \ + `UserDefinedLogicalNode` is consistent with actual children length \ + for the node." 
+ ); + + children + .into_iter() + .zip(necessary_children_indices) + .map(|(child, necessary_indices)| { + let required = RequiredIndices::new_from_indices(necessary_indices) + .with_plan_exprs(plan, child.schema())?; + Ok(required.with_plan_volatile(volatile_in_plan)) + }) + .collect::>>() + .map(Some) +} + +fn build_unnest_fallback_requirements( + plan: &LogicalPlan, + unnest: &Unnest, + indices: &RequiredIndices, + volatile_in_plan: bool, +) -> Result> { + let mut required_indices = + RequiredIndices::new().with_plan_exprs(plan, unnest.input.schema())?; + required_indices = required_indices + .for_multiplicity_sensitive_child() + .with_volatile_ancestor_if(indices.has_volatile_ancestor()) + .with_plan_volatile(volatile_in_plan); + + let additional_necessary_child_indices = indices + .indices() + .iter() + .filter_map(|idx| unnest.dependency_indices.get(*idx).copied()) + .collect::>(); + + Ok(vec![ + required_indices.append(&additional_necessary_child_indices), + ]) +} + +fn build_join_input_requirements( + plan: &LogicalPlan, + left_schema: &datafusion_common::DFSchemaRef, + right_schema: &datafusion_common::DFSchemaRef, + join_type: &JoinType, + indices: RequiredIndices, + volatile_in_plan: bool, +) -> Result> { + let left_len = left_schema.fields().len(); + let (left_req_indices, right_req_indices) = + split_join_requirements(left_len, indices, join_type); + let left_indices = left_req_indices.with_plan_exprs(plan, left_schema)?; + let right_indices = right_req_indices.with_plan_exprs(plan, right_schema)?; + let left_indices = left_indices + .for_multiplicity_sensitive_child() + .with_plan_volatile(volatile_in_plan); + let right_indices = right_indices + .for_multiplicity_sensitive_child() + .with_plan_volatile(volatile_in_plan); + + Ok(vec![ + left_indices.with_projection_beneficial(), + right_indices.with_projection_beneficial(), + ]) +} + fn rewrite_plan_children( plan: LogicalPlan, config: &dyn OptimizerConfig, From 
05fb85813d0b328e8824399bdffec7a6a3505bea Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 5 Mar 2026 15:19:55 +0800 Subject: [PATCH 12/17] Refactor with_child_multiplicity usage Add helper in mod.rs for handling child multiplicity. Replace duplicate code in aggregate and window paths with the new helper method, passing in multiplicity sensitivity based on the presence of expressions. This improves code readability and maintainability. --- .../optimizer/src/optimize_projections/mod.rs | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 1e8b028ac27cf..9257240c23515 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -324,11 +324,10 @@ fn optimize_aggregate_projections( let schema = aggregate.input.schema(); let necessary_indices = RequiredIndices::new().with_exprs(schema, all_exprs_iter); let necessary_exprs = necessary_indices.get_required_exprs(schema); - let mut necessary_indices = if new_aggr_expr.is_empty() { - necessary_indices.for_multiplicity_insensitive_child() - } else { - necessary_indices.for_multiplicity_sensitive_child() - }; + let mut necessary_indices = with_child_multiplicity( + necessary_indices, + !new_aggr_expr.is_empty(), + ); necessary_indices = necessary_indices .with_volatile_ancestor_if(has_volatile_ancestor) .with_plan_volatile(volatile_in_plan); @@ -361,11 +360,8 @@ fn optimize_window_projections( let new_window_expr = window_reqs.get_at_indices(&window.window_expr); let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr); - let mut required_indices = if new_window_expr.is_empty() { - required_indices.for_multiplicity_insensitive_child() - } else { - required_indices.for_multiplicity_sensitive_child() - }; + let mut required_indices = + with_child_multiplicity(required_indices, 
!new_window_expr.is_empty()); required_indices = required_indices .with_volatile_ancestor_if(has_volatile_ancestor) .with_plan_volatile(volatile_in_plan); @@ -412,6 +408,17 @@ fn optimize_table_scan_projections( Ok(Transformed::yes(LogicalPlan::TableScan(new_scan))) } +fn with_child_multiplicity( + required_indices: RequiredIndices, + multiplicity_sensitive: bool, +) -> RequiredIndices { + if multiplicity_sensitive { + required_indices.for_multiplicity_sensitive_child() + } else { + required_indices.for_multiplicity_insensitive_child() + } +} + fn build_plan_input_requirements( plan: &LogicalPlan, indices: &RequiredIndices, From 18761fcdc25a20f473f6ff5306b710e4cb8fcbb5 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 5 Mar 2026 15:22:11 +0800 Subject: [PATCH 13/17] comment with_child_multiplicity --- datafusion/optimizer/src/optimize_projections/mod.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 9257240c23515..e9151ce7143bc 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -324,10 +324,8 @@ fn optimize_aggregate_projections( let schema = aggregate.input.schema(); let necessary_indices = RequiredIndices::new().with_exprs(schema, all_exprs_iter); let necessary_exprs = necessary_indices.get_required_exprs(schema); - let mut necessary_indices = with_child_multiplicity( - necessary_indices, - !new_aggr_expr.is_empty(), - ); + let mut necessary_indices = + with_child_multiplicity(necessary_indices, !new_aggr_expr.is_empty()); necessary_indices = necessary_indices .with_volatile_ancestor_if(has_volatile_ancestor) .with_plan_volatile(volatile_in_plan); @@ -412,6 +410,9 @@ fn with_child_multiplicity( required_indices: RequiredIndices, multiplicity_sensitive: bool, ) -> RequiredIndices { + // This switch encodes semantic safety, not a performance 
preference. + // If ancestors can observe row-count changes, keep the child multiplicity-sensitive; + // otherwise mark it multiplicity-insensitive to allow more aggressive rewrites. if multiplicity_sensitive { required_indices.for_multiplicity_sensitive_child() } else { From 51aed572f666b4183d8b78fdd278ddac186424c9 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 5 Mar 2026 15:34:21 +0800 Subject: [PATCH 14/17] Refactor post-processing in mod.rs Extract shared post-processing into finalize_child_requirements() to handle multiplicity mode, volatile-ancestor propagation, and plan-volatile propagation. Update optimize_aggregate_projections and optimize_window_projections to utilize this helper. Improve readability with clearer plural naming for new aggregation and window expressions. --- .../optimizer/src/optimize_projections/mod.rs | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index e9151ce7143bc..02d5160de29a5 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -309,9 +309,9 @@ fn optimize_aggregate_projections( aggregate.group_expr }; - let new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr); + let new_aggr_exprs = aggregate_reqs.get_at_indices(&aggregate.aggr_expr); - if new_group_bys.is_empty() && new_aggr_expr.is_empty() { + if new_group_bys.is_empty() && new_aggr_exprs.is_empty() { return Ok(Transformed::yes(LogicalPlan::EmptyRelation( EmptyRelation { produce_one_row: true, @@ -320,15 +320,16 @@ fn optimize_aggregate_projections( ))); } - let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); + let all_exprs_iter = new_group_bys.iter().chain(new_aggr_exprs.iter()); let schema = aggregate.input.schema(); let necessary_indices = RequiredIndices::new().with_exprs(schema, all_exprs_iter); let 
necessary_exprs = necessary_indices.get_required_exprs(schema); - let mut necessary_indices = - with_child_multiplicity(necessary_indices, !new_aggr_expr.is_empty()); - necessary_indices = necessary_indices - .with_volatile_ancestor_if(has_volatile_ancestor) - .with_plan_volatile(volatile_in_plan); + let necessary_indices = finalize_child_requirements( + necessary_indices, + !new_aggr_exprs.is_empty(), + has_volatile_ancestor, + volatile_in_plan, + ); optimize_projections( Arc::unwrap_or_clone(aggregate.input), @@ -339,7 +340,7 @@ fn optimize_aggregate_projections( add_projection_on_top_if_helpful(aggregate_input, necessary_exprs) })? .map_data(|aggregate_input| { - Aggregate::try_new(Arc::new(aggregate_input), new_group_bys, new_aggr_expr) + Aggregate::try_new(Arc::new(aggregate_input), new_group_bys, new_aggr_exprs) .map(LogicalPlan::Aggregate) }) } @@ -355,14 +356,15 @@ fn optimize_window_projections( let n_input_fields = input_schema.fields().len(); let (child_reqs, window_reqs) = indices.split_off(n_input_fields); - let new_window_expr = window_reqs.get_at_indices(&window.window_expr); + let new_window_exprs = window_reqs.get_at_indices(&window.window_expr); - let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr); - let mut required_indices = - with_child_multiplicity(required_indices, !new_window_expr.is_empty()); - required_indices = required_indices - .with_volatile_ancestor_if(has_volatile_ancestor) - .with_plan_volatile(volatile_in_plan); + let required_indices = child_reqs.with_exprs(&input_schema, &new_window_exprs); + let required_indices = finalize_child_requirements( + required_indices, + !new_window_exprs.is_empty(), + has_volatile_ancestor, + volatile_in_plan, + ); optimize_projections( Arc::unwrap_or_clone(window.input), @@ -370,13 +372,13 @@ fn optimize_window_projections( required_indices.clone(), )? 
.transform_data(|window_child| { - if new_window_expr.is_empty() { + if new_window_exprs.is_empty() { Ok(Transformed::no(window_child)) } else { let required_exprs = required_indices.get_required_exprs(&input_schema); let window_child = add_projection_on_top_if_helpful(window_child, required_exprs)?.data; - Window::try_new(new_window_expr, Arc::new(window_child)) + Window::try_new(new_window_exprs, Arc::new(window_child)) .map(LogicalPlan::Window) .map(Transformed::yes) } @@ -420,6 +422,17 @@ fn with_child_multiplicity( } } +fn finalize_child_requirements( + required_indices: RequiredIndices, + multiplicity_sensitive: bool, + has_volatile_ancestor: bool, + volatile_in_plan: bool, +) -> RequiredIndices { + with_child_multiplicity(required_indices, multiplicity_sensitive) + .with_volatile_ancestor_if(has_volatile_ancestor) + .with_plan_volatile(volatile_in_plan) +} + fn build_plan_input_requirements( plan: &LogicalPlan, indices: &RequiredIndices, From 6a82743adae571f6605a8f8ba998c1d70b6a2116 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 5 Mar 2026 15:50:40 +0800 Subject: [PATCH 15/17] remove finalize_child_requirements --- .../optimizer/src/optimize_projections/mod.rs | 31 +++++-------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 02d5160de29a5..5ba5f759a8f9d 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -324,12 +324,10 @@ fn optimize_aggregate_projections( let schema = aggregate.input.schema(); let necessary_indices = RequiredIndices::new().with_exprs(schema, all_exprs_iter); let necessary_exprs = necessary_indices.get_required_exprs(schema); - let necessary_indices = finalize_child_requirements( - necessary_indices, - !new_aggr_exprs.is_empty(), - has_volatile_ancestor, - volatile_in_plan, - ); + let necessary_indices = + 
with_child_multiplicity(necessary_indices, !new_aggr_exprs.is_empty()) + .with_volatile_ancestor_if(has_volatile_ancestor) + .with_plan_volatile(volatile_in_plan); optimize_projections( Arc::unwrap_or_clone(aggregate.input), @@ -359,12 +357,10 @@ fn optimize_window_projections( let new_window_exprs = window_reqs.get_at_indices(&window.window_expr); let required_indices = child_reqs.with_exprs(&input_schema, &new_window_exprs); - let required_indices = finalize_child_requirements( - required_indices, - !new_window_exprs.is_empty(), - has_volatile_ancestor, - volatile_in_plan, - ); + let required_indices = + with_child_multiplicity(required_indices, !new_window_exprs.is_empty()) + .with_volatile_ancestor_if(has_volatile_ancestor) + .with_plan_volatile(volatile_in_plan); optimize_projections( Arc::unwrap_or_clone(window.input), @@ -422,17 +418,6 @@ fn with_child_multiplicity( } } -fn finalize_child_requirements( - required_indices: RequiredIndices, - multiplicity_sensitive: bool, - has_volatile_ancestor: bool, - volatile_in_plan: bool, -) -> RequiredIndices { - with_child_multiplicity(required_indices, multiplicity_sensitive) - .with_volatile_ancestor_if(has_volatile_ancestor) - .with_plan_volatile(volatile_in_plan) -} - fn build_plan_input_requirements( plan: &LogicalPlan, indices: &RequiredIndices, From fd4795d235de476a3b756f4fc06b91bbd59b37f3 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 5 Mar 2026 15:59:58 +0800 Subject: [PATCH 16/17] comment - explain multiplicity --- .../optimizer/src/optimize_projections/required_indices.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index 7053266d0e658..1abbe36c45d32 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -42,6 +42,11 @@ pub(super) struct RequiredIndices { /// 
Defaults to false. projection_beneficial: bool, /// Whether ancestors can observe row multiplicity changes. + /// + /// "Multiplicity" means how many rows a child produces, including duplicate + /// rows. If this is `true`, rewrites must preserve row counts exactly because + /// upstream expressions (for example, `COUNT` or window functions) may depend + /// on them. multiplicity_sensitive: bool, /// Whether any ancestor expression is volatile. has_volatile_ancestor: bool, From cbe65d95c084bc9f740c6a9a056497f3f67ae808 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 5 Mar 2026 16:39:13 +0800 Subject: [PATCH 17/17] Refine UNNEST elimination logic in mod.rs Implement strict proof checks for UNNEST removal. Ensure it is only eliminated under specific conditions, such as when the ancestor context is multiplicity-insensitive, the list rows are provably preserved, and the recursion depth is exactly 1. Add new optimizer_unnest_prune.slt coverage for unnest removal in query plans. --- .../optimizer/src/optimize_projections/mod.rs | 61 ++++++++++++++++++- .../test_files/optimizer_unnest_prune.slt | 16 +++++ 2 files changed, 74 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 5ba5f759a8f9d..bcba7408e923f 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -21,11 +21,12 @@ mod required_indices; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; +use arrow::array::Array; use std::collections::HashSet; use std::sync::Arc; use datafusion_common::{ - Column, DFSchema, HashMap, JoinType, Result, assert_eq_or_internal_err, + Column, DFSchema, HashMap, JoinType, Result, ScalarValue, assert_eq_or_internal_err, get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, }; use datafusion_expr::expr::Alias; @@ -1028,8 +1029,8 @@ fn 
can_eliminate_unnest(unnest: &Unnest, indices: &RequiredIndices) -> bool { } // List unnest can drop rows for empty lists even when preserve_nulls=true. - // Without proving non-empty cardinality, keep UNNEST conservatively. - if !unnest.list_type_columns.is_empty() { + // Allow elimination only when list inputs are provably non-empty. + if !list_unnest_rows_are_preserved(unnest) { return false; } @@ -1042,6 +1043,60 @@ fn can_eliminate_unnest(unnest: &Unnest, indices: &RequiredIndices) -> bool { .all(|&output_idx| unnest_output_is_passthrough(unnest, output_idx)) } +fn list_unnest_rows_are_preserved(unnest: &Unnest) -> bool { + if unnest.list_type_columns.is_empty() { + return true; + } + + // To preserve row cardinality we need strict evidence that every unnested + // list input yields at least one element per row. + let LogicalPlan::Projection(input_projection) = unnest.input.as_ref() else { + return false; + }; + + unnest + .list_type_columns + .iter() + .all(|(input_idx, list_column)| { + list_column.depth == 1 + && input_projection + .expr + .get(*input_idx) + .is_some_and(expr_is_provably_non_empty_list) + }) +} + +fn expr_is_provably_non_empty_list(expr: &Expr) -> bool { + let expr = strip_alias(expr); + if expr.is_volatile() { + return false; + } + + match expr { + Expr::ScalarFunction(func) => { + func.name() == "make_array" && !func.args.is_empty() + } + Expr::Literal(scalar, _) => scalar_value_is_non_empty_list(scalar), + _ => false, + } +} + +fn strip_alias(expr: &Expr) -> &Expr { + match expr { + Expr::Alias(alias) => strip_alias(alias.expr.as_ref()), + _ => expr, + } +} + +fn scalar_value_is_non_empty_list(scalar: &ScalarValue) -> bool { + match scalar { + ScalarValue::List(arr) => !arr.is_null(0) && arr.value_length(0) > 0, + ScalarValue::LargeList(arr) => !arr.is_null(0) && arr.value_length(0) > 0, + ScalarValue::FixedSizeList(arr) => !arr.is_null(0) && arr.value_length() > 0, + _ => false, + } +} + fn unnest_output_is_passthrough(unnest: &Unnest, 
output_idx: usize) -> bool { let Some(&dependency_idx) = unnest.dependency_indices.get(output_idx) else { return false; diff --git a/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt b/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt index 3a44090b2c090..7b99bccfccaa6 100644 --- a/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt +++ b/datafusion/sqllogictest/test_files/optimizer_unnest_prune.slt @@ -54,6 +54,22 @@ logical_plan 03)----Projection: unnest_prune_struct_t.column1 AS id 04)------TableScan: unnest_prune_struct_t projection=[column1] +# Safe case: deterministic non-empty make_array unnest is cardinality-preserving. +# Unnest should be removed. +query TT +EXPLAIN SELECT id +FROM ( + SELECT column1 AS id, unnest(make_array(1, 2, 3)) AS elem + FROM unnest_prune_t +) q +GROUP BY id; +---- +logical_plan +01)Aggregate: groupBy=[[q.id]], aggr=[[]] +02)--SubqueryAlias: q +03)----Projection: unnest_prune_t.column1 AS id +04)------TableScan: unnest_prune_t projection=[column1] + # Empty-list/null semantics are cardinality-sensitive even if unnested column is dead. # Unnest must remain. query TT