Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -832,44 +832,6 @@ private void notifyInstallSnapshot(TermIndex firstAvailable) {
responseHandler.waitForResponse();
}

/**
* Should the Leader notify the Follower to install the snapshot through
* its own State Machine.
* @return the first available log's start term index
*/
/**
 * Should the Leader notify the Follower to install the snapshot through
 * its own State Machine.
 * @return the first available log's start term index
 */
private TermIndex shouldNotifyToInstallSnapshot() {
  final FollowerInfo follower = getFollower();
  final long leaderNextIndex = getRaftLog().getNextIndex();
  final long leaderStartIndex = getRaftLog().getStartIndex();

  // The first entry still present in the leader's log; if it has been purged,
  // fall back to the next index at the current term, which will become available later.
  TermIndex firstAvailable = getRaftLog().getTermIndex(leaderStartIndex);
  if (firstAvailable == null) {
    firstAvailable = TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex);
  }

  if (getLeaderState().isFollowerBootstrapping(follower) && !follower.hasAttemptedToInstallSnapshot()) {
    // Every bootstrapping follower should try to install at least one snapshot
    // from the leader, if available.
    LOG.debug("{}: follower is bootstrapping, notify to install snapshot to {}.", this, firstAvailable);
    return firstAvailable;
  }

  final long followerNextIndex = follower.getNextIndex();
  if (followerNextIndex < leaderNextIndex
      && (followerNextIndex < leaderStartIndex || leaderStartIndex == RaftLog.INVALID_LOG_INDEX)) {
    // Either the leader no longer has the logs from the follower's next index
    // onwards (and install snapshot is disabled), or the leader has no logs at
    // all; notify the follower to install the latest snapshot via its State Machine.
    return firstAvailable;
  }
  return null;
}

static class AppendEntriesRequest {
private final Timekeeper timer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,38 +134,89 @@ default RaftPeerId getFollowerId() {
/** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for sending the given snapshot. */
Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot);

/**
 * Get the previous {@link TermIndex} for the given next index.
 * This is used to set the previous log entry in AppendEntries requests.
 *
 * @return the previous {@link TermIndex}, or null if unavailable
 * (e.g. the entry has been purged and the snapshot does not cover it).
 */
default TermIndex getPrevious(long nextIndex) {
  // There is no entry before the least valid index.
  if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
    return null;
  }

  final long previousIndex = nextIndex - 1;
  // Prefer the entry from the log; it may have been purged already.
  final TermIndex fromLog = getRaftLog().getTermIndex(previousIndex);
  if (fromLog != null) {
    return fromLog;
  }

  // Fall back to the latest snapshot when it ends exactly at the previous index.
  final SnapshotInfo latest = getServer().getStateMachine().getLatestSnapshot();
  if (latest == null) {
    return null;
  }
  final TermIndex snapshotTermIndex = latest.getTermIndex();
  return snapshotTermIndex.getIndex() == previousIndex ? snapshotTermIndex : null;
}

/**
 * Should this {@link LogAppender} send a snapshot to the follower?
 * The actual decision (follower needs to catch up, follower is bootstrapping,
 * leader has purged the follower's next log, etc.) is delegated to
 * {@link #shouldInstallSnapshot(boolean)}; this overload only supplies
 * the latest snapshot, if any, to return to the caller.
 *
 * @return the snapshot if it should install a snapshot; otherwise, return null.
 */
default SnapshotInfo shouldInstallSnapshot() {
  // Removed dead locals (follower, isFollowerBootstrapping): they were never
  // read here and are recomputed inside shouldInstallSnapshot(boolean).
  final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();
  return shouldInstallSnapshot(snapshot != null) ? snapshot : null;
}

/**
 * Should this {@link LogAppender} send a snapshot notification to the follower?
 *
 * @return the first available log {@link TermIndex} if it should install a snapshot; otherwise, return null.
 */
default TermIndex shouldNotifyToInstallSnapshot() {
  if (shouldInstallSnapshot(true)) {
    final TermIndex firstAvailable = getRaftLog().getTermIndex(getRaftLog().getStartIndex());
    if (firstAvailable != null) {
      return firstAvailable;
    }
    // No log is currently available; return the next, which will become available in the future.
    return TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), getRaftLog().getNextIndex());
  }
  return null;
}

if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
if (snapshot == null) {
default boolean shouldInstallSnapshot(boolean hasSnapshot) {
final FollowerInfo follower = getFollower();
if (getLeaderState().isFollowerBootstrapping(follower)
&& !follower.hasAttemptedToInstallSnapshot()) {
if (!hasSnapshot) {
// Leader cannot send null snapshot to follower. Hence, acknowledge InstallSnapshot attempt (even though it
// was not attempted) so that follower can come out of staging state after appending log entries.
follower.setAttemptedToInstallSnapshot();
} else {
return snapshot;
}
return true;
}

final long leaderNextIndex = getRaftLog().getNextIndex();
final long followerNextIndex = getFollower().getNextIndex();
if (followerNextIndex < getRaftLog().getNextIndex()) {
final long logStartIndex = getRaftLog().getStartIndex();
if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
return snapshot;
}
if (followerNextIndex >= leaderNextIndex) {
// follower caught up already
return false;
}
return null;
final long leaderStartIndex = getRaftLog().getStartIndex();
if (followerNextIndex < leaderStartIndex || leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
// leader does not have follower's next log
return true;
}
// leader does not have the previous log for appendEntries
return followerNextIndex == leaderStartIndex &&
followerNextIndex > RaftLog.LEAST_VALID_LOG_INDEX &&
getPrevious(followerNextIndex) == null;
}

/** Define how this {@link LogAppender} should run. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,27 +165,6 @@ public boolean hasPendingDataRequests() {
return false;
}

/**
 * Get the previous {@link TermIndex} for the given next index,
 * looking first in the log and then in the latest snapshot.
 *
 * @return the previous {@link TermIndex}, or null if it is unavailable.
 */
private TermIndex getPrevious(long nextIndex) {
  // Nothing precedes the least valid index.
  if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
    return null;
  }

  final long previousIndex = nextIndex - 1;
  // The entry may still be present in the log ...
  final TermIndex fromLog = getRaftLog().getTermIndex(previousIndex);
  if (fromLog != null) {
    return fromLog;
  }

  // ... or it may be covered exactly by the end of the latest snapshot.
  final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
  if (snapshot == null) {
    return null;
  }
  final TermIndex snapshotTermIndex = snapshot.getTermIndex();
  return snapshotTermIndex.getIndex() == previousIndex ? snapshotTermIndex : null;
}

protected long getNextIndexForInconsistency(long requestFirstIndex, long replyNextIndex) {
long next = replyNextIndex;
Expand Down Expand Up @@ -238,6 +217,14 @@ public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean he
final long snapshotIndex = follower.getSnapshotIndex();
final long leaderNext = getRaftLog().getNextIndex();
final long followerNext = follower.getNextIndex();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems getNextIndex is called twice in this method, we might need to call it only once per method.


if (previous == null && followerNext > RaftLog.LEAST_VALID_LOG_INDEX && followerNext != snapshotIndex + 1) {
LOG.info("{}: Skipping appendEntries since the previous log entry is unavailable:" +
" follower {} nextIndex={} and snapshotIndex={} but leader startIndex={}",
this, follower.getName(), followerNext, snapshotIndex, getRaftLog().getStartIndex());
return null;
}

final long halfMs = heartbeatWaitTimeMs/2;
for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; ) {
if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
Expand Down
92 changes: 92 additions & 0 deletions ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
Expand All @@ -53,6 +54,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import org.apache.ratis.thirdparty.com.codahale.metrics.Gauge;
import org.slf4j.event.Level;
Expand Down Expand Up @@ -222,4 +224,94 @@ void runTest(CLUSTER cluster) throws Exception {
Assertions.assertNotNull(last);
Assertions.assertTrue(last.getIndex() <= leader.getInfo().getLastAppliedIndex());
}

@Test
public void testNewAppendEntriesRequestAfterPurgeFollowerBehindStartIndex() throws Exception {
  final RaftProperties properties = getProperties();
  // Small segments and purge gap so that a purge actually advances startIndex.
  RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf("1KB"));
  RaftServerConfigKeys.Log.setPurgeGap(properties, 1);
  runWithNewCluster(3, cluster -> {
    // Exercise the case followerNextIndex < leader's logStartIndex.
    final long startIndexAfterPurge = setupPurgedLeaderLog(cluster);
    runTestNewAppendEntriesRequestAfterPurge(cluster, startIndexAfterPurge - 1);
  });
}

@Test
public void testNewAppendEntriesRequestAfterPurgeFollowerAtStartIndex() throws Exception {
  final RaftProperties properties = getProperties();
  // Small segments and purge gap so that a purge actually advances startIndex.
  RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf("1KB"));
  RaftServerConfigKeys.Log.setPurgeGap(properties, 1);
  runWithNewCluster(3, cluster -> {
    // Exercise the case followerNextIndex == leader's logStartIndex
    // while the previous entry has already been purged.
    final long startIndexAfterPurge = setupPurgedLeaderLog(cluster);
    runTestNewAppendEntriesRequestAfterPurge(cluster, startIndexAfterPurge);
  });
}

/**
 * Append entries, take a snapshot, then purge the leader log so that its
 * startIndex moves forward.
 *
 * @return the leader log's startIndex after the purge.
 */
private long setupPurgedLeaderLog(CLUSTER cluster) throws Exception {
  final RaftServer.Division leader = waitForLeader(cluster);
  final RaftLog leaderLog = leader.getRaftLog();

  // Append a few entries so that there is something to purge.
  try (RaftClient client = cluster.createClient(leader.getId())) {
    for (SimpleMessage message : generateMsgs(5)) {
      client.io().send(message);
    }
  }

  final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
  LOG.info("Leader log lastIndex={}, startIndex={}", lastIndex, leaderLog.getStartIndex());
  Assertions.assertTrue(lastIndex > 5, "Need enough log entries for the test");

  // Take a snapshot so that shouldInstallSnapshot() can return it
  final long snapshotIndex = SimpleStateMachine4Testing.get(leader).takeSnapshot();
  LOG.info("Snapshot taken at index {}", snapshotIndex);
  Assertions.assertTrue(snapshotIndex > 0, "Snapshot should have been taken");

  // Purge everything except the last two entries.
  final long purgeIndex = lastIndex - 2;
  LOG.info("Purging leader log up to index {}", purgeIndex);
  leaderLog.purge(purgeIndex).get();

  final long startIndexAfterPurge = leaderLog.getStartIndex();
  LOG.info("Leader log after purge: startIndex={}", startIndexAfterPurge);
  Assertions.assertTrue(startIndexAfterPurge > 1,
      "Purge should have advanced startIndex, but got " + startIndexAfterPurge);

  return startIndexAfterPurge;
}

/**
 * Force the first follower's nextIndex to targetNextIndex (whose previous
 * entry has been purged from the leader log) and verify that
 * newAppendEntriesRequest returns null instead of throwing, and that a
 * snapshot install is then chosen.
 */
void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster,
long targetNextIndex) throws Exception {
final RaftServer.Division leader = waitForLeader(cluster);
final RaftLog leaderLog = leader.getRaftLog();
final long startIndexAfterPurge = leaderLog.getStartIndex();

// Grab one of the leader's appenders to drive directly.
final Stream<LogAppender> appenders = RaftServerTestUtil.getLogAppenders(leader);
Assertions.assertNotNull(appenders, "Leader should have log appenders");
final LogAppender appender = appenders.findFirst().orElseThrow(
() -> new AssertionError("No log appender found"));

// Point the follower at the purged region of the leader log.
Assertions.assertTrue(targetNextIndex > RaftLog.LEAST_VALID_LOG_INDEX,
"targetNextIndex should be > LEAST_VALID_LOG_INDEX");
appender.getFollower().setNextIndex(targetNextIndex);

LOG.info("Set follower nextIndex={}, startIndexAfterPurge={}, snapshotIndex={}",
targetNextIndex, startIndexAfterPurge, appender.getFollower().getSnapshotIndex());
Assertions.assertEquals(0, appender.getFollower().getSnapshotIndex(),
"Follower snapshotIndex should be 0 (default, never installed snapshot)");

// Precondition: the previous entry really is gone from the leader log.
Assertions.assertNull(leaderLog.getTermIndex(targetNextIndex - 1),
"Entry at previousIndex=" + (targetNextIndex - 1) + " should have been purged");

// Should return null instead of throwing NPE
Assertions.assertNull(appender.newAppendEntriesRequest(0, false),
"newAppendEntriesRequest should return null when previous TermIndex is not found");

// The failed request must not advance the follower state.
Assertions.assertEquals(targetNextIndex, appender.getFollower().getNextIndex(),
"Follower nextIndex should remain unchanged");

// With appendEntries impossible, the appender should fall back to a snapshot.
Assertions.assertNotNull(appender.shouldInstallSnapshot(),
"shouldInstallSnapshot should return non-null when followerNextIndex ("
+ targetNextIndex + ") and previous entry has been purged");
}
}