diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp
index b1f6f8531b9769..8180311f49ebac 100644
--- a/be/src/exec/scan/olap_scanner.cpp
+++ b/be/src/exec/scan/olap_scanner.cpp
@@ -502,6 +502,14 @@ Status OlapScanner::_init_tablet_reader_params(
             _tablet_reader_params.filter_block_conjuncts = _conjuncts;
             _conjuncts.clear();
         }
+    } else if (_limit > 0 && olap_scan_local_state->_storage_no_merge()) {
+        // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW (non-merge path).
+        // Only when topn optimization is NOT active (topn handles its own limit).
+        // Move _conjuncts into storage layer so that general_read_limit counts
+        // post-filter rows, same as the topn path above.
+        _tablet_reader_params.general_read_limit = _limit;
+        _tablet_reader_params.filter_block_conjuncts = _conjuncts;
+        _conjuncts.clear();
     }
 
     // set push down topn filter
diff --git a/be/src/storage/iterator/vcollect_iterator.cpp b/be/src/storage/iterator/vcollect_iterator.cpp
index 4065efefb7e752..9b3b7705384de6 100644
--- a/be/src/storage/iterator/vcollect_iterator.cpp
+++ b/be/src/storage/iterator/vcollect_iterator.cpp
@@ -94,6 +94,16 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo
         _topn_limit = 0;
         DCHECK_EQ(_reader->_reader_context.filter_block_conjuncts.size(), 0);
     }
+
+    // General limit pushdown: only for READER_QUERY on non-merge path
+    // (DUP_KEYS or UNIQUE_KEYS with MOW)
+    if (!_merge && _reader->_reader_type == ReaderType::READER_QUERY &&
+        _reader->_reader_context.general_read_limit > 0 &&
+        (_reader->_tablet->keys_type() == KeysType::DUP_KEYS ||
+         (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+          _reader->_tablet->enable_unique_key_merge_on_write()))) {
+        _general_read_limit = _reader->_reader_context.general_read_limit;
+    }
 }
 
 Status VCollectIterator::add_child(const RowSetSplits& rs_splits) {
@@ -249,6 +259,27 @@ Status VCollectIterator::next(Block* block) {
     }
 
     if (LIKELY(_inner_iter)) {
+        if (_general_read_limit > 0) {
+            // Fast path: if general limit already reached, return EOF immediately
+            if (_general_rows_returned >= _general_read_limit) {
+                return Status::Error<ErrorCode::END_OF_FILE>("");
+            }
+
+            RETURN_IF_ERROR(_inner_iter->next(block));
+
+            // Enforce general read limit: truncate block if needed
+            _general_rows_returned += block->rows();
+            if (_general_rows_returned > _general_read_limit) {
+                // Truncate block to return exactly the remaining rows needed
+                int64_t excess = _general_rows_returned - _general_read_limit;
+                int64_t keep = block->rows() - excess;
+                DCHECK_GT(keep, 0);
+                block->set_num_rows(keep);
+                _general_rows_returned = _general_read_limit;
+            }
+
+            return Status::OK();
+        }
         return _inner_iter->next(block);
     } else {
         return Status::Error<ErrorCode::END_OF_FILE>("");
diff --git a/be/src/storage/iterator/vcollect_iterator.h b/be/src/storage/iterator/vcollect_iterator.h
index 4201546c04882b..2b1731a3503aae 100644
--- a/be/src/storage/iterator/vcollect_iterator.h
+++ b/be/src/storage/iterator/vcollect_iterator.h
@@ -350,6 +350,12 @@ class VCollectIterator {
     bool _topn_eof = false;
     std::vector<RowSetSplits> _rs_splits;
 
+    // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW (non-merge path).
+    // When > 0, VCollectIterator will stop reading after returning this many rows.
+    int64_t _general_read_limit = -1;
+    // Number of rows already returned to the caller.
+    int64_t _general_rows_returned = 0;
+
     // Hold reader point to access read params, such as fetch conditions.
     TabletReader* _reader = nullptr;
diff --git a/be/src/storage/rowset/rowset_reader_context.h b/be/src/storage/rowset/rowset_reader_context.h
index e44733367c8441..2dde24e0e197a3 100644
--- a/be/src/storage/rowset/rowset_reader_context.h
+++ b/be/src/storage/rowset/rowset_reader_context.h
@@ -103,6 +103,10 @@ struct RowsetReaderContext {
     // When true, push down value predicates for MOR tables
     bool enable_mor_value_predicate_pushdown = false;
+
+    // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW.
+    // Propagated from ReaderParams.general_read_limit.
+    int64_t general_read_limit = -1;
 };
 
 } // namespace doris
diff --git a/be/src/storage/tablet/tablet_reader.cpp b/be/src/storage/tablet/tablet_reader.cpp
index b241d621d05858..9f751142322014 100644
--- a/be/src/storage/tablet/tablet_reader.cpp
+++ b/be/src/storage/tablet/tablet_reader.cpp
@@ -216,6 +216,8 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
     _reader_context.all_access_paths = read_params.all_access_paths;
     _reader_context.predicate_access_paths = read_params.predicate_access_paths;
+    // Propagate general read limit for DUP_KEYS and UNIQUE_KEYS with MOW
+    _reader_context.general_read_limit = read_params.general_read_limit;
 
     return Status::OK();
 }
diff --git a/be/src/storage/tablet/tablet_reader.h b/be/src/storage/tablet/tablet_reader.h
index e4842086cc86bf..2fb6d18c1224c2 100644
--- a/be/src/storage/tablet/tablet_reader.h
+++ b/be/src/storage/tablet/tablet_reader.h
@@ -207,6 +207,11 @@ class TabletReader {
         std::shared_ptr<AnnTopNRuntime> ann_topn_runtime;
         uint64_t condition_cache_digest = 0;
+
+        // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW.
+        // When > 0, the storage layer (VCollectIterator) will stop reading
+        // after returning this many rows. -1 means no limit.
+        int64_t general_read_limit = -1;
     };
 
     TabletReader() = default;
diff --git a/regression-test/data/query_p0/limit/test_general_limit_pushdown.out b/regression-test/data/query_p0/limit/test_general_limit_pushdown.out
new file mode 100644
index 00000000000000..7ef54df8aaf6b4
--- /dev/null
+++ b/regression-test/data/query_p0/limit/test_general_limit_pushdown.out
@@ -0,0 +1,86 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !dup_basic_limit --
+1	99
+10	90
+2	98
+3	97
+4	96
+5	95
+6	94
+7	93
+8	92
+9	91
+
+-- !dup_filter_limit --
+11	89
+12	88
+13	87
+14	86
+15	85
+16	84
+17	83
+18	82
+19	81
+20	80
+21	79
+22	78
+23	77
+24	76
+25	75
+
+-- !dup_filter_over_limit --
+46	54
+47	53
+48	52
+49	51
+50	50
+
+-- !dup_complex_filter_limit --
+16	84
+17	83
+18	82
+19	81
+20	80
+21	79
+22	78
+23	77
+
+-- !mow_basic_limit --
+1	99
+10	90
+2	98
+3	97
+4	96
+5	95
+6	94
+7	93
+8	92
+9	91
+
+-- !mow_filter_limit --
+11	89
+12	88
+13	87
+14	86
+15	85
+16	84
+17	83
+18	82
+19	81
+20	80
+21	79
+22	78
+23	77
+24	76
+25	75
+
+-- !mow_complex_filter_limit --
+16	84
+17	83
+18	82
+19	81
+20	80
+21	79
+22	78
+23	77
+
diff --git a/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy b/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy
new file mode 100644
index 00000000000000..e21e1ecfba1b3d
--- /dev/null
+++ b/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test general LIMIT pushdown into the storage layer (VCollectIterator).
+// This optimization is active only for DUP_KEYS and UNIQUE_KEYS with MOW
+// on the non-merge read path (no ORDER BY on key columns).
+suite("test_general_limit_pushdown", "p0") {
+
+    // -------------------------------------------------------------------------
+    // Positive: DUP_KEYS single bucket — basic correctness
+    // -------------------------------------------------------------------------
+    sql "drop table if exists dup_single_bucket"
+    sql """
+        CREATE TABLE dup_single_bucket (
+            id INT,
+            val INT
+        ) ENGINE=OLAP
+        DUPLICATE KEY(id)
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """
+        INSERT INTO dup_single_bucket
+        SELECT number, number * 10
+        FROM numbers("number" = "100")
+    """
+
+    // LIMIT 5 must return exactly 5 rows
+    def res1 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 5) t"
+    assertEquals(5, res1[0][0])
+
+    // LIMIT 0 must return 0 rows
+    def res2 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 0) t"
+    assertEquals(0, res2[0][0])
+
+    // LIMIT with OFFSET: LIMIT 5 OFFSET 10 must return exactly 5 rows
+    // (assuming table has >= 15 rows, which it does: 100 rows)
+    def res3 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 5 OFFSET 10) t"
+    assertEquals(5, res3[0][0])
+
+    // LIMIT larger than table size: should return all rows (100)
+    def res4 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 200) t"
+    assertEquals(100, res4[0][0])
+
+    // -------------------------------------------------------------------------
+    // Positive: DUP_KEYS multi-bucket — correctness with multiple scanners
+    // -------------------------------------------------------------------------
+    sql "drop table if exists dup_multi_bucket"
+    sql """
+        CREATE TABLE dup_multi_bucket (
+            id INT,
+            val INT
+        ) ENGINE=OLAP
+        DUPLICATE KEY(id)
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """
+        INSERT INTO dup_multi_bucket
+        SELECT number, number * 10
+        FROM numbers("number" = "1000")
+    """
+
+    // With 4 buckets each scanner enforces the limit independently; the outer
+    // pipeline limit must truncate to the global LIMIT value.
+    def res5 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 5) t"
+    assertEquals(5, res5[0][0])
+
+    def res6 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 0) t"
+    assertEquals(0, res6[0][0])
+
+    def res7 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 10 OFFSET 20) t"
+    assertEquals(10, res7[0][0])
+
+    def res8 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 2000) t"
+    assertEquals(1000, res8[0][0])
+
+    // -------------------------------------------------------------------------
+    // Positive: UNIQUE_KEYS with MOW — limit pushdown should activate
+    // -------------------------------------------------------------------------
+    sql "drop table if exists unique_mow"
+    sql """
+        CREATE TABLE unique_mow (
+            id INT,
+            val INT
+        ) ENGINE=OLAP
+        UNIQUE KEY(id)
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES(
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+    sql """
+        INSERT INTO unique_mow
+        SELECT number, number * 10
+        FROM numbers("number" = "500")
+    """
+
+    def res9 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 5) t"
+    assertEquals(5, res9[0][0])
+
+    def res10 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 0) t"
+    assertEquals(0, res10[0][0])
+
+    def res11 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 5 OFFSET 10) t"
+    assertEquals(5, res11[0][0])
+
+    // -------------------------------------------------------------------------
+    // Negative: AGG_KEYS — limit pushdown must NOT short-circuit results
+    // (AGG_KEYS requires merge, which disables _general_read_limit)
+    // Correctness test: the query must still return the right count.
+    // -------------------------------------------------------------------------
+    sql "drop table if exists agg_keys_tbl"
+    sql """
+        CREATE TABLE agg_keys_tbl (
+            id INT,
+            val BIGINT SUM
+        ) ENGINE=OLAP
+        AGGREGATE KEY(id)
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """
+        INSERT INTO agg_keys_tbl
+        SELECT number, number
+        FROM numbers("number" = "200")
+    """
+
+    // Limit pushdown is not active here; results must still be correct.
+    def res12 = sql "SELECT count() FROM (SELECT id FROM agg_keys_tbl LIMIT 5) t"
+    assertEquals(5, res12[0][0])
+
+    def res13 = sql "SELECT count() FROM (SELECT id FROM agg_keys_tbl LIMIT 0) t"
+    assertEquals(0, res13[0][0])
+
+    // -------------------------------------------------------------------------
+    // Negative: UNIQUE_KEYS MOR (without MOW) — limit pushdown must NOT activate
+    // Correctness test: query must still return the right count.
+    // -------------------------------------------------------------------------
+    sql "drop table if exists unique_mor"
+    sql """
+        CREATE TABLE unique_mor (
+            id INT,
+            val INT
+        ) ENGINE=OLAP
+        UNIQUE KEY(id)
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES(
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "false"
+        )
+    """
+    sql """
+        INSERT INTO unique_mor
+        SELECT number, number * 10
+        FROM numbers("number" = "300")
+    """
+
+    def res14 = sql "SELECT count() FROM (SELECT id FROM unique_mor LIMIT 5) t"
+    assertEquals(5, res14[0][0])
+
+    def res15 = sql "SELECT count() FROM (SELECT id FROM unique_mor LIMIT 0) t"
+    assertEquals(0, res15[0][0])
+}