diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index d9dd09d966..f146a60767 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -816,10 +816,22 @@ void assertLifeCycleState(Set expected) throws ServerNotReadyEx getMemberId() + " is not in " + expected + ": current state is " + c), expected); } - private CompletableFuture getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) { - return entry.failWithException(new ResourceUnavailableException( - getMemberId() + ": Failed to acquire a pending write request for " + request)); + /** Cancel a transaction and notify the state machine. Set exception if provided to the transaction context.*/ + private void cancelTransaction(TransactionContextImpl context, Exception exception) { + if (context == null) { + return; + } + + if (exception != null) { + context.setException(exception); + } + + try { + context.cancelTransaction(); + } catch (IOException ioe) { + LOG.warn("{}: Failed to cancel transaction {}", getMemberId(), context, ioe); } + } /** * Handle a normal update request from client. @@ -833,52 +845,73 @@ private CompletableFuture appendTransaction( final LeaderStateImpl unsyncedLeaderState = role.getLeaderState().orElse(null); if (unsyncedLeaderState == null) { - final RaftClientReply reply = newExceptionReply(request, generateNotLeaderException()); + final NotLeaderException nle = generateNotLeaderException(); + final RaftClientReply reply = newExceptionReply(request, nle); + cancelTransaction(context, nle); return RetryCacheImpl.failWithReply(reply, cacheEntry); } final PendingRequests.Permit unsyncedPermit = unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage()); if (unsyncedPermit == null) { - return getResourceUnavailableReply(request, cacheEntry); + final ResourceUnavailableException e = new ResourceUnavailableException( + getMemberId() + ": Failed to acquire a pending write request for " + request); + cancelTransaction(context, e); + return cacheEntry.failWithException(e); } - final LeaderStateImpl leaderState; - final PendingRequest pending; + LeaderStateImpl leaderState = null; + PendingRequest pending = null; + CompletableFuture failure = null; + Exception cancelException = null; synchronized (this) { final CompletableFuture reply = checkLeaderState(request, cacheEntry); if (reply != null) { - return reply; + failure = reply; } - leaderState = role.getLeaderStateNonNull(); - final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit - : leaderState.tryAcquirePendingRequest(request.getMessage()); - if (permit == null) { - return getResourceUnavailableReply(request, cacheEntry); - } + if (failure == null) { + leaderState = role.getLeaderStateNonNull(); + final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit + : leaderState.tryAcquirePendingRequest(request.getMessage()); + if (permit == null) { + final ResourceUnavailableException e = new ResourceUnavailableException( + getMemberId() + ": Failed to acquire a pending write request for " + request); + failure = cacheEntry.failWithException(e); + cancelException = e; + } else { + // append the message to its local log + writeIndexCache.add(request.getClientId(), context.getLogIndexFuture()); + try { + state.appendLog(context); + } catch (StateMachineException e) { + // the StateMachineException is thrown by the SM in the preAppend stage. + // Return the exception in a RaftClientReply. + final RaftClientReply exceptionReply = newExceptionReply(request, e); + cacheEntry.failWithReply(exceptionReply); + failure = CompletableFuture.completedFuture(exceptionReply); + cancelException = e; + // leader will step down here + if (e.leaderShouldStepDown() && getInfo().isLeader()) { + leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION); + } + } - // append the message to its local log - writeIndexCache.add(request.getClientId(), context.getLogIndexFuture()); - try { - state.appendLog(context); - } catch (StateMachineException e) { - // the StateMachineException is thrown by the SM in the preAppend stage. - // Return the exception in a RaftClientReply. - RaftClientReply exceptionReply = newExceptionReply(request, e); - cacheEntry.failWithReply(exceptionReply); - // leader will step down here - if (e.leaderShouldStepDown() && getInfo().isLeader()) { - leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION); + if (failure == null) { + // put the request into the pending queue + pending = leaderState.addPendingRequest(permit, request, context); + if (pending == null) { + final ResourceUnavailableException e = new ResourceUnavailableException( + getMemberId() + ": Failed to add a pending write request for " + request); + failure = cacheEntry.failWithException(e); + cancelException = e; + } + } } - return CompletableFuture.completedFuture(exceptionReply); - } - - // put the request into the pending queue - pending = leaderState.addPendingRequest(permit, request, context); - if (pending == null) { - return cacheEntry.failWithException(new ResourceUnavailableException( - getMemberId() + ": Failed to add a pending write request for " + request)); } } + if (failure != null) { + cancelTransaction(context, cancelException); + return failure; + } leaderState.notifySenders(); return pending.getFuture(); } @@ -1001,19 +1034,28 @@ private CompletableFuture writeAsyncImpl(RaftClientRequest requ // return the cached future. return cacheEntry.getReplyFuture(); } - // TODO: this client request will not be added to pending requests until - // later which means that any failure in between will leave partial state in - // the state machine. We should call cancelTransaction() for failed requests + // This request will be added to pending requests later in appendTransaction. + // Any failure in between must invoke cancelTransaction. final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction( filterDataStreamRaftClientRequest(request)); if (context.getException() != null) { - final StateMachineException e = new StateMachineException(getMemberId(), context.getException()); + final Exception exception = context.getException(); + final StateMachineException e = new StateMachineException(getMemberId(), exception); + cancelTransaction(context, exception); final RaftClientReply exceptionReply = newExceptionReply(request, e); cacheEntry.failWithReply(exceptionReply); return CompletableFuture.completedFuture(exceptionReply); } - return appendTransaction(request, context, cacheEntry); + try { + return appendTransaction(request, context, cacheEntry); + } catch (IOException ioe) { + cancelTransaction(context, ioe); + throw ioe; + } catch (RuntimeException re) { + cancelTransaction(context, re); + throw re; + } } private CompletableFuture watchAsync(RaftClientRequest request) { diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java index d92f3a1c82..8497b12f4d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java @@ -191,8 +191,6 @@ public TransactionContext preAppendTransaction() throws IOException { @Override public TransactionContext cancelTransaction() throws IOException { - // TODO: This is not called from Raft server / log yet. When an IOException happens, we should - // call this to let the SM know that Transaction cannot be synced return stateMachine.cancelTransaction(this); } } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java index 3a58f4e7c6..43b6af97d5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.fail; @@ -52,6 +53,7 @@ public abstract class RaftStateMachineExceptionTests { + Assertions.assertTrue(numCancelTransaction.get() > 0, + () -> "Expected cancelTransaction() to be called but got " + numCancelTransaction.get()); + return null; + }, 10, ONE_SECOND, "wait for cancelTransaction", LOG); + } finally { + failPreAppend = false; + } + } }