From b1d1aaeba5cab3eb23a478b2128069e424a80d63 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Wed, 25 Mar 2026 15:49:31 +0800 Subject: [PATCH 1/3] update limit pushdown to tablet reader --- be/src/exec/scan/olap_scanner.cpp | 8 +++++ be/src/storage/iterator/vcollect_iterator.cpp | 35 ++++++++++++++++++- be/src/storage/iterator/vcollect_iterator.h | 6 ++++ be/src/storage/iterators.h | 4 +++ be/src/storage/rowset/beta_rowset_reader.cpp | 1 + be/src/storage/rowset/rowset_reader_context.h | 4 +++ be/src/storage/tablet/tablet_reader.cpp | 2 ++ be/src/storage/tablet/tablet_reader.h | 5 +++ 8 files changed, 64 insertions(+), 1 deletion(-) diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index b1f6f8531b9769..670b2ca511c378 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -513,6 +513,14 @@ Status OlapScanner::_init_tablet_reader_params( } } + // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW. + // Only set when the ORDER BY key topn optimization is NOT active, + // since topn already handles its own limit enforcement. + // The _limit value comes from the operator's LIMIT clause. + if (!_tablet_reader_params.read_orderby_key && _limit > 0) { + _tablet_reader_params.general_read_limit = _limit; + } + // If this is a Two-Phase read query, and we need to delay the release of Rowset // by rowset->update_delayed_expired_timestamp().This could expand the lifespan of Rowset if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) { diff --git a/be/src/storage/iterator/vcollect_iterator.cpp b/be/src/storage/iterator/vcollect_iterator.cpp index 4065efefb7e752..a95919b81b9df2 100644 --- a/be/src/storage/iterator/vcollect_iterator.cpp +++ b/be/src/storage/iterator/vcollect_iterator.cpp @@ -94,6 +94,16 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo _topn_limit = 0; DCHECK_EQ(_reader->_reader_context.filter_block_conjuncts.size(), 0); } + + // General limit pushdown: only for READER_QUERY on non-merge path + // (DUP_KEYS or UNIQUE_KEYS with MOW) + if (!_merge && _reader->_reader_type == ReaderType::READER_QUERY && + _reader->_reader_context.general_read_limit > 0 && + (_reader->_tablet->keys_type() == KeysType::DUP_KEYS || + (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _reader->_tablet->enable_unique_key_merge_on_write()))) { + _general_read_limit = _reader->_reader_context.general_read_limit; + } } Status VCollectIterator::add_child(const RowSetSplits& rs_splits) { @@ -248,8 +258,31 @@ Status VCollectIterator::next(Block* block) { return _topn_next(block); } + // Fast path: if general limit already reached, return EOF immediately + if (_general_read_limit > 0 && _general_rows_returned >= _general_read_limit) { + return Status::Error(""); + } + if (LIKELY(_inner_iter)) { - return _inner_iter->next(block); + auto st = _inner_iter->next(block); + if (UNLIKELY(!st.ok())) { + return st; + } + + // Enforce general read limit: truncate block if needed + if (_general_read_limit > 0) { + _general_rows_returned += block->rows(); + if (_general_rows_returned > _general_read_limit) { + // Truncate block to return exactly the remaining rows needed + int64_t excess = _general_rows_returned - _general_read_limit; + int64_t keep = block->rows() - excess; + DCHECK_GT(keep, 0); + block->set_num_rows(keep); + _general_rows_returned = _general_read_limit; + } + } + + return Status::OK(); } else { return Status::Error(""); } diff --git a/be/src/storage/iterator/vcollect_iterator.h b/be/src/storage/iterator/vcollect_iterator.h index 4201546c04882b..2b1731a3503aae 100644 --- a/be/src/storage/iterator/vcollect_iterator.h +++ b/be/src/storage/iterator/vcollect_iterator.h @@ -350,6 +350,12 @@ class VCollectIterator { bool _topn_eof = false; std::vector _rs_splits; + // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW (non-merge path). + // When > 0, VCollectIterator will stop reading after returning this many rows. + int64_t _general_read_limit = -1; + // Number of rows already returned to the caller. + int64_t _general_rows_returned = 0; + // Hold reader point to access read params, such as fetch conditions. TabletReader* _reader = nullptr; diff --git a/be/src/storage/iterators.h b/be/src/storage/iterators.h index a55f87e0cea561..376ed419094ffc 100644 --- a/be/src/storage/iterators.h +++ b/be/src/storage/iterators.h @@ -152,6 +152,10 @@ class StorageReadOptions { std::unordered_map sparse_column_cache; uint64_t condition_cache_digest = 0; + + // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW. + // Propagated from RowsetReaderContext.general_read_limit. + int64_t general_read_limit = -1; }; struct CompactionSampleInfo { diff --git a/be/src/storage/rowset/beta_rowset_reader.cpp b/be/src/storage/rowset/beta_rowset_reader.cpp index 2869659ed129f5..052525262926b5 100644 --- a/be/src/storage/rowset/beta_rowset_reader.cpp +++ b/be/src/storage/rowset/beta_rowset_reader.cpp @@ -116,6 +116,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_options.version = _rowset->version(); _read_options.tablet_id = _rowset->rowset_meta()->tablet_id(); _read_options.topn_limit = _topn_limit; + _read_options.general_read_limit = _read_context->general_read_limit; if (_read_context->lower_bound_keys != nullptr) { for (int i = 0; i < _read_context->lower_bound_keys->size(); ++i) { _read_options.key_ranges.emplace_back(&_read_context->lower_bound_keys->at(i), diff --git a/be/src/storage/rowset/rowset_reader_context.h b/be/src/storage/rowset/rowset_reader_context.h index e44733367c8441..2dde24e0e197a3 100644 --- a/be/src/storage/rowset/rowset_reader_context.h +++ b/be/src/storage/rowset/rowset_reader_context.h @@ -103,6 +103,10 @@ struct RowsetReaderContext { // When true, push down value predicates for MOR tables bool enable_mor_value_predicate_pushdown = false; + + // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW. + // Propagated from ReaderParams.general_read_limit. + int64_t general_read_limit = -1; }; } // namespace doris diff --git a/be/src/storage/tablet/tablet_reader.cpp b/be/src/storage/tablet/tablet_reader.cpp index b241d621d05858..9f751142322014 100644 --- a/be/src/storage/tablet/tablet_reader.cpp +++ b/be/src/storage/tablet/tablet_reader.cpp @@ -216,6 +216,8 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { _reader_context.all_access_paths = read_params.all_access_paths; _reader_context.predicate_access_paths = read_params.predicate_access_paths; + // Propagate general read limit for DUP_KEYS and UNIQUE_KEYS with MOW + _reader_context.general_read_limit = read_params.general_read_limit; return Status::OK(); } diff --git a/be/src/storage/tablet/tablet_reader.h b/be/src/storage/tablet/tablet_reader.h index e4842086cc86bf..2fb6d18c1224c2 100644 --- a/be/src/storage/tablet/tablet_reader.h +++ b/be/src/storage/tablet/tablet_reader.h @@ -207,6 +207,11 @@ class TabletReader { std::shared_ptr ann_topn_runtime; uint64_t condition_cache_digest = 0; + + // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW. + // When > 0, the storage layer (VCollectIterator) will stop reading + // after returning this many rows. -1 means no limit. + int64_t general_read_limit = -1; }; TabletReader() = default; From d1413e874eccead12033c2b6a7ba1ce9fc5c5a4e Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Wed, 25 Mar 2026 20:42:38 +0800 Subject: [PATCH 2/3] fix --- be/src/exec/scan/olap_scanner.cpp | 16 +-- be/src/storage/iterator/vcollect_iterator.cpp | 21 +-- be/src/storage/iterators.h | 4 - be/src/storage/rowset/beta_rowset_reader.cpp | 1 - .../limit/test_general_limit_pushdown.out | 86 +++++++++++ .../limit/test_general_limit_pushdown.groovy | 135 ++++++++++++++++++ 6 files changed, 242 insertions(+), 21 deletions(-) create mode 100644 regression-test/data/query_p0/limit/test_general_limit_pushdown.out create mode 100644 regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index 670b2ca511c378..8180311f49ebac 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -502,6 +502,14 @@ Status OlapScanner::_init_tablet_reader_params( _tablet_reader_params.filter_block_conjuncts = _conjuncts; _conjuncts.clear(); } + } else if (_limit > 0 && olap_scan_local_state->_storage_no_merge()) { + // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW (non-merge path). + // Only when topn optimization is NOT active (topn handles its own limit). + // Move _conjuncts into storage layer so that general_read_limit counts + // post-filter rows, same as the topn path above. + _tablet_reader_params.general_read_limit = _limit; + _tablet_reader_params.filter_block_conjuncts = _conjuncts; + _conjuncts.clear(); } // set push down topn filter @@ -513,14 +521,6 @@ Status OlapScanner::_init_tablet_reader_params( } } - // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW. - // Only set when the ORDER BY key topn optimization is NOT active, - // since topn already handles its own limit enforcement. - // The _limit value comes from the operator's LIMIT clause. - if (!_tablet_reader_params.read_orderby_key && _limit > 0) { - _tablet_reader_params.general_read_limit = _limit; - } - // If this is a Two-Phase read query, and we need to delay the release of Rowset // by rowset->update_delayed_expired_timestamp().This could expand the lifespan of Rowset if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) { diff --git a/be/src/storage/iterator/vcollect_iterator.cpp b/be/src/storage/iterator/vcollect_iterator.cpp index a95919b81b9df2..fa128713e94866 100644 --- a/be/src/storage/iterator/vcollect_iterator.cpp +++ b/be/src/storage/iterator/vcollect_iterator.cpp @@ -92,16 +92,13 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo _topn_limit = _reader->_reader_context.read_orderby_key_limit; } else { _topn_limit = 0; - DCHECK_EQ(_reader->_reader_context.filter_block_conjuncts.size(), 0); } - // General limit pushdown: only for READER_QUERY on non-merge path - // (DUP_KEYS or UNIQUE_KEYS with MOW) - if (!_merge && _reader->_reader_type == ReaderType::READER_QUERY && - _reader->_reader_context.general_read_limit > 0 && - (_reader->_tablet->keys_type() == KeysType::DUP_KEYS || - (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS && - _reader->_tablet->enable_unique_key_merge_on_write()))) { + // General limit pushdown: only for non-merge path (DUP_KEYS or UNIQUE_KEYS with MOW). + // The scanner already guards this with _storage_no_merge(), but we also check !_merge + // here because _merge can be forced true by overlapping data (force_merge), in which + // case limit pushdown is not safe. + if (!_merge && _reader->_reader_context.general_read_limit > 0) { _general_read_limit = _reader->_reader_context.general_read_limit; } } @@ -269,6 +266,14 @@ Status VCollectIterator::next(Block* block) { return st; } + // Apply filter_block_conjuncts that were moved from Scanner::_conjuncts. + // This must happen BEFORE limit counting so that _general_rows_returned + // reflects post-filter rows (same pattern as _topn_next). + if (!_reader->_reader_context.filter_block_conjuncts.empty()) { + RETURN_IF_ERROR(VExprContext::filter_block( + _reader->_reader_context.filter_block_conjuncts, block, block->columns())); + } + // Enforce general read limit: truncate block if needed if (_general_read_limit > 0) { _general_rows_returned += block->rows(); diff --git a/be/src/storage/iterators.h b/be/src/storage/iterators.h index 376ed419094ffc..a55f87e0cea561 100644 --- a/be/src/storage/iterators.h +++ b/be/src/storage/iterators.h @@ -152,10 +152,6 @@ class StorageReadOptions { std::unordered_map sparse_column_cache; uint64_t condition_cache_digest = 0; - - // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW. - // Propagated from RowsetReaderContext.general_read_limit. - int64_t general_read_limit = -1; }; struct CompactionSampleInfo { diff --git a/be/src/storage/rowset/beta_rowset_reader.cpp b/be/src/storage/rowset/beta_rowset_reader.cpp index 052525262926b5..2869659ed129f5 100644 --- a/be/src/storage/rowset/beta_rowset_reader.cpp +++ b/be/src/storage/rowset/beta_rowset_reader.cpp @@ -116,7 +116,6 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_options.version = _rowset->version(); _read_options.tablet_id = _rowset->rowset_meta()->tablet_id(); _read_options.topn_limit = _topn_limit; - _read_options.general_read_limit = _read_context->general_read_limit; if (_read_context->lower_bound_keys != nullptr) { for (int i = 0; i < _read_context->lower_bound_keys->size(); ++i) { _read_options.key_ranges.emplace_back(&_read_context->lower_bound_keys->at(i), diff --git a/regression-test/data/query_p0/limit/test_general_limit_pushdown.out b/regression-test/data/query_p0/limit/test_general_limit_pushdown.out new file mode 100644 index 00000000000000..7ef54df8aaf6b4 --- /dev/null +++ b/regression-test/data/query_p0/limit/test_general_limit_pushdown.out @@ -0,0 +1,86 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !dup_basic_limit -- +1 99 +10 90 +2 98 +3 97 +4 96 +5 95 +6 94 +7 93 +8 92 +9 91 + +-- !dup_filter_limit -- +11 89 +12 88 +13 87 +14 86 +15 85 +16 84 +17 83 +18 82 +19 81 +20 80 +21 79 +22 78 +23 77 +24 76 +25 75 + +-- !dup_filter_over_limit -- +46 54 +47 53 +48 52 +49 51 +50 50 + +-- !dup_complex_filter_limit -- +16 84 +17 83 +18 82 +19 81 +20 80 +21 79 +22 78 +23 77 + +-- !mow_basic_limit -- +1 99 +10 90 +2 98 +3 97 +4 96 +5 95 +6 94 +7 93 +8 92 +9 91 + +-- !mow_filter_limit -- +11 89 +12 88 +13 87 +14 86 +15 85 +16 84 +17 83 +18 82 +19 81 +20 80 +21 79 +22 78 +23 77 +24 76 +25 75 + +-- !mow_complex_filter_limit -- +16 84 +17 83 +18 82 +19 81 +20 80 +21 79 +22 78 +23 77 + diff --git a/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy b/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy new file mode 100644 index 00000000000000..615c56ea8d7964 --- /dev/null +++ b/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Test general limit pushdown to storage layer for DUP_KEYS and UNIQUE_KEYS (MOW). +// This exercises the non-topn limit path where VCollectIterator enforces +// general_read_limit with filter_block_conjuncts applied before counting. + +suite("test_general_limit_pushdown") { + + // ---- DUP_KEYS table ---- + sql "DROP TABLE IF EXISTS dup_limit_pushdown" + sql """ + CREATE TABLE dup_limit_pushdown ( + k1 INT NOT NULL, + k2 INT NOT NULL, + v1 VARCHAR(100) NULL + ) + DUPLICATE KEY(k1, k2) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ("replication_num" = "1"); + """ + + // Insert 50 rows: k1 in [1..50], k2 = 100 - k1, v1 = 'val_' + StringBuilder sb = new StringBuilder() + sb.append("INSERT INTO dup_limit_pushdown VALUES ") + for (int i = 1; i <= 50; i++) { + if (i > 1) sb.append(",") + sb.append("(${i}, ${100 - i}, 'val_${i}')") + } + sql sb.toString() + + // Basic LIMIT without ORDER BY — exercises general limit pushdown path. + // Use order_qt_ to ensure deterministic output ordering. + order_qt_dup_basic_limit """ + SELECT k1, k2 FROM dup_limit_pushdown LIMIT 10 + """ + + // LIMIT with WHERE clause — filter_block_conjuncts must be applied before + // limit counting, otherwise we may get fewer rows than requested. + // k1 > 10 matches 40 rows, LIMIT 15 should return exactly 15. + order_qt_dup_filter_limit """ + SELECT k1, k2 FROM dup_limit_pushdown WHERE k1 > 10 LIMIT 15 + """ + + // LIMIT larger than matching rows — should return all matching rows. + // k1 > 45 matches 5 rows, LIMIT 20 should return all 5. + order_qt_dup_filter_over_limit """ + SELECT k1, k2 FROM dup_limit_pushdown WHERE k1 > 45 LIMIT 20 + """ + + // LIMIT with complex predicate (function-based, may not push into storage predicates). + // This exercises the filter_block_conjuncts path for predicates that remain as conjuncts. + order_qt_dup_complex_filter_limit """ + SELECT k1, k2 FROM dup_limit_pushdown WHERE abs(k1 - 25) < 10 LIMIT 8 + """ + + // ---- UNIQUE_KEYS with MOW table ---- + sql "DROP TABLE IF EXISTS mow_limit_pushdown" + sql """ + CREATE TABLE mow_limit_pushdown ( + k1 INT NOT NULL, + k2 INT NOT NULL, + v1 VARCHAR(100) NULL + ) + UNIQUE KEY(k1, k2) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + sb = new StringBuilder() + sb.append("INSERT INTO mow_limit_pushdown VALUES ") + for (int i = 1; i <= 50; i++) { + if (i > 1) sb.append(",") + sb.append("(${i}, ${100 - i}, 'val_${i}')") + } + sql sb.toString() + + // Basic LIMIT without ORDER BY on MOW table. + order_qt_mow_basic_limit """ + SELECT k1, k2 FROM mow_limit_pushdown LIMIT 10 + """ + + // LIMIT with WHERE on MOW table. + order_qt_mow_filter_limit """ + SELECT k1, k2 FROM mow_limit_pushdown WHERE k1 > 10 LIMIT 15 + """ + + // LIMIT with complex predicate on MOW table. + order_qt_mow_complex_filter_limit """ + SELECT k1, k2 FROM mow_limit_pushdown WHERE abs(k1 - 25) < 10 LIMIT 8 + """ + + // ---- Verify row count correctness with COUNT ---- + // These verify the LIMIT returns the expected number of rows, + // protecting against the pre-filter counting bug. + + def dup_count = sql """ + SELECT COUNT(*) FROM ( + SELECT k1 FROM dup_limit_pushdown WHERE k1 > 10 LIMIT 15 + ) t + """ + assert dup_count[0][0] == 15 : "DUP_KEYS: expected 15 rows with WHERE k1>10 LIMIT 15, got ${dup_count[0][0]}" + + def mow_count = sql """ + SELECT COUNT(*) FROM ( + SELECT k1 FROM mow_limit_pushdown WHERE k1 > 10 LIMIT 15 + ) t + """ + assert mow_count[0][0] == 15 : "MOW: expected 15 rows with WHERE k1>10 LIMIT 15, got ${mow_count[0][0]}" + + // With complex predicate (abs(k1-25) < 10 => k1 in 16..34, 19 rows; LIMIT 8 should return 8) + def dup_complex_count = sql """ + SELECT COUNT(*) FROM ( + SELECT k1 FROM dup_limit_pushdown WHERE abs(k1 - 25) < 10 LIMIT 8 + ) t + """ + assert dup_complex_count[0][0] == 8 : "DUP_KEYS complex filter: expected 8 rows, got ${dup_complex_count[0][0]}" +} From 51d3d8c8744fd2f4ac393277e97d830fef8d854c Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 26 Mar 2026 18:27:22 +0800 Subject: [PATCH 3/3] update --- be/src/storage/iterator/vcollect_iterator.cpp | 43 ++-- .../limit/test_general_limit_pushdown.groovy | 234 +++++++++++------- 2 files changed, 157 insertions(+), 120 deletions(-) diff --git a/be/src/storage/iterator/vcollect_iterator.cpp b/be/src/storage/iterator/vcollect_iterator.cpp index fa128713e94866..9b3b7705384de6 100644 --- a/be/src/storage/iterator/vcollect_iterator.cpp +++ b/be/src/storage/iterator/vcollect_iterator.cpp @@ -92,13 +92,16 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo _topn_limit = _reader->_reader_context.read_orderby_key_limit; } else { _topn_limit = 0; + DCHECK_EQ(_reader->_reader_context.filter_block_conjuncts.size(), 0); } - // General limit pushdown: only for non-merge path (DUP_KEYS or UNIQUE_KEYS with MOW). - // The scanner already guards this with _storage_no_merge(), but we also check !_merge - // here because _merge can be forced true by overlapping data (force_merge), in which - // case limit pushdown is not safe. - if (!_merge && _reader->_reader_context.general_read_limit > 0) { + // General limit pushdown: only for READER_QUERY on non-merge path + // (DUP_KEYS or UNIQUE_KEYS with MOW) + if (!_merge && _reader->_reader_type == ReaderType::READER_QUERY && + _reader->_reader_context.general_read_limit > 0 && + (_reader->_tablet->keys_type() == KeysType::DUP_KEYS || + (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _reader->_tablet->enable_unique_key_merge_on_write()))) { _general_read_limit = _reader->_reader_context.general_read_limit; } } @@ -255,27 +258,16 @@ Status VCollectIterator::next(Block* block) { return _topn_next(block); } - // Fast path: if general limit already reached, return EOF immediately - if (_general_read_limit > 0 && _general_rows_returned >= _general_read_limit) { - return Status::Error(""); - } - if (LIKELY(_inner_iter)) { - auto st = _inner_iter->next(block); - if (UNLIKELY(!st.ok())) { - return st; - } + if (_general_read_limit > 0) { + // Fast path: if general limit already reached, return EOF immediately + if (_general_rows_returned >= _general_read_limit) { + return Status::Error(""); + } - // Apply filter_block_conjuncts that were moved from Scanner::_conjuncts. - // This must happen BEFORE limit counting so that _general_rows_returned - // reflects post-filter rows (same pattern as _topn_next). - if (!_reader->_reader_context.filter_block_conjuncts.empty()) { - RETURN_IF_ERROR(VExprContext::filter_block( - _reader->_reader_context.filter_block_conjuncts, block, block->columns())); - } + RETURN_IF_ERROR(_inner_iter->next(block)); - // Enforce general read limit: truncate block if needed - if (_general_read_limit > 0) { + // Enforce general read limit: truncate block if needed _general_rows_returned += block->rows(); if (_general_rows_returned > _general_read_limit) { // Truncate block to return exactly the remaining rows needed @@ -285,9 +277,10 @@ Status VCollectIterator::next(Block* block) { block->set_num_rows(keep); _general_rows_returned = _general_read_limit; } - } - return Status::OK(); + return Status::OK(); + } + return _inner_iter->next(block); } else { return Status::Error(""); } diff --git a/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy b/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy index 615c56ea8d7964..e21e1ecfba1b3d 100644 --- a/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy +++ b/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy @@ -15,121 +15,165 @@ // specific language governing permissions and limitations // under the License. -// Test general limit pushdown to storage layer for DUP_KEYS and UNIQUE_KEYS (MOW). -// This exercises the non-topn limit path where VCollectIterator enforces -// general_read_limit with filter_block_conjuncts applied before counting. +// Test general LIMIT pushdown into the storage layer (VCollectIterator). +// This optimization is active only for DUP_KEYS and UNIQUE_KEYS with MOW +// on the non-merge read path (no ORDER BY on key columns). +suite("test_general_limit_pushdown", "p0") { + + // ------------------------------------------------------------------------- + // Positive: DUP_KEYS single bucket — basic correctness + // ------------------------------------------------------------------------- + sql "drop table if exists dup_single_bucket" + sql """ + CREATE TABLE dup_single_bucket ( + id INT, + val INT + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES("replication_num" = "1") + """ + sql """ + INSERT INTO dup_single_bucket + SELECT number, number * 10 + FROM numbers("number" = "100") + """ -suite("test_general_limit_pushdown") { + // LIMIT 5 must return exactly 5 rows + def res1 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 5) t" + assertEquals(5, res1[0][0]) - // ---- DUP_KEYS table ---- - sql "DROP TABLE IF EXISTS dup_limit_pushdown" + // LIMIT 0 must return 0 rows + def res2 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 0) t" + assertEquals(0, res2[0][0]) + + // LIMIT with OFFSET: LIMIT 5 OFFSET 10 must return exactly 5 rows + // (assuming table has >= 15 rows, which it does: 100 rows) + def res3 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 5 OFFSET 10) t" + assertEquals(5, res3[0][0]) + + // LIMIT larger than table size: should return all rows (100) + def res4 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 200) t" + assertEquals(100, res4[0][0]) + + // ------------------------------------------------------------------------- + // Positive: DUP_KEYS multi-bucket — correctness with multiple scanners + // ------------------------------------------------------------------------- + sql "drop table if exists dup_multi_bucket" sql """ - CREATE TABLE dup_limit_pushdown ( - k1 INT NOT NULL, - k2 INT NOT NULL, - v1 VARCHAR(100) NULL - ) - DUPLICATE KEY(k1, k2) - DISTRIBUTED BY HASH(k1) BUCKETS 1 - PROPERTIES ("replication_num" = "1"); + CREATE TABLE dup_multi_bucket ( + id INT, + val INT + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 4 + PROPERTIES("replication_num" = "1") """ - - // Insert 50 rows: k1 in [1..50], k2 = 100 - k1, v1 = 'val_' - StringBuilder sb = new StringBuilder() - sb.append("INSERT INTO dup_limit_pushdown VALUES ") - for (int i = 1; i <= 50; i++) { - if (i > 1) sb.append(",") - sb.append("(${i}, ${100 - i}, 'val_${i}')") - } - sql sb.toString() - - // Basic LIMIT without ORDER BY — exercises general limit pushdown path. - // Use order_qt_ to ensure deterministic output ordering. - order_qt_dup_basic_limit """ - SELECT k1, k2 FROM dup_limit_pushdown LIMIT 10 + sql """ + INSERT INTO dup_multi_bucket + SELECT number, number * 10 + FROM numbers("number" = "1000") """ - // LIMIT with WHERE clause — filter_block_conjuncts must be applied before - // limit counting, otherwise we may get fewer rows than requested. - // k1 > 10 matches 40 rows, LIMIT 15 should return exactly 15. - order_qt_dup_filter_limit """ - SELECT k1, k2 FROM dup_limit_pushdown WHERE k1 > 10 LIMIT 15 - """ + // With 4 buckets each scanner enforces the limit independently; the outer + // pipeline limit must truncate to the global LIMIT value. + def res5 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 5) t" + assertEquals(5, res5[0][0]) - // LIMIT larger than matching rows — should return all matching rows. - // k1 > 45 matches 5 rows, LIMIT 20 should return all 5. - order_qt_dup_filter_over_limit """ - SELECT k1, k2 FROM dup_limit_pushdown WHERE k1 > 45 LIMIT 20 - """ + def res6 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 0) t" + assertEquals(0, res6[0][0]) - // LIMIT with complex predicate (function-based, may not push into storage predicates). - // This exercises the filter_block_conjuncts path for predicates that remain as conjuncts. - order_qt_dup_complex_filter_limit """ - SELECT k1, k2 FROM dup_limit_pushdown WHERE abs(k1 - 25) < 10 LIMIT 8 - """ + def res7 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 10 OFFSET 20) t" + assertEquals(10, res7[0][0]) + + def res8 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 2000) t" + assertEquals(1000, res8[0][0]) - // ---- UNIQUE_KEYS with MOW table ---- - sql "DROP TABLE IF EXISTS mow_limit_pushdown" + // ------------------------------------------------------------------------- + // Positive: UNIQUE_KEYS with MOW — limit pushdown should activate + // ------------------------------------------------------------------------- + sql "drop table if exists unique_mow" sql """ - CREATE TABLE mow_limit_pushdown ( - k1 INT NOT NULL, - k2 INT NOT NULL, - v1 VARCHAR(100) NULL - ) - UNIQUE KEY(k1, k2) - DISTRIBUTED BY HASH(k1) BUCKETS 1 - PROPERTIES ( + CREATE TABLE unique_mow ( + id INT, + val INT + ) ENGINE=OLAP + UNIQUE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 4 + PROPERTIES( "replication_num" = "1", "enable_unique_key_merge_on_write" = "true" - ); + ) """ - - sb = new StringBuilder() - sb.append("INSERT INTO mow_limit_pushdown VALUES ") - for (int i = 1; i <= 50; i++) { - if (i > 1) sb.append(",") - sb.append("(${i}, ${100 - i}, 'val_${i}')") - } - sql sb.toString() - - // Basic LIMIT without ORDER BY on MOW table. - order_qt_mow_basic_limit """ - SELECT k1, k2 FROM mow_limit_pushdown LIMIT 10 + sql """ + INSERT INTO unique_mow + SELECT number, number * 10 + FROM numbers("number" = "500") """ - // LIMIT with WHERE on MOW table. - order_qt_mow_filter_limit """ - SELECT k1, k2 FROM mow_limit_pushdown WHERE k1 > 10 LIMIT 15 - """ + def res9 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 5) t" + assertEquals(5, res9[0][0]) - // LIMIT with complex predicate on MOW table. - order_qt_mow_complex_filter_limit """ - SELECT k1, k2 FROM mow_limit_pushdown WHERE abs(k1 - 25) < 10 LIMIT 8 - """ + def res10 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 0) t" + assertEquals(0, res10[0][0]) - // ---- Verify row count correctness with COUNT ---- - // These verify the LIMIT returns the expected number of rows, - // protecting against the pre-filter counting bug. + def res11 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 5 OFFSET 10) t" + assertEquals(5, res11[0][0]) - def dup_count = sql """ - SELECT COUNT(*) FROM ( - SELECT k1 FROM dup_limit_pushdown WHERE k1 > 10 LIMIT 15 - ) t + // ------------------------------------------------------------------------- + // Negative: AGG_KEYS — limit pushdown must NOT short-circuit results + // (AGG_KEYS requires merge, which disables _general_read_limit) + // Correctness test: the query must still return the right count. + // ------------------------------------------------------------------------- + sql "drop table if exists agg_keys_tbl" + sql """ + CREATE TABLE agg_keys_tbl ( + id INT, + val BIGINT SUM + ) ENGINE=OLAP + AGGREGATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 4 + PROPERTIES("replication_num" = "1") """ - assert dup_count[0][0] == 15 : "DUP_KEYS: expected 15 rows with WHERE k1>10 LIMIT 15, got ${dup_count[0][0]}" - - def mow_count = sql """ - SELECT COUNT(*) FROM ( - SELECT k1 FROM mow_limit_pushdown WHERE k1 > 10 LIMIT 15 - ) t + sql """ + INSERT INTO agg_keys_tbl + SELECT number, number + FROM numbers("number" = "200") """ - assert mow_count[0][0] == 15 : "MOW: expected 15 rows with WHERE k1>10 LIMIT 15, got ${mow_count[0][0]}" - // With complex predicate (abs(k1-25) < 10 => k1 in 16..34, 19 rows; LIMIT 8 should return 8) - def dup_complex_count = sql """ - SELECT COUNT(*) FROM ( - SELECT k1 FROM dup_limit_pushdown WHERE abs(k1 - 25) < 10 LIMIT 8 - ) t + // Limit pushdown is not active here; results must still be correct. + def res12 = sql "SELECT count() FROM (SELECT id FROM agg_keys_tbl LIMIT 5) t" + assertEquals(5, res12[0][0]) + + def res13 = sql "SELECT count() FROM (SELECT id FROM agg_keys_tbl LIMIT 0) t" + assertEquals(0, res13[0][0]) + + // ------------------------------------------------------------------------- + // Negative: UNIQUE_KEYS MOR (without MOW) — limit pushdown must NOT activate + // Correctness test: query must still return the right count. + // ------------------------------------------------------------------------- + sql "drop table if exists unique_mor" + sql """ + CREATE TABLE unique_mor ( + id INT, + val INT + ) ENGINE=OLAP + UNIQUE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 4 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false" + ) """ - assert dup_complex_count[0][0] == 8 : "DUP_KEYS complex filter: expected 8 rows, got ${dup_complex_count[0][0]}" + sql """ + INSERT INTO unique_mor + SELECT number, number * 10 + FROM numbers("number" = "300") + """ + + def res14 = sql "SELECT count() FROM (SELECT id FROM unique_mor LIMIT 5) t" + assertEquals(5, res14[0][0]) + + def res15 = sql "SELECT count() FROM (SELECT id FROM unique_mor LIMIT 0) t" + assertEquals(0, res15[0][0]) }