Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ DEFINE_mInt32(pending_data_expire_time_sec, "1800");
DEFINE_mInt32(tablet_rowset_stale_sweep_time_sec, "600");
// tablet stale rowset sweep by threshold size
DEFINE_Bool(tablet_rowset_stale_sweep_by_size, "false");
DEFINE_Bool(enable_slow_scanner_pool, "false");
DEFINE_mInt32(tablet_rowset_stale_sweep_threshold_size, "100");
// garbage sweep policy
DEFINE_Int32(max_garbage_sweep_interval, "3600");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ DECLARE_mInt32(pending_data_expire_time_sec);
DECLARE_mInt32(tablet_rowset_stale_sweep_time_sec);
// tablet stale rowset sweep by threshold size
DECLARE_Bool(tablet_rowset_stale_sweep_by_size);
DECLARE_Bool(enable_slow_scanner_pool);
DECLARE_mInt32(tablet_rowset_stale_sweep_threshold_size);
// garbage sweep policy
DECLARE_Int32(max_garbage_sweep_interval);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/rowid_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ Status RowIdStorageReader::read_batch_external_row(
colname_to_slot_id, producer_count,
scan_rows.size(), semaphore, cv, mtx, tuple_desc);
},
nullptr, nullptr),
nullptr, nullptr, false),
fmt::format("{}-read_batch_external_row-{}", print_id(query_id), idx)));
idx++;
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/scan/file_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1899,8 +1899,8 @@ void FileScanner::_collect_profile_before_close() {
Scanner::_collect_profile_before_close();
if (config::enable_file_cache && _state->query_options().enable_file_cache &&
_profile != nullptr) {
io::FileCacheProfileReporter cache_profile(_profile);
cache_profile.update(_file_cache_statistics.get());
_file_cache_profile_reporter = std::make_unique<io::FileCacheProfileReporter>(_profile);
_file_cache_profile_reporter->update(_file_cache_statistics.get());
_state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
_file_cache_statistics->bytes_write_into_cache);
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,8 +846,9 @@ void OlapScanner::_collect_profile_before_close() {

// only cloud deploy mode will use file cache.
if (config::is_cloud_mode() && config::enable_file_cache) {
io::FileCacheProfileReporter cache_profile(local_state->_segment_profile.get());
cache_profile.update(&stats.file_cache_stats);
_file_cache_profile_reporter =
std::make_unique<io::FileCacheProfileReporter>(local_state->_segment_profile.get());
_file_cache_profile_reporter->update(&stats.file_cache_stats);
_state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
stats.file_cache_stats.bytes_write_into_cache);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/scan/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "common/status.h"
#include "core/data_type/data_type.h"
#include "exec/scan/scanner.h"
#include "io/cache/block_file_cache_profile.h"
#include "runtime/runtime_state.h"
#include "storage/data_dir.h"
#include "storage/rowset/rowset_meta.h"
Expand Down
10 changes: 9 additions & 1 deletion be/src/exec/scan/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,19 @@ Scanner::Scanner(RuntimeState* state, ScanLocalStateBase* local_state, int64_t l
_profile(profile),
_output_tuple_desc(_local_state->output_tuple_desc()),
_output_row_descriptor(_local_state->_parent->output_row_descriptor()),
_has_prepared(false) {
_has_prepared(false),
_remote_slow_task_threshold(state->query_options().__isset.remote_slow_task_threshold
? state->query_options().remote_slow_task_threshold
: INT64_MAX) {
_total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums());
DorisMetrics::instance()->scanner_cnt->increment(1);
}

bool Scanner::is_slow_task() const {
return _file_cache_profile_reporter &&
_file_cache_profile_reporter->num_remote_io_total->value() > _remote_slow_task_threshold;
}

Status Scanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
if (!conjuncts.empty()) {
_conjuncts.resize(conjuncts.size());
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/scan/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "common/status.h"
#include "core/block/block.h"
#include "io/cache/block_file_cache_profile.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "storage/tablet/tablet.h"
Expand Down Expand Up @@ -189,6 +190,8 @@ class Scanner {

void update_block_avg_bytes(size_t block_avg_bytes) { _block_avg_bytes = block_avg_bytes; }

bool is_slow_task() const;

protected:
RuntimeState* _state = nullptr;
ScanLocalStateBase* _local_state = nullptr;
Expand Down Expand Up @@ -255,6 +258,8 @@ class Scanner {
int64_t _projection_timer = 0;

bool _should_stop = false;
std::unique_ptr<io::FileCacheProfileReporter> _file_cache_profile_reporter = nullptr;
const int64_t _remote_slow_task_threshold = 0;
};

using ScannerSPtr = std::shared_ptr<Scanner>;
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
}
return scanner_ref->is_eos();
};
SimplifiedScanTask simple_scan_task = {work_func, ctx, scan_task};
SimplifiedScanTask simple_scan_task = {
work_func, ctx, scan_task,
config::enable_slow_scanner_pool && scanner_delegate->_scanner->is_slow_task()};
return this->submit_scan_task(simple_scan_task);
};

Expand Down
19 changes: 17 additions & 2 deletions be/src/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,17 @@ struct SimplifiedScanTask {
SimplifiedScanTask() = default;
SimplifiedScanTask(std::function<bool()> scan_func,
std::shared_ptr<ScannerContext> scanner_context,
std::shared_ptr<ScanTask> scan_task) {
std::shared_ptr<ScanTask> scan_task, bool slow_task_) {
this->scan_func = scan_func;
this->scanner_context = scanner_context;
this->scan_task = scan_task;
this->slow_task = slow_task_;
}

std::function<bool()> scan_func;
std::shared_ptr<ScannerContext> scanner_context = nullptr;
std::shared_ptr<ScanTask> scan_task = nullptr;
bool slow_task = false;
};

class ScannerSplitRunner : public SplitRunner {
Expand Down Expand Up @@ -167,6 +169,8 @@ class ThreadPoolSimplifiedScanScheduler MOCK_REMOVE(final) : public ScannerSched
_is_stop.store(true);
_scan_thread_pool->shutdown();
_scan_thread_pool->wait();
_slow_scan_thread_pool->shutdown();
_slow_scan_thread_pool->wait();
}

Status start(int max_thread_num, int min_thread_num, int queue_size,
Expand All @@ -178,12 +182,22 @@ class ThreadPoolSimplifiedScanScheduler MOCK_REMOVE(final) : public ScannerSched
.set_max_queue_size(queue_size)
.set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
.build(&_scan_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name, _workload_group)
.set_min_threads(min_thread_num)
.set_max_threads(max_thread_num)
.set_max_queue_size(queue_size)
.set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
.build(&_slow_scan_thread_pool));
return Status::OK();
}

Status submit_scan_task(SimplifiedScanTask scan_task) override {
if (!_is_stop) {
return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); });
if (scan_task.slow_task) {
return _slow_scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); });
} else {
return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); });
}
} else {
return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name);
}
Expand Down Expand Up @@ -239,6 +253,7 @@ class ThreadPoolSimplifiedScanScheduler MOCK_REMOVE(final) : public ScannerSched

private:
std::unique_ptr<ThreadPool> _scan_thread_pool;
std::unique_ptr<ThreadPool> _slow_scan_thread_pool;
std::atomic<bool> _is_stop;
std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
std::string _sched_name;
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2259,7 +2259,7 @@ void PInternalService::multiget_data_v2(google::protobuf::RpcController* control
<< watch.elapsed_time() / 1000;
return true;
},
nullptr, nullptr),
nullptr, nullptr, false),
fmt::format("{}-multiget_data_v2", print_id(request->query_id())));

if (!st.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_QUERY_CACHE = "enable_query_cache";
public static final String QUERY_CACHE_FORCE_REFRESH = "query_cache_force_refresh";
public static final String QUERY_CACHE_ENTRY_MAX_BYTES = "query_cache_entry_max_bytes";
public static final String REMOTE_SLOW_TASK_THRESHOLD = "remote_slow_task_threshold";
public static final String QUERY_CACHE_ENTRY_MAX_ROWS = "query_cache_entry_max_rows";
public static final String ENABLE_CONDITION_CACHE = "enable_condition_cache";

Expand Down Expand Up @@ -1440,6 +1441,9 @@ public enum IgnoreSplitType {
@VarAttr(name = QUERY_CACHE_ENTRY_MAX_BYTES)
private long queryCacheEntryMaxBytes = 5242880;

@VarAttr(name = REMOTE_SLOW_TASK_THRESHOLD)
private long remoteSlowTaskThreshold = 512;

@VarAttr(name = QUERY_CACHE_ENTRY_MAX_ROWS)
private long queryCacheEntryMaxRows = 500000;

Expand Down Expand Up @@ -5292,6 +5296,7 @@ public boolean isRequireSequenceInInsert() {
public TQueryOptions toThrift() {
TQueryOptions tResult = new TQueryOptions();
tResult.setMemLimit(maxExecMemByte);
tResult.setRemoteSlowTaskThreshold(remoteSlowTaskThreshold);
tResult.setLocalExchangeFreeBlocksLimit(localExchangeFreeBlocksLimit);
tResult.setScanQueueMemLimit(maxScanQueueMemByte);
tResult.setMaxScannersConcurrency(maxScannersConcurrency);
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,8 @@ struct TQueryOptions {
// session variable `spill_repartition_max_depth` in FE. Default is 8.
209: optional i32 spill_repartition_max_depth = 8

210: optional i64 remote_slow_task_threshold = 0


// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
Expand Down
Loading