Merged

Changes from 2 commits
7 changes: 7 additions & 0 deletions docs/en/engines/table-engines/mergetree-family/part_export.md
@@ -84,6 +84,13 @@ In case a table function is used as the destination, the schema can be omitted a
- **Default**: `true`
- **Description**: If set to `true`, throws if pending patch parts exist for the given part. Note that by default mutations are applied to all parts, which means that if a mutation would in practice only affect part/partition X, all the other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.

### export_merge_tree_part_filename_pattern

- **Type**: `String`
- **Default**: `{part_name}_{checksum}`
- **Description**: Pattern for the filename of the exported merge tree part. The `{part_name}` and `{checksum}` placeholders are computed from the part being exported and substituted on the fly. Additional server macros (such as `{database}`, `{table}`, `{shard}`, and `{replica}`) are also supported.

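The expansion described above can be sketched as follows (a minimal Python sketch, not the actual C++ implementation; the exact macro set is an assumption):

```python
def build_destination_filename(pattern: str, part_name: str, checksum: str,
                               macros: dict) -> str:
    """Expand a filename pattern: {part_name} and {checksum} are computed
    from the data part being exported, then any remaining {key} placeholders
    are filled from server macros (e.g. shard, replica, database, table)."""
    filename = pattern.replace("{part_name}", part_name)
    filename = filename.replace("{checksum}", checksum)
    # Macro expansion runs after the part-specific substitutions.
    for key, value in macros.items():
        filename = filename.replace("{" + key + "}", value)
    return filename

# With the default pattern:
print(build_destination_filename("{part_name}_{checksum}", "2020_1_1_0", "abc123", {}))
# → 2020_1_1_0_abc123
```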

## Examples

### Basic Export to S3
@@ -82,6 +82,12 @@ TO TABLE [destination_database.]destination_table
- **Default**: `true`
- **Description**: If set to `true`, throws if pending patch parts exist for the given part. Note that by default mutations are applied to all parts, which means that if a mutation would in practice only affect part/partition X, all the other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.

### export_merge_tree_part_filename_pattern

- **Type**: `String`
- **Default**: `{part_name}_{checksum}`
- **Description**: Pattern for the filename of the exported merge tree part. The `{part_name}` and `{checksum}` placeholders are computed from the part being exported and substituted on the fly. Additional server macros (such as `{database}`, `{table}`, `{shard}`, and `{replica}`) are also supported.

Collaborator:

Why do we duplicate part_export.md content here?

Collaborator (Author):

Well, this is export partition (slightly different feature), and at some point there might be settings that are not supported by export partition and only by export part.

I don't have a good answer tbh.

## Examples

### Basic Export to S3
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
@@ -7388,6 +7388,9 @@ Possible values:
- `` (empty value) - use server or session timezone

Default value is empty.
)", 0) \
DECLARE(String, export_merge_tree_part_filename_pattern, "{part_name}_{checksum}", R"(
Pattern for the filename of the exported merge tree part. The `{part_name}` and `{checksum}` placeholders are computed from the part being exported and substituted on the fly. Additional server macros are also supported.
)", 0) \
\
/* ####################################################### */ \
3 changes: 3 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
@@ -116,6 +116,7 @@ struct ExportReplicatedMergeTreePartitionManifest
size_t max_bytes_per_file;
size_t max_rows_per_file;
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
String filename_pattern;
bool lock_inside_the_task; /// todo temporary

std::string toJsonString() const
@@ -139,6 +140,7 @@
json.set("max_bytes_per_file", max_bytes_per_file);
json.set("max_rows_per_file", max_rows_per_file);
json.set("file_already_exists_policy", String(magic_enum::enum_name(file_already_exists_policy)));
json.set("filename_pattern", filename_pattern);
json.set("create_time", create_time);
json.set("max_retries", max_retries);
json.set("ttl_seconds", ttl_seconds);
@@ -175,6 +177,7 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");
manifest.max_bytes_per_file = json->getValue<size_t>("max_bytes_per_file");
manifest.max_rows_per_file = json->getValue<size_t>("max_rows_per_file");
manifest.filename_pattern = json->getValue<String>("filename_pattern");
Review comment:

P1: Preserve manifest backward compatibility for filename_pattern

Deserialization now requires filename_pattern unconditionally, but metadata written by earlier versions does not include this key. Any node that reads an older exports/.../metadata.json (for example while checking existing exports or canceling an export in StorageReplicatedMergeTree) will throw during fromJsonString, breaking in-flight export management after upgrade. Make this field optional and fall back to the default pattern when absent.
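The suggested fallback can be sketched like this (a Python sketch over a dict view of the parsed manifest JSON; the helper name is hypothetical, the real code uses Poco JSON):

```python
DEFAULT_FILENAME_PATTERN = "{part_name}_{checksum}"

def read_filename_pattern(manifest_json: dict) -> str:
    """Return filename_pattern if present; otherwise fall back to the
    default so metadata written by older versions still deserializes."""
    return manifest_json.get("filename_pattern", DEFAULT_FILENAME_PATTERN)

print(read_filename_pattern({}))  # → {part_name}_{checksum}
```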


Collaborator:

@arthurpassos , what do you think on ^^ ?

Collaborator (Author):

Nobody is using this feature yet, it is ok to introduce backwards incompatible changes like this. We literally have 0 users so far.


if (json->has("file_already_exists_policy"))
{
26 changes: 24 additions & 2 deletions src/Storages/MergeTree/ExportPartTask.cpp
@@ -3,11 +3,13 @@
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Core/Settings.h>
#include <Common/Macros.h>
#include <boost/algorithm/string/replace.hpp>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
@@ -47,6 +49,7 @@ namespace Setting
extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file;
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
extern const SettingsBool allow_experimental_analyzer;
extern const SettingsString export_merge_tree_part_filename_pattern;
}

namespace
@@ -93,6 +96,23 @@ namespace
plan_for_part.addStep(std::move(expression_step));
}
}

String buildDestinationFilename(
const MergeTreePartExportManifest & manifest,
const StorageID & storage_id,
const ContextPtr & local_context)
{
auto filename = manifest.settings[Setting::export_merge_tree_part_filename_pattern].value;

boost::replace_all(filename, "{part_name}", manifest.data_part->name);
boost::replace_all(filename, "{checksum}", manifest.data_part->checksums.getTotalChecksumHex());

Macros::MacroExpansionInfo macro_info;
macro_info.table_id = storage_id;
filename = local_context->getMacros()->expand(filename, macro_info);

Collaborator:

Why do we need special logic from {part_name} and {checksum}?
In other words, why we do not put it inside expand() ?

Collaborator (Author):

Because `part_name` and `checksum` are calculated on the fly based on the data part being exported. They are not meant to be extracted from macros; it would not even work tbh

return filename;
}
}

ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_)
@@ -154,8 +174,10 @@ bool ExportPartTask::executeStep()

try
{
const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context);

sink = destination_storage->import(
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
filename,
block_with_partition_values,
new_file_path_callback,
manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite,
4 changes: 3 additions & 1 deletion src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -56,7 +56,9 @@ namespace
context_copy->setSetting("export_merge_tree_part_throw_on_pending_mutations", false);
context_copy->setSetting("export_merge_tree_part_throw_on_pending_patch_parts", false);

return context_copy;
context_copy->setSetting("export_merge_tree_part_filename_pattern", manifest.filename_pattern);

return context_copy;
}
}

2 changes: 2 additions & 0 deletions src/Storages/StorageReplicatedMergeTree.cpp
@@ -217,6 +217,7 @@ namespace Setting
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
extern const SettingsBool export_merge_tree_partition_lock_inside_the_task;
extern const SettingsString export_merge_tree_part_filename_pattern;
}

namespace MergeTreeSetting
@@ -8209,6 +8210,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &


manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value;
manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value;

ops.emplace_back(zkutil::makeCreateRequest(
fs::path(partition_exports_path) / "metadata.json",
@@ -0,0 +1,6 @@
<clickhouse>
<macros>
<shard>shard1</shard>
<replica>replica1</replica>
</macros>
</clickhouse>
@@ -0,0 +1,6 @@
<clickhouse>
<macros>
<shard>shard2</shard>
<replica>replica1</replica>
</macros>
</clickhouse>
@@ -118,6 +118,26 @@ def cluster():
with_zookeeper=True,
keeper_required_feature_flags=["multi_read"],
)
# Sharded instances for filename pattern tests
cluster.add_instance(
"shard1_replica1",
main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml", "configs/macros_shard1_replica1.xml"],
user_configs=["configs/users.d/profile.xml"],
with_minio=True,
stay_alive=True,
with_zookeeper=True,
keeper_required_feature_flags=["multi_read"],
)

cluster.add_instance(
"shard2_replica1",
main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml", "configs/macros_shard2_replica1.xml"],
user_configs=["configs/users.d/profile.xml"],
with_minio=True,
stay_alive=True,
with_zookeeper=True,
keeper_required_feature_flags=["multi_read"],
)
logging.info("Starting cluster...")
cluster.start()
yield cluster
@@ -161,6 +181,14 @@ def create_tables_and_insert_data(node, mt_table, s3_table, replica_name):
create_s3_table(node, s3_table)


def create_sharded_tables_and_insert_data(node, mt_table, s3_table, replica_name):
"""Create sharded ReplicatedMergeTree table with {shard} macro in ZooKeeper path."""
node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{shard}}/{mt_table}', '{replica_name}') PARTITION BY year ORDER BY tuple()")
node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)")

create_s3_table(node, s3_table)


def test_restart_nodes_during_export(cluster):
skip_if_remote_database_disk_enabled(cluster)
node = cluster.instances["replica1"]
@@ -1148,3 +1176,82 @@ def test_export_partition_with_mixed_computed_columns(cluster):
AND partition_id = '1'
""")
assert status.strip() == "COMPLETED", f"Expected COMPLETED status, got: {status}"


def test_sharded_export_partition_with_filename_pattern(cluster):
"""Test that export partition with filename pattern prevents collisions in sharded setup."""
shard1_r1 = cluster.instances["shard1_replica1"]
shard2_r1 = cluster.instances["shard2_replica1"]
watcher_node = cluster.instances["watcher_node"]

mt_table = "sharded_mt_table"
s3_table = "sharded_s3_table"

# Create sharded tables on all shards with same partition data (same part names)
# Each shard uses different ZooKeeper path via {shard} macro
create_sharded_tables_and_insert_data(shard1_r1, mt_table, s3_table, "replica1")
create_sharded_tables_and_insert_data(shard2_r1, mt_table, s3_table, "replica1")
create_s3_table(watcher_node, s3_table)

# Export partition from both shards with filename pattern including shard
# This should prevent filename collisions
shard1_r1.query(
f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} "
f"SETTINGS export_merge_tree_part_filename_pattern = '{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'"
)
shard2_r1.query(
f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} "
f"SETTINGS export_merge_tree_part_filename_pattern = '{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'"
)

# Wait for exports to complete
wait_for_export_status(shard1_r1, mt_table, s3_table, "2020", "COMPLETED")
wait_for_export_status(shard2_r1, mt_table, s3_table, "2020", "COMPLETED")

total_count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip()
assert total_count == "6", f"Expected 6 total rows (3 from each shard), got {total_count}"

# Verify filenames contain shard information (check via S3 directly)
# Get all files from S3 - query from watcher_node since S3 is shared
files_shard1 = watcher_node.query(
f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**', format='One') WHERE _file LIKE '%shard1%' LIMIT 1"
).strip()
files_shard2 = watcher_node.query(
f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**', format='One') WHERE _file LIKE '%shard2%' LIMIT 1"
).strip()

# Both shards should have files with their shard names
assert "shard1" in files_shard1, f"Expected shard1 in filenames, got: {files_shard1}"
assert "shard2" in files_shard2, f"Expected shard2 in filenames, got: {files_shard2}"


def test_sharded_export_partition_default_pattern(cluster):
shard1_r1 = cluster.instances["shard1_replica1"]
shard2_r1 = cluster.instances["shard2_replica1"]
watcher_node = cluster.instances["watcher_node"]

mt_table = "sharded_mt_table_default"
s3_table = "sharded_s3_table_default"

# Create sharded tables with different ZooKeeper paths per shard
create_sharded_tables_and_insert_data(shard1_r1, mt_table, s3_table, "replica1")
create_sharded_tables_and_insert_data(shard2_r1, mt_table, s3_table, "replica1")
create_s3_table(watcher_node, s3_table)

# Export with the default pattern ({part_name}_{checksum}) - may cause collisions if parts have the same name and the same checksum
shard1_r1.query(
f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
)
shard2_r1.query(
f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
)

wait_for_export_status(shard1_r1, mt_table, s3_table, "2020", "COMPLETED")
wait_for_export_status(shard2_r1, mt_table, s3_table, "2020", "COMPLETED")

# Both exports should complete (even if there are collisions, the overwrite policy handles it)
# S3 tables are shared, so query from watcher_node
total_count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip()

# only one file with 3 rows should be present
assert int(total_count) == 3, f"Expected 3 rows, got {total_count}"
@@ -0,0 +1,16 @@
---- Test: Default pattern {part_name}_{checksum}
1 2020
2 2020
3 2020
---- Verify filename matches 2020_1_1_0_*.1.parquet
1
---- Test: Custom prefix pattern
4 2021
---- Verify filename matches myprefix_2021_2_2_0.1.parquet
1
---- Test: Pattern with macros
1 2020
2 2020
3 2020
---- Verify macros expanded (no literal braces in parquet filenames, that's the best we can do for stateless tests)
1
@@ -0,0 +1,49 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tag no-fasttest: requires s3 storage

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

R=$RANDOM
mt="mt_${R}"
dest1="fp_dest1_${R}"
dest2="fp_dest2_${R}"
dest3="fp_dest3_${R}"

query() {
$CLICKHOUSE_CLIENT --query "$1"
}

query "DROP TABLE IF EXISTS $mt, $dest1, $dest2, $dest3"

query "CREATE TABLE $mt (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()"
query "INSERT INTO $mt VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)"

query "CREATE TABLE $dest1 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest1', format=Parquet, partition_strategy='hive') PARTITION BY year"
query "CREATE TABLE $dest2 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest2', format=Parquet, partition_strategy='hive') PARTITION BY year"
query "CREATE TABLE $dest3 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest3', format=Parquet, partition_strategy='hive') PARTITION BY year"

echo "---- Test: Default pattern {part_name}_{checksum}"
query "ALTER TABLE $mt EXPORT PART '2020_1_1_0' TO TABLE $dest1 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = '{part_name}_{checksum}'"
sleep 3
query "SELECT * FROM $dest1 ORDER BY id"
echo "---- Verify filename matches 2020_1_1_0_*.1.parquet"
query "SELECT count() FROM s3(s3_conn, filename='$dest1/**/2020_1_1_0_*.1.parquet', format='One')"

echo "---- Test: Custom prefix pattern"
query "ALTER TABLE $mt EXPORT PART '2021_2_2_0' TO TABLE $dest2 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = 'myprefix_{part_name}'"
sleep 3
query "SELECT * FROM $dest2 ORDER BY id"
echo "---- Verify filename matches myprefix_2021_2_2_0.1.parquet"
query "SELECT count() FROM s3(s3_conn, filename='$dest2/**/myprefix_2021_2_2_0.1.parquet', format='One')"

echo "---- Test: Pattern with macros"
query "ALTER TABLE $mt EXPORT PART '2020_1_1_0' TO TABLE $dest3 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = '{database}_{table}_{part_name}'"
sleep 3
query "SELECT * FROM $dest3 ORDER BY id"
echo "---- Verify macros expanded (no literal braces in parquet filenames, that's the best we can do for stateless tests)"
query "SELECT count() = 0 FROM s3(s3_conn, filename='$dest3/**/*.1.parquet', format='One') WHERE _file LIKE '%{%'"

query "DROP TABLE IF EXISTS $mt, $dest1, $dest2, $dest3"