update limit pushdown to tablet reader #61713
base: master
@@ -502,6 +502,14 @@ Status OlapScanner::_init_tablet_reader_params(

```cpp
        _tablet_reader_params.filter_block_conjuncts = _conjuncts;
        _conjuncts.clear();
    }
} else if (_limit > 0 && olap_scan_local_state->_storage_no_merge()) {
    // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW (non-merge path).
    // Only when topn optimization is NOT active (topn handles its own limit).
    // Move _conjuncts into storage layer so that general_read_limit counts
    // post-filter rows, same as the topn path above.
    _tablet_reader_params.general_read_limit = _limit;
    _tablet_reader_params.filter_block_conjuncts = _conjuncts;
```
Contributor

Bug [Critical]: This line moves `_conjuncts` into `filter_block_conjuncts`, and the `_conjuncts.clear()` below then removes them from the scanner.

Result: for queries with WHERE clauses, the conjuncts are handed off to the storage layer but (as noted on the `VCollectIterator` changes) never actually applied there, so rows that should have been filtered out can be returned.

Fix: either (a) don't move `_conjuncts` out of the scanner on this path, or (b) make `VCollectIterator` apply `filter_block_conjuncts` before counting rows against the limit.
```cpp
    _conjuncts.clear();
}
```
```cpp
    // set push down topn filter
```
@@ -94,6 +94,16 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo

```cpp
        _topn_limit = 0;
        DCHECK_EQ(_reader->_reader_context.filter_block_conjuncts.size(), 0);
    }
```
Contributor

Bug [Crash in ASAN/Debug]: Existing DCHECK at line 95 will fire.

Two lines above this new code, at line 95 (context line in this diff, position 2), there is:

```cpp
DCHECK_EQ(_reader->_reader_context.filter_block_conjuncts.size(), 0);
```

This DCHECK asserts that `filter_block_conjuncts` is empty whenever the topn path is not active. The scanner-side change now populates `filter_block_conjuncts` for the general limit pushdown as well, so this DCHECK fires on debug/ASAN builds.

This DCHECK needs to be updated to allow a non-empty `filter_block_conjuncts` when `general_read_limit > 0`.
```cpp
    // General limit pushdown: only for READER_QUERY on non-merge path
    // (DUP_KEYS or UNIQUE_KEYS with MOW)
    if (!_merge && _reader->_reader_type == ReaderType::READER_QUERY &&
        _reader->_reader_context.general_read_limit > 0 &&
        (_reader->_tablet->keys_type() == KeysType::DUP_KEYS ||
         (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
```
Contributor

Bug [Critical]: General limit counts unfiltered rows.

This condition enables `_general_read_limit` without ensuring that `filter_block_conjuncts` are actually applied before rows are counted. The comment in `OlapScanner::_init_tablet_reader_params` says `general_read_limit` counts post-filter rows, but nothing on this path applies the conjuncts, so the limit is enforced on raw rows and queries with WHERE clauses can return too few matching rows.

If the intent is to count post-filter rows (which is correct), you need to add a `filter_block` call on `filter_block_conjuncts` in `next()` before counting rows against the limit.
```cpp
          _reader->_tablet->enable_unique_key_merge_on_write()))) {
        _general_read_limit = _reader->_reader_context.general_read_limit;
    }
}

Status VCollectIterator::add_child(const RowSetSplits& rs_splits) {
```
@@ -249,6 +259,27 @@ Status VCollectIterator::next(Block* block) {

```cpp
    }
```
Contributor

[Medium - Performance] The filter-and-count bookkeeping should be skipped entirely when the feature is inactive. Consider guarding the entire filter+limit block:

```cpp
if (_general_read_limit > 0) {
    // Apply filter_block_conjuncts ...
    if (!_reader->_reader_context.filter_block_conjuncts.empty()) {
        RETURN_IF_ERROR(VExprContext::filter_block(...));
    }
    // Enforce limit ...
    _general_rows_returned += block->rows();
    if (_general_rows_returned > _general_read_limit) { ... }
}
```

This makes the intent clearer and avoids any overhead when the feature is not active.
```cpp
    if (LIKELY(_inner_iter)) {
        if (_general_read_limit > 0) {
            // Fast path: if general limit already reached, return EOF immediately
            if (_general_rows_returned >= _general_read_limit) {
                return Status::Error<END_OF_FILE>("");
            }

            RETURN_IF_ERROR(_inner_iter->next(block));

            // Enforce general read limit: truncate block if needed
            _general_rows_returned += block->rows();
            if (_general_rows_returned > _general_read_limit) {
                // Truncate block to return exactly the remaining rows needed
```
Contributor

[Correctness concern] For comparison, the existing topn path moves `_conjuncts` into the storage layer so that its limit is applied to post-filter rows; the truncation here operates on raw block rows, so the two paths will disagree for filtered queries until the conjuncts are actually applied on this path.
```cpp
                int64_t excess = _general_rows_returned - _general_read_limit;
                int64_t keep = block->rows() - excess;
                DCHECK_GT(keep, 0);
                block->set_num_rows(keep);
                _general_rows_returned = _general_read_limit;
            }

            return Status::OK();
        }
        return _inner_iter->next(block);
    } else {
        return Status::Error<END_OF_FILE>("");
```
@@ -0,0 +1,86 @@

```text
-- This file is automatically generated. You should know what you did if you want to edit this
```
Contributor

Test issue: Orphaned `.out` file. This expected-output file defines results for `qt_` queries (`!dup_basic_limit`, `!mow_filter_limit`, etc.), but the accompanying `.groovy` test only uses `assertEquals` on `count()` results and never issues the corresponding `qt_` actions, so none of these expected results are ever checked.

Either remove this file, or rewrite the test to use `qt_` queries that consume it.
```text
-- !dup_basic_limit --
1 99
10 90
2 98
3 97
4 96
5 95
6 94
7 93
8 92
9 91

-- !dup_filter_limit --
11 89
12 88
13 87
14 86
15 85
16 84
17 83
18 82
19 81
20 80
21 79
22 78
23 77
24 76
25 75

-- !dup_filter_over_limit --
46 54
47 53
48 52
49 51
50 50

-- !dup_complex_filter_limit --
16 84
17 83
18 82
19 81
20 80
21 79
22 78
23 77

-- !mow_basic_limit --
1 99
10 90
2 98
3 97
4 96
5 95
6 94
7 93
8 92
9 91

-- !mow_filter_limit --
11 89
12 88
13 87
14 86
15 85
16 84
17 83
18 82
19 81
20 80
21 79
22 78
23 77
24 76
25 75

-- !mow_complex_filter_limit --
16 84
17 83
18 82
19 81
20 80
21 79
22 78
23 77
```
@@ -0,0 +1,179 @@
```groovy
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Test general LIMIT pushdown into the storage layer (VCollectIterator).
// This optimization is active only for DUP_KEYS and UNIQUE_KEYS with MOW
// on the non-merge read path (no ORDER BY on key columns).
suite("test_general_limit_pushdown", "p0") {
```
Contributor

Test issues:
```groovy
    // -------------------------------------------------------------------------
    // Positive: DUP_KEYS single bucket — basic correctness
    // -------------------------------------------------------------------------
    sql "drop table if exists dup_single_bucket"
    sql """
        CREATE TABLE dup_single_bucket (
            id INT,
            val INT
        ) ENGINE=OLAP
        DUPLICATE KEY(id)
        DISTRIBUTED BY HASH(id) BUCKETS 1
        PROPERTIES("replication_num" = "1")
    """
    sql """
        INSERT INTO dup_single_bucket
        SELECT number, number * 10
        FROM numbers("number" = "100")
    """

    // LIMIT 5 must return exactly 5 rows
    def res1 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 5) t"
    assertEquals(5, res1[0][0])

    // LIMIT 0 must return 0 rows
    def res2 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 0) t"
    assertEquals(0, res2[0][0])

    // LIMIT with OFFSET: LIMIT 5 OFFSET 10 must return exactly 5 rows
    // (assuming table has >= 15 rows, which it does: 100 rows)
    def res3 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 5 OFFSET 10) t"
    assertEquals(5, res3[0][0])

    // LIMIT larger than table size: should return all rows (100)
    def res4 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 200) t"
    assertEquals(100, res4[0][0])

    // -------------------------------------------------------------------------
    // Positive: DUP_KEYS multi-bucket — correctness with multiple scanners
    // -------------------------------------------------------------------------
    sql "drop table if exists dup_multi_bucket"
    sql """
        CREATE TABLE dup_multi_bucket (
            id INT,
            val INT
        ) ENGINE=OLAP
        DUPLICATE KEY(id)
        DISTRIBUTED BY HASH(id) BUCKETS 4
        PROPERTIES("replication_num" = "1")
    """
    sql """
        INSERT INTO dup_multi_bucket
        SELECT number, number * 10
        FROM numbers("number" = "1000")
    """

    // With 4 buckets each scanner enforces the limit independently; the outer
    // pipeline limit must truncate to the global LIMIT value.
    def res5 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 5) t"
    assertEquals(5, res5[0][0])

    def res6 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 0) t"
    assertEquals(0, res6[0][0])

    def res7 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 10 OFFSET 20) t"
    assertEquals(10, res7[0][0])

    def res8 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 2000) t"
    assertEquals(1000, res8[0][0])

    // -------------------------------------------------------------------------
    // Positive: UNIQUE_KEYS with MOW — limit pushdown should activate
    // -------------------------------------------------------------------------
    sql "drop table if exists unique_mow"
    sql """
        CREATE TABLE unique_mow (
            id INT,
            val INT
        ) ENGINE=OLAP
        UNIQUE KEY(id)
        DISTRIBUTED BY HASH(id) BUCKETS 4
        PROPERTIES(
            "replication_num" = "1",
            "enable_unique_key_merge_on_write" = "true"
        )
    """
    sql """
        INSERT INTO unique_mow
        SELECT number, number * 10
        FROM numbers("number" = "500")
    """

    def res9 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 5) t"
    assertEquals(5, res9[0][0])

    def res10 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 0) t"
    assertEquals(0, res10[0][0])

    def res11 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 5 OFFSET 10) t"
    assertEquals(5, res11[0][0])

    // -------------------------------------------------------------------------
    // Negative: AGG_KEYS — limit pushdown must NOT short-circuit results
    // (AGG_KEYS requires merge, which disables _general_read_limit)
    // Correctness test: the query must still return the right count.
    // -------------------------------------------------------------------------
    sql "drop table if exists agg_keys_tbl"
    sql """
        CREATE TABLE agg_keys_tbl (
            id INT,
            val BIGINT SUM
        ) ENGINE=OLAP
        AGGREGATE KEY(id)
        DISTRIBUTED BY HASH(id) BUCKETS 4
        PROPERTIES("replication_num" = "1")
    """
    sql """
        INSERT INTO agg_keys_tbl
        SELECT number, number
        FROM numbers("number" = "200")
    """

    // Limit pushdown is not active here; results must still be correct.
    def res12 = sql "SELECT count() FROM (SELECT id FROM agg_keys_tbl LIMIT 5) t"
    assertEquals(5, res12[0][0])

    def res13 = sql "SELECT count() FROM (SELECT id FROM agg_keys_tbl LIMIT 0) t"
    assertEquals(0, res13[0][0])

    // -------------------------------------------------------------------------
    // Negative: UNIQUE_KEYS MOR (without MOW) — limit pushdown must NOT activate
    // Correctness test: query must still return the right count.
    // -------------------------------------------------------------------------
    sql "drop table if exists unique_mor"
    sql """
        CREATE TABLE unique_mor (
            id INT,
            val INT
        ) ENGINE=OLAP
        UNIQUE KEY(id)
        DISTRIBUTED BY HASH(id) BUCKETS 4
        PROPERTIES(
            "replication_num" = "1",
            "enable_unique_key_merge_on_write" = "false"
        )
    """
    sql """
        INSERT INTO unique_mor
        SELECT number, number * 10
        FROM numbers("number" = "300")
    """

    def res14 = sql "SELECT count() FROM (SELECT id FROM unique_mor LIMIT 5) t"
    assertEquals(5, res14[0][0])

    def res15 = sql "SELECT count() FROM (SELECT id FROM unique_mor LIMIT 0) t"
    assertEquals(0, res15[0][0])
}
```
[Note - Correctness analysis]

`_limit` here is `tnode.limit` (the full query LIMIT), not a per-scanner fraction. This is correct: each scanner independently limits its storage reads to N rows, and the outer `ScanOperatorX::get_block()` -> `reached_limit()` enforces the global N across all scanners. This matches the existing topn pattern.

However, note that for queries like `SELECT * FROM t WHERE rare_condition LIMIT 10` with many tablets, every scanner reads up to 10 post-filter rows even though only 10 total are needed. The `adaptive_pipeline_task_serial_read_on_limit` optimization mitigates this for small limits by forcing serial scan, but for larger limits the amplification factor is O(num_scanners). This is a known trade-off, not a bug.
SELECT * FROM t WHERE rare_condition LIMIT 10with many tablets, every scanner reads up to 10 post-filter rows even though only 10 total are needed. Theadaptive_pipeline_task_serial_read_on_limitoptimization mitigates this for small limits by forcing serial scan, but for larger limits the amplification factor is O(num_scanners). This is a known trade-off, not a bug.