Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ public class SourceReaderOptions {
.defaultValue(2)
.withDescription("The capacity of the element queue in the source reader.");

/**
 * Option {@code source.removed-splits.retention.ms}: a {@code long} number of milliseconds,
 * defaulting to {@code 0L}.
 *
 * <p>According to its description text, the value is how long removed splits/partitions are
 * kept in checkpoint state before being dropped for good, with {@code 0} meaning immediate
 * removal. NOTE(review): the code that enforces the retention is not part of this declaration;
 * confirm the semantics against the component that consumes the value.
 */
public static final ConfigOption<Long> REMOVED_SPLITS_RETENTION_MS =
ConfigOptions.key("source.removed-splits.retention.ms")
.longType()
.defaultValue(0L)
.withDescription(
"The duration in milliseconds to retain removed splits/partitions in "
+ "checkpoint state before permanently removing them. This helps maintain "
+ "progress tracking during temporary metadata service instability. "
+ "Default is 0 (immediate removal, current behavior). "
+ "Recommended value for dynamic sources: 86400000 (24 hours).");

// --------------- final fields ----------------------
public final long sourceReaderCloseTimeout;
public final int elementQueueCapacity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,24 @@ public SourceCoordinatorContext(
OperatorCoordinator.Context operatorCoordinatorContext,
SimpleVersionedSerializer<SplitT> splitSerializer,
boolean supportsConcurrentExecutionAttempts) {
this(
jobID,
coordinatorThreadFactory,
numWorkerThreads,
operatorCoordinatorContext,
splitSerializer,
supportsConcurrentExecutionAttempts,
0L); // Default: no retention (current behavior)
}

public SourceCoordinatorContext(
JobID jobID,
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
int numWorkerThreads,
OperatorCoordinator.Context operatorCoordinatorContext,
SimpleVersionedSerializer<SplitT> splitSerializer,
boolean supportsConcurrentExecutionAttempts,
long removedSplitsRetentionMs) {
this(
jobID,
Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
Expand All @@ -138,7 +156,7 @@ public SourceCoordinatorContext(
coordinatorThreadFactory,
operatorCoordinatorContext,
splitSerializer,
new SplitAssignmentTracker<>(),
new SplitAssignmentTracker<>(removedSplitsRetentionMs),
supportsConcurrentExecutionAttempts);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one

package org.apache.flink.runtime.source.coordinator;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
Expand All @@ -36,9 +37,10 @@ public class SourceCoordinatorSerdeUtils {

public static final int VERSION_0 = 0;
public static final int VERSION_1 = 1;
public static final int VERSION_2 = 2; // Added tombstone support

/** The current source coordinator serde version. */
private static final int CURRENT_VERSION = VERSION_1;
private static final int CURRENT_VERSION = VERSION_2;

/** Private constructor for utility class. */
private SourceCoordinatorSerdeUtils() {}
Expand Down Expand Up @@ -117,4 +119,90 @@ static <SplitT> Map<Integer, LinkedHashSet<SplitT>> deserializeAssignments(
return assignments;
}
}

/**
 * Encodes the tombstones of removed splits into a byte array.
 *
 * <p>Binary layout: the split serializer version (int), the number of tombstones (int), and
 * then, for every tombstone, the split ID (modified UTF-8), the removal timestamp (long), the
 * last assigned subtask ID (int), the length of the serialized split (int) and the serialized
 * split bytes themselves.
 *
 * @param tombstones removal information of the removed splits, keyed by split ID
 * @param splitSerializer serializer used to encode each retained split
 * @return the encoded tombstones
 * @throws IOException if writing to the buffer or serializing a split fails
 */
static <SplitT extends SourceSplit> byte[] serializeTombstones(
        Map<String, SplitAssignmentTracker.RemovedSplitInfo<SplitT>> tombstones,
        SimpleVersionedSerializer<SplitT> splitSerializer)
        throws IOException {
    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream sink = new DataOutputViewStreamWrapper(buffer)) {
        // Header: serializer version first, so the reader knows how to decode the splits,
        // followed by the number of records that come after it.
        sink.writeInt(splitSerializer.getVersion());
        sink.writeInt(tombstones.size());

        for (Map.Entry<String, SplitAssignmentTracker.RemovedSplitInfo<SplitT>> tombstone :
                tombstones.entrySet()) {
            sink.writeUTF(tombstone.getKey());

            final SplitAssignmentTracker.RemovedSplitInfo<SplitT> removed =
                    tombstone.getValue();
            sink.writeLong(removed.getRemovalTimestamp());
            sink.writeInt(removed.getLastAssignedSubtaskId());

            // The split is length-prefixed because its encoded size is serializer-specific.
            final byte[] splitBytes = splitSerializer.serialize(removed.getSplit());
            sink.writeInt(splitBytes.length);
            sink.write(splitBytes);
        }

        sink.flush();
        return buffer.toByteArray();
    }
}

/**
 * Decodes tombstones of removed splits from their binary form.
 *
 * <p>Expects the layout: split serializer version (int), number of tombstones (int), and then,
 * for every tombstone, the split ID (modified UTF-8), the removal timestamp (long), the last
 * assigned subtask ID (int), the length of the serialized split (int) and the serialized split
 * bytes.
 *
 * @param tombstoneData the encoded tombstones
 * @param splitSerializer serializer used to decode each retained split
 * @return the removal information of the removed splits, keyed by split ID
 * @throws IOException if the data cannot be read or a split cannot be deserialized
 */
static <SplitT extends SourceSplit>
        Map<String, SplitAssignmentTracker.RemovedSplitInfo<SplitT>> deserializeTombstones(
                byte[] tombstoneData, SimpleVersionedSerializer<SplitT> splitSerializer)
                throws IOException {
    try (ByteArrayInputStream source = new ByteArrayInputStream(tombstoneData);
            DataInputStream reader = new DataInputViewStreamWrapper(source)) {
        // Header: the serializer version the splits were written with, then the record count.
        final int writtenSerializerVersion = reader.readInt();
        final int tombstoneCount = reader.readInt();

        final Map<String, SplitAssignmentTracker.RemovedSplitInfo<SplitT>> restored =
                new HashMap<>(tombstoneCount);

        for (int remaining = tombstoneCount; remaining > 0; remaining--) {
            final String id = reader.readUTF();
            final long removedAt = reader.readLong();
            final int lastSubtask = reader.readInt();

            // Each split is length-prefixed; decode it with the version it was written with.
            final int splitLength = reader.readInt();
            final byte[] splitBytes = readBytes(reader, splitLength);
            final SplitT restoredSplit =
                    splitSerializer.deserialize(writtenSerializerVersion, splitBytes);

            final SplitAssignmentTracker.RemovedSplitInfo<SplitT> removed =
                    new SplitAssignmentTracker.RemovedSplitInfo<>(
                            restoredSplit, removedAt, lastSubtask);
            restored.put(id, removed);
        }

        return restored;
    }
}
}
Loading