Is your feature request related to a problem or challenge?
DataFusion's TopK dynamic-filter pruning works at two granularities today:
- File-level (via EarlyStoppingStream): once TopK's threshold tightens, un-opened files are pruned by their stats.
- Row-level (inside an open row group): once the sort column is decoded, rows below the threshold are dropped via RowSelection.
There's a gap in the middle: once a file's row groups are picked at scan startup (via the upfront PruningPredicate), the per-RG decision is fixed. As TopK tightens at runtime, subsequent RGs in the already-opened file keep being decoded even when their stats already prove they can't beat the threshold.
This is the dominant cost for two query shapes:
- Single large Parquet file (tens of GB, hundreds of RGs): file-level pruning can't help because there's only one file.
- Inexact TopK + WHERE: Exact can't fire (overlapping ranges, or a filter on a non-sort column), so SortExec stays; the threshold tightens at runtime, but only inside an RG, not between RGs.
This issue tracks the gap: between consecutive row groups inside an open file, re-check the live DynamicFilterPhysicalExpr threshold against the next RG's stats and skip if prunable.
Describe the solution you'd like
Use the arrow-rs reader-split APIs that came out of the morsel-split discussion (#21598):
- try_next_reader: return a fresh reader that continues at the next RG boundary
- peek_next_row_group: get the next RG's RowGroupMetaData without opening it
- skip_next_row_group: advance past it without I/O
Wired in the parquet opener:
// pseudo
loop {
    let next = reader.peek_next_row_group()?;
    let Some(next) = next else { break; }; // done

    // Build the same RG-stats pruning predicate the upfront PruningPredicate uses,
    // but evaluate it against the *current* DynamicFilterPhysicalExpr threshold.
    if topk_threshold_prunes(next, &dynamic_filter, &sort_order) {
        reader.skip_next_row_group()?;
        continue;
    }

    let rg_reader = reader.try_next_reader()?;
    // ... decode RG, feed batches to TopK, threshold may tighten ...
}
The check is structurally identical to what RowGroupAccessPlanFilter does at scan startup, just re-evaluated between RGs at runtime with the current threshold instead of the empty initial filter.
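The loop above can be exercised end to end with a small in-memory model. This is only a sketch under stated assumptions: `MockReader` and `MockRowGroup` are hypothetical stand-ins, not arrow-rs types, the three method names simply mirror the proposed reader-split operations, and the "TopK" is a sorted `Vec` rather than DataFusion's operator.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for one row group: the sort-column values it holds.
/// A real reader would expose min/max through RowGroupMetaData statistics.
#[derive(Debug, Clone)]
struct MockRowGroup {
    values: Vec<i64>,
}

impl MockRowGroup {
    /// Max statistic, derived from the values for this sketch.
    fn max(&self) -> Option<i64> {
        self.values.iter().copied().max()
    }
}

/// Hypothetical reader whose methods mirror the proposed peek/skip/next shape.
struct MockReader {
    pending: VecDeque<MockRowGroup>,
}

impl MockReader {
    fn peek_next_row_group(&self) -> Option<&MockRowGroup> {
        self.pending.front()
    }
    fn skip_next_row_group(&mut self) {
        self.pending.pop_front();
    }
    fn try_next_reader(&mut self) -> Option<MockRowGroup> {
        self.pending.pop_front()
    }
}

/// Simulates ORDER BY v DESC LIMIT k and returns the top-k values
/// (descending) together with the number of row groups actually decoded.
fn topk_desc(mut reader: MockReader, k: usize) -> (Vec<i64>, usize) {
    let mut top: Vec<i64> = Vec::new(); // sorted descending, at most k entries
    let mut decoded = 0;
    loop {
        // The threshold only exists once k candidates have been collected.
        let threshold = if top.len() == k { top.last().copied() } else { None };
        let Some(next) = reader.peek_next_row_group() else { break };
        // Re-check the live threshold against the next row group's max.
        let prunable = match (threshold, next.max()) {
            (Some(t), Some(max)) => max < t,
            _ => false, // no threshold or no stats: cannot prove anything
        };
        if prunable {
            reader.skip_next_row_group();
            continue;
        }
        let Some(rg) = reader.try_next_reader() else { break };
        decoded += 1;
        // "Decode" the row group and let the threshold tighten.
        top.extend(rg.values);
        top.sort_unstable_by(|a, b| b.cmp(a));
        top.truncate(k);
    }
    (top, decoded)
}
```

With four row groups where only the first and third can contribute to a top-2 result, the second and fourth are skipped as soon as the threshold from the first row group is known.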
Algorithm
For ORDER BY col [ASC|DESC] LIMIT K, with threshold = the current K-th best value:
- DESC: prune the next RG when next.max(col) < threshold
- ASC: prune the next RG when next.min(col) > threshold
For WHERE filter ORDER BY col ... LIMIT K: same check, composed with the upfront WHERE PruningPredicate. Both must pass; if either prunes, skip.
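The two rules and their composition with the WHERE predicate can be written as a small pure function. This is a minimal sketch, not the proposed helper's actual signature: `SortDir`, `ColStats`, `threshold_prunes`, and `should_skip` are hypothetical names, values are plain `i64` rather than Arrow scalars, and treating missing statistics or a not-yet-set threshold as "cannot prune" is an assumption of the sketch.

```rust
/// Sort direction of the ORDER BY column (hypothetical type for this sketch).
#[derive(Debug, Clone, Copy, PartialEq)]
enum SortDir {
    Asc,
    Desc,
}

/// Hypothetical min/max statistics of the sort column for one row group.
/// `None` means the statistic is missing.
#[derive(Debug, Clone, Copy)]
struct ColStats {
    min: Option<i64>,
    max: Option<i64>,
}

/// Returns true only when the statistics prove that no row in the row group
/// can beat the current threshold.
fn threshold_prunes(stats: ColStats, threshold: Option<i64>, dir: SortDir) -> bool {
    // No threshold yet (TopK has not collected K rows): nothing can be proven.
    let Some(t) = threshold else { return false };
    match dir {
        // DESC: every row is <= max, so max < threshold rules out the whole RG.
        SortDir::Desc => stats.max.map_or(false, |max| max < t),
        // ASC: every row is >= min, so min > threshold rules out the whole RG.
        SortDir::Asc => stats.min.map_or(false, |min| min > t),
    }
}

/// Composition with the WHERE predicate: the row group is skipped if either
/// the static WHERE pruning or the runtime threshold check prunes it.
fn should_skip(
    where_prunes: bool,
    stats: ColStats,
    threshold: Option<i64>,
    dir: SortDir,
) -> bool {
    where_prunes || threshold_prunes(stats, threshold, dir)
}
```

The strict comparisons keep the check conservative: a row group whose max (or min) equals the threshold is still read.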
Correctness
The check uses raw RG min/max stats — same source as the upfront PruningPredicate. It can only prove "no row in this RG could beat threshold", never the converse, so:
- Skip is always safe (no under-return).
- May be conservative on RGs that have a few qualifying rows alongside many non-qualifying ones — that's identical to existing stats pruning's worst case.
The dynamic filter itself is already proven correct upstream; this issue just applies it more often.
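The one-directional nature of the check can be illustrated on raw values. In this sketch (hypothetical helper names, plain `i64` values, DESC order only) the statistics are derived from the rows themselves, so the claim "pruned implies no qualifying row" can be compared directly against a row count.

```rust
/// Derives (min, max) from raw values; `None` for an empty row group.
fn min_max(rows: &[i64]) -> Option<(i64, i64)> {
    let min = rows.iter().copied().min()?;
    let max = rows.iter().copied().max()?;
    Some((min, max))
}

/// DESC prune rule: prune only when max < threshold.
fn prunes_desc(rows: &[i64], threshold: i64) -> bool {
    matches!(min_max(rows), Some((_, max)) if max < threshold)
}

/// Number of rows that would strictly beat the threshold in DESC order.
fn qualifying_desc(rows: &[i64], threshold: i64) -> usize {
    rows.iter().filter(|&&v| v > threshold).count()
}
```

A row group such as `[1, 2, 3, 1000]` with threshold 500 is kept even though only one of its rows qualifies, which is the conservative case described above; a row group such as `[1, 2, 3]` is pruned and contains no qualifying row.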
Scope
In-scope (single PR target):
- New helper that evaluates the DynamicFilterPhysicalExpr threshold against a RowGroupMetaData (the static counterpart already exists in row_group_filter.rs)
- Wire it into the parquet opener's RG loop using arrow-rs reader-split
- Gate on: sort_order_for_reorder.is_some() (i.e. sort pushdown fired in Inexact mode) and a dynamic filter exists in the predicate tree
- Works with or without WHERE: the threshold check composes with whatever predicate is already there
- SLT coverage in sort_pushdown.slt, adding overlap + WHERE cases
Explicitly out-of-scope (separate follow-up):
- RG-granular work queue: changing SharedWorkSource to distribute row-group descriptors instead of PartitionedFile. This is the bigger refactor that addresses cross-file load balancing and cross-partition early stop. Tracked separately if/when it lands. The two are complementary, not blocking: this issue gets the per-file early-stop win without touching the morsel scheduler.
Why now
- The pieces are aligned: sort pushdown's Inexact runtime reorder is merged (#21956); stats-driven TopK threshold init is in flight (#22385); DynamicFilterPhysicalExpr already exposes the threshold and updates at runtime.
- arrow-rs reader-split APIs are the missing primitive. Coordination there (#21598) is converging on the iterator/stream shape.
- The dominant un-optimised cost for single-file DESC LIMIT N queries with overlap, and for single-file WHERE filter ORDER BY ... LIMIT N queries, lives in this gap. Closing it makes the existing TopK threshold actually pay off at RG granularity inside a file.
Related work
- #21351 (merged): morsel-driven FileStream (file-granular work stealing). This issue is the per-file companion: orthogonal axis, same architectural family.
- #21317 (closed): RG reorder by stats inside a file. That issue's text already pointed at this work: "morselized scans where TopK could terminate after a single row group".
- #21598: Morsel iterator/stream design + the arrow-rs try_next_reader discussion this issue depends on.
- #21399 (closed COMPLETED): earlier epic for dynamic RG pruning. Closed prematurely; this issue is the concrete follow-on.
- #21956 (merged): runtime reorder + reverse machinery this slots into.
- #22385: TopK stats init + cumulative prune (sort-pushdown / no-WHERE subset). This issue complements that: stats init handles the no-WHERE case at scan startup; this issue handles the runtime case (including WHERE).
- Umbrella: #18489 (ClickBench leaderboard).