Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ DEFINE_mInt32(doris_scanner_row_num, "16384");
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// single read execute fragment max run time millseconds
DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000");
DEFINE_mInt32(doris_scanner_dynamic_interval_ms, "100");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
Expand Down Expand Up @@ -1679,6 +1680,10 @@ DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false");
// The maximum csv line reader output buffer size
DEFINE_mInt64(max_csv_line_reader_output_buffer_size, "4294967296");

// The maximum bytes of a single block returned by CsvReader::get_next_block.
// Default is 200MB. Set to 0 to disable the limit.
DEFINE_mInt64(csv_reader_max_block_bytes, "209715200");

// Maximum number of OpenMP threads allowed for concurrent vector index builds.
// -1 means auto: use 80% of the available CPU cores.
DEFINE_Int32(omp_threads_limit, "-1");
Expand Down
7 changes: 6 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,9 @@ DECLARE_mInt32(doris_scan_range_max_mb);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
// single read execute fragment max run time millseconds
// Deprecated. single read execute fragment max run time millseconds
DECLARE_mInt32(doris_scanner_max_run_time_ms);
DECLARE_mInt32(doris_scanner_dynamic_interval_ms);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
Expand Down Expand Up @@ -1769,6 +1770,10 @@ DECLARE_String(fuzzy_test_type);
// The maximum csv line reader output buffer size
DECLARE_mInt64(max_csv_line_reader_output_buffer_size);

// The maximum bytes of a single block returned by CsvReader::get_next_block.
// Default is 200MB. Set to 0 to disable the limit.
DECLARE_mInt64(csv_reader_max_block_bytes);

// Maximum number of OpenMP threads available for concurrent index builds.
// -1 means auto: use 80% of detected CPU cores.
DECLARE_Int32(omp_threads_limit);
Expand Down
27 changes: 21 additions & 6 deletions be/src/exec/operator/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
set_scan_ranges(state, info.scan_ranges);

_wait_for_rf_timer = ADD_TIMER(common_profile(), "WaitForRuntimeFilter");
_instance_idx = info.task_idx;
return Status::OK();
}

Expand Down Expand Up @@ -997,14 +998,16 @@ template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<std::shared_ptr<ScannerDelegate>>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx.store(ScannerContext::create_shared(state(), this, p._output_tuple_desc,
p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency
_scanner_ctx.store(ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency, p._mem_arb, p._mem_limiter, _instance_idx,
_state->get_query_ctx()->get_query_options().__isset.enable_adaptive_scan &&
_state->get_query_ctx()->get_query_options().enable_adaptive_scan
#ifdef BE_TEST
,
max_scanners_concurrency(state())
,
max_scanners_concurrency(state())
#endif
));
));
return Status::OK();
}

Expand Down Expand Up @@ -1164,6 +1167,18 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState*
if (query_options.__isset.max_pushdown_conditions_per_column) {
_max_pushdown_conditions_per_column = query_options.max_pushdown_conditions_per_column;
}
#ifdef BE_TEST
_mem_arb = nullptr;
#else
_mem_arb = state->get_query_ctx()->mem_arb();
#endif
if (_mem_arb) {
_mem_arb->register_scan_node();
_mem_limiter =
MemLimiter::create_shared(state->query_id(), state->query_parallel_instance_num(),
OperatorX<LocalStateType>::is_serial_operator(),
state->get_query_ctx()->get_query_options().mem_limit);
}
// tnode.olap_scan_node.push_down_agg_type_opt field is deprecated
// Introduced a new field : tnode.push_down_agg_type_opt
//
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/operator/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ class ScanLocalState : public ScanLocalStateBase {
// ScanLocalState owns the ownership of scanner, scanner context only has its weakptr
std::list<std::shared_ptr<ScannerDelegate>> _scanners;
Arena _arena;
int _instance_idx = 0;
};

template <typename LocalStateType>
Expand Down Expand Up @@ -430,6 +431,9 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
const int _parallel_tasks = 0;

std::vector<int> _topn_filter_source_node_ids;

std::shared_ptr<MemShareArbitrator> _mem_arb = nullptr;
std::shared_ptr<MemLimiter> _mem_limiter = nullptr;
};

#include "common/compile_check_end.h"
Expand Down
2 changes: 0 additions & 2 deletions be/src/exec/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,9 @@
#include "load/stream_load/new_load_stream_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/result_block_buffer.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/countdown_latch.h"
#include "util/debug_util.h"
#include "util/uid_util.h"
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/scan/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ void Scanner::_collect_profile_before_close() {
_state->update_num_rows_load_unselected(_counter.num_rows_unselected);
}

void Scanner::update_scan_cpu_timer() {
void Scanner::_update_scan_cpu_timer() {
int64_t cpu_time = _cpu_watch.elapsed_time();
_scan_cpu_timer += cpu_time;
if (_state && _state->get_query_ctx()) {
Expand Down
38 changes: 23 additions & 15 deletions be/src/exec/scan/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,33 +127,41 @@ class Scanner {

Status _do_projections(Block* origin_block, Block* output_block);

public:
int64_t get_time_cost_ns() const { return _per_scanner_timer; }

int64_t projection_time() const { return _projection_timer; }
int64_t get_rows_read() const { return _num_rows_read; }

bool has_prepared() const { return _has_prepared; }

Status try_append_late_arrival_runtime_filter();

private:
// Call start_wait_worker_timer() when submit the scanner to the thread pool.
// And call update_wait_worker_timer() when it is actually being executed.
void start_wait_worker_timer() {
void _start_wait_worker_timer() {
_watch.reset();
_watch.start();
}

void start_scan_cpu_timer() {
void _start_scan_cpu_timer() {
_cpu_watch.reset();
_cpu_watch.start();
}

void update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); }
void _update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); }
void _update_scan_cpu_timer();

int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; }
public:
void resume() {
_update_wait_worker_timer();
_start_scan_cpu_timer();
}
void pause() {
_update_scan_cpu_timer();
_start_wait_worker_timer();
}
int64_t get_time_cost_ns() const { return _per_scanner_timer; }

int64_t projection_time() const { return _projection_timer; }
int64_t get_rows_read() const { return _num_rows_read; }

void update_scan_cpu_timer();
bool has_prepared() const { return _has_prepared; }

Status try_append_late_arrival_runtime_filter();

int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; }

// Some counters need to be updated realtime, for example, workload group policy need
// scan bytes to cancel the query exceed limit.
Expand Down
Loading