Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion docs/content/primary-key-table/chain-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ Notice that:
- Chain table is only supported for primary key table, which means you should define `bucket` and `bucket-key` for the table.
- Chain table should ensure that the schema of each branch is consistent.
- Only Spark is supported now; Flink will be supported later.
- Chain compact is not supported for now, and it will be supported later.

After creating a chain table, you can read and write data in the following ways.

Expand Down Expand Up @@ -146,3 +145,30 @@ you will get the following result:
| 2 | 1| 1 |
+---+----+-----+
```

- Chain Table Compaction: Merge data from snapshot and delta branches into the snapshot branch.
This is useful for periodically compacting incremental data into full snapshots.
You can use the `compact_chain_table` procedure to merge a specific partition:

```sql
CALL sys.compact_chain_table(table => 'default.t', partition => 'date="20250811"');
```

After compaction, the data in the snapshot branch will contain the merged result from both snapshot
and delta branches, and subsequent queries will benefit from direct snapshot access without
merge-on-read overhead.

```sql
select t1, t2, t3 from `default`.`t$branch_snapshot` where date = '20250811';
```

You will get the following result:
```text
+---+----+-----+
| t1| t2| t3|
+---+----+-----+
| 1 | 1| 1 |
| 2 | 1| 1 |
+---+----+-----+
```

12 changes: 12 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ This section introduce all available spark procedures about paimon.
CALL sys.compact(table => 'T', compact_strategy => 'minor')<br/><br/>
</td>
</tr>
<tr>
<td>compact_chain_table</td>
<td>
To compact a chain table by merging the snapshot and delta branches into the snapshot branch. Arguments:
<li>table: The target chain table identifier. Cannot be empty.</li>
<li>partition: Partition specification format (e.g., 'dt="20250810",hour="22"'). Cannot be empty.</li>
<li>overwrite: Whether to overwrite if the partition already exists in the snapshot branch. Default is false. Optional.</li>
</td>
<td>
CALL sys.compact_chain_table(table => 'default.T', partition => 'dt="20250810",hour="22"')<br/><br/>
</td>
</tr>
<tr>
<td>expire_snapshots</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -169,7 +171,7 @@ public Plan plan() {
splits.add(
new ChainSplit(
dataSplit.partition(),
dataSplit.dataFiles(),
Lists.newArrayList(dataSplit),
fileBucketPathMapping,
fileBranchMapping));
completePartitions.add(dataSplit.partition());
Expand Down Expand Up @@ -277,10 +279,7 @@ public Plan plan() {
ChainSplit split =
new ChainSplit(
partitionParis.getKey(),
entry.getValue().stream()
.flatMap(
datsSplit -> datsSplit.dataFiles().stream())
.collect(Collectors.toList()),
entry.getValue(),
fileBucketPathMapping,
fileBranchMapping);
splits.add(split);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,14 @@ public InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate)
return this;
}

/**
 * Applies separate partition filters to the main scan and the fallback scan.
 *
 * <p>Unlike a single-predicate overload, this allows callers to push a different
 * partition predicate to each underlying scan — useful when the fallback branch
 * partitions data differently from the main branch.
 * NOTE(review): assumes both underlying scans tolerate a {@code null} predicate
 * the same way — confirm against their withPartitionFilter contracts.
 *
 * @param mainPartitionPredicate partition predicate applied to the main scan
 * @param fallbackPartitionPredicate partition predicate applied to the fallback scan
 * @return this scan instance, for call chaining
 */
public InnerTableScan withPartitionFilter(
        PartitionPredicate mainPartitionPredicate,
        PartitionPredicate fallbackPartitionPredicate) {
    mainScan.withPartitionFilter(mainPartitionPredicate);
    fallbackScan.withPartitionFilter(fallbackPartitionPredicate);
    return this;
}

@Override
public FallbackReadScan withBucketFilter(Filter<Integer> bucketFilter) {
mainScan.withBucketFilter(bucketFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputView;
Expand All @@ -36,6 +35,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.stream.Collectors;

/**
* A split describes chain table read scope. It follows DataSplit's custom serialization pattern and
Expand All @@ -48,17 +48,24 @@ public class ChainSplit implements Split {
private static final int VERSION = 1;

private BinaryRow logicalPartition;
private List<DataSplit> dataSplits;
private List<DataFileMeta> dataFiles;
private Map<String, String> fileBranchMapping;
private Map<String, String> fileBucketPathMapping;

public ChainSplit(
BinaryRow logicalPartition,
List<DataFileMeta> dataFiles,
List<DataSplit> dataSplits,
Map<String, String> fileBranchMapping,
Map<String, String> fileBucketPathMapping) {
this.logicalPartition = logicalPartition;
this.dataFiles = dataFiles;
this.dataSplits = dataSplits;
this.dataFiles =
dataSplits == null
? null
: dataSplits.stream()
.flatMap(dataSplit -> dataSplit.dataFiles().stream())
.collect(Collectors.toList());
this.fileBranchMapping = fileBranchMapping;
this.fileBucketPathMapping = fileBucketPathMapping;
}
Expand All @@ -67,6 +74,10 @@ public BinaryRow logicalPartition() {
return logicalPartition;
}

/** Returns the underlying {@link DataSplit}s that make up this chain split. */
public List<DataSplit> dataSplits() {
    return dataSplits;
}

/**
 * Returns all data files of this split — a flattened view derived from the
 * underlying data splits (see the constructor, which flat-maps
 * {@code dataSplit.dataFiles()} over {@code dataSplits}).
 */
public List<DataFileMeta> dataFiles() {
    return dataFiles;
}
Expand Down Expand Up @@ -103,12 +114,12 @@ public boolean equals(Object o) {
}
ChainSplit that = (ChainSplit) o;
return Objects.equals(logicalPartition, that.logicalPartition)
&& Objects.equals(dataFiles, that.dataFiles);
&& Objects.equals(dataSplits, that.dataSplits);
}

@Override
public int hashCode() {
return Objects.hash(logicalPartition, dataFiles);
return Objects.hash(logicalPartition, dataSplits);
}

private void writeObject(ObjectOutputStream out) throws IOException {
Expand All @@ -121,6 +132,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE

protected void assign(ChainSplit other) {
this.logicalPartition = other.logicalPartition;
this.dataSplits = other.dataSplits;
this.dataFiles = other.dataFiles;
this.fileBranchMapping = other.fileBranchMapping;
this.fileBucketPathMapping = other.fileBucketPathMapping;
Expand All @@ -131,12 +143,11 @@ public void serialize(DataOutputView out) throws IOException {

SerializationUtils.serializeBinaryRow(logicalPartition, out);

DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
int size = dataFiles == null ? 0 : dataFiles.size();
int size = dataSplits == null ? 0 : dataSplits.size();
out.writeInt(size);
if (size > 0) {
for (DataFileMeta file : dataFiles) {
dataFileSer.serialize(file, out);
for (DataSplit file : dataSplits) {
file.serialize(out);
}
}

Expand All @@ -161,10 +172,9 @@ public static ChainSplit deserialize(DataInputView in) throws IOException {
BinaryRow logicalPartition = SerializationUtils.deserializeBinaryRow(in);

int n = in.readInt();
List<DataFileMeta> dataFiles = new ArrayList<>(n);
DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
List<DataSplit> dataSplits = new ArrayList<>(n);
for (int i = 0; i < n; i++) {
dataFiles.add(dataFileSer.deserialize(in));
dataSplits.add(DataSplit.deserialize(in));
}

int size = in.readInt();
Expand All @@ -183,6 +193,6 @@ public static ChainSplit deserialize(DataInputView in) throws IOException {
}

return new ChainSplit(
logicalPartition, dataFiles, fileBucketPathMapping, fileBranchMapping);
logicalPartition, dataSplits, fileBucketPathMapping, fileBranchMapping);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.paimon.io.DataFileTestDataGenerator;
import org.apache.paimon.utils.InstantiationUtil;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand All @@ -37,6 +39,22 @@

/** Test for {@link ChainSplit}. */
public class ChainSplitTest {
/**
 * Builds a {@link DataSplit} test fixture with fixed snapshot (1), empty
 * partition, bucket 1, and a dummy bucket path.
 *
 * @param rawConvertible whether the split is marked raw-convertible
 * @param dataFiles data files to attach to the split
 * @param deletionFiles optional deletion files; attached only when non-null
 * @return the built split
 */
private DataSplit newDataSplit(
        boolean rawConvertible,
        List<DataFileMeta> dataFiles,
        List<DeletionFile> deletionFiles) {
    DataSplit.Builder builder = DataSplit.builder();
    builder.withSnapshot(1)
            .withPartition(BinaryRow.EMPTY_ROW)
            .withBucket(1)
            .withBucketPath("my path")
            .rawConvertible(rawConvertible)
            .withDataFiles(dataFiles);
    // Deletion files are optional in this fixture; only set them when the
    // caller supplies them, so tests can cover both code paths.
    if (deletionFiles != null) {
        builder.withDataDeletionFiles(deletionFiles);
    }
    return builder.build();
}

@Test
public void testChainSplitSerde() throws IOException, ClassNotFoundException {
Expand All @@ -48,6 +66,7 @@ public void testChainSplitSerde() throws IOException, ClassNotFoundException {
for (int i = 0; i < 3; i++) {
dataFiles.add(gen.next().meta);
}
DataSplit dataSplit = newDataSplit(true, dataFiles, null);
Map<String, String> fileBucketPathMapping = new HashMap<>();
Map<String, String> fileBranchMapping = new HashMap<>();
for (DataFileMeta dataFile : dataFiles) {
Expand All @@ -56,7 +75,10 @@ public void testChainSplitSerde() throws IOException, ClassNotFoundException {
}
ChainSplit split =
new ChainSplit(
logicalPartition, dataFiles, fileBucketPathMapping, fileBranchMapping);
logicalPartition,
Lists.newArrayList(dataSplit),
fileBucketPathMapping,
fileBranchMapping);
byte[] bytes = InstantiationUtil.serializeObject(split);
ChainSplit newSplit =
InstantiationUtil.deserializeObject(bytes, ChainSplit.class.getClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.spark.procedure.AlterFunctionProcedure;
import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
import org.apache.paimon.spark.procedure.CompactChainTableProcedure;
import org.apache.paimon.spark.procedure.CompactDatabaseProcedure;
import org.apache.paimon.spark.procedure.CompactManifestProcedure;
import org.apache.paimon.spark.procedure.CompactProcedure;
Expand Down Expand Up @@ -100,6 +101,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder);
procedureBuilders.put("compact", CompactProcedure::builder);
procedureBuilders.put("compact_database", CompactDatabaseProcedure::builder);
procedureBuilders.put("compact_chain_table", CompactChainTableProcedure::builder);
procedureBuilders.put("rescale", RescaleProcedure::builder);
procedureBuilders.put("migrate_database", MigrateDatabaseProcedure::builder);
procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
Expand Down
Loading
Loading