From 78304b385557fac7904773e64090f403f32bd539 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 16 Mar 2026 12:26:37 +0100 Subject: [PATCH 1/4] Fix unsynchronized access to replica_to_files_to_be_processed --- ...rageObjectStorageStableTaskDistributor.cpp | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 2984facad9d0..8a69bba3eaf0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -45,13 +45,16 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb saveLastNodeActivity(number_of_current_replica); - auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); - if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Replica number {} was marked as lost, can't set task for it anymore", - number_of_current_replica - ); + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} was marked as lost, can't set task for it anymore", + number_of_current_replica + ); + } // 1. 
Check pre-queued files first auto file = getPreQueuedFile(number_of_current_replica); @@ -63,7 +66,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb file = getAnyUnprocessedFile(number_of_current_replica); if (file) - processed_file_list_ptr->second.push_back(file); + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + { // It is possible that replica was lost after check in the begining of the method + auto file_identifier = file->getAbsolutePath().value_or(file->getPath()); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); + connection_to_files[file_replica_idx].push_back(file); + } + else + processed_file_list_ptr->second.push_back(file); + } return file; } @@ -192,7 +207,13 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter file_identifier = object_info->getIdentifier(); } - size_t file_replica_idx = getReplicaForFile(file_identifier); + size_t file_replica_idx; + + { + std::lock_guard lock(mutex); + file_replica_idx = getReplicaForFile(file_identifier); + } + if (file_replica_idx == number_of_current_replica) { LOG_TRACE( From c71caec37e56d948dc46f83a76c8edaecf70d53e Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 20 Mar 2026 13:07:01 +0100 Subject: [PATCH 2/4] Fix use after free in rescheduleTasksFromReplica --- .../StorageObjectStorageStableTaskDistributor.cpp | 3 ++- tests/integration/test_s3_cluster/test.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 8a69bba3eaf0..9346bb301d12 100644 --- 
a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -329,8 +329,9 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ "All replicas were marked as lost" ); + auto files = std::move(processed_file_list_ptr->second); replica_to_files_to_be_processed.erase(number_of_current_replica); - for (const auto & file : processed_file_list_ptr->second) + for (const auto & file : files) { auto file_replica_idx = getReplicaForFile(file->getAbsolutePath().value_or(file->getPath())); unprocessed_files.emplace(file->getAbsolutePath().value_or(file->getPath()), std::make_pair(file, file_replica_idx)); diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 965de3727b05..cd2eacd8241d 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1195,7 +1195,7 @@ def test_joins(started_cluster): assert len(res) == 25 -def _test_graceful_shutdown(started_cluster): +def test_graceful_shutdown(started_cluster): node = started_cluster.instances["s0_0_0"] node_to_shutdown = started_cluster.instances["s0_1_0"] From bbe007e68925c3ab7fb6455840468243547c2fd5 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 9 Mar 2026 17:31:16 +0100 Subject: [PATCH 3/4] Fix file identifier in rescheduleTasksFromReplica --- ...rageObjectStorageStableTaskDistributor.cpp | 40 +++++++++++-------- ...torageObjectStorageStableTaskDistributor.h | 2 + 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 9346bb301d12..fab5b2bd2ff5 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -71,7 +71,7 @@ ObjectInfoPtr 
StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) { // It is possible that replica was lost after check in the begining of the method - auto file_identifier = file->getAbsolutePath().value_or(file->getPath()); + auto file_identifier = getFileIdentifier(file); auto file_replica_idx = getReplicaForFile(file_identifier); unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); connection_to_files[file_replica_idx].push_back(file); @@ -136,7 +136,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); - auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getIdentifier(); + auto file_identifier = getFileIdentifier(next_file); auto it = unprocessed_files.find(file_identifier); if (it == unprocessed_files.end()) continue; @@ -194,18 +194,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } - String file_identifier; - if (send_over_whole_archive && object_info->isArchive()) - { - file_identifier = object_info->getPathOrPathToArchiveIfArchive(); - LOG_TEST(log, "Will send over the whole archive {} to replicas. " - "This will be suboptimal, consider turning on " - "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); - } - else - { - file_identifier = object_info->getIdentifier(); - } + String file_identifier = getFileIdentifier(object_info, true); size_t file_replica_idx; @@ -269,7 +258,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s auto next_file = it->second.first; unprocessed_files.erase(it); - auto file_path = send_over_whole_archive ? 
next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath()); + auto file_path = getFileIdentifier(next_file); LOG_TRACE( log, "Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}", @@ -333,10 +322,27 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ replica_to_files_to_be_processed.erase(number_of_current_replica); for (const auto & file : files) { - auto file_replica_idx = getReplicaForFile(file->getAbsolutePath().value_or(file->getPath())); - unprocessed_files.emplace(file->getAbsolutePath().value_or(file->getPath()), std::make_pair(file, file_replica_idx)); + auto file_identifier = getFileIdentifier(file); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); connection_to_files[file_replica_idx].push_back(file); } } +String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const +{ + if (send_over_whole_archive && file_object->isArchive()) + { + auto file_identifier = file_object->getPathOrPathToArchiveIfArchive(); + if (write_to_log) + { + LOG_TEST(log, "Will send over the whole archive {} to replicas. 
" + "This will be suboptimal, consider turning on " + "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); + } + return file_identifier; + } + return file_object->getIdentifier(); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 25673b3eeb02..652bbb6a03e0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -40,6 +40,8 @@ class StorageObjectStorageStableTaskDistributor void saveLastNodeActivity(size_t number_of_current_replica); + String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const; + const std::shared_ptr iterator; const bool send_over_whole_archive; From c6dda096a9719cd0076f5348637dd49b9feb0969 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 20 Mar 2026 18:37:34 +0100 Subject: [PATCH 4/4] Fix logical error catch --- ...rageObjectStorageStableTaskDistributor.cpp | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index fab5b2bd2ff5..229a66f8c904 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -48,6 +48,9 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb { std::lock_guard lock(mutex); auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + // rescheduleTasksFromReplica can be called only when an error is caught in RemoteQueryExecutor::processPacket, + // so getNextTask can't be called after that. + // This check only guards against a logical error in the code. 
if (processed_file_list_ptr == 
replica_to_files_to_be_processed.end()) throw Exception( ErrorCodes::LOGICAL_ERROR, @@ -69,15 +72,15 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb { std::lock_guard lock(mutex); auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) - { // It is possible that replica was lost after check in the begining of the method - auto file_identifier = getFileIdentifier(file); - auto file_replica_idx = getReplicaForFile(file_identifier); - unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); - connection_to_files[file_replica_idx].push_back(file); - } - else - processed_file_list_ptr->second.push_back(file); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} was marked as lost, can't set task for it anymore", + number_of_current_replica + ); + + processed_file_list_ptr->second.push_back(file); } return file;