Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.metrics.Timekeeper;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.io.InterruptedIOException;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -250,14 +252,23 @@ private boolean installSnapshot() {
if (installSnapshotEnabled) {
final SnapshotInfo snapshot = shouldInstallSnapshot();
if (snapshot != null) {
final TermIndex firstAvailable = shouldNotifyToInstallSnapshot();
if (firstAvailable != null) {
final List<RaftPeerProto> sourcePeers = selectSnapshotSourcePeers(firstAvailable);
if (!sourcePeers.isEmpty()) {
notifyInstallSnapshot(firstAvailable, sourcePeers);
return true;
}
}
// Fallback to the existing leader-sourced install path when no eligible source follower exists.
installSnapshot(snapshot);
return true;
}
} else {
// check installSnapshotNotification
final TermIndex firstAvailable = shouldNotifyToInstallSnapshot();
if (firstAvailable != null) {
notifyInstallSnapshot(firstAvailable);
notifyInstallSnapshot(firstAvailable, selectSnapshotSourcePeers(firstAvailable));
return true;
}
}
Expand Down Expand Up @@ -802,15 +813,18 @@ private void installSnapshot(SnapshotInfo snapshot) {
* Send an installSnapshot notification request to the Follower.
* @param firstAvailable the first available log's index on the Leader
*/
private void notifyInstallSnapshot(TermIndex firstAvailable) {
private void notifyInstallSnapshot(TermIndex firstAvailable, List<RaftPeerProto> sourcePeers) {
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_NOTIFY, getFollower().getName(),
suffix -> LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={} {}",
this, firstAvailable, getFollower().getNextIndex(), suffix));
suffix -> LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={}, "
+ "minimumSnapshotIndex={}, sourcePeers={} {}",
this, firstAvailable, getFollower().getNextIndex(),
getMinimumSnapshotIndex(firstAvailable), sourcePeers.size(), suffix));

final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true);
StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
// prepare and enqueue the notify install snapshot request.
final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailable);
final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(
firstAvailable, getMinimumSnapshotIndex(firstAvailable), sourcePeers);
if (LOG.isInfoEnabled()) {
LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(request));
}
Expand All @@ -832,45 +846,6 @@ private void notifyInstallSnapshot(TermIndex firstAvailable) {
responseHandler.waitForResponse();
}

/**
* Should the Leader notify the Follower to install the snapshot through
* its own State Machine.
* @return the first available log's start term index
*/
private TermIndex shouldNotifyToInstallSnapshot() {
final FollowerInfo follower = getFollower();
final long leaderNextIndex = getRaftLog().getNextIndex();
final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower);
final long leaderStartIndex = getRaftLog().getStartIndex();
final TermIndex firstAvailable = Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex))
.orElseGet(() -> TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex));
if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
// If the follower is bootstrapping and has not yet installed any snapshot from leader, then the follower should
// be notified to install a snapshot. Every follower should try to install at least one snapshot during
// bootstrapping, if available.
LOG.debug("{}: follower is bootstrapping, notify to install snapshot to {}.", this, firstAvailable);
return firstAvailable;
}

final long followerNextIndex = follower.getNextIndex();
if (followerNextIndex >= leaderNextIndex) {
return null;
}

if (followerNextIndex < leaderStartIndex) {
// The Leader does not have the logs from the Follower's last log
// index onwards. And install snapshot is disabled. So the Follower
// should be notified to install the latest snapshot through its
// State Machine.
return firstAvailable;
} else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
// Leader has no logs to check from, hence return next index.
return firstAvailable;
}

return null;
}

static class AppendEntriesRequest {
private final Timekeeper timer;
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
Expand Down
6 changes: 6 additions & 0 deletions ratis-proto/src/main/proto/Raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ message InstallSnapshotRequestProto {

message NotificationProto {
TermIndexProto firstAvailableTermIndex = 1; // first available log index to notify Follower to install snapshot.
// Minimum acceptable snapshot index for this install request.
// 0 means unspecified for backward compatibility.
uint64 minimumSnapshotIndex = 2;
// Ranked source peers (highest preference first) from which the follower may fetch the snapshot.
// Older peers ignore this field.
repeated RaftPeerProto sourcePeers = 3;
}

RaftRpcRequestProto serverRequest = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.JavaUtils;

import java.util.Collection;
import java.util.List;

/**
Expand Down Expand Up @@ -66,4 +67,7 @@ AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
/** Received an {@link AppendEntriesReplyProto} */
void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto reply);

/** @return all follower states tracked by this leader. */
Collection<FollowerInfo> getFollowerInfos();

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
Expand All @@ -33,6 +34,8 @@

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

Expand Down Expand Up @@ -131,6 +134,15 @@ default RaftPeerId getFollowerId() {
/** @return a new {@link InstallSnapshotRequestProto} object. */
InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex);

/**
* @return a new {@link InstallSnapshotRequestProto} object carrying snapshot-install
* notification plan metadata.
*/
default InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(
TermIndex firstAvailableLogTermIndex, long minimumSnapshotIndex, List<RaftPeerProto> sourcePeers) {
return newInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
}

/** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for sending the given snapshot. */
Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot);

Expand Down Expand Up @@ -168,6 +180,60 @@ default SnapshotInfo shouldInstallSnapshot() {
return null;
}

/**
* Should this {@link LogAppender} send a snapshot-install notification to the follower
*
* <p>The notification path is used by follower-managed snapshot installation.
* Returning null means no notification should be sent for now.</p>
*
* @return the leader first available log term-index if notification should be sent; otherwise, null.
*/
default TermIndex shouldNotifyToInstallSnapshot() {
final FollowerInfo follower = getFollower();
final long leaderNextIndex = getRaftLog().getNextIndex();
final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower);
final long leaderStartIndex = getRaftLog().getStartIndex();
final TermIndex firstAvailable = Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex))
.orElseGet(() -> TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex));
if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
// Every bootstrapping follower should attempt at least one snapshot install path if needed.
return firstAvailable;
}

final long followerNextIndex = follower.getNextIndex();
if (followerNextIndex >= leaderNextIndex) {
return null;
}

if (followerNextIndex < leaderStartIndex) {
// The leader does not have follower's missing logs, so snapshot install is required.
return firstAvailable;
} else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
// If leader has no local log range to compare, snapshot notification is still allowed.
return firstAvailable;
}
return null;
}

/**
* @return the minimum snapshot index acceptable for the given notification plan.
*/
default long getMinimumSnapshotIndex(TermIndex firstAvailableLogTermIndex) {
return firstAvailableLogTermIndex.getIndex() - 1;
}

/**
* @return the leader last entry term-index for evaluating whether followers are fully caught up.
*/
default TermIndex getLeaderLastEntryForSnapshotSourceSelection() {
final TermIndex logLast = getRaftLog().getLastEntryTermIndex();
if (logLast != null) {
return logLast;
}
final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();
return snapshot != null ? snapshot.getTermIndex() : null;
}

/** Define how this {@link LogAppender} should run. */
void run() throws InterruptedException, IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
Expand Down Expand Up @@ -294,6 +295,24 @@ default CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
return CompletableFuture.completedFuture(null);
}

/**
* Notify the {@link StateMachine} that the leader has purged entries from its log.
* In order to catch up, the {@link StateMachine} has to install a snapshot asynchronously.
*
* @param roleInfoProto information about the current node role and rpc delay information.
* @param firstTermIndexInLog the term-index of the first append entry available in the leader's log.
* @param minimumSnapshotIndex minimum acceptable snapshot index for this request.
* A value of 0 means unspecified. This is used for backward compatibility.
* @param sourcePeers ranked source peers (highest preference first) from which
* the follower may fetch the snapshot.
* @return return the last term-index in the snapshot after the snapshot installation.
*/
default CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog,
long minimumSnapshotIndex, List<RaftPeerProto> sourcePeers) {
return notifyInstallSnapshotFromLeader(roleInfoProto, firstTermIndexInLog);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,13 @@ Stream<LogAppender> getLogAppenders() {
return StreamSupport.stream(senders.spliterator(), false);
}

@Override
public Collection<FollowerInfo> getFollowerInfos() {
return getLogAppenders()
.map(LogAppender::getFollower)
.collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
}

Optional<LogAppender> getLogAppender(RaftPeerId id) {
return getLogAppenders().filter(a -> a.getFollowerId().equals(id)).findAny();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,15 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
ServerImplUtils.assertGroup(getMemberId(), leaderId, leaderGroupId);

InstallSnapshotReplyProto reply = null;
// Check if install snapshot from Leader is enabled
if (installSnapshotEnabled) {
// Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot.
if (request.hasSnapshotChunk()) {
if (request.hasSnapshotChunk()) {
// Keep existing behavior for chunked transfer, but install only if leader has it enabled.
if (installSnapshotEnabled) {
reply = checkAndInstallSnapshot(request, leaderId).join();
}
} else {
// Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot.
if (request.hasNotification()) {
reply = notifyStateMachineToInstallSnapshot(request, leaderId).join();
}
} else if (request.hasNotification()) {
// Always accept notification requests. This enables follower-sourced install plans while
// keeping leader-sourced chunk transfer as fallback in the same deployment.
reply = notifyStateMachineToInstallSnapshot(request, leaderId).join();
}

if (reply != null) {
Expand Down Expand Up @@ -308,7 +306,10 @@ private CompletableFuture<InstallSnapshotReplyProto> notifyStateMachineToInstall
// is updated when state.reloadStateMachine. We shall keep this index upgraded synchronously with main thread,
// otherwise leader could get this follower's latest nextIndex from appendEntries instead of after
// acknowledging the SNAPSHOT_INSTALLED.
server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex)
final long minimumSnapshotIndex = request.getNotification().getMinimumSnapshotIndex();
server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(
proto, firstAvailableLogTermIndex, minimumSnapshotIndex,
request.getNotification().getSourcePeersList())
.whenComplete((reply, exception) -> {
if (exception != null) {
LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto.NotificationProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto.SnapshotChunkProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;

import java.util.Collections;
import java.util.List;

/** Leader only proto utilities. */
final class LeaderProtoUtils {
Expand All @@ -55,8 +57,22 @@ static InstallSnapshotRequestProto toInstallSnapshotRequestProto(

static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
RaftServer.Division server, RaftPeerId replyId, TermIndex firstAvailable) {
return toInstallSnapshotRequestProto(server, replyId, firstAvailable, 0L, Collections.emptyList());
}

static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
RaftServer.Division server, RaftPeerId replyId, TermIndex firstAvailable,
long minimumSnapshotIndex, List<RaftPeerProto> sourcePeers) {
final NotificationProto.Builder notification = NotificationProto.newBuilder()
.setFirstAvailableTermIndex(firstAvailable.toProto());
if (minimumSnapshotIndex > 0) {
notification.setMinimumSnapshotIndex(minimumSnapshotIndex);
}
if (sourcePeers != null && !sourcePeers.isEmpty()) {
notification.addAllSourcePeers(sourcePeers);
}
return toInstallSnapshotRequestProtoBuilder(server, replyId)
.setNotification(NotificationProto.newBuilder().setFirstAvailableTermIndex(firstAvailable.toProto()))
.setNotification(notification)
.build();
}

Expand Down
Loading
Loading