
In-scan RG-level dynamic pruning via arrow-rs reader split (Inexact TopK + WHERE) #22407

@zhuqi-lucas


Is your feature request related to a problem or challenge?

DataFusion's TopK dynamic-filter pruning works at two granularities today:

  • File-level (via EarlyStoppingStream): once TopK's threshold tightens, un-opened files are pruned by their stats.
  • Row-level (inside an open row group): once the sort column is decoded, rows that cannot beat the threshold are dropped via RowSelection.

There's a gap in the middle: once a file's row groups are picked at scan startup (via the upfront PruningPredicate), the per-RG decision is fixed. As TopK tightens at runtime, subsequent RGs in the already-opened file keep being decoded even when their stats already prove they can't beat the threshold.

This is the dominant cost for two query shapes:

  1. Single large parquet file (tens of GB, hundreds of RGs) — file-level pruning can't help because there's only one file.
  2. Inexact TopK + WHERE: Exact mode can't fire (overlapping ranges, or a filter on a non-sort column), so SortExec stays. The threshold tightens at runtime, but it is only applied inside an RG, not between RGs.

This issue tracks the gap: between consecutive row groups inside an open file, re-check the live DynamicFilterPhysicalExpr threshold against the next RG's stats and skip if prunable.

Describe the solution you'd like

Use the arrow-rs reader-split APIs that came out of the morsel-split discussion (#21598):

  • try_next_reader — return a fresh reader that continues at the next RG boundary
  • peek_next_row_group — get the next RG's RowGroupMetaData without opening it
  • skip_next_row_group — advance past it without I/O

Wired into the parquet opener:

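```rust
// pseudo: uses the proposed reader-split APIs from #21598
loop {
    // Inspect the next RG's metadata without opening it; `None` means the file is done.
    let Some(next) = reader.peek_next_row_group()? else { break };

    // Build the same RG-stats pruning predicate the upfront PruningPredicate uses,
    // but evaluate it against the *current* DynamicFilterPhysicalExpr threshold.
    if topk_threshold_prunes(next, &dynamic_filter, &sort_order) {
        // Stats prove no row in this RG can beat the threshold: advance without I/O.
        reader.skip_next_row_group()?;
        continue;
    }

    // Continue at the next RG boundary with a fresh reader.
    let rg_reader = reader.try_next_reader()?;
    // ... decode RG, feed batches to TopK, threshold may tighten ...
}
```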

The check is structurally identical to what RowGroupAccessPlanFilter does at scan startup, just re-evaluated between RGs at runtime with the current threshold instead of the empty initial filter.
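To make the control flow concrete, here is a minimal, self-contained simulation of that loop for ORDER BY col DESC LIMIT K. It assumes an in-memory stand-in for the file: `RowGroup`, `MockReader`, and `topk_desc_with_rg_pruning` are hypothetical names for illustration, and the methods on `MockReader` only mirror the shape of the proposed reader-split surface; they are not the arrow-rs API. A min-heap holds the current top K, so its root is the live threshold.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical in-memory row group: the sort column's max stat plus its values.
/// Only `max` matters for a DESC query.
pub struct RowGroup {
    pub max: i64,
    pub values: Vec<i64>,
}

/// Hypothetical mock of the proposed reader-split surface (not the arrow-rs API).
pub struct MockReader {
    groups: Vec<RowGroup>,
    next: usize,
}

impl MockReader {
    pub fn new(groups: Vec<RowGroup>) -> Self {
        Self { groups, next: 0 }
    }

    /// Look at the next RG's stats without "decoding" it.
    pub fn peek_next_row_group(&self) -> Option<&RowGroup> {
        self.groups.get(self.next)
    }

    /// Advance past the next RG without touching its values.
    pub fn skip_next_row_group(&mut self) {
        self.next += 1;
    }

    /// "Decode" the next RG and return its values.
    pub fn read_next_row_group(&mut self) -> Option<&[i64]> {
        let idx = self.next;
        if idx >= self.groups.len() {
            return None;
        }
        self.next += 1;
        Some(&self.groups[idx].values)
    }
}

/// Runs ORDER BY col DESC LIMIT k, re-checking the live threshold before each RG.
/// Returns (top-k values sorted DESC, number of RGs actually decoded).
pub fn topk_desc_with_rg_pruning(reader: &mut MockReader, k: usize) -> (Vec<i64>, usize) {
    // Min-heap of the current top k; its root is the K-th best value.
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::new();
    let mut decoded = 0;
    loop {
        // A threshold only exists once k rows have been buffered.
        let threshold = if heap.len() == k { heap.peek().map(|r| r.0) } else { None };
        let prune = match reader.peek_next_row_group() {
            None => break, // no more row groups in this file
            Some(rg) => matches!(threshold, Some(t) if rg.max < t),
        };
        if prune {
            reader.skip_next_row_group();
            continue;
        }
        let values = reader.read_next_row_group().expect("peeked RG exists");
        decoded += 1;
        // Feed rows to TopK; the threshold may tighten as better rows arrive.
        for &v in values {
            if heap.len() < k {
                heap.push(Reverse(v));
            } else if let Some(&Reverse(min)) = heap.peek() {
                if v > min {
                    heap.pop();
                    heap.push(Reverse(v));
                }
            }
        }
    }
    let mut out: Vec<i64> = heap.into_iter().map(|r| r.0).collect();
    out.sort_unstable_by(|a, b| b.cmp(a));
    (out, decoded)
}
```

With four row groups whose max stats are 100, 20, 99 and 5 and K = 3, the first RG sets the threshold to 90, the second is skipped on stats, the third tightens the threshold to 95, and the fourth is skipped, so only two of the four RGs are decoded.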

Algorithm

For ORDER BY col {ASC|DESC} LIMIT K, with threshold = the current K-th best value:

  • DESC: prune next RG when next.max(col) < threshold
  • ASC: prune next RG when next.min(col) > threshold
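The two rules fit in one predicate. This is a minimal sketch that assumes the sort column is an i64 and that its min/max have already been read out of RowGroupMetaData into a hypothetical `RgStats`. The signature of `topk_threshold_prunes` here is illustrative and differs from the pseudocode above. `threshold` is `None` until TopK has buffered K rows, and in that case nothing is pruned.

```rust
/// Sort-column min/max for one row group; a hypothetical stand-in for values
/// read out of `RowGroupMetaData` statistics.
#[derive(Debug, Clone, Copy)]
pub struct RgStats {
    pub min: i64,
    pub max: i64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortDir {
    Asc,
    Desc,
}

/// True when stats prove no row in the RG can beat the current K-th best value.
pub fn topk_threshold_prunes(stats: RgStats, threshold: Option<i64>, dir: SortDir) -> bool {
    match (threshold, dir) {
        // TopK has not buffered K rows yet: there is no threshold to prune against.
        (None, _) => false,
        // DESC keeps the largest values; skip if even the RG max is below the threshold.
        (Some(t), SortDir::Desc) => stats.max < t,
        // ASC keeps the smallest values; skip if even the RG min is above the threshold.
        (Some(t), SortDir::Asc) => stats.min > t,
    }
}
```

The comparisons are strict, so an RG whose max (DESC) or min (ASC) equals the threshold is still read. That costs at most one extra decode and never prunes an RG that could contribute.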

For WHERE filter ORDER BY col ... LIMIT K: same check, composed with the upfront WHERE PruningPredicate. Both must pass; if either prunes, skip.
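A minimal sketch of that composition follows. It assumes a hypothetical query WHERE b = v ORDER BY a DESC LIMIT K with i64 columns, where `b` is a non-sort column. `ColStats`, `RgStats`, `where_prunes`, `topk_desc_prunes`, and `should_skip` are illustrative names, and `where_prunes` stands in for whatever the upfront PruningPredicate would conclude.

```rust
/// Min/max stats for one column in one row group (hypothetical stand-in).
#[derive(Debug, Clone, Copy)]
pub struct ColStats {
    pub min: i64,
    pub max: i64,
}

/// Stats for WHERE b = v ORDER BY a DESC LIMIT K: `a` is the sort column,
/// `b` is the (non-sort) filter column.
#[derive(Debug, Clone, Copy)]
pub struct RgStats {
    pub a: ColStats,
    pub b: ColStats,
}

/// Static WHERE pruning: `b = v` cannot match if `v` lies outside [min, max].
pub fn where_prunes(rg: &RgStats, v: i64) -> bool {
    v < rg.b.min || v > rg.b.max
}

/// Runtime TopK pruning for DESC on `a`; no threshold yet means no pruning.
pub fn topk_desc_prunes(rg: &RgStats, threshold: Option<i64>) -> bool {
    matches!(threshold, Some(t) if rg.a.max < t)
}

/// Both predicates must pass for the RG to be decoded; if either prunes, skip.
pub fn should_skip(rg: &RgStats, v: i64, threshold: Option<i64>) -> bool {
    where_prunes(rg, v) || topk_desc_prunes(rg, threshold)
}
```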

Correctness

The check uses raw RG min/max stats — same source as the upfront PruningPredicate. It can only prove "no row in this RG could beat threshold", never the converse, so:

  • Skip is always safe (no under-return).
  • It is conservative: an RG with a few qualifying rows alongside many non-qualifying ones is still decoded, which is the same worst case as existing stats pruning.

The dynamic filter itself is already proven correct upstream; this issue just applies it more often.
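The safety argument takes a single step. If an RG's max is below the current K-th best value, every row in it is below that value too, and K better rows are already buffered, so none of its rows can enter the result. The following is a minimal sketch that checks this empirically for the DESC case. It compares a full scan against a scan that skips RGs on stats, over pseudo-random row groups drawn from a fixed-seed linear congruential generator. All names are hypothetical.

```rust
/// Hypothetical row group: sort-column values plus the max stat for that column.
pub struct Rg {
    pub max: i64,
    pub values: Vec<i64>,
}

/// Builds an RG and derives its max stat from the values (panics on an empty RG).
pub fn make_rg(values: Vec<i64>) -> Rg {
    let max = *values.iter().max().expect("non-empty RG");
    Rg { max, values }
}

/// Reference answer for ORDER BY col DESC LIMIT k: decode every RG.
pub fn full_scan_topk_desc(rgs: &[Rg], k: usize) -> Vec<i64> {
    let mut all: Vec<i64> = rgs.iter().flat_map(|rg| rg.values.iter().copied()).collect();
    all.sort_unstable_by(|a, b| b.cmp(a));
    all.truncate(k);
    all
}

/// Same query, but skip an RG whose max is below the current K-th best value.
pub fn pruned_topk_desc(rgs: &[Rg], k: usize) -> Vec<i64> {
    let mut best: Vec<i64> = Vec::new(); // kept sorted DESC, at most k entries
    for rg in rgs {
        // Every row v here has v <= rg.max < best[k - 1], so none can enter the top k.
        if k > 0 && best.len() == k && rg.max < best[k - 1] {
            continue;
        }
        for &v in &rg.values {
            best.push(v);
            best.sort_unstable_by(|a, b| b.cmp(a));
            best.truncate(k);
        }
    }
    best
}

/// Returns true if pruning never changed the result across `trials` random files.
pub fn check_no_under_return(trials: usize, seed: u64) -> bool {
    let mut state = seed;
    // Simple 64-bit LCG; values land in 0..1000.
    let mut next = move || {
        state = state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        ((state >> 33) % 1000) as i64
    };
    for _ in 0..trials {
        // Eight RGs of sixteen rows each, with overlapping value ranges.
        let mut rgs = Vec::new();
        for _ in 0..8 {
            let values: Vec<i64> = (0..16).map(|_| next()).collect();
            rgs.push(make_rg(values));
        }
        for k in [1, 5, 20] {
            if full_scan_topk_desc(&rgs, k) != pruned_topk_desc(&rgs, k) {
                return false;
            }
        }
    }
    true
}
```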

Scope

In-scope (single PR target):

  • New helper that evaluates the DynamicFilterPhysicalExpr threshold against a RowGroupMetaData (the static counterpart already exists in row_group_filter.rs)
  • Wire it into the parquet opener's RG loop using arrow-rs reader-split
  • Gate on: sort_order_for_reorder.is_some() (i.e. sort pushdown fired in Inexact mode) + dynamic filter exists in the predicate tree
  • Works with or without WHERE — the threshold check composes with whatever predicate is already there
  • SLT coverage in sort_pushdown.slt, adding overlapping-range and WHERE cases
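The gate described in the third bullet reduces to a conjunction plus a walk of the predicate tree. This is a minimal sketch that replaces DataFusion's `Arc<dyn PhysicalExpr>` tree with a drastically simplified, hypothetical `Pred` enum. Only the `sort_order_for_reorder.is_some()` condition comes from the text. Its type is left generic, and `contains_dynamic_filter` and `rg_dynamic_pruning_enabled` are illustrative names.

```rust
/// Drastically simplified, hypothetical predicate tree. In DataFusion the real
/// tree is made of `Arc<dyn PhysicalExpr>` nodes.
#[allow(dead_code)]
pub enum Pred {
    /// Any ordinary predicate, e.g. a WHERE clause on a non-sort column.
    Static(String),
    /// Conjunction of two sub-predicates.
    And(Box<Pred>, Box<Pred>),
    /// Stands in for a `DynamicFilterPhysicalExpr` node pushed down by TopK.
    DynamicFilter,
}

/// Walks the whole tree, since the dynamic filter may be nested under an AND.
pub fn contains_dynamic_filter(p: &Pred) -> bool {
    match p {
        Pred::DynamicFilter => true,
        Pred::And(l, r) => contains_dynamic_filter(l) || contains_dynamic_filter(r),
        Pred::Static(_) => false,
    }
}

/// Gate: sort pushdown fired in Inexact mode and a dynamic filter is present.
/// `S` stands in for whatever type `sort_order_for_reorder` actually holds.
pub fn rg_dynamic_pruning_enabled<S>(
    sort_order_for_reorder: Option<&S>,
    predicate: Option<&Pred>,
) -> bool {
    sort_order_for_reorder.is_some() && predicate.is_some_and(contains_dynamic_filter)
}
```

The check walks the whole tree instead of only the root because the threshold is meant to compose with whatever predicate is already there, so the dynamic filter may sit alongside a WHERE clause rather than at the top.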

Explicitly out-of-scope (separate follow-up):

  • RG-granular work queue — changing SharedWorkSource to distribute row-group descriptors instead of PartitionedFile. This is the bigger refactor that addresses cross-file load balancing and cross-partition early stop. Tracked separately if/when it lands. The two are complementary, not blocking: this issue gets the per-file early-stop win without touching the morsel scheduler.

Why now

  • The pieces are aligned: sort pushdown's Inexact runtime reorder is merged (#21956); stats-driven TopK threshold init is in flight (#22385); DynamicFilterPhysicalExpr already exposes the threshold and updates at runtime.
  • arrow-rs reader-split APIs are the missing primitive. Coordination there (#21598) is converging on the iterator/stream shape.
  • For single-file DESC LIMIT N queries over overlapping RG ranges, and for single-file WHERE filter ORDER BY ... LIMIT N queries, the dominant un-optimised cost lives in this gap. Closing it makes the existing TopK threshold actually pay off at RG granularity inside a file.

Related work

  • #21351 (merged) — morsel-driven FileStream (file-granular work stealing). This issue is the per-file companion: orthogonal axis, same architectural family.
  • #21317 (closed) — RG reorder by stats inside a file. That issue's text already pointed at this work: "morselized scans where TopK could terminate after a single row group".
  • #21598 — Morsel iterator/stream design + the arrow-rs try_next_reader discussion this issue depends on.
  • #21399 (closed COMPLETED) — earlier epic for dynamic RG pruning. Closed prematurely; this issue is the concrete follow-on.
  • #21956 (merged) — runtime reorder + reverse machinery this slots into.
  • #22385 — TopK stats init + cumulative prune (sort-pushdown / no-WHERE subset). This issue complements that: stats init handles the no-WHERE case at scan-startup; this issue handles the runtime case (including WHERE).
  • Umbrella: #18489 (ClickBench leaderboard).
