From 639e2e24c6a399c385f80d3e332c413cdef930f0 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Thu, 23 Apr 2026 16:35:39 +0200 Subject: [PATCH 1/2] Cherry-pick of https://github.com/Altinity/ClickHouse/pull/1493 with unresolved conflict markers (resolution in next commit) --- Original cherry-pick message follows: Merge pull request #1493 from Altinity/bugfix/antalya-26.1/task_reschedule_fix Fix file identifier in rescheduleTasksFromReplica # Conflicts: # src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp --- ...rageObjectStorageStableTaskDistributor.cpp | 72 ++++++++++++++++--- ...torageObjectStorageStableTaskDistributor.h | 2 + 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index c661bf572a08..51e050fee928 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -47,13 +47,16 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb saveLastNodeActivity(number_of_current_replica); - auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); - if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Replica number {} was marked as lost, can't set task for it anymore", - number_of_current_replica - ); + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} was marked as lost, can't set task for it anymore", + number_of_current_replica + ); + } // 1. Check pre-queued files first auto file = getPreQueuedFile(number_of_current_replica); @@ -65,7 +68,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb file = getAnyUnprocessedFile(number_of_current_replica); if (file) - processed_file_list_ptr->second.push_back(file); + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + { // It is possible that replica was lost after check in the begining of the method + auto file_identifier = getFileIdentifier(file); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); + connection_to_files[file_replica_idx].push_back(file); + } + else + processed_file_list_ptr->second.push_back(file); + } return file; } @@ -123,7 +138,11 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); +<<<<<<< HEAD auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getIdentifier(); +======= + auto file_identifier = getFileIdentifier(next_file); +>>>>>>> d078d53df2f (Merge pull request #1493 from Altinity/bugfix/antalya-26.1/task_reschedule_fix) auto it = unprocessed_files.find(file_identifier); if (it == unprocessed_files.end()) continue; @@ -168,6 +187,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } +<<<<<<< HEAD String file_identifier; if (send_over_whole_archive && object_info->isArchive()) { @@ -180,6 +200,9 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter { file_identifier = object_info->getIdentifier(); } +======= + String file_identifier = getFileIdentifier(object_info, true); +>>>>>>> d078d53df2f (Merge pull request #1493 from Altinity/bugfix/antalya-26.1/task_reschedule_fix) if (iceberg_read_optimization_enabled) { @@ -192,7 +215,13 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } - size_t file_replica_idx = getReplicaForFile(file_identifier); + size_t file_replica_idx; + + { + std::lock_guard lock(mutex); + file_replica_idx = getReplicaForFile(file_identifier); + } + if (file_replica_idx == number_of_current_replica) { LOG_TRACE( @@ -248,7 +277,11 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s auto next_file = it->second.first; unprocessed_files.erase(it); +<<<<<<< HEAD auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath(); +======= + auto file_path = getFileIdentifier(next_file); +>>>>>>> d078d53df2f (Merge pull request #1493 from Altinity/bugfix/antalya-26.1/task_reschedule_fix) LOG_TRACE( log, "Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}", @@ -310,11 +343,28 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ for (const auto & file : processed_file_list_ptr->second) { - auto file_replica_idx = getReplicaForFile(file->getPath()); - unprocessed_files.emplace(file->getPath(), std::make_pair(file, file_replica_idx)); + auto file_identifier = getFileIdentifier(file); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); connection_to_files[file_replica_idx].push_back(file); } replica_to_files_to_be_processed.erase(number_of_current_replica); } +String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const +{ + if (send_over_whole_archive && file_object->isArchive()) + { + auto file_identifier = file_object->getPathOrPathToArchiveIfArchive(); + if (write_to_log) + { + LOG_TEST(log, "Will send over the whole archive {} to replicas. " + "This will be suboptimal, consider turning on " + "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); + } + return file_identifier; + } + return getAbsolutePathFromObjectInfo(file_object).value_or(file_object->getIdentifier()); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 0cd10ac7188e..3a5a16998be3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -41,6 +41,8 @@ class StorageObjectStorageStableTaskDistributor void saveLastNodeActivity(size_t number_of_current_replica); + String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const; + const std::shared_ptr iterator; const bool send_over_whole_archive; From 8c42e13b2b9aec8a1fcc84c79ef3a3565074327b Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Thu, 7 May 2026 00:45:54 +0200 Subject: [PATCH 2/2] Resolve conflicts in cherry-pick of #1493 --- ...rageObjectStorageStableTaskDistributor.cpp | 25 +------------------ 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 51e050fee928..c395e7445317 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -138,11 +138,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); -<<<<<<< HEAD - auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getIdentifier(); -======= auto file_identifier = getFileIdentifier(next_file); ->>>>>>> d078d53df2f (Merge pull request #1493 from Altinity/bugfix/antalya-26.1/task_reschedule_fix) auto it = unprocessed_files.find(file_identifier); if (it == unprocessed_files.end()) continue; @@ -187,22 +183,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } -<<<<<<< HEAD - String file_identifier; - if (send_over_whole_archive && object_info->isArchive()) - { - file_identifier = object_info->getPathOrPathToArchiveIfArchive(); - LOG_TEST(log, "Will send over the whole archive {} to replicas. " - "This will be suboptimal, consider turning on " - "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); - } - else - { - file_identifier = object_info->getIdentifier(); - } -======= String file_identifier = getFileIdentifier(object_info, true); ->>>>>>> d078d53df2f (Merge pull request #1493 from Altinity/bugfix/antalya-26.1/task_reschedule_fix) if (iceberg_read_optimization_enabled) { @@ -277,11 +258,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s auto next_file = it->second.first; unprocessed_files.erase(it); -<<<<<<< HEAD - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath(); -======= auto file_path = getFileIdentifier(next_file); ->>>>>>> d078d53df2f (Merge pull request #1493 from Altinity/bugfix/antalya-26.1/task_reschedule_fix) LOG_TRACE( log, "Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}", @@ -364,7 +341,7 @@ String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPt } return file_identifier; } - return getAbsolutePathFromObjectInfo(file_object).value_or(file_object->getIdentifier()); + return file_object->getIdentifier(); } }