From e72464385654a38d2653117d739693d80b1f2798 Mon Sep 17 00:00:00 2001
From: Brent Gardner
Date: Fri, 6 Mar 2026 10:08:34 -0700
Subject: [PATCH] fmt

---
 Cargo.lock                                    |   1 +
 datafusion/physical-optimizer/Cargo.toml      |   1 +
 .../src/limit_pushdown_past_window.rs         | 119 +++++++++++++++++-
 3 files changed, 120 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 38fa83dd12119..9c8f2c593555a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2449,6 +2449,7 @@ dependencies = [
  "datafusion-expr",
  "datafusion-expr-common",
  "datafusion-functions",
+ "datafusion-functions-window",
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "datafusion-physical-plan",
diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml
index 395da10d629ba..38c8a7c37211f 100644
--- a/datafusion/physical-optimizer/Cargo.toml
+++ b/datafusion/physical-optimizer/Cargo.toml
@@ -56,5 +56,6 @@ recursive = { workspace = true, optional = true }
 [dev-dependencies]
 datafusion-expr = { workspace = true }
 datafusion-functions = { workspace = true }
+datafusion-functions-window = { workspace = true }
 insta = { workspace = true }
 tokio = { workspace = true }
diff --git a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
index c23fa4faef95f..729b600da7297 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
@@ -25,7 +25,7 @@ use datafusion_physical_expr::window::{
     StandardWindowFunctionExpr, WindowExpr,
 };
 use datafusion_physical_plan::execution_plan::CardinalityEffect;
-use datafusion_physical_plan::limit::GlobalLimitExec;
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::repartition::RepartitionExec;
 use datafusion_physical_plan::sorts::sort::SortExec;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -206,6 +206,12 @@ fn get_limit(node: &Arc<dyn ExecutionPlan>, ctx: &mut TraverseState) -> bool {
         ctx.reset_limit(limit.fetch().map(|fetch| fetch + limit.skip()));
         return true;
     }
+    // In distributed execution, GlobalLimitExec becomes LocalLimitExec
+    // per partition. Handle it the same way (LocalLimitExec has no skip).
+    if let Some(limit) = node.as_any().downcast_ref::<LocalLimitExec>() {
+        ctx.reset_limit(Some(limit.fetch()));
+        return true;
+    }
     if let Some(limit) = node.as_any().downcast_ref::<SortPreservingMergeExec>() {
         ctx.reset_limit(limit.fetch());
         return true;
@@ -254,3 +260,114 @@ fn bound_to_usize(bound: &WindowFrameBound) -> Option<usize> {
         _ => None,
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+    use datafusion_functions_window::row_number::row_number_udwf;
+    use datafusion_physical_expr::expressions::col;
+    use datafusion_physical_expr::window::StandardWindowExpr;
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+    use datafusion_physical_plan::InputOrderMode;
+    use datafusion_physical_plan::displayable;
+    use datafusion_physical_plan::limit::LocalLimitExec;
+    use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
+    use datafusion_physical_plan::windows::{
+        BoundedWindowAggExec, create_udwf_window_expr,
+    };
+    use insta::assert_snapshot;
+    use std::sync::Arc;
+
+    fn plan_str(plan: &dyn ExecutionPlan) -> String {
+        displayable(plan).indent(true).to_string()
+    }
+
+    fn schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
+    }
+
+    /// Build: LocalLimitExec or GlobalLimitExec → BoundedWindowAggExec(row_number) → SortExec
+    fn build_window_plan(
+        use_local_limit: bool,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let s = schema();
+        let input: Arc<dyn ExecutionPlan> =
+            Arc::new(PlaceholderRowExec::new(Arc::clone(&s)));
+
+        let ordering =
+            LexOrdering::new(vec![PhysicalSortExpr::new_default(col("a", &s)?).asc()])
+                .unwrap();
+
+        let sort: Arc<dyn ExecutionPlan> = Arc::new(
+            SortExec::new(ordering.clone(), input).with_preserve_partitioning(true),
+        );
+
+        let window_expr = Arc::new(StandardWindowExpr::new(
+            create_udwf_window_expr(
+                &row_number_udwf(),
+                &[],
+                &s,
+                "row_number".to_string(),
+                false,
+            )?,
+            &[],
+            ordering.as_ref(),
+            Arc::new(WindowFrame::new_bounds(
+                WindowFrameUnits::Rows,
+                WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+                WindowFrameBound::CurrentRow,
+            )),
+        ));
+
+        let window: Arc<dyn ExecutionPlan> = Arc::new(BoundedWindowAggExec::try_new(
+            vec![window_expr],
+            sort,
+            InputOrderMode::Sorted,
+            true,
+        )?);
+
+        let limit: Arc<dyn ExecutionPlan> = if use_local_limit {
+            Arc::new(LocalLimitExec::new(window, 100))
+        } else {
+            Arc::new(GlobalLimitExec::new(window, 0, Some(100)))
+        };
+
+        Ok(limit)
+    }
+
+    fn optimize(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        let mut config = ConfigOptions::new();
+        config.optimizer.enable_window_limits = true;
+        LimitPushPastWindows::new().optimize(plan, &config).unwrap()
+    }
+
+    /// GlobalLimitExec above a windowed sort should push fetch into the SortExec.
+    #[test]
+    fn global_limit_pushes_past_window() {
+        let plan = build_window_plan(false).unwrap();
+        let optimized = optimize(plan);
+        assert_snapshot!(plan_str(optimized.as_ref()), @r#"
+        GlobalLimitExec: skip=0, fetch=100
+          BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+            SortExec: TopK(fetch=100), expr=[a@0 ASC], preserve_partitioning=[true]
+              PlaceholderRowExec
+        "#);
+    }
+
+    /// LocalLimitExec above a windowed sort should also push fetch into the SortExec.
+    /// This is the case in distributed execution where GlobalLimitExec becomes LocalLimitExec.
+    #[test]
+    fn local_limit_pushes_past_window() {
+        let plan = build_window_plan(true).unwrap();
+        let optimized = optimize(plan);
+        assert_snapshot!(plan_str(optimized.as_ref()), @r#"
+        LocalLimitExec: fetch=100
+          BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+            SortExec: TopK(fetch=100), expr=[a@0 ASC], preserve_partitioning=[true]
+              PlaceholderRowExec
+        "#);
+    }
+}