-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Expand file tree
/
Copy pathtablet_reader.h
More file actions
326 lines (265 loc) · 12.3 KB
/
tablet_reader.h
File metadata and controls
326 lines (265 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <stddef.h>
#include <stdint.h>
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "agent/be_exec_version_manager.h"
#include "common/status.h"
#include "exprs/function_filter.h"
#include "io/io_common.h"
#include "storage/delete/delete_handler.h"
#include "storage/iterators.h"
#include "storage/olap_common.h"
#include "storage/olap_tuple.h"
#include "storage/predicate/filter_olap_param.h"
#include "storage/row_cursor.h"
#include "storage/rowid_conversion.h"
#include "storage/rowset/rowset.h"
#include "storage/rowset/rowset_meta.h"
#include "storage/rowset/rowset_reader.h"
#include "storage/rowset/rowset_reader_context.h"
#include "storage/tablet/base_tablet.h"
#include "storage/tablet/tablet_fwd.h"
namespace doris {
// Forward declarations for types named in this header.
// NOTE(review): a forward declaration only introduces an incomplete type. Within this
// header, RuntimeState has a member function called on it
// (TabletReader::ReaderParams::get_be_exec_version) and Arena is stored by value
// (TabletReader::_predicate_arena), so complete definitions of both must already be
// visible -- presumably through one of the headers included above; confirm which one.
class RuntimeState;
class BitmapFilterFuncBase;
class BloomFilterFuncBase;
class ColumnPredicate;
class DeleteBitmap;
class HybridSetBase;
class RuntimeProfile;
class VCollectIterator;
class Block;
class VExpr;
class Arena;
class VExprContext;
// Compares a row against a scan key, column by column.
//
// Only the leading columns that both cursors have are compared: the number of
// compared column ids is the smaller of the two schemas' num_column_ids().
// Each column is compared with the comparator of lhs's schema for that column id.
//
// Returns the first non-zero per-column comparison result, or 0 when every
// compared column is equal (including when there is nothing to compare).
//
// NOTE: if you are not sure whether this function fits your case, do not use it.
inline int compare_row_key(const RowCursor& lhs, const RowCursor& rhs) {
    const auto shared_prefix_len =
            std::min(lhs.schema()->num_column_ids(), rhs.schema()->num_column_ids());
    uint32_t col = 0;
    while (col < shared_prefix_len) {
        const auto order = lhs.schema()->column(col)->compare_cell(lhs.cell(col), rhs.cell(col));
        if (order != 0) {
            // The first differing column decides the ordering.
            return order;
        }
        ++col;
    }
    return 0;
}
// Base class for reading rows out of a tablet.
//
// It carries the scan configuration (see ReaderParams), the key ranges and predicates
// derived from it, delete handling and read statistics. The default
// next_block_with_aggregation() only returns an error, so concrete readers are
// presumably expected to override it.
class TabletReader {
    // Scan range boundaries held as RowCursor rows, together with the flags telling
    // whether the start / end keys themselves are part of the range.
    struct KeysParam {
        std::vector<RowCursor> start_keys;
        std::vector<RowCursor> end_keys;
        bool start_key_include = false;
        bool end_key_include = false;
    };

public:
    // Parameters for a reader: mainly the tablet, the data version and the fetch range.
    struct ReaderParams {
        // Returns true in exactly two situations:
        //  * there is one rowset split, its rowset starts at version 0 and its segments
        //    do not overlap;
        //  * there are two rowset splits, the first rowset holds zero rows, the second
        //    rowset starts at version 2 and its segments do not overlap.
        // Every other split count yields false.
        bool has_single_version() const {
            const size_t split_count = rs_splits.size();
            if (split_count == 1) {
                const auto& only = rs_splits[0];
                return only.rs_reader->rowset()->start_version() == 0 &&
                       !only.rs_reader->rowset()->rowset_meta()->is_segments_overlapping();
            }
            if (split_count == 2) {
                const auto& first = rs_splits[0];
                const auto& second = rs_splits[1];
                return first.rs_reader->rowset()->rowset_meta()->num_rows() == 0 &&
                       second.rs_reader->rowset()->start_version() == 2 &&
                       !second.rs_reader->rowset()->rowset_meta()->is_segments_overlapping();
            }
            return false;
        }

        // Returns the BE exec version of the attached runtime state; when no runtime
        // state is attached, falls back to BeExecVersionManager::get_newest_version().
        int get_be_exec_version() const {
            if (!runtime_state) {
                return BeExecVersionManager::get_newest_version();
            }
            return runtime_state->be_exec_version();
        }

        // Moves the rowset splits and the delete predicates out of `read_source`.
        // Outside of BE_TEST builds the delete bitmap is moved over as well, but only
        // when the tablet reports merge-on-write as enabled and `skip_delete_bitmap`
        // is false; `tablet` is dereferenced for that check, so it must be set first.
        // In BE_TEST builds the delete bitmap is never taken.
        void set_read_source(TabletReadSource read_source, bool skip_delete_bitmap = false) {
            rs_splits = std::move(read_source.rs_splits);
            delete_predicates = std::move(read_source.delete_predicates);
#ifndef BE_TEST
            if (!tablet->enable_unique_key_merge_on_write() || skip_delete_bitmap) {
                return;
            }
            delete_bitmap = std::move(read_source.delete_bitmap);
#endif
        }

        BaseTabletSPtr tablet;
        TabletSchemaSPtr tablet_schema;
        ReaderType reader_type = ReaderType::READER_QUERY;
        bool direct_mode = false;
        bool aggregation = false;
        // Compaction, schema change and checksum reads do not use the page cache.
        // Queries use it when config::disable_storage_page_cache is false.
        bool use_page_cache = false;
        Version version = Version(-1, 0);

        std::vector<OlapTuple> start_key;
        std::vector<OlapTuple> end_key;
        bool start_key_include = false;
        bool end_key_include = false;

        std::vector<std::shared_ptr<ColumnPredicate>> predicates;
        std::vector<FunctionFilter> function_filters;
        std::vector<RowsetMetaSharedPtr> delete_predicates;
        // Slots whose cast may be eliminated in the storage layer.
        std::map<std::string, DataTypePtr> target_cast_type_for_variants;

        std::map<int32_t, TColumnAccessPaths> all_access_paths;
        std::map<int32_t, TColumnAccessPaths> predicate_access_paths;

        std::vector<RowSetSplits> rs_splits;
        // Used by unique-key tables with merge-on-write.
        DeleteBitmapPtr delete_bitmap = nullptr;

        // Initialized from the query schema.
        std::vector<ColumnId> return_columns;
        // Holds only the columns that appear in OrderByExprs and outputExprs.
        std::set<int32_t> output_columns;
        RuntimeProfile* profile = nullptr;
        RuntimeState* runtime_state = nullptr;

        // Only used by the vectorized execution engine.
        std::vector<ColumnId>* origin_return_columns = nullptr;
        std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr;
        TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
        std::vector<VExprSPtr> remaining_conjunct_roots;
        VExprContextSPtrs common_expr_ctxs_push_down;

        // Lets compaction record row ids.
        bool record_rowids = false;
        RowIdConversion* rowid_conversion = nullptr;
        std::vector<int> topn_filter_source_node_ids;
        int topn_filter_target_node_id = -1;
        // Special optimization for queries of the form: ORDER BY key LIMIT n
        bool read_orderby_key = false;
        // Special optimization for queries of the form: ORDER BY key DESC LIMIT n
        bool read_orderby_key_reverse = false;
        // Number of columns that make up the order-by key.
        size_t read_orderby_key_num_prefix_columns = 0;
        // Row limit that applies to read_orderby_key.
        size_t read_orderby_key_limit = 0;
        // Arguments for filter_block.
        VExprContextSPtrs filter_block_conjuncts;

        // Vertical compaction settings.
        bool is_key_column_group = false;
        std::vector<uint32_t> key_group_cluster_key_idxes;

        // Sparse column compaction optimization: when true, the optimized path for
        // sparse wide tables is used.
        bool enable_sparse_optimization = false;

        bool is_segcompaction = false;

        // Turns on value predicate pushdown for MOR tables.
        bool enable_mor_value_predicate_pushdown = false;

        std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = nullptr;

        void check_validation() const;

        std::string to_string() const;

        int64_t batch_size = -1;

        std::map<ColumnId, VExprContextSPtr> virtual_column_exprs;
        std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
        std::map<size_t, DataTypePtr> vir_col_idx_to_type;

        std::shared_ptr<ScoreRuntime> score_runtime;
        CollectionStatisticsPtr collection_statistics;
        std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;

        uint64_t condition_cache_digest = 0;

        // General limit pushdown for DUP_KEYS and for UNIQUE_KEYS with MOW.
        // A value > 0 makes the storage layer (VCollectIterator) stop reading once
        // that many rows have been returned; -1 means there is no limit.
        int64_t general_read_limit = -1;
    };

    TabletReader() = default;

    virtual ~TabletReader() = default;

    // Readers are not copyable.
    TabletReader(const TabletReader&) = delete;
    void operator=(const TabletReader&) = delete;

    // Initializes the reader with the tablet, data version and fetch range.
    virtual Status init(const ReaderParams& read_params);

    // Reads the next block with aggregation applied.
    //  * OK with `*eof` == false: a block was read.
    //  * OK with `*eof` == true: there are no more rows to read.
    //  * Any other status: an unexpected error happened.
    // This base implementation always reports READER_INITIALIZE_ERROR.
    virtual Status next_block_with_aggregation(Block* block, bool* eof) {
        return Status::Error<ErrorCode::READER_INITIALIZE_ERROR>(
                "TabletReader not support next_block_with_aggregation");
    }

    virtual uint64_t merged_rows() const { return _merged_rows; }

    // Sum of the per-cause "filtered rows" counters kept in the reader statistics.
    uint64_t filtered_rows() const {
        const auto filtered = _stats.rows_del_filtered + _stats.rows_del_by_bitmap +
                              _stats.rows_conditions_filtered +
                              _stats.rows_vec_del_cond_filtered + _stats.rows_vec_cond_filtered +
                              _stats.rows_short_circuit_cond_filtered;
        return filtered;
    }

    // The batch size is stored in the rowset reader context.
    void set_batch_size(int batch_size) { _reader_context.batch_size = batch_size; }
    int batch_size() const { return _reader_context.batch_size; }

    const OlapReaderStatistics& stats() const { return _stats; }
    OlapReaderStatistics* mutable_stats() { return &_stats; }

    // No-op in the base class.
    virtual void update_profile(RuntimeProfile* profile) {}

    static Status init_reader_params_and_create_block(
            TabletSharedPtr tablet, ReaderType reader_type,
            const std::vector<RowsetSharedPtr>& input_rowsets,
            TabletReader::ReaderParams* reader_params, Block* block);

protected:
    friend class VCollectIterator;
    friend class DeleteHandler;

    Status _init_params(const ReaderParams& read_params);

    Status _capture_rs_readers(const ReaderParams& read_params);

    Status _init_keys_param(const ReaderParams& read_params);

    Status _init_orderby_keys_param(const ReaderParams& read_params);

    Status _init_conditions_param(const ReaderParams& read_params);

    virtual std::shared_ptr<ColumnPredicate> _parse_to_predicate(
            const FunctionFilter& function_filter);

    Status _init_delete_condition(const ReaderParams& read_params);

    Status _init_return_columns(const ReaderParams& read_params);

    const BaseTabletSPtr& tablet() { return _tablet; }

    // When the original column is a variant column whose predicate has been normalized,
    // the column type is reset to the matching entry of `target_cast_type_for_variants`
    // so that the real type of the column predicate is known; variant itself is not a
    // type that predicates can be applied to. In every other case the original tablet
    // column is returned.
    // E.g. `where cast(v:a as bigint) > 1` eliminates the cast and materializes this
    // variant column as type bigint.
    TabletColumn materialize_column(const TabletColumn& orig);

    const TabletSchema& tablet_schema() { return *_tablet_schema; }

    Arena _predicate_arena;
    std::vector<ColumnId> _return_columns;

    // Columns of the order-by keys, used by the special optimization for
    // queries of the form: ORDER BY key [ASC|DESC] LIMIT n
    std::vector<uint32_t> _orderby_key_columns;

    // Only used for outer joins, which change the nullability of columns; this has to
    // stay the same as in the vectorized query engine.
    std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr;

    BaseTabletSPtr _tablet;
    RowsetReaderContext _reader_context;
    TabletSchemaSPtr _tablet_schema;
    KeysParam _keys_param;
    std::vector<bool> _is_lower_keys_included;
    std::vector<bool> _is_upper_keys_included;
    std::vector<std::shared_ptr<ColumnPredicate>> _col_predicates;
    std::vector<std::shared_ptr<ColumnPredicate>> _value_col_predicates;
    DeleteHandler _delete_handler;

    // Whether the tablet has been aggregated in the storage engine.
    bool _aggregation = false;
    // For aggregation queries there is no need to finalize when scanning
    // aggregate-object data.
    ReaderType _reader_type = ReaderType::READER_QUERY;
    bool _next_delete_flag = false;
    bool _delete_sign_available = false;
    bool _filter_delete = false;
    int32_t _sequence_col_idx = -1;
    bool _direct_mode = false;

    std::vector<uint32_t> _key_cids;
    std::vector<uint32_t> _value_cids;

    uint64_t _merged_rows = 0;
    OlapReaderStatistics _stats;
};
} // namespace doris