diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 2889259dd4820..b7698f2ecd21f 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1348,6 +1348,13 @@ config_namespace! { /// closer to the leaf table scans, and push those projections down /// towards the leaf nodes. pub enable_leaf_expression_pushdown: bool, default = true + + /// When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that + /// read from the same source and differ only by filter predicates into a single branch + /// with a combined filter. This optimization is conservative and only applies when the + /// branches share the same source and compatible wrapper nodes such as identical + /// projections or aliases. + pub enable_unions_to_filter: bool, default = false } } diff --git a/datafusion/core/src/optimizer_rule_reference.md b/datafusion/core/src/optimizer_rule_reference.md index fcbb200c71624..1f9f37f530557 100644 --- a/datafusion/core/src/optimizer_rule_reference.md +++ b/datafusion/core/src/optimizer_rule_reference.md @@ -39,28 +39,29 @@ Rule order matters. The default pipeline may change between releases. | ----- | ----------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- | | 1 | `rewrite_set_comparison` | Rewrites `ANY` and `ALL` set-comparison subqueries into `EXISTS`-based boolean expressions with correct SQL NULL semantics. | | 2 | `optimize_unions` | Flattens nested unions and removes unions with a single input. | -| 3 | `simplify_expressions` | Constant-folds and simplifies expressions while preserving output names. | -| 4 | `replace_distinct_aggregate` | Rewrites `DISTINCT` and `DISTINCT ON` operators into aggregate-based plans that later rules can optimize further. | -| 5 | `eliminate_join` | Replaces keyless inner joins with a literal `false` filter by an empty relation. 
| -| 6 | `decorrelate_predicate_subquery` | Converts eligible `IN` and `EXISTS` predicate subqueries into semi or anti joins. | -| 7 | `scalar_subquery_to_join` | Rewrites eligible scalar subqueries into joins and adds schema-preserving projections. | -| 8 | `decorrelate_lateral_join` | Rewrites eligible lateral joins into regular joins. | -| 9 | `extract_equijoin_predicate` | Splits join filters into equijoin keys and residual predicates. | -| 10 | `eliminate_duplicated_expr` | Removes duplicate expressions from projections, aggregates, and similar operators. | -| 11 | `eliminate_filter` | Drops always-true filters and replaces always-false or NULL filters with empty relations. | -| 12 | `eliminate_cross_join` | Uses filter predicates to replace cross joins with inner joins when join keys can be found. | -| 13 | `eliminate_limit` | Removes no-op limits and simplifies trivial limit shapes. | -| 14 | `propagate_empty_relation` | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. | -| 15 | `filter_null_join_keys` | Adds `IS NOT NULL` filters to nullable equijoin keys that can never match. | -| 16 | `eliminate_outer_join` | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. | -| 17 | `push_down_limit` | Moves literal limits closer to scans and unions and merges adjacent limits. | -| 18 | `push_down_filter` | Moves filters as early as possible through filter-commutative operators. | -| 19 | `single_distinct_aggregation_to_group_by` | Rewrites single-column `DISTINCT` aggregations into two-stage `GROUP BY` plans. | -| 20 | `eliminate_group_by_constant` | Removes constant or functionally redundant expressions from `GROUP BY`. | -| 21 | `common_sub_expression_eliminate` | Computes repeated subexpressions once and reuses the result. | -| 22 | `extract_leaf_expressions` | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. 
| -| 23 | `push_down_leaf_projections` | Pushes the helper projections created by leaf extraction toward leaf inputs. | -| 24 | `optimize_projections` | Prunes unused columns and removes unnecessary logical projections. | +| 3 | `unions_to_filter` | Merges `UNION DISTINCT` branches that share the same source into a single filtered branch with a disjunctive predicate. | +| 4 | `simplify_expressions` | Constant-folds and simplifies expressions while preserving output names. | +| 5 | `replace_distinct_aggregate` | Rewrites `DISTINCT` and `DISTINCT ON` operators into aggregate-based plans that later rules can optimize further. | +| 6 | `eliminate_join` | Replaces keyless inner joins with a literal `false` filter by an empty relation. | +| 7 | `decorrelate_predicate_subquery` | Converts eligible `IN` and `EXISTS` predicate subqueries into semi or anti joins. | +| 8 | `scalar_subquery_to_join` | Rewrites eligible scalar subqueries into joins and adds schema-preserving projections. | +| 9 | `decorrelate_lateral_join` | Rewrites eligible lateral joins into regular joins. | +| 10 | `extract_equijoin_predicate` | Splits join filters into equijoin keys and residual predicates. | +| 11 | `eliminate_duplicated_expr` | Removes duplicate expressions from projections, aggregates, and similar operators. | +| 12 | `eliminate_filter` | Drops always-true filters and replaces always-false or NULL filters with empty relations. | +| 13 | `eliminate_cross_join` | Uses filter predicates to replace cross joins with inner joins when join keys can be found. | +| 14 | `eliminate_limit` | Removes no-op limits and simplifies trivial limit shapes. | +| 15 | `propagate_empty_relation` | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. | +| 16 | `filter_null_join_keys` | Adds `IS NOT NULL` filters to nullable equijoin keys that can never match. | +| 17 | `eliminate_outer_join` | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. 
| +| 18 | `push_down_limit` | Moves literal limits closer to scans and unions and merges adjacent limits. | +| 19 | `push_down_filter` | Moves filters as early as possible through filter-commutative operators. | +| 20 | `single_distinct_aggregation_to_group_by` | Rewrites single-column `DISTINCT` aggregations into two-stage `GROUP BY` plans. | +| 21 | `eliminate_group_by_constant` | Removes constant or functionally redundant expressions from `GROUP BY`. | +| 22 | `common_sub_expression_eliminate` | Computes repeated subexpressions once and reuses the result. | +| 23 | `extract_leaf_expressions` | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. | +| 24 | `push_down_leaf_projections` | Pushes the helper projections created by leaf extraction toward leaf inputs. | +| 25 | `optimize_projections` | Prunes unused columns and removes unnecessary logical projections. | ### Physical Optimizer Rules diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index a767526feb930..0822a17f24f16 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -83,3 +83,7 @@ harness = false [[bench]] name = "optimize_projections" harness = false + +[[bench]] +name = "unions_to_filter" +harness = false diff --git a/datafusion/optimizer/benches/unions_to_filter.rs b/datafusion/optimizer/benches/unions_to_filter.rs new file mode 100644 index 0000000000000..3f7ef1e582410 --- /dev/null +++ b/datafusion/optimizer/benches/unions_to_filter.rs @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Microbenchmarks for the [`UnionsToFilter`] optimizer rule. +//! +//! Three scenarios are covered: +//! +//! 1. **merge** – N branches over the *same* table, each with a simple +//! equality filter. All branches should be merged into a single +//! `DISTINCT(Filter(OR …))` plan. +//! +//! 2. **no_merge** – N branches over *different* tables. The rule must +//! recognise that no merge is possible and leave the plan unchanged. +//! This exercises the "cold path" without any rewrite work. +//! +//! 3. **merge_with_projection** – N branches over the same table but each +//! branch wraps the filter in a `Projection`. This exercises the wrapper- +//! peeling and re-wrapping paths in addition to the core merge logic. +//! +//! To generate a flamegraph (requires `cargo-flamegraph`): +//! ```text +//! cargo flamegraph -p datafusion-optimizer --bench unions_to_filter \ +//! --flamechart --root --profile profiling --freq 1000 -- --bench +//! 
``` + +use arrow::datatypes::{DataType, Field, Schema}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, logical_plan::table_scan}; +use datafusion_expr::{col, lit}; +use datafusion_optimizer::OptimizerContext; +use datafusion_optimizer::unions_to_filter::UnionsToFilter; +use datafusion_optimizer::{Optimizer, OptimizerRule}; +use std::hint::black_box; +use std::sync::Arc; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Build a three-column table scan for `name`. +fn scan(name: &str) -> LogicalPlan { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + table_scan(Some(name), &schema, None) + .unwrap() + .build() + .unwrap() +} + +/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches all filter over +/// the *same* table (`t`), so the rule can merge them. +fn build_merge_plan(n: usize) -> LogicalPlan { + assert!(n >= 2); + let mut builder: Option<LogicalPlanBuilder> = None; + for i in 0..n { + let branch = LogicalPlanBuilder::from(scan("t")) + .filter(col("a").eq(lit(i as i32))) + .unwrap() + .build() + .unwrap(); + builder = Some(match builder { + None => LogicalPlanBuilder::from(branch), + Some(b) => b.union(branch).unwrap(), + }); + } + builder.unwrap().distinct().unwrap().build().unwrap() +} + +/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches each filter over a +/// *different* table, so no merge is possible. 
+fn build_no_merge_plan(n: usize) -> LogicalPlan { + assert!(n >= 2); + let mut builder: Option<LogicalPlanBuilder> = None; + for i in 0..n { + let branch = LogicalPlanBuilder::from(scan(&format!("t{i}"))) + .filter(col("a").eq(lit(i as i32))) + .unwrap() + .build() + .unwrap(); + builder = Some(match builder { + None => LogicalPlanBuilder::from(branch), + Some(b) => b.union(branch).unwrap(), + }); + } + builder.unwrap().distinct().unwrap().build().unwrap() +} + +/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches each wrap the +/// filter inside a `Projection` over the *same* table. +fn build_merge_with_projection_plan(n: usize) -> LogicalPlan { + assert!(n >= 2); + let mut builder: Option<LogicalPlanBuilder> = None; + for i in 0..n { + let branch = LogicalPlanBuilder::from(scan("t")) + .filter(col("a").eq(lit(i as i32))) + .unwrap() + .project(vec![col("a"), col("b")]) + .unwrap() + .build() + .unwrap(); + builder = Some(match builder { + None => LogicalPlanBuilder::from(branch), + Some(b) => b.union(branch).unwrap(), + }); + } + builder.unwrap().distinct().unwrap().build().unwrap() +} + +/// Run the [`UnionsToFilter`] rule through an [`Optimizer`] that contains only +/// this rule (single pass, feature flag enabled). 
+fn run_optimizer(plan: &LogicalPlan, ctx: &OptimizerContext) -> LogicalPlan { + let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = + vec![Arc::new(UnionsToFilter::new())]; + Optimizer::with_rules(rules) + .optimize(plan.clone(), ctx, |_, _| {}) + .unwrap() +} + +// --------------------------------------------------------------------------- +// Benchmark functions +// --------------------------------------------------------------------------- + +fn bench_merge(c: &mut Criterion) { + let mut options = ConfigOptions::default(); + options.optimizer.enable_unions_to_filter = true; + let ctx = + OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1); + + let mut group = c.benchmark_group("unions_to_filter/merge"); + for n in [2, 8, 32, 128] { + let plan = build_merge_plan(n); + group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| { + b.iter(|| black_box(run_optimizer(p, &ctx))); + }); + } + group.finish(); +} + +fn bench_no_merge(c: &mut Criterion) { + let mut options = ConfigOptions::default(); + options.optimizer.enable_unions_to_filter = true; + let ctx = + OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1); + + let mut group = c.benchmark_group("unions_to_filter/no_merge"); + for n in [2, 8, 32, 128] { + let plan = build_no_merge_plan(n); + group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| { + b.iter(|| black_box(run_optimizer(p, &ctx))); + }); + } + group.finish(); +} + +fn bench_merge_with_projection(c: &mut Criterion) { + let mut options = ConfigOptions::default(); + options.optimizer.enable_unions_to_filter = true; + let ctx = + OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1); + + let mut group = c.benchmark_group("unions_to_filter/merge_with_projection"); + for n in [2, 8, 32, 128] { + let plan = build_merge_with_projection_plan(n); + group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| { + b.iter(|| black_box(run_optimizer(p, &ctx))); + }); + } + 
group.finish(); +} + +criterion_group!( + benches, + bench_merge, + bench_no_merge, + bench_merge_with_projection +); +criterion_main!(benches); diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index e610091824092..47adc99ff21b4 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -70,6 +70,7 @@ pub mod rewrite_set_comparison; pub mod scalar_subquery_to_join; pub mod simplify_expressions; pub mod single_distinct_to_groupby; +pub mod unions_to_filter; pub mod utils; #[cfg(test)] diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index d0fbb31414dab..31f8088f79c98 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -56,6 +56,7 @@ use crate::rewrite_set_comparison::RewriteSetComparison; use crate::scalar_subquery_to_join::ScalarSubqueryToJoin; use crate::simplify_expressions::SimplifyExpressions; use crate::single_distinct_to_groupby::SingleDistinctToGroupBy; +use crate::unions_to_filter::UnionsToFilter; use crate::utils::log_plan; /// Transforms one [`LogicalPlan`] into another which computes the same results, @@ -280,6 +281,7 @@ impl Optimizer { let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![ Arc::new(RewriteSetComparison::new()), Arc::new(OptimizeUnions::new()), + Arc::new(UnionsToFilter::new()), Arc::new(SimplifyExpressions::new()), Arc::new(ReplaceDistinctWithAggregate::new()), Arc::new(EliminateJoin::new()), diff --git a/datafusion/optimizer/src/unions_to_filter.rs b/datafusion/optimizer/src/unions_to_filter.rs new file mode 100644 index 0000000000000..158fd358287fe --- /dev/null +++ b/datafusion/optimizer/src/unions_to_filter.rs @@ -0,0 +1,652 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Rewrites `UNION DISTINCT` branches that differ only by filter predicates +//! into a single filtered branch plus `DISTINCT`. + +use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::Result; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema; +use datafusion_expr::logical_plan::builder::LogicalPlanBuilder; +use datafusion_expr::utils::disjunction; +use datafusion_expr::{ + Distinct, Expr, Filter, LogicalPlan, Projection, SubqueryAlias, Union, +}; +use log::debug; +use std::sync::Arc; + +#[derive(Default, Debug)] +pub struct UnionsToFilter; + +impl UnionsToFilter { + #[expect(missing_docs)] + pub fn new() -> Self { + Self + } +} + +impl OptimizerRule for UnionsToFilter { + fn name(&self) -> &str { + "unions_to_filter" + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>> { + if !config.options().optimizer.enable_unions_to_filter { + return Ok(Transformed::no(plan)); + } + + // Fast pre-check: if the plan tree has no Distinct::All node at all we can + // skip the expensive bottom-up rewrite_with_subqueries traversal entirely. + // This matters for large UNION ALL plans (e.g. TPC-DS Q4) where the rule + // can never fire and the traversal overhead is otherwise measurable. 
+ if !plan.exists(|p| Ok(matches!(p, LogicalPlan::Distinct(Distinct::All(_)))))? { + return Ok(Transformed::no(plan)); + } + + plan.rewrite_with_subqueries(&mut UnionsToFilterRewriter) + } +} + +struct UnionsToFilterRewriter; + +impl TreeNodeRewriter for UnionsToFilterRewriter { + type Node = LogicalPlan; + + fn f_up(&mut self, plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> { + match &plan { + LogicalPlan::Distinct(Distinct::All(input)) => { + match try_rewrite_distinct_union(input.as_ref().clone())? { + Some(rewritten) => Ok(Transformed::yes(rewritten)), + None => Ok(Transformed::no(plan)), + } + } + _ => Ok(Transformed::no(plan)), + } + } +} + +fn try_rewrite_distinct_union(plan: LogicalPlan) -> Result<Option<LogicalPlan>> { + let LogicalPlan::Union(Union { inputs, schema }) = plan else { + debug!("unions_to_filter skipped: input is not a UNION"); + return Ok(None); + }; + + if inputs.len() < 2 { + debug!( + "unions_to_filter skipped: UNION has {} input(s), need at least 2", + inputs.len() + ); + return Ok(None); + } + + // Use a Vec instead of HashMap: union branches are typically 2-10 entries, + // so a linear scan with PartialEq is faster than recursively hashing entire + // LogicalPlan subtrees (O(N * tree_size) hashing for every insert/lookup). + let mut grouped: Vec<(GroupKey, Vec<Expr>)> = Vec::new(); + let mut transformed = false; + + for input in inputs { + let Some(branch) = extract_branch(Arc::unwrap_or_clone(input))? 
else { + return Ok(None); + }; + + let key = GroupKey { + source: branch.source, + wrappers: branch.wrappers, + }; + if let Some((_, conds)) = grouped.iter_mut().find(|(k, _)| k == &key) { + conds.push(branch.predicate); + transformed = true; + } else { + grouped.push((key, vec![branch.predicate])); + } + } + + if !transformed { + debug!("unions_to_filter skipped: no branch groups could be merged"); + return Ok(None); + } + + let mut builder: Option<LogicalPlanBuilder> = None; + for (key, predicates) in grouped { + let combined = + disjunction(predicates).expect("union branches always provide predicates"); + let branch = LogicalPlanBuilder::from(key.source) + .filter(combined)? + .build()?; + let branch = wrap_branch(branch, &key.wrappers)?; + let branch = coerce_plan_expr_for_schema(branch, &schema)?; + let branch = align_plan_to_schema(branch, Arc::clone(&schema))?; + builder = Some(match builder { + None => LogicalPlanBuilder::from(branch), + Some(builder) => builder.union(branch)?, + }); + } + + let union = builder + .expect("at least one branch after rewrite") + .build()?; + Ok(Some(LogicalPlan::Distinct(Distinct::All(Arc::new(union))))) +} + +struct UnionBranch { + source: LogicalPlan, + predicate: Expr, + wrappers: Vec<Wrapper>, +} + +fn extract_branch(plan: LogicalPlan) -> Result<Option<UnionBranch>> { + let (wrappers, plan) = peel_wrappers(plan); + + // Volatile or subquery expressions in the projection must not be merged: + // they are evaluated once per branch in the original plan but would be + // evaluated once per combined row after the rewrite, which can change the + // output row set. + if !wrapper_projections_are_safe(&wrappers) { + debug!( + "unions_to_filter skipped: projection wrapper contains volatile expression or subquery" + ); + return Ok(None); + } + + match plan { + LogicalPlan::Filter(Filter { + predicate, input, .. 
+ }) => { + if !is_mergeable_predicate(&predicate) { + debug!( + "unions_to_filter skipped: branch predicate contains a volatile expression or a subquery" + ); + return Ok(None); + } + Ok(Some(UnionBranch { + source: strip_passthrough_nodes(Arc::unwrap_or_clone(input)), + predicate, + wrappers, + })) + } + // A Limit or Sort node changes the row-set semantics of the branch. + // Merging two such branches into one would silently drop the per-branch + // row restriction (LIMIT) or rely on an order guarantee that UNION does + // not preserve (ORDER BY). Bail out to leave the UNION unchanged. + LogicalPlan::Limit(_) => { + debug!("unions_to_filter skipped: branch contains LIMIT"); + Ok(None) + } + LogicalPlan::Sort(_) => { + debug!("unions_to_filter skipped: branch contains ORDER BY / SORT"); + Ok(None) + } + other => Ok(Some(UnionBranch { + source: strip_passthrough_nodes(other), + predicate: Expr::Literal( + datafusion_common::ScalarValue::Boolean(Some(true)), + None, + ), + wrappers, + })), + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct GroupKey { + source: LogicalPlan, + wrappers: Vec<Wrapper>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum Wrapper { + Projection { + expr: Vec<Expr>, + schema: datafusion_common::DFSchemaRef, + }, + SubqueryAlias { + alias: datafusion_common::TableReference, + schema: datafusion_common::DFSchemaRef, + }, +} + +fn peel_wrappers(mut plan: LogicalPlan) -> (Vec<Wrapper>, LogicalPlan) { + let mut wrappers = vec![]; + loop { + match plan { + LogicalPlan::Projection(Projection { + expr, + input, + schema, + .. + }) => { + wrappers.push(Wrapper::Projection { expr, schema }); + plan = Arc::unwrap_or_clone(input); + } + LogicalPlan::SubqueryAlias(SubqueryAlias { + input, + alias, + schema, + .. 
+ }) => { + wrappers.push(Wrapper::SubqueryAlias { alias, schema }); + plan = Arc::unwrap_or_clone(input); + } + other => return (wrappers, other), + } + } +} + +fn wrap_branch(mut plan: LogicalPlan, wrappers: &[Wrapper]) -> Result<LogicalPlan> { + for wrapper in wrappers.iter().rev() { + plan = match wrapper { + Wrapper::Projection { expr, schema } => { + LogicalPlan::Projection(Projection::try_new_with_schema( + expr.clone(), + Arc::new(plan), + Arc::clone(schema), + )?) + } + // SubqueryAlias::try_new recomputes the schema from the new input. + // This is safe because the source table is unchanged; only the + // filter predicate differs, so the recomputed schema matches the + // original one stored in peel_wrappers. + Wrapper::SubqueryAlias { alias, .. } => LogicalPlan::SubqueryAlias( + SubqueryAlias::try_new(Arc::new(plan), alias.clone())?, + ), + }; + } + Ok(plan) +} + +fn strip_passthrough_nodes(mut plan: LogicalPlan) -> LogicalPlan { + loop { + plan = match plan { + LogicalPlan::Projection(Projection { input, .. }) => { + Arc::unwrap_or_clone(input) + } + LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { + Arc::unwrap_or_clone(input) + } + other => return other, + }; + } +} + +fn align_plan_to_schema( + plan: LogicalPlan, + schema: datafusion_common::DFSchemaRef, +) -> Result<LogicalPlan> { + if plan.schema() == &schema { + return Ok(plan); + } + + let expr = plan + .schema() + .iter() + .enumerate() + .map(|(i, _)| { + Expr::Column(datafusion_common::Column::from( + plan.schema().qualified_field(i), + )) + }) + .collect::<Vec<_>>(); + + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + expr, + Arc::new(plan), + schema, + )?)) +} + +fn is_mergeable_predicate(expr: &Expr) -> bool { + !expr.is_volatile() && !expr_contains_subquery(expr) +} + +/// Returns `true` when every projection expression in `wrappers` is both +/// non-volatile and subquery-free. +/// +/// Volatile expressions (e.g. 
`random()`, `uuid()`) or subqueries +/// in the SELECT list cannot be safely merged: the original plan evaluates +/// them once per branch execution, while the rewritten plan evaluates them +/// once per combined row, which can change the set of output rows. +fn wrapper_projections_are_safe(wrappers: &[Wrapper]) -> bool { + wrappers.iter().all(|w| match w { + Wrapper::Projection { expr, .. } => expr + .iter() + .all(|e| !e.is_volatile() && !expr_contains_subquery(e)), + Wrapper::SubqueryAlias { .. } => true, + }) +} + +fn expr_contains_subquery(expr: &Expr) -> bool { + expr.exists(|e| match e { + Expr::ScalarSubquery(_) | Expr::Exists(_) | Expr::InSubquery(_) => Ok(true), + _ => Ok(false), + }) + .expect("boolean expression walk is infallible") +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::OptimizerContext; + use crate::assert_optimized_plan_eq_snapshot; + use crate::test::test_table_scan_with_name; + use arrow::datatypes::DataType; + use datafusion_common::Result; + use datafusion_expr::{ + ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, + Volatility, col, lit, + }; + + macro_rules! assert_optimized_plan_equal { + ( + $plan:expr, + @ $expected:literal $(,)? 
+ ) => {{ + let mut options = datafusion_common::config::ConfigOptions::default(); + options.optimizer.enable_unions_to_filter = true; + let optimizer_ctx = OptimizerContext::new_with_config_options(Arc::new(options)) + .with_max_passes(1); + let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = + vec![Arc::new(UnionsToFilter::new())]; + assert_optimized_plan_eq_snapshot!( + optimizer_ctx, + rules, + $plan, + @ $expected, + ) + }}; + } + + #[derive(Debug, PartialEq, Eq, Hash)] + struct VolatileTestUdf; + + impl ScalarUDFImpl for VolatileTestUdf { + fn name(&self) -> &str { + "volatile_test" + } + + fn signature(&self) -> &Signature { + static SIGNATURE: std::sync::LazyLock<Signature> = + std::sync::LazyLock::new(|| Signature::nullary(Volatility::Volatile)); + &SIGNATURE + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { + Ok(DataType::Float64) + } + + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> { + panic!("VolatileTestUdf is not intended for execution") + } + } + + fn volatile_expr() -> Expr { + ScalarUDF::new_from_impl(VolatileTestUdf).call(vec![]) + } + + #[test] + fn rewrite_union_distinct_same_source_filters() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(1)))? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(2)))? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Projection: t.a, t.b, t.c + Filter: t.a = Int32(1) OR t.a = Int32(2) + TableScan: t + ")?; + Ok(()) + } + + #[test] + fn keep_union_distinct_different_sources() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?) + .filter(col("a").eq(lit(1)))? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("t2")?) + .filter(col("a").eq(lit(2)))? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? 
+ .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Union + Filter: t1.a = Int32(1) + TableScan: t1 + Filter: t2.a = Int32(2) + TableScan: t2 + ")?; + Ok(()) + } + + #[test] + fn keep_union_distinct_with_volatile_predicate() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(volatile_expr().gt(lit(0.5_f64)))? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(2)))? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Union + Filter: volatile_test() > Float64(0.5) + TableScan: t + Filter: t.a = Int32(2) + TableScan: t + ")?; + Ok(()) + } + + #[test] + fn rewrite_union_distinct_with_matching_projection_prefix() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?) + .project(vec![col("a").alias("mgr"), col("b").alias("comm")])? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?) + .filter(col("b").eq(lit(5)))? + .project(vec![col("a").alias("mgr"), col("b").alias("comm")])? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Projection: emp.a AS mgr, emp.b AS comm + Filter: Boolean(true) OR emp.b = Int32(5) + TableScan: emp + ")?; + Ok(()) + } + + /// A volatile expression in the **projection** (SELECT list) must block the + /// rewrite. Each original branch evaluates it independently; merging them + /// would evaluate it once per combined row, changing the row set. + #[test] + fn keep_union_distinct_with_volatile_projection() -> Result<()> { + // Both branches project volatile_test() AS v over the same source. + let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(1)))? + .project(vec![volatile_expr().alias("v"), col("a")])? 
+ .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(2)))? + .project(vec![volatile_expr().alias("v"), col("a")])? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Union + Projection: volatile_test() AS v, t.a + Filter: t.a = Int32(1) + TableScan: t + Projection: volatile_test() AS v, t.a + Filter: t.a = Int32(2) + TableScan: t + ")?; + Ok(()) + } + + /// A scalar subquery in the **projection** must also block the rewrite. + #[test] + fn keep_union_distinct_with_subquery_in_projection() -> Result<()> { + use datafusion_expr::scalar_subquery; + + // Build a simple scalar subquery: (SELECT t2.b FROM t2 WHERE t2.a = t.a) + let t2 = test_table_scan_with_name("t2")?; + let subquery_plan = Arc::new( + LogicalPlanBuilder::from(t2) + .filter(col("t2.a").eq(col("t.a")))? + .project(vec![col("t2.b")])? + .build()?, + ); + let sq = scalar_subquery(subquery_plan); + + let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(1)))? + .project(vec![sq.clone().alias("sub"), col("a")])? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(2)))? + .project(vec![sq.alias("sub"), col("a")])? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + // Plan should be left untouched because the projection contains a subquery. + let optimized = { + let mut options = datafusion_common::config::ConfigOptions::default(); + options.optimizer.enable_unions_to_filter = true; + let optimizer_ctx = + OptimizerContext::new_with_config_options(Arc::new(options)) + .with_max_passes(1); + let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = + vec![Arc::new(UnionsToFilter::new())]; + crate::Optimizer::with_rules(rules).optimize( + plan.clone(), + &optimizer_ctx, + |_, _| {}, + )? 
+        };
+        // The Distinct(Union(...)) structure must be preserved.
+        assert!(
+            matches!(
+                &optimized,
+                LogicalPlan::Distinct(Distinct::All(inner))
+                    if matches!(inner.as_ref(), LogicalPlan::Union(_))
+            ),
+            "expected Distinct(Union(...)) to be preserved, got:\n{optimized:?}"
+        );
+        Ok(())
+    }
+
+    /// A UNION where both branches have a LIMIT must **not** be rewritten.
+    /// Each branch independently restricts the row-set; collapsing them into a
+    /// single branch would lose the per-branch LIMIT semantics.
+    #[test]
+    fn keep_union_distinct_with_limit_branches() -> Result<()> {
+        let left = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?)
+            .project(vec![col("a").alias("mgr"), col("b").alias("comm")])?
+            .limit(0, Some(2))?
+            .build()?;
+        let right = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?)
+            .project(vec![col("a").alias("mgr"), col("b").alias("comm")])?
+            .limit(0, Some(2))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(left)
+            .union_distinct(right)?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Distinct:
+          Union
+            Limit: skip=0, fetch=2
+              Projection: emp.a AS mgr, emp.b AS comm
+                TableScan: emp
+            Limit: skip=0, fetch=2
+              Projection: emp.a AS mgr, emp.b AS comm
+                TableScan: emp
+        ")?;
+        Ok(())
+    }
+
+    /// A UNION where both branches have an ORDER BY (Sort) must **not** be
+    /// rewritten. ORDER BY inside a UNION subquery does not guarantee ordering
+    /// in the result; merging the branches would silently discard the Sort.
+    #[test]
+    fn keep_union_distinct_with_sort_branches() -> Result<()> {
+        let left = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?)
+            .project(vec![col("a").alias("mgr"), col("b").alias("comm")])?
+            .sort(vec![col("a").sort(true, true)])?
+            .build()?;
+        let right = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?)
+            .project(vec![col("a").alias("mgr"), col("b").alias("comm")])?
+            .sort(vec![col("a").sort(true, true)])?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(left)
+            .union_distinct(right)?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Distinct:
+          Union
+            Projection: mgr, comm
+              Sort: emp.a ASC NULLS FIRST
+                Projection: emp.a AS mgr, emp.b AS comm, emp.a
+                  TableScan: emp
+            Projection: mgr, comm
+              Sort: emp.a ASC NULLS FIRST
+                Projection: emp.a AS mgr, emp.b AS comm, emp.a
+                  TableScan: emp
+        ")?;
+        Ok(())
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 2e8a65385541e..67d2c1e7b516e 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -178,6 +178,7 @@ logical_plan after type_coercion SAME TEXT AS ABOVE
 analyzed_logical_plan SAME TEXT AS ABOVE
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
+logical_plan after unions_to_filter SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
 logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE
 logical_plan after eliminate_join SAME TEXT AS ABOVE
@@ -202,6 +203,7 @@ logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c]
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
+logical_plan after unions_to_filter SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
 logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE
 logical_plan after eliminate_join SAME TEXT AS ABOVE
@@ -553,6 +555,7 @@ logical_plan after type_coercion SAME TEXT AS ABOVE
 analyzed_logical_plan SAME TEXT AS ABOVE
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
+logical_plan after unions_to_filter SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
 logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE
 logical_plan after eliminate_join SAME TEXT AS ABOVE
@@ -577,6 +580,7 @@ logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c]
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
+logical_plan after unions_to_filter SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
 logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE
 logical_plan after eliminate_join SAME TEXT AS ABOVE
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index b04c78bd2774c..8ec72d1b9946a 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -308,6 +308,7 @@ datafusion.optimizer.enable_sort_pushdown true
 datafusion.optimizer.enable_topk_aggregation true
 datafusion.optimizer.enable_topk_dynamic_filter_pushdown true
 datafusion.optimizer.enable_topk_repartition true
+datafusion.optimizer.enable_unions_to_filter false
 datafusion.optimizer.enable_window_limits true
 datafusion.optimizer.enable_window_topn false
 datafusion.optimizer.expand_views_at_output false
@@ -455,6 +456,7 @@ datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization
 datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
 datafusion.optimizer.enable_topk_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase.
 datafusion.optimizer.enable_topk_repartition true When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle.
+datafusion.optimizer.enable_unions_to_filter false When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that read from the same source and differ only by filter predicates into a single branch with a combined filter. This optimization is conservative and only applies when the branches share the same source and compatible wrapper nodes such as identical projections or aliases.
 datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible
 datafusion.optimizer.enable_window_topn false When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. When the window partition key has low cardinality, enabling this optimization can improve performance. However, for high cardinality keys, it may cause regressions in both memory usage and runtime.
 datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index 3871468411c4b..500c9d0d9064a 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -297,6 +297,53 @@ physical_plan
 04)--ProjectionExec: expr=[name@0 || _new as name]
 05)----DataSourceExec: partitions=1, partition_sizes=[1]

+# unions_to_filter is disabled by default
+
+statement ok
+set datafusion.optimizer.enable_unions_to_filter = false;
+
+query TT
+EXPLAIN SELECT id, name FROM t1 WHERE id = 1 UNION SELECT id, name FROM t1 WHERE id = 2
+----
+logical_plan
+01)Aggregate: groupBy=[[id, name]], aggr=[[]]
+02)--Union
+03)----Filter: t1.id = Int32(1)
+04)------TableScan: t1 projection=[id, name]
+05)----Filter: t1.id = Int32(2)
+06)------TableScan: t1 projection=[id, name]
+physical_plan
+01)AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+02)--RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
+03)----AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+05)--------UnionExec
+06)----------FilterExec: id@0 = 1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)----------FilterExec: id@0 = 2
+09)------------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+set datafusion.optimizer.enable_unions_to_filter = true;
+
+query TT
+EXPLAIN SELECT id, name FROM t1 WHERE id = 1 UNION SELECT id, name FROM t1 WHERE id = 2
+----
+logical_plan
+01)Aggregate: groupBy=[[id, name]], aggr=[[]]
+02)--Filter: t1.id = Int32(1) OR t1.id = Int32(2)
+03)----TableScan: t1 projection=[id, name]
+physical_plan
+01)AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+02)--RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
+03)----AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------FilterExec: id@0 = 1 OR id@0 = 2
+06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+set datafusion.optimizer.enable_unions_to_filter = false;
+
 # Make sure to choose a small batch size to introduce parallelism to the plan.
 statement ok
 set datafusion.execution.batch_size = 2;
diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md
index 34d1f7c61eaf1..b818eda0cd0b4 100644
--- a/docs/source/library-user-guide/upgrading/54.0.0.md
+++ b/docs/source/library-user-guide/upgrading/54.0.0.md
@@ -497,3 +497,37 @@ impl Default for MyTreeNode {
     }
 }
 ```
+
+[#21075]: https://github.com/apache/datafusion/pull/21075
+
+### `UnionsToFilter` optimizer rule is now disabled by default
+
+The `datafusion.optimizer.enable_unions_to_filter` option now defaults to
+`false`. When enabled, the rule rewrites `UNION DISTINCT` branches that read the
+same source and differ only by filter predicates into a single scan with a
+combined `OR` predicate:
+
+```sql
+-- Before: two separate scans
+SELECT * FROM t WHERE a = 1
+UNION
+SELECT * FROM t WHERE a = 2
+
+-- After: one scan
+SELECT DISTINCT * FROM t WHERE a = 1 OR a = 2
+```
+
+**Who is affected:**
+
+- Queries using `UNION` against the same table with different filter
+  conditions may benefit from enabling this rule.
+
+**Migration guide:**
+
+Enable the rule when your `UNION` queries scan the same large table
+multiple times with different predicates. Avoid it when the data source handles individual equality predicates more efficiently than
+a combined `OR` (e.g., index-backed sources).
+
+```sql
+SET datafusion.optimizer.enable_unions_to_filter = true;
+```
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 46039f3c99c27..d2841ceb4ca5f 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -173,6 +173,7 @@ The following configuration settings are available:
 | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. |
 | datafusion.optimizer.enable_sort_pushdown | true | Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true |
 | datafusion.optimizer.enable_leaf_expression_pushdown | true | When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. |
+| datafusion.optimizer.enable_unions_to_filter | false | When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that read from the same source and differ only by filter predicates into a single branch with a combined filter. This optimization is conservative and only applies when the branches share the same source and compatible wrapper nodes such as identical projections or aliases. |
 | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
 | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
 | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |