// scan_operator.h
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <mutex>
#include <set>
#include <string>
#include "common/status.h"
#include "core/field.h"
#include "exec/common/util.hpp"
#include "exec/operator/operator.h"
#include "exec/pipeline/dependency.h"
#include "exec/runtime_filter/runtime_filter_consumer_helper.h"
#include "exec/scan/scan_node.h"
#include "exec/scan/scanner_context.h"
#include "exprs/function_filter.h"
#include "exprs/vectorized_fn_call.h"
#include "exprs/vin_predicate.h"
#include "runtime/descriptors.h"
#include "storage/predicate/filter_olap_param.h"
namespace doris {
// NOTE(review): this include is placed inside the namespace and is paired with the
// "common/compile_check_end.h" include at the bottom of this file. Presumably the two
// headers bracket a region of stricter compiler diagnostics -- confirm before moving it.
#include "common/compile_check_begin.h"
// Forward declarations. Within this header ScannerDelegate only appears as a
// std::shared_ptr element type and OlapScanner only in a friend declaration,
// so neither needs a complete definition here.
class ScannerDelegate;
class OlapScanner;
} // namespace doris
namespace doris {
/// Outcome of asking a data source whether it can take over a predicate.
enum class PushDownType {
    /// The data source cannot accept the predicate at all.
    UNACCEPTABLE,
    /// The data source accepts the predicate and is able to fully evaluate it.
    ACCEPTABLE,
    /// The data source accepts the predicate but is not able to fully evaluate it.
    PARTIAL_ACCEPTABLE,
};
/// Part of a scan operator's local state that does not depend on the concrete scan type.
///
/// It declares the pure-virtual accessors implemented by ScanLocalState<Derived>, the
/// profile counters, the runtime-filter consumer helper and the predicate-normalization
/// routines. Per the notes kept below, these members live in the base class so that they
/// are not instantiated again for every Derived type.
class ScanLocalStateBase : public PipelineXLocalState<> {
public:
    /// Forwards to PipelineXLocalState and seeds the runtime-filter helper with the
    /// runtime filter descriptors exposed by `parent`.
    ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
            : PipelineXLocalState<>(state, parent), _helper(parent->runtime_filter_descs()) {}

    ~ScanLocalStateBase() override = default;

    // ---- Accessors that every concrete scan local state must provide ----
    [[nodiscard]] virtual bool should_run_serial() const = 0;
    virtual RuntimeProfile* scanner_profile() = 0;
    [[nodiscard]] virtual const TupleDescriptor* input_tuple_desc() const = 0;
    [[nodiscard]] virtual const TupleDescriptor* output_tuple_desc() const = 0;
    virtual int64_t limit_per_scanner() = 0;
    virtual void set_scan_ranges(RuntimeState* state,
                                 const std::vector<TScanRangeParams>& scan_ranges) = 0;
    virtual TPushAggOp::type get_push_down_agg_type() = 0;

    // Scanner concurrency bounds.
    //
    // A serial scan operator (e.g. topn) has a real parallelism of 1; any other scan
    // operator has a real parallelism of query_parallel_instance_num. For an olap table
    // that number usually equals the session variable parallel_pipeline_task_num. A file
    // scan operator running in batch mode also has a real parallelism of 1.
    // Related PRs:
    //   https://github.com/apache/doris/pull/42460
    //   https://github.com/apache/doris/pull/44635
    [[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state) const;
    [[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const;
    [[nodiscard]] virtual ScannerScheduler* scan_scheduler(RuntimeState* state) const;

    /// Returns the name reported by the parent operator.
    [[nodiscard]] std::string get_name() { return _parent->get_name(); }

    /// Returns the stored condition-cache digest.
    uint64_t get_condition_cache_digest() const { return _condition_cache_digest; }

    Status update_late_arrival_runtime_filter(RuntimeState* state, int& arrived_rf_num);
    Status clone_conjunct_ctxs(VExprContextSPtrs& scanner_conjuncts);

protected:
    friend class ScannerContext;
    friend class Scanner;

    virtual Status _init_profile() = 0;

    std::atomic<bool> _opened {false};
    DependencySPtr _scan_dependency = nullptr;
    std::shared_ptr<RuntimeProfile> _scanner_profile;

    // ---- Profile counters ----
    RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
    // Number of free blocks newly created while the query runs.
    RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
    // Maximum number of scanner threads.
    RuntimeProfile::Counter* _max_scan_concurrency = nullptr;
    RuntimeProfile::Counter* _min_scan_concurrency = nullptr;
    RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
    // Time spent getting blocks from the scanner.
    RuntimeProfile::Counter* _scan_timer = nullptr;
    RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
    // Time spent filtering the output block of the scanner.
    RuntimeProfile::Counter* _filter_timer = nullptr;
    // Rows read from the scanner, including rows later discarded by (pre)filters.
    RuntimeProfile::Counter* _rows_read_counter = nullptr;
    RuntimeProfile::Counter* _num_scanners = nullptr;
    RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
    RuntimeProfile::Counter* _scan_rows = nullptr;
    RuntimeProfile::Counter* _scan_bytes = nullptr;

    std::mutex _conjuncts_lock;
    RuntimeFilterConsumerHelper _helper;

    // Magic number used as the seed when generating the hash value for the condition cache.
    uint64_t _condition_cache_digest = 0;

    // The members below were moved here from ScanLocalState<Derived> so that they are not
    // instantiated again for each Derived type.
    std::atomic<bool> _eos {false};
    int _max_pushdown_conditions_per_column = 1024;
    // All function predicates that may be pushed down to the data source.
    std::vector<FunctionFilter> _push_down_functions;

    // ---- Push-down hooks ----
    // Each hook has a default implementation that refuses the push-down; a subclass
    // overrides the ones it supports. They are declared in this class so that the
    // normalize methods further down (which are not templated on Derived) can call them.
    virtual bool _push_down_topn(const RuntimePredicate& predicate) { return false; }

    virtual PushDownType _should_push_down_bloom_filter() const {
        return PushDownType::UNACCEPTABLE;
    }

    virtual PushDownType _should_push_down_topn_filter() const {
        return PushDownType::UNACCEPTABLE;
    }

    virtual PushDownType _should_push_down_bitmap_filter() const {
        return PushDownType::UNACCEPTABLE;
    }

    virtual PushDownType _should_push_down_is_null_predicate(VectorizedFnCall* fn_call) const {
        return PushDownType::UNACCEPTABLE;
    }

    virtual PushDownType _should_push_down_in_predicate() const {
        return PushDownType::UNACCEPTABLE;
    }

    virtual PushDownType _should_push_down_binary_predicate(
            VectorizedFnCall* fn_call, VExprContext* expr_ctx, Field& constant_val,
            const std::set<std::string> fn_name) const {
        return PushDownType::UNACCEPTABLE;
    }

    // Default: reports UNACCEPTABLE through `pdt` and returns OK.
    virtual Status _should_push_down_function_filter(VectorizedFnCall* fn_call,
                                                     VExprContext* expr_ctx,
                                                     StringRef* constant_str,
                                                     doris::FunctionContext** fn_ctx,
                                                     PushDownType& pdt) {
        pdt = PushDownType::UNACCEPTABLE;
        return Status::OK();
    }

    // ---- Normalize methods that are not templates ----
    // Kept in the base class to avoid recompiling them for every Derived type.
    Status _eval_const_conjuncts(VExprContext* expr_ctx, PushDownType* pdt);

    Status _normalize_bloom_filter(VExprContext* expr_ctx, const VExprSPtr& root,
                                   SlotDescriptor* slot,
                                   std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                   PushDownType* pdt);

    Status _normalize_topn_filter(VExprContext* expr_ctx, const VExprSPtr& root,
                                  SlotDescriptor* slot,
                                  std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                  PushDownType* pdt);

    Status _normalize_bitmap_filter(VExprContext* expr_ctx, const VExprSPtr& root,
                                    SlotDescriptor* slot,
                                    std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                    PushDownType* pdt);

    Status _normalize_function_filters(VExprContext* expr_ctx, SlotDescriptor* slot,
                                       PushDownType* pdt);

    // ---- Normalize methods templated on PrimitiveType only ----
    // Living in the base class avoids an N(Derived) x M(PrimitiveType) instantiation
    // blowup: they are instantiated M times in total rather than N x M times.
    template <PrimitiveType T>
    Status _normalize_in_predicate(VExprContext* expr_ctx, const VExprSPtr& root,
                                   SlotDescriptor* slot,
                                   std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                   ColumnValueRange<T>& range, PushDownType* pdt);

    template <PrimitiveType T>
    Status _normalize_binary_predicate(VExprContext* expr_ctx, const VExprSPtr& root,
                                       SlotDescriptor* slot,
                                       std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                       ColumnValueRange<T>& range, PushDownType* pdt);

    template <PrimitiveType T>
    Status _normalize_is_null_predicate(VExprContext* expr_ctx, const VExprSPtr& root,
                                        SlotDescriptor* slot,
                                        std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                        ColumnValueRange<T>& range, PushDownType* pdt);

    template <PrimitiveType PrimitiveType, typename ChangeFixedValueRangeFunc>
    Status _change_value_range(bool is_equal_op, ColumnValueRange<PrimitiveType>& range,
                               const Field& value, const ChangeFixedValueRangeFunc& func,
                               const std::string& fn_name);
};
// Forward declaration of the scan operator template. ScanLocalState (declared next)
// names it in a friend declaration before the operator itself is defined later in this header.
template <typename LocalStateType>
class ScanOperatorX;
template <typename Derived>
class ScanLocalState : public ScanLocalStateBase {
ENABLE_FACTORY_CREATOR(ScanLocalState);
ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {}
~ScanLocalState() override = default;
virtual Status init(RuntimeState* state, LocalStateInfo& info) override;
virtual Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
std::string debug_string(int indentation_level) const final;
[[nodiscard]] bool should_run_serial() const override;
RuntimeProfile* scanner_profile() override { return _scanner_profile.get(); }
[[nodiscard]] const TupleDescriptor* input_tuple_desc() const override;
[[nodiscard]] const TupleDescriptor* output_tuple_desc() const override;
int64_t limit_per_scanner() override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override {}
TPushAggOp::type get_push_down_agg_type() override;
std::vector<Dependency*> execution_dependencies() override {
if (_filter_dependencies.empty()) {
return {};
}
std::vector<Dependency*> res(_filter_dependencies.size());
std::transform(_filter_dependencies.begin(), _filter_dependencies.end(), res.begin(),
[](DependencySPtr dep) { return dep.get(); });
return res;
}
std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }
std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool push_down) {
std::vector<int> result;
for (int id : _parent->cast<typename Derived::Parent>()._topn_filter_source_node_ids) {
if (!state->get_query_ctx()->has_runtime_predicate(id)) {
// compatible with older versions fe
continue;
}
const auto& pred = state->get_query_ctx()->get_runtime_predicate(id);
if (!pred.enable()) {
continue;
}
if (_push_down_topn(pred) == push_down) {
result.push_back(id);
}
}
return result;
}
protected:
template <typename LocalStateType>
friend class ScanOperatorX;
friend class ScannerContext;
friend class Scanner;
Status _init_profile() override;
virtual Status _process_conjuncts(RuntimeState* state) { return _normalize_conjuncts(state); }
virtual bool _should_push_down_common_expr() { return false; }
virtual bool _storage_no_merge() { return false; }
virtual bool _is_key_column(const std::string& col_name) { return false; }
// Create a list of scanners.
// The number of scanners is related to the implementation of the data source,
// predicate conditions, and scheduling strategy.
// So this method needs to be implemented separately by the subclass of ScanNode.
// Finally, a set of scanners that have been prepared are returned.
virtual Status _init_scanners(std::list<ScannerSPtr>* scanners) { return Status::OK(); }
Status _normalize_conjuncts(RuntimeState* state);
// Normalize a conjunct and try to convert it to column predicate recursively.
Status _normalize_predicate(VExprContext* context, const VExprSPtr& root,
VExprSPtr& output_expr);
bool _is_predicate_acting_on_slot(const VExprSPtrs& children, SlotDescriptor** slot_desc,
ColumnValueRangeType** range);
Status _prepare_scanners();
// Submit the scanner to the thread pool and start execution
Status _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners);
// For some conjunct there is chance to elimate cast operator
// Eg. Variant's sub column could eliminate cast in storage layer if
// cast dst column type equals storage column type
void get_cast_types_for_variants();
void _filter_and_collect_cast_type_for_variant(
const VExpr* expr,
std::unordered_map<std::string, std::vector<DataTypePtr>>& colname_to_cast_types);
Status _get_topn_filters(RuntimeState* state);
// Stores conjuncts that have been fully pushed down to the storage layer as predicate columns.
// These expr contexts are kept alive to prevent their FunctionContext and constant strings
// from being freed prematurely.
VExprContextSPtrs _stale_expr_ctxs;
VExprContextSPtrs _common_expr_ctxs_push_down;
atomic_shared_ptr<ScannerContext> _scanner_ctx;
// colname -> cast dst type
std::map<std::string, DataTypePtr> _cast_types_for_variants;
// slot id -> ColumnValueRange
// Parsed from conjuncts
phmap::flat_hash_map<int, ColumnValueRangeType> _slot_id_to_value_range;
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> _slot_id_to_predicates;
std::vector<std::shared_ptr<MutilColumnBlockPredicate>> _or_predicates;
std::vector<std::shared_ptr<Dependency>> _filter_dependencies;
// ScanLocalState owns the ownership of scanner, scanner context only has its weakptr
std::list<std::shared_ptr<ScannerDelegate>> _scanners;
Arena _arena;
int _instance_idx = 0;
};
/// Scan operator template, parameterized on the local-state type it works with.
template <typename LocalStateType>
class ScanOperatorX : public OperatorX<LocalStateType> {
public:
    Status init(const TPlanNode& tnode, RuntimeState* state) override;
    Status prepare(RuntimeState* state) override;
    Status get_block(RuntimeState* state, Block* block, bool* eos) override;

    /// Fetches a block via get_block(). When that succeeds and the block is not empty,
    /// the local state's rows-returned and blocks-returned counters are updated.
    /// Returns the status produced by get_block().
    Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos) override {
        Status status = get_block(state, block, eos);
        if (status.ok()) {
            if (auto rows = block->rows()) {
                auto* local_state = state->get_local_state(operator_id());
                COUNTER_UPDATE(local_state->_rows_returned_counter, rows);
                COUNTER_UPDATE(local_state->_blocks_returned_counter, 1);
            }
        }
        return status;
    }

    [[nodiscard]] bool is_source() const override { return true; }

    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override;

    const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
        return _runtime_filter_descs;
    }

    // Default: -1 for every column name; subclasses may override.
    [[nodiscard]] virtual int get_column_id(const std::string& col_name) const { return -1; }

    /// Returns the stored aggregate push-down type.
    TPushAggOp::type get_push_down_agg_type() const { return _push_down_agg_type; }

    /// Returns NOOP when the operator is serial, otherwise BUCKET_HASH_SHUFFLE.
    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
        if (OperatorX<LocalStateType>::is_serial_operator()) {
            // `is_serial_operator()` returning true means the distribution is ignored.
            return {ExchangeType::NOOP};
        }
        return {ExchangeType::BUCKET_HASH_SHUFFLE};
    }

    /// Calls clear_free_blocks() on the local state's scanner context, if one is set.
    void set_low_memory_mode(RuntimeState* state) override {
        auto& local_state = get_local_state(state);
        if (auto ctx = local_state._scanner_ctx.load()) {
            ctx->clear_free_blocks();
        }
    }

    using OperatorX<LocalStateType>::node_id;
    using OperatorX<LocalStateType>::operator_id;
    using OperatorX<LocalStateType>::get_local_state;

#ifdef BE_TEST
    // Default construction is only available in BE_TEST builds.
    ScanOperatorX() = default;
#endif

protected:
    using LocalState = LocalStateType;
    friend class OlapScanner;

    ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
                  const DescriptorTbl& descs, int parallel_tasks = 0);
    virtual ~ScanOperatorX() = default;

    template <typename Derived>
    friend class ScanLocalState;
    friend class OlapScanLocalState;

    // A load scan node has both an input and an output tuple descriptor.
    // A query scan node only has the output tuple descriptor.
    TupleId _input_tuple_id = -1;
    TupleId _output_tuple_id = -1;
    const TupleDescriptor* _input_tuple_desc = nullptr;
    const TupleDescriptor* _output_tuple_desc = nullptr;

    phmap::flat_hash_map<int, SlotDescriptor*> _slot_id_to_slot_desc;
    std::unordered_map<std::string, int> _colname_to_slot_id;

    // These two values come from query_options; the initializers are the defaults.
    int _max_scan_key_num = 48;
    int _max_pushdown_conditions_per_column = 1024;

    // For a query such as `select * from table limit 10;` the scan should run in a single
    // scanner, to avoid creating many scanners that would perform lots of useless reads.
    bool _should_run_serial = false;

    VExprContextSPtrs _common_expr_ctxs_push_down;

    // If sort info is set, the limit is pushed to each scanner.
    int64_t _limit_per_scanner = -1;

    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;

    // Value-initialized so that get_push_down_agg_type() never reads an indeterminate
    // value, e.g. on an object built with the BE_TEST default constructor.
    // NOTE(review): the zero value is presumably the "no push-down" enumerator of
    // TPushAggOp -- confirm against its definition.
    TPushAggOp::type _push_down_agg_type {};

    // Records the value of the aggregate function 'count' from Doris BE.
    int64_t _push_down_count = -1;

    const int _parallel_tasks = 0;

    std::vector<int> _topn_filter_source_node_ids;

    std::shared_ptr<MemShareArbitrator> _mem_arb = nullptr;
    std::shared_ptr<MemLimiter> _mem_limiter = nullptr;
};
#include "common/compile_check_end.h"
} // namespace doris