Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 44 additions & 19 deletions docs/docs/primary-key-table/chain-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ Notice that:
- Chain table is only supported for primary key table, which means you should define `bucket` and `bucket-key` for the table.
- Chain table should ensure that the schema of each branch is consistent.
- Both Spark and Flink batch read/write are supported. Flink streaming read/write is not supported.
- Chain compact is not supported for now, and it will be supported later.
- Deletion vector is not supported for chain table.

After creating a chain table, you can read and write data in the following ways.
Expand All @@ -153,12 +152,12 @@ select t1, t2, t3 from default.t where date = '20250811'
```
you will get the following result:
```text
+---+----+-----+
| t1| t2| t3|
+---+----+-----+
| 1 | 1| 1 |
| 2 | 1| 1 |
+---+----+-----+
+---+----+-----+
| t1| t2| t3|
+---+----+-----+
| 1 | 1| 1 |
| 2 | 1| 1 |
+---+----+-----+
```

- Incremental Query: Read the incremental partition from t$branch_delta
Expand All @@ -167,11 +166,11 @@ select t1, t2, t3 from `default`.`t$branch_delta` where date = '20250811'
```
you will get the following result:
```text
+---+----+-----+
| t1| t2| t3|
+---+----+-----+
| 2 | 1| 1 |
+---+----+-----+
+---+----+-----+
| t1| t2| t3|
+---+----+-----+
| 2 | 1| 1 |
+---+----+-----+
```

- Hybrid Query: Read both full and incremental data simultaneously.
Expand All @@ -182,13 +181,39 @@ select t1, t2, t3 from `default`.`t$branch_delta` where date = '20250811'
```
you will get the following result:
```text
+---+----+-----+
| t1| t2| t3|
+---+----+-----+
| 1 | 1| 1 |
| 2 | 1| 1 |
| 2 | 1| 1 |
+---+----+-----+
+---+----+-----+
| t1| t2| t3|
+---+----+-----+
| 1 | 1| 1 |
| 2 | 1| 1 |
| 2 | 1| 1 |
+---+----+-----+
```

- Chain Table Compaction: Merge data from snapshot and delta branches into the snapshot branch.
This is useful for periodically compacting incremental data into full snapshots.
You can use the `compact_chain_table` procedure to merge a specific partition:

```sql
CALL sys.compact_chain_table(table => 'default.t', partition => 'date="20250811"');
```

After compaction, the data in the snapshot branch will contain the merged result from both snapshot
and delta branches, and subsequent queries will benefit from direct snapshot access without
merge-on-read overhead.

```sql
select t1, t2, t3 from `default`.`t$branch_snapshot` where date = '20250811';
```

you will get the following result:
```text
+---+----+-----+
| t1| t2| t3|
+---+----+-----+
| 1 | 1| 1 |
| 2 | 1| 1 |
+---+----+-----+
```

## Group Partition
Expand Down
12 changes: 12 additions & 0 deletions docs/docs/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ This section introduce all available spark procedures about paimon.
CALL sys.compact_database(including_databases => 'db1', options => 'target-file-size=128m')
</td>
</tr>
<tr>
<td>compact_chain_table</td>
<td>
To compact chain table by merging snapshot and delta branches into the snapshot branch. Arguments:
<li>table: The target chain table identifier. Cannot be empty.</li>
<li>partition: Partition specification format (e.g., 'dt="20250810",hour="22"'). Cannot be empty.</li>
<li>overwrite: Whether to overwrite if the partition already exists in the snapshot branch. Default is false. Optional.</li>
</td>
<td>
CALL sys.compact_chain_table(table => 'default.T', partition => 'dt="20250810",hour="22"')<br/><br/>
</td>
</tr>
<tr>
<td>expire_snapshots</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public static class ChainTableBatchScan extends FallbackReadScan {
private final ChainPartitionProjector partitionProjector;
private Predicate dataPredicate;
private Filter<Integer> bucketFilter;
protected boolean preloadTargetSnapshot = true;

public ChainTableBatchScan(
TableSchema tableSchema, ChainGroupReadTable chainGroupReadTable) {
Expand Down Expand Up @@ -210,6 +211,11 @@ public ChainTableBatchScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

public FallbackReadScan skipPreloadTargetSnapshot() {
this.preloadTargetSnapshot = false;
return this;
}

/**
* Builds a plan for chain tables.
*
Expand Down Expand Up @@ -237,26 +243,7 @@ public ChainTableBatchScan withBucketFilter(Filter<Integer> bucketFilter) {
public Plan plan() {
List<Split> splits = new ArrayList<>();
PredicateBuilder builder = new PredicateBuilder(tableSchema.logicalPartitionType());
for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
HashMap<String, String> fileBucketPathMapping = new HashMap<>();
HashMap<String, String> fileBranchMapping = new HashMap<>();
for (DataFileMeta file : dataSplit.dataFiles()) {
fileBucketPathMapping.put(file.fileName(), ((DataSplit) split).bucketPath());
fileBranchMapping.put(file.fileName(), options.scanFallbackSnapshotBranch());
}
splits.add(
new ChainSplit(
dataSplit.partition(),
dataSplit.dataFiles(),
fileBranchMapping,
fileBucketPathMapping));
}

Set<BinaryRow> snapshotPartitions =
new HashSet<>(
newChainPartitionListingScan(true, getMainPartitionPredicate())
.listPartitions());
Set<BinaryRow> snapshotPartitions = preloadTargetSnapshotSplits(splits);

DataTableScan deltaPartitionScan =
newChainPartitionListingScan(false, getFallbackPartitionPredicate());
Expand Down Expand Up @@ -453,6 +440,34 @@ private DataTableScan newChainPartitionListingScan(
return scan;
}

private Set<BinaryRow> preloadTargetSnapshotSplits(List<Split> splits) {
Set<BinaryRow> snapshotPartitions = new HashSet<>();
if (!preloadTargetSnapshot) {
return snapshotPartitions;
}

for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
HashMap<String, String> fileBucketPathMapping = new HashMap<>();
HashMap<String, String> fileBranchMapping = new HashMap<>();
for (DataFileMeta file : dataSplit.dataFiles()) {
fileBucketPathMapping.put(file.fileName(), ((DataSplit) split).bucketPath());
fileBranchMapping.put(file.fileName(), options.scanFallbackSnapshotBranch());
}
splits.add(
new ChainSplit(
dataSplit.partition(),
dataSplit.dataFiles(),
fileBranchMapping,
fileBucketPathMapping));
}

snapshotPartitions.addAll(
newChainPartitionListingScan(true, getMainPartitionPredicate())
.listPartitions());
return snapshotPartitions;
}

private DataTableScan newFilteredScan(boolean snapshot) {
DataTableScan scan =
snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.spark.procedure.AlterFunctionProcedure;
import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
import org.apache.paimon.spark.procedure.CompactChainTableProcedure;
import org.apache.paimon.spark.procedure.CompactDatabaseProcedure;
import org.apache.paimon.spark.procedure.CompactManifestProcedure;
import org.apache.paimon.spark.procedure.CompactProcedure;
Expand Down Expand Up @@ -103,6 +104,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put("rename_branch", RenameBranchProcedure::builder);
procedureBuilders.put("compact", CompactProcedure::builder);
procedureBuilders.put("compact_database", CompactDatabaseProcedure::builder);
procedureBuilders.put("compact_chain_table", CompactChainTableProcedure::builder);
procedureBuilders.put("rescale", RescaleProcedure::builder);
procedureBuilders.put("migrate_database", MigrateDatabaseProcedure::builder);
procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
Expand Down
Loading
Loading