@@ -932,3 +932,143 @@ def test_export_partition_writes_column_statistics(cluster):

entries = fetch_manifest_entries(node, query_id)
assert_exported_stats(entries)


def test_export_50_monthly_partitions_to_iceberg_two_replicas(cluster):
"""
Reproduction of the QA scenario reported against EXPORT PARTITION on a
two-replica ReplicatedMergeTree. Each partition is exported with its own
ALTER TABLE statement, which mirrors how QA drove the failure (every
statement reports success but the destination row count does not match).

Schema mirrors QA:
- ``event_month`` is a MATERIALIZED Int32 derived from ``event_time``,
and is the partition expression.
    - Wide row layout with a ``payload`` column holding the 64-character
      hex SHA256 digest of the row id, so each row is on the order of a
      hundred bytes.

Data layout mirrors QA:
- 10M rows distributed deterministically across 50 monthly partitions
(200k rows per partition, partition IDs ``201501``..``201902``).
- Insert uses ``max_insert_threads = 8`` with no ``max_insert_block_size``
override, so each partition typically ends up with multiple parts
(instead of exactly one part per partition as in the prior version of
this test, which made the bug invisible).

Export pattern mirrors QA:
- 50 separate ``ALTER TABLE ... EXPORT PARTITION ID '<YYYYMM>'``
statements, NOT one ALTER bundling 50 clauses.
"""
replica1 = cluster.instances["replica1"]
replica2 = cluster.instances["replica2"]

uid = unique_suffix()
mt_table = f"mt_qa_{uid}"
iceberg_table = f"iceberg_qa_{uid}"

# Source RMT schema (with MATERIALIZED partition column, matching QA).
rmt_cols = (
"event_month Int32 MATERIALIZED toInt32(toYYYYMM(toDateTime(event_time))), "
"id Int64, "
"event_time DateTime64(6), "
"user_id Int32, "
"category String, "
"value Float64, "
"payload String"
)
# Iceberg destination has the same columns but no MATERIALIZED expression.
iceberg_cols = (
"event_month Int32, "
"id Int64, "
"event_time DateTime64(6), "
"user_id Int32, "
"category String, "
"value Float64, "
"payload String"
)

make_rmt(replica1, mt_table, rmt_cols, "event_month",
replica_name="replica1", order_by="(event_time, id)")
make_rmt(replica2, mt_table, rmt_cols, "event_month",
replica_name="replica2", order_by="(event_time, id)")

make_iceberg_s3(replica1, iceberg_table, iceberg_cols, "event_month")
make_iceberg_s3(replica2, iceberg_table, iceberg_cols, "event_month",
if_not_exists=True)

num_partitions = 50
rows_per_partition = 200_000
total_rows = num_partitions * rows_per_partition # 10,000,000

# Insert all rows on replica1 only. Each row's event_time selects which
# monthly partition it lands in (rows 0..199999 -> 2015-01, etc.). No
# max_insert_block_size override is set, so the insert is split into
# multiple parts per partition under max_insert_threads = 8.
replica1.query(
f"""
INSERT INTO {mt_table} (id, event_time, user_id, category, value, payload)
SELECT
n AS id,
toDateTime64(
toStartOfMonth(toDateTime('2015-01-01 00:00:00'))
+ toIntervalMonth(intDiv(n, {rows_per_partition}))
+ toIntervalSecond(n % {rows_per_partition}),
6
) AS event_time,
toInt32((n * 48271) % 2147483647) AS user_id,
concat('c', toString(n % 10)) AS category,
(n % 1000000) / 1000. AS value,
lower(hex(SHA256(toString(n)))) AS payload
FROM (SELECT number AS n FROM numbers({total_rows}))
SETTINGS max_insert_threads = 8
"""
)

inserted = int(replica1.query(f"SELECT count() FROM {mt_table}").strip())
assert inserted == total_rows, (
f"Expected {total_rows} rows in source MergeTree, got {inserted}"
)
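    # Sanity check on the reproduction conditions (an extra guard, not part
    # of the QA script itself; assumes the test runs in the default
    # database): the bug described in the docstring only surfaces when
    # partitions hold multiple parts, so confirm the insert really produced
    # more than one active part for at least one partition.
    max_parts = int(replica1.query(
        f"SELECT max(cnt) FROM ("
        f" SELECT count() AS cnt FROM system.parts"
        f" WHERE database = currentDatabase() AND table = '{mt_table}'"
        f" AND active GROUP BY partition_id)"
    ).strip())
    assert max_parts > 1, (
        "Expected at least one partition with multiple parts; with exactly "
        "one part per partition the bug does not reproduce"
    )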

# Make sure the second replica has the data before issuing exports.
replica2.query(f"SYSTEM SYNC REPLICA {mt_table}", timeout=600)

# Build the 50 monthly partition IDs starting from 201501.
partition_ids = [
f"{2015 + i // 12}{(i % 12) + 1:02d}" for i in range(num_partitions)
]
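    # Guard against off-by-one drift in the ID arithmetic above: the range
    # must match the docstring (201501..201902, 50 unique IDs).
    assert partition_ids[0] == "201501" and partition_ids[-1] == "201902"
    assert len(set(partition_ids)) == num_partitions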

# Issue 50 SEPARATE ALTER TABLE statements (mirroring QA's reproduction)
# concatenated into one semicolon-separated script and sent in a single
# query call. Bundling them into a single ALTER with comma-separated
# clauses hides the issue -- the bug only shows up reliably when each
# partition is its own statement.
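    # Each generated statement has the shape:
    #   ALTER TABLE mt_qa_<uid> EXPORT PARTITION ID '201501' TO TABLE iceberg_qa_<uid>;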
export_script = "\n".join(
f"ALTER TABLE {mt_table} EXPORT PARTITION ID '{pid}' TO TABLE {iceberg_table};"
for pid in partition_ids
)
replica1.query(export_script)

for pid in partition_ids:
wait_for_export_status(
replica1, mt_table, iceberg_table, pid,
expected_status="COMPLETED", timeout=600,
)

count = int(replica1.query(f"SELECT count() FROM {iceberg_table}").strip())
assert count == total_rows, (
f"Expected {total_rows} rows in Iceberg table after export, got {count}"
)

# Per-partition row count must also match (200k rows each). Mismatches at
# this granularity are what QA actually observed.
mismatches = []
for pid in partition_ids:
per_part = int(replica1.query(
f"SELECT count() FROM {iceberg_table} WHERE event_month = {int(pid)}"
).strip())
if per_part != rows_per_partition:
mismatches.append((pid, per_part))
assert not mismatches, (
f"Per-partition row counts do not match (expected {rows_per_partition} "
f"rows per partition): {mismatches}"
)