From 237f3aa78e0bd30e0fa5532b1d9fbb71457d6d54 Mon Sep 17 00:00:00 2001
From: Abhishek Pal
Date: Mon, 9 Mar 2026 19:15:07 +0530
Subject: [PATCH 1/5] RATIS-2428. Modify install snapshot notificatio to
include ranked list of source peers plus the minimum acceptable snapshot
index.
---
ratis-proto/src/main/proto/Raft.proto | 6 ++++++
.../ratis/statemachine/StateMachine.java | 19 +++++++++++++++++++
2 files changed, 25 insertions(+)
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 6dbfdb15a5..8cb92f860c 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -218,6 +218,12 @@ message InstallSnapshotRequestProto {
message NotificationProto {
TermIndexProto firstAvailableTermIndex = 1; // first available log index to notify Follower to install snapshot.
+ // Minimum acceptable snapshot index for this install request.
+ // 0 means unspecified for backward compatibility.
+ uint64 minimumSnapshotIndex = 2;
+ // Ranked source peers (highest preference first) from which the follower may fetch the snapshot.
+ // Older peers ignore this field.
+ repeated RaftPeerProto sourcePeers = 3;
}
RaftRpcRequestProto serverRequest = 1;
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 98d4537847..5d03f8d056 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -41,6 +41,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
@@ -294,6 +295,24 @@ default CompletableFuture notifyInstallSnapshotFromLeader(
RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
return CompletableFuture.completedFuture(null);
}
+
+ /**
+ * Notify the {@link StateMachine} that the leader has purged entries from its log.
+ * In order to catch up, the {@link StateMachine} has to install a snapshot asynchronously.
+ *
+ * @param roleInfoProto information about the current node role and rpc delay information.
+ * @param firstTermIndexInLog the term-index of the first append entry available in the leader's log.
+ * @param minimumSnapshotIndex minimum acceptable snapshot index for this request.
+ * A value of 0 means unspecified. This is used for backward compatibility.
+ * @param sourcePeers ranked source peers (highest preference first) from which
+ * the follower may fetch the snapshot.
+ * @return return the last term-index in the snapshot after the snapshot installation.
+ */
+ default CompletableFuture notifyInstallSnapshotFromLeader(
+ RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog,
+ long minimumSnapshotIndex, List sourcePeers) {
+ return notifyInstallSnapshotFromLeader(roleInfoProto, firstTermIndexInLog);
+ }
}
/**
From 8d00c4d269dadf025b4774383817718aa3d28369 Mon Sep 17 00:00:00 2001
From: Abhishek Pal
Date: Mon, 9 Mar 2026 19:34:03 +0530
Subject: [PATCH 2/5] RATIS-2428. Add new SnapshotSourceSelector to rank
followers and return best candidate
---
.../server/leader/SnapshotSourceSelector.java | 137 ++++++++++++++++++
1 file changed, 137 insertions(+)
create mode 100644 ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java
new file mode 100644
index 0000000000..c34b3c342e
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.leader;
+
+import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.util.Timestamp;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Leader-side utility for selecting snapshot source followers.
+ *
+ * The returned list is ranked by suitability for serving snapshots to a lagging target:
+ * fully caught-up followers first, followed by followers with higher replication progress
+ * and fresher append-response signals.
+ */
+public final class SnapshotSourceSelector {
+ private SnapshotSourceSelector() {
+ }
+
+ /**
+ * Select and rank candidate source followers for snapshot download.
+ * If this method returns an empty list, callers should fall back to leader-sourced snapshot install.
+ *
+ * Selection rule:
+ * only followers with {@code matchIndex >= firstAvailableLogIndex - 1} are returned.
+ *
+ *
+ * Ranking rule (highest preference first):
+ *
+ * - Fully caught up with the leader (as inferred from {@code leaderLastEntry})
+ * - Higher {@link FollowerInfo#getMatchIndex()}
+ * - Higher {@link FollowerInfo#getCommitIndex()}
+ * - Fresher append-response timestamp
+ *
+ *
+ *
+ * @param followers all followers known to the leader.
+ * @param targetFollowerId the follower that needs the snapshot; it will be excluded from candidates.
+ * @param firstAvailableLogIndex leader's first available log index for catch-up.
+ * @param leaderLastEntry leader last entry used to infer whether a follower is fully caught up.
+ * @return ranked snapshot source followers; empty means "fall back to leader".
+ */
+ public static List selectSourceFollowers(
+ Collection followers, RaftPeerId targetFollowerId,
+ long firstAvailableLogIndex, TermIndex leaderLastEntry) {
+ Objects.requireNonNull(followers, "followers == null");
+
+ // A source follower must be able to bridge the target to the leader's first available log.
+ // Without this lower bound, we may pick a follower that still cannot provide a usable snapshot.
+ final long minimumMatchIndex = firstAvailableLogIndex - 1;
+ final List ranked = followers.stream()
+ .filter(Objects::nonNull)
+ .filter(f -> targetFollowerId == null || !targetFollowerId.equals(f.getId()))
+ .filter(f -> f.getMatchIndex() >= minimumMatchIndex)
+ .sorted(newFollowerComparator(leaderLastEntry))
+ .collect(Collectors.toList());
+ return ranked.isEmpty() ? Collections.emptyList() : Collections.unmodifiableList(ranked);
+ }
+
+ /**
+ * Convert ranked candidate followers to ranked peer protos for notification payloads.
+ *
+ * @see #selectSourceFollowers(Collection, RaftPeerId, long, TermIndex)
+ */
+ public static List selectSourcePeers(
+ Collection followers, RaftPeerId targetFollowerId,
+ long firstAvailableLogIndex, TermIndex leaderLastEntry) {
+ final List peers = selectSourceFollowers(
+ followers, targetFollowerId, firstAvailableLogIndex, leaderLastEntry).stream()
+ .map(FollowerInfo::getPeer)
+ .filter(Objects::nonNull)
+ .map(peer -> peer.getRaftPeerProto())
+ .collect(Collectors.toList());
+ return peers.isEmpty() ? Collections.emptyList() : Collections.unmodifiableList(peers);
+ }
+
+ /**
+ * Compare followers based on the following criteria one by one:
+ * - Fully caught up with the leader (as inferred from {@code leaderLastEntry})
+ * - Higher {@link FollowerInfo#getMatchIndex()}
+ * - Higher {@link FollowerInfo#getCommitIndex()}
+ * - Fresher append-response timestamp
+ */
+ private static Comparator newFollowerComparator(TermIndex leaderLastEntry) {
+ return Comparator.comparing(f -> isFullyCaughtUp(f, leaderLastEntry))
+ .reversed()
+ .thenComparingLong(FollowerInfo::getMatchIndex).reversed()
+ .thenComparingLong(FollowerInfo::getCommitIndex).reversed()
+ .thenComparing(SnapshotSourceSelector::freshnessTimestamp, SnapshotSourceSelector::compareFreshness)
+ // Keep order deterministic when all ranking metrics are equal, we do not want it to change across runs.
+ .thenComparing(f -> f.getId().toString());
+ }
+
+ private static boolean isFullyCaughtUp(FollowerInfo follower, TermIndex leaderLastEntry) {
+ return leaderLastEntry != null && follower.getMatchIndex() >= leaderLastEntry.getIndex();
+ }
+
+ private static Timestamp freshnessTimestamp(FollowerInfo follower) {
+ final Timestamp appendResponse = follower.getLastRespondedAppendEntriesSendTime();
+ return appendResponse != null ? appendResponse : follower.getLastRpcResponseTime();
+ }
+
+ private static int compareFreshness(Timestamp left, Timestamp right) {
+ if (left == right) {
+ return 0;
+ } else if (left == null) {
+ return 1;
+ } else if (right == null) {
+ return -1;
+ }
+ // More recent timestamp first.
+ return right.compareTo(left);
+ }
+}
From 9ae7e4d0eb0ff74fd6047979fd053484733434f7 Mon Sep 17 00:00:00 2001
From: Abhishek Pal
Date: Mon, 9 Mar 2026 20:27:56 +0530
Subject: [PATCH 3/5] RATIS-2428. Implement snapshot install using new snapshot
source selector
---
.../ratis/grpc/server/GrpcLogAppender.java | 63 +---
.../ratis/server/leader/LeaderState.java | 4 +
.../ratis/server/leader/LogAppender.java | 66 ++++
.../ratis/server/impl/LeaderStateImpl.java | 7 +
.../impl/SnapshotInstallationHandler.java | 21 +-
.../ratis/server/leader/LeaderProtoUtils.java | 18 +-
.../ratis/server/leader/LogAppenderBase.java | 24 +-
.../server/leader/LogAppenderDefault.java | 78 +++-
.../InstallSnapshotNotificationTests.java | 334 +++++++++++++++++-
9 files changed, 544 insertions(+), 71 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index b4d78c207a..aa826a86ae 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -23,6 +23,7 @@
import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.metrics.Timekeeper;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
@@ -52,6 +53,7 @@
import java.io.InterruptedIOException;
import java.util.Comparator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -250,6 +252,15 @@ private boolean installSnapshot() {
if (installSnapshotEnabled) {
final SnapshotInfo snapshot = shouldInstallSnapshot();
if (snapshot != null) {
+ final TermIndex firstAvailable = shouldNotifyToInstallSnapshot();
+ if (firstAvailable != null) {
+ final List sourcePeers = selectSnapshotSourcePeers(firstAvailable);
+ if (!sourcePeers.isEmpty()) {
+ notifyInstallSnapshot(firstAvailable, sourcePeers);
+ return true;
+ }
+ }
+ // Fallback to the existing leader-sourced install path when no eligible source follower exists.
installSnapshot(snapshot);
return true;
}
@@ -257,7 +268,7 @@ private boolean installSnapshot() {
// check installSnapshotNotification
final TermIndex firstAvailable = shouldNotifyToInstallSnapshot();
if (firstAvailable != null) {
- notifyInstallSnapshot(firstAvailable);
+ notifyInstallSnapshot(firstAvailable, selectSnapshotSourcePeers(firstAvailable));
return true;
}
}
@@ -802,15 +813,18 @@ private void installSnapshot(SnapshotInfo snapshot) {
* Send an installSnapshot notification request to the Follower.
* @param firstAvailable the first available log's index on the Leader
*/
- private void notifyInstallSnapshot(TermIndex firstAvailable) {
+ private void notifyInstallSnapshot(TermIndex firstAvailable, List sourcePeers) {
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_NOTIFY, getFollower().getName(),
- suffix -> LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={} {}",
- this, firstAvailable, getFollower().getNextIndex(), suffix));
+ suffix -> LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={}, "
+ + "minimumSnapshotIndex={}, sourcePeers={} {}",
+ this, firstAvailable, getFollower().getNextIndex(),
+ getMinimumSnapshotIndex(firstAvailable), sourcePeers.size(), suffix));
final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true);
StreamObserver snapshotRequestObserver = null;
// prepare and enqueue the notify install snapshot request.
- final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailable);
+ final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(
+ firstAvailable, getMinimumSnapshotIndex(firstAvailable), sourcePeers);
if (LOG.isInfoEnabled()) {
LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(request));
}
@@ -832,45 +846,6 @@ private void notifyInstallSnapshot(TermIndex firstAvailable) {
responseHandler.waitForResponse();
}
- /**
- * Should the Leader notify the Follower to install the snapshot through
- * its own State Machine.
- * @return the first available log's start term index
- */
- private TermIndex shouldNotifyToInstallSnapshot() {
- final FollowerInfo follower = getFollower();
- final long leaderNextIndex = getRaftLog().getNextIndex();
- final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower);
- final long leaderStartIndex = getRaftLog().getStartIndex();
- final TermIndex firstAvailable = Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex))
- .orElseGet(() -> TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex));
- if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
- // If the follower is bootstrapping and has not yet installed any snapshot from leader, then the follower should
- // be notified to install a snapshot. Every follower should try to install at least one snapshot during
- // bootstrapping, if available.
- LOG.debug("{}: follower is bootstrapping, notify to install snapshot to {}.", this, firstAvailable);
- return firstAvailable;
- }
-
- final long followerNextIndex = follower.getNextIndex();
- if (followerNextIndex >= leaderNextIndex) {
- return null;
- }
-
- if (followerNextIndex < leaderStartIndex) {
- // The Leader does not have the logs from the Follower's last log
- // index onwards. And install snapshot is disabled. So the Follower
- // should be notified to install the latest snapshot through its
- // State Machine.
- return firstAvailable;
- } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
- // Leader has no logs to check from, hence return next index.
- return firstAvailable;
- }
-
- return null;
- }
-
static class AppendEntriesRequest {
private final Timekeeper timer;
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
index e906dd209c..d16233a49a 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -23,6 +23,7 @@
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.JavaUtils;
+import java.util.Collection;
import java.util.List;
/**
@@ -66,4 +67,7 @@ AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
/** Received an {@link AppendEntriesReplyProto} */
void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto reply);
+ /** @return all follower states tracked by this leader. */
+ Collection getFollowerInfos();
+
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index cff5425d32..79966cd51f 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -19,6 +19,7 @@
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
@@ -33,6 +34,8 @@
import java.io.IOException;
import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -131,6 +134,15 @@ default RaftPeerId getFollowerId() {
/** @return a new {@link InstallSnapshotRequestProto} object. */
InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex);
+ /**
+ * @return a new {@link InstallSnapshotRequestProto} object carrying snapshot-install
+ * notification plan metadata.
+ */
+ default InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(
+ TermIndex firstAvailableLogTermIndex, long minimumSnapshotIndex, List sourcePeers) {
+ return newInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
+ }
+
/** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for sending the given snapshot. */
Iterable newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot);
@@ -168,6 +180,60 @@ default SnapshotInfo shouldInstallSnapshot() {
return null;
}
+ /**
+ * Should this {@link LogAppender} send a snapshot-install notification to the follower
+ *
+ * The notification path is used by follower-managed snapshot installation.
+ * Returning null means no notification should be sent for now.
+ *
+ * @return the leader first available log term-index if notification should be sent; otherwise, null.
+ */
+ default TermIndex shouldNotifyToInstallSnapshot() {
+ final FollowerInfo follower = getFollower();
+ final long leaderNextIndex = getRaftLog().getNextIndex();
+ final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower);
+ final long leaderStartIndex = getRaftLog().getStartIndex();
+ final TermIndex firstAvailable = Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex))
+ .orElseGet(() -> TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex));
+ if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
+ // Every bootstrapping follower should attempt at least one snapshot install path if needed.
+ return firstAvailable;
+ }
+
+ final long followerNextIndex = follower.getNextIndex();
+ if (followerNextIndex >= leaderNextIndex) {
+ return null;
+ }
+
+ if (followerNextIndex < leaderStartIndex) {
+ // The leader does not have follower's missing logs, so snapshot install is required.
+ return firstAvailable;
+ } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
+ // If leader has no local log range to compare, snapshot notification is still allowed.
+ return firstAvailable;
+ }
+ return null;
+ }
+
+ /**
+ * @return the minimum snapshot index acceptable for the given notification plan.
+ */
+ default long getMinimumSnapshotIndex(TermIndex firstAvailableLogTermIndex) {
+ return firstAvailableLogTermIndex.getIndex() - 1;
+ }
+
+ /**
+ * @return the leader last entry term-index for evaluating whether followers are fully caught up.
+ */
+ default TermIndex getLeaderLastEntryForSnapshotSourceSelection() {
+ final TermIndex logLast = getRaftLog().getLastEntryTermIndex();
+ if (logLast != null) {
+ return logLast;
+ }
+ final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();
+ return snapshot != null ? snapshot.getTermIndex() : null;
+ }
+
/** Define how this {@link LogAppender} should run. */
void run() throws InterruptedException, IOException;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index ef0bb6b700..56fa79a60e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -1304,6 +1304,13 @@ Stream getLogAppenders() {
return StreamSupport.stream(senders.spliterator(), false);
}
+ @Override
+ public Collection getFollowerInfos() {
+ return getLogAppenders()
+ .map(LogAppender::getFollower)
+ .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
+ }
+
Optional getLogAppender(RaftPeerId id) {
return getLogAppenders().filter(a -> a.getFollowerId().equals(id)).findAny();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 46b6aaf87f..eb16a796ff 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -126,17 +126,15 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
ServerImplUtils.assertGroup(getMemberId(), leaderId, leaderGroupId);
InstallSnapshotReplyProto reply = null;
- // Check if install snapshot from Leader is enabled
- if (installSnapshotEnabled) {
- // Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot.
- if (request.hasSnapshotChunk()) {
+ if (request.hasSnapshotChunk()) {
+ // Keep existing behavior for chunked transfer, but install only if leader has it enabled.
+ if (installSnapshotEnabled) {
reply = checkAndInstallSnapshot(request, leaderId).join();
}
- } else {
- // Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot.
- if (request.hasNotification()) {
- reply = notifyStateMachineToInstallSnapshot(request, leaderId).join();
- }
+ } else if (request.hasNotification()) {
+ // Always accept notification requests. This enables follower-sourced install plans while
+ // keeping leader-sourced chunk transfer as fallback in the same deployment.
+ reply = notifyStateMachineToInstallSnapshot(request, leaderId).join();
}
if (reply != null) {
@@ -308,7 +306,10 @@ private CompletableFuture notifyStateMachineToInstall
// is updated when state.reloadStateMachine. We shall keep this index upgraded synchronously with main thread,
// otherwise leader could get this follower's latest nextIndex from appendEntries instead of after
// acknowledging the SNAPSHOT_INSTALLED.
- server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex)
+ final long minimumSnapshotIndex = request.getNotification().getMinimumSnapshotIndex();
+ server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(
+ proto, firstAvailableLogTermIndex, minimumSnapshotIndex,
+ request.getNotification().getSourcePeersList())
.whenComplete((reply, exception) -> {
if (exception != null) {
LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderProtoUtils.java
index 3afb735deb..e823d21c99 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderProtoUtils.java
@@ -23,6 +23,7 @@
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto.NotificationProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto.SnapshotChunkProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
@@ -30,6 +31,7 @@
import org.apache.ratis.server.raftlog.LogProtoUtils;
import java.util.Collections;
+import java.util.List;
/** Leader only proto utilities. */
final class LeaderProtoUtils {
@@ -55,8 +57,22 @@ static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
RaftServer.Division server, RaftPeerId replyId, TermIndex firstAvailable) {
+ return toInstallSnapshotRequestProto(server, replyId, firstAvailable, 0L, Collections.emptyList());
+ }
+
+ static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
+ RaftServer.Division server, RaftPeerId replyId, TermIndex firstAvailable,
+ long minimumSnapshotIndex, List sourcePeers) {
+ final NotificationProto.Builder notification = NotificationProto.newBuilder()
+ .setFirstAvailableTermIndex(firstAvailable.toProto());
+ if (minimumSnapshotIndex > 0) {
+ notification.setMinimumSnapshotIndex(minimumSnapshotIndex);
+ }
+ if (sourcePeers != null && !sourcePeers.isEmpty()) {
+ notification.addAllSourcePeers(sourcePeers);
+ }
return toInstallSnapshotRequestProtoBuilder(server, replyId)
- .setNotification(NotificationProto.newBuilder().setFirstAvailableTermIndex(firstAvailable.toProto()))
+ .setNotification(notification)
.build();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 5a27cda510..e1a8192b3f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -21,6 +21,7 @@
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
@@ -218,6 +219,20 @@ protected LongUnaryOperator getNextIndexForError(long newNextIndex) {
};
}
+ /**
+ * Select ranked follower source peers for snapshot install.
+ *
+ * An empty list means no follower satisfies the minimum conditions and
+ * callers should fall back to leader-sourced snapshot installation.
+ */
+ protected List selectSnapshotSourcePeers(TermIndex firstAvailableLogTermIndex) {
+ return SnapshotSourceSelector.selectSourcePeers(
+ getLeaderState().getFollowerInfos(),
+ getFollowerId(),
+ firstAvailableLogTermIndex.getIndex(),
+ getLeaderLastEntryForSnapshotSourceSelection());
+ }
+
@Override
public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat)
@@ -274,9 +289,16 @@ private void assertProtos(List protos, long nextIndex, TermIndex
@Override
public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
+ return newInstallSnapshotNotificationRequest(firstAvailableLogTermIndex, 0L, Collections.emptyList());
+ }
+
+ @Override
+ public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(
+ TermIndex firstAvailableLogTermIndex, long minimumSnapshotIndex, List sourcePeers) {
Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() >= 0);
synchronized (server) {
- return LeaderProtoUtils.toInstallSnapshotRequestProto(server, getFollowerId(), firstAvailableLogTermIndex);
+ return LeaderProtoUtils.toInstallSnapshotRequestProto(
+ server, getFollowerId(), firstAvailableLogTermIndex, minimumSnapshotIndex, sourcePeers);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 9d1edd4695..5689f0becd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -21,8 +21,10 @@
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.util.ServerStringUtils;
@@ -32,6 +34,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Comparator;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -103,7 +106,7 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestF
return null;
}
- private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
+ private InstallSnapshotReplyProto installSnapshotFromLeader(SnapshotInfo snapshot) throws InterruptedIOException {
String requestId = UUID.randomUUID().toString();
InstallSnapshotReplyProto reply = null;
try {
@@ -132,6 +135,65 @@ private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws
return reply;
}
+ private InstallSnapshotReplyProto notifyInstallSnapshot(
+ TermIndex firstAvailable, List sourcePeers) throws InterruptedIOException {
+ try {
+ final long minimumSnapshotIndex = getMinimumSnapshotIndex(firstAvailable);
+ final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(
+ firstAvailable, minimumSnapshotIndex, sourcePeers);
+ getFollower().updateLastRpcSendTime(false);
+ final InstallSnapshotReplyProto reply = getServerRpc().installSnapshot(request);
+ getFollower().updateLastRpcResponseTime();
+ return reply;
+ } catch (InterruptedIOException ioe) {
+ throw ioe;
+ } catch (Exception e) {
+ LOG.warn("{}: Failed to notify follower to install snapshot at {} with {} source peers",
+ this, firstAvailable, sourcePeers.size(), e);
+ handleException(e);
+ return null;
+ }
+ }
+
+ private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
+ final TermIndex firstAvailable = shouldNotifyToInstallSnapshot();
+ if (firstAvailable != null) {
+ final List sourcePeers = selectSnapshotSourcePeers(firstAvailable);
+ if (!sourcePeers.isEmpty()) {
+ LOG.info("{}: notify follower to install snapshot at {} from {} ranked follower sources",
+ this, firstAvailable, sourcePeers.size());
+ return notifyInstallSnapshot(firstAvailable, sourcePeers);
+ }
+ }
+ // Fallback to the existing leader-driven transfer when no eligible source follower exists.
+ return installSnapshotFromLeader(snapshot);
+ }
+
+ private void handleInstallSnapshotReply(InstallSnapshotReplyProto reply) {
+ switch (reply.getResult()) {
+ case NOT_LEADER:
+ onFollowerTerm(reply.getTerm());
+ break;
+ case SNAPSHOT_INSTALLED:
+ case ALREADY_INSTALLED:
+ if (reply.getInstallSnapshotReplyBodyCase() ==
+ InstallSnapshotReplyProto.InstallSnapshotReplyBodyCase.SNAPSHOTINDEX) {
+ final long installedSnapshotIndex = reply.getSnapshotIndex();
+ getFollower().setSnapshotIndex(installedSnapshotIndex);
+ getLeaderState().onFollowerCommitIndex(getFollower(), installedSnapshotIndex);
+ }
+ getFollower().setAttemptedToInstallSnapshot();
+ break;
+ case SUCCESS:
+ case SNAPSHOT_UNAVAILABLE:
+ case SNAPSHOT_EXPIRED:
+ getFollower().setAttemptedToInstallSnapshot();
+ break;
+ default:
+ break;
+ }
+ }
+
@Override
public void run() throws InterruptedException, IOException {
while (isRunning()) {
@@ -143,19 +205,7 @@ public void run() throws InterruptedException, IOException {
final InstallSnapshotReplyProto r = installSnapshot(snapshot);
if (r != null) {
- switch (r.getResult()) {
- case NOT_LEADER:
- onFollowerTerm(r.getTerm());
- break;
- case SUCCESS:
- case SNAPSHOT_UNAVAILABLE:
- case ALREADY_INSTALLED:
- case SNAPSHOT_EXPIRED:
- getFollower().setAttemptedToInstallSnapshot();
- break;
- default:
- break;
- }
+ handleInstallSnapshotReply(r);
}
// otherwise if r is null, retry the snapshot installation
} else {
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 931bf6317f..3ca1874427 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -28,6 +28,8 @@
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.PeerChanges;
import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.leader.FollowerInfo;
+import org.apache.ratis.server.leader.SnapshotSourceSelector;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
@@ -40,6 +42,7 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.Timestamp;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -50,14 +53,18 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
public abstract class InstallSnapshotNotificationTests
extends BaseTest
@@ -88,6 +95,18 @@ public abstract class InstallSnapshotNotificationTests> LAST_SOURCE_PEER_IDS =
+ new AtomicReference<>(Collections.emptyList());
+ private static final AtomicLong LAST_MINIMUM_SNAPSHOT_INDEX = new AtomicLong(-1L);
+
+ private enum SnapshotSourceFetchPolicy {
+ LEADER_ONLY,
+ FAIL_SOURCE_THEN_LEADER
+ }
+
+ private static final AtomicReference SNAPSHOT_SOURCE_FETCH_POLICY =
+ new AtomicReference<>(SnapshotSourceFetchPolicy.LEADER_ONLY);
private static class StateMachine4InstallSnapshotNotificationTests extends SimpleStateMachine4Testing {
@@ -97,20 +116,37 @@ private static class StateMachine4InstallSnapshotNotificationTests extends Simpl
public CompletableFuture notifyInstallSnapshotFromLeader(
RaftProtos.RoleInfoProto roleInfoProto,
TermIndex termIndex) {
+ return notifyInstallSnapshotFromLeader(roleInfoProto, termIndex, 0L, Collections.emptyList());
+ }
+
+ @Override
+ public CompletableFuture notifyInstallSnapshotFromLeader(
+ RaftProtos.RoleInfoProto roleInfoProto, TermIndex termIndex,
+ long minimumSnapshotIndex, List sourcePeers) {
if (!roleInfoProto.getFollowerInfo().hasLeaderInfo()) {
return JavaUtils.completeExceptionally(new IOException("Failed " +
"notifyInstallSnapshotFromLeader due to missing leader info"));
}
numSnapshotRequests.incrementAndGet();
+ LAST_MINIMUM_SNAPSHOT_INDEX.set(minimumSnapshotIndex);
+ LAST_SOURCE_PEER_IDS.set(sourcePeers.stream().map(p -> RaftPeerId.valueOf(p.getId()).toString())
+ .collect(Collectors.toList()));
final SingleFileSnapshotInfo leaderSnapshotInfo = (SingleFileSnapshotInfo) LEADER_SNAPSHOT_INFO_REF.get();
LOG.info("{}: leaderSnapshotInfo = {}", getId(), leaderSnapshotInfo);
if (leaderSnapshotInfo == null) {
- return super.notifyInstallSnapshotFromLeader(roleInfoProto, termIndex);
+ return super.notifyInstallSnapshotFromLeader(roleInfoProto, termIndex, minimumSnapshotIndex, sourcePeers);
}
Supplier supplier = () -> {
try {
+ // Simulate a source failure before retrying with leader.
+ // We still return success after falling back to leader so that the test can verify retry behavior.
+ if (!sourcePeers.isEmpty()
+ && SNAPSHOT_SOURCE_FETCH_POLICY.get() == SnapshotSourceFetchPolicy.FAIL_SOURCE_THEN_LEADER) {
+ numFallbackToLeaderFromSourceFailure.incrementAndGet();
+ }
+
Path leaderSnapshotFile = leaderSnapshotInfo.getFile().getPath();
final File followerSnapshotFilePath = new File(getStateMachineDir(),
leaderSnapshotFile.getFileName().toString());
@@ -165,6 +201,302 @@ public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult result, lon
}
+ private static class TestFollowerInfo implements FollowerInfo {
+ private final String name;
+ private final RaftPeer peer;
+ private final long matchIndex;
+ private final long commitIndex;
+ private final Timestamp lastRpcResponseTime;
+ private final Timestamp lastRespondedAppendEntriesSendTime;
+
+ TestFollowerInfo(String peerId, long matchIndex, long commitIndex, long appendResponseAgeMs) {
+ this.name = "test-" + peerId;
+ this.peer = RaftPeer.newBuilder().setId(peerId).build();
+ this.matchIndex = matchIndex;
+ this.commitIndex = commitIndex;
+ final Timestamp now = Timestamp.currentTime();
+ this.lastRpcResponseTime = now;
+ this.lastRespondedAppendEntriesSendTime = now.addTimeMs(-appendResponseAgeMs);
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public RaftPeerId getId() {
+ return peer.getId();
+ }
+
+ @Override
+ public RaftPeer getPeer() {
+ return peer;
+ }
+
+ @Override
+ public long getMatchIndex() {
+ return matchIndex;
+ }
+
+ @Override
+ public boolean updateMatchIndex(long newMatchIndex) {
+ return false;
+ }
+
+ @Override
+ public long getCommitIndex() {
+ return commitIndex;
+ }
+
+ @Override
+ public boolean updateCommitIndex(long newCommitIndex) {
+ return false;
+ }
+
+ @Override
+ public long getSnapshotIndex() {
+ return 0L;
+ }
+
+ @Override
+ public void setSnapshotIndex(long newSnapshotIndex) {
+ }
+
+ @Override
+ public void setAttemptedToInstallSnapshot() {
+ }
+
+ @Override
+ public boolean hasAttemptedToInstallSnapshot() {
+ return true;
+ }
+
+ @Override
+ public long getNextIndex() {
+ return 0L;
+ }
+
+ @Override
+ public void increaseNextIndex(long newNextIndex) {
+ }
+
+ @Override
+ public void decreaseNextIndex(long newNextIndex) {
+ }
+
+ @Override
+ public void setNextIndex(long newNextIndex) {
+ }
+
+ @Override
+ public void updateNextIndex(long newNextIndex) {
+ }
+
+ @Override
+ public void computeNextIndex(LongUnaryOperator op) {
+ }
+
+ @Override
+ public Timestamp getLastRpcResponseTime() {
+ return lastRpcResponseTime;
+ }
+
+ @Override
+ public Timestamp getLastRpcSendTime() {
+ return lastRpcResponseTime;
+ }
+
+ @Override
+ public void updateLastRpcResponseTime() {
+ }
+
+ @Override
+ public void updateLastRpcSendTime(boolean isHeartbeat) {
+ }
+
+ @Override
+ public Timestamp getLastRpcTime() {
+ return Timestamp.latest(getLastRpcResponseTime(), getLastRpcSendTime());
+ }
+
+ @Override
+ public Timestamp getLastHeartbeatSendTime() {
+ return lastRpcResponseTime;
+ }
+
+ @Override
+ public Timestamp getLastRespondedAppendEntriesSendTime() {
+ return lastRespondedAppendEntriesSendTime;
+ }
+
+ @Override
+ public void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime) {
+ }
+ }
+
+ @Test
+ public void testExactMatchSourceSelection() {
+ final List ranked = SnapshotSourceSelector.selectSourceFollowers(
+ Arrays.asList(
+ new TestFollowerInfo("exact", 200, 180, 10),
+ new TestFollowerInfo("near", 199, 190, 20),
+ new TestFollowerInfo("target", 205, 205, 5)),
+ RaftPeerId.valueOf("target"),
+ 150,
+ TermIndex.valueOf(5, 200));
+
+ final List ids = ranked.stream()
+ .map(FollowerInfo::getId)
+ .map(RaftPeerId::toString)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(Arrays.asList("exact", "near"), ids);
+ }
+
+ @Test
+ public void testClosestFollowerSelectionByMatchIndexAndCommitIndex() {
+ final List ranked = SnapshotSourceSelector.selectSourceFollowers(
+ Arrays.asList(
+ new TestFollowerInfo("higherCommit", 320, 300, 20),
+ new TestFollowerInfo("lowerCommit", 320, 200, 10),
+ new TestFollowerInfo("lowerMatch", 319, 400, 5),
+ new TestFollowerInfo("belowThreshold", 198, 198, 5)),
+ RaftPeerId.valueOf("target"),
+ 200, // minimum acceptable match index is 199
+ TermIndex.valueOf(8, 1000));
+
+ final List ids = ranked.stream()
+ .map(FollowerInfo::getId)
+ .map(RaftPeerId::toString)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(Arrays.asList("higherCommit", "lowerCommit", "lowerMatch"), ids);
+ }
+
+ @Test
+ public void testFollowerSourceFailureRetryWithLeader() throws Exception {
+ final RaftProperties prop = getProperties();
+ final boolean previousInstallSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(prop);
+ RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(prop, true);
+ try {
+ runWithNewCluster(2, this::testFollowerSourceFailureRetryWithLeader);
+ } finally {
+ SNAPSHOT_SOURCE_FETCH_POLICY.set(SnapshotSourceFetchPolicy.LEADER_ONLY);
+ RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(prop, previousInstallSnapshotEnabled);
+ }
+ }
+
+ private void testFollowerSourceFailureRetryWithLeader(CLUSTER cluster) throws Exception {
+ LEADER_SNAPSHOT_INFO_REF.set(null);
+ numSnapshotRequests.set(0);
+ numFallbackToLeaderFromSourceFailure.set(0);
+ LAST_SOURCE_PEER_IDS.set(Collections.emptyList());
+ LAST_MINIMUM_SNAPSHOT_INDEX.set(-1L);
+ SNAPSHOT_SOURCE_FETCH_POLICY.set(SnapshotSourceFetchPolicy.FAIL_SOURCE_THEN_LEADER);
+
+ int i = 0;
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = leader.getId();
+
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+ final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+ Assertions.assertTrue(reply.isSuccess());
+ }
+ }
+
+ final long nextIndex = leader.getRaftLog().getNextIndex();
+ final List snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
+ nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+ JavaUtils.attemptRepeatedly(() -> {
+ Assertions.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+ return null;
+ }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+
+ final SnapshotInfo leaderSnapshotInfo = leader.getStateMachine().getLatestSnapshot();
+ Assertions.assertNotNull(leaderSnapshotInfo);
+ Assertions.assertTrue(LEADER_SNAPSHOT_INFO_REF.compareAndSet(null, leaderSnapshotInfo));
+
+ final PeerChanges change = cluster.addNewPeers(1, true);
+ RaftServerTestUtil.runWithMinorityPeers(cluster, change.getPeersInNewConf(), cluster::setConfiguration);
+ RaftServerTestUtil.waitAndCheckNewConf(cluster, change.getPeersInNewConf(), 0, null);
+
+ final RaftPeerId targetFollowerId = change.getAddedPeers().get(0).getId();
+ final RaftServer.Division targetFollower = cluster.getDivision(targetFollowerId);
+ JavaUtils.attempt(() -> {
+ Assertions.assertEquals(leaderSnapshotInfo.getIndex(),
+ RaftServerTestUtil.getLatestInstalledSnapshotIndex(targetFollower));
+ }, 10, ONE_SECOND, "targetFollowerInstalledSnapshot", LOG);
+
+ // The notification contained follower sources and the state machine retried with leader after source failure.
+ JavaUtils.attempt(() -> {
+ Assertions.assertTrue(numSnapshotRequests.get() >= 1);
+ Assertions.assertTrue(numFallbackToLeaderFromSourceFailure.get() >= 1);
+ Assertions.assertFalse(LAST_SOURCE_PEER_IDS.get().isEmpty());
+ Assertions.assertTrue(LAST_MINIMUM_SNAPSHOT_INDEX.get() >= 0);
+ }, 10, ONE_SECOND, "sourceFailureRetryWithLeader", LOG);
+ }
+
+ @Test
+ public void testNoCandidateFallbackToLeader() throws Exception {
+ final RaftProperties prop = getProperties();
+ final boolean previousInstallSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(prop);
+ RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(prop, true);
+ try {
+ runWithNewCluster(1, this::testNoCandidateFallbackToLeader);
+ } finally {
+ SNAPSHOT_SOURCE_FETCH_POLICY.set(SnapshotSourceFetchPolicy.LEADER_ONLY);
+ RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(prop, previousInstallSnapshotEnabled);
+ }
+ }
+
+ private void testNoCandidateFallbackToLeader(CLUSTER cluster) throws Exception {
+ LEADER_SNAPSHOT_INFO_REF.set(null);
+ numSnapshotRequests.set(0);
+ numFallbackToLeaderFromSourceFailure.set(0);
+ LAST_SOURCE_PEER_IDS.set(Collections.emptyList());
+ LAST_MINIMUM_SNAPSHOT_INDEX.set(-1L);
+ SNAPSHOT_SOURCE_FETCH_POLICY.set(SnapshotSourceFetchPolicy.FAIL_SOURCE_THEN_LEADER);
+
+ int i = 0;
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = leader.getId();
+
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+ final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+ Assertions.assertTrue(reply.isSuccess());
+ }
+ }
+
+ final long nextIndex = leader.getRaftLog().getNextIndex();
+ final List snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
+ nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+ JavaUtils.attemptRepeatedly(() -> {
+ Assertions.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+ return null;
+ }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+
+ final SnapshotInfo leaderSnapshotInfo = leader.getStateMachine().getLatestSnapshot();
+ Assertions.assertNotNull(leaderSnapshotInfo);
+ Assertions.assertTrue(LEADER_SNAPSHOT_INFO_REF.compareAndSet(null, leaderSnapshotInfo));
+
+ final PeerChanges change = cluster.addNewPeers(1, true);
+ RaftServerTestUtil.runWithMinorityPeers(cluster, change.getPeersInNewConf(), cluster::setConfiguration);
+ RaftServerTestUtil.waitAndCheckNewConf(cluster, change.getPeersInNewConf(), 0, null);
+
+ final RaftPeerId targetFollowerId = change.getAddedPeers().get(0).getId();
+ final RaftServer.Division targetFollower = cluster.getDivision(targetFollowerId);
+ JavaUtils.attempt(() -> {
+ Assertions.assertEquals(leaderSnapshotInfo.getIndex(),
+ RaftServerTestUtil.getLatestInstalledSnapshotIndex(targetFollower));
+ }, 10, ONE_SECOND, "leaderFallbackInstalledSnapshot", LOG);
+
+ // There is no follower source candidate in a single-leader cluster. The leader should directly install snapshot.
+ Assertions.assertEquals(0, numSnapshotRequests.get());
+ Assertions.assertEquals(0, numFallbackToLeaderFromSourceFailure.get());
+ Assertions.assertEquals(Collections.emptyList(), LAST_SOURCE_PEER_IDS.get());
+ }
+
/**
* Basic test for install snapshot notification: start a one node cluster
* (disable install snapshot option) and let it generate a snapshot. Then
From 200b4e9ca3553f6a09e766849356c5a913084255 Mon Sep 17 00:00:00 2001
From: Abhishek Pal
Date: Mon, 9 Mar 2026 20:28:53 +0530
Subject: [PATCH 4/5] Fix an issue where comparator keeps being reversed due to
chained call
---
.../apache/ratis/server/leader/SnapshotSourceSelector.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java
index c34b3c342e..fdd75a1e94 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java
@@ -107,8 +107,8 @@ public static List selectSourcePeers(
private static Comparator newFollowerComparator(TermIndex leaderLastEntry) {
return Comparator.comparing(f -> isFullyCaughtUp(f, leaderLastEntry))
.reversed()
- .thenComparingLong(FollowerInfo::getMatchIndex).reversed()
- .thenComparingLong(FollowerInfo::getCommitIndex).reversed()
+ .thenComparing(Comparator.comparingLong(FollowerInfo::getMatchIndex).reversed())
+ .thenComparing(Comparator.comparingLong(FollowerInfo::getCommitIndex).reversed())
.thenComparing(SnapshotSourceSelector::freshnessTimestamp, SnapshotSourceSelector::compareFreshness)
// Keep order deterministic when all ranking metrics are equal, we do not want it to change across runs.
.thenComparing(f -> f.getId().toString());
From 66b89bbfad3bc92225cbdf2ff85d5dd744e650ec Mon Sep 17 00:00:00 2001
From: Abhishek Pal
Date: Mon, 9 Mar 2026 21:10:42 +0530
Subject: [PATCH 5/5] Fix checkstyle issue
---
.../apache/ratis/server/leader/SnapshotSourceSelector.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java
index fdd75a1e94..cb5f8ea873 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java
@@ -43,11 +43,11 @@ private SnapshotSourceSelector() {
/**
* Select and rank candidate source followers for snapshot download.
* If this method returns an empty list, callers should fall back to leader-sourced snapshot install.
- *
+ *
* Selection rule:
* only followers with {@code matchIndex >= firstAvailableLogIndex - 1} are returned.
*
- *
+ *
* Ranking rule (highest preference first):
*
* - Fully caught up with the leader (as inferred from {@code leaderLastEntry})