-
Notifications
You must be signed in to change notification settings - Fork 18
add setting to define filename pattern for part exports #1490
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -116,6 +116,7 @@ struct ExportReplicatedMergeTreePartitionManifest | |
| size_t max_bytes_per_file; | ||
| size_t max_rows_per_file; | ||
| MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy; | ||
| String filename_pattern; | ||
| bool lock_inside_the_task; /// todo temporary | ||
|
|
||
| std::string toJsonString() const | ||
|
|
@@ -139,6 +140,7 @@ struct ExportReplicatedMergeTreePartitionManifest | |
| json.set("max_bytes_per_file", max_bytes_per_file); | ||
| json.set("max_rows_per_file", max_rows_per_file); | ||
| json.set("file_already_exists_policy", String(magic_enum::enum_name(file_already_exists_policy))); | ||
| json.set("filename_pattern", filename_pattern); | ||
| json.set("create_time", create_time); | ||
| json.set("max_retries", max_retries); | ||
| json.set("ttl_seconds", ttl_seconds); | ||
|
|
@@ -175,6 +177,7 @@ struct ExportReplicatedMergeTreePartitionManifest | |
| manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding"); | ||
| manifest.max_bytes_per_file = json->getValue<size_t>("max_bytes_per_file"); | ||
| manifest.max_rows_per_file = json->getValue<size_t>("max_rows_per_file"); | ||
| manifest.filename_pattern = json->getValue<String>("filename_pattern"); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Deserialization now reads "filename_pattern" unconditionally via getValue, so the key becomes required — manifests written before this change (which lack the field) will fail to deserialize. Consider guarding with json->has("filename_pattern"), as is already done for "file_already_exists_policy" below. Useful? React with 👍 / 👎.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @arthurpassos , what do you think on ^^ ?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nobody is using this feature yet, it is ok to introduce backwards incompatible changes like this. We literally have 0 users so far. |
||
|
|
||
| if (json->has("file_already_exists_policy")) | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,11 +3,13 @@ | |
| #include <Storages/MergeTree/MergeTreeSequentialSource.h> | ||
| #include <Storages/MergeTree/MergeTreeData.h> | ||
| #include <Interpreters/Context.h> | ||
| #include <Interpreters/DatabaseCatalog.h> | ||
| #include <Interpreters/inplaceBlockConversions.h> | ||
| #include <Core/Settings.h> | ||
| #include <Common/Macros.h> | ||
| #include <boost/algorithm/string/replace.hpp> | ||
| #include <Interpreters/ExpressionActions.h> | ||
| #include <Interpreters/ActionsDAG.h> | ||
| #include <Interpreters/DatabaseCatalog.h> | ||
| #include <Processors/Executors/CompletedPipelineExecutor.h> | ||
| #include <Processors/QueryPlan/BuildQueryPipelineSettings.h> | ||
| #include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h> | ||
|
|
@@ -47,6 +49,7 @@ namespace Setting | |
| extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file; | ||
| extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file; | ||
| extern const SettingsBool allow_experimental_analyzer; | ||
| extern const SettingsString export_merge_tree_part_filename_pattern; | ||
| } | ||
|
|
||
| namespace | ||
|
|
@@ -93,6 +96,23 @@ namespace | |
| plan_for_part.addStep(std::move(expression_step)); | ||
| } | ||
| } | ||
|
|
||
| String buildDestinationFilename( | ||
| const MergeTreePartExportManifest & manifest, | ||
| const StorageID & storage_id, | ||
| const ContextPtr & local_context) | ||
| { | ||
| auto filename = manifest.settings[Setting::export_merge_tree_part_filename_pattern].value; | ||
|
|
||
| boost::replace_all(filename, "{part_name}", manifest.data_part->name); | ||
| boost::replace_all(filename, "{checksum}", manifest.data_part->checksums.getTotalChecksumHex()); | ||
|
|
||
| Macros::MacroExpansionInfo macro_info; | ||
| macro_info.table_id = storage_id; | ||
| filename = local_context->getMacros()->expand(filename, macro_info); | ||
|
|
||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need special logic from {part_name} and {checksum}?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because `{part_name}` and `{checksum}` are per-part values that the server-wide macro system does not know about, so they have to be substituted explicitly before `Macros::expand` is called on the remaining placeholders. |
||
| return filename; | ||
| } | ||
| } | ||
|
|
||
| ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_) | ||
|
|
@@ -154,8 +174,10 @@ bool ExportPartTask::executeStep() | |
|
|
||
| try | ||
| { | ||
| const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context); | ||
|
|
||
| sink = destination_storage->import( | ||
| manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), | ||
| filename, | ||
| block_with_partition_values, | ||
| new_file_path_callback, | ||
| manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| <clickhouse> | ||
| <macros> | ||
| <shard>shard1</shard> | ||
| <replica>replica1</replica> | ||
| </macros> | ||
| </clickhouse> |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| <clickhouse> | ||
| <macros> | ||
| <shard>shard2</shard> | ||
| <replica>replica1</replica> | ||
| </macros> | ||
| </clickhouse> |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| ---- Test: Default pattern {part_name}_{checksum} | ||
| 1 2020 | ||
| 2 2020 | ||
| 3 2020 | ||
| ---- Verify filename matches 2020_1_1_0_*.1.parquet | ||
| 1 | ||
| ---- Test: Custom prefix pattern | ||
| 4 2021 | ||
| ---- Verify filename matches myprefix_2021_2_2_0.1.parquet | ||
| 1 | ||
| ---- Test: Pattern with macros | ||
| 1 2020 | ||
| 2 2020 | ||
| 3 2020 | ||
| ---- Verify macros expanded (no literal braces in parquet filenames, that's the best we can do for stateless tests) | ||
| 1 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| #!/usr/bin/env bash | ||
| # Tags: no-fasttest | ||
| # Tag no-fasttest: requires s3 storage | ||
|
|
||
| CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) | ||
| # shellcheck source=../shell_config.sh | ||
| . "$CURDIR"/../shell_config.sh | ||
|
|
||
| R=$RANDOM | ||
| mt="mt_${R}" | ||
| dest1="fp_dest1_${R}" | ||
| dest2="fp_dest2_${R}" | ||
| dest3="fp_dest3_${R}" | ||
|
|
||
| query() { | ||
| $CLICKHOUSE_CLIENT --query "$1" | ||
| } | ||
|
|
||
| query "DROP TABLE IF EXISTS $mt, $dest1, $dest2, $dest3" | ||
|
|
||
| query "CREATE TABLE $mt (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()" | ||
| query "INSERT INTO $mt VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)" | ||
|
|
||
| query "CREATE TABLE $dest1 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest1', format=Parquet, partition_strategy='hive') PARTITION BY year" | ||
| query "CREATE TABLE $dest2 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest2', format=Parquet, partition_strategy='hive') PARTITION BY year" | ||
| query "CREATE TABLE $dest3 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest3', format=Parquet, partition_strategy='hive') PARTITION BY year" | ||
|
|
||
| echo "---- Test: Default pattern {part_name}_{checksum}" | ||
| query "ALTER TABLE $mt EXPORT PART '2020_1_1_0' TO TABLE $dest1 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = '{part_name}_{checksum}'" | ||
| sleep 3 | ||
| query "SELECT * FROM $dest1 ORDER BY id" | ||
| echo "---- Verify filename matches 2020_1_1_0_*.1.parquet" | ||
| query "SELECT count() FROM s3(s3_conn, filename='$dest1/**/2020_1_1_0_*.1.parquet', format='One')" | ||
|
|
||
| echo "---- Test: Custom prefix pattern" | ||
| query "ALTER TABLE $mt EXPORT PART '2021_2_2_0' TO TABLE $dest2 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = 'myprefix_{part_name}'" | ||
| sleep 3 | ||
| query "SELECT * FROM $dest2 ORDER BY id" | ||
| echo "---- Verify filename matches myprefix_2021_2_2_0.1.parquet" | ||
| query "SELECT count() FROM s3(s3_conn, filename='$dest2/**/myprefix_2021_2_2_0.1.parquet', format='One')" | ||
|
|
||
| echo "---- Test: Pattern with macros" | ||
| query "ALTER TABLE $mt EXPORT PART '2020_1_1_0' TO TABLE $dest3 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = '{database}_{table}_{part_name}'" | ||
| sleep 3 | ||
| query "SELECT * FROM $dest3 ORDER BY id" | ||
| echo "---- Verify macros expanded (no literal braces in parquet filenames, that's the best we can do for stateless tests)" | ||
| query "SELECT count() = 0 FROM s3(s3_conn, filename='$dest3/**/*.1.parquet', format='One') WHERE _file LIKE '%{%'" | ||
|
|
||
| query "DROP TABLE IF EXISTS $mt, $dest1, $dest2, $dest3" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we duplicate part_export.md content here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, this is export partition (slightly different feature), and at some point there might be settings that are not supported by export partition and only by export part.
I don't have a good answer tbh.