-
Notifications
You must be signed in to change notification settings - Fork 438
RATIS-2433. Cancel transaction in case of failure to append #1382
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -816,10 +816,22 @@ void assertLifeCycleState(Set<LifeCycle.State> expected) throws ServerNotReadyEx | |
| getMemberId() + " is not in " + expected + ": current state is " + c), expected); | ||
| } | ||
|
|
||
| private CompletableFuture<RaftClientReply> getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) { | ||
| return entry.failWithException(new ResourceUnavailableException( | ||
| getMemberId() + ": Failed to acquire a pending write request for " + request)); | ||
| /** Cancel a transaction and notify the state machine. Set exception if provided to the transaction context.*/ | ||
| private void cancelTransaction(TransactionContextImpl context, Exception exception) { | ||
| if (context == null) { | ||
| return; | ||
| } | ||
|
|
||
| if (exception != null) { | ||
| context.setException(exception); | ||
| } | ||
|
|
||
| try { | ||
| context.cancelTransaction(); | ||
| } catch (IOException ioe) { | ||
| LOG.warn("{}: Failed to cancel transaction {}", getMemberId(), context, ioe); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Handle a normal update request from client. | ||
|
|
@@ -833,52 +845,73 @@ private CompletableFuture<RaftClientReply> appendTransaction( | |
|
|
||
| final LeaderStateImpl unsyncedLeaderState = role.getLeaderState().orElse(null); | ||
| if (unsyncedLeaderState == null) { | ||
| final RaftClientReply reply = newExceptionReply(request, generateNotLeaderException()); | ||
| final NotLeaderException nle = generateNotLeaderException(); | ||
| final RaftClientReply reply = newExceptionReply(request, nle); | ||
| cancelTransaction(context, nle); | ||
| return RetryCacheImpl.failWithReply(reply, cacheEntry); | ||
| } | ||
| final PendingRequests.Permit unsyncedPermit = unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage()); | ||
| if (unsyncedPermit == null) { | ||
| return getResourceUnavailableReply(request, cacheEntry); | ||
| final ResourceUnavailableException e = new ResourceUnavailableException( | ||
| getMemberId() + ": Failed to acquire a pending write request for " + request); | ||
| cancelTransaction(context, e); | ||
| return cacheEntry.failWithException(e); | ||
| } | ||
|
|
||
| final LeaderStateImpl leaderState; | ||
| final PendingRequest pending; | ||
| LeaderStateImpl leaderState = null; | ||
| PendingRequest pending = null; | ||
| CompletableFuture<RaftClientReply> failure = null; | ||
| Exception cancelException = null; | ||
|
Comment on lines
844
to
+864
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should keep returning immediately in case of a failure. This change make the code harder to read. |
||
| synchronized (this) { | ||
| final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry); | ||
| if (reply != null) { | ||
| return reply; | ||
| failure = reply; | ||
| } | ||
|
|
||
| leaderState = role.getLeaderStateNonNull(); | ||
| final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit | ||
| : leaderState.tryAcquirePendingRequest(request.getMessage()); | ||
| if (permit == null) { | ||
| return getResourceUnavailableReply(request, cacheEntry); | ||
| } | ||
| if (failure == null) { | ||
| leaderState = role.getLeaderStateNonNull(); | ||
| final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit | ||
| : leaderState.tryAcquirePendingRequest(request.getMessage()); | ||
| if (permit == null) { | ||
| final ResourceUnavailableException e = new ResourceUnavailableException( | ||
| getMemberId() + ": Failed to acquire a pending write request for " + request); | ||
| failure = cacheEntry.failWithException(e); | ||
| cancelException = e; | ||
| } else { | ||
| // append the message to its local log | ||
| writeIndexCache.add(request.getClientId(), context.getLogIndexFuture()); | ||
| try { | ||
| state.appendLog(context); | ||
| } catch (StateMachineException e) { | ||
| // the StateMachineException is thrown by the SM in the preAppend stage. | ||
| // Return the exception in a RaftClientReply. | ||
| final RaftClientReply exceptionReply = newExceptionReply(request, e); | ||
| cacheEntry.failWithReply(exceptionReply); | ||
| failure = CompletableFuture.completedFuture(exceptionReply); | ||
| cancelException = e; | ||
| // leader will step down here | ||
| if (e.leaderShouldStepDown() && getInfo().isLeader()) { | ||
| leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION); | ||
| } | ||
| } | ||
|
|
||
| // append the message to its local log | ||
| writeIndexCache.add(request.getClientId(), context.getLogIndexFuture()); | ||
| try { | ||
| state.appendLog(context); | ||
| } catch (StateMachineException e) { | ||
| // the StateMachineException is thrown by the SM in the preAppend stage. | ||
| // Return the exception in a RaftClientReply. | ||
| RaftClientReply exceptionReply = newExceptionReply(request, e); | ||
| cacheEntry.failWithReply(exceptionReply); | ||
| // leader will step down here | ||
| if (e.leaderShouldStepDown() && getInfo().isLeader()) { | ||
| leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION); | ||
| if (failure == null) { | ||
| // put the request into the pending queue | ||
| pending = leaderState.addPendingRequest(permit, request, context); | ||
| if (pending == null) { | ||
| final ResourceUnavailableException e = new ResourceUnavailableException( | ||
| getMemberId() + ": Failed to add a pending write request for " + request); | ||
| failure = cacheEntry.failWithException(e); | ||
| cancelException = e; | ||
| } | ||
| } | ||
| } | ||
| return CompletableFuture.completedFuture(exceptionReply); | ||
| } | ||
|
|
||
| // put the request into the pending queue | ||
| pending = leaderState.addPendingRequest(permit, request, context); | ||
| if (pending == null) { | ||
| return cacheEntry.failWithException(new ResourceUnavailableException( | ||
| getMemberId() + ": Failed to add a pending write request for " + request)); | ||
| } | ||
| } | ||
| if (failure != null) { | ||
| cancelTransaction(context, cancelException); | ||
| return failure; | ||
| } | ||
| leaderState.notifySenders(); | ||
| return pending.getFuture(); | ||
| } | ||
|
|
@@ -1001,19 +1034,28 @@ private CompletableFuture<RaftClientReply> writeAsyncImpl(RaftClientRequest requ | |
| // return the cached future. | ||
| return cacheEntry.getReplyFuture(); | ||
| } | ||
| // TODO: this client request will not be added to pending requests until | ||
| // later which means that any failure in between will leave partial state in | ||
| // the state machine. We should call cancelTransaction() for failed requests | ||
| // This request will be added to pending requests later in appendTransaction. | ||
| // Any failure in between must invoke cancelTransaction. | ||
| final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction( | ||
| filterDataStreamRaftClientRequest(request)); | ||
| if (context.getException() != null) { | ||
| final StateMachineException e = new StateMachineException(getMemberId(), context.getException()); | ||
| final Exception exception = context.getException(); | ||
| final StateMachineException e = new StateMachineException(getMemberId(), exception); | ||
| cancelTransaction(context, exception); | ||
| final RaftClientReply exceptionReply = newExceptionReply(request, e); | ||
| cacheEntry.failWithReply(exceptionReply); | ||
| return CompletableFuture.completedFuture(exceptionReply); | ||
| } | ||
|
|
||
| return appendTransaction(request, context, cacheEntry); | ||
| try { | ||
| return appendTransaction(request, context, cacheEntry); | ||
| } catch (IOException ioe) { | ||
| cancelTransaction(context, ioe); | ||
| throw ioe; | ||
| } catch (RuntimeException re) { | ||
| cancelTransaction(context, re); | ||
| throw re; | ||
| } | ||
| } | ||
|
|
||
| private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,7 @@ | |
| import java.io.IOException; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.fail; | ||
|
|
||
|
|
@@ -52,6 +53,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu | |
| } | ||
|
|
||
| private static volatile boolean failPreAppend = false; | ||
| private static final AtomicInteger numCancelTransaction = new AtomicInteger(); | ||
|
|
||
| protected static class StateMachineWithException extends | ||
| SimpleStateMachine4Testing { | ||
|
|
@@ -72,6 +74,12 @@ public TransactionContext preAppendTransaction(TransactionContext trx) | |
| return trx; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public TransactionContext cancelTransaction(TransactionContext trx) throws IOException { | ||
| numCancelTransaction.incrementAndGet(); | ||
| return super.cancelTransaction(trx); | ||
| } | ||
| } | ||
|
|
||
| { | ||
|
|
@@ -179,4 +187,31 @@ private void runTestRetryOnExceptionDuringReplication(CLUSTER cluster) throws Ex | |
| failPreAppend = false; | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testCancelTransactionOnPreAppendFailure() throws Exception { | ||
| runWithNewCluster(3, this::runTestCancelTransactionOnPreAppendFailure); | ||
| } | ||
|
|
||
| private void runTestCancelTransactionOnPreAppendFailure(CLUSTER cluster) throws Exception { | ||
| final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); | ||
| failPreAppend = true; | ||
| numCancelTransaction.set(0); | ||
| try (final RaftClient client = cluster.createClient(leaderId)) { | ||
| try { | ||
| client.io().send(new SimpleMessage("cancel-transaction")); | ||
| fail("Exception expected"); | ||
| } catch (StateMachineException e) { | ||
| Assertions.assertTrue(e.getCause().getMessage().contains("Fake Exception in preAppend")); | ||
| } | ||
|
|
||
| JavaUtils.attemptRepeatedly(() -> { | ||
| Assertions.assertTrue(numCancelTransaction.get() > 0, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we check the exact number instead of "> 0"? |
||
| () -> "Expected cancelTransaction() to be called but got " + numCancelTransaction.get()); | ||
| return null; | ||
| }, 10, ONE_SECOND, "wait for cancelTransaction", LOG); | ||
| } finally { | ||
| failPreAppend = false; | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move the failWithReply method to RaftServerImpl: