Skip to content

Core, Spark: Fix copy plan to include live files with expired snapshot IDs in RewriteTablePath#15975

Closed
huaxingao wants to merge 3 commits intoapache:mainfrom
huaxingao:issue_14458
Closed

Core, Spark: Fix copy plan to include live files with expired snapshot IDs in RewriteTablePath#15975
huaxingao wants to merge 3 commits intoapache:mainfrom
huaxingao:issue_14458

Conversation

@huaxingao
Copy link
Copy Markdown
Contributor

Fixes #14458

During a full table copy, RewriteTablePathSparkAction builds deltaSnapshotIds from endMetadata.snapshots(), which excludes expired snapshots. Since entry.snapshotId() records the adding snapshot, live files whose adding snapshot was expired are excluded from the copy plan, resulting in a broken table at the target location.

This fix makes the snapshotIds filter nullable in RewriteTablePathUtil. For full copies (startMetadata == null), null is passed to skip the filter and include all live entries in the copy plan.

@anuragmantri
Copy link
Copy Markdown
Contributor

@krisnaru could you take a look at this as you have been working on this? Thanks.

Comment on lines -432 to -434
// keep the following entries in metadata but exclude them from copyPlan
// 1) deleted data files
// 2) entries not changed by snapshotIds
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did we remove this comment : this is still valid for incremental copies right ? where snapshotIds is not null ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, the comment is still valid for incremental copies. Added it back with a note that the snapshotIds only applies for incremental copies.

Comment on lines +372 to +373
* @param snapshotIds snapshot ids for filtering returned delete manifest entries. When null, all
* live entries are included in the copy plan regardless of their snapshot id.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is null being reused to keep the public api as it is ? i wonder if there is a better way to do it, just have a signature which say include all ? default false

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I used null to avoid adding a new public method to the utility class. The Javadoc documents the null behavior: when null, all live entries are included for full copies. I can add an overloaded method without the snapshotIds parameter if you think that's cleaner.

// For full copy (no start version), pass null so all live entries are included
// in the copy plan. Current snapshot IDs may not cover entries whose original
// adding snapshot has been expired, but those files can still be live.
Set<Snapshot> snapshotFilter = startMetadata == null ? null : deltaSnapshots;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filter sounds like it should return true or false right ? but its not the case do you wanna wrap this to Predicate ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, filter is misleading here. Renamed to deltaSnapshotIds.

@huaxingao
Copy link
Copy Markdown
Contributor Author

@singhpk234 Thanks for your review! I have addressed all the comments, could you take one more look when you have a moment? Thanks!

// 2) entries not changed by snapshotIds
if (entry.isLive() && snapshotIds.contains(entry.snapshotId())) {
// 2) entries not changed by snapshotIds (incremental copy only)
if (entry.isLive() && (snapshotIds == null || snapshotIds.contains(entry.snapshotId()))) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix for full copies (passing null snapshotIds) looks correct, but I believe the same class of bug persists for incremental copies when RewriteManifests runs between replications.

Scenario:

  1. Incremental replication at T1 captures startVersion. Source has {S1, S2}.
  2. Between T1 and T2: append → S3(file_C), RewriteManifestsS4, expire S3.
  3. Incremental at T2:
    • deltaSnapshotIds = {S4} (S3 expired, not in endMetadata.snapshots())
    • S4's rewritten manifests are selected for processing ✓
    • But entry.snapshotId() = S3 for file_C, and {S4}.contains(S3) == false
    • file_C is excluded from copyPlanlive data silently lost

The root cause: deltaSnapshotIds is derived from endMetadata.snapshots() in RewriteTablePathSparkAction), which excludes expired snapshots. When
RewriteManifests reorganizes entries from expired snapshots into new manifests, the manifest is processed but the entry-level filter drops files whose adding snapshot was expired.

Should the incremental case also account for expired snapshot IDs — perhaps by
collecting all snapshot IDs that existed between start and end versions from the metadata log, or by using a different strategy (e.g., comparing against already-replicated file
sets)

@krisnaru
Copy link
Copy Markdown

krisnaru commented Apr 28, 2026

The incremental case is still broken

Scenario: Incremental replication after append + RewriteManifests + snapshot expiration:

  1. T1: Incremental replication done. Source has {S1, S2}. Target has all files.
  2. Between T1 and T2: Append → S3 (file C), RewriteManifests → S4 (merges all manifests), Expire → S3 removed from metadata.
  3. T2: Next incremental replication (start=T1):
    - endMetadata.snapshots() = {S1, S2, S4} (S3 expired)
    - deltaSnapshotIds = {S4} (lines 370-372 in RewriteTablePathSparkAction)
    - S4's rewritten manifests are selected for processing (ManifestFile.snapshotId = S4) ✓
    - But file C has entry.snapshotId() = S3, and {S4}.contains(S3) == false
    - File C is excluded from copyPlan — live data silently lost

The root cause: deltaSnapshotIds is derived from endMetadata.snapshots(), which excludes expired snapshots. When a file's adding snapshot is expired between replications, the entry-level filter drops it even though it's live and unreplicated.

Why snapshotId-based entry filtering is fundamentally fragile

entry.snapshotId() reflects the snapshot that originally added the file — which can be expired at any time by table maintenance. Using it as a correctness-critical filter for determining which files to copy means the copy plan's correctness depends on snapshot retention policy, which is outside the replication system's control.

A more robust approach (which we've tested extensively in our implementation) is:

  1. Include all live entries in the copy plan — filter by entry.isLive() only, no snapshotId check
  2. Deduplicate for incremental copies via anti-join — compare collected files against files already present at the target (from startVersion), using a distributed anti-join rather than a snapshot ID set
  3. Recover expired snapshot IDs from version history — if snapshot ID filtering is needed at the manifest level, iterate through intermediate version files (not just endMetadata) to collect all snapshot IDs including expired ones

This design is immune to any combination of RewriteManifests, RewriteDataFiles, and snapshot expiration between replications. We have 15+ integration tests covering every combination (append/overwrite/delete + rewrite manifests/data files + expire) and they all pass.

@huaxingao
Copy link
Copy Markdown
Contributor Author

Thanks @krisnaru, you're right. My PR only fixes the full-copy path. Since you already have a tested implementation that fixes both the full and incremental cases, I think it makes more sense for you to send the fix. Are you planning to open a PR for #14458? If yes, I'll close this one and review yours. If you'd rather not, let me know and I'll expand this PR to cover both cases.

@krisnaru
Copy link
Copy Markdown

krisnaru commented May 4, 2026

Thanks @krisnaru, you're right. My PR only fixes the full-copy path. Since you already have a tested implementation that fixes both the full and incremental cases, I think it makes more sense for you to send the fix. Are you planning to open a PR for #14458? If yes, I'll close this one and review yours. If you'd rather not, let me know and I'll expand this PR to cover both cases.

Sure @huaxingao I will open up new PR this week, you can close this one.

@huaxingao huaxingao closed this May 4, 2026
@krisnaru
Copy link
Copy Markdown

krisnaru commented May 5, 2026

Thanks @huaxingao , I opened up new PR #16220 fixing this issue for both full and incrementals.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Remove incorrect snapshotId filtering in RewriteTablePathSparkAction

4 participants