diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 2984facad9d0..229a66f8c904 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -45,13 +45,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb saveLastNodeActivity(number_of_current_replica); - auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); - if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Replica number {} was marked as lost, can't set task for it anymore", - number_of_current_replica - ); + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + // rescheduleTasksFromReplica can be called only when an error is caught in RemoteQueryExecutor::processPacket, + // so getNextTask can't be called after that. + // Check only for a logical error in the code. + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} was marked as lost, can't set task for it anymore", + number_of_current_replica + ); + } // 1. 
Check pre-queued files first auto file = getPreQueuedFile(number_of_current_replica); @@ -63,7 +69,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb file = getAnyUnprocessedFile(number_of_current_replica); if (file) + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} was marked as lost, can't set task for it anymore", + number_of_current_replica + ); + processed_file_list_ptr->second.push_back(file); + } return file; } @@ -121,7 +139,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); - auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getIdentifier(); + auto file_identifier = getFileIdentifier(next_file); auto it = unprocessed_files.find(file_identifier); if (it == unprocessed_files.end()) continue; @@ -179,20 +197,15 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } - String file_identifier; - if (send_over_whole_archive && object_info->isArchive()) - { - file_identifier = object_info->getPathOrPathToArchiveIfArchive(); - LOG_TEST(log, "Will send over the whole archive {} to replicas. 
" - "This will be suboptimal, consider turning on " - "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); - } - else + String file_identifier = getFileIdentifier(object_info, true); + + size_t file_replica_idx; + { - file_identifier = object_info->getIdentifier(); + std::lock_guard lock(mutex); + file_replica_idx = getReplicaForFile(file_identifier); } - size_t file_replica_idx = getReplicaForFile(file_identifier); if (file_replica_idx == number_of_current_replica) { LOG_TRACE( @@ -248,7 +261,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s auto next_file = it->second.first; unprocessed_files.erase(it); - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath()); + auto file_path = getFileIdentifier(next_file); LOG_TRACE( log, "Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}", @@ -308,13 +321,31 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ "All replicas were marked as lost" ); + auto files = std::move(processed_file_list_ptr->second); replica_to_files_to_be_processed.erase(number_of_current_replica); - for (const auto & file : processed_file_list_ptr->second) + for (const auto & file : files) { - auto file_replica_idx = getReplicaForFile(file->getAbsolutePath().value_or(file->getPath())); - unprocessed_files.emplace(file->getAbsolutePath().value_or(file->getPath()), std::make_pair(file, file_replica_idx)); + auto file_identifier = getFileIdentifier(file); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); connection_to_files[file_replica_idx].push_back(file); } } +String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const +{ + if (send_over_whole_archive && 
file_object->isArchive()) + { + auto file_identifier = file_object->getPathOrPathToArchiveIfArchive(); + if (write_to_log) + { + LOG_TEST(log, "Will send over the whole archive {} to replicas. " + "This will be suboptimal, consider turning on " + "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); + } + return file_identifier; + } + return file_object->getIdentifier(); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 25673b3eeb02..652bbb6a03e0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -40,6 +40,8 @@ class StorageObjectStorageStableTaskDistributor void saveLastNodeActivity(size_t number_of_current_replica); + String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const; + const std::shared_ptr iterator; const bool send_over_whole_archive; diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 965de3727b05..cd2eacd8241d 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1195,7 +1195,7 @@ def test_joins(started_cluster): assert len(res) == 25 -def _test_graceful_shutdown(started_cluster): +def test_graceful_shutdown(started_cluster): node = started_cluster.instances["s0_0_0"] node_to_shutdown = started_cluster.instances["s0_1_0"]