diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index b4d78c207a..69421e9f0f 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -832,44 +832,6 @@ private void notifyInstallSnapshot(TermIndex firstAvailable) { responseHandler.waitForResponse(); } - /** - * Should the Leader notify the Follower to install the snapshot through - * its own State Machine. - * @return the first available log's start term index - */ - private TermIndex shouldNotifyToInstallSnapshot() { - final FollowerInfo follower = getFollower(); - final long leaderNextIndex = getRaftLog().getNextIndex(); - final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower); - final long leaderStartIndex = getRaftLog().getStartIndex(); - final TermIndex firstAvailable = Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex)) - .orElseGet(() -> TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex)); - if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) { - // If the follower is bootstrapping and has not yet installed any snapshot from leader, then the follower should - // be notified to install a snapshot. Every follower should try to install at least one snapshot during - // bootstrapping, if available. - LOG.debug("{}: follower is bootstrapping, notify to install snapshot to {}.", this, firstAvailable); - return firstAvailable; - } - - final long followerNextIndex = follower.getNextIndex(); - if (followerNextIndex >= leaderNextIndex) { - return null; - } - - if (followerNextIndex < leaderStartIndex) { - // The Leader does not have the logs from the Follower's last log - // index onwards. And install snapshot is disabled. So the Follower - // should be notified to install the latest snapshot through its - // State Machine. - return firstAvailable; - } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) { - // Leader has no logs to check from, hence return next index. - return firstAvailable; - } - - return null; - } static class AppendEntriesRequest { private final Timekeeper timer; diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java index cff5425d32..33914fde7f 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java @@ -134,38 +134,89 @@ default RaftPeerId getFollowerId() { /** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for sending the given snapshot. */ Iterable newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot); + /** + * Get the previous {@link TermIndex} for the given next index. + * This is used to set the previous log entry in AppendEntries requests. + * + * @return the previous {@link TermIndex}, or null if unavailable + * (e.g. the entry has been purged and the snapshot does not cover it). + */ + default TermIndex getPrevious(long nextIndex) { + if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) { + return null; + } + + final long previousIndex = nextIndex - 1; + final TermIndex previous = getRaftLog().getTermIndex(previousIndex); + if (previous != null) { + return previous; + } + + final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot(); + if (snapshot != null) { + final TermIndex snapshotTermIndex = snapshot.getTermIndex(); + if (snapshotTermIndex.getIndex() == previousIndex) { + return snapshotTermIndex; + } + } + + return null; + } + /** * Should this {@link LogAppender} send a snapshot to the follower? * * @return the snapshot if it should install a snapshot; otherwise, return null. */ default SnapshotInfo shouldInstallSnapshot() { - // we should install snapshot if the follower needs to catch up and: - // 1. there is no local log entry but there is snapshot - // 2. or the follower's next index is smaller than the log start index - // 3. or the follower is bootstrapping (i.e. not yet caught up) and has not installed any snapshot yet - final FollowerInfo follower = getFollower(); - final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower); final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot(); + return shouldInstallSnapshot(snapshot != null) ? snapshot : null; + } + + /** + * Should this {@link LogAppender} send a snapshot notification to the follower? + * + * @return the first available log {@link TermIndex} if it should install a snapshot; otherwise, return null. + */ + default TermIndex shouldNotifyToInstallSnapshot() { + if (!shouldInstallSnapshot(true)) { + return null; + } + final TermIndex start = getRaftLog().getTermIndex(getRaftLog().getStartIndex()); + if (start != null) { + return start; + } + // No log is currently available; return the next, which will become available in the future. + return TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), getRaftLog().getNextIndex()); + } - if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) { - if (snapshot == null) { + default boolean shouldInstallSnapshot(boolean hasSnapshot) { + final FollowerInfo follower = getFollower(); + if (getLeaderState().isFollowerBootstrapping(follower) + && !follower.hasAttemptedToInstallSnapshot()) { + if (!hasSnapshot) { // Leader cannot send null snapshot to follower. Hence, acknowledge InstallSnapshot attempt (even though it // was not attempted) so that follower can come out of staging state after appending log entries. follower.setAttemptedToInstallSnapshot(); - } else { - return snapshot; } + return true; } + final long leaderNextIndex = getRaftLog().getNextIndex(); final long followerNextIndex = getFollower().getNextIndex(); - if (followerNextIndex < getRaftLog().getNextIndex()) { - final long logStartIndex = getRaftLog().getStartIndex(); - if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) { - return snapshot; - } + if (followerNextIndex >= leaderNextIndex) { + // follower caught up already + return false; } - return null; + final long leaderStartIndex = getRaftLog().getStartIndex(); + if (followerNextIndex < leaderStartIndex || leaderStartIndex == RaftLog.INVALID_LOG_INDEX) { + // leader does not have follower's next log + return true; + } + // leader does not have the previous log for appendEntries + return followerNextIndex == leaderStartIndex && + followerNextIndex > RaftLog.LEAST_VALID_LOG_INDEX && + getPrevious(followerNextIndex) == null; } /** Define how this {@link LogAppender} should run. */ diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index be0404da36..4f558e0c7c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -165,27 +165,6 @@ public boolean hasPendingDataRequests() { return false; } - private TermIndex getPrevious(long nextIndex) { - if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) { - return null; - } - - final long previousIndex = nextIndex - 1; - final TermIndex previous = getRaftLog().getTermIndex(previousIndex); - if (previous != null) { - return previous; - } - - final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot(); - if (snapshot != null) { - final TermIndex snapshotTermIndex = snapshot.getTermIndex(); - if (snapshotTermIndex.getIndex() == previousIndex) { - return snapshotTermIndex; - } - } - - return null; - } protected long getNextIndexForInconsistency(long requestFirstIndex, long replyNextIndex) { long next = replyNextIndex; @@ -238,6 +217,14 @@ public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean he final long snapshotIndex = follower.getSnapshotIndex(); final long leaderNext = getRaftLog().getNextIndex(); final long followerNext = follower.getNextIndex(); + + if (previous == null && followerNext > RaftLog.LEAST_VALID_LOG_INDEX && followerNext != snapshotIndex + 1) { + LOG.info("{}: Skipping appendEntries since the previous log entry is unavailable:" + + " follower {} nextIndex={} and snapshotIndex={} but leader startIndex={}", + this, follower.getName(), followerNext, snapshotIndex, getRaftLog().getStartIndex()); + return null; + } + final long halfMs = heartbeatWaitTimeMs/2; for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; ) { if (!buffer.offer(getRaftLog().getEntryWithData(next++))) { diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java index c7a7849e6a..c9b19a72a1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java @@ -32,6 +32,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.leader.LogAppender; import org.apache.ratis.server.impl.MiniRaftCluster; +import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.metrics.RaftServerMetricsImpl; import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.raftlog.RaftLog; @@ -53,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import org.apache.ratis.thirdparty.com.codahale.metrics.Gauge; import org.slf4j.event.Level; @@ -222,4 +224,94 @@ void runTest(CLUSTER cluster) throws Exception { Assertions.assertNotNull(last); Assertions.assertTrue(last.getIndex() <= leader.getInfo().getLastAppliedIndex()); } + + @Test + public void testNewAppendEntriesRequestAfterPurgeFollowerBehindStartIndex() throws Exception { + final RaftProperties prop = getProperties(); + RaftServerConfigKeys.Log.setPurgeGap(prop, 1); + RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("1KB")); + runWithNewCluster(3, cluster -> { + final long startIndexAfterPurge = setupPurgedLeaderLog(cluster); + // Test when followerNextIndex < leader's logStartIndex + runTestNewAppendEntriesRequestAfterPurge(cluster, startIndexAfterPurge - 1); + }); + } + + @Test + public void testNewAppendEntriesRequestAfterPurgeFollowerAtStartIndex() throws Exception { + final RaftProperties prop = getProperties(); + RaftServerConfigKeys.Log.setPurgeGap(prop, 1); + RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("1KB")); + runWithNewCluster(3, cluster -> { + final long startIndexAfterPurge = setupPurgedLeaderLog(cluster); + // Test when followerNextIndex == leader's logStartIndex, but the previous index is already purged + runTestNewAppendEntriesRequestAfterPurge(cluster, startIndexAfterPurge); + }); + } + + private long setupPurgedLeaderLog(CLUSTER cluster) throws Exception { + final RaftServer.Division leader = waitForLeader(cluster); + final RaftLog leaderLog = leader.getRaftLog(); + + try (RaftClient client = cluster.createClient(leader.getId())) { + for (SimpleMessage msg : generateMsgs(5)) { + client.io().send(msg); + } + } + + final long lastLogIndex = leaderLog.getLastEntryTermIndex().getIndex(); + LOG.info("Leader log lastIndex={}, startIndex={}", lastLogIndex, leaderLog.getStartIndex()); + Assertions.assertTrue(lastLogIndex > 5, "Need enough log entries for the test"); + + // Take a snapshot so that shouldInstallSnapshot() can return it + final long snapshotIndex = SimpleStateMachine4Testing.get(leader).takeSnapshot(); + LOG.info("Snapshot taken at index {}", snapshotIndex); + Assertions.assertTrue(snapshotIndex > 0, "Snapshot should have been taken"); + + final long purgeUpTo = lastLogIndex - 2; + LOG.info("Purging leader log up to index {}", purgeUpTo); + leaderLog.purge(purgeUpTo).get(); + + final long startIndexAfterPurge = leaderLog.getStartIndex(); + LOG.info("Leader log after purge: startIndex={}", startIndexAfterPurge); + Assertions.assertTrue(startIndexAfterPurge > 1, + "Purge should have advanced startIndex, but got " + startIndexAfterPurge); + + return startIndexAfterPurge; + } + + void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster, + long targetNextIndex) throws Exception { + final RaftServer.Division leader = waitForLeader(cluster); + final RaftLog leaderLog = leader.getRaftLog(); + final long startIndexAfterPurge = leaderLog.getStartIndex(); + + final Stream appenders = RaftServerTestUtil.getLogAppenders(leader); + Assertions.assertNotNull(appenders, "Leader should have log appenders"); + final LogAppender appender = appenders.findFirst().orElseThrow( + () -> new AssertionError("No log appender found")); + + Assertions.assertTrue(targetNextIndex > RaftLog.LEAST_VALID_LOG_INDEX, + "targetNextIndex should be > LEAST_VALID_LOG_INDEX"); + appender.getFollower().setNextIndex(targetNextIndex); + + LOG.info("Set follower nextIndex={}, startIndexAfterPurge={}, snapshotIndex={}", + targetNextIndex, startIndexAfterPurge, appender.getFollower().getSnapshotIndex()); + Assertions.assertEquals(0, appender.getFollower().getSnapshotIndex(), + "Follower snapshotIndex should be 0 (default, never installed snapshot)"); + + Assertions.assertNull(leaderLog.getTermIndex(targetNextIndex - 1), + "Entry at previousIndex=" + (targetNextIndex - 1) + " should have been purged"); + + // Should return null instead of throwing NPE + Assertions.assertNull(appender.newAppendEntriesRequest(0, false), + "newAppendEntriesRequest should return null when previous TermIndex is not found"); + + Assertions.assertEquals(targetNextIndex, appender.getFollower().getNextIndex(), + "Follower nextIndex should remain unchanged"); + + Assertions.assertNotNull(appender.shouldInstallSnapshot(), + "shouldInstallSnapshot should return non-null when followerNextIndex (" + + targetNextIndex + ") and previous entry has been purged"); + } }