Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ Status JniConnector::get_statistics(JNIEnv* env, std::map<std::string, std::stri
}

Status JniConnector::close() {
// JniConnector should not be closed multiple times. Just test in debug mode.
DCHECK(!_closed);
if (!_closed) {
JNIEnv* env = nullptr;
RETURN_IF_ERROR(Jni::Env::Get(&env));
Expand Down Expand Up @@ -198,6 +200,7 @@ Status JniConnector::close() {
RETURN_IF_ERROR(
_jni_scanner_obj.call_void_method(env, _jni_scanner_release_table).call());
RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_close).call());
_closed = true;
}
}
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/es_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ Status EsScanner::_get_next(std::vector<vectorized::MutableColumnPtr>& columns)
}

Status EsScanner::close(RuntimeState* state) {
if (!_try_close()) {
if (_is_closed) {
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/file_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,7 @@ Status FileScanner::_init_expr_ctxes() {
}

Status FileScanner::close(RuntimeState* state) {
if (!_try_close()) {
if (_is_closed) {
return Status::OK();
}

Expand Down
3 changes: 0 additions & 3 deletions be/src/vec/exec/scan/jdbc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,6 @@ void JdbcScanner::_update_profile() {
}

Status JdbcScanner::close(RuntimeState* state) {
if (!_try_close()) {
return Status::OK();
}
RETURN_IF_ERROR(Scanner::close(state));
RETURN_IF_ERROR(_jdbc_connector->close());
return Status::OK();
Expand Down
3 changes: 0 additions & 3 deletions be/src/vec/exec/scan/meta_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,6 @@ Status MetaScanner::_build_partition_values_metadata_request(

Status MetaScanner::close(RuntimeState* state) {
VLOG_CRITICAL << "MetaScanner::close";
if (!_try_close()) {
return Status::OK();
}
if (_reader) {
RETURN_IF_ERROR(_reader->close());
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ Status OlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof
}

Status OlapScanner::close(RuntimeState* state) {
if (!_try_close()) {
if (_is_closed) {
return Status::OK();
}
RETURN_IF_ERROR(Scanner::close(state));
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Scanner;
class VSlotRef;

// We want to close scanner automatically, so using a delegate class
// and call close method in the delegate class's destructor.
// and call close method in the delegate class's dctor.
class ScannerDelegate {
public:
ScannerSPtr _scanner;
Expand Down
9 changes: 4 additions & 5 deletions be/src/vec/exec/scan/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,17 +254,16 @@ Status Scanner::try_append_late_arrival_runtime_filter() {
}

Status Scanner::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK();
}
#ifndef BE_TEST
COUNTER_UPDATE(_local_state->_scanner_wait_worker_timer, _scanner_wait_worker_timer);
#endif
_is_closed = true;
return Status::OK();
}

bool Scanner::_try_close() {
bool expected = false;
return _is_closed.compare_exchange_strong(expected, true);
}

void Scanner::_collect_profile_before_close() {
COUNTER_UPDATE(_local_state->_scan_cpu_timer, _scan_cpu_timer);
COUNTER_UPDATE(_local_state->_rows_read_counter, _num_rows_read);
Expand Down
8 changes: 1 addition & 7 deletions be/src/vec/exec/scan/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <vector>

#include "common/status.h"
Expand Down Expand Up @@ -116,11 +115,6 @@ class Scanner {
// Update the counters before closing this scanner
virtual void _collect_profile_before_close();

// Check if scanner is already closed, if not, mark it as closed.
// Returns true if the scanner was successfully marked as closed (first time).
// Returns false if the scanner was already closed.
bool _try_close();

// Filter the output block finally.
Status _filter_output_block(Block* block);

Expand Down Expand Up @@ -212,7 +206,7 @@ class Scanner {
Block _input_block;

bool _is_open = false;
std::atomic<bool> _is_closed {false};
bool _is_closed = false;
bool _need_to_close = false;
Status _status;

Expand Down