|
3 | 3 | #include <Storages/MergeTree/MergeTreeSequentialSource.h> |
4 | 4 | #include <Storages/MergeTree/MergeTreeData.h> |
5 | 5 | #include <Interpreters/Context.h> |
6 | | -#include <Interpreters/DatabaseCatalog.h> |
7 | 6 | #include <Interpreters/inplaceBlockConversions.h> |
8 | 7 | #include <Core/Settings.h> |
| 8 | +#include <Common/Macros.h> |
| 9 | +#include <boost/algorithm/string/replace.hpp> |
9 | 10 | #include <Interpreters/ExpressionActions.h> |
10 | 11 | #include <Interpreters/ActionsDAG.h> |
| 12 | +#include <Interpreters/DatabaseCatalog.h> |
11 | 13 | #include <Processors/Executors/CompletedPipelineExecutor.h> |
12 | 14 | #include <Processors/QueryPlan/BuildQueryPipelineSettings.h> |
13 | 15 | #include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h> |
|
17 | 19 | #include "Common/setThreadName.h" |
18 | 20 | #include <Common/Exception.h> |
19 | 21 | #include <Common/ProfileEventsScope.h> |
| 22 | +#include <Databases/DatabaseReplicated.h> |
20 | 23 | #include <Storages/MergeTree/ExportList.h> |
21 | 24 | #include <Formats/FormatFactory.h> |
22 | 25 | #include <Databases/enableAllExperimentalSettings.h> |
@@ -47,6 +50,7 @@ namespace Setting |
47 | 50 | extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file; |
48 | 51 | extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file; |
49 | 52 | extern const SettingsBool allow_experimental_analyzer; |
| 53 | + extern const SettingsString export_merge_tree_part_filename_pattern; |
50 | 54 | } |
51 | 55 |
|
52 | 56 | namespace |
@@ -93,6 +97,33 @@ namespace |
93 | 97 | plan_for_part.addStep(std::move(expression_step)); |
94 | 98 | } |
95 | 99 | } |
| 100 | + |
| 101 | + String buildDestinationFilename( |
| 102 | + const MergeTreePartExportManifest & manifest, |
| 103 | + const StorageID & storage_id, |
| 104 | + const ContextPtr & local_context) |
| 105 | + { |
| 106 | + auto filename = manifest.settings[Setting::export_merge_tree_part_filename_pattern].value; |
| 107 | + |
| 108 | + boost::replace_all(filename, "{part_name}", manifest.data_part->name); |
| 109 | + boost::replace_all(filename, "{checksum}", manifest.data_part->checksums.getTotalChecksumHex()); |
| 110 | + |
| 111 | + Macros::MacroExpansionInfo macro_info; |
| 112 | + macro_info.table_id = storage_id; |
| 113 | + |
| 114 | + if (auto database = DatabaseCatalog::instance().tryGetDatabase(storage_id.database_name)) |
| 115 | + { |
| 116 | + if (const auto replicated = dynamic_cast<const DatabaseReplicated *>(database.get())) |
| 117 | + { |
| 118 | + macro_info.shard = replicated->getShardName(); |
| 119 | + macro_info.replica = replicated->getReplicaName(); |
| 120 | + } |
| 121 | + } |
| 122 | + |
| 123 | + filename = local_context->getMacros()->expand(filename, macro_info); |
| 124 | + |
| 125 | + return filename; |
| 126 | + } |
96 | 127 | } |
97 | 128 |
|
98 | 129 | ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_) |
@@ -154,8 +185,10 @@ bool ExportPartTask::executeStep() |
154 | 185 |
|
155 | 186 | try |
156 | 187 | { |
| 188 | + const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context); |
| 189 | + |
157 | 190 | sink = destination_storage->import( |
158 | | - manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), |
| 191 | + filename, |
159 | 192 | block_with_partition_values, |
160 | 193 | new_file_path_callback, |
161 | 194 | manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite, |
|
0 commit comments