From d381de3fb85127a36669e92b69ffa5bf6599a614 Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Fri, 27 Feb 2026 14:26:18 +0800 Subject: [PATCH] [spark] Support compact_chain_table procedure --- docs/content/primary-key-table/chain-table.md | 28 +- docs/content/spark/procedures.md | 12 + .../paimon/table/ChainGroupReadTable.java | 9 +- .../table/FallbackReadFileStoreTable.java | 8 + .../paimon/table/source/ChainSplit.java | 36 ++- .../paimon/table/source/ChainSplitTest.java | 24 +- .../apache/paimon/spark/SparkProcedures.java | 2 + .../procedure/CompactChainTableProcedure.java | 220 +++++++++++++ .../CompactChainTableProcedureTest.scala | 294 ++++++++++++++++++ 9 files changed, 613 insertions(+), 20 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactChainTableProcedure.java create mode 100644 paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactChainTableProcedureTest.scala diff --git a/docs/content/primary-key-table/chain-table.md b/docs/content/primary-key-table/chain-table.md index 197c79efa4a4..ad2ea2c239d4 100644 --- a/docs/content/primary-key-table/chain-table.md +++ b/docs/content/primary-key-table/chain-table.md @@ -87,7 +87,6 @@ Notice that: - Chain table is only supported for primary key table, which means you should define `bucket` and `bucket-key` for the table. - Chain table should ensure that the schema of each branch is consistent. - Only spark support now, flink will be supported later. -- Chain compact is not supported for now, and it will be supported later. After creating a chain table, you can read and write data in the following ways. @@ -146,3 +145,30 @@ you will get the following result: | 2 | 1| 1 | +---+----+-----+ ``` + +- Chain Table Compaction: Merge data from snapshot and delta branches into the snapshot branch. +This is useful for periodically compacting incremental data into full snapshots. 
+You can use the `compact_chain_table` procedure to merge a specific partition: + +```sql +CALL sys.compact_chain_table(table => 'default.t', partition => 'date="20250811"'); +``` + +After compaction, the data in the snapshot branch will contain the merged result from both snapshot +and delta branches, and subsequent queries will benefit from direct snapshot access without +merge-on-read overhead. + +```sql +select t1, t2, t3 from `default`.`t$branch_snapshot` where date = '20250811'; +``` + +you will get the following result: +```text ++---+----+-----+ +| t1| t2| t3| ++---+----+-----+ +| 1 | 1| 1 | +| 2 | 1| 1 | ++---+----+-----+ +``` + diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 1b8959d6bf15..e7217d38e1b1 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -59,6 +59,18 @@ This section introduce all available spark procedures about paimon. CALL sys.compact(table => 'T', compact_strategy => 'minor')

+ + compact_chain_table + + To compact a chain table by merging snapshot and delta branches into the snapshot branch. Arguments: +
  • table: The target chain table identifier. Cannot be empty.
  • +
  • partition: Partition specification format (e.g., 'dt="20250810",hour="22"'). Cannot be empty.
  • +
  • overwrite: Whether to overwrite if the partition already exists in the snapshot branch. Default is false. Optional.
  • + + + CALL sys.compact_chain_table(table => 'default.T', partition => 'dt="20250810",hour="22"')

    + + expire_snapshots diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java index 9b7594cef17c..ff388a1bcdd2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java @@ -42,6 +42,8 @@ import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.RowDataToObjectArrayConverter; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -169,7 +171,7 @@ public Plan plan() { splits.add( new ChainSplit( dataSplit.partition(), - dataSplit.dataFiles(), + Lists.newArrayList(dataSplit), fileBucketPathMapping, fileBranchMapping)); completePartitions.add(dataSplit.partition()); @@ -277,10 +279,7 @@ public Plan plan() { ChainSplit split = new ChainSplit( partitionParis.getKey(), - entry.getValue().stream() - .flatMap( - datsSplit -> datsSplit.dataFiles().stream()) - .collect(Collectors.toList()), + entry.getValue(), fileBucketPathMapping, fileBranchMapping); splits.add(split); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index 65e875c6a59f..60ced8d18bdf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -411,6 +411,14 @@ public InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate) return this; } + public InnerTableScan withPartitionFilter( + PartitionPredicate mainPartitionPredicate, + PartitionPredicate fallbackPartitionPredicate) { + mainScan.withPartitionFilter(mainPartitionPredicate); + 
fallbackScan.withPartitionFilter(fallbackPartitionPredicate); + return this; + } + @Override public FallbackReadScan withBucketFilter(Filter bucketFilter) { mainScan.withBucketFilter(bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java index 5339cd06a69d..6a78e07e6fdd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java @@ -20,7 +20,6 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataInputViewStreamWrapper; import org.apache.paimon.io.DataOutputView; @@ -36,6 +35,7 @@ import java.util.Map; import java.util.Objects; import java.util.OptionalLong; +import java.util.stream.Collectors; /** * A split describes chain table read scope. It follows DataSplit's custom serialization pattern and @@ -48,17 +48,24 @@ public class ChainSplit implements Split { private static final int VERSION = 1; private BinaryRow logicalPartition; + private List dataSplits; private List dataFiles; private Map fileBranchMapping; private Map fileBucketPathMapping; public ChainSplit( BinaryRow logicalPartition, - List dataFiles, + List dataSplits, Map fileBranchMapping, Map fileBucketPathMapping) { this.logicalPartition = logicalPartition; - this.dataFiles = dataFiles; + this.dataSplits = dataSplits; + this.dataFiles = + dataSplits == null + ? 
null + : dataSplits.stream() + .flatMap(dataSplit -> dataSplit.dataFiles().stream()) + .collect(Collectors.toList()); this.fileBranchMapping = fileBranchMapping; this.fileBucketPathMapping = fileBucketPathMapping; } @@ -67,6 +74,10 @@ public BinaryRow logicalPartition() { return logicalPartition; } + public List dataSplits() { + return dataSplits; + } + public List dataFiles() { return dataFiles; } @@ -103,12 +114,12 @@ public boolean equals(Object o) { } ChainSplit that = (ChainSplit) o; return Objects.equals(logicalPartition, that.logicalPartition) - && Objects.equals(dataFiles, that.dataFiles); + && Objects.equals(dataSplits, that.dataSplits); } @Override public int hashCode() { - return Objects.hash(logicalPartition, dataFiles); + return Objects.hash(logicalPartition, dataSplits); } private void writeObject(ObjectOutputStream out) throws IOException { @@ -121,6 +132,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE protected void assign(ChainSplit other) { this.logicalPartition = other.logicalPartition; + this.dataSplits = other.dataSplits; this.dataFiles = other.dataFiles; this.fileBranchMapping = other.fileBranchMapping; this.fileBucketPathMapping = other.fileBucketPathMapping; @@ -131,12 +143,11 @@ public void serialize(DataOutputView out) throws IOException { SerializationUtils.serializeBinaryRow(logicalPartition, out); - DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer(); - int size = dataFiles == null ? 0 : dataFiles.size(); + int size = dataSplits == null ? 
0 : dataSplits.size(); out.writeInt(size); if (size > 0) { - for (DataFileMeta file : dataFiles) { - dataFileSer.serialize(file, out); + for (DataSplit file : dataSplits) { + file.serialize(out); } } @@ -161,10 +172,9 @@ public static ChainSplit deserialize(DataInputView in) throws IOException { BinaryRow logicalPartition = SerializationUtils.deserializeBinaryRow(in); int n = in.readInt(); - List dataFiles = new ArrayList<>(n); - DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer(); + List dataSplits = new ArrayList<>(n); for (int i = 0; i < n; i++) { - dataFiles.add(dataFileSer.deserialize(in)); + dataSplits.add(DataSplit.deserialize(in)); } int size = in.readInt(); @@ -183,6 +193,6 @@ public static ChainSplit deserialize(DataInputView in) throws IOException { } return new ChainSplit( - logicalPartition, dataFiles, fileBucketPathMapping, fileBranchMapping); + logicalPartition, dataSplits, fileBucketPathMapping, fileBranchMapping); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java index c042f6a37133..564f6044f3a7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java @@ -25,6 +25,8 @@ import org.apache.paimon.io.DataFileTestDataGenerator; import org.apache.paimon.utils.InstantiationUtil; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + import org.junit.jupiter.api.Test; import java.io.IOException; @@ -37,6 +39,22 @@ /** Test for {@link ChainSplit}. 
*/ public class ChainSplitTest { + private DataSplit newDataSplit( + boolean rawConvertible, + List dataFiles, + List deletionFiles) { + DataSplit.Builder builder = DataSplit.builder(); + builder.withSnapshot(1) + .withPartition(BinaryRow.EMPTY_ROW) + .withBucket(1) + .withBucketPath("my path") + .rawConvertible(rawConvertible) + .withDataFiles(dataFiles); + if (deletionFiles != null) { + builder.withDataDeletionFiles(deletionFiles); + } + return builder.build(); + } @Test public void testChainSplitSerde() throws IOException, ClassNotFoundException { @@ -48,6 +66,7 @@ public void testChainSplitSerde() throws IOException, ClassNotFoundException { for (int i = 0; i < 3; i++) { dataFiles.add(gen.next().meta); } + DataSplit dataSplit = newDataSplit(true, dataFiles, null); Map fileBucketPathMapping = new HashMap<>(); Map fileBranchMapping = new HashMap<>(); for (DataFileMeta dataFile : dataFiles) { @@ -56,7 +75,10 @@ public void testChainSplitSerde() throws IOException, ClassNotFoundException { } ChainSplit split = new ChainSplit( - logicalPartition, dataFiles, fileBucketPathMapping, fileBranchMapping); + logicalPartition, + Lists.newArrayList(dataSplit), + fileBucketPathMapping, + fileBranchMapping); byte[] bytes = InstantiationUtil.serializeObject(split); ChainSplit newSplit = InstantiationUtil.deserializeObject(bytes, ChainSplit.class.getClassLoader()); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index 74bf04d68c56..0f55079c568c 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -21,6 +21,7 @@ import org.apache.paimon.spark.procedure.AlterFunctionProcedure; import org.apache.paimon.spark.procedure.AlterViewDialectProcedure; import 
org.apache.paimon.spark.procedure.ClearConsumersProcedure; +import org.apache.paimon.spark.procedure.CompactChainTableProcedure; import org.apache.paimon.spark.procedure.CompactDatabaseProcedure; import org.apache.paimon.spark.procedure.CompactManifestProcedure; import org.apache.paimon.spark.procedure.CompactProcedure; @@ -100,6 +101,7 @@ private static Map> initProcedureBuilders() { procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder); procedureBuilders.put("compact", CompactProcedure::builder); procedureBuilders.put("compact_database", CompactDatabaseProcedure::builder); + procedureBuilders.put("compact_chain_table", CompactChainTableProcedure::builder); procedureBuilders.put("rescale", RescaleProcedure::builder); procedureBuilders.put("migrate_database", MigrateDatabaseProcedure::builder); procedureBuilders.put("migrate_table", MigrateTableProcedure::builder); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactChainTableProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactChainTableProcedure.java new file mode 100644 index 000000000000..6cafc9bb549c --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactChainTableProcedure.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.spark.commands.PaimonSparkWriter; +import org.apache.paimon.spark.util.ScanPlanHelper$; +import org.apache.paimon.spark.utils.SparkProcedureUtils; +import org.apache.paimon.table.ChainGroupReadTable; +import org.apache.paimon.table.FallbackReadFileStoreTable; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.ChainSplit; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.ParameterUtils; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.PaimonUtils; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.spark.sql.types.DataTypes.BooleanType; +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** + * Procedure to compact chain table. Usage: + * + *
    
    + *  -- Compact chain table, overwrite default is false
    + *  CALL sys.compact_chain_table(table => 'db.table', partition => 'dt="20250810",hour="22"', [overwrite => true])
    + * 
    + */ +public class CompactChainTableProcedure extends BaseProcedure { + + private static final Logger LOG = LoggerFactory.getLogger(CompactChainTableProcedure.class); + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", StringType), + ProcedureParameter.required("partition", StringType), + ProcedureParameter.optional("overwrite", BooleanType) + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", BooleanType, false, Metadata.empty()) + }); + + protected CompactChainTableProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + String partitionStr = args.getString(1); + boolean overwrite = !args.isNullAt(2) && args.getBoolean(2); + checkArgument( + partitionStr == null || !partitionStr.isEmpty(), + "Partition string cannot be empty"); + + return modifyPaimonTable( + tableIdent, + t -> { + checkArgument( + t instanceof FallbackReadFileStoreTable, + "Table %s is not a chain table", + tableIdent); + FallbackReadFileStoreTable table = (FallbackReadFileStoreTable) t; + checkArgument( + table.fallback() instanceof ChainGroupReadTable, + "Table %s is not a chain table", + tableIdent); + DataSourceV2Relation relation = createRelation(tableIdent); + boolean success = + execute( + (ChainGroupReadTable) table.fallback(), + relation, + partitionStr, + overwrite); + return new InternalRow[] {newInternalRow(success)}; + }); + } + + private boolean execute( + ChainGroupReadTable table, + DataSourceV2Relation relation, + String partitionStr, + boolean overwrite) { + String partition = SparkProcedureUtils.toWhere(partitionStr); + 
FileStoreTable snapshotTable = table.wrapped(); + + // Check if target partition already exists in snapshot branch + PartitionPredicate snapshotPartitionPredicate = + SparkProcedureUtils.convertToPartitionPredicate( + partition, + snapshotTable.schema().logicalPartitionType(), + spark(), + relation); + boolean partitionExists = + !snapshotTable + .newScan() + .withPartitionFilter(snapshotPartitionPredicate) + .plan() + .splits() + .isEmpty(); + + FallbackReadFileStoreTable.FallbackReadScan scan = + (FallbackReadFileStoreTable.FallbackReadScan) table.newScan(); + PartitionPredicate partitionPredicate = + SparkProcedureUtils.convertToPartitionPredicate( + partition, table.schema().logicalPartitionType(), spark(), relation); + if (partitionExists) { + if (overwrite) { + scan.withPartitionFilter( + SparkProcedureUtils.convertToPartitionPredicate( + "!(" + partition + ")", + table.schema().logicalPartitionType(), + spark(), + relation), + partitionPredicate); + LOG.info("Found existing partition {}, will overwrite it.", partition); + } else { + LOG.info( + "Partition {} already exists in snapshot branch, skipping compaction.", + partitionStr); + return false; + } + } else { + scan.withPartitionFilter(partitionPredicate); + } + + Dataset datasetForWrite = + scan.plan().splits().stream() + .map( + split -> { + DataSplit[] dataSplits = + ((ChainSplit) split) + .dataSplits() + .toArray(new DataSplit[0]); + return PaimonUtils.createDataset( + spark(), + ScanPlanHelper$.MODULE$.createNewScanPlan( + dataSplits, relation)); + }) + .reduce(Dataset::union) + .orElse(null); + + if (datasetForWrite != null) { + PaimonSparkWriter writer = PaimonSparkWriter.apply(snapshotTable); + if (partitionExists) { + writer.writeBuilder().withOverwrite(); + } + Map targetPartition = ParameterUtils.getPartitions(partitionStr).get(0); + for (Map.Entry entry : targetPartition.entrySet()) { + datasetForWrite = + datasetForWrite.withColumn( + entry.getKey(), functions.expr(entry.getValue())); + } + 
writer.commit(writer.write(datasetForWrite)); + LOG.info("Successfully compacted partition {} to snapshot branch.", partitionStr); + return true; + } else { + LOG.warn("Table {} is empty, skip compaction.", table); + return false; + } + } + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + public CompactChainTableProcedure doBuild() { + return new CompactChainTableProcedure(tableCatalog()); + } + }; + } + + @Override + public String description() { + return "Compact chain table by merging snapshot + delta into target snapshot."; + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactChainTableProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactChainTableProcedureTest.scala new file mode 100644 index 000000000000..f879f5be68c0 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactChainTableProcedureTest.scala @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.procedure + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row +import org.apache.spark.sql.streaming.StreamTest + +/** Test compact_chain_table procedure. See [[CompactChainTableProcedure]]. */ +class CompactChainTableProcedureTest extends PaimonSparkTestBase with StreamTest { + + test("Paimon Procedure: compact_chain_table - basic test") { + // Create chain table + spark.sql(""" + |CREATE TABLE IF NOT EXISTS chain_compact_t1 ( + | `t1` BIGINT COMMENT 't1', + | `t2` BIGINT COMMENT 't2', + | `t3` STRING COMMENT 't3' + | ) PARTITIONED BY (`date` STRING COMMENT 'date') + |TBLPROPERTIES ( + | 'chain-table.enabled' = 'true', + | 'primary-key' = 'date,t1', + | 'sequence.field' = 't2', + | 'bucket-key' = 't1', + | 'bucket' = '1', + | 'partition.timestamp-pattern' = '$date', + | 'partition.timestamp-formatter' = 'yyyyMMdd' + |) + |""".stripMargin) + + // Create branches + setupChainTableBranches("chain_compact_t1") + + // Insert snapshot data + spark.sql( + "insert into `chain_compact_t1$branch_snapshot` partition (date = '20260223') values (1, 1, '1')") + + // Insert delta data + spark.sql( + "insert into `chain_compact_t1$branch_delta` partition (date = '20260224') values (2, 2, '2')") + + // Before compaction: verify chain read shows snapshot + delta + checkAnswer( + sql("SELECT * FROM `chain_compact_t1` where date = '20260224'"), + Seq(Row(1, 1, "1", "20260224"), Row(2, 2, "2", "20260224")) + ) + + checkAnswer( + spark.sql( + "CALL sys.compact_chain_table(table => '`chain_compact_t1`', partition => 'date=\"20260224\"')"), + Row(true) :: Nil) + + // After compaction: verify snapshot branch has merged data + checkAnswer( + sql("SELECT * FROM `chain_compact_t1` where date = '20260224'"), + Seq(Row(1, 1, "1", "20260224"), Row(2, 2, "2", "20260224")) + ) + checkAnswer( + sql("SELECT * FROM `chain_compact_t1$branch_snapshot` where date = '20260223'"), + Seq(Row(1, 1, "1", "20260223")) + ) + 
checkAnswer( + sql("SELECT * FROM `chain_compact_t1$branch_snapshot` where date = '20260224'"), + Seq(Row(1, 1, "1", "20260224"), Row(2, 2, "2", "20260224")) + ) + + } + + test("Paimon Procedure: compact_chain_table - overwrite test") { + // Create chain table + spark.sql(""" + |CREATE TABLE IF NOT EXISTS chain_compact_t2 ( + | `t1` BIGINT COMMENT 't1', + | `t2` BIGINT COMMENT 't2', + | `t3` STRING COMMENT 't3' + | ) PARTITIONED BY (`date` STRING COMMENT 'date') + |TBLPROPERTIES ( + | 'chain-table.enabled' = 'true', + | 'primary-key' = 'date,t1', + | 'sequence.field' = 't2', + | 'bucket-key' = 't1', + | 'bucket' = '1', + | 'partition.timestamp-pattern' = '$date', + | 'partition.timestamp-formatter' = 'yyyyMMdd' + | ) + |""".stripMargin) + + // Create branches + setupChainTableBranches("chain_compact_t2") + + // Insert snapshot data + spark.sql( + "insert into `chain_compact_t2$branch_snapshot` partition (date = '20260224') values (1, 1, '1')") + spark.sql( + "insert into `chain_compact_t2$branch_snapshot` partition (date = '20260225') values (1, 2, '1-1')") + + // Insert delta data + spark.sql( + "insert into `chain_compact_t2$branch_delta` partition (date = '20260225') values (2, 2, '2')") + + // First call should fail because partition exists + checkAnswer( + spark.sql( + "CALL sys.compact_chain_table(table => '`chain_compact_t2`', partition => 'date=\"20260225\"')"), + Row(false) :: Nil) + + checkAnswer( + sql("SELECT * FROM `chain_compact_t2` where date = '20260225'"), + Seq(Row(1, 2, "1-1", "20260225")) + ) + + checkAnswer( + spark.sql( + "CALL sys.compact_chain_table(table => '`chain_compact_t2`', partition => 'date=\"20260225\"', overwrite => true)"), + Row(true) :: Nil) + + // Verify snapshot branch now has the data + checkAnswer( + sql("SELECT * FROM `chain_compact_t2` where date = '20260225'"), + Seq(Row(1, 1, "1", "20260225"), Row(2, 2, "2", "20260225")) + ) + } + + test("Paimon Procedure: compact_chain_table - test multiple partitions") { + 
spark.sql(""" + |CREATE TABLE IF NOT EXISTS chain_compact_multi ( + | `t1` BIGINT COMMENT 't1', + | `t2` BIGINT COMMENT 't2', + | `t3` STRING COMMENT 't3' + | ) PARTITIONED BY (`dt` STRING COMMENT 'dt', `hour` STRING COMMENT 'hour') + |TBLPROPERTIES ( + | 'chain-table.enabled' = 'true', + | 'primary-key' = 'dt,hour,t1', + | 'sequence.field' = 't2', + | 'bucket-key' = 't1', + | 'bucket' = '2', + | 'partition.timestamp-pattern' = '$dt $hour:00:00', + | 'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss' + | ) + |""".stripMargin) + + // Create branches + setupChainTableBranches("chain_compact_multi") + + // Write snapshot branch data + spark.sql( + "insert into `chain_compact_multi$branch_snapshot` partition (dt = '20250810', hour = '22') values (1, 1, '1'),(2, 1, '1')") + + // Write delta branch data + spark.sql( + "insert into `chain_compact_multi$branch_delta` partition (dt = '20250810', hour = '21') values (1, 1, '1'),(2, 1, '1')") + spark.sql( + "insert into `chain_compact_multi$branch_delta` partition (dt = '20250810', hour = '22') values (1, 2, '1-1' ),(3, 1, '1' )") + spark.sql( + "insert into `chain_compact_multi$branch_delta` partition (dt = '20250810', hour = '23') values (2, 2, '1-1' ),(4, 1, '1' )") + + // Compact partition 20250810/hour=23 + checkAnswer( + spark.sql( + "CALL sys.compact_chain_table(table => '`chain_compact_multi`', partition => 'dt=\"20250810\",hour=\"23\"')"), + Row(true) :: Nil + ) + + // Verify chain read still works correctly + checkAnswer( + sql("SELECT * FROM `chain_compact_multi` where dt = '20250810' and hour = '23'"), + Seq( + Row(1, 1, "1", "20250810", "23"), + Row(2, 2, "1-1", "20250810", "23"), + Row(4, 1, "1", "20250810", "23") + ) + ) + + spark.sql( + "insert into `chain_compact_multi$branch_snapshot` partition (dt = '20250811', hour = '00') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1')") + spark.sql( + "insert into `chain_compact_multi$branch_snapshot` partition (dt = '20250811', hour = '02') values (1, 
2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1')") + spark.sql( + "insert into `chain_compact_multi$branch_delta` partition (dt = '20250811', hour = '00') values (3, 2, '1-1' ),(4, 2, '1-1')") + spark.sql( + "insert into `chain_compact_multi$branch_delta` partition (dt = '20250811', hour = '01') values (5, 1, '1' ),(6, 1, '1' )") + spark.sql( + "insert into `chain_compact_multi$branch_delta` partition (dt = '20250811', hour = '02') values (5, 2, '1-1' ),(6, 2, '1-1' )") + + // Compact partition 20250811/hour=02 without overwrite + checkAnswer( + spark.sql( + "CALL sys.compact_chain_table(table => '`chain_compact_multi`', partition => 'dt=\"20250811\",hour=\"02\"')"), + Row(false) :: Nil + ) + + // Verify data is unchanged after skip + checkAnswer( + sql("SELECT * FROM `chain_compact_multi` where dt = '20250811' and hour = '02'"), + Seq( + Row(1, 2, "1-1", "20250811", "02"), + Row(2, 2, "1-1", "20250811", "02"), + Row(3, 2, "1-1", "20250811", "02"), + Row(4, 2, "1-1", "20250811", "02"), + Row(5, 1, "1", "20250811", "02"), + Row(6, 1, "1", "20250811", "02") + ) + ) + + // Compact with overwrite to test overwrite path + checkAnswer( + spark.sql( + "CALL sys.compact_chain_table(table => '`chain_compact_multi`', partition => 'dt=\"20250811\",hour=\"02\"', overwrite => true)"), + Row(true) :: Nil + ) + checkAnswer( + sql("select snapshot_id,commit_kind from `chain_compact_multi$branch_snapshot$snapshots`"), + Seq( + Row(1, "APPEND"), + Row(2, "APPEND"), + Row(3, "APPEND"), + Row(4, "APPEND"), + Row(5, "OVERWRITE") + ) + ) + + // Check all snapshot partition data + checkAnswer( + sql("SELECT * FROM `chain_compact_multi` where dt = '20250811' and hour = '02'"), + Seq( + Row(1, 2, "1-1", "20250811", "02"), + Row(2, 2, "1-1", "20250811", "02"), + Row(3, 2, "1-1", "20250811", "02"), + Row(4, 2, "1-1", "20250811", "02"), + Row(5, 2, "1-1", "20250811", "02"), + Row(6, 2, "1-1", "20250811", "02") + ) + ) + checkAnswer( + sql("SELECT * FROM 
`chain_compact_multi` where dt = '20250811' and hour = '00'"), + Seq( + Row(1, 2, "1-1", "20250811", "00"), + Row(2, 2, "1-1", "20250811", "00"), + Row(3, 2, "1-1", "20250811", "00"), + Row(4, 2, "1-1", "20250811", "00") + ) + ) + checkAnswer( + sql( + "SELECT * FROM `chain_compact_multi$branch_snapshot` where dt = '20250810' and hour = '23'"), + Seq( + Row(1, 1, "1", "20250810", "23"), + Row(2, 2, "1-1", "20250810", "23"), + Row(4, 1, "1", "20250810", "23") + ) + ) + checkAnswer( + sql( + "SELECT * FROM `chain_compact_multi$branch_snapshot` where dt = '20250810' and hour = '22'"), + Seq(Row(1, 1, "1", "20250810", "22"), Row(2, 1, "1", "20250810", "22")) + ) + } + + def setupChainTableBranches(tableName: String): Unit = { + spark.sql(s"CALL sys.create_branch('$tableName', 'snapshot');") + spark.sql(s"CALL sys.create_branch('$tableName', 'delta');") + + // Set branch properties + spark.sql( + s"ALTER TABLE $tableName SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot', " + + "'scan.fallback-delta-branch' = 'delta')") + spark.sql( + s"ALTER TABLE `$tableName$$branch_snapshot` SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot'," + + "'scan.fallback-delta-branch' = 'delta')") + spark.sql( + s"ALTER TABLE `$tableName$$branch_delta` SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot'," + + "'scan.fallback-delta-branch' = 'delta')") + } +}