Skip to content

Parallel bounded RANGE-frame window functions without PARTITION BY (draft)#23026

Draft
avantgardnerio wants to merge 12 commits into
apache:mainfrom
coralogix:brent/parallel-window
Draft

Parallel bounded RANGE-frame window functions without PARTITION BY (draft)#23026
avantgardnerio wants to merge 12 commits into
apache:mainfrom
coralogix:brent/parallel-window

Conversation

@avantgardnerio

@avantgardnerio avantgardnerio commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

None, yet.

Rationale for this change

This is a draft PR as a PoC up for architecture feedback, not approval.

In Coralogix, roughly 80% of the observed slow/failed/OOM queries are window queries with an order, bounds, and no patitionby. These move all the data to 1 core of 1 node and are never likely to be successful this way. This PR demonstrates what I believe is a relatively simple & straight forward solution to parallelizing these queries.

What changes are included in this PR?

  1. Exploiting an observed fact within DataFusion. Although it is generally a streaming engine, pipeline-breaking methods like sorts load every row into memory/disk before they emit a single result. This is analogous to stages in Ballista. So this PR exploits that by adding a run time PartitionExtremes method to physical plan nodes to expose the min/max values of their partition, if known. SortExec can easily know this once it has seen all the data and before it emits it's first batch.

  2. RangeRepartitionExec. I know this is duplicate work ATM, and before a PR like this one is ever merged, I would unify this operator with PartitionExec::Range to align directionally with upstream. But for reasons of expedience it is a new operator in this PR. Rather than trying to add dynamic numbers of output partitions, it uses PartitionExtremes at run time to calculate the ranges of values that should go into each partition, including the new concept of Halo Rows (also under investigation in this PR). The output partitions from RRE can have overlaps, and if a row belongs in multiple output partitions, it gets duplicated - this gives BoundedWindowAgg (BWAG) what it needs to correctly calculate totals, and the halo rows will be dropped later

  3. HaloDropExec. A new operator that checks rows against what should be in PartitionExtremes, dropping any rows that are outside ownership of the partition. This is how we remove halo rows later

  4. ParallelWindow optimizer rule. This detects candidate BWAGs and inserts the appropriate RRE & HaloDrop

Target physical plan shape (from the included SLT):

SortPreservingMergeExec  (N → 1)
  HaloDropExec  (N partitions; per-partition primary filter)
    BoundedWindowAggExec  (N partitions, parallel_aware)
      RangeRepartitionExec  (N, halo duplicates intended)
        SortExec  (preserve_partitioning, N)
          DataSource
flowchart TB
    subgraph Before["Before ParallelWindow"]
        direction TB
        DS1[DataSource<br/>4 partitions] --> S1[SortExec<br/>preserve_partitioning]
        S1 --> SPM1[SortPreservingMergeExec<br/><b>N → 1</b>]
        SPM1 --> BWAG1["BoundedWindowAggExec<br/><b>SinglePartition</b><br/>(serial)"]
        BWAG1 --> Out1[Projection + SortLimit]
    end
    subgraph After["After ParallelWindow"]
        direction TB
        DS2[DataSource<br/>4 partitions] --> S2[SortExec<br/>preserve_partitioning]
        S2 --> RRE["RangeRepartitionExec<br/>K-way coordinator<br/>+ halo duplication"]
        RRE --> BWAG2["BoundedWindowAggExec<br/><b>parallel_aware</b><br/>N partitions"]
        BWAG2 --> HDE["HaloDropExec<br/>per-partition filter<br/>reads runtime_partition_extremes"]
        HDE --> SPM2[SortPreservingMergeExec<br/>N → 1, fetch=5]
        SPM2 --> Out2[Projection]
    end
    style BWAG1 fill:#f9b,stroke:#a00
    style BWAG2 fill:#bf9,stroke:#0a0
    style RRE fill:#bf9,stroke:#0a0
    style HDE fill:#bf9,stroke:#0a0
Loading
Input (post-sort, single sequence 0..99):
  ┌─────────────────────────────────────────────────────────────────────────┐
  │ 0 1 2 ... 23 24 │ 25 26 ... 48 49 │ 50 51 ... 73 74 │ 75 76 ... 98 99   │
  └─────────────────┴─────────────────┴─────────────────┴───────────────────┘
   primary [0, 25)   primary [25, 50)  primary [50, 75)  primary [75, 100)

RangeRepartitionExec — boundaries computed from runtime extremes,
                       halo_preceding = 5 (from RANGE 5 PRECEDING):

  bucket 0:                  [   0 .. 24 ]              25 rows
  bucket 1:        [20..24 │ 25 .. 49 ]                 30 rows  ← 5 halo + 25 primary
  bucket 2:        [45..49 │ 50 .. 74 ]                 30 rows  ← 5 halo + 25 primary
  bucket 3:        [70..74 │ 75 .. 99 ]                 30 rows  ← 5 halo + 25 primary
                    ─────                              ─────
                    halo                          total routed = 115

BoundedWindowAggExec (per-partition, parallel_aware):
  bucket 1 at row 25 sees rows 20..25 in its frame → rolling_sum correct at the seam.
  Same for buckets 2, 3.

HaloDropExec (per-partition, reads bucket's intended primary range):
  bucket 0 keeps [0, 25)     → 25 rows
  bucket 1 drops [20, 25), keeps [25, 50) → 25 rows
  bucket 2 drops [45, 50), keeps [50, 75) → 25 rows
  bucket 3 drops [70, 75), keeps [75, 100) → 25 rows
                                              ─────
                                       total emitted = 100

The "useful lie": each bucket's runtime_partition_extremes returns the
INTENDED range [primary_lo, primary_hi], not the ACTUAL data range
[primary_lo - halo, primary_hi]. HaloDropExec consumes that lie to strip
exactly the halo rows.

Are these changes tested?

sqllogictest/test_files/parallel_window.slt is the fixture:

  • 4 parquet files with overlapping seq ranges (so routing actually
    has to move rows around),
  • EXPLAIN assertion on the new plan shape,
  • value assertion on a 5-row truncation,
  • row-count assertion that catches halo leakage
    (count(rolling_sum) = 100, not 115).

I have not yet added a feature flag + equality SLT comparing parallel
vs. serial outputs row-for-row. That's the next baby step and is what
will catch slide-accumulator boundary bugs that the current count
assertion can't see. Posting now to drive the design conversation
before investing in that loop.

Are there any user-facing changes?

  • New nodes in plan trees: RangeRepartitionExec, HaloDropExec.
  • New trait method ExecutionPlan::runtime_partition_extremes with a
    default Ok(None) — existing implementations are unaffected.
  • New PartitionExtremes re-export from physical-plan.
  • No SQL surface changes.

Known scope cuts (PoC)

  • Int64-only leading sort key with Int64-bounded RANGE distances.
    The runtime_partition_extremes type is generic
    (Vec<ScalarValue>); the coordinator's bucketing arithmetic is
    what's Int64-only. DataType-dispatch is the permanent architecture
    here, not sample-based bucketing.
  • Single sort key (LexOrdering len 1). Multi-key extremes are
    in the type from day one; coordinator + halo math are not.
  • No feature flag yet. The rule fires unconditionally on eligible
    windows. A session config optimizer.enable_parallel_window (default
    off) is the next commit.
  • No equality SLT comparing parallel-on vs. parallel-off outputs.
    Next commit after the flag.
  • No Ballista distribution yet. The coordinator is the AQE
    reducer; lifting it scheduler-side is a follow-up (~300-500 LoC).
  • No benchmark numbers yet. Coming after the equality SLT lands.

Coexistence with #22395

Partitioning::Range + RangePartitioning { ordering, split_points }
have already merged from @gabotechs. The execution slot in
RepartitionExec for Partitioning::Range is still
not_impl_err!. This PR keeps RangeRepartitionExec as a separate
operator for now because it does something RepartitionExec is
unlikely to absorb (halo duplication). Once the design is settled here
I expect to thread the existing RangePartitioning shape through
where the boundaries live, rather than carry a parallel
Vec<ScalarValue> representation forever.

Draft status

This is up for architecture review, not approval. CI will be red —
clippy / fmt / unrelated tests are not yet tended. I will not push
forward to "green" until reviewers agree the shape is roughly right.

avantgardnerio and others added 12 commits June 17, 2026 18:47
…E-frame windows

Adds a new physical optimizer rule (run just before EnsureRequirements) that
finds BoundedWindowAggExec nodes with a single ORDER BY column, no PARTITION
BY, and a RANGE frame, then logs per-partition min/max on the order column
via partition_statistics(). No transformation yet — this confirms that the
per-input-partition Exact stats are available at the right spot in the
pipeline to drive a future range-repartitioning step.

Also adds a sqllogictest fixture (parallel_window.slt) with four scrambled
parquet files, overlapping seq ranges, so the routing problem is non-trivial
and stats remain Exact per partition.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reduces per-input-partition Exact stats into a global (min, max) on the
ORDER BY column, then splits that range into target_partitions equal-width
buckets and logs the N-1 interior cut points. Int64-only for now to keep
the boundary-math API tractable; types we don't handle log a skip message
and fall through to today's plan.

Still no plan transformation — boundaries are computed and printed only.
With the test fixture (min=0, max=99, target_partitions=4) we now log:
  interior boundaries: [Int64(24), Int64(49), Int64(74)]

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reads (halo_preceding, halo_following) from the window frame's start/end
bounds (Int64 only, finite values only). For each output partition, logs
its primary range [b_i, b_{i+1}) and the halo-expanded range
[b_i - halo_p, b_{i+1} + halo_f) — the latter is what the routing layer
will need to actually deliver per partition so the window frames at the
seams compute correctly.

Adds a TODO referencing a future HaloDropExec that sits above the window
per partition and drops rows outside the primary range, so each input row
surfaces in exactly one output partition.

With the test fixture (min=0, max=99, target_partitions=4,
RANGE BETWEEN 5 PRECEDING AND CURRENT ROW) we now log:
  bucket 0: primary [0, 24)  expanded [-5, 24)
  bucket 1: primary [24, 49)  expanded [19, 49)
  bucket 2: primary [49, 74)  expanded [44, 74)
  bucket 3: primary [74, 100)  expanded [69, 100)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
ExecutionPlan gains a default-impl runtime_statistics(partition) returning
Pin<Box<dyn Future<Output = Result<Arc<Statistics>>> + Send>>. Default
resolves immediately to partition_statistics(Some(partition)); pipeline-
breaking operators (e.g. SortExec) will later override to complete the
future only after relevant work has run.

For the in-memory ExternalSorter path in SortExec, wrap the sorted output
stream with an observer that captures first/last value of the leading sort
column and logs the per-partition min/max once the stream ends. This is
side-effect logging only — the trait override is not wired yet — but
confirms the values we need for the upcoming RangeRepartitionExec are
computable at runtime and match what plan-time stats reported.

Updates parallel_window.slt to actually execute the query (in addition to
EXPLAIN) so the sort path runs and the new log fires.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…t path

Reshape the runtime hook to match the data: a `LexOrdering` (one or more
sort exprs over arbitrary expressions, ASC/DESC, nulls first/last) doesn't
align with `Statistics`'s per-column min/max. Replace the previous
`runtime_statistics` trait method with `runtime_sort_extremes`, returning
`Option<SortExtremes>` with `min`/`max: Vec<ScalarValue>` whose length
matches the operator's output ordering.

SortExec carries one `Arc<Mutex<Option<SortExtremes>>>` per output
partition. The slot is populated by `ExternalSorter::sort_batch_stream`
right after `sort_batch_chunked` produces sorted chunks — leading and
trailing rows are the lex-smallest and lex-largest endpoints for that
chunk, and `merge_chunk_into_slot` folds across chunks (path 3 spill case)
using a small `SortOptions`-aware lex_compare. Zero double-sort, zero
row iteration on our side.

`runtime_sort_extremes` reads the slot synchronously: by the time
downstream sees the first output batch, the slot is populated for that
partition. Default impl on the trait returns `Ok(None)` so other operators
opt in only when they have something to say.

RangeRepartitionExec is rewritten as a pass-through that wraps each
forwarded stream with a tiny one-shot observer: on the first batch
yielded, call `child.runtime_sort_extremes(partition)` and log. Confirms
end-to-end wiring; future commits replace the pass-through with real
range routing.

Adds a TODO in `evaluate_row` to switch from "evaluate over the whole
batch, take one row" to a single-row slice/RowConverter once we care about
the constant-factor cost.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the per-stream LogOnFirstBatch wrapper with a single-instance
coordinator task that:

  1. opens child.execute(k) for every input partition k,
  2. pulls the first batch from each — enough to drive a pipeline-
     breaking child sort to populate its SortExtremes slot,
  3. reads child.runtime_sort_extremes(k) for each input,
  4. lex-reduces the per-input extremes into one global SortExtremes
     using the input's declared output_ordering (so descending /
     nulls-first are honored), and logs it,
  5. hands each input's (first_batch, remaining_stream) pair to the
     corresponding output partition via a tokio::oneshot channel.

Output partition i returns a stream that awaits its handoff and emits
the buffered first batch followed by the remainder — so semantically
this is still a pass-through, but it now demonstrates the K-way fan-in
machinery the real routing impl needs.

Makes `lex_compare` pub in sort.rs so the reducer can reuse the small
SortOptions-aware comparator instead of duplicating it.

With the test fixture the log now reads:
  RangeRepartitionExec: coordinator gathered 4 input partitions;
    global extremes = Some(SortExtremes {
      min: [Int64(0)], max: [Int64(99)], row_count: 100
    })

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Boundary math is now consumed at runtime against the global SortExtremes
the coordinator gathers — no more plan-time partition_statistics math in
the optimizer rule. RangeRepartitionExec::new takes the halo distances
from the optimizer rule (extracted from the window frame at plan time)
and the coordinator combines them with the runtime global to compute
interior cut points and per-bucket primary / expanded ranges.

ParallelWindow shrinks to shape detection + halo extraction + plan
rewrite. The previous probe_candidate plan-time logging is gone — the
identical log lines now come out of RangeRepartitionExec at execution
time, where they're computed from actual data rather than from possibly-
inexact stats.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Coordinator's hand-off changes from "give output i my input partition i's
remaining stream" to "give output i an mpsc receiver, route from every
input into the per-bucket channel I own". For each input batch the
router computes bucket b's expanded range
`[boundaries[b-1] - halo_preceding, boundaries[b] + halo_following)` and
sends the matching slice of the batch (via arrow::compute::take_arrays)
to bucket b's channel. Bucket-driven loop — N takes per input batch.

Routing is Int64-only by design (matches the optimizer rule's gate);
non-Int64 leading keys propagate a clear error to every output stream
instead of silently producing wrong data.

SLT picks up a `count(rolling_sum)` assertion that exposes halo
duplication: plain `count(*)` would be statistics-pruned from parquet's
row count and never instantiate the window operator, so it wouldn't
notice. With routing in place but no halo drop yet, the merged output
has 115 rows = 100 + 15 halo duplicates (5 per boundary × 3 interior
boundaries). HaloDropExec is the next commit and will bring this back
to 100.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Move ParallelWindow rule ahead of EnsureRequirements so it owns the
distribution decision instead of surgically undoing what
EnsureRequirements inserts.

- BoundedWindowAggExec gains a `parallel_aware: bool` and a
  with_parallel_aware() builder. When set, required_input_distribution()
  returns UnspecifiedDistribution instead of SinglePartition, so
  EnsureRequirements stops wrapping us in an SPM and collapsing back to
  one partition.
- RangeRepartitionExec now takes a LexOrdering, declares
  required_input_ordering (Hard) on it, and maintains_input_order=true.
  This anchors the pipeline-breaking SortExec beneath us instead of
  letting EnsureRequirements push it above.
- ParallelWindow builds BWAG(parallel_aware) -> RangeRepartitionExec
  directly; EnsureRequirements plants the per-partition SortExec on its
  next pass. Removed the descend-and-wrap helper, no longer needed.
- SLT EXPLAIN updated to match the new (correct) plan shape.

Outer SPM in the EXPLAIN is just the user-visible `ORDER BY ... LIMIT`,
not BWAG distribution. count(rolling_sum)=115 still holds; the halo
duplication is now real per-partition output rather than corrupted
single-partition slide. HaloDropExec follow-up brings it to 100.
`SortExtremes` reads as "data extremes observed by a sort", which doesn't
cover what `RangeRepartitionExec` does when it implements the trait
method: it returns each output partition's *intended primary range*
(narrower than the data the partition will actually carry, by the halo
distance), so `HaloDropExec` upstream can read each bucket's bound
without threading a side-channel.

That's a different interpretation of "extremes" — *intended* rather than
*observed* — and the old name reads as a lie at that call site. Rename:

- struct: SortExtremes → PartitionExtremes
- trait method: runtime_sort_extremes → runtime_partition_extremes
- internal slot aliases in sorts/sort.rs follow

Type doc at `execution_plan.rs:97-127` documents both interpretations so
future consumers don't assume observed-only.
@github-actions github-actions Bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) physical-plan Changes to the physical-plan crate labels Jun 18, 2026
@avantgardnerio avantgardnerio requested a review from alamb June 18, 2026 18:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

optimizer Optimizer rules physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant