diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 131f6f904dea7c..e88653a9242e31 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -343,6 +343,7 @@ DEFINE_mInt32(doris_scanner_row_num, "16384"); DEFINE_mInt32(doris_scanner_row_bytes, "10485760"); // single read execute fragment max run time millseconds DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000"); +DEFINE_mInt32(doris_scanner_dynamic_interval_ms, "100"); // (Advanced) Maximum size of per-query receive-side buffer DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760"); DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64"); @@ -1679,6 +1680,10 @@ DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false"); // The maximum csv line reader output buffer size DEFINE_mInt64(max_csv_line_reader_output_buffer_size, "4294967296"); +// The maximum bytes of a single block returned by CsvReader::get_next_block. +// Default is 200MB. Set to 0 to disable the limit. +DEFINE_mInt64(csv_reader_max_block_bytes, "209715200"); + // Maximum number of OpenMP threads allowed for concurrent vector index builds. // -1 means auto: use 80% of the available CPU cores. DEFINE_Int32(omp_threads_limit, "-1"); diff --git a/be/src/common/config.h b/be/src/common/config.h index b1f76c32feaefb..f5d9a1d58d24c6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -400,8 +400,9 @@ DECLARE_mInt32(doris_scan_range_max_mb); DECLARE_mInt32(doris_scanner_row_num); // single read execute fragment row bytes DECLARE_mInt32(doris_scanner_row_bytes); -// single read execute fragment max run time millseconds +// Deprecated. 
single read execute fragment max run time milliseconds DECLARE_mInt32(doris_scanner_max_run_time_ms); +DECLARE_mInt32(doris_scanner_dynamic_interval_ms); // (Advanced) Maximum size of per-query receive-side buffer DECLARE_mInt32(exchg_node_buffer_size_bytes); DECLARE_mInt32(exchg_buffer_queue_capacity_factor); @@ -1769,6 +1770,10 @@ DECLARE_String(fuzzy_test_type); // The maximum csv line reader output buffer size DECLARE_mInt64(max_csv_line_reader_output_buffer_size); +// The maximum bytes of a single block returned by CsvReader::get_next_block. +// Default is 200MB. Set to 0 to disable the limit. +DECLARE_mInt64(csv_reader_max_block_bytes); + // Maximum number of OpenMP threads available for concurrent index builds. // -1 means auto: use 80% of detected CPU cores. DECLARE_Int32(omp_threads_limit); diff --git a/be/src/exec/operator/scan_operator.cpp b/be/src/exec/operator/scan_operator.cpp index b2ac312ffdaee5..ffb3586d4cc4c8 100644 --- a/be/src/exec/operator/scan_operator.cpp +++ b/be/src/exec/operator/scan_operator.cpp @@ -148,6 +148,7 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) set_scan_ranges(state, info.scan_ranges); _wait_for_rf_timer = ADD_TIMER(common_profile(), "WaitForRuntimeFilter"); + _instance_idx = info.task_idx; return Status::OK(); } @@ -997,14 +998,16 @@ template Status ScanLocalState::_start_scanners( const std::list>& scanners) { auto& p = _parent->cast(); - _scanner_ctx.store(ScannerContext::create_shared(state(), this, p._output_tuple_desc, - p.output_row_descriptor(), scanners, p.limit(), - _scan_dependency + _scanner_ctx.store(ScannerContext::create_shared( + state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), + _scan_dependency, p._mem_arb, p._mem_limiter, _instance_idx, + _state->get_query_ctx()->get_query_options().__isset.enable_adaptive_scan && + _state->get_query_ctx()->get_query_options().enable_adaptive_scan #ifdef BE_TEST - , - max_scanners_concurrency(state()) + , + 
max_scanners_concurrency(state()) #endif - )); + )); return Status::OK(); } @@ -1164,6 +1167,18 @@ Status ScanOperatorX::init(const TPlanNode& tnode, RuntimeState* if (query_options.__isset.max_pushdown_conditions_per_column) { _max_pushdown_conditions_per_column = query_options.max_pushdown_conditions_per_column; } +#ifdef BE_TEST + _mem_arb = nullptr; +#else + _mem_arb = state->get_query_ctx()->mem_arb(); +#endif + if (_mem_arb) { + _mem_arb->register_scan_node(); + _mem_limiter = + MemLimiter::create_shared(state->query_id(), state->query_parallel_instance_num(), + OperatorX::is_serial_operator(), + state->get_query_ctx()->get_query_options().mem_limit); + } // tnode.olap_scan_node.push_down_agg_type_opt field is deprecated // Introduced a new field : tnode.push_down_agg_type_opt // diff --git a/be/src/exec/operator/scan_operator.h b/be/src/exec/operator/scan_operator.h index 65faacca022d94..67647ccc0b981b 100644 --- a/be/src/exec/operator/scan_operator.h +++ b/be/src/exec/operator/scan_operator.h @@ -333,6 +333,7 @@ class ScanLocalState : public ScanLocalStateBase { // ScanLocalState owns the ownership of scanner, scanner context only has its weakptr std::list> _scanners; Arena _arena; + int _instance_idx = 0; }; template @@ -430,6 +431,9 @@ class ScanOperatorX : public OperatorX { const int _parallel_tasks = 0; std::vector _topn_filter_source_node_ids; + + std::shared_ptr _mem_arb = nullptr; + std::shared_ptr _mem_limiter = nullptr; }; #include "common/compile_check_end.h" diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index aabfcb2961d047..a8e64c58d8845f 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -117,11 +117,9 @@ #include "load/stream_load/new_load_stream_mgr.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" -#include "runtime/result_block_buffer.h" #include "runtime/result_buffer_mgr.h" 
#include "runtime/runtime_state.h" #include "runtime/thread_context.h" -#include "service/backend_options.h" #include "util/countdown_latch.h" #include "util/debug_util.h" #include "util/uid_util.h" diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp index 7d7b4caeff98d4..d60ed9b5017059 100644 --- a/be/src/exec/scan/scanner.cpp +++ b/be/src/exec/scan/scanner.cpp @@ -266,7 +266,7 @@ void Scanner::_collect_profile_before_close() { _state->update_num_rows_load_unselected(_counter.num_rows_unselected); } -void Scanner::update_scan_cpu_timer() { +void Scanner::_update_scan_cpu_timer() { int64_t cpu_time = _cpu_watch.elapsed_time(); _scan_cpu_timer += cpu_time; if (_state && _state->get_query_ctx()) { diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h index 041af638c52dd8..037a3d7936cbf2 100644 --- a/be/src/exec/scan/scanner.h +++ b/be/src/exec/scan/scanner.h @@ -127,33 +127,41 @@ class Scanner { Status _do_projections(Block* origin_block, Block* output_block); -public: - int64_t get_time_cost_ns() const { return _per_scanner_timer; } - - int64_t projection_time() const { return _projection_timer; } - int64_t get_rows_read() const { return _num_rows_read; } - - bool has_prepared() const { return _has_prepared; } - - Status try_append_late_arrival_runtime_filter(); - +private: // Call start_wait_worker_timer() when submit the scanner to the thread pool. // And call update_wait_worker_timer() when it is actually being executed. 
- void start_wait_worker_timer() { + void _start_wait_worker_timer() { _watch.reset(); _watch.start(); } - void start_scan_cpu_timer() { + void _start_scan_cpu_timer() { _cpu_watch.reset(); _cpu_watch.start(); } - void update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); } + void _update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); } + void _update_scan_cpu_timer(); - int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; } +public: + void resume() { + _update_wait_worker_timer(); + _start_scan_cpu_timer(); + } + void pause() { + _update_scan_cpu_timer(); + _start_wait_worker_timer(); + } + int64_t get_time_cost_ns() const { return _per_scanner_timer; } + + int64_t projection_time() const { return _projection_timer; } + int64_t get_rows_read() const { return _num_rows_read; } - void update_scan_cpu_timer(); + bool has_prepared() const { return _has_prepared; } + + Status try_append_late_arrival_runtime_filter(); + + int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; } // Some counters need to be updated realtime, for example, workload group policy need // scan bytes to cancel the query exceed limit. 
diff --git a/be/src/exec/scan/scanner_context.cpp b/be/src/exec/scan/scanner_context.cpp index 2ab29805cd9e82..6b82d1054aef89 100644 --- a/be/src/exec/scan/scanner_context.cpp +++ b/be/src/exec/scan/scanner_context.cpp @@ -52,11 +52,82 @@ namespace doris { using namespace std::chrono_literals; #include "common/compile_check_begin.h" +// ==================== MemShareArbitrator ==================== +static constexpr int64_t DEFAULT_SCANNER_MEM_BYTES = 64 * 1024 * 1024; // 64MB default + +MemShareArbitrator::MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit, + double max_scan_ratio) + : query_id(qid), + query_mem_limit(query_mem_limit), + mem_limit(std::max( + 1, static_cast(static_cast(query_mem_limit) * max_scan_ratio))) { +} + +void MemShareArbitrator::register_scan_node() { + total_mem_bytes.fetch_add(DEFAULT_SCANNER_MEM_BYTES); +} + +int64_t MemShareArbitrator::update_mem_bytes(int64_t old_value, int64_t new_value) { + int64_t diff = new_value - old_value; + int64_t total = total_mem_bytes.fetch_add(diff) + diff; + if (new_value == 0) return 0; + if (total <= 0) return mem_limit; + // Proportional sharing: allocate based on this context's share of total usage + double ratio = static_cast(new_value) / static_cast(std::max(total, new_value)); + return static_cast(static_cast(mem_limit) * ratio); +} + +// ==================== MemLimiter ==================== +int MemLimiter::available_scanner_count(int ins_idx) const { + int64_t mem_limit_value = mem_limit.load(); + int64_t running_tasks_count_value = running_tasks_count.load(); + int64_t estimated_block_mem_bytes_value = get_estimated_block_mem_bytes(); + + int64_t max_count = std::max(1L, mem_limit_value / estimated_block_mem_bytes_value); + int64_t avail_count = max_count; + int64_t per_count = avail_count / parallelism; + if (serial_operator) { + per_count += (avail_count - per_count * parallelism); + } else if (ins_idx < avail_count - per_count * parallelism) { + per_count += 1; + } + + VLOG_DEBUG 
<< "available_scanner_count. max_count=" << max_count << "(" + << running_tasks_count_value << "/" << estimated_block_mem_bytes_value + << "), operator_mem_limit = " << operator_mem_limit + << ", running_tasks_count = " << running_tasks_count_value + << ", parallelism = " << parallelism << ", avail_count = " << avail_count + << ", ins_id = " << ins_idx << ", per_count = " << per_count + << " debug_string: " << debug_string(); + + return cast_set(per_count); +} + +void MemLimiter::reestimated_block_mem_bytes(int64_t value) { + if (value == 0) return; + value = std::min(value, operator_mem_limit); + + std::lock_guard L(lock); + auto old_value = estimated_block_mem_bytes.load(); + int64_t total = + get_estimated_block_mem_bytes() * estimated_block_mem_bytes_update_count + value; + estimated_block_mem_bytes_update_count += 1; + estimated_block_mem_bytes = total / estimated_block_mem_bytes_update_count; + VLOG_DEBUG << fmt::format( + "reestimated_block_mem_bytes. MemLimiter = {}, estimated_block_mem_bytes = {}, " + "old_value = {}, value: {}", + debug_string(), estimated_block_mem_bytes, old_value, value); +} + +// ==================== ScannerContext ==================== ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, - int64_t limit_, std::shared_ptr dependency + int64_t limit_, std::shared_ptr dependency, + std::shared_ptr arb, + std::shared_ptr limiter, int ins_idx, + bool enable_adaptive_scan #ifdef BE_TEST , int num_parallel_instances @@ -83,7 +154,11 @@ ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_st _min_scan_concurrency_of_scan_scheduler(0), _max_scan_concurrency(num_parallel_instances), #endif - _min_scan_concurrency(local_state->min_scanners_concurrency(state)) { + _min_scan_concurrency(local_state->min_scanners_concurrency(state)), + _scanner_mem_limiter(limiter), + 
_mem_share_arb(arb), + _ins_idx(ins_idx), + _enable_adaptive_scanners(enable_adaptive_scan) { DCHECK(_state != nullptr); DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); @@ -91,8 +166,8 @@ ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_st _resource_ctx = _state->get_query_ctx()->resource_ctx(); ctx_id = UniqueId::gen_uid().to_string(); for (auto& scanner : _all_scanners) { - _pending_scanners.push(std::make_shared(scanner)); - }; + _pending_tasks.push(std::make_shared(scanner)); + } if (limit < 0) { limit = -1; } @@ -101,6 +176,145 @@ ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_st if (auto ctx = task_exec_ctx(); ctx) { ctx->ref_task_execution_ctx(); } + + // Initialize adaptive processor + _adaptive_processor = ScannerAdaptiveProcessor::create_shared(); +} + +void ScannerContext::_adjust_scan_mem_limit(int64_t old_value, int64_t new_value) { + if (!_enable_adaptive_scanners) { + return; + } + + int64_t new_scan_mem_limit = _mem_share_arb->update_mem_bytes(old_value, new_value); + _scanner_mem_limiter->update_mem_limit(new_scan_mem_limit); + _scanner_mem_limiter->update_arb_mem_bytes(new_value); + + VLOG_DEBUG << fmt::format( + "adjust_scan_mem_limit. 
context = {}, new mem scan limit = {}, scanner mem bytes = {} " + "-> {}", + debug_string(), new_scan_mem_limit, old_value, new_value); +} + +int ScannerContext::_available_pickup_scanner_count() { + if (!_enable_adaptive_scanners) { + return _max_scan_concurrency; + } + + int min_scanners = std::max(1, _min_scan_concurrency); + int max_scanners = _scanner_mem_limiter->available_scanner_count(_ins_idx); + max_scanners = std::min(max_scanners, _max_scan_concurrency); + min_scanners = std::min(min_scanners, max_scanners); + if (_ins_idx == 0) { + // Adjust memory limit via memory share arbitrator + _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(), + _scanner_mem_limiter->get_estimated_block_mem_bytes()); + } + + ScannerAdaptiveProcessor& P = *_adaptive_processor; + int& scanners = P.expected_scanners; + int64_t now = UnixMillis(); + // Avoid frequent adjustment - only adjust every 100ms + if (now - P.adjust_scanners_last_timestamp <= config::doris_scanner_dynamic_interval_ms) { + return scanners; + } + P.adjust_scanners_last_timestamp = now; + auto old_scanners = P.expected_scanners; + + scanners = std::max(min_scanners, scanners); + scanners = std::min(max_scanners, scanners); + VLOG_DEBUG << fmt::format( + "_available_pickup_scanner_count. context = {}, old_scanners = {}, scanners = {} " + ", min_scanners: {}, max_scanners: {}", + debug_string(), old_scanners, scanners, min_scanners, max_scanners); + + // TODO(gabriel): Scanners are scheduled adaptively based on the memory usage now. 
+ // if (_in_flight_tasks_num == 0) { + // int64_t halt_time = 0; + // if (P.last_scanner_finish_timestamp != 0) { + // halt_time = now - P.last_scanner_finish_timestamp; + // } + // P.last_scanner_finish_timestamp = now; + // P.scanner_total_halt_time += halt_time; + // } + // // Calculate performance metrics for adjustment + // P.scanner_gen_blocks_time = (now - P.context_start_time - P.scanner_total_halt_time); + // + // // Blocks per 10ms + // // FIXME: + // double source_speed = static_cast(_tasks_queue.size()) * 1e1 / + // static_cast(P.scanner_gen_blocks_time + 1); + // // Scanner scan speed: bytes/ms + // int64_t scanner_total_scan_bytes = P.scanner_total_scan_bytes.load(); + // double scanner_scan_speed = static_cast(scanner_total_scan_bytes) / + // static_cast(P.scanner_gen_blocks_time + 1); + // // IO latency metrics + // double scanner_total_io_time = static_cast(P.scanner_total_io_time.load()) * 1e-6; + // double scanner_total_scan_bytes_mb = + // static_cast(scanner_total_scan_bytes) / (1024.0 * 1024.0); + // double io_latency = scanner_total_io_time / (scanner_total_scan_bytes_mb + 1e-3); + // + // // Adjustment routines + // auto try_add_scanners = [&]() { + // if (!P.try_add_scanners) return true; + // if (P.last_scanner_total_scan_bytes == scanner_total_scan_bytes) return true; + // return (scanner_scan_speed > (P.last_scanner_scan_speed * P.expected_speedup_ratio)); + // }; + // + // auto do_add_scanners = [&]() { + // P.try_add_scanners = true; + // const int smooth = 2; // Smoothing factor + // P.expected_speedup_ratio = + // static_cast(scanners + 1 + smooth) / static_cast(scanners + smooth); + // scanners += 1; + // }; + // + // auto do_sub_scanners = [&]() { + // scanners -= 1; + // P.try_add_scanners = false; + // P.try_add_scanners_fail_count += 1; + // if (P.try_add_scanners_fail_count >= 4) { + // P.try_add_scanners_fail_count = 0; + // scanners -= 1; + // } + // }; + // + // auto check_slow_io = [&]() { + // if (((P.check_slow_io++) % 
8) != 0) return; + // if (io_latency >= 2 * P.slow_io_latency_ms) { + // scanners = std::max(scanners, _max_scan_concurrency / 2); + // } else if (io_latency >= P.slow_io_latency_ms) { + // scanners = std::max(scanners, _max_scan_concurrency / 4); + // } + // }; + // + // // Perform adjustment based on feedback + // auto do_adjustment = [&]() { + // check_slow_io(); + // + // // If source is too slow compared to capacity, add scanners + // if (source_speed < 0.5) { // Very slow block production + // do_add_scanners(); + // } else if (try_add_scanners()) { + // do_add_scanners(); + // } else { + // do_sub_scanners(); + // } + // }; + // P.last_scanner_scan_speed = scanner_scan_speed; + // P.last_scanner_total_scan_bytes = scanner_total_scan_bytes; + // + // do_adjustment(); + // scanners = std::min(scanners, max_scanners); + // scanners = std::max(scanners, min_scanners); + // + // VLOG_DEBUG << fmt::format( + // "available_pickup_scanner_count. ctx_id = {}, scan = {}, source_speed = {}, " + // "io_latency = {} ms/MB, proposal = {}, current = {}", + // ctx_id, scanner_scan_speed, source_speed, io_latency, scanners, + // _in_flight_tasks_num); + + return scanners; } // After init function call, should not access _parent @@ -148,6 +362,20 @@ Status ScannerContext::init() { _set_scanner_done(); } + // Initialize memory limiter if memory-aware scheduling is enabled + if (_enable_adaptive_scanners) { + DCHECK(_scanner_mem_limiter && _mem_share_arb); + int64_t c = _scanner_mem_limiter->update_open_tasks_count(1); + // TODO(gabriel): set estimated block size + _scanner_mem_limiter->reestimated_block_mem_bytes(DEFAULT_SCANNER_MEM_BYTES); + _scanner_mem_limiter->update_arb_mem_bytes(DEFAULT_SCANNER_MEM_BYTES); + if (c == 0) { + // First scanner context to open, adjust scan memory limit + _adjust_scan_mem_limit(DEFAULT_SCANNER_MEM_BYTES, + _scanner_mem_limiter->get_arb_scanner_mem_bytes()); + } + } + // when user not specify scan_thread_num, so we can try downgrade 
_max_thread_num. // becaue we found in a table with 5k columns, column reader may ocuppy too much memory. // you can refer https://github.com/apache/doris/issues/35340 for details. @@ -181,13 +409,22 @@ Status ScannerContext::init() { ScannerContext::~ScannerContext() { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker()); - _tasks_queue.clear(); + _completed_tasks.clear(); BlockUPtr block; while (_free_blocks.try_dequeue(block)) { // do nothing } block.reset(); DorisMetrics::instance()->scanner_ctx_cnt->increment(-1); + + // Cleanup memory limiter if last context closing + if (_enable_adaptive_scanners) { + if (_scanner_mem_limiter->update_open_tasks_count(-1) == 1) { + // Last scanner context to close, reset scan memory limit + _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(), 0); + } + } + if (_task_handle) { if (auto* task_executor_scheduler = dynamic_cast(_scanner_scheduler)) { @@ -236,7 +473,7 @@ Status ScannerContext::submit_scan_task(std::shared_ptr scan_task, // and _num_finished_scanners will be reduced. // if submit succeed, it will be also added back by ScannerContext::push_back_scan_task // see ScannerScheduler::_scanner_scan. 
- _num_scheduled_scanners++; + _in_flight_tasks_num++; return _scanner_scheduler->submit(shared_from_this(), scan_task); } @@ -246,13 +483,10 @@ void ScannerContext::clear_free_blocks() { void ScannerContext::push_back_scan_task(std::shared_ptr scan_task) { if (scan_task->status_ok()) { - for (const auto& [block, _] : scan_task->cached_blocks) { - if (block->rows() > 0) { - Status st = validate_block_schema(block.get()); - if (!st.ok()) { - scan_task->set_status(st); - break; - } + if (scan_task->cached_block && scan_task->cached_block->rows() > 0) { + Status st = validate_block_schema(scan_task->cached_block.get()); + if (!st.ok()) { + scan_task->set_status(st); } } } @@ -261,8 +495,8 @@ void ScannerContext::push_back_scan_task(std::shared_ptr scan_task) { if (!scan_task->status_ok()) { _process_status = scan_task->get_status(); } - _tasks_queue.push_back(scan_task); - _num_scheduled_scanners--; + _completed_tasks.push_back(scan_task); + _in_flight_tasks_num--; _dependency->set_ready(); } @@ -281,10 +515,11 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, b std::shared_ptr scan_task = nullptr; - if (!_tasks_queue.empty() && !done()) { + if (!_completed_tasks.empty() && !done()) { // https://en.cppreference.com/w/cpp/container/list/front // The behavior is undefined if the list is empty. - scan_task = _tasks_queue.front(); + scan_task = _completed_tasks.front(); + _completed_tasks.pop_front(); } if (scan_task != nullptr) { @@ -297,48 +532,39 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, b return _process_status; } - // No need to worry about small block, block is merged together when they are appended to cached_blocks. 
- if (!scan_task->cached_blocks.empty()) { - auto [current_block, block_size] = std::move(scan_task->cached_blocks.front()); - scan_task->cached_blocks.pop_front(); + if (scan_task->cached_block) { + // No need to worry about small block, block is merged together when they are appended to cached_blocks. + auto current_block = std::move(scan_task->cached_block); + auto block_size = current_block->allocated_bytes(); + scan_task->cached_block.reset(); _block_memory_usage -= block_size; // consume current block block->swap(*current_block); return_free_block(std::move(current_block)); } - VLOG_DEBUG << fmt::format( - "ScannerContext {} get block from queue, task_queue size {}, current scan " + "ScannerContext {} get block from queue, current scan " "task remaing cached_block size {}, eos {}, scheduled tasks {}", - ctx_id, _tasks_queue.size(), scan_task->cached_blocks.size(), scan_task->is_eos(), - _num_scheduled_scanners); - - if (scan_task->cached_blocks.empty()) { - // All Cached blocks are consumed, pop this task from task_queue. - if (!_tasks_queue.empty()) { - _tasks_queue.pop_front(); - } - - if (scan_task->is_eos()) { - // 1. if eos, record a finished scanner. - _num_finished_scanners++; - RETURN_IF_ERROR( - _scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l)); - } else { - RETURN_IF_ERROR( - _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l)); - } + ctx_id, _completed_tasks.size(), scan_task->is_eos(), _in_flight_tasks_num); + if (scan_task->is_eos()) { + // 1. if eos, record a finished scanner. 
+ _num_finished_scanners++; + RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l)); + } else { + scan_task->set_state(ScanTask::State::IN_FLIGHT); + RETURN_IF_ERROR( + _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l)); } } - if (_num_finished_scanners == _all_scanners.size() && _tasks_queue.empty()) { + if (_num_finished_scanners == _all_scanners.size() && _completed_tasks.empty()) { _set_scanner_done(); _is_finished = true; } *eos = done(); - if (_tasks_queue.empty()) { + if (_completed_tasks.empty()) { _dependency->block(); } @@ -381,7 +607,7 @@ void ScannerContext::stop_scanners(RuntimeState* state) { sc->_scanner->try_stop(); } } - _tasks_queue.clear(); + _completed_tasks.clear(); if (_task_handle) { if (auto* task_executor_scheduler = dynamic_cast(_scanner_scheduler)) { @@ -438,13 +664,17 @@ void ScannerContext::stop_scanners(RuntimeState* state) { std::string ScannerContext::debug_string() { return fmt::format( - "id: {}, total scanners: {}, pending tasks: {}," + "_query_id: {}, id: {}, total scanners: {}, pending tasks: {}, completed tasks: {}," " _should_stop: {}, _is_finished: {}, free blocks: {}," - " limit: {}, _num_running_scanners: {}, _max_thread_num: {}," - " _max_bytes_in_queue: {}, query_id: {}", - ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished, - _free_blocks.size_approx(), limit, _num_scheduled_scanners, _max_scan_concurrency, - _max_bytes_in_queue, print_id(_query_id)); + " limit: {}, _in_flight_tasks_num: {}, _num_finished_scanners: {}, _max_thread_num: {}," + " _max_bytes_in_queue: {}, query_id: {}, _ins_idx: {}, _enable_adaptive_scanners: {}, " + "_mem_share_arb: {}, _scanner_mem_limiter: {}", + print_id(_query_id), ctx_id, _all_scanners.size(), _pending_tasks.size(), + _completed_tasks.size(), _should_stop, _is_finished, _free_blocks.size_approx(), limit, + _in_flight_tasks_num, _num_finished_scanners, _max_scan_concurrency, + _max_bytes_in_queue, 
print_id(_query_id), _ins_idx, _enable_adaptive_scanners, + _enable_adaptive_scanners ? _mem_share_arb->debug_string() : "NULL", + _enable_adaptive_scanners ? _scanner_mem_limiter->debug_string() : "NULL"); } void ScannerContext::_set_scanner_done() { @@ -452,38 +682,64 @@ void ScannerContext::_set_scanner_done() { } void ScannerContext::update_peak_running_scanner(int num) { +#ifndef BE_TEST _local_state->_peak_running_scanner->add(num); +#endif + if (_enable_adaptive_scanners) { + _scanner_mem_limiter->update_running_tasks_count(num); + } +} + +void ScannerContext::reestimated_block_mem_bytes(int64_t num) { + if (_enable_adaptive_scanners) { + _scanner_mem_limiter->reestimated_block_mem_bytes(num); + } } int32_t ScannerContext::_get_margin(std::unique_lock& transfer_lock, std::unique_lock& scheduler_lock) { + // Get effective max concurrency considering adaptive scheduling + int32_t effective_max_concurrency = _available_pickup_scanner_count(); + DCHECK_LE(effective_max_concurrency, _max_scan_concurrency); + // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks. int32_t margin_1 = _min_scan_concurrency - - (cast_set(_tasks_queue.size()) + _num_scheduled_scanners); + (cast_set(_completed_tasks.size()) + _in_flight_tasks_num); // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks. 
int32_t margin_2 = _min_scan_concurrency_of_scan_scheduler - (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size()); + // margin_3 is used to respect adaptive max concurrency limit + int32_t margin_3 = + std::max(effective_max_concurrency - + (cast_set(_completed_tasks.size()) + _in_flight_tasks_num), + 1); + if (margin_1 <= 0 && margin_2 <= 0) { return 0; } int32_t margin = std::max(margin_1, margin_2); + if (_enable_adaptive_scanners) { + margin = std::min(margin, margin_3); // Cap by adaptive limit + } if (low_memory_mode()) { // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`. // So that we will not submit too many scan tasks to scheduler. - margin = std::min(low_memory_mode_scanners() - _num_scheduled_scanners, margin); + margin = std::min(low_memory_mode_scanners() - _in_flight_tasks_num, margin); } VLOG_DEBUG << fmt::format( "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - " - "({} + {}), margin: {}", - print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _tasks_queue.size(), - _num_scheduled_scanners, margin_2, _min_scan_concurrency_of_scan_scheduler, - _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), margin); + "({} + {}), margin_3: {} = {} - ({} + {}), margin: {}, adaptive: {}", + print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _completed_tasks.size(), + _in_flight_tasks_num, margin_2, _min_scan_concurrency_of_scan_scheduler, + _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), + margin_3, effective_max_concurrency, _completed_tasks.size(), _in_flight_tasks_num, + margin, _enable_adaptive_scanners); return margin; } @@ -495,7 +751,7 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr current_scan std::unique_lock& transfer_lock, std::unique_lock& scheduler_lock) { if (current_scan_task && - (!current_scan_task->cached_blocks.empty() || 
current_scan_task->is_eos())) { + (current_scan_task->cached_block != nullptr || current_scan_task->is_eos())) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); } @@ -509,18 +765,19 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr current_scan // We need to add it back to task queue to make sure it could be resubmitted. if (current_scan_task) { // This usually happens when we should downgrade the concurrency. - _pending_scanners.push(current_scan_task); + current_scan_task->set_state(ScanTask::State::PENDING); + _pending_tasks.push(current_scan_task); VLOG_DEBUG << fmt::format( - "{} push back scanner to task queue, because diff <= 0, task_queue size " - "{}, _num_scheduled_scanners {}", - ctx_id, _tasks_queue.size(), _num_scheduled_scanners); + "{} push back scanner to task queue, because diff <= 0, _completed_tasks size " + "{}, _in_flight_tasks_num {}", + ctx_id, _completed_tasks.size(), _in_flight_tasks_num); } #ifndef NDEBUG // This DCHECK is necessary. // We need to make sure each scan operator could have at least 1 scan tasks. // Or this scan operator will not be re-scheduled. 
- if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && _tasks_queue.empty()) { + if (!_pending_tasks.empty() && _in_flight_tasks_num == 0 && _completed_tasks.empty()) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); } #endif @@ -533,10 +790,10 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr current_scan while (margin-- > 0) { std::shared_ptr task_to_run; const int32_t current_concurrency = cast_set( - _tasks_queue.size() + _num_scheduled_scanners + tasks_to_submit.size()); + _completed_tasks.size() + _in_flight_tasks_num + tasks_to_submit.size()); VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id, - current_concurrency, _tasks_queue.size(), _num_scheduled_scanners, - tasks_to_submit.size()); + current_concurrency, _completed_tasks.size(), + _in_flight_tasks_num, tasks_to_submit.size()); if (first_pull) { task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency); if (task_to_run == nullptr) { @@ -544,16 +801,16 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr current_scan // 1. current_concurrency already reached _max_scan_concurrency. // 2. all scanners are finished. if (current_scan_task) { - DCHECK(current_scan_task->cached_blocks.empty()); + DCHECK(current_scan_task->cached_block == nullptr); DCHECK(!current_scan_task->is_eos()); - if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) { + if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) { // This should not happen. throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); } - // Current scan task is not eos, but we can not resubmit it. - // Add current_scan_task back to task queue, so that we have chance to resubmit it in the future. - _pending_scanners.push(current_scan_task); + // Current scan task is not scheduled, we need to add it back to task queue to make sure it could be resubmitted. 
+ current_scan_task->set_state(ScanTask::State::PENDING); + _pending_tasks.push(current_scan_task); } } first_pull = false; @@ -574,7 +831,7 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr current_scan VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}", print_id(_query_id), ctx_id, tasks_to_submit.size(), - _pending_scanners.size()); + _pending_tasks.size()); for (auto& scan_task_iter : tasks_to_submit) { Status submit_status = submit_scan_task(scan_task_iter, transfer_lock); @@ -590,26 +847,33 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr current_scan std::shared_ptr ScannerContext::_pull_next_scan_task( std::shared_ptr current_scan_task, int32_t current_concurrency) { - if (current_concurrency >= _max_scan_concurrency) { + int32_t effective_max_concurrency = _max_scan_concurrency; + if (_enable_adaptive_scanners) { + effective_max_concurrency = _adaptive_processor->expected_scanners > 0 + ? _adaptive_processor->expected_scanners + : _max_scan_concurrency; + } + + if (current_concurrency >= effective_max_concurrency) { VLOG_DEBUG << fmt::format( - "ScannerContext {} current concurrency {} >= _max_scan_concurrency {}, skip " + "ScannerContext {} current concurrency {} >= effective_max_concurrency {}, skip " "pull", - ctx_id, current_concurrency, _max_scan_concurrency); + ctx_id, current_concurrency, effective_max_concurrency); return nullptr; } if (current_scan_task != nullptr) { - if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) { + if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) { // This should not happen. 
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); } return current_scan_task; } - if (!_pending_scanners.empty()) { + if (!_pending_tasks.empty()) { std::shared_ptr next_scan_task; - next_scan_task = _pending_scanners.top(); - _pending_scanners.pop(); + next_scan_task = _pending_tasks.top(); + _pending_tasks.pop(); return next_scan_task; } else { return nullptr; diff --git a/be/src/exec/scan/scanner_context.h b/be/src/exec/scan/scanner_context.h index 553408ebc96e92..f79fe317780c46 100644 --- a/be/src/exec/scan/scanner_context.h +++ b/be/src/exec/scan/scanner_context.h @@ -52,12 +52,127 @@ class Dependency; class Scanner; class ScannerDelegate; class ScannerScheduler; -class ScannerScheduler; class TaskExecutor; class TaskHandle; +struct MemLimiter; + +// Query-level memory arbitrator that distributes memory fairly across all scan contexts +struct MemShareArbitrator { + ENABLE_FACTORY_CREATOR(MemShareArbitrator) + TUniqueId query_id; + int64_t query_mem_limit = 0; + int64_t mem_limit = 0; + std::atomic total_mem_bytes = 0; + + MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit, double max_scan_ratio); + + // Update memory allocation when scanner memory usage changes + // Returns new scan memory limit for this context + int64_t update_mem_bytes(int64_t old_value, int64_t new_value); + void register_scan_node(); + std::string debug_string() const { + return fmt::format("query_id: {}, query_mem_limit: {}, mem_limit: {}", print_id(query_id), + query_mem_limit, mem_limit); + } +}; + +// Scan-context-level memory limiter that controls scanner concurrency based on memory +struct MemLimiter { +private: + TUniqueId query_id; + mutable std::mutex lock; + // Parallelism of the scan operator + const int64_t parallelism = 0; + const bool serial_operator = false; + const int64_t operator_mem_limit; + std::atomic running_tasks_count = 0; + + std::atomic estimated_block_mem_bytes = 0; + int64_t 
estimated_block_mem_bytes_update_count = 0; + int64_t arb_mem_bytes = 0; + std::atomic open_tasks_count = 0; + + // Memory limit for this scan node (shared by all instances), updated by memory share arbitrator + std::atomic mem_limit = 0; + +public: + ENABLE_FACTORY_CREATOR(MemLimiter) + MemLimiter(const TUniqueId& qid, int64_t parallelism, bool serial_operator_, int64_t mem_limit) + : query_id(qid), + parallelism(parallelism), + serial_operator(serial_operator_), + operator_mem_limit(mem_limit) {} + ~MemLimiter() { DCHECK_EQ(open_tasks_count, 0); } + + // Calculate available scanner count based on memory limit + int available_scanner_count(int ins_idx) const; + + int64_t update_running_tasks_count(int delta) { return running_tasks_count += delta; } + + // Re-estimated the average memory usage of a block, and update the estimated_block_mem_bytes accordingly. + void reestimated_block_mem_bytes(int64_t value); + void update_mem_limit(int64_t value) { mem_limit = value; } + void update_arb_mem_bytes(int64_t value) { + value = std::min(value, operator_mem_limit); + arb_mem_bytes = value; + } + int64_t get_arb_scanner_mem_bytes() const { return arb_mem_bytes; } + + int64_t get_estimated_block_mem_bytes() const { return estimated_block_mem_bytes; } + + int64_t update_open_tasks_count(int delta) { return open_tasks_count.fetch_add(delta); } + std::string debug_string() const { + return fmt::format( + "query_id: {}, parallelism: {}, serial_operator: {}, operator_mem_limit: {}, " + "running_tasks_count: {}, estimated_block_mem_bytes: {}, " + "estimated_block_mem_bytes_update_count: {}, arb_mem_bytes: {}, " + "open_tasks_count: {}, mem_limit: {}", + print_id(query_id), parallelism, serial_operator, operator_mem_limit, + running_tasks_count.load(), estimated_block_mem_bytes.load(), + estimated_block_mem_bytes_update_count, arb_mem_bytes, open_tasks_count, mem_limit); + } +}; + +// Adaptive processor for dynamic scanner concurrency adjustment +struct ScannerAdaptiveProcessor { 
+ ENABLE_FACTORY_CREATOR(ScannerAdaptiveProcessor) + ScannerAdaptiveProcessor() = default; + ~ScannerAdaptiveProcessor() = default; + // Expected scanners in this cycle + + int expected_scanners = 0; + // Timing metrics + // int64_t context_start_time = 0; + // int64_t scanner_total_halt_time = 0; + // int64_t scanner_gen_blocks_time = 0; + // std::atomic_int64_t scanner_total_io_time = 0; + // std::atomic_int64_t scanner_total_running_time = 0; + // std::atomic_int64_t scanner_total_scan_bytes = 0; + + // Timestamps + // std::atomic_int64_t last_scanner_finish_timestamp = 0; + // int64_t check_all_scanners_last_timestamp = 0; + // int64_t last_driver_output_full_timestamp = 0; + int64_t adjust_scanners_last_timestamp = 0; + + // Adjustment strategy fields + // bool try_add_scanners = false; + // double expected_speedup_ratio = 0; + // double last_scanner_scan_speed = 0; + // int64_t last_scanner_total_scan_bytes = 0; + // int try_add_scanners_fail_count = 0; + // int check_slow_io = 0; + // int32_t slow_io_latency_ms = 100; // Default from config +}; class ScanTask { public: + enum class State : int { + PENDING, // not scheduled yet + IN_FLIGHT, // scheduled and running + COMPLETED, // finished with result or error, waiting to be collected by scan node + EOS, // finished and no more data, waiting to be collected by scan node + }; ScanTask(std::weak_ptr delegate_scanner) : scanner(delegate_scanner) { _resource_ctx = thread_context()->resource_ctx(); DorisMetrics::instance()->scanner_task_cnt->increment(1); @@ -65,19 +180,19 @@ class ScanTask { ~ScanTask() { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker()); - cached_blocks.clear(); DorisMetrics::instance()->scanner_task_cnt->increment(-1); + cached_block.reset(); } private: // whether current scanner is finished - bool eos = false; Status status = Status::OK(); std::shared_ptr _resource_ctx; + State _state = State::PENDING; public: std::weak_ptr scanner; - std::list> 
cached_blocks; + BlockUPtr cached_block = nullptr; bool is_first_schedule = true; // Use weak_ptr to avoid circular references and potential memory leaks with SplitRunner. // ScannerContext only needs to observe the lifetime of SplitRunner without owning it. @@ -87,14 +202,39 @@ class ScanTask { void set_status(Status _status) { if (_status.is()) { // set `eos` if `END_OF_FILE`, don't take `END_OF_FILE` as error - eos = true; + _state = State::EOS; } status = _status; } Status get_status() const { return status; } bool status_ok() { return status.ok() || status.is(); } - bool is_eos() const { return eos; } - void set_eos(bool _eos) { eos = _eos; } + bool is_eos() const { return _state == State::EOS; } + void set_state(State state) { + switch (state) { + case State::PENDING: + DCHECK(_state == State::PENDING || _state == State::IN_FLIGHT) << (int)_state; + DCHECK(cached_block == nullptr); + break; + case State::IN_FLIGHT: + DCHECK(_state == State::COMPLETED || _state == State::PENDING || + _state == State::IN_FLIGHT) + << (int)_state; + DCHECK(cached_block == nullptr); + break; + case State::COMPLETED: + DCHECK(_state == State::IN_FLIGHT) << (int)_state; + DCHECK(cached_block != nullptr); + break; + case State::EOS: + DCHECK(_state == State::IN_FLIGHT || status.is()) + << (int)_state; + break; + default: + break; + } + + _state = state; + } }; // ScannerContext is responsible for recording the execution status @@ -115,7 +255,8 @@ class ScannerContext : public std::enable_shared_from_this, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, - std::shared_ptr dependency + std::shared_ptr dependency, std::shared_ptr arb, + std::shared_ptr limiter, int ins_idx, bool enable_adaptive_scan #ifdef BE_TEST , int num_parallel_instances @@ -125,6 +266,7 @@ class ScannerContext : public std::enable_shared_from_this, ~ScannerContext() override; Status init(); + // TODO(gabriel): we can also consider to 
return a list of blocks to reduce the scheduling overhead, but it may cause larger memory usage and more complex logic of block management. BlockUPtr get_free_block(bool force); void return_free_block(BlockUPtr block); void clear_free_blocks(); @@ -134,6 +276,7 @@ class ScannerContext : public std::enable_shared_from_this, // Caller should make sure the pipeline task is still running when calling this function void update_peak_running_scanner(int num); + void reestimated_block_mem_bytes(int64_t num); // Get next block from blocks queue. Called by ScanNode/ScanOperator // Set eos to true if there is no more data to read. @@ -186,7 +329,7 @@ class ScannerContext : public std::enable_shared_from_this, int32_t num_scheduled_scanners() { std::lock_guard l(_transfer_lock); - return _num_scheduled_scanners; + return _in_flight_tasks_num; } Status schedule_scan_task(std::shared_ptr current_scan_task, @@ -208,9 +351,6 @@ class ScannerContext : public std::enable_shared_from_this, const TupleDescriptor* _output_tuple_desc = nullptr; const RowDescriptor* _output_row_descriptor = nullptr; - std::mutex _transfer_lock; - std::list> _tasks_queue; - Status _process_status = Status::OK(); std::atomic_bool _should_stop = false; std::atomic_bool _is_finished = false; @@ -223,10 +363,40 @@ class ScannerContext : public std::enable_shared_from_this, int64_t limit; int64_t _max_bytes_in_queue = 0; - // Using stack so that we can resubmit scanner in a LIFO order, maybe more cache friendly - std::stack> _pending_scanners; - // Scanner that is submitted to the scheduler. - std::atomic_int _num_scheduled_scanners = 0; + // _transfer_lock protects _completed_tasks, _pending_tasks, and all other shared state + // accessed by both the scanner thread pool and the operator (get_block_from_queue). + std::mutex _transfer_lock; + + // Together, _completed_tasks and _in_flight_tasks_num represent all "occupied" concurrency + // slots. 
The scheduler uses their sum as the current concurrency: + // + // current_concurrency = _completed_tasks.size() + _in_flight_tasks_num + // + // Lifecycle of a ScanTask: + // _pending_tasks --(submit_scan_task)--> [thread pool] --(push_back_scan_task)--> + // _completed_tasks --(get_block_from_queue)--> operator + // After consumption: non-EOS task goes back to _pending_tasks; EOS increments + // _num_finished_scanners. + + // Completed scan tasks whose cached_block is ready for the operator to consume. + // Protected by _transfer_lock. Written by push_back_scan_task() (scanner thread), + // read/popped by get_block_from_queue() (operator thread). + std::list> _completed_tasks; + + // Scanners waiting to be submitted to the scheduler thread pool. Stored as a stack + // (LIFO) so that recently-used scanners are re-scheduled first, which is more likely + // to be cache-friendly. Protected by _transfer_lock. Populated in the constructor + // and by schedule_scan_task() when the concurrency limit is reached; drained by + // _pull_next_scan_task() during scheduling. + std::stack> _pending_tasks; + + // Number of scan tasks currently submitted to the scanner scheduler thread pool + // (i.e. in-flight). Incremented by submit_scan_task() before submission and + // decremented by push_back_scan_task() when the thread pool returns the task. + // Declared atomic so it can be read without _transfer_lock in non-critical paths, + // but must be read under _transfer_lock whenever combined with _completed_tasks.size() + // to form a consistent concurrency snapshot. + std::atomic_int _in_flight_tasks_num = 0; // Scanner that is eos or error. 
int32_t _num_finished_scanners = 0; // weak pointer for _scanners, used in stop function @@ -259,6 +429,19 @@ class ScannerContext : public std::enable_shared_from_this, int32_t _get_margin(std::unique_lock& transfer_lock, std::unique_lock& scheduler_lock); + // Memory-aware adaptive scheduling + std::shared_ptr _scanner_mem_limiter = nullptr; + std::shared_ptr _mem_share_arb = nullptr; + std::shared_ptr _adaptive_processor = nullptr; + const int _ins_idx; + const bool _enable_adaptive_scanners = false; + + // Adjust scan memory limit based on arbitrator feedback + void _adjust_scan_mem_limit(int64_t old_scanner_mem_bytes, int64_t new_scanner_mem_bytes); + + // Calculate available scanner count for adaptive scheduling + int _available_pickup_scanner_count(); + // TODO: Add implementation of runtime_info_feed_back // adaptive scan concurrency related end }; diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp index 3dfa1fdf4cdc8a..a2489f24e5ffa7 100644 --- a/be/src/exec/scan/scanner_scheduler.cpp +++ b/be/src/exec/scan/scanner_scheduler.cpp @@ -67,7 +67,8 @@ Status ScannerScheduler::submit(std::shared_ptr ctx, return Status::OK(); } - scanner_delegate->_scanner->start_wait_worker_timer(); + scan_task->set_state(ScanTask::State::IN_FLIGHT); + scanner_delegate->_scanner->pause(); TabletStorageType type = scanner_delegate->_scanner->get_storage_type(); auto sumbit_task = [&]() { auto work_func = [scanner_ref = scan_task, ctx]() { @@ -161,13 +162,12 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, MonotonicStopWatch max_run_time_watch; max_run_time_watch.start(); - scanner->update_wait_worker_timer(); - scanner->start_scan_cpu_timer(); + scanner->resume(); bool need_update_profile = true; auto update_scanner_profile = [&]() { if (need_update_profile) { - scanner->update_scan_cpu_timer(); + scanner->pause(); scanner->update_realtime_counters(); need_update_profile = false; } @@ -175,12 +175,6 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr ctx, Status status = Status::OK(); bool eos = false; - Defer defer_scanner([&] { - if (status.ok() && !eos) { - // if status is not ok, it means the scanner is failed, and the counter may be not updated correctly, so no need to update counter again. if eos is true, it means the scanner is finished successfully, and the counter is updated correctly, so no need to update counter again. - scanner->start_wait_worker_timer(); - } - }); ASSIGN_STATUS_IF_CATCH_EXCEPTION( RuntimeState* state = ctx->state(); DCHECK(nullptr != state); @@ -217,113 +211,72 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, } } - size_t raw_bytes_read = 0; - bool first_read = true; int64_t limit = scanner->limit(); - // If the first block is full, then it is true. Or the first block + second block > batch_size - bool has_first_full_block = false; - - // During low memory mode, every scan task will return at most 2 block to reduce memory usage. - while (!eos && raw_bytes_read < raw_bytes_threshold && - (!ctx->low_memory_mode() || !has_first_full_block) && - (!has_first_full_block || doris::thread_context() - ->thread_mem_tracker_mgr->limiter_mem_tracker() - ->check_limit(1))) { - if (UNLIKELY(ctx->done())) { - eos = true; - break; - } - if (max_run_time_watch.elapsed_time() > - config::doris_scanner_max_run_time_ms * 1e6) { - break; - } - DEFER_RELEASE_RESERVED(); - BlockUPtr free_block; - if (first_read) { - free_block = ctx->get_free_block(first_read); - } else { - if (state->get_query_ctx() - ->resource_ctx() - ->task_controller() - ->is_enable_reserve_memory()) { - size_t block_avg_bytes = scanner->get_block_avg_bytes(); - auto st = thread_context()->thread_mem_tracker_mgr->try_reserve( - block_avg_bytes); - if (!st.ok()) { - handle_reserve_memory_failure(state, ctx, st, block_avg_bytes); - break; + bool first_read = true; + int64_t limit = scanner->limit(); + if (UNLIKELY(ctx->done())) { eos = true; } else if (!eos) { + do { + 
DEFER_RELEASE_RESERVED(); + BlockUPtr free_block; + if (first_read) { + free_block = ctx->get_free_block(first_read); + } else { + if (state->get_query_ctx() + ->resource_ctx() + ->task_controller() + ->is_enable_reserve_memory()) { + size_t block_avg_bytes = scanner->get_block_avg_bytes(); + auto st = thread_context()->thread_mem_tracker_mgr->try_reserve( + block_avg_bytes); + if (!st.ok()) { + handle_reserve_memory_failure(state, ctx, st, block_avg_bytes); + break; + } } + free_block = ctx->get_free_block(first_read); } - free_block = ctx->get_free_block(first_read); - } - if (free_block == nullptr) { - break; - } - // We got a new created block or a reused block. - status = scanner->get_block_after_projects(state, free_block.get(), &eos); - first_read = false; - if (!status.ok()) { - LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string(); - break; - } - // Check column type only after block is read successfully. - // Or it may cause a crash when the block is not normal. - _make_sure_virtual_col_is_materialized(scanner, free_block.get()); - // Projection will truncate useless columns, makes block size change. - auto free_block_bytes = free_block->allocated_bytes(); - raw_bytes_read += free_block_bytes; - if (!scan_task->cached_blocks.empty() && - scan_task->cached_blocks.back().first->rows() + free_block->rows() <= - ctx->batch_size()) { - size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes(); - MutableBlock mutable_block(scan_task->cached_blocks.back().first.get()); - status = mutable_block.merge(*free_block); - if (!status.ok()) { - LOG(WARNING) << "Block merge failed: " << status.to_string(); + if (free_block == nullptr) { break; } - scan_task->cached_blocks.back().second = mutable_block.allocated_bytes(); - scan_task->cached_blocks.back().first.get()->set_columns( - std::move(mutable_block.mutable_columns())); - - // Return block succeed or not, this free_block is not used by this scan task any more. 
- // If block can be reused, its memory usage will be added back. - ctx->return_free_block(std::move(free_block)); - ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() - - block_size); - } else { - if (!scan_task->cached_blocks.empty()) { - has_first_full_block = true; + // We got a new created block or a reused block. + status = scanner->get_block_after_projects(state, free_block.get(), &eos); + first_read = false; + if (!status.ok()) { + LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string(); + break; } + // Check column type only after block is read successfully. + // Or it may cause a crash when the block is not normal. + _make_sure_virtual_col_is_materialized(scanner, free_block.get()); + // Projection will truncate useless columns, makes block size change. + auto free_block_bytes = free_block->allocated_bytes(); + ctx->reestimated_block_mem_bytes(cast_set(free_block_bytes)); + DCHECK(scan_task->cached_block == nullptr); ctx->inc_block_usage(free_block->allocated_bytes()); - scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes); - } - - if (limit > 0 && limit < ctx->batch_size()) { - // If this scanner has limit, and less than batch size, - // return immediately and no need to wait raw_bytes_threshold. - // This can save time that each scanner may only return a small number of rows, - // but rows are enough from all scanners. - // If not break, the query like "select * from tbl where id=1 limit 10" - // may scan a lot data when the "id=1"'s filter ratio is high. - // If limit is larger than batch size, this rule is skipped, - // to avoid user specify a large limit and causing too much small blocks. - break; - } + scan_task->cached_block = std::move(free_block); + + if (limit > 0 && limit < ctx->batch_size()) { + // If this scanner has limit, and less than batch size, + // return immediately and no need to wait raw_bytes_threshold. 
+ // This can save time that each scanner may only return a small number of rows, + // but rows are enough from all scanners. + // If not break, the query like "select * from tbl where id=1 limit 10" + // may scan a lot data when the "id=1"'s filter ratio is high. + // If limit is larger than batch size, this rule is skipped, + // to avoid user specify a large limit and causing too much small blocks. + break; + } - if (scan_task->cached_blocks.back().first->rows() > 0) { - auto block_avg_bytes = (scan_task->cached_blocks.back().first->bytes() + - scan_task->cached_blocks.back().first->rows() - 1) / - scan_task->cached_blocks.back().first->rows() * - ctx->batch_size(); - scanner->update_block_avg_bytes(block_avg_bytes); - } - if (ctx->low_memory_mode()) { - ctx->clear_free_blocks(); - if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) { - raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner(); + if (scan_task->cached_block->rows() > 0) { + auto block_avg_bytes = (scan_task->cached_block->bytes() + + scan_task->cached_block->rows() - 1) / + scan_task->cached_block->rows() * ctx->batch_size(); + scanner->update_block_avg_bytes(block_avg_bytes); } - } - } // end for while + if (ctx->low_memory_mode()) { + ctx->clear_free_blocks(); + } + } while (false); + } if (UNLIKELY(!status.ok())) { scan_task->set_status(status); @@ -341,14 +294,15 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, // so we need update_scanner_profile here update_scanner_profile(); scanner->mark_to_need_to_close(); + scan_task->set_state(ScanTask::State::EOS); + } else { + scan_task->set_state(ScanTask::State::COMPLETED); } - scan_task->set_eos(eos); VLOG_DEBUG << fmt::format( - "Scanner context {} has finished task, cached_block {} current scheduled task is " + "Scanner context {} has finished task, current scheduled task is " "{}, eos: {}, status: {}", - ctx->ctx_id, scan_task->cached_blocks.size(), ctx->num_scheduled_scanners(), eos, - 
status.to_string()); + ctx->ctx_id, ctx->num_scheduled_scanners(), eos, status.to_string()); ctx->push_back_scan_task(scan_task); } diff --git a/be/src/format/csv/csv_reader.cpp b/be/src/format/csv/csv_reader.cpp index 24012e5bd4374b..527da7ff62630f 100644 --- a/be/src/format/csv/csv_reader.cpp +++ b/be/src/format/csv/csv_reader.cpp @@ -31,6 +31,7 @@ #include #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" #include "common/consts.h" #include "common/status.h" #include "core/block/block.h" @@ -312,12 +313,15 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { } const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE); + const int64_t max_block_bytes = config::csv_reader_max_block_bytes; size_t rows = 0; + size_t block_bytes = 0; bool success = false; bool is_remove_bom = false; if (_push_down_agg_type == TPushAggOp::type::COUNT) { - while (rows < batch_size && !_line_reader_eof) { + while (rows < batch_size && !_line_reader_eof && + (max_block_bytes <= 0 || (int64_t)block_bytes < max_block_bytes)) { const uint8_t* ptr = nullptr; size_t size = 0; RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); @@ -345,6 +349,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success)); ++rows; + block_bytes += size; } auto mutate_columns = block->mutate_columns(); for (auto& col : mutate_columns) { @@ -353,7 +358,8 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { block->set_columns(std::move(mutate_columns)); } else { auto columns = block->mutate_columns(); - while (rows < batch_size && !_line_reader_eof) { + while (rows < batch_size && !_line_reader_eof && + (max_block_bytes <= 0 || (int64_t)block_bytes < max_block_bytes)) { const uint8_t* ptr = nullptr; size_t size = 0; RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); 
@@ -384,6 +390,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { continue; } RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows)); + block_bytes += size; } block->set_columns(std::move(columns)); } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index c9f73e9b9b4c53..5a116b9d208bcb 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -138,6 +138,9 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, } clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp); DorisMetrics::instance()->query_ctx_cnt->increment(1); + _mem_arb = MemShareArbitrator::create_shared( + query_id, query_options.mem_limit, + query_options.__isset.max_scan_mem_ratio ? query_options.max_scan_mem_ratio : 1.0); } void QueryContext::_init_query_mem_tracker() { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index aa1746ed8b0939..fcb9b991bce218 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -220,6 +220,7 @@ class QueryContext : public std::enable_shared_from_this { } bool is_nereids() const { return _is_nereids; } + std::shared_ptr mem_arb() const { return _mem_arb; } WorkloadGroupPtr workload_group() const { return _resource_ctx->workload_group(); } std::shared_ptr query_mem_tracker() const { @@ -394,6 +395,7 @@ class QueryContext : public std::enable_shared_from_this { // instance id + node id -> cte scan std::map, RecCTEScanLocalState*> _cte_scan; std::mutex _cte_scan_lock; + std::shared_ptr _mem_arb = nullptr; public: // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile diff --git a/be/test/exec/scan/scanner_context_test.cpp b/be/test/exec/scan/scanner_context_test.cpp index 5b741bcae687a1..1d17ad37add6e4 100644 --- a/be/test/exec/scan/scanner_context_test.cpp +++ b/be/test/exec/scan/scanner_context_test.cpp @@ -148,7 +148,7 
@@ TEST_F(ScannerContextTest, test_init) { std::shared_ptr scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); scan_operator->_should_run_serial = false; @@ -208,7 +208,7 @@ TEST_F(ScannerContextTest, test_serial_run) { std::shared_ptr scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); scan_operator->_should_run_serial = true; @@ -266,7 +266,7 @@ TEST_F(ScannerContextTest, test_max_column_reader_num) { std::shared_ptr scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); scan_operator->_should_run_serial = false; @@ -316,14 +316,14 @@ TEST_F(ScannerContextTest, test_push_back_scan_task) { std::shared_ptr scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); - scanner_context->_num_scheduled_scanners = 11; + scanner_context->_in_flight_tasks_num = 11; for (int i = 0; i < 5; ++i) { auto scan_task = std::make_shared(std::make_shared(scanner)); scanner_context->push_back_scan_task(scan_task); - ASSERT_EQ(scanner_context->_num_scheduled_scanners, 10 - i); + ASSERT_EQ(scanner_context->_in_flight_tasks_num, 10 - i); } } @@ -353,7 +353,7 @@ TEST_F(ScannerContextTest, get_margin) { std::shared_ptr scanner_context = 
ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); std::mutex transfer_mutex; std::unique_lock transfer_lock(transfer_mutex); @@ -407,7 +407,7 @@ TEST_F(ScannerContextTest, get_margin) { EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10)); scanner_context->_scanner_scheduler = scheduler.get(); scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; - scanner_context->_num_scheduled_scanners = 0; + scanner_context->_in_flight_tasks_num = 0; margin = scanner_context->_get_margin(transfer_lock, scheduler_lock); ASSERT_EQ(margin, scanner_context->_min_scan_concurrency); @@ -418,7 +418,7 @@ TEST_F(ScannerContextTest, get_margin) { EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10)); scanner_context->_scanner_scheduler = scheduler.get(); scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; - scanner_context->_num_scheduled_scanners = 20; + scanner_context->_in_flight_tasks_num = 20; margin = scanner_context->_get_margin(transfer_lock, scheduler_lock); ASSERT_EQ(margin, 0); } @@ -449,7 +449,7 @@ TEST_F(ScannerContextTest, pull_next_scan_task) { std::shared_ptr scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); std::mutex transfer_mutex; std::unique_lock transfer_lock(transfer_mutex); @@ -473,26 +473,27 @@ TEST_F(ScannerContextTest, pull_next_scan_task) { scanner_context->_max_scan_concurrency = 2; BlockUPtr cached_block = Block::create_unique(); - scan_task->cached_blocks.emplace_back(std::move(cached_block), 0); + scan_task->cached_block = std::move(cached_block); 
EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task( scan_task, scanner_context->_max_scan_concurrency - 1)); - scan_task->cached_blocks.clear(); - scan_task->eos = true; + scan_task->cached_block.reset(); + scan_task->_state = ScanTask::State::IN_FLIGHT; + scan_task->set_state(ScanTask::State::EOS); EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task( scan_task, scanner_context->_max_scan_concurrency - 1)); - scan_task->cached_blocks.clear(); - scan_task->eos = false; + scan_task->cached_block.reset(); + scan_task->_state = ScanTask::State::IN_FLIGHT; pull_scan_task = scanner_context->_pull_next_scan_task( scan_task, scanner_context->_max_scan_concurrency - 1); EXPECT_EQ(pull_scan_task.get(), scan_task.get()); - scanner_context->_pending_scanners = std::stack>(); + scanner_context->_pending_tasks = std::stack>(); pull_scan_task = scanner_context->_pull_next_scan_task( nullptr, scanner_context->_max_scan_concurrency - 1); EXPECT_EQ(pull_scan_task, nullptr); - scanner_context->_pending_scanners.push( + scanner_context->_pending_tasks.push( std::make_shared(std::make_shared(scanner))); pull_scan_task = scanner_context->_pull_next_scan_task( nullptr, scanner_context->_max_scan_concurrency - 1); @@ -525,7 +526,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) { std::shared_ptr scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); std::mutex transfer_mutex; std::unique_lock transfer_lock(transfer_mutex); @@ -546,18 +547,18 @@ TEST_F(ScannerContextTest, schedule_scan_task) { Status st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock); ASSERT_TRUE(st.ok()); - ASSERT_EQ(scanner_context->_num_scheduled_scanners, 1); + ASSERT_EQ(scanner_context->_in_flight_tasks_num, 1); scanner_context->_max_scan_concurrency = 10; 
scanner_context->_max_scan_concurrency = 1; scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock); ASSERT_TRUE(st.ok()); - ASSERT_EQ(scanner_context->_num_scheduled_scanners, scanner_context->_max_scan_concurrency); + ASSERT_EQ(scanner_context->_in_flight_tasks_num, scanner_context->_max_scan_concurrency); scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); scanner_context->_scanner_scheduler = scheduler.get(); @@ -569,7 +570,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) { st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock); ASSERT_TRUE(st.ok()); // 15 since we have 15 scanners. - ASSERT_EQ(scanner_context->_num_scheduled_scanners, 15); + ASSERT_EQ(scanner_context->_in_flight_tasks_num, 15); scanners = std::list>(); for (int i = 0; i < 1; ++i) { @@ -578,7 +579,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) { scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); scanner_context->_scanner_scheduler = scheduler.get(); @@ -589,12 +590,12 @@ TEST_F(ScannerContextTest, schedule_scan_task) { auto scan_task = std::make_shared(std::make_shared(scanner)); st = scanner_context->schedule_scan_task(scan_task, transfer_lock, scheduler_lock); // current scan task is added back. 
- ASSERT_EQ(scanner_context->_pending_scanners.size(), 1); - ASSERT_EQ(scanner_context->_num_scheduled_scanners, 1); + ASSERT_EQ(scanner_context->_pending_tasks.size(), 1); + ASSERT_EQ(scanner_context->_in_flight_tasks_num, 1); scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); scanner_context->_scanner_scheduler = scheduler.get(); @@ -603,7 +604,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) { scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock); scan_task = std::make_shared(std::make_shared(scanner)); - scan_task->cached_blocks.emplace_back(Block::create_unique(), 0); + scan_task->cached_block = Block::create_unique(); // Illigeal situation. // If current scan task has cached block, it should not be called with this methods. 
EXPECT_ANY_THROW(std::ignore = scanner_context->schedule_scan_task(scan_task, transfer_lock, @@ -647,7 +648,7 @@ TEST_F(ScannerContextTest, scan_queue_mem_limit) { std::shared_ptr scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); std::unique_ptr scheduler = std::make_unique(cgroup_cpu_ctl); @@ -687,7 +688,7 @@ TEST_F(ScannerContextTest, get_free_block) { std::shared_ptr scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get(); scanner_context->_newly_create_free_blocks_num->set(0L); scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get(); @@ -740,7 +741,7 @@ TEST_F(ScannerContextTest, return_free_block) { std::shared_ptr scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get(); scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get(); scanner_context->_max_bytes_in_queue = 200; @@ -784,7 +785,7 @@ TEST_F(ScannerContextTest, get_block_from_queue) { std::shared_ptr scanner_context = ScannerContext::create_shared( state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, - scanners, limit, scan_dependency, parallel_tasks); + scanners, limit, scan_dependency, nullptr, nullptr, 0, false, parallel_tasks); 
scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get(); scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get(); scanner_context->_max_bytes_in_queue = 200; @@ -818,8 +819,9 @@ TEST_F(ScannerContextTest, get_block_from_queue) { scanner_context->_is_finished = false; scanner_context->_should_stop = false; auto scan_task = std::make_shared(std::make_shared(scanner)); - scan_task->set_eos(true); - scanner_context->_tasks_queue.push_back(scan_task); + scan_task->_state = ScanTask::State::IN_FLIGHT; + scan_task->set_state(ScanTask::State::EOS); + scanner_context->_completed_tasks.push_back(scan_task); std::unique_ptr scheduler = std::make_unique(cgroup_cpu_ctl); EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, testing::_)) @@ -833,4 +835,436 @@ TEST_F(ScannerContextTest, get_block_from_queue) { EXPECT_EQ(scanner_context->_num_finished_scanners, 1); } +/** + MemShareArbitrator Tests (5 tests) + - scanner_mem_share_arbitrator_basic: Tests initialization, query_id, memory limits, and initial state + - scanner_mem_share_arbitrator_register_scan_node: Tests registering scan nodes and default memory allocation (64MB) + - scanner_mem_share_arbitrator_update_mem_bytes: Tests updating memory bytes and handling zero values + - scanner_mem_share_arbitrator_proportional_sharing: Tests proportional memory distribution across multiple contexts + - scanner_mem_share_arbitrator_zero_ratio: Tests edge case with zero scan ratio + + MemLimiter Tests (9 tests) + + - scanner_mem_limiter_basic: Tests initialization and default values + - scanner_mem_limiter_reestimated_block_mem_bytes: Tests averaging algorithm for block memory estimation + - scanner_mem_limiter_reestimated_zero_value: Tests that zero values are properly ignored + - scanner_mem_limiter_available_scanner_count: Tests scanner count calculation based on memory limits + - scanner_mem_limiter_serial_scan: Tests serial scan mode behavior + - 
scanner_mem_limiter_update_running_tasks_count: Tests atomic counter updates + - scanner_mem_limiter_update_open_tasks_count: Tests context count tracking + - scanner_mem_limiter_update_arb_mem_bytes: Tests memory capping at query limit + - scanner_mem_limiter_available_count_distribution: Tests fair distribution across parallel instances + + ScannerContext with Memory Control Tests (4 tests) + - scanner_context_with_adaptive_memory: Tests integration with arbitrator and limiter + - scanner_context_adjust_scan_mem_limit: Tests dynamic memory limit adjustment + - scanner_context_reestimated_block_mem_bytes: Tests block memory re-estimation propagation + - scanner_context_update_peak_running_scanner: Tests peak scanner tracking with memory control + + Total: 18 new test cases + + All tests follow the existing patterns in the codebase and cover: + - Normal operation scenarios + - Edge cases (zero values, limits, etc.) + - Integration between components + - Atomic operations and thread safety + - Memory distribution algorithms +*/ +// ==================== MemShareArbitrator Tests ==================== +TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_basic) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t query_mem_limit = 1024 * 1024 * 1024; + double max_scan_ratio = 0.3; + + auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, max_scan_ratio); + + ASSERT_EQ(arbitrator->query_id.hi, 1); + ASSERT_EQ(arbitrator->query_id.lo, 2); + ASSERT_EQ(arbitrator->query_mem_limit, query_mem_limit); + ASSERT_EQ(arbitrator->mem_limit, static_cast(query_mem_limit * max_scan_ratio)); + ASSERT_EQ(arbitrator->total_mem_bytes.load(), 0); +} + +TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_register_scan_node) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t query_mem_limit = 1024 * 1024 * 1024; + double max_scan_ratio = 0.3; + + auto arbitrator = MemShareArbitrator::create_shared(query_id, 
query_mem_limit, max_scan_ratio); + + arbitrator->register_scan_node(); + ASSERT_EQ(arbitrator->total_mem_bytes.load(), 64 * 1024 * 1024); + + arbitrator->register_scan_node(); + ASSERT_EQ(arbitrator->total_mem_bytes.load(), 128 * 1024 * 1024); +} + +TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_update_mem_bytes) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t query_mem_limit = 1024 * 1024 * 1024; + double max_scan_ratio = 0.3; + + auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, max_scan_ratio); + + int64_t new_limit = arbitrator->update_mem_bytes(0, 100 * 1024 * 1024); + ASSERT_EQ(arbitrator->total_mem_bytes.load(), 100 * 1024 * 1024); + ASSERT_GT(new_limit, 0); + + new_limit = arbitrator->update_mem_bytes(100 * 1024 * 1024, 0); + ASSERT_EQ(new_limit, 0); + ASSERT_EQ(arbitrator->total_mem_bytes.load(), 0); +} + +TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_proportional_sharing) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t query_mem_limit = 1024 * 1024 * 1024; + double max_scan_ratio = 0.5; + + auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, max_scan_ratio); + + int64_t limit1 = arbitrator->update_mem_bytes(0, 200 * 1024 * 1024); + int64_t limit2 = arbitrator->update_mem_bytes(0, 300 * 1024 * 1024); + + ASSERT_LT(limit2, limit1); + ASSERT_EQ(arbitrator->total_mem_bytes.load(), 500 * 1024 * 1024); +} + +TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_zero_ratio) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t query_mem_limit = 1024 * 1024 * 1024; + double max_scan_ratio = 0.0; + + auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, max_scan_ratio); + + ASSERT_GE(arbitrator->mem_limit, 1); +} + +// ==================== MemLimiter Tests ==================== +TEST_F(ScannerContextTest, scanner_mem_limiter_basic) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + 
int64_t parallelism = 4; + bool serial_scan = false; + int64_t mem_limit = 512 * 1024 * 1024; + + auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit); + + ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 0); + ASSERT_EQ(limiter->get_arb_scanner_mem_bytes(), 0); +} + +TEST_F(ScannerContextTest, scanner_mem_limiter_reestimated_block_mem_bytes) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t parallelism = 4; + bool serial_scan = false; + int64_t mem_limit = 512 * 1024 * 1024; + + auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit); + + limiter->reestimated_block_mem_bytes(100 * 1024 * 1024); + ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 100 * 1024 * 1024); + + limiter->reestimated_block_mem_bytes(200 * 1024 * 1024); + ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 150 * 1024 * 1024); + + limiter->reestimated_block_mem_bytes(300 * 1024 * 1024); + ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 200 * 1024 * 1024); +} + +TEST_F(ScannerContextTest, scanner_mem_limiter_reestimated_zero_value) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t parallelism = 4; + bool serial_scan = false; + int64_t mem_limit = 512 * 1024 * 1024; + + auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit); + + limiter->reestimated_block_mem_bytes(100 * 1024 * 1024); + ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 100 * 1024 * 1024); + + limiter->reestimated_block_mem_bytes(0); + ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 100 * 1024 * 1024); +} + +TEST_F(ScannerContextTest, scanner_mem_limiter_available_scanner_count) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t parallelism = 4; + bool serial_scan = false; + int64_t mem_limit = 512 * 1024 * 1024; + + auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit); + + limiter->update_mem_limit(400 * 1024 * 1024); 
+ limiter->reestimated_block_mem_bytes(100 * 1024 * 1024); + + int count = limiter->available_scanner_count(0); + ASSERT_GE(count, 1); +} + +TEST_F(ScannerContextTest, scanner_mem_limiter_serial_scan) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t parallelism = 4; + bool serial_scan = true; + int64_t mem_limit = 512 * 1024 * 1024; + + auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit); + + limiter->update_mem_limit(400 * 1024 * 1024); + limiter->reestimated_block_mem_bytes(100 * 1024 * 1024); + + int count = limiter->available_scanner_count(0); + ASSERT_GE(count, 1); +} + +TEST_F(ScannerContextTest, scanner_mem_limiter_update_running_tasks_count) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t parallelism = 4; + bool serial_scan = false; + int64_t mem_limit = 512 * 1024 * 1024; + + auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit); + + ASSERT_EQ(limiter->update_running_tasks_count(5), 5); + ASSERT_EQ(limiter->update_running_tasks_count(-2), 3); + ASSERT_EQ(limiter->update_running_tasks_count(1), 4); +} + +TEST_F(ScannerContextTest, scanner_mem_limiter_update_open_tasks_count) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t parallelism = 4; + bool serial_scan = false; + int64_t mem_limit = 512 * 1024 * 1024; + + auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit); + + ASSERT_EQ(limiter->update_open_tasks_count(1), 0); + ASSERT_EQ(limiter->update_open_tasks_count(1), 1); + ASSERT_EQ(limiter->update_open_tasks_count(-1), 2); + ASSERT_EQ(limiter->update_open_tasks_count(-1), 1); +} + +TEST_F(ScannerContextTest, scanner_mem_limiter_update_arb_mem_bytes) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t parallelism = 4; + bool serial_scan = false; + int64_t mem_limit = 512 * 1024 * 1024; + + auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, 
mem_limit); + + limiter->update_arb_mem_bytes(100 * 1024 * 1024); + ASSERT_EQ(limiter->get_arb_scanner_mem_bytes(), 100 * 1024 * 1024); + + limiter->update_arb_mem_bytes(1024 * 1024 * 1024); + ASSERT_EQ(limiter->get_arb_scanner_mem_bytes(), mem_limit); +} + +TEST_F(ScannerContextTest, scanner_mem_limiter_available_count_distribution) { + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 2; + int64_t parallelism = 3; + bool serial_scan = false; + int64_t mem_limit = 512 * 1024 * 1024; + + auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit); + + limiter->update_mem_limit(500 * 1024 * 1024); + limiter->reestimated_block_mem_bytes(100 * 1024 * 1024); + + int count0 = limiter->available_scanner_count(0); + int count1 = limiter->available_scanner_count(1); + int count2 = limiter->available_scanner_count(2); + + ASSERT_GE(count0, 1); + ASSERT_GE(count1, 1); + ASSERT_GE(count2, 1); +} + +// ==================== ScannerContext with Memory Control Tests ==================== +TEST_F(ScannerContextTest, scanner_context_with_adaptive_memory) { + const int parallel_tasks = 2; + auto scan_operator = std::make_unique(obj_pool.get(), tnode, 0, *descs, + parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + OlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); + + std::shared_ptr scanner = + OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 5; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + TUniqueId query_id = state->get_query_ctx()->query_id(); + int64_t query_mem_limit = 1024 * 1024 * 1024; + auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, 0.3); + auto 
limiter = MemLimiter::create_shared(query_id, parallel_tasks, false, + static_cast(query_mem_limit * 0.3)); + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, arbitrator, limiter, 0, true, parallel_tasks); + + limiter->update_open_tasks_count(1); + ASSERT_TRUE(scanner_context->_enable_adaptive_scanners); + ASSERT_NE(scanner_context->_mem_share_arb, nullptr); + ASSERT_NE(scanner_context->_scanner_mem_limiter, nullptr); +} + +TEST_F(ScannerContextTest, scanner_context_adjust_scan_mem_limit) { + const int parallel_tasks = 2; + auto scan_operator = std::make_unique(obj_pool.get(), tnode, 0, *descs, + parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + OlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); + + std::shared_ptr scanner = + OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 5; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + TUniqueId query_id = state->get_query_ctx()->query_id(); + int64_t query_mem_limit = 1024 * 1024 * 1024; + auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, 0.3); + auto limiter = MemLimiter::create_shared(query_id, parallel_tasks, false, + static_cast(query_mem_limit * 0.3)); + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, arbitrator, limiter, 0, true, parallel_tasks); + + int64_t old_mem = 100 * 1024 * 1024; + int64_t new_mem = 200 * 1024 * 1024; + 
scanner_context->_adjust_scan_mem_limit(old_mem, new_mem); + + limiter->update_open_tasks_count(1); + ASSERT_GT(arbitrator->total_mem_bytes.load(), 0); +} + +TEST_F(ScannerContextTest, scanner_context_reestimated_block_mem_bytes) { + const int parallel_tasks = 2; + auto scan_operator = std::make_unique(obj_pool.get(), tnode, 0, *descs, + parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + OlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); + + std::shared_ptr scanner = + OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 5; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + TUniqueId query_id = state->get_query_ctx()->query_id(); + int64_t query_mem_limit = 1024 * 1024 * 1024; + auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, 0.3); + auto limiter = MemLimiter::create_shared(query_id, parallel_tasks, false, + static_cast(query_mem_limit * 0.3)); + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, arbitrator, limiter, 0, true, parallel_tasks); + + scanner_context->reestimated_block_mem_bytes(150 * 1024 * 1024); + ASSERT_GT(limiter->get_estimated_block_mem_bytes(), 0); + limiter->update_open_tasks_count(1); +} + +TEST_F(ScannerContextTest, scanner_context_update_peak_running_scanner) { + const int parallel_tasks = 2; + auto scan_operator = std::make_unique(obj_pool.get(), tnode, 0, *descs, + parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + 
olap_scan_local_state->_parent = scan_operator.get(); + + const int64_t limit = 100; + + OlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); + + std::shared_ptr scanner = + OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 5; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + TUniqueId query_id = state->get_query_ctx()->query_id(); + int64_t query_mem_limit = 1024 * 1024 * 1024; + auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, 0.3); + auto limiter = MemLimiter::create_shared(query_id, parallel_tasks, false, + static_cast(query_mem_limit * 0.3)); + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, arbitrator, limiter, 0, true, parallel_tasks); + + scanner_context->update_peak_running_scanner(3); + ASSERT_EQ(limiter->update_running_tasks_count(0), 3); + limiter->update_open_tasks_count(1); +} + } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 6888134673c860..38a0ddf00ed7d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -90,6 +90,8 @@ public class SessionVariable implements Serializable, Writable { public static final List affectQueryResultFields; public static final List affectQueryResultInPlanFields; public static final String EXEC_MEM_LIMIT = "exec_mem_limit"; + public static final String MAX_SCAN_MEM_RATIO = "max_scan_mem_ratio"; + public static final String ENABLE_ADAPTIVE_SCAN = "enable_adaptive_scan"; public static final String 
LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT = "local_exchange_free_blocks_limit"; public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit"; public static final String MAX_SCANNERS_CONCURRENCY = "max_scanners_concurrency"; @@ -1041,6 +1043,10 @@ public static double getHotValueThreshold() { // max memory used on every backend. Default value to 100G. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, needForward = true) public long maxExecMemByte = 100147483648L; + @VariableMgr.VarAttr(name = MAX_SCAN_MEM_RATIO, needForward = true) + public double maxScanMemRatio = 0.3; + @VariableMgr.VarAttr(name = ENABLE_ADAPTIVE_SCAN, needForward = true) + public boolean enableAdaptiveScan = true; @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT, needForward = true, description = {"每个 Scan Instance 的 block queue 能够保存多少字节的 block", @@ -5246,6 +5252,8 @@ public boolean isRequireSequenceInInsert() { public TQueryOptions toThrift() { TQueryOptions tResult = new TQueryOptions(); tResult.setMemLimit(maxExecMemByte); + tResult.setMaxScanMemRatio(maxScanMemRatio); + tResult.setEnableAdaptiveScan(enableAdaptiveScan); tResult.setLocalExchangeFreeBlocksLimit(localExchangeFreeBlocksLimit); tResult.setScanQueueMemLimit(maxScanQueueMemByte); tResult.setMaxScannersConcurrency(maxScannersConcurrency); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 495c1477647cc1..7d03f3131de27a 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -439,12 +439,40 @@ struct TQueryOptions { 200: optional bool enable_adjust_conjunct_order_by_cost; // Use paimon-cpp to read Paimon splits on BE 201: optional bool enable_paimon_cpp_reader = false; - // Whether all fragments of this query are assigned to a single backend. // When true, the streaming aggregation operator can use more aggressive // hash table expansion thresholds since all data is local. 
202: optional bool single_backend_query = false; + 203: optional bool enable_inverted_index_wand_query = true; + + // Per-read/per-write buffer size used during spill I/O, in bytes. Controls the + // I/O batch size for spill write and merge read. This value can be overridden + // per-query by setting the session variable `spill_buffer_size_bytes` in FE. + // Default is 8MB. + 204: optional i64 spill_buffer_size_bytes = 8388608; + + // Per-sink memory limit after spill is triggered. When a sink operator's revocable + // memory exceeds the corresponding threshold, it proactively spills to disk. + // Default is 64MB for all three. + 205: optional i64 spill_join_build_sink_mem_limit_bytes = 67108864; + 206: optional i64 spill_aggregation_sink_mem_limit_bytes = 67108864; + 207: optional i64 spill_sort_sink_mem_limit_bytes = 67108864; + + // Total memory budget for the sort merge phase after spill. Divided by + // spill_buffer_size_bytes gives the number of files merged in parallel. + // Default is 64MB. + 208: optional i64 spill_sort_merge_mem_limit_bytes = 67108864; + + // Maximum depth for repartitioning recursion. Controls how many recursive + // repartition rounds are allowed before giving up and treating a partition + // as terminal. This value can be overridden per-query by setting the + // session variable `spill_repartition_max_depth` in FE. Default is 8. + 209: optional i32 spill_repartition_max_depth = 8; + + + 210: optional double max_scan_mem_ratio = 0.3; + 211: optional bool enable_adaptive_scan = false; // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query. 
diff --git a/regression-test/data/mv_p0/test_casewhen/test_casewhen.out b/regression-test/data/mv_p0/test_casewhen/test_casewhen.out index fdf4432d537922..4f9aa5b7600e5a 100644 --- a/regression-test/data/mv_p0/test_casewhen/test_casewhen.out +++ b/regression-test/data/mv_p0/test_casewhen/test_casewhen.out @@ -1,8 +1,8 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !select_star -- 1 1 1 2020-02-02 1 -1 1 1 2020-02-02 11 1 1 1 2020-02-02 1 +1 1 1 2020-02-02 11 1 2 2 2020-02-02 1 -- !select_mv -- diff --git a/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.out b/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.out index 44b849dedc09e7..330c75523416ab 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.out @@ -15,12 +15,12 @@ 1 20 1 88 1 0 20 4 1 20 99 88 1 0 20 4 2 2 2 2 2 0 2 2 -2 10 99 2 2 0 10 3 2 10 2 88 2 0 10 4 +2 10 99 2 2 0 10 3 2 10 99 88 2 0 10 4 3 3 3 3 3 0 3 2 -3 10 99 3 3 0 10 3 3 5 3 88 3 0 5 4 +3 10 99 3 3 0 10 3 -- !seq_map_2 -- 1 20 99 88 33 @@ -29,23 +29,23 @@ -- !inspect -- 1 1 1 1 1 0 1 2 +1 9 77 \N \N 0 9 5 1 10 99 1 1 0 10 3 1 20 1 88 1 0 20 4 1 20 99 88 1 0 20 4 -1 9 77 \N \N 0 9 5 1 20 99 88 33 0 20 6 2 2 2 2 2 0 2 2 -2 10 99 2 2 0 10 3 2 10 2 88 2 0 10 4 -2 10 99 88 2 0 10 4 2 10 77 88 2 0 10 5 -2 10 99 88 33 0 10 6 2 10 77 88 33 0 10 6 +2 10 99 2 2 0 10 3 +2 10 99 88 2 0 10 4 +2 10 99 88 33 0 10 6 3 3 3 3 3 0 3 2 -3 10 99 3 3 0 10 3 3 5 3 88 3 0 5 4 -3 50 77 3 3 0 50 5 +3 10 99 3 3 0 10 3 3 10 99 3 33 0 10 6 +3 50 77 3 3 0 50 5 3 50 77 3 33 0 50 6 -- !seq_map_3 -- @@ -53,46 +53,46 @@ -- !inspect -- 1 1 1 1 1 0 1 2 +1 9 77 \N \N 0 9 5 1 10 99 1 1 0 10 3 -1 20 99 88 1 0 20 4 1 20 1 88 1 0 20 4 -1 9 77 \N \N 0 9 5 +1 20 99 88 1 0 20 4 1 20 99 
88 33 0 20 6 1 80 66 88 33 0 80 7 1 100 66 88 33 1 100 8 1 100 99 88 33 1 100 8 2 2 2 2 2 0 2 2 -2 10 99 2 2 0 10 3 -2 10 99 88 2 0 10 4 2 10 2 88 2 0 10 4 2 10 77 88 2 0 10 5 -2 10 99 88 33 0 10 6 2 10 77 88 33 0 10 6 +2 10 99 2 2 0 10 3 +2 10 99 88 2 0 10 4 +2 10 99 88 33 0 10 6 2 100 66 88 33 0 100 7 2 100 66 88 33 1 100 8 2 100 77 88 33 1 100 8 3 3 3 3 3 0 3 2 -3 10 99 3 3 0 10 3 3 5 3 88 3 0 5 4 -3 50 77 3 3 0 50 5 +3 10 99 3 3 0 10 3 3 10 99 3 33 0 10 6 +3 50 77 3 3 0 50 5 3 50 77 3 33 0 50 6 -3 120 66 3 33 0 120 7 3 100 77 3 33 1 100 8 +3 120 66 3 33 0 120 7 -- !seq_map_4 -- -- !inspect -- 1 10 1 1 1 0 10 2 -1 20 55 1 1 0 20 3 1 10 1 1 100 1 10 4 +1 20 55 1 1 0 20 3 1 20 55 1 100 1 20 4 2 10 2 2 2 0 10 2 -2 100 55 2 2 0 100 3 2 10 2 2 100 1 10 4 +2 100 55 2 2 0 100 3 2 100 55 2 100 1 100 4 3 10 3 3 3 0 10 2 -3 120 55 3 3 0 120 3 3 10 3 3 100 1 10 4 +3 120 55 3 3 0 120 3 3 120 55 3 100 1 120 4 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.out b/regression-test/data/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.out index 6dc2e7e3b3a274..6d1db50dbdfcb9 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.out @@ -12,19 +12,19 @@ 4 \N 987 77777 1234 \N 100 -- !inspect -- +1 \N 987 77777 1234 \N 100 5 0 1 100 1 1 1 1 100 2 0 1 100 1 1 1 1 100 3 1 1 100 987 77777 1 1 100 5 0 -1 \N 987 77777 1234 \N 100 5 0 +2 \N 987 77777 1234 \N 200 5 0 2 100 2 2 2 2 100 2 0 -2 200 2 2 2 2 200 3 1 2 100 987 77777 2 2 100 5 0 -2 \N 987 77777 1234 \N 200 5 0 -3 100 3 3 3 3 100 2 0 +2 200 2 2 2 2 200 3 1 3 50 \N 9876 1234 \N 50 3 1 +3 100 3 3 3 3 100 2 0 3 100 987 77777 3 3 100 5 0 +4 \N 987 77777 1234 \N 100 5 0 4 100 4 4 4 4 100 2 0 4 100 4 4 4 4 100 4 1 4 100 987 77777 4 4 100 5 0 -4 \N 987 77777 1234 \N 100 5 0 diff --git 
a/regression-test/suites/mv_p0/test_casewhen/test_casewhen.groovy b/regression-test/suites/mv_p0/test_casewhen/test_casewhen.groovy index 7e63e43bd2cb85..4750428b119eda 100644 --- a/regression-test/suites/mv_p0/test_casewhen/test_casewhen.groovy +++ b/regression-test/suites/mv_p0/test_casewhen/test_casewhen.groovy @@ -37,7 +37,7 @@ suite ("test_casewhen") { sql """analyze table sales_records with sync;""" sql """alter table sales_records modify column record_id set stats ('row_count'='4');""" - qt_select_star "select * from sales_records order by 1,2;" + qt_select_star "select * from sales_records order by 1,2,3,4,5;" mv_rewrite_success("select store_id, sum(case when sale_amt>10 then 1 else 2 end) from sales_records group by store_id order by 1;", "store_amt") qt_select_mv "select store_id, sum(case when sale_amt>10 then 1 else 2 end) from sales_records group by store_id order by 1;" diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.groovy index 042c8276777fbb..ea7c5dfdc3a59a 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.groovy @@ -117,7 +117,7 @@ suite("test_partial_update_publish_seq") { wait_for_publish(txn2, 10) qt_seq_map_1 "select * from ${table1} order by k1;" - inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,__DORIS_VERSION_COL__;" + inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,c1,c2,c3,c4,__DORIS_VERSION_COL__;" // without seq map val, the filled seq val >/=/< conflicting seq val def txn3 = load_data("k1,c1,c2", "1,9,77\n2,10,77\n3,50,77\n") @@ -127,7 +127,7 @@ 
suite("test_partial_update_publish_seq") { do_streamload_2pc_commit(txn4) wait_for_publish(txn4, 10) qt_seq_map_2 "select * from ${table1} order by k1;" - inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,__DORIS_VERSION_COL__;" + inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,c1,c2,c3,c4,__DORIS_VERSION_COL__;" // with delete sign and seq col val, >/=/< conflicting seq val def txn5 = load_data("k1,c1,c2", "1,80,66\n2,100,66\n3,120,66\n") @@ -138,7 +138,7 @@ suite("test_partial_update_publish_seq") { wait_for_publish(txn6, 10) qt_seq_map_3 "select * from ${table1} order by k1;" - inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,__DORIS_VERSION_COL__;" + inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,c1,c2,c3,c4,__DORIS_VERSION_COL__;" sql "truncate table ${table1};" @@ -154,5 +154,5 @@ suite("test_partial_update_publish_seq") { wait_for_publish(txn8, 10) qt_seq_map_4 "select * from ${table1} order by k1;" - inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,__DORIS_VERSION_COL__;" + inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,c1,c2,c3,c4,__DORIS_VERSION_COL__;" } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.groovy index 81423436c069bd..f1a6ea813e85da 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.groovy @@ -162,6 +162,6 @@ 
suite("test_p_seq_publish_read_from_old") { wait_for_publish(txnId2, 60) sql "sync;" - qt_sql "select k,v1,v2,v3,v4,v5,__DORIS_SEQUENCE_COL__ from ${tableName} order by k;" - inspectRows "select k,v1,v2,v3,v4,v5,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__,__DORIS_DELETE_SIGN__ from ${tableName} order by k,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__;" + qt_sql "select k,v1,v2,v3,v4,v5,__DORIS_SEQUENCE_COL__ from ${tableName} order by k,v1,v2,v3,v4,v5,__DORIS_SEQUENCE_COL__;" + inspectRows "select k,v1,v2,v3,v4,v5,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__,__DORIS_DELETE_SIGN__ from ${tableName} order by k,v1,v2,v3,v4,v5,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__;" }