feat: TopK stats init + cumulative RG pruning for pure-TopK parquet scans (no-WHERE) #22385

Draft
zhuqi-lucas wants to merge 8 commits into apache:main from zhuqi-lucas:feat/topk-stats-init-no-where

@zhuqi-lucas (Contributor) commented May 20, 2026

Which issue does this PR close?

Carved out of #21580 — after #21956 merged the file/RG reorder + reverse machinery, this PR ships the two remaining pieces of the statistics-driven TopK pipeline, restricted to the simplest scope (no WHERE).

Rationale for this change

For ORDER BY <col> LIMIT N queries on parquet:

  1. TopK's dynamic filter starts as lit(true) — the first row groups read are never pruned by it.
  2. After file/RG reorder normalises data to ASC-by-min (and optional reverse for DESC), the first few row groups already contain the rows that will end up in the heap. The rest can be pruned upfront from parquet min/max statistics if the threshold is known before scanning.

This PR initialises the threshold from per-RG stats before the PruningPredicate is built, then accumulates row counts from the front of the reordered RG list and truncates once the running total reaches K, the LIMIT value. On a sorted, non-overlapping dataset whose first row group holds at least K rows, the net effect is that only one row group is decoded; #21580's benchmark observed 17×–60× speedups.

Design — using existing sort-pushdown plumbing

Both optimisations need to know K (the LIMIT) and the sort direction/column. Rather than touching DynamicFilterPhysicalExpr, this PR follows the pattern established by #21956:

  • Sort direction + column — read from ParquetSource::sort_order_for_reorder (already set by try_pushdown_sort and merged).
  • K — new topk_fetch: Option<usize> field on ParquetSource, plumbed by PushdownSort rule via a new FileSource::with_topk_fetch_hint(fetch) trait method (default None, only overridden on ParquetSource).

DynamicFilterPhysicalExpr is not modified.

What changes are included in this PR?

Six small, focused commits.

1. feat: add topk_fetch field on ParquetSource

New optional field next to sort_order_for_reorder / reverse_row_groups. Not surfaced in EXPLAIN (the SortExec.fetch above is the same value).

2. feat: add FileSource::with_topk_fetch_hint trait method

Adds the trait method with a None default; ParquetSource overrides it to clone-and-set topk_fetch. Other file source impls don't need to change.
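
The default-`None` pattern described above can be sketched in isolation. This is a minimal illustration, not the PR's code: the `FileSource` trait below is a hypothetical, heavily simplified stand-in, and `ParquetLikeSource`, `CsvLikeSource`, and the `topk_fetch()` accessor are names invented for the sketch.

```rust
use std::sync::Arc;

/// Simplified stand-in for the real `FileSource` trait (hypothetical shape).
trait FileSource {
    /// Default: the source cannot use the hint, so it returns `None`.
    fn with_topk_fetch_hint(&self, _fetch: usize) -> Option<Arc<dyn FileSource>> {
        None
    }

    /// Sketch-only accessor used to observe the stored hint.
    fn topk_fetch(&self) -> Option<usize> {
        None
    }
}

/// Stand-in for a source that can exploit K.
#[derive(Clone, Default)]
struct ParquetLikeSource {
    topk_fetch: Option<usize>,
}

impl FileSource for ParquetLikeSource {
    /// Clone-and-set: the original source is left untouched.
    fn with_topk_fetch_hint(&self, fetch: usize) -> Option<Arc<dyn FileSource>> {
        let mut new_source = self.clone();
        new_source.topk_fetch = Some(fetch);
        Some(Arc::new(new_source))
    }

    fn topk_fetch(&self) -> Option<usize> {
        self.topk_fetch
    }
}

/// Stand-in for a format that gains nothing from the hint:
/// an empty impl inherits the `None` default.
struct CsvLikeSource;

impl FileSource for CsvLikeSource {}
```

Because the override returns a new source rather than mutating in place, a caller can treat `None` as "keep the source you already have" without any format-specific branching.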

3. feat: plumb SortExec.fetch via with_topk_fetch_hint in PushdownSort

Mirrors try_pushdown_sort's plumbing chain — with_topk_fetch_hint exposed on ExecutionPlan, DataSource, DataSourceExec, FileScanConfig, FileSource. Both Inexact branches in PushdownSort (the SPM+Sort pattern and the standalone Sort) now call inner.with_topk_fetch_hint(fetch) when the SortExec has a fetch.

4. feat: add PreparedAccessPlan::truncate_row_groups

Small pub(crate) helper that keeps the first count entries of row_group_indexes. Returns the plan unchanged if row_selection is set (page-level state would be hard to remap). Covered by three unit tests.

5. feat: TopK stats init + cumulative RG pruning in opener

Two opener-side optimisations, both gated on predicate_is_pure_dynamic_filter(pred) AND sort_order_for_reorder.is_some() AND topk_fetch.is_some().

Stats init (try_init_topk_threshold, runs in prepare_filters before PruningPredicate build):

  • DESC LIMIT K: T_min = max(min(col)) across RGs with num_rows >= K → filter becomes col >= T_min.
  • ASC LIMIT K: T_max = min(max(col)) across RGs with num_rows >= K → filter becomes col <= T_max.
  • >= / <= (not > / <) — T from stats is a safe bound, not strict; the real TopK will tighten it on its first heap update.
  • Sort direction + column come from sort_order_for_reorder, not from the dynamic filter.
  • Skips silently on: no qualifying RGs, sort expression not a simple column (unwraps single-child wrappers like CastExpr), column outside file schema, stats unavailable, type cast failure.
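
The threshold rule in the bullets above can be sketched over plain integers. This is an illustrative simplification, not the PR's `compute_best_threshold`: `RowGroupStats`, `SeedFilter`, and `compute_seed_filter` are hypothetical names, and the sketch assumes `i64` statistics that are always present, with no nulls and no type casting.

```rust
/// Hypothetical per-row-group statistics for the sort column.
#[derive(Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
    num_rows: usize,
}

/// The seeded filter: `col >= t` for DESC, `col <= t` for ASC.
#[derive(Debug, PartialEq)]
enum SeedFilter {
    GtEq(i64),
    LtEq(i64),
}

/// Returns `None` when no row group has at least `k` rows,
/// mirroring the "skip silently" behaviour.
fn compute_seed_filter(
    rgs: &[RowGroupStats],
    k: usize,
    descending: bool,
) -> Option<SeedFilter> {
    // Only row groups that can supply K rows on their own qualify.
    let qualifying = rgs.iter().filter(|rg| rg.num_rows >= k);
    if descending {
        // DESC: largest minimum among qualifying row groups.
        qualifying.map(|rg| rg.min).max().map(SeedFilter::GtEq)
    } else {
        // ASC: smallest maximum among qualifying row groups.
        qualifying.map(|rg| rg.max).min().map(SeedFilter::LtEq)
    }
}
```

For example, with row groups covering `[0, 99]` and `[100, 199]` (100 rows each) and `k = 10`, the DESC case yields `col >= 100` and the ASC case yields `col <= 99`.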

Cumulative RG prune (inside prepare_access_plan, after reorder_by_statistics and reverse):

  • Accumulate num_rows from the front of row_group_indexes until the running total reaches K, then truncate_row_groups.
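
The counting step can be sketched as a small pure function. `row_groups_needed` is a hypothetical helper, not a function from the PR; it only shows how many leading row groups are kept, and says nothing about when truncation is safe (that is what the gates are for).

```rust
/// Given the row counts of the row groups in their final (reordered)
/// iteration order, return how many leading row groups must be kept
/// so that their cumulative row count reaches `k`.
/// If the total never reaches `k`, every row group is kept.
fn row_groups_needed(num_rows_in_order: &[usize], k: usize) -> usize {
    let mut total = 0usize;
    for (i, n) in num_rows_in_order.iter().enumerate() {
        total = total.saturating_add(*n);
        if total >= k {
            // Keep row groups 0..=i and drop the rest.
            return i + 1;
        }
    }
    num_rows_in_order.len()
}
```

The returned count is what would be passed to `truncate_row_groups` in this simplified picture.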

Helpers: find_dynamic_filter, find_column_in_expr (unwraps CastExpr etc.), compute_best_threshold, predicate_is_pure_dynamic_filter.

6. test: add Test I SLT

Six sub-tests in sort_pushdown.slt:

  • I.1: DESC LIMIT — EXPLAIN shows DynamicFilter [ empty ] + sort_order_for_reorder=[id@0 DESC] + reverse_row_groups=true
  • I.2: DESC LIMIT result correctness
  • I.3: ASC LIMIT in file order — Exact path, SortExec eliminated, LIMIT becomes a static fetch
  • I.4: DESC LIMIT with WHERE — both optimisations skip, result still correct via the regular dynamic-filter path
  • I.5: Larger LIMIT spanning multiple RGs
  • I.6: LIMIT > total rows — returns all rows

Scope

Intentionally narrow:

Are these changes tested?

  • 125 datafusion-datasource-parquet lib tests pass (including 3 new tests for truncate_row_groups).
  • sort_pushdown SLT passes (existing tests unchanged, new Test I added).
  • cargo clippy --all-targets -- -D warnings clean on touched crates.

Are there any user-facing changes?

No. Transparent optimisation — same query results, faster TopK on sorted parquet when no WHERE clause is present.

@github-actions github-actions Bot added physical-expr Changes to the physical-expr crates sqllogictest SQL Logic Tests (.slt) datasource Changes to the datasource crate physical-plan Changes to the physical-plan crate labels May 20, 2026
@zhuqi-lucas zhuqi-lucas force-pushed the feat/topk-stats-init-no-where branch from 5b11478 to 55a4aba Compare May 20, 2026 06:45
@github-actions github-actions Bot added optimizer Optimizer rules and removed physical-expr Changes to the physical-expr crates labels May 20, 2026
Adds a new `topk_fetch: Option<usize>` field on `ParquetSource`
alongside the existing sort-pushdown markers (`sort_order_for_reorder`
and `reverse_row_groups`). Future commits will:

1. Surface a `FileSource::with_topk_fetch_hint` trait method that
   `PushdownSort` calls with the outer `SortExec.fetch()` after
   building an `Inexact` plan.
2. Use the value in the opener to (a) seed the TopK dynamic filter
   from per-RG min/max statistics before `PruningPredicate` build
   and (b) truncate `row_group_indexes` once cumulative `num_rows`
   reaches K.

The field is intentionally not surfaced in EXPLAIN — the same
fetch value is already shown on the `SortExec` above the source.

The temporary `#[expect(dead_code)]` will be removed when the
companion plumbing lands in the next commit.

Adds a new `with_topk_fetch_hint(fetch: usize) -> Option<Arc<dyn FileSource>>`
method to the `FileSource` trait with a `None` default. The new method
gives a sort-pushdown `Inexact` `FileSource` a chance to take the
surrounding `SortExec`'s `fetch` (K) as a hint and produce a refined
source — e.g. one that can seed a TopK dynamic filter from per-RG
statistics or prune row groups by cumulative `num_rows`.

`ParquetSource` overrides the method: it clones itself, stores the
hint in the `topk_fetch` field introduced in the previous commit,
and returns the new source.

The trait default returns `None` so file formats that don't benefit
(CSV, JSON, Avro, Arrow, ...) need no changes.

The `PushdownSort` rule will start calling this in the next commit.

When `PushdownSort` produces an `Inexact` result (the source can only
approximate the requested ordering, so a `SortExec` is kept), and the
outer `SortExec` carried a fetch (i.e. we are in a TopK shape), the
rule now surfaces `K` to the data source.

To carry the hint through the plan tree without a hard downcast on
`DataSourceExec` in `physical-optimizer` — which would require a
new crate dependency on `datafusion-datasource` — the plumbing
mirrors the existing `try_pushdown_sort` chain:

* `ExecutionPlan::with_topk_fetch_hint`  (default `None`)
* `DataSourceExec::with_topk_fetch_hint`  (delegates to data source)
* `DataSource::with_topk_fetch_hint`      (default `None`)
* `FileScanConfig::with_topk_fetch_hint`  (swaps in the new file
                                           source returned by
                                           `FileSource::with_topk_fetch_hint`)

Both `Inexact` branches in `PushdownSort` (the SPM → SortExec
pattern and the standalone SortExec pattern) now call
`inner.with_topk_fetch_hint(fetch)` when the SortExec has a fetch,
falling back to `inner` unchanged otherwise. Sources that don't
override the hint (CSV, JSON, Avro, Arrow, MemorySource, ...) keep
behaving exactly as before.
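
The delegation and the "fall back to `inner` unchanged" behaviour can be sketched with simplified stand-ins. Everything here is hypothetical: `DataSource` is reduced to the one method, `ScanNode` plays the role of the wrapping plan node, and `apply_topk_hint` is an invented name for the call-site logic.

```rust
use std::sync::Arc;

/// Hypothetical, simplified stand-in for a data source.
trait DataSource {
    /// Default: ignore the hint.
    fn with_topk_fetch_hint(&self, _fetch: usize) -> Option<Arc<dyn DataSource>> {
        None
    }

    /// Sketch-only accessor so the effect of the hint can be observed.
    fn hint(&self) -> Option<usize> {
        None
    }
}

struct HintAware {
    hint: Option<usize>,
}

impl DataSource for HintAware {
    fn with_topk_fetch_hint(&self, fetch: usize) -> Option<Arc<dyn DataSource>> {
        Some(Arc::new(HintAware { hint: Some(fetch) }))
    }

    fn hint(&self) -> Option<usize> {
        self.hint
    }
}

struct HintUnaware;

impl DataSource for HintUnaware {}

/// Stand-in for the plan node wrapping a source; it only delegates.
struct ScanNode {
    source: Arc<dyn DataSource>,
}

impl ScanNode {
    fn with_topk_fetch_hint(&self, fetch: usize) -> Option<ScanNode> {
        self.source
            .with_topk_fetch_hint(fetch)
            .map(|source| ScanNode { source })
    }
}

/// Use the hinted node when there is a fetch and the source accepts it;
/// otherwise return `inner` unchanged.
fn apply_topk_hint(inner: ScanNode, sort_fetch: Option<usize>) -> ScanNode {
    let hinted = sort_fetch.and_then(|k| inner.with_topk_fetch_hint(k));
    hinted.unwrap_or(inner)
}
```

Each layer only forwards the call, so the optimizer never needs to know which concrete source sits at the bottom of the chain.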

The Parquet opener will start using the hint in the next two commits.

Introduces `truncate_row_groups(count)` on `PreparedAccessPlan`,
the building block for the opener's upcoming cumulative-RG-prune
step in the no-WHERE TopK path. After `reorder_by_statistics` and
optional `reverse`, the opener walks `row_group_indexes` front to
back, accumulating `num_rows` until it reaches `K`, then truncates.

Semantics:

* Keep at most the first `count` entries of `row_group_indexes`.
* No-op when `count >= row_group_indexes.len()`.
* Bail unchanged when `row_selection.is_some()`. Page-level
  selections are keyed by surviving row groups and would have
  to be remapped; the no-WHERE TopK gate keeps `row_selection`
  at `None` in the caller, so the early-return is a safety net.

Three new unit tests cover the happy path, the no-op path, and
the row-selection bail-out.
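
A minimal sketch of those semantics, assuming a by-value builder-style signature (the real signature is not shown in this message). `AccessPlanSketch` is a hypothetical stand-in for `PreparedAccessPlan`, and `Vec<bool>` merely stands in for the real row-selection type.

```rust
/// Hypothetical stand-in for `PreparedAccessPlan`.
#[derive(Debug, Clone, PartialEq)]
struct AccessPlanSketch {
    row_group_indexes: Vec<usize>,
    /// Placeholder for page-level row selection; the real type differs.
    row_selection: Option<Vec<bool>>,
}

impl AccessPlanSketch {
    /// Keep at most the first `count` row groups.
    fn truncate_row_groups(mut self, count: usize) -> Self {
        // Safety net: a row selection would need remapping, so leave the plan alone.
        if self.row_selection.is_some() {
            return self;
        }
        // `Vec::truncate` is already a no-op when `count >= len`.
        self.row_group_indexes.truncate(count);
        self
    }
}
```

The three cases map directly onto the unit tests mentioned above: truncation, no-op, and the row-selection bail-out.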

The method is `pub(crate)` and has a `#[cfg_attr(not(test),
expect(dead_code, ...))]` to keep clippy clean while the caller
in the opener is still on its way; that attribute will go away
in the next commit when the wiring lands.

Two new opener optimisations on the no-WHERE TopK path, both gated
on the same conditions:

1. The surrounding `SortExec(fetch=K)` plumbed `K` through
   `with_topk_fetch_hint` (`topk_fetch.is_some()`).
2. Sort pushdown fired (`sort_order_for_reorder.is_some()`).
3. The source predicate is a bare `DynamicFilterPhysicalExpr` —
   i.e. no static WHERE conjuncts (`predicate_is_pure_dynamic_filter`).

When all three hold:

**Stats init (in `prepare_filters`, before `PruningPredicate` build).**

Computes the "best safe threshold" from per-row-group min/max
statistics:

* DESC sort → keep the largest `min` across RGs with `num_rows >= K`.
* ASC sort  → keep the smallest `max` across RGs with `num_rows >= K`.

Then builds `col {>=|<=} threshold` and feeds it into the dynamic
filter via the existing `DynamicFilterPhysicalExpr::update` API.
The pruning predicate built immediately afterwards picks up the
tighter bound, so cold-start TopK skips many row groups before
ever decoding data.

Restricting the candidates to RGs with `num_rows >= K` is the
correctness guarantee: the threshold is a bound that every row of
some single qualifying row group satisfies, so at least K rows always
pass the seeded filter.
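
A small brute-force check makes the role of the `num_rows >= K` restriction concrete for the DESC case. This is an illustrative sketch over in-memory `i64` values, not the opener's code; `desc_threshold` and `rows_passing` are hypothetical helpers, and the `require_k_rows` flag exists only to show what goes wrong without the restriction.

```rust
/// DESC seed: the largest per-row-group minimum, optionally restricted
/// to row groups holding at least `k` rows.
fn desc_threshold(row_groups: &[Vec<i64>], k: usize, require_k_rows: bool) -> Option<i64> {
    row_groups
        .iter()
        .filter(|rg| !require_k_rows || rg.len() >= k)
        .filter_map(|rg| rg.iter().copied().min())
        .max()
}

/// Number of rows that survive the seeded filter `col >= threshold`.
fn rows_passing(row_groups: &[Vec<i64>], threshold: i64) -> usize {
    row_groups
        .iter()
        .flatten()
        .filter(|v| **v >= threshold)
        .count()
}
```

With row groups `0..100`, `100..200`, and a two-row group `[500, 501]`, and `K = 10`, the restricted threshold is 100 and 102 rows pass the filter. Dropping the restriction would seed 500 from the two-row group, leaving only 2 rows, fewer than K.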

**Cumulative RG prune (in the `prepare_access_plan` closure).**

After `reorder_by_statistics` + optional `reverse`, the iteration
order is final and the first row group is the best for the request.
Walk forward, accumulate `num_rows`, and truncate via the new
`PreparedAccessPlan::truncate_row_groups` once cumulative count
reaches `K`. Skipped when `row_selection.is_some()` (`truncate_row_groups`
already returns early in that case, but checking the gate first is cheaper).

Both optimisations are best-effort and degrade gracefully: any
per-file failure (missing stats, non-column sort key, type
mismatch) logs via `debug!` and falls back to the unmodified behaviour.

Plumbing:

* `ParquetMorselizer` and `PreparedParquetOpen` gain a
  `topk_fetch: Option<usize>` field, populated by
  `ParquetSource::create_morselizer`.
* New private helpers live in a `topk_no_where` submodule inside
  `opener/mod.rs`: `predicate_is_pure_dynamic_filter`,
  `find_dynamic_filter`, `find_column_in_expr`,
  `compute_best_threshold`, `try_init_topk_threshold`.

`topk_fetch` is intentionally not surfaced in EXPLAIN — the same
value is already shown on the `SortExec` above the data source.

… path)

Test I.1: DESC LIMIT — EXPLAIN shows DynamicFilter [ empty ] +
  sort_order_for_reorder + reverse_row_groups=true (sort pushdown fires)
Test I.2: DESC LIMIT result correctness
Test I.3: ASC LIMIT in same direction as file — Exact path,
  SortExec eliminated, limit becomes static fetch on source
Test I.4: DESC LIMIT with WHERE — stats init and cumulative prune
  both skip (predicate is not a bare DynamicFilter), result still
  correct via dynamic filter pushdown
Test I.5: Larger LIMIT spanning multiple RGs
Test I.6: LIMIT larger than total rows, returns all rows

Three issues surfaced after CI ran the no-WHERE TopK path against the
full SLT corpus and the topk_filter_pushdown fuzz test:

* `cargo doc` failed on an unresolved intra-doc link
  `[crate::ParquetSource]`. Rewrite as a reference link with an explicit
  path target so rustdoc resolves it from the morsel module.

* Stats init installed a `BinaryExpr` using the scan-side column from
  `sort_order_for_reorder`, but the dynamic filter's stored expression
  lives in TopK output column space when a projection sits between
  TopK and the scan. The `remap_children` walk then either bypassed
  the rewrite (TopK saw scan-side columns, producing `Int32 <= Utf8View`
  / `Int32 <= Int64` failures) or double-wrapped the projection's
  CAST/computation (producing `CAST(CAST(...) AS Int32) AS Int64) + 1`).
  Gate stats init on `remapped_children == original_children`: the
  filter-pushdown path always rebuilds children via
  `reassign_expr_columns`, so `remapped_children` is essentially always
  `Some` and its presence alone proves nothing; stats init is safe when
  the two slices are *logically* equal.

* Cumulative RG prune is unsafe for multi-column sort: leading-column
  reorder doesn't disambiguate ties on the leading column, so a tie
  may span row groups and truncating after the first K rows can drop
  the row group that actually contains a member of the top-K. This
  was caught by the topk_filter_pushdown fuzz with
  `ORDER BY department ASC NULLS FIRST, id ASC NULLS LAST, name ASC
  NULLS LAST LIMIT 1` returning the wrong row. Gate the cumulative
  truncation on `sort_order.len() == 1` and on the same
  `remapped_children == original_children` check. Multi-column TopK
  still benefits from reorder + reverse; only the cumulative-K
  truncation is skipped.
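
The tie problem can be reproduced with a tiny in-memory model. This is a simplified illustration, not the fuzz case itself: rows are hypothetical `(department, id)` integer pairs without nulls, both row groups share the same leading-column minimum, and `kept_after_truncation` is an invented helper that mimics the cumulative-K cut.

```rust
/// A row is `(department, id)`; tuples compare lexicographically,
/// which models `ORDER BY department ASC, id ASC`.
type Row = (i32, i32);

/// Reference top-K over any set of rows.
fn top_k(rows: impl Iterator<Item = Row>, k: usize) -> Vec<Row> {
    let mut all: Vec<Row> = rows.collect();
    all.sort();
    all.truncate(k);
    all
}

/// Keep leading row groups until their cumulative row count reaches `k`.
fn kept_after_truncation(row_groups: &[Vec<Row>], k: usize) -> &[Vec<Row>] {
    let mut total = 0;
    for (i, rg) in row_groups.iter().enumerate() {
        total += rg.len();
        if total >= k {
            return &row_groups[..=i];
        }
    }
    row_groups
}
```

Take row groups `[(1, 5), (1, 7)]` and `[(1, 2), (2, 0)]`: both have leading-column minimum 1, so a leading-column reorder may leave them in this order. With `K = 1` the truncation keeps only the first row group and yields `(1, 5)`, while the true top-1 is `(1, 2)` from the second.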

Stats init is also gated on `sort_order.len() == 1` for symmetry, even
though it is correct for multi-column sort — the seeded leading-column
threshold is still a valid lower/upper bound on the K-th value. The
single-column condition keeps the no-WHERE TopK path obviously
correct and matches the gate on the cumulative truncation.

Single-column no-WHERE TopK is the one shape where stats init now
fires, and it tightens the seeded threshold enough that the row group
is marked fully-matched at PruningPredicate build time. Update the
EXPLAIN ANALYZE snapshot to reflect the new metric pattern:

  row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched
  page_index_pages_skipped_by_fully_matched=1
  pushdown_rows_matched=0
  predicate_cache_inner_records=0
  predicate_cache_records=0

Result and dynamic-filter shape are unchanged; this is a pure
optimization-metric refresh. All other failing queries from the
previous run (`topk_multi_col`, `topk_proj`) now match their old
snapshots because the gates in opener/mod.rs skip stats init for
multi-column / projection-rewritten cases.

Labels

datasource Changes to the datasource crate optimizer Optimizer rules physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)
