diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 54a65f7802af59..17fec580e1dc82 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -379,6 +379,7 @@ DEFINE_mInt32(pending_data_expire_time_sec, "1800"); DEFINE_mInt32(tablet_rowset_stale_sweep_time_sec, "600"); // tablet stale rowset sweep by threshold size DEFINE_Bool(tablet_rowset_stale_sweep_by_size, "false"); +DEFINE_Bool(enable_slow_scanner_pool, "false"); DEFINE_mInt32(tablet_rowset_stale_sweep_threshold_size, "100"); // garbage sweep policy DEFINE_Int32(max_garbage_sweep_interval, "3600"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 07272806481235..629a9fe2764cad 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -439,6 +439,7 @@ DECLARE_mInt32(pending_data_expire_time_sec); DECLARE_mInt32(tablet_rowset_stale_sweep_time_sec); // tablet stale rowset sweep by threshold size DECLARE_Bool(tablet_rowset_stale_sweep_by_size); +DECLARE_Bool(enable_slow_scanner_pool); DECLARE_mInt32(tablet_rowset_stale_sweep_threshold_size); // garbage sweep policy DECLARE_Int32(max_garbage_sweep_interval); diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index b6ac38c44ccaee..4e20854fadd311 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -921,7 +921,7 @@ Status RowIdStorageReader::read_batch_external_row( colname_to_slot_id, producer_count, scan_rows.size(), semaphore, cv, mtx, tuple_desc); }, - nullptr, nullptr), + nullptr, nullptr, false), fmt::format("{}-read_batch_external_row-{}", print_id(query_id), idx))); idx++; } diff --git a/be/src/exec/scan/file_scanner.cpp b/be/src/exec/scan/file_scanner.cpp index 1e84c774d4135e..3ec30e2c014e27 100644 --- a/be/src/exec/scan/file_scanner.cpp +++ b/be/src/exec/scan/file_scanner.cpp @@ -1899,8 +1899,8 @@ void FileScanner::_collect_profile_before_close() { Scanner::_collect_profile_before_close(); if (config::enable_file_cache && _state->query_options().enable_file_cache && _profile != nullptr) { - io::FileCacheProfileReporter cache_profile(_profile); - cache_profile.update(_file_cache_statistics.get()); + _file_cache_profile_reporter = std::make_unique(_profile); + _file_cache_profile_reporter->update(_file_cache_statistics.get()); _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache( _file_cache_statistics->bytes_write_into_cache); } diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index b1f6f8531b9769..e85d655dde6f87 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -846,8 +846,9 @@ void OlapScanner::_collect_profile_before_close() { // only cloud deploy mode will use file cache. if (config::is_cloud_mode() && config::enable_file_cache) { - io::FileCacheProfileReporter cache_profile(local_state->_segment_profile.get()); - cache_profile.update(&stats.file_cache_stats); + _file_cache_profile_reporter = + std::make_unique(local_state->_segment_profile.get()); + _file_cache_profile_reporter->update(&stats.file_cache_stats); _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache( stats.file_cache_stats.bytes_write_into_cache); } diff --git a/be/src/exec/scan/olap_scanner.h b/be/src/exec/scan/olap_scanner.h index 2187be4ce45bdd..18763969de4ea2 100644 --- a/be/src/exec/scan/olap_scanner.h +++ b/be/src/exec/scan/olap_scanner.h @@ -32,6 +32,7 @@ #include "common/status.h" #include "core/data_type/data_type.h" #include "exec/scan/scanner.h" +#include "io/cache/block_file_cache_profile.h" #include "runtime/runtime_state.h" #include "storage/data_dir.h" #include "storage/rowset/rowset_meta.h" diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp index 7d7b4caeff98d4..57a364a1e76ae5 100644 --- a/be/src/exec/scan/scanner.cpp +++ b/be/src/exec/scan/scanner.cpp @@ -41,11 +41,19 @@ Scanner::Scanner(RuntimeState* state, ScanLocalStateBase* local_state, int64_t l _profile(profile), _output_tuple_desc(_local_state->output_tuple_desc()), _output_row_descriptor(_local_state->_parent->output_row_descriptor()), - _has_prepared(false) { + _has_prepared(false), + _remote_slow_task_threshold(state->query_options().__isset.remote_slow_task_threshold + ? state->query_options().remote_slow_task_threshold + : INT64_MAX) { _total_rf_num = cast_set(_local_state->_helper.runtime_filter_nums()); DorisMetrics::instance()->scanner_cnt->increment(1); } +bool Scanner::is_slow_task() const { + return _file_cache_profile_reporter && + _file_cache_profile_reporter->num_remote_io_total->value() > _remote_slow_task_threshold; +} + Status Scanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { if (!conjuncts.empty()) { _conjuncts.resize(conjuncts.size()); diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h index 041af638c52dd8..0789cc6483ef93 100644 --- a/be/src/exec/scan/scanner.h +++ b/be/src/exec/scan/scanner.h @@ -25,6 +25,7 @@ #include "common/status.h" #include "core/block/block.h" +#include "io/cache/block_file_cache_profile.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "storage/tablet/tablet.h" @@ -189,6 +190,8 @@ class Scanner { void update_block_avg_bytes(size_t block_avg_bytes) { _block_avg_bytes = block_avg_bytes; } + bool is_slow_task() const; + protected: RuntimeState* _state = nullptr; ScanLocalStateBase* _local_state = nullptr; @@ -255,6 +258,8 @@ class Scanner { int64_t _projection_timer = 0; bool _should_stop = false; + std::unique_ptr _file_cache_profile_reporter = nullptr; + const int64_t _remote_slow_task_threshold = 0; }; using ScannerSPtr = std::shared_ptr; diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp index 9961407bdbb55e..7e96ed75cbc3f0 100644 --- a/be/src/exec/scan/scanner_scheduler.cpp +++ b/be/src/exec/scan/scanner_scheduler.cpp @@ -83,7 +83,9 @@ Status ScannerScheduler::submit(std::shared_ptr ctx, } return scanner_ref->is_eos(); }; - SimplifiedScanTask simple_scan_task = {work_func, ctx, scan_task}; + SimplifiedScanTask simple_scan_task = { + work_func, ctx, scan_task, + config::enable_slow_scanner_pool && scanner_delegate->_scanner->is_slow_task()}; return this->submit_scan_task(simple_scan_task); }; diff --git a/be/src/exec/scan/scanner_scheduler.h b/be/src/exec/scan/scanner_scheduler.h index de1553b026ed35..fe50caf94989e5 100644 --- a/be/src/exec/scan/scanner_scheduler.h +++ b/be/src/exec/scan/scanner_scheduler.h @@ -48,15 +48,17 @@ struct SimplifiedScanTask { SimplifiedScanTask() = default; SimplifiedScanTask(std::function scan_func, std::shared_ptr scanner_context, - std::shared_ptr scan_task) { + std::shared_ptr scan_task, bool slow_task_) { this->scan_func = scan_func; this->scanner_context = scanner_context; this->scan_task = scan_task; + this->slow_task = slow_task_; } std::function scan_func; std::shared_ptr scanner_context = nullptr; std::shared_ptr scan_task = nullptr; + bool slow_task = false; }; class ScannerSplitRunner : public SplitRunner { @@ -167,6 +169,8 @@ class ThreadPoolSimplifiedScanScheduler MOCK_REMOVE(final) : public ScannerSched _is_stop.store(true); _scan_thread_pool->shutdown(); _scan_thread_pool->wait(); + _slow_scan_thread_pool->shutdown(); + _slow_scan_thread_pool->wait(); } Status start(int max_thread_num, int min_thread_num, int queue_size, @@ -178,12 +182,22 @@ class ThreadPoolSimplifiedScanScheduler MOCK_REMOVE(final) : public ScannerSched .set_max_queue_size(queue_size) .set_cgroup_cpu_ctl(_cgroup_cpu_ctl) .build(&_scan_thread_pool)); + RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name, _workload_group) + .set_min_threads(min_thread_num) + .set_max_threads(max_thread_num) + .set_max_queue_size(queue_size) + .set_cgroup_cpu_ctl(_cgroup_cpu_ctl) + .build(&_slow_scan_thread_pool)); return Status::OK(); } Status submit_scan_task(SimplifiedScanTask scan_task) override { if (!_is_stop) { - return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); }); + if (scan_task.slow_task) { + return _slow_scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); }); + } else { + return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); }); + } } else { return Status::InternalError("scanner pool {} is shutdown.", _sched_name); } @@ -239,6 +253,7 @@ class ThreadPoolSimplifiedScanScheduler MOCK_REMOVE(final) : public ScannerSched private: std::unique_ptr _scan_thread_pool; + std::unique_ptr _slow_scan_thread_pool; std::atomic _is_stop; std::weak_ptr _cgroup_cpu_ctl; std::string _sched_name; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 2f2250f265db90..d4b1f812446fed 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -2259,7 +2259,7 @@ void PInternalService::multiget_data_v2(google::protobuf::RpcController* control << watch.elapsed_time() / 1000; return true; }, - nullptr, nullptr), + nullptr, nullptr, false), fmt::format("{}-multiget_data_v2", print_id(request->query_id()))); if (!st.ok()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 2d24753db7bfa4..3dfe7954284575 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -199,6 +199,7 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_QUERY_CACHE = "enable_query_cache"; public static final String QUERY_CACHE_FORCE_REFRESH = "query_cache_force_refresh"; public static final String QUERY_CACHE_ENTRY_MAX_BYTES = "query_cache_entry_max_bytes"; + public static final String REMOTE_SLOW_TASK_THRESHOLD = "remote_slow_task_threshold"; public static final String QUERY_CACHE_ENTRY_MAX_ROWS = "query_cache_entry_max_rows"; public static final String ENABLE_CONDITION_CACHE = "enable_condition_cache"; @@ -1440,6 +1441,9 @@ public enum IgnoreSplitType { @VarAttr(name = QUERY_CACHE_ENTRY_MAX_BYTES) private long queryCacheEntryMaxBytes = 5242880; + @VarAttr(name = REMOTE_SLOW_TASK_THRESHOLD) + private long remoteSlowTaskThreshold = 512; + @VarAttr(name = QUERY_CACHE_ENTRY_MAX_ROWS) private long queryCacheEntryMaxRows = 500000; @@ -5292,6 +5296,7 @@ public boolean isRequireSequenceInInsert() { public TQueryOptions toThrift() { TQueryOptions tResult = new TQueryOptions(); tResult.setMemLimit(maxExecMemByte); + tResult.setRemoteSlowTaskThreshold(remoteSlowTaskThreshold); tResult.setLocalExchangeFreeBlocksLimit(localExchangeFreeBlocksLimit); tResult.setScanQueueMemLimit(maxScanQueueMemByte); tResult.setMaxScannersConcurrency(maxScannersConcurrency); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index f2da2243ff56a9..fbf0e013e2c883 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -472,6 +472,8 @@ struct TQueryOptions { // session variable `spill_repartition_max_depth` in FE. Default is 8. 209: optional i32 spill_repartition_max_depth = 8 + 210: optional i64 remote_slow_task_threshold = 0 + // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache.