feat: TopK stats init + cumulative RG pruning for pure-TopK parquet scans (no-WHERE)#22385
Draft
zhuqi-lucas wants to merge 8 commits into
Draft
feat: TopK stats init + cumulative RG pruning for pure-TopK parquet scans (no-WHERE)#22385zhuqi-lucas wants to merge 8 commits into
zhuqi-lucas wants to merge 8 commits into
Conversation
5b11478 to
55a4aba
Compare
Adds a new `topk_fetch: Option<usize>` field on `ParquetSource` alongside the existing sort-pushdown markers (`sort_order_for_reorder` and `reverse_row_groups`). Future commits will: 1. Surface a `FileSource::with_topk_fetch_hint` trait method that `PushdownSort` calls with the outer `SortExec.fetch()` after building an `Inexact` plan. 2. Use the value in the opener to (a) seed the TopK dynamic filter from per-RG min/max statistics before `PruningPredicate` build and (b) truncate `row_group_indexes` once cumulative `num_rows` reaches K. The field is intentionally not surfaced in EXPLAIN — the same fetch value is already shown on the `SortExec` above the source. The temporary `#[expect(dead_code)]` will be removed when the companion plumbing lands in the next commit.
Adds a new `with_topk_fetch_hint(fetch: usize) -> Option<Arc<dyn FileSource>>` method to the `FileSource` trait with a `None` default. The new method gives a sort-pushdown `Inexact` `FileSource` a chance to take the surrounding `SortExec`'s `fetch` (K) as a hint and produce a refined source — e.g. one that can seed a TopK dynamic filter from per-RG statistics or prune row groups by cumulative `num_rows`. `ParquetSource` overrides the method: it clones itself, stores the hint in the `topk_fetch` field introduced in the previous commit, and returns the new source. The trait default returns `None` so file formats that don't benefit (CSV, JSON, Avro, Arrow, ...) need no changes. The `PushdownSort` rule will start calling this in the next commit.
When `PushdownSort` produces an `Inexact` result (the source can only
approximate the requested ordering, so a `SortExec` is kept), and the
outer `SortExec` carried a fetch (i.e. we are in a TopK shape), the
rule now surfaces `K` to the data source.
To carry the hint through the plan tree without a hard downcast on
`DataSourceExec` in `physical-optimizer` — which would require a
new crate dependency on `datafusion-datasource` — the plumbing
mirrors the existing `try_pushdown_sort` chain:
* `ExecutionPlan::with_topk_fetch_hint` (default `None`)
* `DataSourceExec::with_topk_fetch_hint` (delegates to data source)
* `DataSource::with_topk_fetch_hint` (default `None`)
* `FileScanConfig::with_topk_fetch_hint` (swaps in the new file
source returned by
`FileSource::with_topk_fetch_hint`)
Both `Inexact` branches in `PushdownSort` (the SPM → SortExec
pattern and the standalone SortExec pattern) now call
`inner.with_topk_fetch_hint(fetch)` when the SortExec has a fetch,
falling back to `inner` unchanged otherwise. Sources that don't
override the hint (CSV, JSON, Avro, Arrow, MemorySource, ...) keep
behaving exactly as before.
The Parquet opener will start using the hint in the next two commits.
Introduces `truncate_row_groups(count)` on `PreparedAccessPlan` — the building block for the opener's upcoming cumulative-RG-prune step in the no-WHERE TopK path. After `reorder_by_statistics` and optional `reverse`, the opener walks `row_group_indexes` front to back, accumulating `num_rows` until it reaches `K`, then truncates. Semantics: * Keep at most the first `count` entries of `row_group_indexes`. * No-op when `count >= row_group_indexes.len()`. * Bail unchanged when `row_selection.is_some()`. Page-level selections are keyed by surviving row groups and would have to be remapped; the no-WHERE TopK gate keeps `row_selection` at `None` in the caller, so the early-return is a safety net. Three new unit tests cover the happy path, the no-op path, and the row-selection bail-out. The method is `pub(crate)` and has a `#[cfg_attr(not(test), expect(dead_code, ...))]` to keep clippy clean while the caller in the opener is still on its way; that attribute will go away in the next commit when the wiring lands.
Two new opener optimisations on the no-WHERE TopK path, both gated
on the same conditions:
1. The surrounding `SortExec(fetch=K)` plumbed `K` through
`with_topk_fetch_hint` (`topk_fetch.is_some()`).
2. Sort pushdown fired (`sort_order_for_reorder.is_some()`).
3. The source predicate is a bare `DynamicFilterPhysicalExpr` —
i.e. no static WHERE conjuncts (`predicate_is_pure_dynamic_filter`).
When all three hold:
**Stats init (in `prepare_filters`, before `PruningPredicate` build).**
Computes the "best safe threshold" from per-row-group min/max
statistics:
* DESC sort → keep the largest `max` across RGs with `num_rows >= K`.
* ASC sort → keep the smallest `min` across RGs with `num_rows >= K`.
Then builds `col {>|<} threshold` and feeds it into the dynamic
filter via the existing `DynamicFilterPhysicalExpr::update` API.
The pruning predicate built immediately afterwards picks up the
tighter bound, so cold-start TopK skips many row groups before
ever decoding data.
The "RGs smaller than `K`" filter is the correctness guarantee:
we never raise the threshold past a value that a single qualifying
row group could provide, so K rows are always reachable.
**Cumulative RG prune (in the `prepare_access_plan` closure).**
After `reorder_by_statistics` + optional `reverse`, the iteration
order is final and the first row group is the best for the request.
Walk forward, accumulate `num_rows`, and truncate via the new
`PreparedAccessPlan::truncate_row_groups` once cumulative count
reaches `K`. Skipped when `row_selection.is_some()` (already
asserted by `truncate_row_groups`, but the gate is cheaper).
Both optimisations are best-effort and fully graceful — any
per-file failure (missing stats, non-column sort key, type
mismatch) `debug!`s and falls back to the unmodified behaviour.
Plumbing:
* `ParquetMorselizer` and `PreparedParquetOpen` gain a
`topk_fetch: Option<usize>` field, populated by
`ParquetSource::create_morselizer`.
* New private helpers live in a `topk_no_where` submodule inside
`opener/mod.rs`: `predicate_is_pure_dynamic_filter`,
`find_dynamic_filter`, `find_column_in_expr`,
`compute_best_threshold`, `try_init_topk_threshold`.
`topk_fetch` is intentionally not surfaced in EXPLAIN — the same
value is already shown on the `SortExec` above the data source.
… path) Test I.1: DESC LIMIT — EXPLAIN shows DynamicFilter [ empty ] + sort_order_for_reorder + reverse_row_groups=true (sort pushdown fires) Test I.2: DESC LIMIT result correctness Test I.3: ASC LIMIT in same direction as file — Exact path, SortExec eliminated, limit becomes static fetch on source Test I.4: DESC LIMIT with WHERE — stats init and cumulative prune both skip (predicate is not a bare DynamicFilter), result still correct via dynamic filter pushdown Test I.5: Larger LIMIT spanning multiple RGs Test I.6: LIMIT larger than total rows — returns all rows
Three issues surfaced after CI ran the no-WHERE TopK path against the full SLT corpus and the topk_filter_pushdown fuzz test: * `cargo doc` failed on an unresolved intra-doc link `[crate::ParquetSource]`. Rewrite as a reference link with an explicit path target so rustdoc resolves it from the morsel module. * Stats init installed a `BinaryExpr` using the scan-side column from `sort_order_for_reorder`, but the dynamic filter's stored expression lives in TopK output column space when a projection sits between TopK and the scan. The `remap_children` walk then either bypassed the rewrite (TopK saw scan-side columns, producing `Int32 <= Utf8View` / `Int32 <= Int64` failures) or double-wrapped the projection's CAST/computation (producing `CAST(CAST(...) AS Int32) AS Int64) + 1`). Gate stats init on `remapped_children == original_children`: the filter-pushdown path always rebuilds children via `reassign_expr_columns` so `remapped_children` is essentially always `Some`, but it's safe when the two slices are *logically* equal. * Cumulative RG prune is unsafe for multi-column sort: leading-column reorder doesn't disambiguate ties on the leading column, so a tie may span row groups and truncating after the first K rows can drop the row group that actually contains a member of the top-K. This was caught by the topk_filter_pushdown fuzz with `ORDER BY department ASC NULLS FIRST, id ASC NULLS LAST, name ASC NULLS LAST LIMIT 1` returning the wrong row. Gate the cumulative truncation on `sort_order.len() == 1` and on the same `remapped_children == original_children` check. Multi-column TopK still benefits from reorder + reverse; only the cumulative-K truncation is skipped. Stats init is also gated on `sort_order.len() == 1` for symmetry, even though it is correct for multi-column sort — the seeded leading-column threshold is still a valid lower/upper bound on the K-th value. The single-column condition keeps the no-WHERE TopK path obviously correct and matches the gate on the cumulative truncation.
Single-column no-WHERE TopK is the one shape where stats init now fires, and it tightens the seeded threshold enough that the row group is marked fully-matched at PruningPredicate build time. Update the EXPLAIN ANALYZE snapshot to reflect the new metric pattern: row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched page_index_pages_skipped_by_fully_matched=1 pushdown_rows_matched=0 predicate_cache_inner_records=0 predicate_cache_records=0 Result and dynamic-filter shape are unchanged; this is a pure optimization-metric refresh. All other failing queries from the previous run (`topk_multi_col`, `topk_proj`) now match their old snapshots because the gates in opener/mod.rs skip stats init for multi-column / projection-rewritten cases.
55a4aba to
fbcf623
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Carved out of #21580 — after #21956 merged the file/RG reorder + reverse machinery, this PR ships the two remaining pieces of the statistics-driven TopK pipeline, restricted to the simplest scope (no WHERE).
Rationale for this change
For
ORDER BY <col> LIMIT Nqueries on parquet:TopK's dynamic filter starts aslit(true)— the first row groups read are never pruned by it.min/maxstatistics if the threshold is known before scanning.This PR initialises the threshold from per-RG stats before the$\geq K$ . The net effect on a sorted, non-overlapping dataset is that only one row group is decoded for a
PruningPredicateis built, then accumulates row counts from the front of the reordered RG list and truncates onceLIMIT Nquery — observed 17×–60× speedups in #21580's benchmark.Design — using existing sort-pushdown plumbing
Both optimisations need to know
K(the LIMIT) and the sort direction/column. Rather than touchingDynamicFilterPhysicalExpr, this PR follows the pattern established by #21956:ParquetSource::sort_order_for_reorder(already set bytry_pushdown_sortand merged).topk_fetch: Option<usize>field onParquetSource, plumbed byPushdownSortrule via a newFileSource::with_topk_fetch_hint(fetch)trait method (defaultNone, only overridden onParquetSource).DynamicFilterPhysicalExpris not modified.What changes are included in this PR?
Six small focused commits.
1.
feat: add topk_fetch field on ParquetSourceNew optional field next to
sort_order_for_reorder/reverse_row_groups. Not surfaced inEXPLAIN(the SortExec.fetch above is the same value).2.
feat: add FileSource::with_topk_fetch_hint trait methodAdds the trait method with a
Nonedefault;ParquetSourceoverrides it to clone-and-settopk_fetch. Other file source impls don't need to change.3.
feat: plumb SortExec.fetch via with_topk_fetch_hint in PushdownSortMirrors
try_pushdown_sort's plumbing chain —with_topk_fetch_hintexposed onExecutionPlan,DataSource,DataSourceExec,FileScanConfig,FileSource. BothInexactbranches inPushdownSort(the SPM+Sort pattern and the standalone Sort) now callinner.with_topk_fetch_hint(fetch)when the SortExec has a fetch.4.
feat: add PreparedAccessPlan::truncate_row_groupsSmall
pub(crate)helper that keeps the firstcountentries ofrow_group_indexes. Bails unchanged ifrow_selectionis set (page-level state would be hard to remap). Three unit tests.5.
feat: TopK stats init + cumulative RG pruning in openerTwo opener-side optimisations, both dual-gated on
predicate_is_pure_dynamic_filter(pred)ANDsort_order_for_reorder.is_some()ANDtopk_fetch.is_some().Stats init (
try_init_topk_threshold, runs inprepare_filtersbeforePruningPredicatebuild):LIMIT K:T_min = max(min(col))across RGs withnum_rows >= K→ filter becomescol >= T_min.LIMIT K:T_max = min(max(col))across RGs withnum_rows >= K→ filter becomescol <= T_max.>=/<=(not>/<) —Tfrom stats is a safe bound, not strict; the real TopK will tighten it on its first heap update.sort_order_for_reorder, not from the dynamic filter.CastExpr), column outside file schema, stats unavailable, type cast failure.Cumulative RG prune (inside
prepare_access_plan, afterreorder_by_statisticsandreverse):num_rowsfrom the front ofrow_group_indexesuntil the running total reachesK, thentruncate_row_groups.Helpers:
find_dynamic_filter,find_column_in_expr(unwrapsCastExpretc.),compute_best_threshold,predicate_is_pure_dynamic_filter.6.
test: add Test I SLTSix sub-tests in
sort_pushdown.slt:DynamicFilter [ empty ]+sort_order_for_reorder=[id@0 DESC]+reverse_row_groups=trueExactpath,SortExeceliminated, LIMIT becomes a static fetchScope
Intentionally narrow:
DynamicFilterPhysicalExpr.sort_options, but it's a separable concern).sort_order_for_reorderplain-Columnconstraint).Are these changes tested?
datafusion-datasource-parquetlib tests pass (including 3 new tests fortruncate_row_groups).sort_pushdownSLT passes (existing tests unchanged, new Test I added).cargo clippy --all-targets -- -D warningsclean on touched crates.Are there any user-facing changes?
No. Transparent optimisation — same query results, faster TopK on sorted parquet when no WHERE clause is present.