-
Notifications
You must be signed in to change notification settings - Fork 3.4k
HBASE-29823 Control WAL flush and offset persistence from ReplicationSourceShipper #7617
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
640308b
9476a1c
7d69f62
ab99ac2
8da9b7b
6e57371
31b4d1a
3bf119d
e37a9e5
af7aa86
4875fe6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.Path; | ||
|
|
@@ -74,6 +75,19 @@ public enum WorkerState { | |
| private final int DEFAULT_TIMEOUT = 20000; | ||
| private final int getEntriesTimeout; | ||
| private final int shipEditsTimeout; | ||
| private long accumulatedSizeSinceLastUpdate = 0L; | ||
| private long lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime(); | ||
| private long offsetUpdateIntervalMs; | ||
| private long offsetUpdateSizeThresholdBytes; | ||
| private WALEntryBatch lastShippedBatch; | ||
| private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>(); | ||
|
|
||
| private static final String OFFSET_UPDATE_INTERVAL_MS_KEY = | ||
| "hbase.replication.shipper.offset.update.interval.ms"; | ||
| private static final String OFFSET_UPDATE_SIZE_THRESHOLD_KEY = | ||
| "hbase.replication.shipper.offset.update.size.threshold"; | ||
| private static final long DEFAULT_OFFSET_UPDATE_INTERVAL_MS = Long.MAX_VALUE; | ||
| private static final long DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD = -1L; | ||
|
|
||
| public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source, | ||
| ReplicationSourceWALReader walReader) { | ||
|
|
@@ -90,6 +104,10 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId, Replicati | |
| this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); | ||
| this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT, | ||
| HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT); | ||
| this.offsetUpdateIntervalMs = | ||
| conf.getLong(OFFSET_UPDATE_INTERVAL_MS_KEY, DEFAULT_OFFSET_UPDATE_INTERVAL_MS); | ||
| this.offsetUpdateSizeThresholdBytes = | ||
| conf.getLong(OFFSET_UPDATE_SIZE_THRESHOLD_KEY, DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -106,9 +124,27 @@ public final void run() { | |
| continue; | ||
| } | ||
| try { | ||
| WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout); | ||
| // check time-based offset persistence | ||
| if (shouldPersistLogPosition()) { | ||
| // Trigger offset persistence via existing retry/backoff mechanism in shipEdits() | ||
| WALEntryBatch emptyBatch = createEmptyBatchForTimeBasedFlush(); | ||
| if (emptyBatch != null) { | ||
| shipEdits(emptyBatch); | ||
| } | ||
| } | ||
|
|
||
| long pollTimeout = getEntriesTimeout; | ||
| if (offsetUpdateIntervalMs != Long.MAX_VALUE) { | ||
| long elapsed = EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime; | ||
| long remaining = offsetUpdateIntervalMs - elapsed; | ||
| if (remaining > 0) { | ||
| pollTimeout = Math.min(getEntriesTimeout, remaining); | ||
| } | ||
| } | ||
| WALEntryBatch entryBatch = entryReader.poll(pollTimeout); | ||
| LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(), | ||
| entryBatch); | ||
|
|
||
| if (entryBatch == null) { | ||
| continue; | ||
| } | ||
|
|
@@ -118,11 +154,22 @@ public final void run() { | |
| } else { | ||
| shipEdits(entryBatch); | ||
| } | ||
|
Comment on lines
148
to
156
|
||
| } catch (InterruptedException | ReplicationRuntimeException e) { | ||
| // It is interrupted and needs to quit. | ||
| LOG.warn("Interrupted while waiting for next replication entry batch", e); | ||
| Thread.currentThread().interrupt(); | ||
| } catch (InterruptedException e) { | ||
| // Normal shutdown | ||
| if (!isActive()) { | ||
| Thread.currentThread().interrupt(); | ||
| break; | ||
| } | ||
|
|
||
| LOG.warn("Shipper {} interrupted unexpectedly, restarting", walGroupId, e); | ||
| abortAndRestart(e); | ||
| break; | ||
| } catch (ReplicationRuntimeException e) { | ||
| LOG.error("Shipper {} aborting due to fatal error", walGroupId, e); | ||
| abortAndRestart(e); | ||
| break; | ||
| } | ||
|
|
||
| } | ||
| // If the worker exits run loop without finishing its task, mark it as stopped. | ||
| if (!isFinished()) { | ||
|
|
@@ -133,6 +180,16 @@ public final void run() { | |
| } | ||
| } | ||
|
|
||
| private WALEntryBatch createEmptyBatchForTimeBasedFlush() { | ||
| // Build a zero-entry batch that reuses the last shipped batch's WAL path and position; returns null while lastShippedBatch is still unset | ||
| if (lastShippedBatch == null) { | ||
| return null; | ||
| } | ||
| WALEntryBatch batch = new WALEntryBatch(0, lastShippedBatch.getLastWalPath()); | ||
| batch.setLastWalPosition(lastShippedBatch.getLastWalPosition()); | ||
| return batch; | ||
| } | ||
|
|
||
| private void noMoreData() { | ||
| if (source.isRecovered()) { | ||
| LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, | ||
|
|
@@ -154,15 +211,18 @@ protected void postFinish() { | |
| private void shipEdits(WALEntryBatch entryBatch) { | ||
| List<Entry> entries = entryBatch.getWalEntries(); | ||
| int sleepMultiplier = 0; | ||
| if (entries.isEmpty()) { | ||
| updateLogPosition(entryBatch); | ||
| return; | ||
| } | ||
| int currentSize = (int) entryBatch.getHeapSize(); | ||
| source.getSourceMetrics() | ||
| .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime()); | ||
| MetricsSource metrics = source.getSourceMetrics(); | ||
| if (metrics != null && !entries.isEmpty()) { | ||
| metrics.setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime()); | ||
| } | ||
| while (isActive()) { | ||
| try { | ||
| if (entries.isEmpty()) { | ||
|
Apache9 marked this conversation as resolved.
|
||
| lastShippedBatch = entryBatch; | ||
| persistLogPosition(); | ||
| return; | ||
| } | ||
| try { | ||
| source.tryThrottle(currentSize); | ||
| } catch (InterruptedException e) { | ||
|
|
@@ -190,13 +250,13 @@ private void shipEdits(WALEntryBatch entryBatch) { | |
| } else { | ||
| sleepMultiplier = Math.max(sleepMultiplier - 1, 0); | ||
| } | ||
| // Clean up hfile references | ||
| for (Entry entry : entries) { | ||
| cleanUpHFileRefs(entry.getEdit()); | ||
| LOG.trace("shipped entry {}: ", entry); | ||
|
|
||
| accumulatedSizeSinceLastUpdate += currentSize; | ||
| entriesForCleanUpHFileRefs.addAll(entries); | ||
|
taklwu marked this conversation as resolved.
|
||
| lastShippedBatch = entryBatch; | ||
| if (shouldPersistLogPosition()) { | ||
|
wchevreuil marked this conversation as resolved.
|
||
| persistLogPosition(); | ||
| } | ||
| // Log and clean up WAL logs | ||
| updateLogPosition(entryBatch); | ||
|
|
||
| // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size) | ||
| // this sizeExcludeBulkLoad has to use same calculation that when calling | ||
|
|
@@ -215,6 +275,9 @@ private void shipEdits(WALEntryBatch entryBatch) { | |
| entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000); | ||
| } | ||
| break; | ||
| } catch (IOException ioe) { | ||
| throw new ReplicationRuntimeException( | ||
| "Failed to persist replication offset; aborting shipper for restart", ioe); | ||
| } catch (Exception ex) { | ||
| source.getSourceMetrics().incrementFailedBatches(); | ||
| LOG.warn("{} threw unknown exception:", | ||
|
|
@@ -229,6 +292,43 @@ private void shipEdits(WALEntryBatch entryBatch) { | |
| } | ||
| } | ||
|
|
||
| private boolean shouldPersistLogPosition() { | ||
| if (accumulatedSizeSinceLastUpdate == 0 || lastShippedBatch == null) { | ||
| return false; | ||
| } | ||
|
|
||
| // Default behaviour to update offset immediately after replicate() | ||
| if (offsetUpdateSizeThresholdBytes == -1 && offsetUpdateIntervalMs == Long.MAX_VALUE) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a bit strange... If offsetUpdateSizeThresholdBytes is -1, then the check below, accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes, will always return true, so do we need this check here at all?
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, this is not needed; I just added it here to make the default behaviour explicit. |
||
| return true; | ||
| } | ||
|
|
||
| return (accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes) | ||
| || (EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime >= offsetUpdateIntervalMs); | ||
|
taklwu marked this conversation as resolved.
|
||
| } | ||
|
wchevreuil marked this conversation as resolved.
|
||
|
|
||
| private void persistLogPosition() throws IOException { | ||
| if (lastShippedBatch == null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since we could accumulate different batches in the loop above, a null batch does not mean we haven't shipped anything out, does it? Why do we just return here if lastShippedBatch is null?
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As far as I understand, lastShippedBatch being 'null' means no batch has been replicated yet, so we don't need to update the offset. Please correct me if I am wrong here. lastShippedBatch is 'null' by default during ReplicationSourceShipper initialisation, and it is updated as soon as a batch is replicated.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, OK, you do not reset the lastShippedBatch when reading a new batch. But it still makes me a bit nervous: how can we get here when lastShippedBatch is null...
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot. This method is not getting called if lastShippedBatch is null in the current implementation. We can still keep this check here for safety reasons. |
||
| return; | ||
| } | ||
|
|
||
| ReplicationEndpoint endpoint = source.getReplicationEndpoint(); | ||
| if (endpoint != null) { | ||
| endpoint.beforePersistingReplicationOffset(); | ||
| } | ||
|
|
||
| // Clean up hfile references | ||
| for (Entry entry : entriesForCleanUpHFileRefs) { | ||
| cleanUpHFileRefs(entry.getEdit()); | ||
| } | ||
| entriesForCleanUpHFileRefs.clear(); | ||
|
|
||
| accumulatedSizeSinceLastUpdate = 0; | ||
| lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime(); | ||
|
|
||
| // Log and clean up WAL logs | ||
| updateLogPosition(lastShippedBatch); | ||
| } | ||
|
|
||
| private void cleanUpHFileRefs(WALEdit edit) throws IOException { | ||
| String peerId = source.getPeerId(); | ||
| if (peerId.contains("-")) { | ||
|
|
@@ -359,4 +459,13 @@ void clearWALEntryBatch() { | |
| long getSleepForRetries() { | ||
| return sleepForRetries; | ||
| } | ||
|
|
||
| // Abort this shipper: log the cause, ask the source to replace this worker, then set the calling thread's interrupt flag. NOTE(review): the replacement presumably restarts from the last persisted offset - confirm against ReplicationSource.restartShipper | ||
| void abortAndRestart(Throwable cause) { | ||
| LOG.warn("Shipper for walGroupId={} aborting due to fatal error, will restart", walGroupId, | ||
| cause); | ||
| // Ask source to replace this worker | ||
| source.restartShipper(walGroupId, this); | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadoc typo: use "e.g.," instead of "Eg:".