From 6df6072307f523e598a6f3f4db284c0b7c39ab28 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 2 Apr 2026 10:53:45 +0800 Subject: [PATCH 1/8] Add regression test to reproduce NPE --- .../org/apache/ratis/LogAppenderTests.java | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java index c7a7849e6a..51eb791369 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java @@ -32,6 +32,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.leader.LogAppender; import org.apache.ratis.server.impl.MiniRaftCluster; +import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.metrics.RaftServerMetricsImpl; import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.raftlog.RaftLog; @@ -53,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import org.apache.ratis.thirdparty.com.codahale.metrics.Gauge; import org.slf4j.event.Level; @@ -222,4 +224,69 @@ void runTest(CLUSTER cluster) throws Exception { Assertions.assertNotNull(last); Assertions.assertTrue(last.getIndex() <= leader.getInfo().getLastAppliedIndex()); } + + @Test + public void testNewAppendEntriesRequestAfterPurge() throws Exception { + final RaftProperties prop = getProperties(); + RaftServerConfigKeys.Log.setPurgeGap(prop, 1); + RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("1KB")); + runWithNewCluster(3, this::runTestNewAppendEntriesRequestAfterPurge); + } + + void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster) throws Exception { + final RaftServer.Division leader = waitForLeader(cluster); + final RaftLog leaderLog = leader.getRaftLog(); + + // Write enough large messages to create multiple closed log segments + try (RaftClient client = cluster.createClient(leader.getId())) { + for (SimpleMessage msg : generateMsgs(5)) { + client.io().send(msg); + } + } + + final long lastLogIndex = leaderLog.getLastEntryTermIndex().getIndex(); + LOG.info("Leader log lastIndex={}, startIndex={}", lastLogIndex, leaderLog.getStartIndex()); + Assertions.assertTrue(lastLogIndex > 5, "Need enough log entries for the test"); + + final Stream appenders = RaftServerTestUtil.getLogAppenders(leader); + Assertions.assertNotNull(appenders, "Leader should have log appenders"); + final LogAppender appender = appenders.findFirst().orElseThrow( + () -> new AssertionError("No log appender found")); + + // Purge the leader's log to remove old closed segments + final long purgeUpTo = lastLogIndex - 2; + LOG.info("Purging leader log up to index {}", purgeUpTo); + leaderLog.purge(purgeUpTo).get(); + + final long startIndexAfterPurge = leaderLog.getStartIndex(); + LOG.info("Leader log after purge: startIndex={}", startIndexAfterPurge); + Assertions.assertTrue(startIndexAfterPurge > 1, + "Purge should have advanced startIndex, but got " + startIndexAfterPurge); + + // Set the follower's nextIndex to a value where the previous entry + // has been purged, but snapshotIndex remains 0 (default). + // Set nextIndex = startIndexAfterPurge so that previousIndex = startIndexAfterPurge - 1 + // which has been purged. + final long targetNextIndex = startIndexAfterPurge; + LOG.info("Setting follower nextIndex to {} (previous={} has been purged, startIndex={})", + targetNextIndex, targetNextIndex - 1, startIndexAfterPurge); + appender.getFollower().setNextIndex(targetNextIndex); + + final long followerSnapshotIndex = appender.getFollower().getSnapshotIndex(); + LOG.info("Follower snapshotIndex={}, nextIndex={}", followerSnapshotIndex, targetNextIndex); + Assertions.assertEquals(0, followerSnapshotIndex, + "Follower snapshotIndex should be 0 (default, never installed snapshot)"); + + // Verify the entry at targetNextIndex exists (so the buffer fill loop will succeed) + // but the entry at targetNextIndex - 1 does not (so getPrevious returns null) + Assertions.assertNull(leaderLog.getTermIndex(targetNextIndex - 1), + "Entry at previousIndex=" + (targetNextIndex - 1) + " should have been purged"); + Assertions.assertNotNull(leaderLog.getTermIndex(targetNextIndex), + "Entry at nextIndex=" + targetNextIndex + " should still exist"); + + // Call newAppendEntriesRequest - this should handle the purged + // previous entry gracefully by returning null to trigger snapshot install + // instead of throwing NPE. + Assertions.assertNull(appender.newAppendEntriesRequest(0, false)); + } } From 59d78dabca34bb68f7d07df1add937f38315a280 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 2 Apr 2026 11:02:22 +0800 Subject: [PATCH 2/8] Add additional check when previous entry is null --- .../org/apache/ratis/server/leader/LogAppenderBase.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index be0404da36..bd392b76c5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -238,6 +238,14 @@ public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean he final long snapshotIndex = follower.getSnapshotIndex(); final long leaderNext = getRaftLog().getNextIndex(); final long followerNext = follower.getNextIndex(); + + if (previous == null && followerNext > RaftLog.LEAST_VALID_LOG_INDEX && followerNext != snapshotIndex + 1) { + LOG.warn("{}: Previous log entry not found for follower {} with followerNextIndex = {} (snapshotIndex = {})" + + ". The log entry might already be purged", + this, follower, followerNext, snapshotIndex); + return null; + } + final long halfMs = heartbeatWaitTimeMs/2; for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; ) { if (!buffer.offer(getRaftLog().getEntryWithData(next++))) { From 534479115f40a0f15947a92727ecee427e90ea82 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 2 Apr 2026 11:46:24 +0800 Subject: [PATCH 3/8] Add snapshot test --- .../org/apache/ratis/LogAppenderTests.java | 61 +++++++++++++------ 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java index 51eb791369..90dd1cde0c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java @@ -248,6 +248,12 @@ void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster) throws Exception LOG.info("Leader log lastIndex={}, startIndex={}", lastLogIndex, leaderLog.getStartIndex()); Assertions.assertTrue(lastLogIndex > 5, "Need enough log entries for the test"); + // Take a snapshot so that shouldInstallSnapshot() can return it + final long snapshotIndex = SimpleStateMachine4Testing.get(leader).takeSnapshot(); + LOG.info("Snapshot taken at index {}", snapshotIndex); + Assertions.assertTrue(snapshotIndex > 0, "Snapshot should have been taken"); + + // Get a LogAppender for one of the followers final Stream appenders = RaftServerTestUtil.getLogAppenders(leader); Assertions.assertNotNull(appenders, "Leader should have log appenders"); final LogAppender appender = appenders.findFirst().orElseThrow( @@ -263,30 +269,49 @@ void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster) throws Exception Assertions.assertTrue(startIndexAfterPurge > 1, "Purge should have advanced startIndex, but got " + startIndexAfterPurge); - // Set the follower's nextIndex to a value where the previous entry - // has been purged, but snapshotIndex remains 0 (default). - // Set nextIndex = startIndexAfterPurge so that previousIndex = startIndexAfterPurge - 1 - // which has been purged. - final long targetNextIndex = startIndexAfterPurge; - LOG.info("Setting follower nextIndex to {} (previous={} has been purged, startIndex={})", - targetNextIndex, targetNextIndex - 1, startIndexAfterPurge); + // Set the follower's nextIndex below startIndexAfterPurge so that: + // - previousIndex has been purged (getPrevious returns null) + // - followerNextIndex < logStartIndex (triggers snapshot install/notification) + // - snapshotIndex remains 0 (never installed a snapshot) + final long targetNextIndex = startIndexAfterPurge - 1; + Assertions.assertTrue(targetNextIndex > RaftLog.LEAST_VALID_LOG_INDEX, + "targetNextIndex should be > LEAST_VALID_LOG_INDEX"); appender.getFollower().setNextIndex(targetNextIndex); - final long followerSnapshotIndex = appender.getFollower().getSnapshotIndex(); - LOG.info("Follower snapshotIndex={}, nextIndex={}", followerSnapshotIndex, targetNextIndex); - Assertions.assertEquals(0, followerSnapshotIndex, + LOG.info("Set follower nextIndex={}, startIndexAfterPurge={}, snapshotIndex={}", + targetNextIndex, startIndexAfterPurge, appender.getFollower().getSnapshotIndex()); + Assertions.assertEquals(0, appender.getFollower().getSnapshotIndex(), "Follower snapshotIndex should be 0 (default, never installed snapshot)"); - // Verify the entry at targetNextIndex exists (so the buffer fill loop will succeed) - // but the entry at targetNextIndex - 1 does not (so getPrevious returns null) + // Verify the previous entry has been purged Assertions.assertNull(leaderLog.getTermIndex(targetNextIndex - 1), "Entry at previousIndex=" + (targetNextIndex - 1) + " should have been purged"); - Assertions.assertNotNull(leaderLog.getTermIndex(targetNextIndex), - "Entry at nextIndex=" + targetNextIndex + " should still exist"); - // Call newAppendEntriesRequest - this should handle the purged - // previous entry gracefully by returning null to trigger snapshot install - // instead of throwing NPE. - Assertions.assertNull(appender.newAppendEntriesRequest(0, false)); + // Update the follower's RPC times so that getHeartbeatWaitTimeMs() > 0, + // ensuring newAppendEntriesRequest takes the non-heartbeat path. + appender.getFollower().updateLastRpcResponseTime(); + appender.getFollower().updateLastRpcSendTime(true); + + Assertions.assertNull(appender.newAppendEntriesRequest(0, false), + "newAppendEntriesRequest should return null when previous TermIndex is not found"); + + // Verify the follower is behind the log start index. + // This is the condition checked by both shouldInstallSnapshot() (when + // installSnapshotEnabled=true) and shouldNotifyToInstallSnapshot() (when + // installSnapshotEnabled=false) to trigger snapshot recovery. + Assertions.assertTrue(appender.getFollower().getNextIndex() < leaderLog.getStartIndex(), + "Follower nextIndex (" + appender.getFollower().getNextIndex() + + ") should be < log startIndex (" + leaderLog.getStartIndex() + + ") to trigger snapshot install/notification"); + + + Assertions.assertNotNull(appender.shouldInstallSnapshot(), + "shouldInstallSnapshot should return non-null when followerNextIndex (" + + targetNextIndex + ") < logStartIndex (" + startIndexAfterPurge + ")"); + + // When installSnapshotEnabled=false, GrpcLogAppender.run() calls the + // private shouldNotifyToInstallSnapshot() which checks the same + // followerNextIndex < leaderStartIndex condition (already verified above) + // and notifies the follower to install a snapshot via its state machine. } } From 8124fdcdef279cf110426d49ef96047184aff032 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 2 Apr 2026 17:02:52 +0800 Subject: [PATCH 4/8] Update shouldInstallSnapshot logic to also send install snapshot if nextIndex == startIndex --- .../ratis/grpc/server/GrpcLogAppender.java | 9 ++- .../ratis/server/leader/LogAppender.java | 5 +- .../ratis/server/leader/LogAppenderBase.java | 7 +- .../org/apache/ratis/LogAppenderTests.java | 72 +++++++++---------- 4 files changed, 46 insertions(+), 47 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index b4d78c207a..d1af5c236b 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -857,11 +857,10 @@ private TermIndex shouldNotifyToInstallSnapshot() { return null; } - if (followerNextIndex < leaderStartIndex) { - // The Leader does not have the logs from the Follower's last log - // index onwards. And install snapshot is disabled. So the Follower - // should be notified to install the latest snapshot through its - // State Machine. + if (followerNextIndex <= leaderStartIndex) { + // The Leader does not have the log entry preceding the follower's + // next index (it may have been purged). Notify the follower to + // install the latest snapshot through its State Machine. return firstAvailable; } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) { // Leader has no logs to check from, hence return next index. diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java index cff5425d32..b8b66749b9 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java @@ -142,7 +142,8 @@ default RaftPeerId getFollowerId() { default SnapshotInfo shouldInstallSnapshot() { // we should install snapshot if the follower needs to catch up and: // 1. there is no local log entry but there is snapshot - // 2. or the follower's next index is smaller than the log start index + // 2. or the follower's next index is at or before the log start index + // (the previous log entry required for AppendEntries may have been purged) // 3. or the follower is bootstrapping (i.e. not yet caught up) and has not installed any snapshot yet final FollowerInfo follower = getFollower(); final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower); @@ -161,7 +162,7 @@ default SnapshotInfo shouldInstallSnapshot() { final long followerNextIndex = getFollower().getNextIndex(); if (followerNextIndex < getRaftLog().getNextIndex()) { final long logStartIndex = getRaftLog().getStartIndex(); - if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) { + if (followerNextIndex <= logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) { return snapshot; } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index bd392b76c5..9e2fa9a397 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -240,9 +240,10 @@ public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean he final long followerNext = follower.getNextIndex(); if (previous == null && followerNext > RaftLog.LEAST_VALID_LOG_INDEX && followerNext != snapshotIndex + 1) { - LOG.warn("{}: Previous log entry not found for follower {} with followerNextIndex = {} (snapshotIndex = {})" + - ". The log entry might already be purged", - this, follower, followerNext, snapshotIndex); + LOG.warn("{}: Previous log entry not found for follower {} with followerNextIndex = {} " + + "(followerSnapshotIndex = {}, leaderLogStartIndex = {})." + + " The log entry might already be purged.", + this, follower.getName(), followerNext, snapshotIndex, getRaftLog().getStartIndex()); return null; } diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java index 90dd1cde0c..0f7f254a89 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java @@ -226,18 +226,33 @@ void runTest(CLUSTER cluster) throws Exception { } @Test - public void testNewAppendEntriesRequestAfterPurge() throws Exception { + public void testNewAppendEntriesRequestAfterPurgeFollowerBehindStartIndex() throws Exception { final RaftProperties prop = getProperties(); RaftServerConfigKeys.Log.setPurgeGap(prop, 1); RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("1KB")); - runWithNewCluster(3, this::runTestNewAppendEntriesRequestAfterPurge); + runWithNewCluster(3, cluster -> { + final long startIndexAfterPurge = setupPurgedLeaderLog(cluster); + // Test when followerNextIndex < leader's logStartIndex + runTestNewAppendEntriesRequestAfterPurge(cluster, startIndexAfterPurge - 1); + }); } - void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster) throws Exception { + @Test + public void testNewAppendEntriesRequestAfterPurgeFollowerAtStartIndex() throws Exception { + final RaftProperties prop = getProperties(); + RaftServerConfigKeys.Log.setPurgeGap(prop, 1); + RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("1KB")); + runWithNewCluster(3, cluster -> { + final long startIndexAfterPurge = setupPurgedLeaderLog(cluster); + // Test when followerNextIndex == leader's logStartIndex, but the previous index is already purged + runTestNewAppendEntriesRequestAfterPurge(cluster, startIndexAfterPurge); + }); + } + + private long setupPurgedLeaderLog(CLUSTER cluster) throws Exception { final RaftServer.Division leader = waitForLeader(cluster); final RaftLog leaderLog = leader.getRaftLog(); - // Write enough large messages to create multiple closed log segments try (RaftClient client = cluster.createClient(leader.getId())) { for (SimpleMessage msg : generateMsgs(5)) { client.io().send(msg); @@ -253,13 +268,6 @@ void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster) throws Exception LOG.info("Snapshot taken at index {}", snapshotIndex); Assertions.assertTrue(snapshotIndex > 0, "Snapshot should have been taken"); - // Get a LogAppender for one of the followers - final Stream appenders = RaftServerTestUtil.getLogAppenders(leader); - Assertions.assertNotNull(appenders, "Leader should have log appenders"); - final LogAppender appender = appenders.findFirst().orElseThrow( - () -> new AssertionError("No log appender found")); - - // Purge the leader's log to remove old closed segments final long purgeUpTo = lastLogIndex - 2; LOG.info("Purging leader log up to index {}", purgeUpTo); leaderLog.purge(purgeUpTo).get(); @@ -269,11 +277,20 @@ void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster) throws Exception Assertions.assertTrue(startIndexAfterPurge > 1, "Purge should have advanced startIndex, but got " + startIndexAfterPurge); - // Set the follower's nextIndex below startIndexAfterPurge so that: - // - previousIndex has been purged (getPrevious returns null) - // - followerNextIndex < logStartIndex (triggers snapshot install/notification) - // - snapshotIndex remains 0 (never installed a snapshot) - final long targetNextIndex = startIndexAfterPurge - 1; + return startIndexAfterPurge; + } + + void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster, + long targetNextIndex) throws Exception { + final RaftServer.Division leader = waitForLeader(cluster); + final RaftLog leaderLog = leader.getRaftLog(); + final long startIndexAfterPurge = leaderLog.getStartIndex(); + + final Stream appenders = RaftServerTestUtil.getLogAppenders(leader); + Assertions.assertNotNull(appenders, "Leader should have log appenders"); + final LogAppender appender = appenders.findFirst().orElseThrow( + () -> new AssertionError("No log appender found")); + Assertions.assertTrue(targetNextIndex > RaftLog.LEAST_VALID_LOG_INDEX, "targetNextIndex should be > LEAST_VALID_LOG_INDEX"); appender.getFollower().setNextIndex(targetNextIndex); @@ -287,31 +304,12 @@ void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster) throws Exception Assertions.assertNull(leaderLog.getTermIndex(targetNextIndex - 1), "Entry at previousIndex=" + (targetNextIndex - 1) + " should have been purged"); - // Update the follower's RPC times so that getHeartbeatWaitTimeMs() > 0, - // ensuring newAppendEntriesRequest takes the non-heartbeat path. - appender.getFollower().updateLastRpcResponseTime(); - appender.getFollower().updateLastRpcSendTime(true); - + // Should not throw NPE Assertions.assertNull(appender.newAppendEntriesRequest(0, false), "newAppendEntriesRequest should return null when previous TermIndex is not found"); - // Verify the follower is behind the log start index. - // This is the condition checked by both shouldInstallSnapshot() (when - // installSnapshotEnabled=true) and shouldNotifyToInstallSnapshot() (when - // installSnapshotEnabled=false) to trigger snapshot recovery. - Assertions.assertTrue(appender.getFollower().getNextIndex() < leaderLog.getStartIndex(), - "Follower nextIndex (" + appender.getFollower().getNextIndex() - + ") should be < log startIndex (" + leaderLog.getStartIndex() - + ") to trigger snapshot install/notification"); - - Assertions.assertNotNull(appender.shouldInstallSnapshot(), "shouldInstallSnapshot should return non-null when followerNextIndex (" - + targetNextIndex + ") < logStartIndex (" + startIndexAfterPurge + ")"); - - // When installSnapshotEnabled=false, GrpcLogAppender.run() calls the - // private shouldNotifyToInstallSnapshot() which checks the same - // followerNextIndex < leaderStartIndex condition (already verified above) - // and notifies the follower to install a snapshot via its state machine. + + targetNextIndex + ") <= logStartIndex (" + leaderLog.getStartIndex() + ")"); } } From 0ff6c20ce416a9ea7711640a10296257bd373094 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 2 Apr 2026 17:46:26 +0800 Subject: [PATCH 5/8] Revert patch which might trigger infinite snapshot --- .../org/apache/ratis/grpc/server/GrpcLogAppender.java | 9 +++++---- .../java/org/apache/ratis/server/leader/LogAppender.java | 5 ++--- .../src/test/java/org/apache/ratis/LogAppenderTests.java | 8 +++----- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index d1af5c236b..b4d78c207a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -857,10 +857,11 @@ private TermIndex shouldNotifyToInstallSnapshot() { return null; } - if (followerNextIndex <= leaderStartIndex) { - // The Leader does not have the log entry preceding the follower's - // next index (it may have been purged). Notify the follower to - // install the latest snapshot through its State Machine. + if (followerNextIndex < leaderStartIndex) { + // The Leader does not have the logs from the Follower's last log + // index onwards. And install snapshot is disabled. So the Follower + // should be notified to install the latest snapshot through its + // State Machine. return firstAvailable; } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) { // Leader has no logs to check from, hence return next index. diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java index b8b66749b9..cff5425d32 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java @@ -142,8 +142,7 @@ default RaftPeerId getFollowerId() { default SnapshotInfo shouldInstallSnapshot() { // we should install snapshot if the follower needs to catch up and: // 1. there is no local log entry but there is snapshot - // 2. or the follower's next index is at or before the log start index - // (the previous log entry required for AppendEntries may have been purged) + // 2. or the follower's next index is smaller than the log start index // 3. or the follower is bootstrapping (i.e. not yet caught up) and has not installed any snapshot yet final FollowerInfo follower = getFollower(); final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower); @@ -162,7 +161,7 @@ default SnapshotInfo shouldInstallSnapshot() { final long followerNextIndex = getFollower().getNextIndex(); if (followerNextIndex < getRaftLog().getNextIndex()) { final long logStartIndex = getRaftLog().getStartIndex(); - if (followerNextIndex <= logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) { + if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) { return snapshot; } } diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java index 0f7f254a89..a7939bb637 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java @@ -300,16 +300,14 @@ void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster, Assertions.assertEquals(0, appender.getFollower().getSnapshotIndex(), "Follower snapshotIndex should be 0 (default, never installed snapshot)"); - // Verify the previous entry has been purged Assertions.assertNull(leaderLog.getTermIndex(targetNextIndex - 1), "Entry at previousIndex=" + (targetNextIndex - 1) + " should have been purged"); - // Should not throw NPE + // Should return null instead of throwing NPE Assertions.assertNull(appender.newAppendEntriesRequest(0, false), "newAppendEntriesRequest should return null when previous TermIndex is not found"); - Assertions.assertNotNull(appender.shouldInstallSnapshot(), - "shouldInstallSnapshot should return non-null when followerNextIndex (" - + targetNextIndex + ") <= logStartIndex (" + leaderLog.getStartIndex() + ")"); + Assertions.assertEquals(targetNextIndex, appender.getFollower().getNextIndex(), + "Follower nextIndex should remain unchanged"); } } From 5ff6be10cb9b79618ee12bfa80c2d1086fabba6c Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 2 Apr 2026 17:58:54 +0800 Subject: [PATCH 6/8] Leader takes into account previous log when checking whether to trigger snapshot --- .../ratis/grpc/server/GrpcLogAppender.java | 4 +++ .../ratis/server/leader/LogAppender.java | 32 +++++++++++++++++++ .../ratis/server/leader/LogAppenderBase.java | 21 ------------ .../org/apache/ratis/LogAppenderTests.java | 4 +++ 4 files changed, 40 insertions(+), 21 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index b4d78c207a..26ac125d3a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -868,6 +868,10 @@ private TermIndex shouldNotifyToInstallSnapshot() { return firstAvailable; } + if (followerNextIndex == leaderStartIndex && getPrevious(followerNextIndex) == null) { + return firstAvailable; + } + return null; } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java index cff5425d32..5f27c737ba 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java @@ -134,6 +134,35 @@ default RaftPeerId getFollowerId() { /** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for sending the given snapshot. */ Iterable newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot); + /** + * Get the previous {@link TermIndex} for the given next index. + * This is used to set the previous log entry in AppendEntries requests. + * + * @return the previous {@link TermIndex}, or null if unavailable + * (e.g. the entry has been purged and the snapshot does not cover it). + */ + default TermIndex getPrevious(long nextIndex) { + if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) { + return null; + } + + final long previousIndex = nextIndex - 1; + final TermIndex previous = getRaftLog().getTermIndex(previousIndex); + if (previous != null) { + return previous; + } + + final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot(); + if (snapshot != null) { + final TermIndex snapshotTermIndex = snapshot.getTermIndex(); + if (snapshotTermIndex.getIndex() == previousIndex) { + return snapshotTermIndex; + } + } + + return null; + } + /** * Should this {@link LogAppender} send a snapshot to the follower? * @@ -164,6 +193,9 @@ default SnapshotInfo shouldInstallSnapshot() { if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) { return snapshot; } + if (followerNextIndex == logStartIndex && getPrevious(followerNextIndex) == null) { + return snapshot; + } } return null; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index 9e2fa9a397..d17525b859 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -165,27 +165,6 @@ public boolean hasPendingDataRequests() { return false; } - private TermIndex getPrevious(long nextIndex) { - if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) { - return null; - } - - final long previousIndex = nextIndex - 1; - final TermIndex previous = getRaftLog().getTermIndex(previousIndex); - if (previous != null) { - return previous; - } - - final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot(); - if (snapshot != null) { - final TermIndex snapshotTermIndex = snapshot.getTermIndex(); - if (snapshotTermIndex.getIndex() == previousIndex) { - return snapshotTermIndex; - } - } - - return null; - } protected long getNextIndexForInconsistency(long requestFirstIndex, long replyNextIndex) { long next = replyNextIndex; diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java index a7939bb637..c9b19a72a1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java @@ -309,5 +309,9 @@ void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster, Assertions.assertEquals(targetNextIndex, appender.getFollower().getNextIndex(), "Follower nextIndex should remain unchanged"); + + Assertions.assertNotNull(appender.shouldInstallSnapshot(), + "shouldInstallSnapshot should return non-null when followerNextIndex (" + + targetNextIndex + ") and previous entry has been purged"); } } From 4970bc4ab238afe64b29cf50c51cf952ea702575 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 3 Apr 2026 13:14:36 +0800 Subject: [PATCH 7/8] Fix infinite snapshot if log is empty --- .../java/org/apache/ratis/grpc/server/GrpcLogAppender.java | 4 +++- .../main/java/org/apache/ratis/server/leader/LogAppender.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 26ac125d3a..2434616094 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -868,7 +868,9 @@ private TermIndex shouldNotifyToInstallSnapshot() { return firstAvailable; } - if (followerNextIndex == leaderStartIndex && getPrevious(followerNextIndex) == null) { + if (followerNextIndex == leaderStartIndex + && followerNextIndex > RaftLog.LEAST_VALID_LOG_INDEX + && getPrevious(followerNextIndex) == null) { return firstAvailable; } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java index 5f27c737ba..a02724710d 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java @@ -193,7 +193,9 @@ default SnapshotInfo shouldInstallSnapshot() { if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) { return snapshot; } - if (followerNextIndex == logStartIndex && getPrevious(followerNextIndex) == null) { + if (followerNextIndex == logStartIndex + && followerNextIndex > RaftLog.LEAST_VALID_LOG_INDEX + && getPrevious(followerNextIndex) == null) { return snapshot; } } From 671416cb327b1f9b294c8cf8aef9edfaf9735d40 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 3 Apr 2026 13:37:47 +0800 Subject: [PATCH 8/8] Address comments --- .../ratis/grpc/server/GrpcLogAppender.java | 44 -------------- .../ratis/server/leader/LogAppender.java | 59 ++++++++++++------- .../ratis/server/leader/LogAppenderBase.java | 5 +- 3 files changed, 40 insertions(+), 68 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 2434616094..69421e9f0f 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -832,50 +832,6 @@ private void notifyInstallSnapshot(TermIndex firstAvailable) { responseHandler.waitForResponse(); } - /** - * Should the Leader notify the Follower to install the snapshot through - * its own State Machine. - * @return the first available log's start term index - */ - private TermIndex shouldNotifyToInstallSnapshot() { - final FollowerInfo follower = getFollower(); - final long leaderNextIndex = getRaftLog().getNextIndex(); - final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower); - final long leaderStartIndex = getRaftLog().getStartIndex(); - final TermIndex firstAvailable = Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex)) - .orElseGet(() -> TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex)); - if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) { - // If the follower is bootstrapping and has not yet installed any snapshot from leader, then the follower should - // be notified to install a snapshot. Every follower should try to install at least one snapshot during - // bootstrapping, if available. - LOG.debug("{}: follower is bootstrapping, notify to install snapshot to {}.", this, firstAvailable); - return firstAvailable; - } - - final long followerNextIndex = follower.getNextIndex(); - if (followerNextIndex >= leaderNextIndex) { - return null; - } - - if (followerNextIndex < leaderStartIndex) { - // The Leader does not have the logs from the Follower's last log - // index onwards. And install snapshot is disabled. So the Follower - // should be notified to install the latest snapshot through its - // State Machine. - return firstAvailable; - } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) { - // Leader has no logs to check from, hence return next index. - return firstAvailable; - } - - if (followerNextIndex == leaderStartIndex - && followerNextIndex > RaftLog.LEAST_VALID_LOG_INDEX - && getPrevious(followerNextIndex) == null) { - return firstAvailable; - } - - return null; - } static class AppendEntriesRequest { private final Timekeeper timer; diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java index a02724710d..33914fde7f 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java @@ -169,37 +169,54 @@ default TermIndex getPrevious(long nextIndex) { * @return the snapshot if it should install a snapshot; otherwise, return null. */ default SnapshotInfo shouldInstallSnapshot() { - // we should install snapshot if the follower needs to catch up and: - // 1. there is no local log entry but there is snapshot - // 2. or the follower's next index is smaller than the log start index - // 3. or the follower is bootstrapping (i.e. not yet caught up) and has not installed any snapshot yet - final FollowerInfo follower = getFollower(); - final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower); final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot(); + return shouldInstallSnapshot(snapshot != null) ? snapshot : null; + } - if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) { - if (snapshot == null) { + /** + * Should this {@link LogAppender} send a snapshot notification to the follower? + * + * @return the first available log {@link TermIndex} if it should install a snapshot; otherwise, return null. + */ + default TermIndex shouldNotifyToInstallSnapshot() { + if (!shouldInstallSnapshot(true)) { + return null; + } + final TermIndex start = getRaftLog().getTermIndex(getRaftLog().getStartIndex()); + if (start != null) { + return start; + } + // No log is currently available; return the next, which will become available in the future. + return TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), getRaftLog().getNextIndex()); + } + + default boolean shouldInstallSnapshot(boolean hasSnapshot) { + final FollowerInfo follower = getFollower(); + if (getLeaderState().isFollowerBootstrapping(follower) + && !follower.hasAttemptedToInstallSnapshot()) { + if (!hasSnapshot) { // Leader cannot send null snapshot to follower. Hence, acknowledge InstallSnapshot attempt (even though it // was not attempted) so that follower can come out of staging state after appending log entries. follower.setAttemptedToInstallSnapshot(); - } else { - return snapshot; } + return true; } + final long leaderNextIndex = getRaftLog().getNextIndex(); final long followerNextIndex = getFollower().getNextIndex(); - if (followerNextIndex < getRaftLog().getNextIndex()) { - final long logStartIndex = getRaftLog().getStartIndex(); - if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) { - return snapshot; - } - if (followerNextIndex == logStartIndex - && followerNextIndex > RaftLog.LEAST_VALID_LOG_INDEX - && getPrevious(followerNextIndex) == null) { - return snapshot; - } + if (followerNextIndex >= leaderNextIndex) { + // follower caught up already + return false; } - return null; + final long leaderStartIndex = getRaftLog().getStartIndex(); + if (followerNextIndex < leaderStartIndex || leaderStartIndex == RaftLog.INVALID_LOG_INDEX) { + // leader does not have follower's next log + return true; + } + // leader does not have the previous log for appendEntries + return followerNextIndex == leaderStartIndex && + followerNextIndex > RaftLog.LEAST_VALID_LOG_INDEX && + getPrevious(followerNextIndex) == null; } /** Define how this {@link LogAppender} should run. */ diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index d17525b859..4f558e0c7c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -219,9 +219,8 @@ public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean he final long followerNext = follower.getNextIndex(); if (previous == null && followerNext > RaftLog.LEAST_VALID_LOG_INDEX && followerNext != snapshotIndex + 1) { - LOG.warn("{}: Previous log entry not found for follower {} with followerNextIndex = {} " - + "(followerSnapshotIndex = {}, leaderLogStartIndex = {})." - + " The log entry might already be purged.", + LOG.info("{}: Skipping appendEntries since the previous log entry is unavailable:" + + " follower {} nextIndex={} and snapshotIndex={} but leader startIndex={}", this, follower.getName(), followerNext, snapshotIndex, getRaftLog().getStartIndex()); return null; }