Search before asking
Motivation
This PR implements the compact_chain_table procedure for Spark, which compacts chain table data from the delta branch into the snapshot branch.
A chain table is a Paimon feature (see PIP-37) that maintains two branches:
- Snapshot branch: full data from the previous cycle
- Delta branch: incremental data from the current cycle
PIP-37 ChainCompact:
The compaction merges these two branches to generate the new full data for the target partition.
1. How does compact_chain_table differ from the regular compact procedure?
The compact procedure compacts files within a single table or branch, merging small files and removing duplicates.
compact_chain_table is specifically designed for chain tables (tables with a fallback/delta branch structure, like ChainGroupReadTable). It:
- Reads data from both the snapshot branch (main/wrapped table) and the delta branch (fallback/other table)
- Merges them together and writes the result back to the snapshot branch
- Effectively "materializes" the chain view into the snapshot branch
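The contrast above can be illustrated with a toy in-memory model (plain Python, not Paimon code) that treats one partition as a list of data files. It assumes primary-key rows with latest-write-wins deduplication. `compact_files` and `compact_chain_partition` are hypothetical helpers, and the real procedures operate on Paimon file stores, not Python lists.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str]        # (primary key, value): hypothetical toy schema
Files = List[List[Row]]      # one partition's data files, oldest first


def compact_files(files: Files) -> Files:
    """Regular compact (toy model): merge the files of ONE branch/partition
    into a single file, keeping the latest row per key."""
    merged: Dict[int, str] = {}
    for data_file in files:          # later files overwrite earlier rows
        for key, value in data_file:
            merged[key] = value
    return [sorted(merged.items())]


def compact_chain_partition(snapshot_files: Files, delta_files: Files) -> Files:
    """Chain compact (toy model): read BOTH branches, merge them, and return
    the single file that would be written to the snapshot branch only."""
    return compact_files(snapshot_files + delta_files)  # delta applied last
```

The only structural difference in this sketch is the input: compact_files sees one branch, while compact_chain_partition reads two and produces output destined for the snapshot branch.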
2. Does it compact both snapshot and delta branches?
No. It only writes to the snapshot branch. The delta branch remains untouched.
3. What's the merge strategy for chain compaction?
The merge strategy is implicit in FallbackReadScan's read path:
- Partition exists in both branches: read from snapshot branch (snapshot has priority)
- Partition exists only in delta: read from delta branch
After compaction:
- Delta-only partitions become present in snapshot
- Partitions that exist in both branches are refreshed with the merged state, in which the snapshot branch has priority
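The partition-level rule above, and its effect after compaction, can be sketched with a toy model over sets of partition values. This is not the FallbackReadScan implementation. `serving_branch` and `partitions_after_compaction` are hypothetical names, and the sketch only tracks which branch serves a partition, not the rows inside it.

```python
from typing import Optional, Set, Tuple


def serving_branch(snapshot_parts: Set[str], delta_parts: Set[str],
                   partition: str) -> Optional[str]:
    """Which branch the fallback read path serves a partition from."""
    if partition in snapshot_parts:
        return "snapshot"            # snapshot branch has priority
    if partition in delta_parts:
        return "delta"               # delta-only partition: fall back
    return None                      # partition exists in neither branch


def partitions_after_compaction(snapshot_parts: Set[str], delta_parts: Set[str],
                                target: str) -> Tuple[Set[str], Set[str]]:
    """Compaction writes the target partition into the snapshot branch only;
    the delta branch's partitions are returned unchanged."""
    return snapshot_parts | {target}, set(delta_parts)
```

With snapshot partitions {"20250722"} and delta partitions {"20250723"}, dt=20250723 is served from the delta branch before compaction and from the snapshot branch afterwards, while the delta set stays the same.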
4. Example
Initial state (dt=20250723 exists only in the delta branch, not in the snapshot branch):
┌──────────┬─────────────┬──────────────────────┐
│ Branch   │ Partition   │ Data                 │
├──────────┼─────────────┼──────────────────────┤
│ snapshot │ dt=20250722 │ (1, Alice), (2, Bob) │
│ delta    │ dt=20250723 │ (3, Charlie)         │
└──────────┴─────────────┴──────────────────────┘
After CALL sys.compact_chain_table(table => 'db.t', partition => 'dt="20250723"'):
┌──────────┬─────────────┬────────────────────────────────────────────────────────────────────┐
│ Branch   │ Partition   │ Data                                                               │
├──────────┼─────────────┼────────────────────────────────────────────────────────────────────┤
│ snapshot │ dt=20250722 │ (1, Alice), (2, Bob)                                               │
│ snapshot │ dt=20250723 │ (1, Alice), (2, Bob), (3, Charlie) ← merged from delta to snapshot │
│ delta    │ dt=20250723 │ (3, Charlie) ← unchanged                                           │
└──────────┴─────────────┴────────────────────────────────────────────────────────────────────┘
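The tables above are consistent with a model in which the target partition is rebuilt from the latest snapshot partition earlier than it, plus the target partition's delta rows, merged by primary key. The sketch below encodes that reading as an assumption: the key column (the integer id), the "latest earlier partition" rule, and the delta-wins tie-break are inferred from the example, not taken from the implementation. `chain_merge` is a hypothetical helper.

```python
from typing import Dict, List, Tuple

Rows = List[Tuple[int, str]]     # (id, name): toy schema from the example


def chain_merge(snapshot: Dict[str, Rows], delta: Dict[str, Rows],
                target: str) -> Rows:
    """Rebuild `target` from the latest snapshot partition earlier than it,
    then apply the delta rows of `target` (assumed: delta wins on equal ids)."""
    earlier = [p for p in snapshot if p < target]   # yyyyMMdd sorts lexically
    merged: Dict[int, str] = dict(snapshot[max(earlier)]) if earlier else {}
    for key, name in delta.get(target, []):
        merged[key] = name
    return sorted(merged.items())


snapshot = {"20250722": [(1, "Alice"), (2, "Bob")]}
delta = {"20250723": [(3, "Charlie")]}
print(chain_merge(snapshot, delta, "20250723"))
# [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
```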
Overwrite scenario (dt=20250722 already exists in the snapshot branch, and the delta branch has updates for it):
┌──────────┬─────────────┬──────────────────────────────────┐
│ Branch   │ Partition   │ Data                             │
├──────────┼─────────────┼──────────────────────────────────┤
│ snapshot │ dt=20250721 │ (1, Alice), (2, Bob)             │
│ snapshot │ dt=20250722 │ (1, Alice), (2, Bob), (4, David) │
│ delta    │ dt=20250722 │ (3, Charlie)                     │
└──────────┴─────────────┴──────────────────────────────────┘
After CALL sys.compact_chain_table(table => 'db.t', partition => 'dt="20250722"', overwrite => true):
┌──────────┬─────────────┬───────────────────────────────────────────────────────────────────┐
│ Branch   │ Partition   │ Data                                                              │
├──────────┼─────────────┼───────────────────────────────────────────────────────────────────┤
│ snapshot │ dt=20250721 │ (1, Alice), (2, Bob)                                              │
│ snapshot │ dt=20250722 │ (1, Alice), (2, Bob), (3, Charlie) ← overwritten with merged data │
│ delta    │ dt=20250722 │ (3, Charlie) ← unchanged                                          │
└──────────┴─────────────┴───────────────────────────────────────────────────────────────────┘
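Under the same assumed model (latest earlier snapshot partition plus target delta rows, merged by id), the overwrite scenario explains why (4, David) disappears: dt=20250722 is rebuilt from dt=20250721 plus the delta rows instead of being merged with the existing dt=20250722 snapshot data. What happens when overwrite is false and the target already exists is not stated above; this sketch raises an error purely as an assumption. `compact_chain` is a hypothetical helper.

```python
from typing import Dict, List, Tuple

Rows = List[Tuple[int, str]]
Branch = Dict[str, Rows]


def compact_chain(snapshot: Branch, delta: Branch, target: str,
                  overwrite: bool = False) -> Branch:
    """Return a new snapshot branch with `target` rebuilt (toy model)."""
    if target in snapshot and not overwrite:
        # Assumption: refuse to replace an existing snapshot partition.
        raise ValueError(f"partition {target} already exists in snapshot")
    earlier = [p for p in snapshot if p < target]
    merged: Dict[int, str] = dict(snapshot[max(earlier)]) if earlier else {}
    merged.update(delta.get(target, []))      # apply the target's delta rows
    new_snapshot = dict(snapshot)              # inputs (incl. delta) untouched
    new_snapshot[target] = sorted(merged.items())  # replaces old partition
    return new_snapshot


snapshot = {
    "20250721": [(1, "Alice"), (2, "Bob")],
    "20250722": [(1, "Alice"), (2, "Bob"), (4, "David")],
}
delta = {"20250722": [(3, "Charlie")]}
print(compact_chain(snapshot, delta, "20250722", overwrite=True)["20250722"])
# [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
```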
Solution
Prerequisite
Core changes (FallbackReadFileStoreTable, ChainGroupReadTable) have been extracted into a separate PR, #7950, which introduces separate partition predicates for the main and fallback tables.
Changes
- Add CompactChainTableProcedure with parameters: table, partition, overwrite (optional)
- Register the procedure in SparkProcedures
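The partition argument in the CALL examples is a string such as 'dt="20250723"'. A sketch of turning that string into a column-to-value map follows. `parse_partition_spec` is hypothetical, not Paimon's parser, and the comma-separated multi-column form it also accepts is an assumption rather than something this PR specifies.

```python
from typing import Dict


def parse_partition_spec(spec: str) -> Dict[str, str]:
    """Parse a spec such as 'dt="20250723"' (or, assumed, 'dt="20250723",hh="10"')
    into {column: value}, stripping surrounding quotes from each value."""
    result: Dict[str, str] = {}
    for part in spec.split(","):
        key, sep, value = part.partition("=")
        if not sep or not key.strip():
            raise ValueError(f"invalid partition spec: {spec!r}")
        result[key.strip()] = value.strip().strip('"').strip("'")
    return result
```

For the first CALL example, parse_partition_spec('dt="20250723"') yields {"dt": "20250723"}.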
Tests
- Add CompactChainTableProcedureTest
Anything else?
No response
Are you willing to submit a PR?