diff --git a/docs/content/primary-key-table/chain-table.md b/docs/content/primary-key-table/chain-table.md
index 197c79efa4a4..ad2ea2c239d4 100644
--- a/docs/content/primary-key-table/chain-table.md
+++ b/docs/content/primary-key-table/chain-table.md
@@ -87,7 +87,6 @@ Notice that:
- Chain table is only supported for primary key tables, which means you should define `bucket` and `bucket-key` for the table.
- Chain table requires the schema of each branch to be consistent.
- Only Spark is supported now; Flink will be supported later.
-- Chain compact is not supported for now, and it will be supported later.
After creating a chain table, you can read and write data in the following ways.
@@ -146,3 +145,30 @@ you will get the following result:
| 2 | 1| 1 |
+---+----+-----+
```
+
+- Chain Table Compaction: merges data from the snapshot and delta branches into the snapshot branch.
+This is useful for periodically compacting incremental data into full snapshots.
+You can use the `compact_chain_table` procedure to merge a specific partition:
+
+```sql
+CALL sys.compact_chain_table(table => 'default.t', partition => 'date="20250811"');
+```
+
+After compaction, the snapshot branch contains the merged result of both the snapshot
+and delta branches, so subsequent queries benefit from direct snapshot access without
+merge-on-read overhead.
+
+```sql
+select t1, t2, t3 from `default`.`t$branch_snapshot` where date = '20250811';
+```
+
+you will get the following result:
+```text
++---+----+-----+
+| t1| t2| t3|
++---+----+-----+
+| 1 | 1| 1 |
+| 2 | 1| 1 |
++---+----+-----+
+```
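+
+- By default the procedure skips a partition that already exists in the snapshot branch and
+returns `false`; pass `overwrite => true` to recompute and overwrite that partition:
+
+```sql
+CALL sys.compact_chain_table(table => 'default.t', partition => 'date="20250811"', overwrite => true);
+```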
+
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 1b8959d6bf15..e7217d38e1b1 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -59,6 +59,18 @@ This section introduce all available spark procedures about paimon.
CALL sys.compact(table => 'T', compact_strategy => 'minor')
+
| expire_snapshots |
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
index 9b7594cef17c..ff388a1bcdd2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
@@ -42,6 +42,8 @@
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -169,7 +171,7 @@ public Plan plan() {
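+ // Keep the whole DataSplit, not just its files, so split-level information such as
+ // deletion files and rawConvertible is preserved in the ChainSplit.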
splits.add(
new ChainSplit(
dataSplit.partition(),
- dataSplit.dataFiles(),
+ Lists.newArrayList(dataSplit),
fileBucketPathMapping,
fileBranchMapping));
completePartitions.add(dataSplit.partition());
@@ -277,10 +279,7 @@ public Plan plan() {
ChainSplit split =
new ChainSplit(
partitionParis.getKey(),
- entry.getValue().stream()
- .flatMap(
- datsSplit -> datsSplit.dataFiles().stream())
- .collect(Collectors.toList()),
+ entry.getValue(),
fileBucketPathMapping,
fileBranchMapping);
splits.add(split);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 65e875c6a59f..60ced8d18bdf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -411,6 +411,14 @@ public InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate)
return this;
}
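+
+ /**
+ * Applies separate partition filters to the main scan and the fallback scan so that
+ * each branch can be pruned independently.
+ */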
+ public InnerTableScan withPartitionFilter(
+ PartitionPredicate mainPartitionPredicate,
+ PartitionPredicate fallbackPartitionPredicate) {
+ mainScan.withPartitionFilter(mainPartitionPredicate);
+ fallbackScan.withPartitionFilter(fallbackPartitionPredicate);
+ return this;
+ }
+
@Override
public FallbackReadScan withBucketFilter(Filter bucketFilter) {
mainScan.withBucketFilter(bucketFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
index 5339cd06a69d..6a78e07e6fdd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
@@ -20,7 +20,6 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputView;
@@ -36,6 +35,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
+import java.util.stream.Collectors;
/**
* A split describes chain table read scope. It follows DataSplit's custom serialization pattern and
@@ -48,17 +48,24 @@ public class ChainSplit implements Split {
private static final int VERSION = 1;
private BinaryRow logicalPartition;
+ private List<DataSplit> dataSplits;
private List<DataFileMeta> dataFiles;
private Map<String, String> fileBranchMapping;
private Map<String, String> fileBucketPathMapping;
public ChainSplit(
BinaryRow logicalPartition,
- List<DataFileMeta> dataFiles,
+ List<DataSplit> dataSplits,
Map<String, String> fileBranchMapping,
Map<String, String> fileBucketPathMapping) {
this.logicalPartition = logicalPartition;
- this.dataFiles = dataFiles;
+ this.dataSplits = dataSplits;
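+ // Also keep a flattened list of the splits' data files so existing dataFiles() callers keep working.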
+ this.dataFiles =
+ dataSplits == null
+ ? null
+ : dataSplits.stream()
+ .flatMap(dataSplit -> dataSplit.dataFiles().stream())
+ .collect(Collectors.toList());
this.fileBranchMapping = fileBranchMapping;
this.fileBucketPathMapping = fileBucketPathMapping;
}
@@ -67,6 +74,10 @@ public BinaryRow logicalPartition() {
return logicalPartition;
}
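+ /** Returns the underlying {@link DataSplit}s that make up this chain split. */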
+ public List<DataSplit> dataSplits() {
+ return dataSplits;
+ }
+
public List<DataFileMeta> dataFiles() {
return dataFiles;
}
@@ -103,12 +114,12 @@ public boolean equals(Object o) {
}
ChainSplit that = (ChainSplit) o;
return Objects.equals(logicalPartition, that.logicalPartition)
- && Objects.equals(dataFiles, that.dataFiles);
+ && Objects.equals(dataSplits, that.dataSplits);
}
@Override
public int hashCode() {
- return Objects.hash(logicalPartition, dataFiles);
+ return Objects.hash(logicalPartition, dataSplits);
}
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -121,6 +132,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
protected void assign(ChainSplit other) {
this.logicalPartition = other.logicalPartition;
+ this.dataSplits = other.dataSplits;
this.dataFiles = other.dataFiles;
this.fileBranchMapping = other.fileBranchMapping;
this.fileBucketPathMapping = other.fileBucketPathMapping;
@@ -131,12 +143,11 @@ public void serialize(DataOutputView out) throws IOException {
SerializationUtils.serializeBinaryRow(logicalPartition, out);
- DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
- int size = dataFiles == null ? 0 : dataFiles.size();
+ int size = dataSplits == null ? 0 : dataSplits.size();
out.writeInt(size);
if (size > 0) {
- for (DataFileMeta file : dataFiles) {
- dataFileSer.serialize(file, out);
+ for (DataSplit split : dataSplits) {
+ split.serialize(out);
}
}
@@ -161,10 +172,9 @@ public static ChainSplit deserialize(DataInputView in) throws IOException {
BinaryRow logicalPartition = SerializationUtils.deserializeBinaryRow(in);
int n = in.readInt();
- List<DataFileMeta> dataFiles = new ArrayList<>(n);
- DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
+ List<DataSplit> dataSplits = new ArrayList<>(n);
for (int i = 0; i < n; i++) {
- dataFiles.add(dataFileSer.deserialize(in));
+ dataSplits.add(DataSplit.deserialize(in));
}
int size = in.readInt();
@@ -183,6 +193,6 @@ public static ChainSplit deserialize(DataInputView in) throws IOException {
}
return new ChainSplit(
- logicalPartition, dataFiles, fileBucketPathMapping, fileBranchMapping);
+ logicalPartition, dataSplits, fileBucketPathMapping, fileBranchMapping);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
index c042f6a37133..564f6044f3a7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
@@ -25,6 +25,8 @@
import org.apache.paimon.io.DataFileTestDataGenerator;
import org.apache.paimon.utils.InstantiationUtil;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -37,6 +39,22 @@
/** Test for {@link ChainSplit}. */
public class ChainSplitTest {
+ private DataSplit newDataSplit(
+ boolean rawConvertible,
+ List<DataFileMeta> dataFiles,
+ List<DeletionFile> deletionFiles) {
+ DataSplit.Builder builder = DataSplit.builder();
+ builder.withSnapshot(1)
+ .withPartition(BinaryRow.EMPTY_ROW)
+ .withBucket(1)
+ .withBucketPath("my path")
+ .rawConvertible(rawConvertible)
+ .withDataFiles(dataFiles);
+ if (deletionFiles != null) {
+ builder.withDataDeletionFiles(deletionFiles);
+ }
+ return builder.build();
+ }
@Test
public void testChainSplitSerde() throws IOException, ClassNotFoundException {
@@ -48,6 +66,7 @@ public void testChainSplitSerde() throws IOException, ClassNotFoundException {
for (int i = 0; i < 3; i++) {
dataFiles.add(gen.next().meta);
}
+ DataSplit dataSplit = newDataSplit(true, dataFiles, null);
Map<String, String> fileBucketPathMapping = new HashMap<>();
Map<String, String> fileBranchMapping = new HashMap<>();
for (DataFileMeta dataFile : dataFiles) {
@@ -56,7 +75,10 @@ public void testChainSplitSerde() throws IOException, ClassNotFoundException {
}
ChainSplit split =
new ChainSplit(
- logicalPartition, dataFiles, fileBucketPathMapping, fileBranchMapping);
+ logicalPartition,
+ Lists.newArrayList(dataSplit),
+ fileBucketPathMapping,
+ fileBranchMapping);
byte[] bytes = InstantiationUtil.serializeObject(split);
ChainSplit newSplit =
InstantiationUtil.deserializeObject(bytes, ChainSplit.class.getClassLoader());
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 74bf04d68c56..0f55079c568c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -21,6 +21,7 @@
import org.apache.paimon.spark.procedure.AlterFunctionProcedure;
import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
+import org.apache.paimon.spark.procedure.CompactChainTableProcedure;
import org.apache.paimon.spark.procedure.CompactDatabaseProcedure;
import org.apache.paimon.spark.procedure.CompactManifestProcedure;
import org.apache.paimon.spark.procedure.CompactProcedure;
@@ -100,6 +101,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder);
procedureBuilders.put("compact", CompactProcedure::builder);
procedureBuilders.put("compact_database", CompactDatabaseProcedure::builder);
+ procedureBuilders.put("compact_chain_table", CompactChainTableProcedure::builder);
procedureBuilders.put("rescale", RescaleProcedure::builder);
procedureBuilders.put("migrate_database", MigrateDatabaseProcedure::builder);
procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactChainTableProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactChainTableProcedure.java
new file mode 100644
index 000000000000..6cafc9bb549c
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactChainTableProcedure.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.spark.commands.PaimonSparkWriter;
+import org.apache.paimon.spark.util.ScanPlanHelper$;
+import org.apache.paimon.spark.utils.SparkProcedureUtils;
+import org.apache.paimon.table.ChainGroupReadTable;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ChainSplit;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.PaimonUtils;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Procedure to compact a chain table. Usage:
+ *
+ * <pre><code>
+ *  -- Compact chain table; overwrite defaults to false
+ *  CALL sys.compact_chain_table(table => 'db.table', partition => 'dt="20250810",hour="22"', [overwrite => true])
+ * </code></pre>
+ */
+public class CompactChainTableProcedure extends BaseProcedure {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactChainTableProcedure.class);
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ ProcedureParameter.required("partition", StringType),
+ ProcedureParameter.optional("overwrite", BooleanType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", BooleanType, false, Metadata.empty())
+ });
+
+ protected CompactChainTableProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ String partitionStr = args.getString(1);
+ boolean overwrite = !args.isNullAt(2) && args.getBoolean(2);
+ checkArgument(
+ partitionStr != null && !partitionStr.isEmpty(),
+ "Partition string cannot be null or empty");
+
+ return modifyPaimonTable(
+ tableIdent,
+ t -> {
+ checkArgument(
+ t instanceof FallbackReadFileStoreTable,
+ "Table %s is not a chain table",
+ tableIdent);
+ FallbackReadFileStoreTable table = (FallbackReadFileStoreTable) t;
+ checkArgument(
+ table.fallback() instanceof ChainGroupReadTable,
+ "Table %s is not a chain table",
+ tableIdent);
+ DataSourceV2Relation relation = createRelation(tableIdent);
+ boolean success =
+ execute(
+ (ChainGroupReadTable) table.fallback(),
+ relation,
+ partitionStr,
+ overwrite);
+ return new InternalRow[] {newInternalRow(success)};
+ });
+ }
+
+ private boolean execute(
+ ChainGroupReadTable table,
+ DataSourceV2Relation relation,
+ String partitionStr,
+ boolean overwrite) {
+ String partition = SparkProcedureUtils.toWhere(partitionStr);
+ FileStoreTable snapshotTable = table.wrapped();
+
+ // Check if target partition already exists in snapshot branch
+ PartitionPredicate snapshotPartitionPredicate =
+ SparkProcedureUtils.convertToPartitionPredicate(
+ partition,
+ snapshotTable.schema().logicalPartitionType(),
+ spark(),
+ relation);
+ boolean partitionExists =
+ !snapshotTable
+ .newScan()
+ .withPartitionFilter(snapshotPartitionPredicate)
+ .plan()
+ .splits()
+ .isEmpty();
+
+ FallbackReadFileStoreTable.FallbackReadScan scan =
+ (FallbackReadFileStoreTable.FallbackReadScan) table.newScan();
+ PartitionPredicate partitionPredicate =
+ SparkProcedureUtils.convertToPartitionPredicate(
+ partition, table.schema().logicalPartitionType(), spark(), relation);
+ if (partitionExists) {
+ if (overwrite) {
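+ // Drop the existing snapshot data for the target partition from the main scan and
+ // recompute the partition through the fallback chain read instead.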
+ scan.withPartitionFilter(
+ SparkProcedureUtils.convertToPartitionPredicate(
+ "!(" + partition + ")",
+ table.schema().logicalPartitionType(),
+ spark(),
+ relation),
+ partitionPredicate);
+ LOG.info("Found existing partition {}, will overwrite it.", partition);
+ } else {
+ LOG.info(
+ "Partition {} already exists in snapshot branch, skipping compaction.",
+ partitionStr);
+ return false;
+ }
+ } else {
+ scan.withPartitionFilter(partitionPredicate);
+ }
+
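+ // Union the chain reads of all planned splits into one dataset to write back.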
+ Dataset<Row> datasetForWrite =
+ scan.plan().splits().stream()
+ .map(
+ split -> {
+ DataSplit[] dataSplits =
+ ((ChainSplit) split)
+ .dataSplits()
+ .toArray(new DataSplit[0]);
+ return PaimonUtils.createDataset(
+ spark(),
+ ScanPlanHelper$.MODULE$.createNewScanPlan(
+ dataSplits, relation));
+ })
+ .reduce(Dataset::union)
+ .orElse(null);
+
+ if (datasetForWrite != null) {
+ PaimonSparkWriter writer = PaimonSparkWriter.apply(snapshotTable);
+ if (partitionExists) {
+ writer.writeBuilder().withOverwrite();
+ }
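+ // Re-attach the target partition values as columns so the rows are written into
+ // the compacted partition of the snapshot branch.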
+ Map<String, String> targetPartition = ParameterUtils.getPartitions(partitionStr).get(0);
+ for (Map.Entry<String, String> entry : targetPartition.entrySet()) {
+ datasetForWrite =
+ datasetForWrite.withColumn(
+ entry.getKey(), functions.expr(entry.getValue()));
+ }
+ writer.commit(writer.write(datasetForWrite));
+ LOG.info("Successfully compacted partition {} to snapshot branch.", partitionStr);
+ return true;
+ } else {
+ LOG.warn("Table {} is empty, skip compaction.", table);
+ return false;
+ }
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder() {
+ @Override
+ public CompactChainTableProcedure doBuild() {
+ return new CompactChainTableProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "Compact chain table by merging snapshot + delta into target snapshot.";
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactChainTableProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactChainTableProcedureTest.scala
new file mode 100644
index 000000000000..f879f5be68c0
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactChainTableProcedureTest.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.streaming.StreamTest
+
+/** Test compact_chain_table procedure. See [[CompactChainTableProcedure]]. */
+class CompactChainTableProcedureTest extends PaimonSparkTestBase with StreamTest {
+
+ test("Paimon Procedure: compact_chain_table - basic test") {
+ // Create chain table
+ spark.sql("""
+ |CREATE TABLE IF NOT EXISTS chain_compact_t1 (
+ | `t1` BIGINT COMMENT 't1',
+ | `t2` BIGINT COMMENT 't2',
+ | `t3` STRING COMMENT 't3'
+ | ) PARTITIONED BY (`date` STRING COMMENT 'date')
+ |TBLPROPERTIES (
+ | 'chain-table.enabled' = 'true',
+ | 'primary-key' = 'date,t1',
+ | 'sequence.field' = 't2',
+ | 'bucket-key' = 't1',
+ | 'bucket' = '1',
+ | 'partition.timestamp-pattern' = '$date',
+ | 'partition.timestamp-formatter' = 'yyyyMMdd'
+ |)
+ |""".stripMargin)
+
+ // Create branches
+ setupChainTableBranches("chain_compact_t1")
+
+ // Insert snapshot data
+ spark.sql(
+ "insert into `chain_compact_t1$branch_snapshot` partition (date = '20260223') values (1, 1, '1')")
+
+ // Insert delta data
+ spark.sql(
+ "insert into `chain_compact_t1$branch_delta` partition (date = '20260224') values (2, 2, '2')")
+
+ // Before compaction: verify chain read shows snapshot + delta
+ checkAnswer(
+ sql("SELECT * FROM `chain_compact_t1` where date = '20260224'"),
+ Seq(Row(1, 1, "1", "20260224"), Row(2, 2, "2", "20260224"))
+ )
+
+ checkAnswer(
+ spark.sql(
+ "CALL sys.compact_chain_table(table => '`chain_compact_t1`', partition => 'date=\"20260224\"')"),
+ Row(true) :: Nil)
+
+ // After compaction: verify snapshot branch has merged data
+ checkAnswer(
+ sql("SELECT * FROM `chain_compact_t1` where date = '20260224'"),
+ Seq(Row(1, 1, "1", "20260224"), Row(2, 2, "2", "20260224"))
+ )
+ checkAnswer(
+ sql("SELECT * FROM `chain_compact_t1$branch_snapshot` where date = '20260223'"),
+ Seq(Row(1, 1, "1", "20260223"))
+ )
+ checkAnswer(
+ sql("SELECT * FROM `chain_compact_t1$branch_snapshot` where date = '20260224'"),
+ Seq(Row(1, 1, "1", "20260224"), Row(2, 2, "2", "20260224"))
+ )
+
+ }
+
+ test("Paimon Procedure: compact_chain_table - overwrite test") {
+ // Create chain table
+ spark.sql("""
+ |CREATE TABLE IF NOT EXISTS chain_compact_t2 (
+ | `t1` BIGINT COMMENT 't1',
+ | `t2` BIGINT COMMENT 't2',
+ | `t3` STRING COMMENT 't3'
+ | ) PARTITIONED BY (`date` STRING COMMENT 'date')
+ |TBLPROPERTIES (
+ | 'chain-table.enabled' = 'true',
+ | 'primary-key' = 'date,t1',
+ | 'sequence.field' = 't2',
+ | 'bucket-key' = 't1',
+ | 'bucket' = '1',
+ | 'partition.timestamp-pattern' = '$date',
+ | 'partition.timestamp-formatter' = 'yyyyMMdd'
+ | )
+ |""".stripMargin)
+
+ // Create branches
+ setupChainTableBranches("chain_compact_t2")
+
+ // Insert snapshot data
+ spark.sql(
+ "insert into `chain_compact_t2$branch_snapshot` partition (date = '20260224') values (1, 1, '1')")
+ spark.sql(
+ "insert into `chain_compact_t2$branch_snapshot` partition (date = '20260225') values (1, 2, '1-1')")
+
+ // Insert delta data
+ spark.sql(
+ "insert into `chain_compact_t2$branch_delta` partition (date = '20260225') values (2, 2, '2')")
+
+ // First call should skip and return false because the partition already exists in the snapshot branch
+ checkAnswer(
+ spark.sql(
+ "CALL sys.compact_chain_table(table => '`chain_compact_t2`', partition => 'date=\"20260225\"')"),
+ Row(false) :: Nil)
+
+ checkAnswer(
+ sql("SELECT * FROM `chain_compact_t2` where date = '20260225'"),
+ Seq(Row(1, 2, "1-1", "20260225"))
+ )
+
+ checkAnswer(
+ spark.sql(
+ "CALL sys.compact_chain_table(table => '`chain_compact_t2`', partition => 'date=\"20260225\"', overwrite => true)"),
+ Row(true) :: Nil)
+
+ // Verify snapshot branch now has the data
+ checkAnswer(
+ sql("SELECT * FROM `chain_compact_t2` where date = '20260225'"),
+ Seq(Row(1, 1, "1", "20260225"), Row(2, 2, "2", "20260225"))
+ )
+ }
+
+ test("Paimon Procedure: compact_chain_table - test multiple partitions") {
+ spark.sql("""
+ |CREATE TABLE IF NOT EXISTS chain_compact_multi (
+ | `t1` BIGINT COMMENT 't1',
+ | `t2` BIGINT COMMENT 't2',
+ | `t3` STRING COMMENT 't3'
+ | ) PARTITIONED BY (`dt` STRING COMMENT 'dt', `hour` STRING COMMENT 'hour')
+ |TBLPROPERTIES (
+ | 'chain-table.enabled' = 'true',
+ | 'primary-key' = 'dt,hour,t1',
+ | 'sequence.field' = 't2',
+ | 'bucket-key' = 't1',
+ | 'bucket' = '2',
+ | 'partition.timestamp-pattern' = '$dt $hour:00:00',
+ | 'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss'
+ | )
+ |""".stripMargin)
+
+ // Create branches
+ setupChainTableBranches("chain_compact_multi")
+
+ // Write snapshot branch data
+ spark.sql(
+ "insert into `chain_compact_multi$branch_snapshot` partition (dt = '20250810', hour = '22') values (1, 1, '1'),(2, 1, '1')")
+
+ // Write delta branch data
+ spark.sql(
+ "insert into `chain_compact_multi$branch_delta` partition (dt = '20250810', hour = '21') values (1, 1, '1'),(2, 1, '1')")
+ spark.sql(
+ "insert into `chain_compact_multi$branch_delta` partition (dt = '20250810', hour = '22') values (1, 2, '1-1' ),(3, 1, '1' )")
+ spark.sql(
+ "insert into `chain_compact_multi$branch_delta` partition (dt = '20250810', hour = '23') values (2, 2, '1-1' ),(4, 1, '1' )")
+
+ // Compact partition 20250810/hour=23
+ checkAnswer(
+ spark.sql(
+ "CALL sys.compact_chain_table(table => '`chain_compact_multi`', partition => 'dt=\"20250810\",hour=\"23\"')"),
+ Row(true) :: Nil
+ )
+
+ // Verify chain read still works correctly
+ checkAnswer(
+ sql("SELECT * FROM `chain_compact_multi` where dt = '20250810' and hour = '23'"),
+ Seq(
+ Row(1, 1, "1", "20250810", "23"),
+ Row(2, 2, "1-1", "20250810", "23"),
+ Row(4, 1, "1", "20250810", "23")
+ )
+ )
+
+ spark.sql(
+ "insert into `chain_compact_multi$branch_snapshot` partition (dt = '20250811', hour = '00') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1')")
+ spark.sql(
+ "insert into `chain_compact_multi$branch_snapshot` partition (dt = '20250811', hour = '02') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1')")
+ spark.sql(
+ "insert into `chain_compact_multi$branch_delta` partition (dt = '20250811', hour = '00') values (3, 2, '1-1' ),(4, 2, '1-1')")
+ spark.sql(
+ "insert into `chain_compact_multi$branch_delta` partition (dt = '20250811', hour = '01') values (5, 1, '1' ),(6, 1, '1' )")
+ spark.sql(
+ "insert into `chain_compact_multi$branch_delta` partition (dt = '20250811', hour = '02') values (5, 2, '1-1' ),(6, 2, '1-1' )")
+
+ // Compact partition 20250811/hour=02 without overwrite
+ checkAnswer(
+ spark.sql(
+ "CALL sys.compact_chain_table(table => '`chain_compact_multi`', partition => 'dt=\"20250811\",hour=\"02\"')"),
+ Row(false) :: Nil
+ )
+
+ // Verify data is unchanged after skip
+ checkAnswer(
+ sql("SELECT * FROM `chain_compact_multi` where dt = '20250811' and hour = '02'"),
+ Seq(
+ Row(1, 2, "1-1", "20250811", "02"),
+ Row(2, 2, "1-1", "20250811", "02"),
+ Row(3, 2, "1-1", "20250811", "02"),
+ Row(4, 2, "1-1", "20250811", "02"),
+ Row(5, 1, "1", "20250811", "02"),
+ Row(6, 1, "1", "20250811", "02")
+ )
+ )
+
+ // Compact with overwrite to test overwrite path
+ checkAnswer(
+ spark.sql(
+ "CALL sys.compact_chain_table(table => '`chain_compact_multi`', partition => 'dt=\"20250811\",hour=\"02\"', overwrite => true)"),
+ Row(true) :: Nil
+ )
+ checkAnswer(
+ sql("select snapshot_id,commit_kind from `chain_compact_multi$branch_snapshot$snapshots`"),
+ Seq(
+ Row(1, "APPEND"),
+ Row(2, "APPEND"),
+ Row(3, "APPEND"),
+ Row(4, "APPEND"),
+ Row(5, "OVERWRITE")
+ )
+ )
+
+ // Check all snapshot partition data
+ checkAnswer(
+ sql("SELECT * FROM `chain_compact_multi` where dt = '20250811' and hour = '02'"),
+ Seq(
+ Row(1, 2, "1-1", "20250811", "02"),
+ Row(2, 2, "1-1", "20250811", "02"),
+ Row(3, 2, "1-1", "20250811", "02"),
+ Row(4, 2, "1-1", "20250811", "02"),
+ Row(5, 2, "1-1", "20250811", "02"),
+ Row(6, 2, "1-1", "20250811", "02")
+ )
+ )
+ checkAnswer(
+ sql("SELECT * FROM `chain_compact_multi` where dt = '20250811' and hour = '00'"),
+ Seq(
+ Row(1, 2, "1-1", "20250811", "00"),
+ Row(2, 2, "1-1", "20250811", "00"),
+ Row(3, 2, "1-1", "20250811", "00"),
+ Row(4, 2, "1-1", "20250811", "00")
+ )
+ )
+ checkAnswer(
+ sql(
+ "SELECT * FROM `chain_compact_multi$branch_snapshot` where dt = '20250810' and hour = '23'"),
+ Seq(
+ Row(1, 1, "1", "20250810", "23"),
+ Row(2, 2, "1-1", "20250810", "23"),
+ Row(4, 1, "1", "20250810", "23")
+ )
+ )
+ checkAnswer(
+ sql(
+ "SELECT * FROM `chain_compact_multi$branch_snapshot` where dt = '20250810' and hour = '22'"),
+ Seq(Row(1, 1, "1", "20250810", "22"), Row(2, 1, "1", "20250810", "22"))
+ )
+ }
+
+ def setupChainTableBranches(tableName: String): Unit = {
+ spark.sql(s"CALL sys.create_branch('$tableName', 'snapshot');")
+ spark.sql(s"CALL sys.create_branch('$tableName', 'delta');")
+
+ // Set branch properties
+ spark.sql(
+ s"ALTER TABLE $tableName SET tblproperties (" +
+ "'scan.fallback-snapshot-branch' = 'snapshot', " +
+ "'scan.fallback-delta-branch' = 'delta')")
+ spark.sql(
+ s"ALTER TABLE `$tableName$$branch_snapshot` SET tblproperties (" +
+ "'scan.fallback-snapshot-branch' = 'snapshot'," +
+ "'scan.fallback-delta-branch' = 'delta')")
+ spark.sql(
+ s"ALTER TABLE `$tableName$$branch_delta` SET tblproperties (" +
+ "'scan.fallback-snapshot-branch' = 'snapshot'," +
+ "'scan.fallback-delta-branch' = 'delta')")
+ }
+}
|