Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,11 @@ public int getTimeout() {
* @throws IllegalStateException if this service's state isn't FAILED.
*/
Throwable failureCause();

/**
* Hook invoked before persisting replication offsets, e.g., buffered endpoints can flush/close WALs
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadoc typo: use "e.g.," instead of "Eg:".

Suggested change
* Hook invoked before persisting replication offsets. Eg: Buffered endpoints can flush/close WALs
* Hook invoked before persisting replication offsets, e.g., buffered endpoints can flush/close WALs

Copilot uses AI. Check for mistakes.
* here.
*/
default void beforePersistingReplicationOffset() throws IOException {
  // Default is a no-op: endpoints without buffered/deferred state have nothing
  // to flush before the replication offset is made durable.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -866,4 +866,16 @@ public long getTotalReplicatedEdits() {
// Package-private accessor for the configured base sleep interval used by
// retry/backoff logic (exposed for collaborators and tests in this package).
long getSleepForRetries() {
return sleepForRetries;
}

/**
 * Replaces a failed shipper worker for the given WAL group with a fresh one.
 * The atomic two-argument remove ensures only the thread that still owns the
 * registered worker performs the restart.
 * @param walGroupId WAL group whose shipper should be restarted
 * @param oldWorker the worker instance that failed and requested the restart
 */
void restartShipper(String walGroupId, ReplicationSourceShipper oldWorker) {
  // A failed remove means another thread already swapped in a replacement
  // (e.g. a concurrent restart request), so there is nothing left to do.
  if (!workerThreads.remove(walGroupId, oldWorker)) {
    LOG.debug("Skip restart for walGroupId={} as worker already replaced", walGroupId);
    return;
  }
  tryStartNewShipper(walGroupId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -74,6 +75,19 @@ public enum WorkerState {
private final int DEFAULT_TIMEOUT = 20000;
private final int getEntriesTimeout;
private final int shipEditsTimeout;
private long accumulatedSizeSinceLastUpdate = 0L;
private long lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime();
private long offsetUpdateIntervalMs;
private long offsetUpdateSizeThresholdBytes;
private WALEntryBatch lastShippedBatch;
private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();

private static final String OFFSET_UPDATE_INTERVAL_MS_KEY =
"hbase.replication.shipper.offset.update.interval.ms";
private static final String OFFSET_UPDATE_SIZE_THRESHOLD_KEY =
"hbase.replication.shipper.offset.update.size.threshold";
private static final long DEFAULT_OFFSET_UPDATE_INTERVAL_MS = Long.MAX_VALUE;
private static final long DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD = -1L;

public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
ReplicationSourceWALReader walReader) {
Expand All @@ -90,6 +104,10 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId, Replicati
this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
this.offsetUpdateIntervalMs =
conf.getLong(OFFSET_UPDATE_INTERVAL_MS_KEY, DEFAULT_OFFSET_UPDATE_INTERVAL_MS);
this.offsetUpdateSizeThresholdBytes =
conf.getLong(OFFSET_UPDATE_SIZE_THRESHOLD_KEY, DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD);
}

@Override
Expand All @@ -106,9 +124,27 @@ public final void run() {
continue;
}
try {
WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
// check time-based offset persistence
if (shouldPersistLogPosition()) {
// Trigger offset persistence via existing retry/backoff mechanism in shipEdits()
WALEntryBatch emptyBatch = createEmptyBatchForTimeBasedFlush();
if (emptyBatch != null) {
shipEdits(emptyBatch);
}
}

long pollTimeout = getEntriesTimeout;
if (offsetUpdateIntervalMs != Long.MAX_VALUE) {
long elapsed = EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime;
long remaining = offsetUpdateIntervalMs - elapsed;
if (remaining > 0) {
pollTimeout = Math.min(getEntriesTimeout, remaining);
}
}
WALEntryBatch entryBatch = entryReader.poll(pollTimeout);
LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
entryBatch);

if (entryBatch == null) {
continue;
}
Expand All @@ -118,11 +154,22 @@ public final void run() {
} else {
shipEdits(entryBatch);
}
Comment on lines 148 to 156
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With deferred offset persistence enabled (size/interval thresholds), receiving WALEntryBatch.NO_MORE_DATA can transition the worker to FINISHED without ever calling persistLogPosition() for the last shipped batch. This can drop the final offset update and any deferred HFile-ref cleanup, causing re-replication/recovery work. Consider persisting (if accumulatedSizeSinceLastUpdate > 0) before calling noMoreData() / before setting state to FINISHED.

Copilot uses AI. Check for mistakes.
} catch (InterruptedException | ReplicationRuntimeException e) {
// It is interrupted and needs to quit.
LOG.warn("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
} catch (InterruptedException e) {
// Normal shutdown
if (!isActive()) {
Thread.currentThread().interrupt();
break;
}

LOG.warn("Shipper {} interrupted unexpectedly, restarting", walGroupId, e);
abortAndRestart(e);
break;
} catch (ReplicationRuntimeException e) {
LOG.error("Shipper {} aborting due to fatal error", walGroupId, e);
abortAndRestart(e);
break;
}

}
// If the worker exits run loop without finishing its task, mark it as stopped.
if (!isFinished()) {
Expand All @@ -133,6 +180,16 @@ public final void run() {
}
}

/**
 * Builds a zero-entry batch carrying the last shipped WAL path/position so the
 * normal shipEdits() path (with its retry/backoff handling) can persist the
 * current offset on a time-based trigger.
 * @return an empty batch positioned at the last shipped offset, or {@code null}
 *         if nothing has been shipped yet (no position to re-persist)
 */
private WALEntryBatch createEmptyBatchForTimeBasedFlush() {
  if (lastShippedBatch == null) {
    return null;
  }
  WALEntryBatch flushBatch = new WALEntryBatch(0, lastShippedBatch.getLastWalPath());
  flushBatch.setLastWalPosition(lastShippedBatch.getLastWalPosition());
  return flushBatch;
}

private void noMoreData() {
if (source.isRecovered()) {
LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,
Expand All @@ -154,15 +211,18 @@ protected void postFinish() {
private void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
updateLogPosition(entryBatch);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
source.getSourceMetrics()
.setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
MetricsSource metrics = source.getSourceMetrics();
if (metrics != null && !entries.isEmpty()) {
metrics.setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
}
while (isActive()) {
try {
if (entries.isEmpty()) {
Comment thread
Apache9 marked this conversation as resolved.
lastShippedBatch = entryBatch;
persistLogPosition();
return;
}
try {
source.tryThrottle(currentSize);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -190,13 +250,13 @@ private void shipEdits(WALEntryBatch entryBatch) {
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
LOG.trace("shipped entry {}: ", entry);

accumulatedSizeSinceLastUpdate += currentSize;
entriesForCleanUpHFileRefs.addAll(entries);
Comment thread
taklwu marked this conversation as resolved.
lastShippedBatch = entryBatch;
if (shouldPersistLogPosition()) {
Comment thread
wchevreuil marked this conversation as resolved.
persistLogPosition();
}
// Log and clean up WAL logs
updateLogPosition(entryBatch);

// offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
// this sizeExcludeBulkLoad has to use same calculation that when calling
Expand All @@ -215,6 +275,9 @@ private void shipEdits(WALEntryBatch entryBatch) {
entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
}
break;
} catch (IOException ioe) {
throw new ReplicationRuntimeException(
"Failed to persist replication offset; aborting shipper for restart", ioe);
} catch (Exception ex) {
source.getSourceMetrics().incrementFailedBatches();
LOG.warn("{} threw unknown exception:",
Expand All @@ -229,6 +292,43 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
}

private boolean shouldPersistLogPosition() {
if (accumulatedSizeSinceLastUpdate == 0 || lastShippedBatch == null) {
return false;
}

// Default behaviour to update offset immediately after replicate()
if (offsetUpdateSizeThresholdBytes == -1 && offsetUpdateIntervalMs == Long.MAX_VALUE) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit strange...

If offsetUpdateSizeThresholdBytes is -1, then the below accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes will always returns true, so we do not need this check here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is not needed, I just added here to make this default behaviour explicit

return true;
}

return (accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes)
|| (EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime >= offsetUpdateIntervalMs);
Comment thread
taklwu marked this conversation as resolved.
}
Comment thread
wchevreuil marked this conversation as resolved.

private void persistLogPosition() throws IOException {
if (lastShippedBatch == null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we could cumulate different batches in the above loop, a null batch does not mean we haven't shipped anything out? Why here we just return if lastShippedBatch is null?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand, lastShippedBatch 'null' means no batch has been replicated yet, so we don't need to update offset. Please correct me if I am wrong here

lastShippedBatch is by default 'null' during ReplicationSourceShipper initialisation and as soon as a batch is replicated it is updated.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, OK, you do not rese the lastShippedBatch when reading a new batch. But it still makes me a bit nervous that how can we get here when lastShippedBatch is null...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot. This method is not getting called if lastShippedBatch is null in the current implementation. We can still keep this check here for safety reasons.

return;
}

ReplicationEndpoint endpoint = source.getReplicationEndpoint();
if (endpoint != null) {
endpoint.beforePersistingReplicationOffset();
}

// Clean up hfile references
for (Entry entry : entriesForCleanUpHFileRefs) {
cleanUpHFileRefs(entry.getEdit());
}
entriesForCleanUpHFileRefs.clear();

accumulatedSizeSinceLastUpdate = 0;
lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime();

// Log and clean up WAL logs
updateLogPosition(lastShippedBatch);
}

private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = source.getPeerId();
if (peerId.contains("-")) {
Expand Down Expand Up @@ -359,4 +459,13 @@ void clearWALEntryBatch() {
// Package-private accessor for the base sleep interval used between retries
// (exposed for collaborators and tests in this package).
long getSleepForRetries() {
return sleepForRetries;
}

// Tears down this shipper after a fatal error and asks the owning source to
// start a replacement, which resumes from the last persisted offset.
void abortAndRestart(Throwable cause) {
  LOG.warn("Shipper for walGroupId={} aborting due to fatal error, will restart", walGroupId,
    cause);
  // Ask source to replace this worker
  source.restartShipper(walGroupId, this);
  // Interrupt the current (shipper) thread so its run loop observes shutdown.
  Thread.currentThread().interrupt();
}
}