From b878bac144cc774a1d6b100207c973ce69fcb32a Mon Sep 17 00:00:00 2001 From: Socrates Date: Wed, 14 Jan 2026 18:45:08 +0800 Subject: [PATCH 1/2] Revert "[fix](scanner) Make Scanner close() method thread-safe using atomic operations (#57436)" This reverts commit bf0b87c1cb7db2a0ef663ddab5dab52e15d407cb. --- be/src/vec/exec/scan/es_scanner.cpp | 2 +- be/src/vec/exec/scan/file_scanner.cpp | 2 +- be/src/vec/exec/scan/jdbc_scanner.cpp | 3 --- be/src/vec/exec/scan/meta_scanner.cpp | 3 --- be/src/vec/exec/scan/olap_scanner.cpp | 2 +- be/src/vec/exec/scan/scan_node.h | 2 +- be/src/vec/exec/scan/scanner.cpp | 9 ++++----- be/src/vec/exec/scan/scanner.h | 8 +------- 8 files changed, 9 insertions(+), 22 deletions(-) diff --git a/be/src/vec/exec/scan/es_scanner.cpp b/be/src/vec/exec/scan/es_scanner.cpp index 224ba085f03876..1376fd1dbf7c38 100644 --- a/be/src/vec/exec/scan/es_scanner.cpp +++ b/be/src/vec/exec/scan/es_scanner.cpp @@ -191,7 +191,7 @@ Status EsScanner::_get_next(std::vector& columns) } Status EsScanner::close(RuntimeState* state) { - if (!_try_close()) { + if (_is_closed) { return Status::OK(); } diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp index 799280fed3faee..ca371b0c65c673 100644 --- a/be/src/vec/exec/scan/file_scanner.cpp +++ b/be/src/vec/exec/scan/file_scanner.cpp @@ -1756,7 +1756,7 @@ Status FileScanner::_init_expr_ctxes() { } Status FileScanner::close(RuntimeState* state) { - if (!_try_close()) { + if (_is_closed) { return Status::OK(); } diff --git a/be/src/vec/exec/scan/jdbc_scanner.cpp b/be/src/vec/exec/scan/jdbc_scanner.cpp index dc26f103a6491b..4b3ee9cf7f0b40 100644 --- a/be/src/vec/exec/scan/jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/jdbc_scanner.cpp @@ -188,9 +188,6 @@ void JdbcScanner::_update_profile() { } Status JdbcScanner::close(RuntimeState* state) { - if (!_try_close()) { - return Status::OK(); - } RETURN_IF_ERROR(Scanner::close(state)); RETURN_IF_ERROR(_jdbc_connector->close()); return Status::OK(); diff --git a/be/src/vec/exec/scan/meta_scanner.cpp b/be/src/vec/exec/scan/meta_scanner.cpp index bf5b9232e4b645..0464eceaef7884 100644 --- a/be/src/vec/exec/scan/meta_scanner.cpp +++ b/be/src/vec/exec/scan/meta_scanner.cpp @@ -531,9 +531,6 @@ Status MetaScanner::_build_partition_values_metadata_request( Status MetaScanner::close(RuntimeState* state) { VLOG_CRITICAL << "MetaScanner::close"; - if (!_try_close()) { - return Status::OK(); - } if (_reader) { RETURN_IF_ERROR(_reader->close()); } diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp index 07471049c5b63e..ff136d7dc2af8f 100644 --- a/be/src/vec/exec/scan/olap_scanner.cpp +++ b/be/src/vec/exec/scan/olap_scanner.cpp @@ -659,7 +659,7 @@ Status OlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof } Status OlapScanner::close(RuntimeState* state) { - if (!_try_close()) { + if (_is_closed) { return Status::OK(); } RETURN_IF_ERROR(Scanner::close(state)); diff --git a/be/src/vec/exec/scan/scan_node.h b/be/src/vec/exec/scan/scan_node.h index 5455e419e34037..de2045aef17b58 100644 --- a/be/src/vec/exec/scan/scan_node.h +++ b/be/src/vec/exec/scan/scan_node.h @@ -25,7 +25,7 @@ class Scanner; class VSlotRef; // We want to close scanner automatically, so using a delegate class -// and call close method in the delegate class's destructor. +// and call close method in the delegate class's dctor. class ScannerDelegate { public: ScannerSPtr _scanner; diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp index aa6b9149cd70c5..9c5a6fb5d28e9a 100644 --- a/be/src/vec/exec/scan/scanner.cpp +++ b/be/src/vec/exec/scan/scanner.cpp @@ -254,17 +254,16 @@ Status Scanner::try_append_late_arrival_runtime_filter() { } Status Scanner::close(RuntimeState* state) { + if (_is_closed) { + return Status::OK(); + } #ifndef BE_TEST COUNTER_UPDATE(_local_state->_scanner_wait_worker_timer, _scanner_wait_worker_timer); #endif + _is_closed = true; return Status::OK(); } -bool Scanner::_try_close() { - bool expected = false; - return _is_closed.compare_exchange_strong(expected, true); -} - void Scanner::_collect_profile_before_close() { COUNTER_UPDATE(_local_state->_scan_cpu_timer, _scan_cpu_timer); COUNTER_UPDATE(_local_state->_rows_read_counter, _num_rows_read); diff --git a/be/src/vec/exec/scan/scanner.h b/be/src/vec/exec/scan/scanner.h index 9aad37169464e1..5f9c73d5e22f5c 100644 --- a/be/src/vec/exec/scan/scanner.h +++ b/be/src/vec/exec/scan/scanner.h @@ -20,7 +20,6 @@ #include #include -#include #include #include "common/status.h" @@ -116,11 +115,6 @@ class Scanner { // Update the counters before closing this scanner virtual void _collect_profile_before_close(); - // Check if scanner is already closed, if not, mark it as closed. - // Returns true if the scanner was successfully marked as closed (first time). - // Returns false if the scanner was already closed. - bool _try_close(); - // Filter the output block finally. Status _filter_output_block(Block* block); @@ -212,7 +206,7 @@ class Scanner { Block _input_block; bool _is_open = false; - std::atomic _is_closed {false}; + bool _is_closed = false; bool _need_to_close = false; Status _status; From 22587a9d8266783a7750e58e1c07edf450ec0f79 Mon Sep 17 00:00:00 2001 From: Socrates Date: Wed, 14 Jan 2026 18:50:01 +0800 Subject: [PATCH 2/2] test --- be/src/vec/exec/jni_connector.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 790766108e7c38..2cc176c9ff0ec8 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -164,6 +164,8 @@ Status JniConnector::get_statistics(JNIEnv* env, std::map