[FLINK-39837][connectors] Add tombstone-based retention for removed splits in dynamic sources#28318
Open
jubins wants to merge 3 commits into
Open
[FLINK-39837][connectors] Add tombstone-based retention for removed splits in dynamic sources#28318jubins wants to merge 3 commits into
jubins wants to merge 3 commits into
Conversation
…plits in dynamic sources
Collaborator
…eSplit Ensure the Java compiler knows that SplitT must be a subtype of SourceSplit, which satisfies the type constraints required by SplitAssignmentTracker.RemovedSplitInfo<SplitT>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
Fixes FLINK-39837 — Dynamic Kafka sources (and other dynamic sources) are vulnerable to metadata service instability during distributed system operations like rollouts or restarts. When metadata configs oscillate between versions (e.g.,
n → n-1 → n), the source may detect a partition/cluster removal at versionn-1and immediately purge it from the checkpoint. When versionnreappears, the split is treated as brand new and consumption resets toEARLIEST, causing duplicate processing or data loss.This PR introduces configurable tombstone-based retention at the framework level (
SplitAssignmentTracker) to make dynamic sources resilient to temporary metadata inconsistencies. When a split is removed, it can be retained in checkpoint state for a configurable duration (e.g., 24 hours). If the split reappears within the retention window, its progress is restored instead of resetting to the beginning.Brief change log
REMOVED_SPLITS_RETENTION_MSconfiguration option inSourceReaderOptions(default:0for backward compatibility, recommended:86400000for 24-hour retention in dynamic sources)SplitAssignmentTrackerwith tombstone tracking:RemovedSplitInfo<SplitT>stores removed splits with removal timestamp and last assigned subtaskmarkSplitRemoved(splitId, split, subtaskId)— records split removal as tombstone (if retention enabled)tryResurrectSplit(splitId)— attempts to restore split from tombstone if within retention windowcleanupExpiredTombstones()— automatically removes expired entries on checkpoint completionSourceCoordinatorSerdeUtils:VERSION_2to include tombstone stateserializeTombstones()anddeserializeTombstones()methodsVERSION_0andVERSION_1checkpointsSplitAssignmentTrackersnapshot/restore logic:snapshotState()now persists both assignments and tombstonesrestoreState()gracefully handles old checkpoints without tombstones (backward compatibility)SourceCoordinatorContextacceptingremovedSplitsRetentionMsparameter (default constructor preserves existing behavior)markSplitRemoved()andtryResurrectSplit()in partition discovery logicVerifying this change
This change is covered by 8 new unit tests in
SplitAssignmentTrackerTest:testMarkSplitRemovedWithRetention()— verifies tombstone creation when retention is enabledtestMarkSplitRemovedWithoutRetention()— confirms default behavior (no tombstones) is unchangedtestTryResurrectSplitWithinRetention()— validates split resurrection within retention windowtestTryResurrectSplitExpired()— verifies expired tombstones returnnulland are cleaned uptestTryResurrectNonExistentSplit()— confirms graceful handling of non-existent splitstestCleanupExpiredTombstones()— validates automatic cleanup on checkpoint completiontestSnapshotAndRestoreWithTombstones()— verifies tombstones survive checkpoint/restore cyclestestBackwardCompatibilityWithOldCheckpoints()— confirms new tracker can restore old-format checkpoints without tombstonesExisting tests in
SplitAssignmentTrackerTestcontinue to pass, confirming backward compatibility.Does this pull request potentially affect one of the following parts
@Public(Evolving): no —SplitAssignmentTracker,SourceCoordinatorSerdeUtils, andSourceCoordinatorContextare all@Internal. The newSourceReaderOptions.REMOVED_SPLITS_RETENTION_MSconfig is@PublicEvolving(standard for config options).SourceCoordinatorSerdeUtilsversion bumped toVERSION_2with full backward compatibilityDocumentation
SplitAssignmentTrackerwith comprehensive usage example for Kafka connector developersSourceReaderOptions.REMOVED_SPLITS_RETENTION_MSWas generative AI tooling used to co-author this PR?
Generated-by: Claude Opus 4.8