Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -816,10 +816,22 @@ void assertLifeCycleState(Set<LifeCycle.State> expected) throws ServerNotReadyEx
getMemberId() + " is not in " + expected + ": current state is " + c), expected);
}

private CompletableFuture<RaftClientReply> getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) {
return entry.failWithException(new ResourceUnavailableException(
getMemberId() + ": Failed to acquire a pending write request for " + request));
/** Cancel a transaction and notify the state machine. Set exception if provided to the transaction context.*/
private void cancelTransaction(TransactionContextImpl context, Exception exception) {
if (context == null) {
return;
}

if (exception != null) {
context.setException(exception);
}

try {
context.cancelTransaction();
} catch (IOException ioe) {
LOG.warn("{}: Failed to cancel transaction {}", getMemberId(), context, ioe);
}
}

/**
* Handle a normal update request from client.
Expand All @@ -833,52 +845,73 @@ private CompletableFuture<RaftClientReply> appendTransaction(

final LeaderStateImpl unsyncedLeaderState = role.getLeaderState().orElse(null);
if (unsyncedLeaderState == null) {
final RaftClientReply reply = newExceptionReply(request, generateNotLeaderException());
final NotLeaderException nle = generateNotLeaderException();
final RaftClientReply reply = newExceptionReply(request, nle);
cancelTransaction(context, nle);
return RetryCacheImpl.failWithReply(reply, cacheEntry);
Comment on lines +850 to 851
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move the failWithReply method to RaftServerImpl:

  private CompletableFuture<RaftClientReply> failWithReply(RaftClientReply reply, CacheEntry entry,
      TransactionContextImpl context) {
    cancelTransaction(context, reply.getException());
    if (entry == null) {
      return CompletableFuture.completedFuture(reply);
    }
    entry.failWithReply(reply);
    return entry.getReplyFuture();
  }

}
final PendingRequests.Permit unsyncedPermit = unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage());
if (unsyncedPermit == null) {
return getResourceUnavailableReply(request, cacheEntry);
final ResourceUnavailableException e = new ResourceUnavailableException(
getMemberId() + ": Failed to acquire a pending write request for " + request);
cancelTransaction(context, e);
return cacheEntry.failWithException(e);
}

final LeaderStateImpl leaderState;
final PendingRequest pending;
LeaderStateImpl leaderState = null;
PendingRequest pending = null;
CompletableFuture<RaftClientReply> failure = null;
Exception cancelException = null;
Comment on lines 844 to +864
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should keep returning immediately in case of a failure. This change make the code harder to read.

synchronized (this) {
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry);
if (reply != null) {
return reply;
failure = reply;
}

leaderState = role.getLeaderStateNonNull();
final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit
: leaderState.tryAcquirePendingRequest(request.getMessage());
if (permit == null) {
return getResourceUnavailableReply(request, cacheEntry);
}
if (failure == null) {
leaderState = role.getLeaderStateNonNull();
final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit
: leaderState.tryAcquirePendingRequest(request.getMessage());
if (permit == null) {
final ResourceUnavailableException e = new ResourceUnavailableException(
getMemberId() + ": Failed to acquire a pending write request for " + request);
failure = cacheEntry.failWithException(e);
cancelException = e;
} else {
// append the message to its local log
writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());
try {
state.appendLog(context);
} catch (StateMachineException e) {
// the StateMachineException is thrown by the SM in the preAppend stage.
// Return the exception in a RaftClientReply.
final RaftClientReply exceptionReply = newExceptionReply(request, e);
cacheEntry.failWithReply(exceptionReply);
failure = CompletableFuture.completedFuture(exceptionReply);
cancelException = e;
// leader will step down here
if (e.leaderShouldStepDown() && getInfo().isLeader()) {
leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION);
}
}

// append the message to its local log
writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());
try {
state.appendLog(context);
} catch (StateMachineException e) {
// the StateMachineException is thrown by the SM in the preAppend stage.
// Return the exception in a RaftClientReply.
RaftClientReply exceptionReply = newExceptionReply(request, e);
cacheEntry.failWithReply(exceptionReply);
// leader will step down here
if (e.leaderShouldStepDown() && getInfo().isLeader()) {
leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION);
if (failure == null) {
// put the request into the pending queue
pending = leaderState.addPendingRequest(permit, request, context);
if (pending == null) {
final ResourceUnavailableException e = new ResourceUnavailableException(
getMemberId() + ": Failed to add a pending write request for " + request);
failure = cacheEntry.failWithException(e);
cancelException = e;
}
}
}
return CompletableFuture.completedFuture(exceptionReply);
}

// put the request into the pending queue
pending = leaderState.addPendingRequest(permit, request, context);
if (pending == null) {
return cacheEntry.failWithException(new ResourceUnavailableException(
getMemberId() + ": Failed to add a pending write request for " + request));
}
}
if (failure != null) {
cancelTransaction(context, cancelException);
return failure;
}
leaderState.notifySenders();
return pending.getFuture();
}
Expand Down Expand Up @@ -1001,19 +1034,28 @@ private CompletableFuture<RaftClientReply> writeAsyncImpl(RaftClientRequest requ
// return the cached future.
return cacheEntry.getReplyFuture();
}
// TODO: this client request will not be added to pending requests until
// later which means that any failure in between will leave partial state in
// the state machine. We should call cancelTransaction() for failed requests
// This request will be added to pending requests later in appendTransaction.
// Any failure in between must invoke cancelTransaction.
final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction(
filterDataStreamRaftClientRequest(request));
if (context.getException() != null) {
final StateMachineException e = new StateMachineException(getMemberId(), context.getException());
final Exception exception = context.getException();
final StateMachineException e = new StateMachineException(getMemberId(), exception);
cancelTransaction(context, exception);
final RaftClientReply exceptionReply = newExceptionReply(request, e);
cacheEntry.failWithReply(exceptionReply);
return CompletableFuture.completedFuture(exceptionReply);
}

return appendTransaction(request, context, cacheEntry);
try {
return appendTransaction(request, context, cacheEntry);
} catch (IOException ioe) {
cancelTransaction(context, ioe);
throw ioe;
} catch (RuntimeException re) {
cancelTransaction(context, re);
throw re;
}
}

private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ public TransactionContext preAppendTransaction() throws IOException {

@Override
public TransactionContext cancelTransaction() throws IOException {
// TODO: This is not called from Raft server / log yet. When an IOException happens, we should
// call this to let the SM know that Transaction cannot be synced
return stateMachine.cancelTransaction(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.fail;

Expand All @@ -52,6 +53,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
}

private static volatile boolean failPreAppend = false;
private static final AtomicInteger numCancelTransaction = new AtomicInteger();

protected static class StateMachineWithException extends
SimpleStateMachine4Testing {
Expand All @@ -72,6 +74,12 @@ public TransactionContext preAppendTransaction(TransactionContext trx)
return trx;
}
}

@Override
public TransactionContext cancelTransaction(TransactionContext trx) throws IOException {
numCancelTransaction.incrementAndGet();
return super.cancelTransaction(trx);
}
}

{
Expand Down Expand Up @@ -179,4 +187,31 @@ private void runTestRetryOnExceptionDuringReplication(CLUSTER cluster) throws Ex
failPreAppend = false;
}
}

@Test
public void testCancelTransactionOnPreAppendFailure() throws Exception {
runWithNewCluster(3, this::runTestCancelTransactionOnPreAppendFailure);
}

private void runTestCancelTransactionOnPreAppendFailure(CLUSTER cluster) throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
failPreAppend = true;
numCancelTransaction.set(0);
try (final RaftClient client = cluster.createClient(leaderId)) {
try {
client.io().send(new SimpleMessage("cancel-transaction"));
fail("Exception expected");
} catch (StateMachineException e) {
Assertions.assertTrue(e.getCause().getMessage().contains("Fake Exception in preAppend"));
}

JavaUtils.attemptRepeatedly(() -> {
Assertions.assertTrue(numCancelTransaction.get() > 0,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we check the exact number instead of "> 0"?

() -> "Expected cancelTransaction() to be called but got " + numCancelTransaction.get());
return null;
}, 10, ONE_SECOND, "wait for cancelTransaction", LOG);
} finally {
failPreAppend = false;
}
}
}
Loading