From 5a20fc1638164fd4b6e723dc52b3339e71e50135 Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Thu, 4 Jun 2026 15:55:21 -0700 Subject: [PATCH 1/3] [FLINK-39837][connectors] Add tombstone-based retention for removed splits in dynamic sources --- .../source/reader/SourceReaderOptions.java | 11 + .../coordinator/SourceCoordinatorContext.java | 20 +- .../SourceCoordinatorSerdeUtils.java | 90 +++++- .../coordinator/SplitAssignmentTracker.java | 273 +++++++++++++++++- .../SplitAssignmentTrackerTest.java | 164 +++++++++++ 5 files changed, 550 insertions(+), 8 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java index 71dafd85fd0d0..1c75515c432b1 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java @@ -39,6 +39,17 @@ public class SourceReaderOptions { .defaultValue(2) .withDescription("The capacity of the element queue in the source reader."); + public static final ConfigOption REMOVED_SPLITS_RETENTION_MS = + ConfigOptions.key("source.removed-splits.retention.ms") + .longType() + .defaultValue(0L) + .withDescription( + "The duration in milliseconds to retain removed splits/partitions in " + + "checkpoint state before permanently removing them. This helps maintain " + + "progress tracking during temporary metadata service instability. " + + "Default is 0 (immediate removal, current behavior). 
" + + "Recommended value for dynamic sources: 86400000 (24 hours)."); + // --------------- final fields ---------------------- public final long sourceReaderCloseTimeout; public final int elementQueueCapacity; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java index 3f9d78b58643a..a11ef9e0f38e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java @@ -128,6 +128,24 @@ public SourceCoordinatorContext( OperatorCoordinator.Context operatorCoordinatorContext, SimpleVersionedSerializer splitSerializer, boolean supportsConcurrentExecutionAttempts) { + this( + jobID, + coordinatorThreadFactory, + numWorkerThreads, + operatorCoordinatorContext, + splitSerializer, + supportsConcurrentExecutionAttempts, + 0L); // Default: no retention (current behavior) + } + + public SourceCoordinatorContext( + JobID jobID, + SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory, + int numWorkerThreads, + OperatorCoordinator.Context operatorCoordinatorContext, + SimpleVersionedSerializer splitSerializer, + boolean supportsConcurrentExecutionAttempts, + long removedSplitsRetentionMs) { this( jobID, Executors.newScheduledThreadPool(1, coordinatorThreadFactory), @@ -138,7 +156,7 @@ public SourceCoordinatorContext( coordinatorThreadFactory, operatorCoordinatorContext, splitSerializer, - new SplitAssignmentTracker<>(), + new SplitAssignmentTracker<>(removedSplitsRetentionMs), supportsConcurrentExecutionAttempts); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java 
index ce1e9fe0a6f52..b055793b1117a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java @@ -36,9 +36,10 @@ public class SourceCoordinatorSerdeUtils { public static final int VERSION_0 = 0; public static final int VERSION_1 = 1; + public static final int VERSION_2 = 2; // Added tombstone support /** The current source coordinator serde version. */ - private static final int CURRENT_VERSION = VERSION_1; + private static final int CURRENT_VERSION = VERSION_2; /** Private constructor for utility class. */ private SourceCoordinatorSerdeUtils() {} @@ -117,4 +118,91 @@ static Map> deserializeAssignments( return assignments; } } + + /** + * Serialize tombstone entries for removed splits. + * + * @param tombstones map of split IDs to their removal information + * @param splitSerializer serializer for split objects + * @return serialized tombstone data + */ + static byte[] serializeTombstones( + Map> tombstones, + SimpleVersionedSerializer splitSerializer) + throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputViewStreamWrapper(baos)) { + out.writeInt(splitSerializer.getVersion()); + + int numTombstones = tombstones.size(); + out.writeInt(numTombstones); + + for (Map.Entry> entry : + tombstones.entrySet()) { + // Write split ID + out.writeUTF(entry.getKey()); + + SplitAssignmentTracker.RemovedSplitInfo info = entry.getValue(); + + // Write removal timestamp + out.writeLong(info.getRemovalTimestamp()); + + // Write last assigned subtask ID + out.writeInt(info.getLastAssignedSubtaskId()); + + // Write serialized split + byte[] serializedSplit = splitSerializer.serialize(info.getSplit()); + out.writeInt(serializedSplit.length); + out.write(serializedSplit); + } + + out.flush(); + return baos.toByteArray(); + } + } + + /** + * 
Deserialize tombstone entries for removed splits. + * + * @param tombstoneData serialized tombstone data + * @param splitSerializer serializer for split objects + * @return map of split IDs to their removal information + */ + static Map> + deserializeTombstones( + byte[] tombstoneData, SimpleVersionedSerializer splitSerializer) + throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(tombstoneData); + DataInputStream in = new DataInputViewStreamWrapper(bais)) { + int splitSerializerVersion = in.readInt(); + + int numTombstones = in.readInt(); + Map> tombstones = + new HashMap<>(numTombstones); + + for (int i = 0; i < numTombstones; i++) { + // Read split ID + String splitId = in.readUTF(); + + // Read removal timestamp + long removalTimestamp = in.readLong(); + + // Read last assigned subtask ID + int lastAssignedSubtaskId = in.readInt(); + + // Read serialized split + int serializedSplitSize = in.readInt(); + byte[] serializedSplit = readBytes(in, serializedSplitSize); + SplitT split = + splitSerializer.deserialize(splitSerializerVersion, serializedSplit); + + tombstones.put( + splitId, + new SplitAssignmentTracker.RemovedSplitInfo<>( + split, removalTimestamp, lastAssignedSubtaskId)); + } + + return tombstones; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java index 87525647e1b3a..04a3f5f685fef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java @@ -28,9 +28,11 @@ Licensed to the Apache Software Foundation (ASF) under one import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import 
java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -38,6 +40,65 @@ Licensed to the Apache Software Foundation (ASF) under one /** * A class that is responsible for tracking the past split assignments made by {@link * SplitEnumerator}. + * + *

This tracker now supports tombstone-based retention of removed splits to handle metadata + * service instability in dynamic sources. When splits/partitions are removed due to temporary + * metadata inconsistencies, they can be retained in checkpoint state for a configurable duration. + * If the split reappears within the retention window, its progress is recovered instead of + * starting from scratch. + * + *

Usage Example for Kafka Connector: + * + *

{@code
+ * // In KafkaSourceEnumerator:
+ *
+ * // When discovering topology changes:
+ * public void discoverNewPartitions() {
+ *     Set<KafkaPartitionSplit> currentSplits = fetchCurrentTopology();
+ *     Set<String> currentSplitIds = currentSplits.stream()
+ *         .map(KafkaPartitionSplit::splitId)
+ *         .collect(Collectors.toSet());
+ *
+ *     // Check for removed splits (iterate over a copy: entries are removed inside the loop)
+ *     for (String existingSplitId : new ArrayList<>(assignedSplits.keySet())) {
+ *         if (!currentSplitIds.contains(existingSplitId)) {
+ *             KafkaPartitionSplit removedSplit = assignedSplits.get(existingSplitId);
+ *             int subtaskId = getAssignedSubtaskId(existingSplitId);
+ *
+ *             // Mark as removed (will be stored as tombstone)
+ *             context.getAssignmentTracker()
+ *                 .markSplitRemoved(existingSplitId, removedSplit, subtaskId);
+ *
+ *             assignedSplits.remove(existingSplitId);
+ *         }
+ *     }
+ *
+ *     // Check for new/resurrected splits
+ *     for (KafkaPartitionSplit split : currentSplits) {
+ *         if (!assignedSplits.containsKey(split.splitId())) {
+ *             // Try to resurrect from tombstone
+ *             RemovedSplitInfo<KafkaPartitionSplit> resurrected =
+ *                 context.getAssignmentTracker().tryResurrectSplit(split.splitId());
+ *
+ *             if (resurrected != null) {
+ *                 // Split was recently removed - restore its progress
+ *                 KafkaPartitionSplit oldSplit = resurrected.getSplit();
+ *                 split = new KafkaPartitionSplit(
+ *                     split.getTopicPartition(),
+ *                     oldSplit.getStartingOffset(), // Restore old offset!
+ *                     split.getStoppingOffset()
+ *                 );
+ *                 LOG.info("Resurrected split {} from tombstone", split.splitId());
+ *             } else {
+ *                 // Truly new split - use configured starting offset
+ *                 LOG.info("Discovered new split {}", split.splitId());
+ *             }
+ *
+ *             assignSplitToReader(split);
+ *         }
+ *     }
+ * }
+ * }
*/ @Internal public class SplitAssignmentTracker { @@ -49,9 +110,73 @@ public class SplitAssignmentTracker { // The mapping is [SubtaskId -> LinkedHashSet[SourceSplits]]. private Map> uncheckpointedAssignments; + // Tombstone entries for removed splits to handle metadata service instability. + // The mapping is [SplitId -> RemovedSplitInfo]. + // These entries are retained for a configurable duration to allow resurrection + // if the split reappears due to metadata service flakiness. + private final Map> removedSplitsTombstones; + + // Duration in milliseconds to retain removed splits before permanently removing them. + // A value of 0 means immediate removal (current behavior). + private final long removedSplitsRetentionMs; + public SplitAssignmentTracker() { + this(0L); + } + + public SplitAssignmentTracker(long removedSplitsRetentionMs) { this.assignmentsByCheckpointId = new TreeMap<>(); this.uncheckpointedAssignments = new HashMap<>(); + this.removedSplitsTombstones = new HashMap<>(); + this.removedSplitsRetentionMs = removedSplitsRetentionMs; + } + + /** + * Represents a removed split with its removal timestamp and last known state. + * Used to handle temporary metadata service inconsistencies. 
+ */ + @Internal + static class RemovedSplitInfo { + private final SplitT split; + private final long removalTimestamp; + private final int lastAssignedSubtaskId; + + RemovedSplitInfo(SplitT split, long removalTimestamp, int lastAssignedSubtaskId) { + this.split = split; + this.removalTimestamp = removalTimestamp; + this.lastAssignedSubtaskId = lastAssignedSubtaskId; + } + + public SplitT getSplit() { + return split; + } + + public long getRemovalTimestamp() { + return removalTimestamp; + } + + public int getLastAssignedSubtaskId() { + return lastAssignedSubtaskId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RemovedSplitInfo that = (RemovedSplitInfo) o; + return removalTimestamp == that.removalTimestamp + && lastAssignedSubtaskId == that.lastAssignedSubtaskId + && Objects.equals(split.splitId(), that.split.splitId()); + } + + @Override + public int hashCode() { + return Objects.hash(split.splitId(), removalTimestamp, lastAssignedSubtaskId); + } } /** @@ -66,15 +191,41 @@ public void onCheckpoint(long checkpointId) throws Exception { uncheckpointedAssignments = new HashMap<>(); } - /** Take a snapshot of the split assignments. */ + /** + * Take a snapshot of the split assignments and tombstones. + * The snapshot includes both uncheckpointed assignments and tombstone entries. 
+ */ public byte[] snapshotState(SimpleVersionedSerializer splitSerializer) throws Exception { - return SourceCoordinatorSerdeUtils.serializeAssignments( - uncheckpointedAssignments, splitSerializer); + // Serialize assignments + byte[] assignmentsData = + SourceCoordinatorSerdeUtils.serializeAssignments( + uncheckpointedAssignments, splitSerializer); + + // Serialize tombstones + byte[] tombstonesData = + SourceCoordinatorSerdeUtils.serializeTombstones( + removedSplitsTombstones, splitSerializer); + + // Combine both into a single snapshot + try (java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); + java.io.DataOutputStream out = new java.io.DataOutputStream(baos)) { + // Write assignments length and data + out.writeInt(assignmentsData.length); + out.write(assignmentsData); + + // Write tombstones length and data + out.writeInt(tombstonesData.length); + out.write(tombstonesData); + + out.flush(); + return baos.toByteArray(); + } } /** - * Restore the state of the SplitAssignmentTracker. + * Restore the state of the SplitAssignmentTracker including tombstones. + * Supports backward compatibility with older versions that don't have tombstones. * * @param splitSerializer The serializer of the splits. * @param assignmentData The state of the SplitAssignmentTracker. 
@@ -83,8 +234,34 @@ public byte[] snapshotState(SimpleVersionedSerializer splitSerializer) public void restoreState( SimpleVersionedSerializer splitSerializer, byte[] assignmentData) throws Exception { - uncheckpointedAssignments = - SourceCoordinatorSerdeUtils.deserializeAssignments(assignmentData, splitSerializer); + try (java.io.ByteArrayInputStream bais = new java.io.ByteArrayInputStream(assignmentData); + java.io.DataInputStream in = new java.io.DataInputStream(bais)) { + + // Read assignments + int assignmentsLength = in.readInt(); + byte[] assignmentsBytes = new byte[assignmentsLength]; + in.readFully(assignmentsBytes); + uncheckpointedAssignments = + SourceCoordinatorSerdeUtils.deserializeAssignments( + assignmentsBytes, splitSerializer); + + // Try to read tombstones (may not exist in older checkpoints) + if (in.available() > 0) { + int tombstonesLength = in.readInt(); + if (tombstonesLength > 0) { + byte[] tombstonesBytes = new byte[tombstonesLength]; + in.readFully(tombstonesBytes); + Map> restoredTombstones = + SourceCoordinatorSerdeUtils.deserializeTombstones( + tombstonesBytes, splitSerializer); + removedSplitsTombstones.putAll(restoredTombstones); + } + } + // If tombstones don't exist in checkpoint, that's fine - backward compatibility + } catch (java.io.EOFException e) { + // Backward compatibility: old checkpoints don't have tombstones + // This is expected and should not cause an error + } } /** @@ -95,6 +272,90 @@ public void restoreState( */ public void onCheckpointComplete(long checkpointId) { assignmentsByCheckpointId.entrySet().removeIf(entry -> entry.getKey() <= checkpointId); + cleanupExpiredTombstones(); + } + + /** + * Remove expired tombstone entries based on the retention duration. + * This is called after successful checkpoints to avoid accumulating stale entries. 
+ */ + private void cleanupExpiredTombstones() { + if (removedSplitsRetentionMs == 0) { + // Immediate removal (current behavior), no tombstones to clean + return; + } + + long currentTime = System.currentTimeMillis(); + Iterator>> iterator = + removedSplitsTombstones.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + long removalTimestamp = entry.getValue().getRemovalTimestamp(); + if (currentTime - removalTimestamp > removedSplitsRetentionMs) { + iterator.remove(); + } + } + } + + /** + * Mark a split as removed and store it as a tombstone if retention is enabled. + * If the split reappears within the retention window, its progress can be recovered. + * + * @param splitId the ID of the split being removed + * @param split the split object + * @param lastAssignedSubtaskId the subtask ID where this split was last assigned + */ + public void markSplitRemoved(String splitId, SplitT split, int lastAssignedSubtaskId) { + if (removedSplitsRetentionMs > 0) { + long currentTime = System.currentTimeMillis(); + removedSplitsTombstones.put( + splitId, new RemovedSplitInfo<>(split, currentTime, lastAssignedSubtaskId)); + } + // If retention is 0, we don't store tombstones (current immediate removal behavior) + } + + /** + * Check if a split that is reappearing should be resurrected from tombstone state. + * If the split was recently removed (within retention window), return its previous state. 
+ * + * @param splitId the ID of the split to check + * @return the RemovedSplitInfo if the split should be resurrected, null otherwise + */ + public RemovedSplitInfo tryResurrectSplit(String splitId) { + if (removedSplitsRetentionMs == 0) { + // No resurrection when retention is disabled + return null; + } + + RemovedSplitInfo tombstone = removedSplitsTombstones.get(splitId); + if (tombstone == null) { + // No tombstone found + return null; + } + + long currentTime = System.currentTimeMillis(); + long removalTimestamp = tombstone.getRemovalTimestamp(); + + if (currentTime - removalTimestamp <= removedSplitsRetentionMs) { + // Within retention window - resurrect the split + removedSplitsTombstones.remove(splitId); + return tombstone; + } else { + // Expired - remove tombstone and treat as new split + removedSplitsTombstones.remove(splitId); + return null; + } + } + + /** + * Get all current tombstone entries (for testing and serialization). + * + * @return map of split IDs to their removal information + */ + @VisibleForTesting + Map> getRemovedSplitsTombstones() { + return Collections.unmodifiableMap(removedSplitsTombstones); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java index bc0f01825858b..8344511b8a949 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java @@ -174,5 +174,169 @@ void testGetAndRemoveSplitsAfterSomeCheckpoint() throws Exception { verifyAssignment(Collections.singletonList("3"), splitsToPutBack); } + // -------------------- Tombstone Tests -------------------- + + @Test + void testMarkSplitRemovedWithRetention() throws Exception { + // Create tracker with 1-hour retention + SplitAssignmentTracker 
tracker = + new SplitAssignmentTracker<>(3600000L); + + MockSourceSplit split = new MockSourceSplit(0); + tracker.markSplitRemoved("0", split, 1); + + // Verify tombstone was created + assertThat(tracker.getRemovedSplitsTombstones()).hasSize(1); + assertThat(tracker.getRemovedSplitsTombstones().containsKey("0")).isTrue(); + assertThat(tracker.getRemovedSplitsTombstones().get("0").getLastAssignedSubtaskId()) + .isEqualTo(1); + } + + @Test + void testMarkSplitRemovedWithoutRetention() { + // Create tracker with no retention (default behavior) + SplitAssignmentTracker tracker = new SplitAssignmentTracker<>(0L); + + MockSourceSplit split = new MockSourceSplit(0); + tracker.markSplitRemoved("0", split, 1); + + // Verify no tombstone was created + assertThat(tracker.getRemovedSplitsTombstones()).isEmpty(); + } + + @Test + void testTryResurrectSplitWithinRetention() throws Exception { + // Create tracker with very long retention + SplitAssignmentTracker tracker = + new SplitAssignmentTracker<>(3600000L); + + MockSourceSplit split = new MockSourceSplit(0); + tracker.markSplitRemoved("0", split, 1); + + // Try to resurrect immediately (within retention window) + SplitAssignmentTracker.RemovedSplitInfo resurrected = + tracker.tryResurrectSplit("0"); + + assertThat(resurrected).isNotNull(); + assertThat(resurrected.getSplit().splitId()).isEqualTo("0"); + assertThat(resurrected.getLastAssignedSubtaskId()).isEqualTo(1); + + // Tombstone should be removed after resurrection + assertThat(tracker.getRemovedSplitsTombstones()).isEmpty(); + } + + @Test + void testTryResurrectSplitExpired() throws Exception { + // Create tracker with very short retention (1ms) + SplitAssignmentTracker tracker = new SplitAssignmentTracker<>(1L); + + MockSourceSplit split = new MockSourceSplit(0); + tracker.markSplitRemoved("0", split, 1); + + // Wait for retention to expire + Thread.sleep(10); + + // Try to resurrect after expiration + SplitAssignmentTracker.RemovedSplitInfo resurrected = + 
tracker.tryResurrectSplit("0"); + + // Should return null for expired tombstone + assertThat(resurrected).isNull(); + + // Tombstone should be cleaned up + assertThat(tracker.getRemovedSplitsTombstones()).isEmpty(); + } + + @Test + void testTryResurrectNonExistentSplit() { + SplitAssignmentTracker tracker = + new SplitAssignmentTracker<>(3600000L); + + // Try to resurrect a split that was never removed + SplitAssignmentTracker.RemovedSplitInfo resurrected = + tracker.tryResurrectSplit("nonexistent"); + + assertThat(resurrected).isNull(); + } + + @Test + void testCleanupExpiredTombstones() throws Exception { + // Create tracker with short retention + SplitAssignmentTracker tracker = new SplitAssignmentTracker<>(50L); + + // Add multiple tombstones + tracker.markSplitRemoved("0", new MockSourceSplit(0), 1); + Thread.sleep(30); // Wait a bit + tracker.markSplitRemoved("1", new MockSourceSplit(1), 1); + + assertThat(tracker.getRemovedSplitsTombstones()).hasSize(2); + + // Wait for first tombstone to expire + Thread.sleep(30); + + // Trigger cleanup via checkpoint complete + tracker.onCheckpointComplete(1L); + + // First tombstone should be cleaned, second should remain + assertThat(tracker.getRemovedSplitsTombstones()).hasSize(1); + assertThat(tracker.getRemovedSplitsTombstones().containsKey("1")).isTrue(); + } + + @Test + void testSnapshotAndRestoreWithTombstones() throws Exception { + // Create tracker with retention + SplitAssignmentTracker tracker = + new SplitAssignmentTracker<>(3600000L); + + // Record some assignments + tracker.recordSplitAssignment(getSplitsAssignment(2, 0)); + + // Mark a split as removed + MockSourceSplit removedSplit = new MockSourceSplit(42); + tracker.markSplitRemoved("42", removedSplit, 1); + + // Take snapshot + byte[] snapshotState = tracker.snapshotState(new MockSourceSplitSerializer()); + + // Restore to new tracker + SplitAssignmentTracker restoredTracker = + new SplitAssignmentTracker<>(3600000L); + restoredTracker.restoreState(new 
MockSourceSplitSerializer(), snapshotState); + + // Verify assignments were restored + assertThat(restoredTracker.uncheckpointedAssignments()).hasSize(2); + + // Verify tombstone was restored + assertThat(restoredTracker.getRemovedSplitsTombstones()).hasSize(1); + assertThat(restoredTracker.getRemovedSplitsTombstones().containsKey("42")).isTrue(); + + // Verify resurrection works after restore + SplitAssignmentTracker.RemovedSplitInfo resurrected = + restoredTracker.tryResurrectSplit("42"); + assertThat(resurrected).isNotNull(); + assertThat(resurrected.getSplit().splitId()).isEqualTo("42"); + } + + @Test + void testBackwardCompatibilityWithOldCheckpoints() throws Exception { + // Create old-style tracker (no retention) + SplitAssignmentTracker oldTracker = new SplitAssignmentTracker<>(); + oldTracker.recordSplitAssignment(getSplitsAssignment(2, 0)); + + // Manually serialize just assignments (old format) + byte[] oldFormatSnapshot = + SourceCoordinatorSerdeUtils.serializeAssignments( + oldTracker.uncheckpointedAssignments(), new MockSourceSplitSerializer()); + + // Restore with new tracker that supports tombstones + SplitAssignmentTracker newTracker = + new SplitAssignmentTracker<>(3600000L); + newTracker.restoreState(new MockSourceSplitSerializer(), oldFormatSnapshot); + + // Should handle gracefully - assignments restored, no tombstones + assertThat(newTracker.uncheckpointedAssignments()).hasSize(2); + assertThat(newTracker.getRemovedSplitsTombstones()).isEmpty(); + } + // --------------------- } From 5375ad67a09130c343ae7ac6c59e1e9e82ce3321 Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Thu, 4 Jun 2026 20:38:36 -0700 Subject: [PATCH 2/3] Ensure the Java compiler knows that SplitT must be a subtype of SourceSplit Ensure the Java compiler knows that SplitT must be a subtype of SourceSplit, which satisfies the type constraints required by SplitAssignmentTracker.RemovedSplitInfo --- .../source/coordinator/SourceCoordinatorSerdeUtils.java | 5 +++-- 1 file changed, 
3 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java index b055793b1117a..62ceaf178b2af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java @@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one package org.apache.flink.runtime.source.coordinator; +import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; @@ -126,7 +127,7 @@ static Map> deserializeAssignments( * @param splitSerializer serializer for split objects * @return serialized tombstone data */ - static byte[] serializeTombstones( + static byte[] serializeTombstones( Map> tombstones, SimpleVersionedSerializer splitSerializer) throws IOException { @@ -168,7 +169,7 @@ static byte[] serializeTombstones( * @param splitSerializer serializer for split objects * @return map of split IDs to their removal information */ - static Map> + static Map> deserializeTombstones( byte[] tombstoneData, SimpleVersionedSerializer splitSerializer) throws IOException { From ad8f5f7226711fe95b8962a3a3ecf6afc6c62385 Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Fri, 5 Jun 2026 00:19:45 -0700 Subject: [PATCH 3/3] [FLINK-39837] Apply Spotless formatting --- .../SourceCoordinatorSerdeUtils.java | 7 ++--- .../coordinator/SplitAssignmentTracker.java | 28 +++++++++---------- .../SplitAssignmentTrackerTest.java | 15 ++++------ 3 files changed, 22 insertions(+), 28 deletions(-) diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java index 62ceaf178b2af..623c4b79a4d2c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java @@ -169,8 +169,8 @@ static byte[] serializeTombstones( * @param splitSerializer serializer for split objects * @return map of split IDs to their removal information */ - static Map> - deserializeTombstones( + static + Map> deserializeTombstones( byte[] tombstoneData, SimpleVersionedSerializer splitSerializer) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(tombstoneData); @@ -194,8 +194,7 @@ static byte[] serializeTombstones( // Read serialized split int serializedSplitSize = in.readInt(); byte[] serializedSplit = readBytes(in, serializedSplitSize); - SplitT split = - splitSerializer.deserialize(splitSerializerVersion, serializedSplit); + SplitT split = splitSerializer.deserialize(splitSerializerVersion, serializedSplit); tombstones.put( splitId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java index 04a3f5f685fef..bb301165f6e83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java @@ -44,8 +44,8 @@ Licensed to the Apache Software Foundation (ASF) under one *

This tracker now supports tombstone-based retention of removed splits to handle metadata * service instability in dynamic sources. When splits/partitions are removed due to temporary * metadata inconsistencies, they can be retained in checkpoint state for a configurable duration. - * If the split reappears within the retention window, its progress is recovered instead of - * starting from scratch. + * If the split reappears within the retention window, its progress is recovered instead of starting + * from scratch. * *

Usage Example for Kafka Connector: * @@ -132,8 +132,8 @@ public SplitAssignmentTracker(long removedSplitsRetentionMs) { } /** - * Represents a removed split with its removal timestamp and last known state. - * Used to handle temporary metadata service inconsistencies. + * Represents a removed split with its removal timestamp and last known state. Used to handle + * temporary metadata service inconsistencies. */ @Internal static class RemovedSplitInfo { @@ -192,8 +192,8 @@ public void onCheckpoint(long checkpointId) throws Exception { } /** - * Take a snapshot of the split assignments and tombstones. - * The snapshot includes both uncheckpointed assignments and tombstone entries. + * Take a snapshot of the split assignments and tombstones. The snapshot includes both + * uncheckpointed assignments and tombstone entries. */ public byte[] snapshotState(SimpleVersionedSerializer splitSerializer) throws Exception { @@ -224,8 +224,8 @@ public byte[] snapshotState(SimpleVersionedSerializer splitSerializer) } /** - * Restore the state of the SplitAssignmentTracker including tombstones. - * Supports backward compatibility with older versions that don't have tombstones. + * Restore the state of the SplitAssignmentTracker including tombstones. Supports backward + * compatibility with older versions that don't have tombstones. * * @param splitSerializer The serializer of the splits. * @param assignmentData The state of the SplitAssignmentTracker. @@ -276,8 +276,8 @@ public void onCheckpointComplete(long checkpointId) { } /** - * Remove expired tombstone entries based on the retention duration. - * This is called after successful checkpoints to avoid accumulating stale entries. + * Remove expired tombstone entries based on the retention duration. This is called after + * successful checkpoints to avoid accumulating stale entries. 
*/ private void cleanupExpiredTombstones() { if (removedSplitsRetentionMs == 0) { @@ -299,8 +299,8 @@ private void cleanupExpiredTombstones() { } /** - * Mark a split as removed and store it as a tombstone if retention is enabled. - * If the split reappears within the retention window, its progress can be recovered. + * Mark a split as removed and store it as a tombstone if retention is enabled. If the split + * reappears within the retention window, its progress can be recovered. * * @param splitId the ID of the split being removed * @param split the split object @@ -316,8 +316,8 @@ public void markSplitRemoved(String splitId, SplitT split, int lastAssignedSubta } /** - * Check if a split that is reappearing should be resurrected from tombstone state. - * If the split was recently removed (within retention window), return its previous state. + * Check if a split that is reappearing should be resurrected from tombstone state. If the split + * was recently removed (within retention window), return its previous state. 
* * @param splitId the ID of the split to check * @return the RemovedSplitInfo if the split should be resurrected, null otherwise diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java index 8344511b8a949..2e9431f964edd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java @@ -179,8 +179,7 @@ void testGetAndRemoveSplitsAfterSomeCheckpoint() throws Exception { @Test void testMarkSplitRemovedWithRetention() throws Exception { // Create tracker with 1-hour retention - SplitAssignmentTracker tracker = - new SplitAssignmentTracker<>(3600000L); + SplitAssignmentTracker tracker = new SplitAssignmentTracker<>(3600000L); MockSourceSplit split = new MockSourceSplit(0); tracker.markSplitRemoved("0", split, 1); @@ -207,8 +206,7 @@ void testMarkSplitRemovedWithoutRetention() { @Test void testTryResurrectSplitWithinRetention() throws Exception { // Create tracker with very long retention - SplitAssignmentTracker tracker = - new SplitAssignmentTracker<>(3600000L); + SplitAssignmentTracker tracker = new SplitAssignmentTracker<>(3600000L); MockSourceSplit split = new MockSourceSplit(0); tracker.markSplitRemoved("0", split, 1); @@ -249,8 +247,7 @@ void testTryResurrectSplitExpired() throws Exception { @Test void testTryResurrectNonExistentSplit() { - SplitAssignmentTracker tracker = - new SplitAssignmentTracker<>(3600000L); + SplitAssignmentTracker tracker = new SplitAssignmentTracker<>(3600000L); // Try to resurrect a split that was never removed SplitAssignmentTracker.RemovedSplitInfo resurrected = @@ -285,8 +282,7 @@ void testCleanupExpiredTombstones() throws Exception { @Test void testSnapshotAndRestoreWithTombstones() throws Exception { // 
Create tracker with retention - SplitAssignmentTracker tracker = - new SplitAssignmentTracker<>(3600000L); + SplitAssignmentTracker tracker = new SplitAssignmentTracker<>(3600000L); // Record some assignments tracker.recordSplitAssignment(getSplitsAssignment(2, 0)); @@ -329,8 +325,7 @@ void testBackwardCompatibilityWithOldCheckpoints() throws Exception { oldTracker.uncheckpointedAssignments(), new MockSourceSplitSerializer()); // Restore with new tracker that supports tombstones - SplitAssignmentTracker newTracker = - new SplitAssignmentTracker<>(3600000L); + SplitAssignmentTracker newTracker = new SplitAssignmentTracker<>(3600000L); newTracker.restoreState(new MockSourceSplitSerializer(), oldFormatSnapshot); // Should handle gracefully - assignments restored, no tombstones