update limit pushdown to tablet reader #61713
BiteTheDDDDt wants to merge 2 commits into apache:master from
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:

run buildall

/review
Pull request overview
This PR introduces a “general limit” pushdown path from the scan layer into the storage reader stack, intended to allow DUP_KEYS and UNIQUE_KEYS (merge-on-write) queries to stop scanning after producing enough rows.
Changes:
- Add `general_read_limit` to `TabletReader::ReaderParams`, `RowsetReaderContext`, and `StorageReadOptions`, and propagate it through TabletReader → RowsetReader → read options.
- Enforce the limit in `VCollectIterator::next()` by early-EOF and block truncation on the non-merge query path.
- Set `general_read_limit` from `OlapScanner` when the ORDER BY key TopN optimization is not active.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| be/src/storage/tablet/tablet_reader.h | Adds general_read_limit to reader params. |
| be/src/storage/tablet/tablet_reader.cpp | Propagates general_read_limit into RowsetReaderContext. |
| be/src/storage/rowset/rowset_reader_context.h | Adds general_read_limit to rowset reader context. |
| be/src/storage/rowset/beta_rowset_reader.cpp | Copies general_read_limit into StorageReadOptions. |
| be/src/storage/iterators.h | Adds general_read_limit to StorageReadOptions. |
| be/src/storage/iterator/vcollect_iterator.h | Adds member fields to track limit + returned rows. |
| be/src/storage/iterator/vcollect_iterator.cpp | Implements limit enforcement (EOF/truncation) in next(). |
| be/src/exec/scan/olap_scanner.cpp | Sets general_read_limit from scanner limit when TopN key read is inactive. |
```cpp
// General limit pushdown: only for READER_QUERY on non-merge path
// (DUP_KEYS or UNIQUE_KEYS with MOW)
if (!_merge && _reader->_reader_type == ReaderType::READER_QUERY &&
    _reader->_reader_context.general_read_limit > 0 &&
    (_reader->_tablet->keys_type() == KeysType::DUP_KEYS ||
     (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
      _reader->_tablet->enable_unique_key_merge_on_write()))) {
    _general_read_limit = _reader->_reader_context.general_read_limit;
}
```
VCollectIterator::init() sets _general_read_limit only when the condition matches, but never resets _general_read_limit / _general_rows_returned when the condition does NOT match. If the same VCollectIterator instance is re-initialized (e.g. a BlockReader reused across scan ranges), a previously set limit can leak into the next scan and cause early EOF / truncated results. Reset these members to defaults at the start of init() (and reset _general_rows_returned whenever a new limit is applied).
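A standalone sketch of the suggested reset (a hypothetical simplified model, not the actual Doris class; `init`, `_general_read_limit`, and `_general_rows_returned` mirror the PR's member names):

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-in for VCollectIterator, modeling only the limit state.
struct CollectIterModel {
    int64_t _general_read_limit = -1;
    int64_t _general_rows_returned = 0;

    // Reset the limit state unconditionally at the start of init(), so a
    // reused instance cannot carry a limit over from a previous scan range.
    void init(bool pushdown_applies, int64_t ctx_limit) {
        _general_read_limit = -1;
        _general_rows_returned = 0;
        if (pushdown_applies && ctx_limit > 0) {
            _general_read_limit = ctx_limit;
        }
    }
};
```

Re-initializing without the pushdown condition then leaves the instance back at its defaults instead of retaining the stale limit.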
```cpp
_read_options.version = _rowset->version();
_read_options.tablet_id = _rowset->rowset_meta()->tablet_id();
_read_options.topn_limit = _topn_limit;
_read_options.general_read_limit = _read_context->general_read_limit;
```
StorageReadOptions::general_read_limit is introduced and propagated down to _read_options here, but there is currently no consumer of this option anywhere in storage/segment iterators (searching the repo shows it is only written, never read). This adds API surface without behavior change and can mislead future readers; either wire it into the segment/rowset iterator implementations to actually stop IO early, or remove the unused option until it's implemented.
be/src/storage/iterators.h (Outdated)
```cpp
// General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW.
// Propagated from RowsetReaderContext.general_read_limit.
int64_t general_read_limit = -1;
```
This new general_read_limit field is currently not used by any downstream code (repo-wide search shows it is only assigned, never read). Keeping an unused knob in StorageReadOptions increases maintenance cost and suggests a pushdown that doesn’t actually happen; either implement enforcement in the relevant iterators (e.g. SegmentIterator/RowwiseIterator) or remove it until enforcement is in place.
```cpp
// Fast path: if general limit already reached, return EOF immediately
if (_general_read_limit > 0 && _general_rows_returned >= _general_read_limit) {
    return Status::Error<END_OF_FILE>("");
}

if (LIKELY(_inner_iter)) {
    auto st = _inner_iter->next(block);
    if (UNLIKELY(!st.ok())) {
        return st;
    }

    // Enforce general read limit: truncate block if needed
    if (_general_read_limit > 0) {
        _general_rows_returned += block->rows();
        if (_general_rows_returned > _general_read_limit) {
            // Truncate block to return exactly the remaining rows needed
            int64_t excess = _general_rows_returned - _general_read_limit;
            int64_t keep = block->rows() - excess;
            DCHECK_GT(keep, 0);
            block->set_num_rows(keep);
            _general_rows_returned = _general_read_limit;
        }
    }

    return Status::OK();
```
The new general-limit enforcement in VCollectIterator::next() changes scan termination/truncation behavior but doesn’t appear to be covered by unit tests. Please add a BE test that exercises DUP_KEYS and UNIQUE_KEYS(MOW) non-merge reads with a small limit, verifying that blocks are truncated to the remaining rows and that subsequent calls return EOF (and also that general_read_limit <= 0 keeps existing behavior).
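The excess/keep arithmetic can also be exercised in isolation; a hedged sketch with a hypothetical helper (this is not the Doris Block API, just the same math factored into a pure function):

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Number of rows of an incoming block to keep, given the limit and the rows
// already returned. Equivalent to the excess/keep computation above.
int64_t rows_to_keep(int64_t limit, int64_t returned, int64_t block_rows) {
    if (limit <= 0) return block_rows;              // feature disabled
    if (returned >= limit) return 0;                // fast path: would be EOF
    return std::min(block_rows, limit - returned);  // truncate to remainder
}
```

Unit assertions over this function would cover the truncation, fast-path EOF, and disabled-limit cases the review asks for.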
Code Review Summary: Limit Pushdown to Tablet Reader
PR Goal
Push the SQL LIMIT value from OlapScanner down to VCollectIterator in the storage layer, so that for DUP_KEYS and UNIQUE_KEYS with merge-on-write (MOW) tables, storage stops reading early once limit rows have been produced. This bypasses the topn ORDER BY key optimization path.
Critical Checkpoints
1. Does the code accomplish its goal? Is there a test?
- The plumbing is correct: `_limit` → `ReaderParams.general_read_limit` → `RowsetReaderContext` → `VCollectIterator._general_read_limit`.
- [CRITICAL] There are NO tests. No regression test, no unit test, no manual test documented. The PR checklist is entirely unchecked. For a storage-layer optimization that affects query correctness, this is unacceptable.
2. Is this modification as small, clear, and focused as possible?
- The propagation through `StorageReadOptions` and `BetaRowsetReader` is dead code (see issue below), which adds unnecessary complexity.
- Otherwise the change is focused.
3. Correctness: Late-arriving runtime filters cause wrong results
- [CRITICAL BUG] `general_read_limit` counts rows in `VCollectIterator::next()` BEFORE `Scanner::_filter_output_block()` applies `_conjuncts`. Late-arriving runtime filters are added to the Scanner's `_conjuncts` after tablet reader initialization and are NEVER pushed into `remaining_conjunct_roots`. When such a runtime filter filters out rows, `VCollectIterator` has already counted those rows toward its limit and may return EOF too early, causing the query to return fewer rows than the LIMIT specifies. This is a data correctness bug.
4. Correctness: Predicates not acting on a slot
- When `_common_expr_ctxs_push_down` is non-empty, only those expressions go to `remaining_conjunct_roots` (olap_scanner.cpp:361). Any conjuncts still in `_conjuncts` (e.g., predicates not acting on a slot) are excluded from storage-level evaluation but still applied by `_filter_output_block`. Same issue as above: `general_read_limit` counts pre-filter rows.
5. Correctness: enable_common_expr_pushdown=false
- When this session variable is `false`, conjuncts that aren't fully normalizable to column predicates stay in `_conjuncts` and are applied post-storage. In this path, `_common_expr_ctxs_push_down` is empty, so `_conjuncts` ARE forwarded to `remaining_conjunct_roots`: the storage layer applies them, and `_filter_output_block` re-applies them (idempotent). So this specific case is safe.
6. Concurrency / parallel scanning
- Multiple scanners can process different tablets/splits in parallel. Each scanner independently pushes `_limit` as its `general_read_limit`. This means each scanner may return up to `_limit` rows, and the total across all scanners could exceed `_limit`. The operator-level limit enforcement handles the excess, so over-reading is safe. However, if runtime filters cause under-reading per scanner (issue #3), the total across all scanners may also be insufficient.
7. Dead code: StorageReadOptions.general_read_limit
- `general_read_limit` is propagated from `RowsetReaderContext` → `BetaRowsetReader._read_options` → `StorageReadOptions`, but `SegmentIterator` never reads it. `VCollectIterator` reads the limit from `_reader->_reader_context.general_read_limit` (the `RowsetReaderContext`), not from `StorageReadOptions`. The field in `StorageReadOptions` and the assignment in `BetaRowsetReader::get_segment_iterators` are dead code.
8. Are there functionally parallel code paths?
- The topn path (`read_orderby_key`) handles a similar optimization but correctly moves `_conjuncts` into `filter_block_conjuncts` (evaluated inside storage). The general limit path does NOT do this, creating the filtering gap described above.
9. Test coverage
- Zero tests. This needs at minimum:
  - Regression tests for `SELECT * FROM dup_table LIMIT N` (basic correctness)
  - Regression tests with WHERE predicates + LIMIT (to verify correctness with filtering)
  - Regression tests with runtime filters + LIMIT
  - Regression tests for UNIQUE_KEYS MOW table + LIMIT
  - Tests verifying AGG_KEYS and non-MOW UNIQUE_KEYS are NOT affected
10. Configuration items / observability
- No new configuration items or metrics. Given the risk, a session-level toggle to enable/disable this optimization would be prudent.
11. Incompatible changes / rolling upgrades
- The new field `general_read_limit` has a safe default of `-1` (disabled) in all structs. No serialization or protocol compatibility concerns. Safe for rolling upgrades.
12. Performance
- The hot-path overhead (one comparison + counter increment per block) is negligible. The optimization itself is sound in concept - avoiding unnecessary I/O for LIMIT queries on append-only tables.
Verdict
This PR has a critical correctness bug (late-arriving runtime filters can cause wrong results) and zero test coverage. The dead code in StorageReadOptions/BetaRowsetReader should also be cleaned up. I recommend addressing these issues before merging.
be/src/exec/scan/olap_scanner.cpp (Outdated)
```cpp
if (!_tablet_reader_params.read_orderby_key && _limit > 0) {
    _tablet_reader_params.general_read_limit = _limit;
}
```
[Critical Bug] _limit is counted BEFORE post-storage filtering — late-arriving runtime filters cause wrong results.
_limit is pushed as general_read_limit into VCollectIterator, which counts rows returned from the storage layer. However, Scanner::_filter_output_block() (scanner.cpp:155) applies _conjuncts AFTER VCollectIterator returns. Late-arriving runtime filters are added to _conjuncts after tablet reader initialization and are never pushed into remaining_conjunct_roots. When these filters remove rows, VCollectIterator has already counted them, potentially returning EOF before enough post-filter rows are produced.
Concrete scenario:
- Query: `SELECT * FROM dup_table a JOIN dim_table b ON a.key = b.key LIMIT 10`
- `_limit = 10` → `general_read_limit = 10`
- Scanner starts reading; VCollectIterator returns 10 raw rows → marks itself done (EOF)
- A runtime filter from the build side of the hash join arrives late and is added to `_conjuncts`
- `_filter_output_block` filters 4 rows → only 6 rows survive
- Scanner sees EOF from storage → reports EOF to the operator with only 6 rows instead of 10
Suggested fix: Either (a) move _conjuncts to filter_block_conjuncts (like the topn path does at line 500-504) so they are evaluated inside storage, or (b) only enable general_read_limit when _conjuncts is empty and runtime filters are fully arrived, or (c) account for the post-filtering gap by not using a hard limit in storage.
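The failure mode is easy to reproduce in a toy simulation (a hypothetical model, not Doris code: storage counts raw rows toward the limit, then a late filter that keeps only every other row runs afterwards):

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Storage emits blocks of `block_rows` rows and stops (EOF) after counting
// `limit` raw rows. A late filter then keeps only even-indexed rows.
// Returns how many rows actually survive post-filter.
int64_t rows_after_late_filter(int64_t limit, int64_t block_rows) {
    int64_t raw_counted = 0, survived = 0, row_index = 0;
    while (raw_counted < limit) {
        int64_t n = std::min(block_rows, limit - raw_counted);
        raw_counted += n;  // counted BEFORE the filter runs
        for (int64_t i = 0; i < n; ++i, ++row_index) {
            if (row_index % 2 == 0) ++survived;  // late runtime filter
        }
    }
    return survived;  // < limit even if the table holds more matching rows
}
```

With a limit of 10, storage declares EOF after 10 raw rows, but only half survive the filter, so the query returns 5 rows instead of 10.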
```cpp
    int64_t general_read_limit = -1;
};

struct CompactionSampleInfo {
```
[Dead Code] This field is written by BetaRowsetReader::get_segment_iterators() but never read by SegmentIterator or any other consumer. VCollectIterator reads general_read_limit from _reader->_reader_context (the RowsetReaderContext), not from StorageReadOptions. This field and the corresponding assignment in beta_rowset_reader.cpp should be removed to avoid confusion.
```cpp
_read_options.general_read_limit = _read_context->general_read_limit;
if (_read_context->lower_bound_keys != nullptr) {
    for (int i = 0; i < _read_context->lower_bound_keys->size(); ++i) {
        _read_options.key_ranges.emplace_back(&_read_context->lower_bound_keys->at(i),
```
[Dead Code] This assignment propagates general_read_limit into _read_options (StorageReadOptions), but no downstream consumer (SegmentIterator) ever reads it. The limit is consumed solely by VCollectIterator from RowsetReaderContext. This line can be removed.
```cpp
if (_general_read_limit > 0) {
    _general_rows_returned += block->rows();
    if (_general_rows_returned > _general_read_limit) {
        // Truncate block to return exactly the remaining rows needed
```
[Correctness concern] block->rows() here counts rows BEFORE Scanner::_filter_output_block() applies _conjuncts. If any _conjuncts exist that are NOT also evaluated inside the storage layer (e.g., late-arriving runtime filters), this count is inflated. The limit will be reached prematurely, causing the scanner to return fewer rows than requested.
For comparison, the existing topn path moves _conjuncts into filter_block_conjuncts (olap_scanner.cpp:500-504) and evaluates them inside _topn_next (vcollect_iterator.cpp:359-360), so its row counting is post-filter and correct.
```cpp
if (!_merge && _reader->_reader_type == ReaderType::READER_QUERY &&
    _reader->_reader_context.general_read_limit > 0 &&
    (_reader->_tablet->keys_type() == KeysType::DUP_KEYS ||
     (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
```
The DUP_KEYS/UNIQUE_KEYS-with-MOW guard here is redundant with the !_merge check. On the non-merge path, _merge is already false only for these table types (vcollect_iterator.cpp:72-76). The explicit keys_type checks add safety but also maintenance burden — if the _merge logic changes, these checks could become stale. Consider adding a comment noting this intentional redundancy, or rely solely on !_merge.
BE Regression && UT Coverage Report: Increment line coverage / Increment coverage report

run buildall

/review
Code Review Summary: General Limit Pushdown to Storage Layer
Overview
This PR pushes LIMIT down to the storage layer (VCollectIterator) for DUP_KEYS and UNIQUE_KEYS with MOW tables, allowing early termination of reads when the limit is reached. The approach mirrors the existing topn limit pushdown pattern.
Critical Checkpoint Conclusions
1. Goal & Correctness: The goal is to optimize queries with LIMIT (but no ORDER BY) by stopping storage reads early. The core logic is correct: conjuncts are moved to the storage layer, applied before limit counting (post-filter counting), and the block is truncated when the limit is reached. The outer pipeline reached_limit() in ScanOperatorX::get_block() still enforces the global limit across all scanners.
2. Concurrency: No new concurrency concerns. _general_read_limit and _general_rows_returned are per-VCollectIterator instance (one per scanner), and each scanner runs single-threaded within its execution context. No shared mutable state is introduced.
3. Lifecycle: No lifecycle concerns. The new fields are plain integers with no special lifecycle management.
4. Configuration: No new configuration items added. The feature activates automatically based on existing conditions (_limit > 0 and _storage_no_merge()).
5. Incompatible changes / Rolling upgrade: No incompatible changes. The general_read_limit field is added to internal structs (ReaderParams, RowsetReaderContext) with default value -1 (no limit), so existing code paths are unaffected.
6. Parallel code paths: The VerticalBlockReader (used for compaction) does NOT need this optimization as it doesn't serve queries. Cloud mode uses the same OlapScanner and VCollectIterator, so it benefits automatically. No parallel paths missed.
7. Test coverage: Tests cover DUP_KEYS and MOW tables with basic LIMIT, LIMIT+WHERE, LIMIT > matching rows, and complex predicates. Row count assertions verify post-filter counting correctness. See inline comment for suggested improvements.
8. Observability: No new metrics or logging added. For a storage-level optimization like this, the existing scanner-level row counters are sufficient. No observability gap.
9. Transaction/persistence: Not applicable - this is a read-only optimization.
10. FE-BE variable passing: Not applicable - the limit is already available via tnode.limit at the scanner level; no new FE-BE protocol changes needed.
11. Performance: The optimization correctly avoids reading unnecessary data from storage. Minor concern noted inline about hot-path overhead for non-limit queries.
12. Other issues: The PR description is incomplete (no issue number, problem summary, or test checklist selections). The DCHECK removal is justified. See inline comments for details.
Issues Found
- [Medium] Hot-path overhead: `filter_block_conjuncts` and limit checks run for ALL non-topn queries (see inline)
- [Medium] Test coverage gaps: single-bucket only, no negative tests for AGG_KEYS/MOR tables (see inline)
- [Low] PR description incomplete
```cpp
@@ -248,8 +255,39 @@ Status VCollectIterator::next(Block* block) {
        return _topn_next(block);
    }
```
[Medium - Performance] The filter_block_conjuncts check and limit enforcement logic now run on the hot path for all non-topn queries, even when _general_read_limit is not active (i.e., -1). While each individual check is cheap (empty vector check + integer comparison), this adds overhead to every next(Block*) call for queries without LIMIT.
Consider guarding the entire filter+limit block:
```cpp
if (_general_read_limit > 0) {
    // Apply filter_block_conjuncts ...
    if (!_reader->_reader_context.filter_block_conjuncts.empty()) {
        RETURN_IF_ERROR(VExprContext::filter_block(...));
    }
    // Enforce limit ...
    _general_rows_returned += block->rows();
    if (_general_rows_returned > _general_read_limit) { ... }
}
```
This makes the intent clearer and avoids any overhead when the feature is not active.
```sql
    v1 VARCHAR(100) NULL
)
DUPLICATE KEY(k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 1
```
[Medium - Test Coverage] The tests only use BUCKETS 1, which means a single scanner per query. The most interesting scenario for this optimization is multi-bucket tables where multiple scanners each independently apply the limit, and the outer pipeline limit must correctly truncate to the global LIMIT. Consider adding a test with BUCKETS 4 or more and a larger dataset to exercise the multi-scanner path.
Also consider adding:
- Negative test for AGG_KEYS table: verify limit pushdown does NOT activate (AGG_KEYS requires merge, so `_storage_no_merge()` returns false).
- Negative test for UNIQUE_KEYS MOR (without MOW): same reasoning.
- LIMIT with OFFSET: `SELECT ... LIMIT 5 OFFSET 10` to verify correctness when offset is involved.
- LIMIT 0: edge case that should return no rows.
```cpp
        _tablet_reader_params.filter_block_conjuncts = _conjuncts;
        _conjuncts.clear();
    }
} else if (_limit > 0 && olap_scan_local_state->_storage_no_merge()) {
```
[Note - Correctness analysis] _limit here is tnode.limit (the full query LIMIT), not a per-scanner fraction. This is correct: each scanner independently limits its storage reads to N rows, and the outer ScanOperatorX::get_block() -> reached_limit() enforces the global N across all scanners. This matches the existing topn pattern.
However, note that for queries like SELECT * FROM t WHERE rare_condition LIMIT 10 with many tablets, every scanner reads up to 10 post-filter rows even though only 10 total are needed. The adaptive_pipeline_task_serial_read_on_limit optimization mitigates this for small limits by forcing serial scan, but for larger limits the amplification factor is O(num_scanners). This is a known trade-off, not a bug.
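The amplification trade-off above can be quantified with a trivial sketch (illustrative names; assumes the worst case where every scanner independently reads up to the full limit):

```cpp
#include <cassert>
#include <cstdint>

// Worst-case raw rows read across all scanners when each independently
// pushes the full query LIMIT, versus the `limit` rows the query needs.
int64_t worst_case_rows_read(int64_t limit, int64_t num_scanners) {
    return limit * num_scanners;  // amplification factor is O(num_scanners)
}
```

For `LIMIT 10` over 8 parallel scanners this is up to 80 raw rows read for 10 returned, which is the known trade-off the comment describes.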
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)