From f4704a23f209c2f13a0a94b3ce5d638881d3b543 Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Thu, 5 Mar 2026 18:28:57 -0700 Subject: [PATCH] Copy limits before repartitions --- datafusion/common/src/config.rs | 5 + datafusion/core/Cargo.toml | 4 + datafusion/core/benches/topk_repartition.rs | 90 +++++ datafusion/physical-optimizer/src/lib.rs | 1 + .../physical-optimizer/src/optimizer.rs | 6 + .../src/topk_repartition.rs | 368 ++++++++++++++++++ .../sqllogictest/test_files/explain.slt | 4 + .../test_files/information_schema.slt | 2 + .../sqllogictest/test_files/window_limits.slt | 7 +- .../test_files/window_topk_pushdown.slt | 141 +++++++ docs/source/user-guide/configs.md | 1 + 11 files changed, 626 insertions(+), 3 deletions(-) create mode 100644 datafusion/core/benches/topk_repartition.rs create mode 100644 datafusion/physical-optimizer/src/topk_repartition.rs create mode 100644 datafusion/sqllogictest/test_files/window_topk_pushdown.slt diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 7238d842e39ab..fe536a24ecf21 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -922,6 +922,11 @@ config_namespace! { /// past window functions, if possible pub enable_window_limits: bool, default = true + /// When set to true, the optimizer will push TopK (Sort with fetch) + /// below hash repartition when the partition key is a prefix of the + /// sort key, reducing data volume before the shuffle. + pub enable_topk_repartition: bool, default = true + /// When set to true, the optimizer will attempt to push down TopK dynamic filters /// into the file scan phase. 
pub enable_topk_dynamic_filter_pushdown: bool, default = true diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 8965948a0f4e2..9beb94497a5fd 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -228,6 +228,10 @@ name = "struct_query_sql" harness = false name = "window_query_sql" +[[bench]] +harness = false +name = "topk_repartition" + [[bench]] harness = false name = "scalar" diff --git a/datafusion/core/benches/topk_repartition.rs b/datafusion/core/benches/topk_repartition.rs new file mode 100644 index 0000000000000..e1f14e4aaa633 --- /dev/null +++ b/datafusion/core/benches/topk_repartition.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmark for the TopKRepartition optimizer rule. +//! +//! Measures the benefit of pushing TopK (Sort with fetch) below hash +//! repartition when running partitioned window functions with LIMIT. 
+
+mod data_utils;
+
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use data_utils::create_table_provider;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use parking_lot::Mutex;
+use std::hint::black_box;
+use std::sync::Arc;
+use tokio::runtime::Runtime;
+
+/// Plans and fully executes `sql` on `ctx`, blocking on `rt` until it finishes.
+#[expect(clippy::needless_pass_by_value)]
+fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
+    let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
+    black_box(rt.block_on(df.collect()).unwrap());
+}
+
+/// Builds a context with table `t` registered and the rule toggled as requested.
+fn create_context(
+    partitions_len: usize,
+    target_partitions: usize,
+    enable_topk_repartition: bool,
+) -> Arc<Mutex<SessionContext>> {
+    let array_len = 1024 * 1024;
+    let batch_size = 8 * 1024;
+    let mut config = SessionConfig::new().with_target_partitions(target_partitions);
+    config.options_mut().optimizer.enable_topk_repartition = enable_topk_repartition;
+    let ctx = SessionContext::new_with_config(config);
+    let provider = create_table_provider(partitions_len, array_len, batch_size).unwrap();
+    ctx.register_table("t", provider).unwrap();
+    Arc::new(Mutex::new(ctx))
+}
+
+/// Benchmarks the same windowed `ORDER BY ... LIMIT` query with the rule off and on.
+fn criterion_benchmark(c: &mut Criterion) {
+    let rt = Runtime::new().unwrap();
+
+    let limits = [10, 1_000, 10_000, 100_000];
+    let scans = 16;
+    let target_partitions = 4;
+
+    // The contexts do not depend on the limit: build each one only once.
+    let ctx_disabled = create_context(scans, target_partitions, false);
+    let ctx_enabled = create_context(scans, target_partitions, true);
+
+    let group = format!("topk_repartition_{scans}_to_{target_partitions}");
+    let mut group = c.benchmark_group(group);
+    for limit in limits {
+        let sql = format!(
+            "SELECT \
+                SUM(f64) OVER (PARTITION BY u64_narrow ORDER BY u64_wide ROWS UNBOUNDED PRECEDING) \
+             FROM t \
+             ORDER BY u64_narrow, u64_wide \
+             LIMIT {limit}"
+        );
+
+        group.bench_function(BenchmarkId::new("disabled", limit), |b| {
+            b.iter(|| query(ctx_disabled.clone(), &rt, &sql))
+        });
+        group.bench_function(BenchmarkId::new("enabled", limit), |b| {
+            b.iter(|| query(ctx_enabled.clone(), &rt, &sql))
+        });
+    }
+    group.finish();
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index 3a0d79ae2d234..47580d03250fd 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -42,6 +42,7 @@ pub use datafusion_pruning as pruning;
 pub mod pushdown_sort;
 pub mod sanity_checker;
 pub mod topk_aggregation;
+pub mod topk_repartition;
 pub mod update_aggr_exprs;
 pub mod utils;
 
diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs
index 49225db03ac48..7c1e04e295aa8 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -33,6 +33,7 @@ use crate::output_requirements::OutputRequirements;
 use crate::projection_pushdown::ProjectionPushdown;
 use crate::sanity_checker::SanityCheckPlan;
 use crate::topk_aggregation::TopKAggregation;
+use crate::topk_repartition::TopKRepartition;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;
 
 use crate::limit_pushdown_past_window::LimitPushPastWindows;
@@ -141,6 +142,11 @@ impl PhysicalOptimizer {
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.
             Arc::new(LimitPushdown::new()),
+            // TopKRepartition pushes TopK (Sort with fetch) below Hash
+            // repartition when the partition key is a prefix of the sort key.
+            // This reduces data volume before a hash shuffle. It must run
+            // after LimitPushdown so that the TopK already exists on the SortExec.
+            Arc::new(TopKRepartition::new()),
             // The ProjectionPushdown rule tries to push projections towards
             // the sources in the execution plan. 
As a result of this process, // a projection can disappear if it reaches the source providers, and diff --git a/datafusion/physical-optimizer/src/topk_repartition.rs b/datafusion/physical-optimizer/src/topk_repartition.rs new file mode 100644 index 0000000000000..668e0d273288b --- /dev/null +++ b/datafusion/physical-optimizer/src/topk_repartition.rs @@ -0,0 +1,368 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Push TopK (Sort with fetch) past Hash Repartition +//! +//! When a `SortExec` with a fetch limit (TopK) sits above a +//! `RepartitionExec(Hash)`, and the hash partition expressions are a prefix +//! of the sort expressions, this rule inserts a copy of the TopK below +//! the repartition to reduce the volume of data flowing through the shuffle. +//! +//! Hash partitioning sends all rows with the same partition key to the same +//! output partition, and any row among the overall first `fetch` rows (in sort +//! order) is also among the first `fetch` rows of its input partition. A single +//! output partition may, however, receive fewer rows than without the copy. +//! +//! ## Example +//! +//! Before: +//! ```text +//! SortExec: TopK(fetch=3), expr=[a ASC, b ASC] +//! RepartitionExec: Hash([a], 4) +//! 
DataSourceExec +//! ``` +//! +//! After: +//! ```text +//! SortExec: TopK(fetch=3), expr=[a ASC, b ASC] +//! RepartitionExec: Hash([a], 4) +//! SortExec: TopK(fetch=3), expr=[a ASC, b ASC] +//! DataSourceExec +//! ``` + +use crate::PhysicalOptimizerRule; +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use std::sync::Arc; +// CoalesceBatchesExec is deprecated on main (replaced by arrow-rs BatchCoalescer), +// but older DataFusion versions may still insert it between SortExec and RepartitionExec. +#[expect(deprecated)] +use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; +use datafusion_physical_plan::repartition::RepartitionExec; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::{ExecutionPlan, Partitioning}; + +/// A physical optimizer rule that pushes TopK (Sort with fetch) past +/// hash repartition when the partition key is a prefix of the sort key. +/// +/// See module-level documentation for details. 
+#[derive(Debug, Clone, Default)]
+pub struct TopKRepartition;
+
+impl TopKRepartition {
+    /// Creates a new `TopKRepartition` rule.
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+impl PhysicalOptimizerRule for TopKRepartition {
+    /// Inserts a copy of each eligible TopK below its hash `RepartitionExec`;
+    /// returns `plan` untouched when `enable_topk_repartition` is disabled.
+    #[expect(deprecated)] // CoalesceBatchesExec: kept for older DataFusion versions
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !config.optimizer.enable_topk_repartition {
+            return Ok(plan);
+        }
+        plan.transform_down(|node| {
+            // Match SortExec with fetch (TopK)
+            let Some(sort_exec) = node.as_any().downcast_ref::<SortExec>() else {
+                return Ok(Transformed::no(node));
+            };
+            let Some(fetch) = sort_exec.fetch() else {
+                return Ok(Transformed::no(node));
+            };
+
+            // The child might be a CoalesceBatchesExec; look through it
+            let sort_input = sort_exec.input();
+            let sort_any = sort_input.as_any();
+            let (repart_parent, repart_exec) = if let Some(rp) =
+                sort_any.downcast_ref::<RepartitionExec>()
+            {
+                (None, rp)
+            } else if let Some(cb_exec) = sort_any.downcast_ref::<CoalesceBatchesExec>() {
+                // There's a CoalesceBatchesExec between TopK & RepartitionExec
+                // in this case we will need to reconstruct both nodes
+                let cb_input = cb_exec.input();
+                let Some(rp) = cb_input.as_any().downcast_ref::<RepartitionExec>() else {
+                    return Ok(Transformed::no(node));
+                };
+                (Some(Arc::clone(sort_input)), rp)
+            } else {
+                return Ok(Transformed::no(node));
+            };
+
+            // Only handle Hash partitioning
+            let Partitioning::Hash(hash_exprs, _) = repart_exec.partitioning() else {
+                return Ok(Transformed::no(node));
+            };
+
+            let sort_exprs = sort_exec.expr();
+
+            // Check that hash expressions are a prefix of the sort expressions.
+            // Each hash expression must match the corresponding sort expression
+            // (ignoring sort options like ASC/DESC since hash doesn't care about order).
+            let is_prefix = hash_exprs.len() <= sort_exprs.len()
+                && hash_exprs.iter().zip(sort_exprs.iter()).all(|(h, s)| h.eq(&s.expr));
+            if !is_prefix {
+                return Ok(Transformed::no(node));
+            }
+
+            // Don't push if the input to the repartition is already bounded
+            // (e.g., another TopK), as it would be redundant.
+            let repart_input = repart_exec.input();
+            if repart_input.as_any().downcast_ref::<SortExec>().is_some() {
+                return Ok(Transformed::no(node));
+            }
+
+            // NOTE(review): the global top-`fetch` rows always survive, but one output
+            // partition may receive fewer rows than before; confirm no parent needs them.
+            // Insert a copy of the TopK below the repartition. The copy always preserves
+            // partitioning: it runs on every *input* partition of the repartition, and a
+            // non-preserving SortExec would expect a single input partition.
+            let new_sort: Arc<dyn ExecutionPlan> = Arc::new(
+                SortExec::new(sort_exprs.clone(), Arc::clone(repart_input))
+                    .with_fetch(Some(fetch))
+                    .with_preserve_partitioning(true),
+            );
+
+            let new_partitioning = repart_exec.partitioning().clone();
+            let new_repartition: Arc<dyn ExecutionPlan> =
+                Arc::new(RepartitionExec::try_new(new_sort, new_partitioning)?);
+
+            // Rebuild the tree above the repartition
+            let new_sort_input = if let Some(parent) = repart_parent {
+                parent.with_new_children(vec![new_repartition])?
+            } else {
+                new_repartition
+            };
+
+            let new_top_sort: Arc<dyn ExecutionPlan> = Arc::new(
+                SortExec::new(sort_exprs.clone(), new_sort_input)
+                    .with_fetch(Some(fetch))
+                    .with_preserve_partitioning(sort_exec.preserve_partitioning()),
+            );
+
+            Ok(Transformed::yes(new_top_sort))
+        })
+        .data()
+    }
+
+    fn name(&self) -> &str {
+        "TopKRepartition"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_physical_expr::expressions::col;
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+    use datafusion_physical_plan::displayable;
+    use datafusion_physical_plan::test::scan_partitioned;
+    use insta::assert_snapshot;
+    use std::sync::Arc;
+
+    fn schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Int64, false),
+        ]))
+    }
+
+    fn sort_exprs(schema: &Schema) -> LexOrdering {
+        LexOrdering::new(vec![
+            PhysicalSortExpr::new_default(col("a", schema).unwrap()).asc(),
+            PhysicalSortExpr::new_default(col("b", schema).unwrap()).asc(),
+        ])
+        .unwrap()
+    }
+
+    /// TopK above Hash(a) repartition should get pushed below it,
+    /// because `a` is a prefix of the sort key `(a, b)`.
+    #[test]
+    fn topk_pushed_below_hash_repartition() {
+        let s = schema();
+        let input = scan_partitioned(1);
+        let ordering = sort_exprs(&s);
+
+        let repartition = Arc::new(
+            RepartitionExec::try_new(
+                input,
+                Partitioning::Hash(vec![col("a", &s).unwrap()], 4),
+            )
+            .unwrap(),
+        );
+
+        let sort = Arc::new(
+            SortExec::new(ordering, repartition)
+                .with_fetch(Some(3))
+                .with_preserve_partitioning(true),
+        );
+
+        let config = ConfigOptions::new();
+        let optimized = TopKRepartition::new().optimize(sort, &config).unwrap();
+
+        let display = displayable(optimized.as_ref()).indent(true).to_string();
+        assert_snapshot!(display, @r"
+        SortExec: TopK(fetch=3), expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[true], sort_prefix=[a@0 ASC]
+          RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1, maintains_sort_order=true
+            SortExec: TopK(fetch=3), expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[true]
+              DataSourceExec: partitions=1, partition_sizes=[1]
+        ");
+    }
+
+    /// TopK with no fetch (unbounded sort) should NOT be pushed.
+    #[test]
+    fn unbounded_sort_not_pushed() {
+        let s = schema();
+        let input = scan_partitioned(1);
+        let ordering = sort_exprs(&s);
+
+        let repartition = Arc::new(
+            RepartitionExec::try_new(
+                input,
+                Partitioning::Hash(vec![col("a", &s).unwrap()], 4),
+            )
+            .unwrap(),
+        );
+
+        let sort: Arc<dyn ExecutionPlan> = Arc::new(
+            SortExec::new(ordering, repartition).with_preserve_partitioning(true),
+        );
+
+        let config = ConfigOptions::new();
+        let optimized = TopKRepartition::new().optimize(sort, &config).unwrap();
+
+        let display = displayable(optimized.as_ref()).indent(true).to_string();
+        assert_snapshot!(display, @r"
+        SortExec: expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[true]
+          RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+            DataSourceExec: partitions=1, partition_sizes=[1]
+        ");
+    }
+
+    /// Hash key NOT a prefix of sort key should NOT be pushed.
+    #[test]
+    fn non_prefix_hash_key_not_pushed() {
+        let s = schema();
+        let input = scan_partitioned(1);
+        let ordering = sort_exprs(&s);
+
+        // Hash by `b`, but sort by `(a, b)` - b is not a prefix
+        let repartition = Arc::new(
+            RepartitionExec::try_new(
+                input,
+                Partitioning::Hash(vec![col("b", &s).unwrap()], 4),
+            )
+            .unwrap(),
+        );
+
+        let sort: Arc<dyn ExecutionPlan> = Arc::new(
+            SortExec::new(ordering, repartition)
+                .with_fetch(Some(3))
+                .with_preserve_partitioning(true),
+        );
+
+        let config = ConfigOptions::new();
+        let optimized = TopKRepartition::new().optimize(sort, &config).unwrap();
+
+        let display = displayable(optimized.as_ref()).indent(true).to_string();
+        assert_snapshot!(display, @r"
+        SortExec: TopK(fetch=3), expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[true]
+          RepartitionExec: partitioning=Hash([b@1], 4), input_partitions=1
+            DataSourceExec: partitions=1, partition_sizes=[1]
+        ");
+    }
+
+    /// TopK above CoalesceBatchesExec above Hash(a) repartition should
+    /// push through both, inserting a new TopK below the repartition.
+    #[expect(deprecated)]
+    #[test]
+    fn topk_pushed_through_coalesce_batches() {
+        let s = schema();
+        let input = scan_partitioned(1);
+        let ordering = sort_exprs(&s);
+
+        let repartition = Arc::new(
+            RepartitionExec::try_new(
+                input,
+                Partitioning::Hash(vec![col("a", &s).unwrap()], 4),
+            )
+            .unwrap(),
+        );
+
+        let coalesce: Arc<dyn ExecutionPlan> =
+            Arc::new(CoalesceBatchesExec::new(repartition, 8192));
+
+        let sort = Arc::new(
+            SortExec::new(ordering, coalesce)
+                .with_fetch(Some(3))
+                .with_preserve_partitioning(true),
+        );
+
+        let config = ConfigOptions::new();
+        let optimized = TopKRepartition::new().optimize(sort, &config).unwrap();
+
+        let display = displayable(optimized.as_ref()).indent(true).to_string();
+        assert_snapshot!(display, @r"
+        SortExec: TopK(fetch=3), expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[true], sort_prefix=[a@0 ASC]
+          CoalesceBatchesExec: target_batch_size=8192
+            RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1, maintains_sort_order=true
+              SortExec: TopK(fetch=3), expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[true]
+                DataSourceExec: partitions=1, partition_sizes=[1]
+        ");
+    }
+
+    /// RoundRobin repartition should NOT be pushed.
+ #[test] + fn round_robin_not_pushed() { + let s = schema(); + let input = scan_partitioned(1); + let ordering = sort_exprs(&s); + + let repartition = Arc::new( + RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(4)).unwrap(), + ); + + let sort: Arc = Arc::new( + SortExec::new(ordering, repartition) + .with_fetch(Some(3)) + .with_preserve_partitioning(true), + ); + + let config = ConfigOptions::new(); + let optimized = TopKRepartition::new().optimize(sort, &config).unwrap(); + + let display = displayable(optimized.as_ref()).indent(true).to_string(); + assert_snapshot!(display, @r" + SortExec: TopK(fetch=3), expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1] + "); + } +} diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index c5907d497500e..f024aee590d46 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -244,6 +244,7 @@ physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[W physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE +physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE @@ -324,6 +325,7 @@ physical_plan after OutputRequirements physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, 
string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]] +physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE @@ -368,6 +370,7 @@ physical_plan after OutputRequirements physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet +physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE @@ -609,6 +612,7 @@ physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[W physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE +physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 
b61ceecb24fc0..00210211877f4 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -303,6 +303,7 @@ datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_sort_pushdown true datafusion.optimizer.enable_topk_aggregation true datafusion.optimizer.enable_topk_dynamic_filter_pushdown true +datafusion.optimizer.enable_topk_repartition true datafusion.optimizer.enable_window_limits true datafusion.optimizer.expand_views_at_output false datafusion.optimizer.filter_null_join_keys false @@ -441,6 +442,7 @@ datafusion.optimizer.enable_round_robin_repartition true When set to true, the p datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible datafusion.optimizer.enable_topk_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. +datafusion.optimizer.enable_topk_repartition true When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle. 
datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. diff --git a/datafusion/sqllogictest/test_files/window_limits.slt b/datafusion/sqllogictest/test_files/window_limits.slt index 3a10cbb96aac2..5c06e7f04ec1c 100644 --- a/datafusion/sqllogictest/test_files/window_limits.slt +++ b/datafusion/sqllogictest/test_files/window_limits.slt @@ -586,9 +586,10 @@ physical_plan 01)SortPreservingMergeExec: [depname@0 ASC NULLS LAST], fetch=5 02)--ProjectionExec: expr=[depname@0 as depname, empno@1 as empno, salary@2 as salary, sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW@3 as running_sum] 03)----BoundedWindowAggExec: wdw=[sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW: Field { "sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW": nullable UInt64 }, frame: ROWS BETWEEN 1 PRECEDING AND CURRENT ROW], mode=[Sorted] -04)------SortExec: TopK(fetch=5), expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST], preserve_partitioning=[true] -05)--------RepartitionExec: partitioning=Hash([depname@0], 4), input_partitions=1 -06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[depname, empno, salary], file_type=csv, has_header=true +04)------SortExec: TopK(fetch=5), expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST], preserve_partitioning=[true], sort_prefix=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST] +05)--------RepartitionExec: partitioning=Hash([depname@0], 4), input_partitions=1, maintains_sort_order=true +06)----------SortExec: TopK(fetch=5), expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST], preserve_partitioning=[true] +07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[depname, empno, salary], file_type=csv, has_header=true # unbounded following statement ok diff --git a/datafusion/sqllogictest/test_files/window_topk_pushdown.slt b/datafusion/sqllogictest/test_files/window_topk_pushdown.slt new file mode 100644 index 0000000000000..2c33566736745 --- /dev/null +++ b/datafusion/sqllogictest/test_files/window_topk_pushdown.slt @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests for the TopKRepartition optimizer rule. 
+# +# When a partitioned window function has ORDER BY + LIMIT, the optimizer +# can push a TopK (Sort with fetch) below the hash repartition to reduce +# the volume of data flowing through the shuffle. +# +# The optimization is correct when the hash partition key is a prefix of +# the sort key, because all rows with the same partition key land in the +# same output partition. + +statement ok +CREATE EXTERNAL TABLE employees ( + depname VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + empno INT NOT NULL, + salary BIGINT UNSIGNED NOT NULL, + c10 VARCHAR NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL, + hire_date DATE NOT NULL, + c15 TIMESTAMP NOT NULL +) +STORED AS CSV +LOCATION '../../testing/data/csv/aggregate_test_100_with_dates.csv' +OPTIONS ('format.has_header' 'true'); + +# Use multiple partitions to trigger hash repartitioning for the window function +statement ok +SET datafusion.execution.target_partitions = 4; + +### +### Results correctness: both enabled and disabled must produce the same output +### + +# Disabled: baseline results without the optimization +statement ok +SET datafusion.optimizer.enable_topk_repartition = false; + +query TI +SELECT depname, SUM(1) OVER (PARTITION BY depname ORDER BY empno ASC ROWS UNBOUNDED PRECEDING) as running_total +FROM employees +ORDER BY depname, empno +LIMIT 3; +---- +a 1 +a 2 +a 3 + +# Enabled: results must match baseline +statement ok +SET datafusion.optimizer.enable_topk_repartition = true; + +query TI +SELECT depname, SUM(1) OVER (PARTITION BY depname ORDER BY empno ASC ROWS UNBOUNDED PRECEDING) as running_total +FROM employees +ORDER BY depname, empno +LIMIT 3; +---- +a 1 +a 2 +a 3 + +### +### Plan shape: disabled should have TopK only above repartition +### + +statement ok +SET datafusion.optimizer.enable_topk_repartition = false; + +query TT +EXPLAIN SELECT depname, SUM(1) OVER (PARTITION BY 
depname ORDER BY empno ASC ROWS UNBOUNDED PRECEDING) as running_total +FROM employees +ORDER BY depname, empno +LIMIT 3; +---- +logical_plan +01)Projection: employees.depname, running_total +02)--Sort: employees.depname ASC NULLS LAST, employees.empno ASC NULLS LAST, fetch=3 +03)----Projection: employees.depname, sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS running_total, employees.empno +04)------WindowAggr: windowExpr=[[sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +05)--------TableScan: employees projection=[depname, empno] +physical_plan +01)ProjectionExec: expr=[depname@0 as depname, running_total@1 as running_total] +02)--SortPreservingMergeExec: [depname@0 ASC NULLS LAST, empno@2 ASC NULLS LAST], fetch=3 +03)----ProjectionExec: expr=[depname@0 as depname, sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as running_total, empno@1 as empno] +04)------BoundedWindowAggExec: wdw=[sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +05)--------SortExec: TopK(fetch=3), expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST], preserve_partitioning=[true] +06)----------RepartitionExec: partitioning=Hash([depname@0], 4), input_partitions=1 +07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[depname, empno], file_type=csv, has_header=true + +### +### Plan shape: enabled should have TopK on 
BOTH sides of the repartition +### + +statement ok +SET datafusion.optimizer.enable_topk_repartition = true; + +query TT +EXPLAIN SELECT depname, SUM(1) OVER (PARTITION BY depname ORDER BY empno ASC ROWS UNBOUNDED PRECEDING) as running_total +FROM employees +ORDER BY depname, empno +LIMIT 3; +---- +logical_plan +01)Projection: employees.depname, running_total +02)--Sort: employees.depname ASC NULLS LAST, employees.empno ASC NULLS LAST, fetch=3 +03)----Projection: employees.depname, sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS running_total, employees.empno +04)------WindowAggr: windowExpr=[[sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +05)--------TableScan: employees projection=[depname, empno] +physical_plan +01)ProjectionExec: expr=[depname@0 as depname, running_total@1 as running_total] +02)--SortPreservingMergeExec: [depname@0 ASC NULLS LAST, empno@2 ASC NULLS LAST], fetch=3 +03)----ProjectionExec: expr=[depname@0 as depname, sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as running_total, empno@1 as empno] +04)------BoundedWindowAggExec: wdw=[sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +05)--------SortExec: TopK(fetch=3), expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST], preserve_partitioning=[true], sort_prefix=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST] +06)----------RepartitionExec: partitioning=Hash([depname@0], 4), input_partitions=1, 
maintains_sort_order=true +07)------------SortExec: TopK(fetch=3), expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST], preserve_partitioning=[true] +08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[depname, empno], file_type=csv, has_header=true diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index f33e6314d3619..d3358a04b2c2f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -137,6 +137,7 @@ The following configuration settings are available: | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | | datafusion.optimizer.enable_window_limits | true | When set to true, the optimizer will attempt to push limit operations past window functions, if possible | +| datafusion.optimizer.enable_topk_repartition | true | When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle. | | datafusion.optimizer.enable_topk_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. | | datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. | | datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. |