8 changes: 8 additions & 0 deletions be/src/exec/scan/olap_scanner.cpp
@@ -502,6 +502,14 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.filter_block_conjuncts = _conjuncts;
_conjuncts.clear();
}
} else if (_limit > 0 && olap_scan_local_state->_storage_no_merge()) {
Contributor comment:

[Note - Correctness analysis] _limit here is tnode.limit (the full query LIMIT), not a per-scanner fraction. This is correct: each scanner independently limits its storage reads to N rows, and the outer ScanOperatorX::get_block() -> reached_limit() enforces the global N across all scanners. This matches the existing topn pattern.

However, note that for queries like SELECT * FROM t WHERE rare_condition LIMIT 10 with many tablets, every scanner reads up to 10 post-filter rows even though only 10 total are needed. The adaptive_pipeline_task_serial_read_on_limit optimization mitigates this for small limits by forcing serial scan, but for larger limits the amplification factor is O(num_scanners). This is a known trade-off, not a bug.
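
The amplification trade-off described above can be put in numbers with a toy model (function names hypothetical, not Doris APIs): each scanner reads up to the full query limit from storage, while the outer pipeline still returns only the global limit to the user.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Worst case: every scanner independently reads `limit` post-filter rows
// before the pipeline's reached_limit() cuts the query off, so storage
// reads are amplified by O(num_scanners).
int64_t worst_case_rows_read(int64_t num_scanners, int64_t limit) {
    return num_scanners * limit;
}

// What the user actually receives is still bounded by the global limit
// (and by how many rows match the predicates at all).
int64_t rows_returned_to_user(int64_t limit, int64_t total_matching_rows) {
    return std::min(limit, total_matching_rows);
}
```

For example, 4 scanners with `LIMIT 10` may read up to 40 post-filter rows from storage, yet only 10 reach the user; the gap is the read amplification the comment refers to.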

// General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW (non-merge path).
// Only when topn optimization is NOT active (topn handles its own limit).
// Move _conjuncts into storage layer so that general_read_limit counts
// post-filter rows, same as the topn path above.
_tablet_reader_params.general_read_limit = _limit;
_tablet_reader_params.filter_block_conjuncts = _conjuncts;
Contributor comment:

Bug [Critical]: filter_block_conjuncts set but never consumed in the general limit path

This line moves _conjuncts into filter_block_conjuncts, and line 512 clears _conjuncts. However:

  1. filter_block_conjuncts is only evaluated inside VCollectIterator::_topn_next() (at vcollect_iterator.cpp:357-358). The new general limit path in VCollectIterator::next(Block*) (lines 262-281) never calls VExprContext::filter_block(filter_block_conjuncts, ...).

  2. Since _conjuncts is cleared, Scanner::_filter_output_block() becomes a no-op (empty conjuncts vector → early return in VExprContext::filter_block).

Result: For queries with WHERE clauses (e.g., SELECT * FROM t WHERE col > 5 LIMIT 10), the filter predicates are completely dropped. The limit counts unfiltered rows, and unfiltered rows are returned to the user.

Fix: Either (a) don't move _conjuncts into filter_block_conjuncts for the general limit path (keep filtering at the scanner level), or (b) add explicit VExprContext::filter_block calls in the general limit next() path in VCollectIterator, mirroring _topn_next.
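
The difference between fix (a) and fix (b) comes down to where rows are counted toward the limit. A self-contained toy model (all names hypothetical, not Doris code) shows why counting pre-filter rows, as the current patch effectively does once `_conjuncts` is cleared, breaks the query:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Minimal sketch of a limit-enforcing reader. `count_post_filter` selects
// whether rows are counted toward the limit after (correct) or before
// (buggy) the filter predicate runs.
struct LimitReader {
    int64_t limit;
    int64_t returned = 0;
    bool count_post_filter = true;

    std::vector<int> next(std::vector<int> block,
                          const std::function<bool(int)>& pred) {
        if (returned >= limit) return {};  // fast path: limit already reached
        if (!count_post_filter) {
            // Buggy variant: count raw storage rows before filtering.
            int64_t keep = std::min<int64_t>(block.size(), limit - returned);
            block.resize(keep);
            returned += keep;
        }
        std::vector<int> out;
        for (int v : block) {
            if (pred(v)) out.push_back(v);
        }
        if (count_post_filter) {
            // Correct variant: count only rows that survive the filter.
            int64_t keep = std::min<int64_t>(out.size(), limit - returned);
            out.resize(keep);
            returned += keep;
        }
        return out;
    }
};

// Drive rows 0..99 through the reader in blocks of 20 with the predicate
// v >= 50 and the given limit; return the total rows delivered.
int64_t total_delivered(bool count_post_filter, int64_t limit) {
    LimitReader reader{limit, 0, count_post_filter};
    int64_t delivered = 0;
    for (int start = 0; start < 100; start += 20) {
        std::vector<int> block;
        for (int i = start; i < start + 20; ++i) block.push_back(i);
        delivered += static_cast<int64_t>(
                reader.next(block, [](int v) { return v >= 50; }).size());
    }
    return delivered;
}
```

With post-filter counting, `LIMIT 10` over `v >= 50` delivers 10 matching rows; with pre-filter counting, the first block of non-matching rows exhausts the limit and the query delivers nothing, which is the failure mode described in this comment.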

_conjuncts.clear();
}

// set push down topn filter
31 changes: 31 additions & 0 deletions be/src/storage/iterator/vcollect_iterator.cpp
@@ -94,6 +94,16 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo
_topn_limit = 0;
DCHECK_EQ(_reader->_reader_context.filter_block_conjuncts.size(), 0);
}

Contributor comment:

Bug [Crash in ASAN/Debug]: Existing DCHECK at line 95 will fire

Two lines above this new code, at line 95 (context line in this diff, position 2), there is:

DCHECK_EQ(_reader->_reader_context.filter_block_conjuncts.size(), 0);

This DCHECK is in the else branch — it fires when read_orderby_key_limit is 0 (topn NOT active). In the general limit path, read_orderby_key_limit is indeed 0, but filter_block_conjuncts is non-empty (set at olap_scanner.cpp:511). This means:

  • In debug/ASAN builds: process crashes on the DCHECK assertion failure.
  • In release builds: DCHECK is a no-op, so the crash is masked but the conjuncts are still silently lost.

This DCHECK needs to be updated to allow filter_block_conjuncts to be non-empty when general_read_limit > 0.
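
One way to state the relaxed invariant the comment asks for, as a standalone predicate (a sketch only; the helper name is hypothetical and the real fix would adjust the DCHECK condition in place):

```cpp
#include <cstddef>
#include <cstdint>

// filter_block_conjuncts may legitimately be non-empty when either the
// topn path (read_orderby_key_limit > 0) or the proposed general-limit
// path (general_read_limit > 0) is active; otherwise it must be empty.
bool filter_conjuncts_invariant_ok(std::size_t num_filter_conjuncts,
                                   int64_t read_orderby_key_limit,
                                   int64_t general_read_limit) {
    if (num_filter_conjuncts == 0) return true;   // nothing was pushed down
    if (read_orderby_key_limit > 0) return true;  // topn path consumes them
    return general_read_limit > 0;                // general-limit path
}
```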

// General limit pushdown: only for READER_QUERY on non-merge path
// (DUP_KEYS or UNIQUE_KEYS with MOW)
if (!_merge && _reader->_reader_type == ReaderType::READER_QUERY &&
_reader->_reader_context.general_read_limit > 0 &&
(_reader->_tablet->keys_type() == KeysType::DUP_KEYS ||
(_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
Contributor comment:

Bug [Critical]: General limit counts unfiltered rows

This _inner_iter->next(block) returns a block of rows from the storage layer. The _general_rows_returned counter then counts these rows toward the limit. However, unlike _topn_next() which applies filter_block_conjuncts at line 357-358 before counting, this path counts raw/unfiltered rows.

The comment in olap_scanner.cpp says "general_read_limit counts post-filter rows, same as the topn path above" — but no filtering code is present here to make that true.

If the intent is to count post-filter rows (which is correct), you need to add VExprContext::filter_block(_reader->_reader_context.filter_block_conjuncts, block, block->columns()) here before updating _general_rows_returned, similar to what _topn_next does.

_reader->_tablet->enable_unique_key_merge_on_write()))) {
_general_read_limit = _reader->_reader_context.general_read_limit;
}
}

Status VCollectIterator::add_child(const RowSetSplits& rs_splits) {
@@ -249,6 +259,27 @@ Status VCollectIterator::next(Block* block) {
}

Contributor comment:

[Medium - Performance] The filter_block_conjuncts check and limit enforcement logic now run on the hot path for all non-topn queries, even when _general_read_limit is not active (i.e., -1). While each individual check is cheap (empty vector check + integer comparison), this adds overhead to every next(Block*) call for queries without LIMIT.

Consider guarding the entire filter+limit block:

if (_general_read_limit > 0) {
    // Apply filter_block_conjuncts ...
    if (!_reader->_reader_context.filter_block_conjuncts.empty()) {
        RETURN_IF_ERROR(VExprContext::filter_block(...));
    }
    // Enforce limit ...
    _general_rows_returned += block->rows();
    if (_general_rows_returned > _general_read_limit) { ... }
}

This makes the intent clearer and avoids any overhead when the feature is not active.

if (LIKELY(_inner_iter)) {
if (_general_read_limit > 0) {
// Fast path: if general limit already reached, return EOF immediately
if (_general_rows_returned >= _general_read_limit) {
return Status::Error<END_OF_FILE>("");
}

RETURN_IF_ERROR(_inner_iter->next(block));

// Enforce general read limit: truncate block if needed
_general_rows_returned += block->rows();
if (_general_rows_returned > _general_read_limit) {
// Truncate block to return exactly the remaining rows needed
Contributor comment:

[Correctness concern] block->rows() here counts rows BEFORE Scanner::_filter_output_block() applies _conjuncts. If any _conjuncts exist that are NOT also evaluated inside the storage layer (e.g., late-arriving runtime filters), this count is inflated. The limit will be reached prematurely, causing the scanner to return fewer rows than requested.

For comparison, the existing topn path moves _conjuncts into filter_block_conjuncts (olap_scanner.cpp:500-504) and evaluates them inside _topn_next (vcollect_iterator.cpp:359-360), so its row counting is post-filter and correct.

int64_t excess = _general_rows_returned - _general_read_limit;
int64_t keep = block->rows() - excess;
DCHECK_GT(keep, 0);
block->set_num_rows(keep);
_general_rows_returned = _general_read_limit;
}

return Status::OK();
}
return _inner_iter->next(block);
} else {
return Status::Error<END_OF_FILE>("");
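The truncation arithmetic in this hunk (excess/keep) can be checked in isolation; a minimal sketch, with a hypothetical function name standing in for the inline code:

```cpp
#include <cstdint>

// Given the rows already returned before this block, the block's row count,
// and the limit, return how many rows of the block may be delivered.
// Mirrors: excess = (returned + rows) - limit; keep = rows - excess.
int64_t rows_to_keep(int64_t rows_returned_before, int64_t block_rows,
                     int64_t limit) {
    int64_t total = rows_returned_before + block_rows;
    if (total <= limit) return block_rows;  // whole block fits under the limit
    int64_t excess = total - limit;
    return block_rows - excess;
}
```

Because the fast path above returns EOF once `_general_rows_returned >= _general_read_limit`, the caller only reaches the truncation branch with at least one row still allowed, which is what the `DCHECK_GT(keep, 0)` asserts.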
6 changes: 6 additions & 0 deletions be/src/storage/iterator/vcollect_iterator.h
@@ -350,6 +350,12 @@ class VCollectIterator {
bool _topn_eof = false;
std::vector<RowSetSplits> _rs_splits;

// General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW (non-merge path).
// When > 0, VCollectIterator will stop reading after returning this many rows.
int64_t _general_read_limit = -1;
// Number of rows already returned to the caller.
int64_t _general_rows_returned = 0;

// Hold reader point to access read params, such as fetch conditions.
TabletReader* _reader = nullptr;

4 changes: 4 additions & 0 deletions be/src/storage/rowset/rowset_reader_context.h
@@ -103,6 +103,10 @@ struct RowsetReaderContext {

// When true, push down value predicates for MOR tables
bool enable_mor_value_predicate_pushdown = false;

// General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW.
// Propagated from ReaderParams.general_read_limit.
int64_t general_read_limit = -1;
};

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/storage/tablet/tablet_reader.cpp
@@ -216,6 +216,8 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
_reader_context.all_access_paths = read_params.all_access_paths;
_reader_context.predicate_access_paths = read_params.predicate_access_paths;

// Propagate general read limit for DUP_KEYS and UNIQUE_KEYS with MOW
_reader_context.general_read_limit = read_params.general_read_limit;
return Status::OK();
}

5 changes: 5 additions & 0 deletions be/src/storage/tablet/tablet_reader.h
@@ -207,6 +207,11 @@ class TabletReader {
std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;

uint64_t condition_cache_digest = 0;

// General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW.
// When > 0, the storage layer (VCollectIterator) will stop reading
// after returning this many rows. -1 means no limit.
int64_t general_read_limit = -1;
};

TabletReader() = default;
@@ -0,0 +1,86 @@
-- This file is automatically generated. You should know what you did if you want to edit this
Contributor comment:

Test issue: Orphaned .out file

This .out file contains 7 tagged sections (dup_basic_limit, dup_filter_limit, dup_filter_over_limit, dup_complex_filter_limit, mow_basic_limit, mow_filter_limit, mow_complex_filter_limit) but none of these tags are referenced by any order_qt_ query in the .groovy test file.

The .groovy file only uses bare sql calls with assertEquals on counts — it never produces tagged output. This file is dead data that serves no purpose and will never be validated by the test framework.

Either remove this file, or rewrite the test to use order_qt_dup_basic_limit etc. queries that produce deterministic, ordered output matching these sections.

-- !dup_basic_limit --
1 99
10 90
2 98
3 97
4 96
5 95
6 94
7 93
8 92
9 91

-- !dup_filter_limit --
11 89
12 88
13 87
14 86
15 85
16 84
17 83
18 82
19 81
20 80
21 79
22 78
23 77
24 76
25 75

-- !dup_filter_over_limit --
46 54
47 53
48 52
49 51
50 50

-- !dup_complex_filter_limit --
16 84
17 83
18 82
19 81
20 80
21 79
22 78
23 77

-- !mow_basic_limit --
1 99
10 90
2 98
3 97
4 96
5 95
6 94
7 93
8 92
9 91

-- !mow_filter_limit --
11 89
12 88
13 87
14 86
15 85
16 84
17 83
18 82
19 81
20 80
21 79
22 78
23 77
24 76
25 75

-- !mow_complex_filter_limit --
16 84
17 83
18 82
19 81
20 80
21 79
22 78
23 77

@@ -0,0 +1,179 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Test general LIMIT pushdown into the storage layer (VCollectIterator).
// This optimization is active only for DUP_KEYS and UNIQUE_KEYS with MOW
// on the non-merge read path (no ORDER BY on key columns).
suite("test_general_limit_pushdown", "p0") {

Contributor comment:

Test issues:

  1. No order_qt_ queries: All assertions use assertEquals on count(). No order_qt_ or ORDER BY is used. Per Doris regression test standards, use order_qt_ prefix or explicit ORDER BY for deterministic output.

  2. Does not test the feature: These tests only verify basic SQL LIMIT semantics (correct row counts), which would pass identically whether or not the storage-layer optimization is active. Use explain plan checks to verify the pushdown is active for DUP_KEYS/MOW and inactive for AGG_KEYS/MOR.

  3. Missing WHERE + LIMIT tests: No test combines WHERE predicates with LIMIT to verify that filtering still works correctly when limit is pushed down. This is the exact scenario where the current implementation has a correctness bug (filter predicates are dropped).

  4. Orphaned .out file: The companion .out file has tagged output sections that are never referenced. See comment on that file.

// -------------------------------------------------------------------------
// Positive: DUP_KEYS single bucket — basic correctness
// -------------------------------------------------------------------------
sql "drop table if exists dup_single_bucket"
sql """
CREATE TABLE dup_single_bucket (
id INT,
val INT
) ENGINE=OLAP
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES("replication_num" = "1")
"""
sql """
INSERT INTO dup_single_bucket
SELECT number, number * 10
FROM numbers("number" = "100")
"""

// LIMIT 5 must return exactly 5 rows
def res1 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 5) t"
assertEquals(5, res1[0][0])

// LIMIT 0 must return 0 rows
def res2 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 0) t"
assertEquals(0, res2[0][0])

// LIMIT with OFFSET: LIMIT 5 OFFSET 10 must return exactly 5 rows
// (assuming table has >= 15 rows, which it does: 100 rows)
def res3 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 5 OFFSET 10) t"
assertEquals(5, res3[0][0])

// LIMIT larger than table size: should return all rows (100)
def res4 = sql "SELECT count() FROM (SELECT id FROM dup_single_bucket LIMIT 200) t"
assertEquals(100, res4[0][0])

// -------------------------------------------------------------------------
// Positive: DUP_KEYS multi-bucket — correctness with multiple scanners
// -------------------------------------------------------------------------
sql "drop table if exists dup_multi_bucket"
sql """
CREATE TABLE dup_multi_bucket (
id INT,
val INT
) ENGINE=OLAP
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1")
"""
sql """
INSERT INTO dup_multi_bucket
SELECT number, number * 10
FROM numbers("number" = "1000")
"""

// With 4 buckets each scanner enforces the limit independently; the outer
// pipeline limit must truncate to the global LIMIT value.
def res5 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 5) t"
assertEquals(5, res5[0][0])

def res6 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 0) t"
assertEquals(0, res6[0][0])

def res7 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 10 OFFSET 20) t"
assertEquals(10, res7[0][0])

def res8 = sql "SELECT count() FROM (SELECT id FROM dup_multi_bucket LIMIT 2000) t"
assertEquals(1000, res8[0][0])

// -------------------------------------------------------------------------
// Positive: UNIQUE_KEYS with MOW — limit pushdown should activate
// -------------------------------------------------------------------------
sql "drop table if exists unique_mow"
sql """
CREATE TABLE unique_mow (
id INT,
val INT
) ENGINE=OLAP
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES(
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
)
"""
sql """
INSERT INTO unique_mow
SELECT number, number * 10
FROM numbers("number" = "500")
"""

def res9 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 5) t"
assertEquals(5, res9[0][0])

def res10 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 0) t"
assertEquals(0, res10[0][0])

def res11 = sql "SELECT count() FROM (SELECT id FROM unique_mow LIMIT 5 OFFSET 10) t"
assertEquals(5, res11[0][0])

// -------------------------------------------------------------------------
// Negative: AGG_KEYS — limit pushdown must NOT short-circuit results
// (AGG_KEYS requires merge, which disables _general_read_limit)
// Correctness test: the query must still return the right count.
// -------------------------------------------------------------------------
sql "drop table if exists agg_keys_tbl"
sql """
CREATE TABLE agg_keys_tbl (
id INT,
val BIGINT SUM
) ENGINE=OLAP
AGGREGATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1")
"""
sql """
INSERT INTO agg_keys_tbl
SELECT number, number
FROM numbers("number" = "200")
"""

// Limit pushdown is not active here; results must still be correct.
def res12 = sql "SELECT count() FROM (SELECT id FROM agg_keys_tbl LIMIT 5) t"
assertEquals(5, res12[0][0])

def res13 = sql "SELECT count() FROM (SELECT id FROM agg_keys_tbl LIMIT 0) t"
assertEquals(0, res13[0][0])

// -------------------------------------------------------------------------
// Negative: UNIQUE_KEYS MOR (without MOW) — limit pushdown must NOT activate
// Correctness test: query must still return the right count.
// -------------------------------------------------------------------------
sql "drop table if exists unique_mor"
sql """
CREATE TABLE unique_mor (
id INT,
val INT
) ENGINE=OLAP
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES(
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "false"
)
"""
sql """
INSERT INTO unique_mor
SELECT number, number * 10
FROM numbers("number" = "300")
"""

def res14 = sql "SELECT count() FROM (SELECT id FROM unique_mor LIMIT 5) t"
assertEquals(5, res14[0][0])

def res15 = sql "SELECT count() FROM (SELECT id FROM unique_mor LIMIT 0) t"
assertEquals(0, res15[0][0])
}