Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-pruning = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
recursive = { workspace = true, optional = true }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod limit_pushdown_past_window;
pub mod limited_distinct_aggregation;
pub mod optimizer;
pub mod output_requirements;
pub mod parallel_window;
pub mod projection_pushdown;
pub use datafusion_pruning as pruning;
pub mod hash_join_buffering;
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::join_selection::JoinSelection;
use crate::limit_pushdown::LimitPushdown;
use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
use crate::output_requirements::OutputRequirements;
use crate::parallel_window::ParallelWindow;
use crate::projection_pushdown::ProjectionPushdown;
use crate::sanity_checker::SanityCheckPlan;
use crate::topk_aggregation::TopKAggregation;
Expand Down Expand Up @@ -187,6 +188,13 @@ impl PhysicalOptimizer {
// [`EnsureRequirements`](crate::ensure_requirements) for the per-phase
// breakdown, and <https://github.com/apache/datafusion/issues/21973>
// for the original failure mode.
// Re-shape no-PARTITION-BY RANGE-frame windows into a parallel
// form: SortExec(preserve_partitioning) + RangeRepartitionExec
// + parallel-aware BWAG. Runs *before* EnsureRequirements so
// we own the distribution decision — otherwise EnsureRequirements
// would satisfy BWAG's SinglePartition requirement by inserting
// an SPM that collapses the parallelism we're trying to create.
Arc::new(ParallelWindow::new()),
Arc::new(EnsureRequirements::new()),
// The CombinePartialFinalAggregate rule should be applied after distribution enforcement
Arc::new(CombinePartialFinalAggregate::new()),
Expand Down
168 changes: 168 additions & 0 deletions datafusion/physical-optimizer/src/parallel_window.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parallelize bounded RANGE-frame window functions that have an ORDER BY
//! but no PARTITION BY by re-shaping the plan above the window's input.
//!
//! Runs *before* `EnsureRequirements`. For each eligible
//! `BoundedWindowAggExec`, this rule:
//! - inserts a `SortExec(preserve_partitioning=true)` on the ORDER BY
//! key above the window's existing input;
//! - inserts a `RangeRepartitionExec` carrying the halo distances above
//! that sort;
//! - rebuilds the `BoundedWindowAggExec` on top of the result with
//! `parallel_aware = true`, so its `required_input_distribution()`
//! returns `UnspecifiedDistribution` instead of `SinglePartition` —
//! which is what would otherwise force `EnsureRequirements` to insert
//! a `SortPreservingMergeExec` and collapse our parallelism.
//!
//! By owning the structural decisions before `EnsureRequirements` runs,
//! this rule avoids the post-hoc surgery of stripping an inserted
//! `SortPreservingMergeExec`. All boundary / global-extremes logic lives
//! in `RangeRepartitionExec`'s coordinator and runs against runtime
//! stats rather than plan-time `Statistics`.

use crate::PhysicalOptimizerRule;
use datafusion_common::ScalarValue;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_expr::{WindowFrameBound, WindowFrameUnits};
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::halo_drop::HaloDropExec;
use datafusion_physical_plan::range_repartition::RangeRepartitionExec;
use datafusion_physical_plan::windows::BoundedWindowAggExec;
use log::info;
use std::sync::Arc;

#[derive(Default, Clone, Debug)]
pub struct ParallelWindow;

impl ParallelWindow {
pub fn new() -> Self {
Self
}
}

impl PhysicalOptimizerRule for ParallelWindow {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
let out = plan.transform_down(|node| {
let Some(window) = node.downcast_ref::<BoundedWindowAggExec>() else {
return Ok(Transformed::no(node));
};
let Some((halo_preceding, halo_following)) = candidate_halo(window)
else {
return Ok(Transformed::no(node));
};
info!(
"ParallelWindow: candidate BoundedWindowAggExec (RANGE frame, no PARTITION BY); \
halo: {halo_preceding} preceding, {halo_following} following"
);
// `candidate_halo` already verified order_by.len()==1.
let sort_key = window.window_expr()[0].order_by()[0].clone();
let lex = LexOrdering::new(vec![sort_key])
.expect("candidate_halo guarantees one sort key");
let original_input = Arc::clone(&node.children()[0]);
// Don't pre-insert a SortExec; RangeRepartitionExec now declares
// its required input ordering, so EnsureRequirements will plant
// the pipeline-breaking sort beneath us. Doing both would just
// produce a redundant SortExec that the optimizer collapses.
let range = Arc::new(RangeRepartitionExec::new(
original_input,
lex.clone(),
halo_preceding,
halo_following,
));
// `parallel_aware = true` flips BWAG's required_input_distribution
// to UnspecifiedDistribution, so EnsureRequirements won't wrap
// us in an SPM. `can_repartition` is vacuous because
// candidate_halo already required partition_keys empty.
let new_window: Arc<dyn ExecutionPlan> = Arc::new(
BoundedWindowAggExec::try_new(
window.window_expr().to_vec(),
range,
window.input_order_mode.clone(),
true,
)?
.with_parallel_aware(true),
);
// Drop halo rows above the per-partition window. HaloDropExec
// reads its primary range from `input.runtime_partition_extremes`,
// which BWAG passes through and RangeRepartitionExec populates.
let drop_halo: Arc<dyn ExecutionPlan> =
Arc::new(HaloDropExec::try_new(new_window, &lex)?);
// Jump past the result's children: the BWAG we just emitted is
// still a candidate by shape (RANGE frame, no PARTITION BY) and
// `transform_down` would otherwise re-wrap it forever.
Ok(Transformed::new(drop_halo, true, TreeNodeRecursion::Jump))
})?;
Ok(out.data)
}

fn name(&self) -> &str {
"ParallelWindow"
}

fn schema_check(&self) -> bool {
true
}
}

/// Returns `(halo_preceding, halo_following)` if the window matches the
/// v1 shape we know how to parallelize: no PARTITION BY, a single
/// `Column` sort key, RANGE frame, finite Int64 bounds (or CurrentRow).
/// Returns `None` otherwise so the rule leaves the plan alone.
fn candidate_halo(window: &BoundedWindowAggExec) -> Option<(i64, i64)> {
if !window.partition_keys().is_empty() {
return None;
}
let order_by = window.window_expr()[0].order_by();
if order_by.len() != 1 {
return None;
}
if order_by[0].expr.downcast_ref::<Column>().is_none() {
return None;
}
let frame = window.window_expr()[0].get_window_frame();
if frame.units != WindowFrameUnits::Range {
return None;
}
i64_halo(&frame.start_bound, &frame.end_bound)
}

/// Extract `(halo_preceding, halo_following)` in order-key units from a
/// RANGE window frame. Returns `None` for UNBOUNDED bounds or non-`Int64`
/// distances. v1 scope: only `Preceding(Int64)` / `CurrentRow` for the
/// start bound, and `CurrentRow` / `Following(Int64)` for the end bound.
fn i64_halo(start: &WindowFrameBound, end: &WindowFrameBound) -> Option<(i64, i64)> {
let preceding = match start {
WindowFrameBound::Preceding(ScalarValue::Int64(Some(n))) => *n,
WindowFrameBound::CurrentRow => 0,
_ => return None,
};
let following = match end {
WindowFrameBound::Following(ScalarValue::Int64(Some(n))) => *n,
WindowFrameBound::CurrentRow => 0,
_ => return None,
};
Some((preceding, following))
}
57 changes: 56 additions & 1 deletion datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use arrow_schema::Schema;
pub use datafusion_common::hash_utils;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
pub use datafusion_common::utils::project_schema;
pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
pub use datafusion_common::{ColumnStatistics, ScalarValue, Statistics, internal_err};
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
pub use datafusion_expr::{Accumulator, ColumnarValue};
pub use datafusion_physical_expr::window::WindowExpr;
Expand Down Expand Up @@ -93,6 +93,38 @@ use futures::stream::{StreamExt, TryStreamExt};
///
/// [`datafusion-examples`]: https://github.com/apache/datafusion/tree/main/datafusion-examples
/// [`memory_pool_execution_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs

/// Endpoints of a single partition's output expressed in the operator's
/// declared output ordering (the `PhysicalSortExpr` list returned by
/// `equivalence_properties().output_ordering()`).
///
/// `min` and `max` are tuples of values — one entry per sort key — taken at
/// the lex-smallest and lex-largest output rows. For the leading sort key
/// these are exactly the natural min/max of that key (after honoring
/// ASC/DESC). For trailing sort keys the entries hold the value of that key
/// at the lex-extreme row, not that key's own natural extreme.
///
/// Default semantics — *observed*: these are the actual min/max of data this
/// partition has produced (or will produce, by the time the upstream is fully
/// consumed). `SortExec` is the canonical observer-style override.
///
/// Exception — *intended*: `RangeRepartitionExec` returns each output
/// partition's *intended primary range* (the range a downstream `HaloDropExec`
/// should keep, with halo rows excluded), which is **narrower** than the data
/// the partition actually carries when halo is non-zero. The "useful lie"
/// is what lets the operator above the window strip halo without threading a
/// boundaries side-channel. Consumers that need observed extremes must not
/// read through a `RangeRepartitionExec` boundary when halo > 0.
#[derive(Debug, Clone)]
pub struct PartitionExtremes {
/// Sort-key values at the lex-smallest row across the partition.
pub min: Vec<ScalarValue>,
/// Sort-key values at the lex-largest row across the partition.
pub max: Vec<ScalarValue>,
/// Total non-empty rows that contributed to `min`/`max`.
pub row_count: usize,
}

pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
/// Short name for the ExecutionPlan, such as 'DataSourceExec'.
///
Expand Down Expand Up @@ -513,6 +545,29 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
Ok(Arc::new(Statistics::new_unknown(&self.schema())))
}

/// Runtime-derived endpoints of `partition`'s output along the operator's
/// declared output ordering (i.e. each `PhysicalSortExpr` in
/// `equivalence_properties().output_ordering()`).
///
/// Returns `Ok(None)` by default. Operators that are pipeline-breaking
/// along their output ordering (e.g. `SortExec`) may override to expose
/// the lex-min / lex-max tuple once the partition's input has been fully
/// consumed. Callers are responsible for ensuring the upstream has made
/// enough progress (typically by polling `execute()`) before relying on
/// the result; until then this returns `Ok(None)`.
///
/// See [`PartitionExtremes`] for the result shape — and read the type doc
/// before assuming "observed" semantics: a few operators (notably
/// `RangeRepartitionExec`) intentionally return the *intended* range, not
/// the data they will actually carry.
fn runtime_partition_extremes(
&self,
partition: usize,
) -> Result<Option<PartitionExtremes>> {
let _ = partition;
Ok(None)
}

/// Returns `true` if a limit can be safely pushed down through this
/// `ExecutionPlan` node.
///
Expand Down
Loading
Loading