[spark] Support compact_chain_table procedure#7313
Conversation
58cfdb0 to
d381de3
Compare
JingsongLi
left a comment
There was a problem hiding this comment.
Thanks for adding the Spark procedure for chain table compaction.
Comments:
-
PR description is minimal — the "Purpose" section just links issue #7312 without explaining the approach. Please describe:
- How does
compact_chain_tablediffer from regularcompact? - Does it compact both snapshot and delta branches?
- What's the merge strategy for chain compaction?
- How does
-
File changes: The PR touches
ChainGroupReadTable,FallbackReadFileStoreTable, andChainSplit. What changes were needed to support compaction vs. just read? Are these refactoring prerequisites or functional changes? -
613 additions is significant. A test file (
CompactChainTableProcedureTest.scala) + procedure + supporting core changes — consider whether the core changes could be a separate prerequisite PR. -
Documentation: Good that both
chain-table.mdandprocedures.mdare updated. -
ChainSplitTest: What cases does it cover? Chain tables have complex split logic due to cross-branch data dependencies.
Please fill in the PR description with the design approach before requesting final review.
d381de3 to
7ac2113
Compare
Thank you for the thorough review and valuable feedback.
|
This PR is a prerequisite for [`[spark] Support compact_chain_table procedure`](#7313). `FallbackReadScan` currently uses a single `partitionPredicate` for both main and fallback scans. However, in chain table compaction scenarios, we need to apply different partition filters to the main (snapshot) branch and the fallback (delta) branch. For example, when overwriting an existing partition, we need to: - Exclude the target partition from the main scan - Include only the target partition in the fallback scan This PR refactors `FallbackReadScan` to support separate partition predicates for main and fallback scans.
7ac2113 to
6da3d39
Compare
|
Can you rebase master? |
c6e5450 to
35672ea
Compare
|
JingsongLi
left a comment
There was a problem hiding this comment.
I found a couple of issues while reviewing this change.
| boolean partitionExists = checkPartitionExists(snapshotTable, partition, relation); | ||
| if (partitionExists) { | ||
| if (overwrite) { | ||
| scan.withPartitionFilter( |
There was a problem hiding this comment.
This overwrite path appears to read too much data. When the target partition already exists, the snapshot-side predicate is changed to NOT (target partition), while the delta side still uses the target partition. However, ChainGroupReadTable.plan() adds all mainScan.plan() splits directly before planning the delta/anchor splits, and this procedure later rewrites every output row's partition columns to the target partition. That means unrelated snapshot partitions can be copied into the compacted target partition during overwrite. The overwrite path should only read the chain-merge result needed for the target partition, not every snapshot partition except the target.
There was a problem hiding this comment.
Thanks a lot for your review. I added a flag of preloadTargetSnapshot to skip executing mainScan.plan() when overwriting the target partition.
Appreciate your reminder, thanks.
|
|
||
| you will get the following result: | ||
| ```text | ||
| +---+----+-----+ |
There was a problem hiding this comment.
Please remove the trailing whitespace in this added result block. git diff --check origin/master...HEAD reports trailing whitespace on this block, which will fail whitespace/style checks.
There was a problem hiding this comment.
Done, thanks for your reminder.
Purpose
Linked issue: close #7312
Tests
API and Format
Documentation
Generative AI tooling
No