10 changes: 9 additions & 1 deletion src/Storages/ColumnsDescription.cpp
@@ -469,6 +469,15 @@ NamesAndTypesList ColumnsDescription::getInsertable() const
    return ret;
}

NamesAndTypesList ColumnsDescription::getReadable() const
{
    NamesAndTypesList ret;
    for (const auto & col : columns)
        if (col.default_desc.kind != ColumnDefaultKind::Ephemeral)
            ret.emplace_back(col.name, col.type);
    return ret;
}

NamesAndTypesList ColumnsDescription::getMaterialized() const
{
    NamesAndTypesList ret;
@@ -851,7 +860,6 @@ std::optional<ColumnDefault> ColumnsDescription::getDefault(const String & colum
    return {};
}


bool ColumnsDescription::hasCompressionCodec(const String & column_name) const
{
    const auto it = columns.get<1>().find(column_name);
1 change: 1 addition & 0 deletions src/Storages/ColumnsDescription.h
@@ -149,6 +149,7 @@ class ColumnsDescription : public IHints<>
    NamesAndTypesList getOrdinary() const;
    NamesAndTypesList getMaterialized() const;
    NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
    NamesAndTypesList getReadable() const; /// ordinary + materialized + aliases (no ephemeral)
    NamesAndTypesList getAliases() const;
    NamesAndTypesList getEphemeral() const;
    NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
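As context for the new accessor, a minimal sketch (not part of the diff) of how the accessor families partition the column kinds, per the header comments above:

    // getInsertable()  -> ordinary + ephemeral              (what an INSERT may supply)
    // getReadable()    -> ordinary + materialized + aliases (what a read can produce)
    // getAllPhysical() -> ordinary + materialized           (what is stored in a part)
    //
    // Hypothetical call site; `columns` is any ColumnsDescription:
    NamesAndTypesList exportable = columns.getReadable(); /// EPHEMERAL excluded by construction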
47 changes: 46 additions & 1 deletion src/Storages/MergeTree/ExportPartTask.cpp
@@ -4,17 +4,20 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Core/Settings.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/Exception.h>
#include <Common/ProfileEventsScope.h>
#include <Storages/MergeTree/ExportList.h>
#include <Formats/FormatFactory.h>
#include <Databases/enableAllExperimentalSettings.h>

namespace ProfileEvents
{
@@ -42,6 +45,43 @@ namespace Setting
    extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
}

namespace
{
void materializeSpecialColumns(
    const SharedHeader & header,
    const StorageMetadataPtr & storage_metadata,
    const ContextPtr & local_context,
    QueryPlan & plan_for_part)
{
    const auto readable_columns = storage_metadata->getColumns().getReadable();

    // Enable all experimental settings for default expressions
    // (same pattern as in IMergeTreeReader::evaluateMissingDefaults)
    auto context_for_defaults = Context::createCopy(local_context);
    enableAllExperimentalSettings(context_for_defaults);

    auto defaults_dag = evaluateMissingDefaults(
        *header,
        readable_columns,
        storage_metadata->getColumns(),
        context_for_defaults);

    if (defaults_dag)
    {
        /// Ensure columns are in the correct order matching readable_columns
        defaults_dag->removeUnusedActions(readable_columns.getNames(), false);
        defaults_dag->addMaterializingOutputActions(/*materialize_sparse=*/ false);

        auto expression_step = std::make_unique<ExpressionStep>(
            header,
            std::move(*defaults_dag));
        expression_step->setStepDescription("Compute alias and default expressions for export");
        plan_for_part.addStep(std::move(expression_step));
    }
}
}

ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_)
    : storage(storage_),
      manifest(manifest_)
@@ -58,7 +98,8 @@ bool ExportPartTask::executeStep()

    const auto & metadata_snapshot = manifest.metadata_snapshot;

    Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
    /// Read only physical columns from the part
    const auto columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();

    MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;

@@ -146,6 +187,10 @@ bool ExportPartTask::executeStep()
        local_context,
        getLogger("ExportPartition"));

    /// We need to support exporting MATERIALIZED and ALIAS columns to object storage, but object storage engines do not implement them.
    /// As a workaround, materialize these columns before the export so they can be written to destination tables that declare matching ordinary columns.
    materializeSpecialColumns(plan_for_part.getCurrentHeader(), metadata_snapshot, local_context, plan_for_part);

    ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, "");

    QueryPlanOptimizationSettings optimization_settings(local_context);
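To make the new step concrete, a hedged sketch of the header transformation it performs, borrowing the column set from the mt_mixed table in the tests below (illustration only, not part of the diff):

    /// Header entering materializeSpecialColumns() -- physical columns only,
    /// i.e. what the sequential source reads from the part:
    ///     id UInt32, value UInt32, tripled UInt64, tag String
    /// Header after the appended ExpressionStep -- the readable columns, with the
    /// ALIAS expression computed and every output materialized:
    ///     id UInt32, value UInt32, doubled UInt64, tripled UInt64, tag String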
8 changes: 7 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.cpp
@@ -6242,7 +6242,13 @@ void MergeTreeData::exportPartToTable(
    auto source_metadata_ptr = getInMemoryMetadataPtr();
    auto destination_metadata_ptr = dest_storage->getInMemoryMetadataPtr();

    if (destination_metadata_ptr->getColumns().getAllPhysical().sizeOfDifference(source_metadata_ptr->getColumns().getAllPhysical()))
    const auto & source_columns = source_metadata_ptr->getColumns();

    const auto & destination_columns = destination_metadata_ptr->getColumns();

    /// Compare all source readable columns with all destination insertable columns;
    /// this allows ephemeral columns to be skipped.
    if (source_columns.getReadable().sizeOfDifference(destination_columns.getInsertable()))
        throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");

    if (query_to_string(source_metadata_ptr->getPartitionKeyAST()) != query_to_string(destination_metadata_ptr->getPartitionKeyAST()))
4 changes: 3 additions & 1 deletion src/Storages/StorageReplicatedMergeTree.cpp
@@ -8133,7 +8133,9 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
    auto src_snapshot = getInMemoryMetadataPtr();
    auto destination_snapshot = dest_storage->getInMemoryMetadataPtr();

    if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical()))
    /// Compare all source readable columns with all destination insertable columns;
    /// this allows ephemeral columns to be skipped.
    if (src_snapshot->getColumns().getReadable().sizeOfDifference(destination_snapshot->getColumns().getInsertable()))
        throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");

    if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST()))
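Both call sites gate the export on NamesAndTypesList::sizeOfDifference(). A hedged sketch of the check's semantics, assuming (as the usage here suggests) that it returns the number of name/type pairs present in only one of the two lists; the helper name is hypothetical:

    bool structuresMatch(const NamesAndTypesList & readable, const NamesAndTypesList & insertable)
    {
        /// Zero difference == identical name/type sets: the source's readable columns
        /// must exactly match the destination's insertable columns.
        return readable.sizeOfDifference(insertable) == 0;
    }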
@@ -747,3 +747,59 @@ def test_multiple_exports_within_a_single_query(cluster):
    # # Wait for export to finish and then verify destination still reflects the original snapshot (3 rows)
    # time.sleep(5)
    # assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == '3\n', "Export did not preserve snapshot at start time after source mutation"


def test_export_partition_with_mixed_computed_columns(cluster):
    """Test export partition with ALIAS, MATERIALIZED, and EPHEMERAL columns."""
    node = cluster.instances["replica1"]

    mt_table = "mixed_computed_mt_table"
    s3_table = "mixed_computed_s3_table"

    node.query(f"""
        CREATE TABLE {mt_table} (
            id UInt32,
            value UInt32,
            tag_input String EPHEMERAL,
            doubled UInt64 ALIAS value * 2,
            tripled UInt64 MATERIALIZED value * 3,
            tag String DEFAULT upper(tag_input)
        ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{mt_table}', 'replica1')
        PARTITION BY id
        ORDER BY id
        SETTINGS index_granularity = 1
    """)

    # Create S3 destination table with regular columns (no EPHEMERAL)
    node.query(f"""
        CREATE TABLE {s3_table} (
            id UInt32,
            value UInt32,
            doubled UInt64,
            tripled UInt64,
            tag String
        ) ENGINE = S3(s3_conn, filename='{s3_table}', format=Parquet, partition_strategy='hive')
        PARTITION BY id
    """)

    node.query(f"INSERT INTO {mt_table} (id, value, tag_input) VALUES (1, 5, 'test'), (1, 10, 'prod')")

    node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '1' TO TABLE {s3_table}")

    wait_for_export_status(node, mt_table, s3_table, "1", "COMPLETED")

    # Verify source data (ALIAS computed, EPHEMERAL not stored)
    source_result = node.query(f"SELECT id, value, doubled, tripled, tag FROM {mt_table} ORDER BY value")
    expected = "1\t5\t10\t15\tTEST\n1\t10\t20\t30\tPROD\n"
    assert source_result == expected, f"Source table data mismatch. Expected:\n{expected}\nGot:\n{source_result}"

    dest_result = node.query(f"SELECT id, value, doubled, tripled, tag FROM {s3_table} ORDER BY value")
    assert dest_result == expected, f"Exported data mismatch. Expected:\n{expected}\nGot:\n{dest_result}"

    status = node.query(f"""
        SELECT status FROM system.replicated_partition_exports
        WHERE source_table = '{mt_table}'
          AND destination_table = '{s3_table}'
          AND partition_id = '1'
    """)
    assert status.strip() == "COMPLETED", f"Expected COMPLETED status, got: {status}"
@@ -42,3 +42,38 @@
---- Count rows in big_table and big_destination_max_rows
4194304
4194304
---- Test ALIAS columns export
---- Verify ALIAS column data in source table (arr_1 computed from arr[1])
1 [1,2,3] 1
1 [10,20,30] 10
---- Verify ALIAS column data exported to S3 (should match source)
1 [1,2,3] 1
1 [10,20,30] 10
---- Test MATERIALIZED columns export
---- Verify MATERIALIZED column data in source table (arr_1 computed from arr[1])
1 [1,2,3] 1
1 [10,20,30] 10
---- Verify MATERIALIZED column data exported to S3 (should match source)
1 [1,2,3] 1
1 [10,20,30] 10
---- Test EPHEMERAL column (not stored, ignored during export)
---- Verify data in source
1 ALICE
1 BOB
---- Verify exported data
1 ALICE
1 BOB
---- Test Mixed ALIAS, MATERIALIZED, and EPHEMERAL in same table
---- Verify mixed columns in source table
1 5 10 15 TEST
1 10 20 30 PROD
---- Verify mixed columns exported to S3 (should match source)
1 5 10 15 TEST
1 10 20 30 PROD
---- Test Complex Expressions in computed columns
---- Verify complex expressions in source table
1 alice ALICE alice-1
1 bob BOB bob-1
---- Verify complex expressions exported to S3 (should match source)
1 alice ALICE alice-1
1 bob BOB bob-1
@@ -18,12 +18,23 @@ mt_table_roundtrip="mt_table_roundtrip_${RANDOM}"
big_table="big_table_${RANDOM}"
big_destination_max_bytes="big_destination_max_bytes_${RANDOM}"
big_destination_max_rows="big_destination_max_rows_${RANDOM}"
mt_table_tf="mt_table_tf_${RANDOM}"
mt_alias="mt_alias_${RANDOM}"
mt_materialized="mt_materialized_${RANDOM}"
s3_alias_export="s3_alias_export_${RANDOM}"
s3_materialized_export="s3_materialized_export_${RANDOM}"
mt_mixed="mt_mixed_${RANDOM}"
s3_mixed_export="s3_mixed_export_${RANDOM}"
mt_complex_expr="mt_complex_expr_${RANDOM}"
s3_complex_expr_export="s3_complex_expr_export_${RANDOM}"
mt_ephemeral="mt_ephemeral_${RANDOM}"
s3_ephemeral_export="s3_ephemeral_export_${RANDOM}"

query() {
    $CLICKHOUSE_CLIENT --query "$1"
}

query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function"
query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function, $mt_alias, $mt_materialized, $s3_alias_export, $s3_materialized_export, $mt_mixed, $s3_mixed_export, $mt_complex_expr, $s3_complex_expr_export, $mt_ephemeral, $s3_ephemeral_export"

query "CREATE TABLE $mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()"
query "CREATE TABLE $s3_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table', format=Parquet, partition_strategy='hive') PARTITION BY year"
@@ -114,4 +125,127 @@ echo "---- Count rows in big_table and big_destination_max_rows"
query "SELECT COUNT() from $big_table"
query "SELECT COUNT() from $big_destination_max_rows"

query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function, $big_table, $big_destination_max_bytes, $big_destination_max_rows"
echo "---- Test ALIAS columns export"
query "CREATE TABLE $mt_alias (a UInt32, arr Array(UInt64), arr_1 UInt64 ALIAS arr[1]) ENGINE = MergeTree() PARTITION BY a ORDER BY (a, arr[1]) SETTINGS index_granularity = 1"
query "CREATE TABLE $s3_alias_export (a UInt32, arr Array(UInt64), arr_1 UInt64) ENGINE = S3(s3_conn, filename='$s3_alias_export', format=Parquet, partition_strategy='hive') PARTITION BY a"

query "INSERT INTO $mt_alias VALUES (1, [1, 2, 3]), (1, [10, 20, 30])"

alias_part=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$mt_alias' AND partition_id = '1' AND active = 1 ORDER BY name LIMIT 1" | tr -d '\n')

query "ALTER TABLE $mt_alias EXPORT PART '$alias_part' TO TABLE $s3_alias_export SETTINGS allow_experimental_export_merge_tree_part = 1"

sleep 3

echo "---- Verify ALIAS column data in source table (arr_1 computed from arr[1])"
query "SELECT a, arr, arr_1 FROM $mt_alias ORDER BY arr"

echo "---- Verify ALIAS column data exported to S3 (should match source)"
query "SELECT a, arr, arr_1 FROM $s3_alias_export ORDER BY arr"

echo "---- Test MATERIALIZED columns export"
query "CREATE TABLE $mt_materialized (a UInt32, arr Array(UInt64), arr_1 UInt64 MATERIALIZED arr[1]) ENGINE = MergeTree() PARTITION BY a ORDER BY (a, arr_1) SETTINGS index_granularity = 1"
query "CREATE TABLE $s3_materialized_export (a UInt32, arr Array(UInt64), arr_1 UInt64) ENGINE = S3(s3_conn, filename='$s3_materialized_export', format=Parquet, partition_strategy='hive') PARTITION BY a"

query "INSERT INTO $mt_materialized VALUES (1, [1, 2, 3]), (1, [10, 20, 30])"

materialized_part=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$mt_materialized' AND partition_id = '1' AND active = 1 ORDER BY name LIMIT 1" | tr -d '\n')

query "ALTER TABLE $mt_materialized EXPORT PART '$materialized_part' TO TABLE $s3_materialized_export SETTINGS allow_experimental_export_merge_tree_part = 1"

sleep 3

echo "---- Verify MATERIALIZED column data in source table (arr_1 computed from arr[1])"
query "SELECT a, arr, arr_1 FROM $mt_materialized ORDER BY arr"

echo "---- Verify MATERIALIZED column data exported to S3 (should match source)"
query "SELECT a, arr, arr_1 FROM $s3_materialized_export ORDER BY arr"

echo "---- Test EPHEMERAL column (not stored, ignored during export)"
query "CREATE TABLE $mt_ephemeral (
id UInt32,
name_input String EPHEMERAL,
name_upper String DEFAULT upper(name_input)
) ENGINE = MergeTree() PARTITION BY id ORDER BY id SETTINGS index_granularity = 1"

query "CREATE TABLE $s3_ephemeral_export (
id UInt32,
name_upper String
) ENGINE = S3(s3_conn, filename='$s3_ephemeral_export', format=Parquet, partition_strategy='hive') PARTITION BY id"

query "INSERT INTO $mt_ephemeral (id, name_input) VALUES (1, 'alice'), (1, 'bob')"

ephemeral_part=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$mt_ephemeral' AND partition_id = '1' AND active = 1 ORDER BY name LIMIT 1" | tr -d '\n')

query "ALTER TABLE $mt_ephemeral EXPORT PART '$ephemeral_part' TO TABLE $s3_ephemeral_export SETTINGS allow_experimental_export_merge_tree_part = 1"

sleep 3

echo "---- Verify data in source"
query "SELECT id, name_upper FROM $mt_ephemeral ORDER BY name_upper"

echo "---- Verify exported data"
query "SELECT id, name_upper FROM $s3_ephemeral_export ORDER BY name_upper"

echo "---- Test Mixed ALIAS, MATERIALIZED, and EPHEMERAL in same table"
query "CREATE TABLE $mt_mixed (
id UInt32,
value UInt32,
tag_input String EPHEMERAL,
doubled UInt64 ALIAS value * 2,
tripled UInt64 MATERIALIZED value * 3,
tag String DEFAULT upper(tag_input)
) ENGINE = MergeTree() PARTITION BY id ORDER BY id SETTINGS index_granularity = 1"

query "CREATE TABLE $s3_mixed_export (
id UInt32,
value UInt32,
doubled UInt64,
tripled UInt64,
tag String
) ENGINE = S3(s3_conn, filename='$s3_mixed_export', format=Parquet, partition_strategy='hive') PARTITION BY id"

query "INSERT INTO $mt_mixed (id, value, tag_input) VALUES (1, 5, 'test'), (1, 10, 'prod')"

mixed_part=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$mt_mixed' AND partition_id = '1' AND active = 1 ORDER BY name LIMIT 1" | tr -d '\n')

query "ALTER TABLE $mt_mixed EXPORT PART '$mixed_part' TO TABLE $s3_mixed_export SETTINGS allow_experimental_export_merge_tree_part = 1"

sleep 3

echo "---- Verify mixed columns in source table"
query "SELECT id, value, doubled, tripled, tag FROM $mt_mixed ORDER BY value"

echo "---- Verify mixed columns exported to S3 (should match source)"
query "SELECT id, value, doubled, tripled, tag FROM $s3_mixed_export ORDER BY value"

echo "---- Test Complex Expressions in computed columns"
query "CREATE TABLE $mt_complex_expr (
id UInt32,
name String,
upper_name String ALIAS upper(name),
concat_result String MATERIALIZED concat(name, '-', toString(id))
) ENGINE = MergeTree() PARTITION BY id ORDER BY id SETTINGS index_granularity = 1"

query "CREATE TABLE $s3_complex_expr_export (
id UInt32,
name String,
upper_name String,
concat_result String
) ENGINE = S3(s3_conn, filename='$s3_complex_expr_export', format=Parquet, partition_strategy='hive') PARTITION BY id"

query "INSERT INTO $mt_complex_expr (id, name) VALUES (1, 'alice'), (1, 'bob')"

complex_expr_part=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$mt_complex_expr' AND partition_id = '1' AND active = 1 ORDER BY name LIMIT 1" | tr -d '\n')

query "ALTER TABLE $mt_complex_expr EXPORT PART '$complex_expr_part' TO TABLE $s3_complex_expr_export SETTINGS allow_experimental_export_merge_tree_part = 1"

sleep 3

echo "---- Verify complex expressions in source table"
query "SELECT id, name, upper_name, concat_result FROM $mt_complex_expr ORDER BY name"

echo "---- Verify complex expressions exported to S3 (should match source)"
query "SELECT id, name, upper_name, concat_result FROM $s3_complex_expr_export ORDER BY name"

query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function, $big_table, $big_destination_max_bytes, $big_destination_max_rows, $mt_alias, $mt_materialized, $s3_alias_export, $s3_materialized_export, $mt_ephemeral, $s3_ephemeral_export, $mt_mixed, $s3_mixed_export, $mt_complex_expr, $s3_complex_expr_export"