[FLINK-39837][connectors] Add tombstone-based retention for removed splits in dynamic sources#28318

Open
jubins wants to merge 3 commits into apache:master from jubins:j-FLINK-39837-kafka-source-retention

@jubins jubins commented Jun 4, 2026

What is the purpose of the change

Fixes FLINK-39837 — Dynamic Kafka sources (and other dynamic sources) are vulnerable to metadata service instability during distributed system operations like rollouts or restarts. When metadata configs oscillate between versions (e.g., n → n-1 → n), the source may detect a partition/cluster removal at version n-1 and immediately purge it from the checkpoint. When version n reappears, the split is treated as brand new and consumption resets to EARLIEST, causing duplicate processing or data loss.
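
The oscillation described above can be illustrated with a plain set diff between consecutive metadata snapshots. This is a minimal sketch, not code from this PR: `MetadataDiff` and the split ID strings are hypothetical, and a real enumerator would diff richer split objects.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper (not part of Flink): diffs two metadata snapshots by split ID. */
final class MetadataDiff {
    final Set<String> removed;
    final Set<String> added;

    private MetadataDiff(Set<String> removed, Set<String> added) {
        this.removed = removed;
        this.added = added;
    }

    /** IDs only in {@code previous} count as removed; IDs only in {@code current} count as added. */
    static MetadataDiff between(Set<String> previous, Set<String> current) {
        Set<String> removed = new TreeSet<>(previous);
        removed.removeAll(current);
        Set<String> added = new TreeSet<>(current);
        added.removeAll(previous);
        return new MetadataDiff(removed, added);
    }
}
```

With version n containing `topic-0`, `topic-1`, `topic-2` and version n-1 containing only the first two, the step n → n-1 reports `topic-2` as removed and the step n-1 → n reports it as added. Without any record of the earlier removal, the second diff cannot tell a returning split from a genuinely new one.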

This PR introduces configurable tombstone-based retention at the framework level (SplitAssignmentTracker) to make dynamic sources resilient to temporary metadata inconsistencies. When a split is removed, it can be retained in checkpoint state for a configurable duration (e.g., 24 hours). If the split reappears within the retention window, its progress is restored instead of resetting to the beginning.

Brief change log

  • Added REMOVED_SPLITS_RETENTION_MS configuration option in SourceReaderOptions (default: 0 for backward compatibility, recommended: 86400000 for 24-hour retention in dynamic sources)
  • Extended SplitAssignmentTracker with tombstone tracking:
    • New inner class RemovedSplitInfo<SplitT> stores removed splits with removal timestamp and last assigned subtask
    • markSplitRemoved(splitId, split, subtaskId) — records split removal as tombstone (if retention enabled)
    • tryResurrectSplit(splitId) — attempts to restore split from tombstone if within retention window
    • cleanupExpiredTombstones() — automatically removes expired entries on checkpoint completion
  • Updated checkpoint serialization in SourceCoordinatorSerdeUtils:
    • Bumped version to VERSION_2 to include tombstone state
    • Added serializeTombstones() and deserializeTombstones() methods
    • Maintains full backward compatibility with VERSION_0 and VERSION_1 checkpoints
  • Extended SplitAssignmentTracker snapshot/restore logic:
    • snapshotState() now persists both assignments and tombstones
    • restoreState() gracefully handles old checkpoints without tombstones (backward compatibility)
  • Added constructor overload to SourceCoordinatorContext accepting removedSplitsRetentionMs parameter (default constructor preserves existing behavior)
  • Added JavaDoc to SplitAssignmentTracker with a Kafka connector usage example showing how to integrate markSplitRemoved() and tryResurrectSplit() into partition discovery logic
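
As a rough illustration of the tombstone lifecycle listed above, the following standalone sketch mirrors the three method names from the change log. It is not the PR's implementation: `TombstoneTracker`, the injectable `LongSupplier` clock, the `String` split IDs, and `tombstoneCount()` are assumptions made to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the tombstone tracking; only the method names come from the change log. */
final class TombstoneTracker<SplitT> {

    /** A removed split with its removal time and the subtask it was last assigned to. */
    static final class RemovedSplitInfo<T> {
        final T split;
        final long removalTimestampMs;
        final int lastSubtaskId;

        RemovedSplitInfo(T split, long removalTimestampMs, int lastSubtaskId) {
            this.split = split;
            this.removalTimestampMs = removalTimestampMs;
            this.lastSubtaskId = lastSubtaskId;
        }
    }

    private final Map<String, RemovedSplitInfo<SplitT>> tombstones = new HashMap<>();
    private final long retentionMs;
    private final LongSupplier clock; // assumption: injected so expiry can be tested

    TombstoneTracker(long retentionMs, LongSupplier clock) {
        this.retentionMs = retentionMs;
        this.clock = clock;
    }

    /** Records a tombstone; a retention of 0 (the default) keeps the old purge-immediately behavior. */
    void markSplitRemoved(String splitId, SplitT split, int subtaskId) {
        if (retentionMs <= 0) {
            return;
        }
        tombstones.put(splitId, new RemovedSplitInfo<>(split, clock.getAsLong(), subtaskId));
    }

    /** Returns the retained split if its tombstone is still within the window, else null. */
    SplitT tryResurrectSplit(String splitId) {
        RemovedSplitInfo<SplitT> info = tombstones.remove(splitId);
        if (info == null || isExpired(info, clock.getAsLong())) {
            return null; // unknown or expired; an expired entry has already been dropped
        }
        return info.split;
    }

    /** Drops every tombstone older than the retention window. */
    void cleanupExpiredTombstones() {
        long now = clock.getAsLong();
        tombstones.values().removeIf(info -> isExpired(info, now));
    }

    int tombstoneCount() {
        return tombstones.size();
    }

    private boolean isExpired(RemovedSplitInfo<SplitT> info, long now) {
        return now - info.removalTimestampMs > retentionMs;
    }
}
```

In this sketch a connector would call `markSplitRemoved` when discovery stops reporting a partition, and call `tryResurrectSplit` before treating a newly discovered partition as new. A non-null result means the retained split, and with it the stored progress, can be reassigned.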

Verifying this change

This change is covered by 8 new unit tests in SplitAssignmentTrackerTest:

  • testMarkSplitRemovedWithRetention() — verifies tombstone creation when retention is enabled
  • testMarkSplitRemovedWithoutRetention() — confirms default behavior (no tombstones) is unchanged
  • testTryResurrectSplitWithinRetention() — validates split resurrection within retention window
  • testTryResurrectSplitExpired() — verifies expired tombstones return null and are cleaned up
  • testTryResurrectNonExistentSplit() — confirms graceful handling of non-existent splits
  • testCleanupExpiredTombstones() — validates automatic cleanup on checkpoint completion
  • testSnapshotAndRestoreWithTombstones() — verifies tombstones survive checkpoint/restore cycles
  • testBackwardCompatibilityWithOldCheckpoints() — confirms new tracker can restore old-format checkpoints without tombstones

Existing tests in SplitAssignmentTrackerTest continue to pass, confirming backward compatibility.

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no. SplitAssignmentTracker, SourceCoordinatorSerdeUtils, and SourceCoordinatorContext are all @Internal. The new SourceReaderOptions.REMOVED_SPLITS_RETENTION_MS config is @PublicEvolving (standard for config options).
  • The serializers: yes. SourceCoordinatorSerdeUtils version bumped to VERSION_2 with full backward compatibility
  • The runtime per-record code paths (performance sensitive): no — tombstone operations occur only during split discovery/checkpointing, not per-record processing
  • Anything that affects deployment or recovery (JobManager, Checkpointing, Kubernetes/Yarn, ZooKeeper): minimal. Checkpoint state size may increase slightly when retention is enabled and splits are removed, but tombstones are bounded by the retention duration and are cleaned up automatically
  • The S3 file system connector: no
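
The version-gated serializer change noted above can be sketched as follows. This is an illustrative wire format under stated assumptions, not the layout used by SourceCoordinatorSerdeUtils: `VersionedStateSerde`, its `State` class, and the simplified map payloads are hypothetical, and only two versions are modeled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical serde: a version header decides whether a tombstone section follows. */
final class VersionedStateSerde {
    static final int VERSION_1 = 1; // assignments only
    static final int VERSION_2 = 2; // assignments followed by tombstones

    static final class State {
        final Map<String, Integer> assignments; // split ID -> subtask ID
        final Map<String, Long> tombstones;     // split ID -> removal timestamp in ms

        State(Map<String, Integer> assignments, Map<String, Long> tombstones) {
            this.assignments = assignments;
            this.tombstones = tombstones;
        }
    }

    static byte[] serialize(State state, int version) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version);
            out.writeInt(state.assignments.size());
            for (Map.Entry<String, Integer> e : state.assignments.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue());
            }
            if (version >= VERSION_2) { // older formats carry no tombstone section
                out.writeInt(state.tombstones.size());
                for (Map.Entry<String, Long> e : state.tombstones.entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeLong(e.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static State deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version < VERSION_1 || version > VERSION_2) {
                throw new IOException("Unsupported version: " + version);
            }
            Map<String, Integer> assignments = new HashMap<>();
            int assignmentCount = in.readInt();
            for (int i = 0; i < assignmentCount; i++) {
                assignments.put(in.readUTF(), in.readInt());
            }
            Map<String, Long> tombstones = new HashMap<>();
            if (version >= VERSION_2) { // old checkpoints restore with no tombstones
                int tombstoneCount = in.readInt();
                for (int i = 0; i < tombstoneCount; i++) {
                    tombstones.put(in.readUTF(), in.readLong());
                }
            }
            return new State(assignments, tombstones);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Reading the version first and treating the tombstone section as optional is what lets a newer reader restore an older checkpoint: the assignments come back unchanged and the tombstone map simply starts empty.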

Documentation

  • Does this pull request introduce a new feature? yes — framework-level tombstone retention for dynamic sources
  • If yes, how is the feature documented?
    • Inline JavaDoc on SplitAssignmentTracker with comprehensive usage example for Kafka connector developers
    • Config option documentation in SourceReaderOptions.REMOVED_SPLITS_RETENTION_MS
    • (Follow-up PR will add user-facing docs once Kafka connector integration is complete)

Was generative AI tooling used to co-author this PR?

  • Yes — Claude Code was used as a pair-programming assistant for discussing the architecture, reviewing implementation patterns, and structuring the tombstone retention logic. All code was written, understood, and verified by the author.

Generated-by: Claude Opus 4.8

@jubins jubins changed the title [FLINK-39837][connectors] Add tombstone-based retention for removed s… [FLINK-39837][connectors] Add tombstone-based retention for removed splits in dynamic sources Jun 4, 2026

flinkbot commented Jun 4, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

jubins added 2 commits June 4, 2026 20:38
…eSplit

Ensure the Java compiler knows that SplitT must be a subtype of SourceSplit, which satisfies the type constraints required by SplitAssignmentTracker.RemovedSplitInfo<SplitT>