[Feature] [spark] Support compact_chain_table procedure #7312

@juntaozhang

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

This PR implements the compact_chain_table procedure for Spark, which compacts chain table data from the delta branch into the snapshot branch.

A chain table is a Paimon feature (see PIP-37) that maintains two branches:

  • Snapshot branch: Full data from previous cycle
  • Delta branch: Incremental data from current cycle

PIP-37 ChainCompact:
The compaction merges these two branches to generate the new full data for the target partition.

1. How does compact_chain_table differ from regular compact?

The regular compact procedure works within a single table or branch: it merges small files and removes duplicates.

compact_chain_table is specifically designed for chain tables (tables with a fallback/delta branch structure, like ChainGroupReadTable). It:

  • Reads data from both the snapshot branch (main/wrapped table) and the delta branch (fallback/other table)
  • Merges them together and writes the result back to the snapshot branch
  • Effectively "materializes" the chain view into the snapshot branch

2. Does it compact both snapshot and delta branches?

No. It only writes to the snapshot branch. The delta branch remains untouched.

3. What's the merge strategy for chain compaction?

The merge strategy is implicit in FallbackReadScan's read path:

  • Partition exists in both branches: read from snapshot branch (snapshot has priority)
  • Partition exists only in delta: read from delta branch

After compaction:

  • Delta-only partitions become present in the snapshot branch
  • Partitions that exist in both branches are refreshed with the merged state, with snapshot taking priority
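
The two read-path rules above can be illustrated with a small model. This is a minimal sketch, not Paimon code: `resolve_source` is a hypothetical helper, partitions are plain strings, and the case of a partition that exists only in snapshot (not spelled out above) is assumed to read from snapshot.

```python
from typing import Optional, Set


def resolve_source(partition: str, snapshot: Set[str], delta: Set[str]) -> Optional[str]:
    """Return the branch that serves a read of `partition` in this toy model."""
    if partition in snapshot:
        # Present in snapshot (with or without a delta copy): snapshot has priority.
        # The snapshot-only case is an assumption of this sketch.
        return "snapshot"
    if partition in delta:
        # Present only in delta: read from the delta branch.
        return "delta"
    # Unknown partition: nothing to read.
    return None


snapshot_parts = {"dt=20250722"}
delta_parts = {"dt=20250722", "dt=20250723"}

sources = {
    p: resolve_source(p, snapshot_parts, delta_parts)
    for p in sorted(snapshot_parts | delta_parts)
}
print(sources)
```

Here dt=20250722 resolves to snapshot because it exists in both branches, while dt=20250723 resolves to delta until a compaction writes it into snapshot.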

4. Example

Initial state (dt=20250723 exists only in delta, not in snapshot):

  ┌──────────┬─────────────┬──────────────────────┐
  │ Branch   │ Partition   │ Data                 │
  ├──────────┼─────────────┼──────────────────────┤
  │ snapshot │ dt=20250722 │ (1, Alice), (2, Bob) │
  │ delta    │ dt=20250723 │ (3, Charlie)         │
  └──────────┴─────────────┴──────────────────────┘

After CALL sys.compact_chain_table(table => 'db.t', partition => 'dt="20250723"'):

  ┌──────────┬─────────────┬────────────────────────────────────────────────────────────────────┐
  │ Branch   │ Partition   │ Data                                                               │
  ├──────────┼─────────────┼────────────────────────────────────────────────────────────────────┤
  │ snapshot │ dt=20250722 │ (1, Alice), (2, Bob)                                               │
  │ snapshot │ dt=20250723 │ (1, Alice), (2, Bob), (3, Charlie) ← merged from delta to snapshot │
  │ delta    │ dt=20250723 │ (3, Charlie) ← unchanged                                           │
  └──────────┴─────────────┴────────────────────────────────────────────────────────────────────┘

Overwrite scenario (dt=20250722 already exists in snapshot, delta has updates):

  ┌──────────┬─────────────┬──────────────────────────────────┐
  │ Branch   │ Partition   │ Data                             │
  ├──────────┼─────────────┼──────────────────────────────────┤
  │ snapshot │ dt=20250721 │ (1, Alice), (2, Bob)             │
  │ snapshot │ dt=20250722 │ (1, Alice), (2, Bob), (4, David) │
  │ delta    │ dt=20250722 │ (3, Charlie)                     │
  └──────────┴─────────────┴──────────────────────────────────┘

After CALL sys.compact_chain_table(table => 'db.t', partition => 'dt="20250722"', overwrite => true):

  ┌──────────┬─────────────┬───────────────────────────────────────────────────────────────────┐
  │ Branch   │ Partition   │ Data                                                              │
  ├──────────┼─────────────┼───────────────────────────────────────────────────────────────────┤
  │ snapshot │ dt=20250721 │ (1, Alice), (2, Bob)                                              │
  │ snapshot │ dt=20250722 │ (1, Alice), (2, Bob), (3, Charlie) ← overwritten with merged data │
  │ delta    │ dt=20250722 │ (3, Charlie) ← unchanged                                          │
  └──────────┴─────────────┴───────────────────────────────────────────────────────────────────┘
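
The two scenarios above can be reproduced with a toy in-memory model. This is only a sketch under stated assumptions, not the procedure's implementation: `compact_chain_partition` is a hypothetical function, rows are keyed by id, the base is taken to be the latest snapshot partition before the target (inferred from the tables above), delta rows are assumed to win on a key conflict, and a call without overwrite on an existing snapshot partition is assumed to be a no-op.

```python
from typing import Dict

Rows = Dict[int, str]      # id -> name
Branch = Dict[str, Rows]   # partition -> rows


def compact_chain_partition(snapshot: Branch, delta: Branch, target: str,
                            overwrite: bool = False) -> bool:
    """Toy model: write the merged full data for `target` into `snapshot`.

    Returns True if the snapshot branch was written. The delta branch is
    never modified.
    """
    if target in snapshot and not overwrite:
        # Assumption of this sketch: keep an existing partition as-is.
        return False
    # Base = latest snapshot partition strictly before the target.
    # String comparison works here because all keys share the dt=YYYYMMDD form.
    earlier = [p for p in snapshot if p < target]
    merged = dict(snapshot[max(earlier)]) if earlier else {}
    # Assumption: delta rows replace base rows that share an id.
    merged.update(delta.get(target, {}))
    snapshot[target] = merged
    return True


# Scenario 1: dt=20250723 exists only in delta.
snap1 = {"dt=20250722": {1: "Alice", 2: "Bob"}}
delta1 = {"dt=20250723": {3: "Charlie"}}
compact_chain_partition(snap1, delta1, "dt=20250723")

# Scenario 2: dt=20250722 exists in both branches and is overwritten.
snap2 = {
    "dt=20250721": {1: "Alice", 2: "Bob"},
    "dt=20250722": {1: "Alice", 2: "Bob", 4: "David"},
}
delta2 = {"dt=20250722": {3: "Charlie"}}
compact_chain_partition(snap2, delta2, "dt=20250722", overwrite=True)
```

In both runs the delta dictionaries are left unchanged, which mirrors the "unchanged" rows in the tables.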

Solution

Prerequisite

The core changes (FallbackReadFileStoreTable, ChainGroupReadTable) are extracted into a separate PR, #7950, which introduces separate partition predicates for main and fallback.

Changes

  • Add CompactChainTableProcedure with parameters: table, partition, overwrite (optional)
  • Register procedure in SparkProcedures
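
As an illustration of the parameter list, the sketch below assembles the CALL statement in the named-argument form used in the example above. `build_compact_chain_call` is a hypothetical helper, not part of Paimon or Spark, and it performs no quoting or escaping of its inputs.

```python
from typing import Optional


def build_compact_chain_call(table: str, partition: str,
                             overwrite: Optional[bool] = None) -> str:
    """Build a CALL statement string; `overwrite` is omitted when None."""
    args = [f"table => '{table}'", f"partition => '{partition}'"]
    if overwrite is not None:
        # Render the optional flag as a SQL boolean literal.
        args.append(f"overwrite => {'true' if overwrite else 'false'}")
    return f"CALL sys.compact_chain_table({', '.join(args)})"


plain_call = build_compact_chain_call("db.t", 'dt="20250723"')
overwrite_call = build_compact_chain_call("db.t", 'dt="20250722"', overwrite=True)
print(plain_call)
print(overwrite_call)
```

The resulting strings could then be passed to a Spark session's `sql` method on a cluster where the procedure is registered.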

Tests

  • Add CompactChainTableProcedureTest

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Labels: enhancement (New feature or request)