diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index b4d78c207a..aa826a86ae 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -23,6 +23,7 @@ import org.apache.ratis.grpc.metrics.GrpcServerMetrics; import org.apache.ratis.metrics.Timekeeper; import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult; +import org.apache.ratis.proto.RaftProtos.RaftPeerProto; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.server.RaftServer; @@ -52,6 +53,7 @@ import java.io.InterruptedIOException; import java.util.Comparator; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -250,6 +252,15 @@ private boolean installSnapshot() { if (installSnapshotEnabled) { final SnapshotInfo snapshot = shouldInstallSnapshot(); if (snapshot != null) { + final TermIndex firstAvailable = shouldNotifyToInstallSnapshot(); + if (firstAvailable != null) { + final List sourcePeers = selectSnapshotSourcePeers(firstAvailable); + if (!sourcePeers.isEmpty()) { + notifyInstallSnapshot(firstAvailable, sourcePeers); + return true; + } + } + // Fallback to the existing leader-sourced install path when no eligible source follower exists. 
installSnapshot(snapshot); return true; } @@ -257,7 +268,7 @@ private boolean installSnapshot() { // check installSnapshotNotification final TermIndex firstAvailable = shouldNotifyToInstallSnapshot(); if (firstAvailable != null) { - notifyInstallSnapshot(firstAvailable); + notifyInstallSnapshot(firstAvailable, selectSnapshotSourcePeers(firstAvailable)); return true; } } @@ -802,15 +813,18 @@ private void installSnapshot(SnapshotInfo snapshot) { * Send an installSnapshot notification request to the Follower. * @param firstAvailable the first available log's index on the Leader */ - private void notifyInstallSnapshot(TermIndex firstAvailable) { + private void notifyInstallSnapshot(TermIndex firstAvailable, List sourcePeers) { BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_NOTIFY, getFollower().getName(), - suffix -> LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={} {}", - this, firstAvailable, getFollower().getNextIndex(), suffix)); + suffix -> LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={}, " + + "minimumSnapshotIndex={}, sourcePeers={} {}", + this, firstAvailable, getFollower().getNextIndex(), + getMinimumSnapshotIndex(firstAvailable), sourcePeers.size(), suffix)); final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true); StreamObserver snapshotRequestObserver = null; // prepare and enqueue the notify install snapshot request. 
- final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailable); + final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest( + firstAvailable, getMinimumSnapshotIndex(firstAvailable), sourcePeers); if (LOG.isInfoEnabled()) { LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(request)); } @@ -832,45 +846,6 @@ private void notifyInstallSnapshot(TermIndex firstAvailable) { responseHandler.waitForResponse(); } - /** - * Should the Leader notify the Follower to install the snapshot through - * its own State Machine. - * @return the first available log's start term index - */ - private TermIndex shouldNotifyToInstallSnapshot() { - final FollowerInfo follower = getFollower(); - final long leaderNextIndex = getRaftLog().getNextIndex(); - final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower); - final long leaderStartIndex = getRaftLog().getStartIndex(); - final TermIndex firstAvailable = Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex)) - .orElseGet(() -> TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex)); - if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) { - // If the follower is bootstrapping and has not yet installed any snapshot from leader, then the follower should - // be notified to install a snapshot. Every follower should try to install at least one snapshot during - // bootstrapping, if available. - LOG.debug("{}: follower is bootstrapping, notify to install snapshot to {}.", this, firstAvailable); - return firstAvailable; - } - - final long followerNextIndex = follower.getNextIndex(); - if (followerNextIndex >= leaderNextIndex) { - return null; - } - - if (followerNextIndex < leaderStartIndex) { - // The Leader does not have the logs from the Follower's last log - // index onwards. And install snapshot is disabled. 
So the Follower - // should be notified to install the latest snapshot through its - // State Machine. - return firstAvailable; - } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) { - // Leader has no logs to check from, hence return next index. - return firstAvailable; - } - - return null; - } - static class AppendEntriesRequest { private final Timekeeper timer; @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index 6dbfdb15a5..8cb92f860c 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -218,6 +218,12 @@ message InstallSnapshotRequestProto { message NotificationProto { TermIndexProto firstAvailableTermIndex = 1; // first available log index to notify Follower to install snapshot. + // Minimum acceptable snapshot index for this install request. + // 0 means unspecified for backward compatibility. + uint64 minimumSnapshotIndex = 2; + // Ranked source peers (highest preference first) from which the follower may fetch the snapshot. + // Older peers ignore this field. 
+ repeated RaftPeerProto sourcePeers = 3; } RaftRpcRequestProto serverRequest = 1; diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java index e906dd209c..d16233a49a 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java @@ -23,6 +23,7 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.JavaUtils; +import java.util.Collection; import java.util.List; /** @@ -66,4 +67,7 @@ AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower, /** Received an {@link AppendEntriesReplyProto} */ void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto reply); + /** @return all follower states tracked by this leader. */ + Collection getFollowerInfos(); + } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java index cff5425d32..79966cd51f 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java @@ -19,6 +19,7 @@ import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.proto.RaftProtos.RaftPeerProto; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; @@ -33,6 +34,8 @@ import java.io.IOException; import java.util.Comparator; +import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -131,6 +134,15 @@ default RaftPeerId getFollowerId() { /** @return a new {@link 
InstallSnapshotRequestProto} object. */ InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex); + /** + * @return a new {@link InstallSnapshotRequestProto} object carrying snapshot-install + * notification plan metadata. + */ + default InstallSnapshotRequestProto newInstallSnapshotNotificationRequest( + TermIndex firstAvailableLogTermIndex, long minimumSnapshotIndex, List sourcePeers) { + return newInstallSnapshotNotificationRequest(firstAvailableLogTermIndex); + } + /** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for sending the given snapshot. */ Iterable newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot); @@ -168,6 +180,60 @@ default SnapshotInfo shouldInstallSnapshot() { return null; } + /** + * Should this {@link LogAppender} send a snapshot-install notification to the follower + * + *

<p>The notification path is used by follower-managed snapshot installation. + * Returning null means no notification should be sent for now.</p>
+ * + * @return the leader first available log term-index if notification should be sent; otherwise, null. + */ + default TermIndex shouldNotifyToInstallSnapshot() { + final FollowerInfo follower = getFollower(); + final long leaderNextIndex = getRaftLog().getNextIndex(); + final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower); + final long leaderStartIndex = getRaftLog().getStartIndex(); + final TermIndex firstAvailable = Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex)) + .orElseGet(() -> TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex)); + if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) { + // Every bootstrapping follower should attempt at least one snapshot install path if needed. + return firstAvailable; + } + + final long followerNextIndex = follower.getNextIndex(); + if (followerNextIndex >= leaderNextIndex) { + return null; + } + + if (followerNextIndex < leaderStartIndex) { + // The leader does not have follower's missing logs, so snapshot install is required. + return firstAvailable; + } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) { + // If leader has no local log range to compare, snapshot notification is still allowed. + return firstAvailable; + } + return null; + } + + /** + * @return the minimum snapshot index acceptable for the given notification plan. + */ + default long getMinimumSnapshotIndex(TermIndex firstAvailableLogTermIndex) { + return firstAvailableLogTermIndex.getIndex() - 1; + } + + /** + * @return the leader last entry term-index for evaluating whether followers are fully caught up. + */ + default TermIndex getLeaderLastEntryForSnapshotSourceSelection() { + final TermIndex logLast = getRaftLog().getLastEntryTermIndex(); + if (logLast != null) { + return logLast; + } + final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot(); + return snapshot != null ? 
snapshot.getTermIndex() : null; + } + /** Define how this {@link LogAppender} should run. */ void run() throws InterruptedException, IOException; diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 98d4537847..5d03f8d056 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -41,6 +41,7 @@ import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.util.Collection; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.Function; @@ -294,6 +295,24 @@ default CompletableFuture notifyInstallSnapshotFromLeader( RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) { return CompletableFuture.completedFuture(null); } + + /** + * Notify the {@link StateMachine} that the leader has purged entries from its log. + * In order to catch up, the {@link StateMachine} has to install a snapshot asynchronously. + * + * @param roleInfoProto information about the current node role and rpc delay information. + * @param firstTermIndexInLog the term-index of the first append entry available in the leader's log. + * @param minimumSnapshotIndex minimum acceptable snapshot index for this request. + * A value of 0 means unspecified. This is used for backward compatibility. + * @param sourcePeers ranked source peers (highest preference first) from which + * the follower may fetch the snapshot. + * @return return the last term-index in the snapshot after the snapshot installation. 
+ */ + default CompletableFuture notifyInstallSnapshotFromLeader( + RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog, + long minimumSnapshotIndex, List sourcePeers) { + return notifyInstallSnapshotFromLeader(roleInfoProto, firstTermIndexInLog); + } } /** diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index ef0bb6b700..56fa79a60e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -1304,6 +1304,13 @@ Stream getLogAppenders() { return StreamSupport.stream(senders.spliterator(), false); } + @Override + public Collection getFollowerInfos() { + return getLogAppenders() + .map(LogAppender::getFollower) + .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList)); + } + Optional getLogAppender(RaftPeerId id) { return getLogAppenders().filter(a -> a.getFollowerId().equals(id)).findAny(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 46b6aaf87f..eb16a796ff 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -126,17 +126,15 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt ServerImplUtils.assertGroup(getMemberId(), leaderId, leaderGroupId); InstallSnapshotReplyProto reply = null; - // Check if install snapshot from Leader is enabled - if (installSnapshotEnabled) { - // Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot. 
- if (request.hasSnapshotChunk()) { + if (request.hasSnapshotChunk()) { + // Keep existing behavior for chunked transfer, but install only if leader has it enabled. + if (installSnapshotEnabled) { reply = checkAndInstallSnapshot(request, leaderId).join(); } - } else { - // Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot. - if (request.hasNotification()) { - reply = notifyStateMachineToInstallSnapshot(request, leaderId).join(); - } + } else if (request.hasNotification()) { + // Always accept notification requests. This enables follower-sourced install plans while + // keeping leader-sourced chunk transfer as fallback in the same deployment. + reply = notifyStateMachineToInstallSnapshot(request, leaderId).join(); } if (reply != null) { @@ -308,7 +306,10 @@ private CompletableFuture notifyStateMachineToInstall // is updated when state.reloadStateMachine. We shall keep this index upgraded synchronously with main thread, // otherwise leader could get this follower's latest nextIndex from appendEntries instead of after // acknowledging the SNAPSHOT_INSTALLED. - server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex) + final long minimumSnapshotIndex = request.getNotification().getMinimumSnapshotIndex(); + server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader( + proto, firstAvailableLogTermIndex, minimumSnapshotIndex, + request.getNotification().getSourcePeersList()) .whenComplete((reply, exception) -> { if (exception != null) { LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. 
Exception: {}", diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderProtoUtils.java index 3afb735deb..e823d21c99 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderProtoUtils.java @@ -23,6 +23,7 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto.NotificationProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto.SnapshotChunkProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.RaftPeerProto; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftConfiguration; import org.apache.ratis.server.RaftServer; @@ -30,6 +31,7 @@ import org.apache.ratis.server.raftlog.LogProtoUtils; import java.util.Collections; +import java.util.List; /** Leader only proto utilities. 
*/ final class LeaderProtoUtils { @@ -55,8 +57,22 @@ static InstallSnapshotRequestProto toInstallSnapshotRequestProto( static InstallSnapshotRequestProto toInstallSnapshotRequestProto( RaftServer.Division server, RaftPeerId replyId, TermIndex firstAvailable) { + return toInstallSnapshotRequestProto(server, replyId, firstAvailable, 0L, Collections.emptyList()); + } + + static InstallSnapshotRequestProto toInstallSnapshotRequestProto( + RaftServer.Division server, RaftPeerId replyId, TermIndex firstAvailable, + long minimumSnapshotIndex, List sourcePeers) { + final NotificationProto.Builder notification = NotificationProto.newBuilder() + .setFirstAvailableTermIndex(firstAvailable.toProto()); + if (minimumSnapshotIndex > 0) { + notification.setMinimumSnapshotIndex(minimumSnapshotIndex); + } + if (sourcePeers != null && !sourcePeers.isEmpty()) { + notification.addAllSourcePeers(sourcePeers); + } return toInstallSnapshotRequestProtoBuilder(server, replyId) - .setNotification(NotificationProto.newBuilder().setFirstAvailableTermIndex(firstAvailable.toProto())) + .setNotification(notification) .build(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index 5a27cda510..e1a8192b3f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -21,6 +21,7 @@ import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.RaftPeerProto; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.protocol.TermIndex; @@ -218,6 +219,20 @@ protected LongUnaryOperator getNextIndexForError(long newNextIndex) 
{ }; } + /** + * Select ranked follower source peers for snapshot install. + * + * An empty list means no follower satisfies the minimum conditions and + * callers should fall back to leader-sourced snapshot installation. + */ + protected List selectSnapshotSourcePeers(TermIndex firstAvailableLogTermIndex) { + return SnapshotSourceSelector.selectSourcePeers( + getLeaderState().getFollowerInfos(), + getFollowerId(), + firstAvailableLogTermIndex.getIndex(), + getLeaderLastEntryForSnapshotSourceSelection()); + } + @Override public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) @@ -274,9 +289,16 @@ private void assertProtos(List protos, long nextIndex, TermIndex @Override public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) { + return newInstallSnapshotNotificationRequest(firstAvailableLogTermIndex, 0L, Collections.emptyList()); + } + + @Override + public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest( + TermIndex firstAvailableLogTermIndex, long minimumSnapshotIndex, List sourcePeers) { Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() >= 0); synchronized (server) { - return LeaderProtoUtils.toInstallSnapshotRequestProto(server, getFollowerId(), firstAvailableLogTermIndex); + return LeaderProtoUtils.toInstallSnapshotRequestProto( + server, getFollowerId(), firstAvailableLogTermIndex, minimumSnapshotIndex, sourcePeers); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index 9d1edd4695..5689f0becd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -21,8 +21,10 @@ import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; import 
org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.proto.RaftProtos.RaftPeerProto; import org.apache.ratis.rpc.CallId; import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.raftlog.RaftLogIOException; import org.apache.ratis.server.util.ServerStringUtils; @@ -32,6 +34,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.Comparator; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -103,7 +106,7 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestF return null; } - private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException { + private InstallSnapshotReplyProto installSnapshotFromLeader(SnapshotInfo snapshot) throws InterruptedIOException { String requestId = UUID.randomUUID().toString(); InstallSnapshotReplyProto reply = null; try { @@ -132,6 +135,65 @@ private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws return reply; } + private InstallSnapshotReplyProto notifyInstallSnapshot( + TermIndex firstAvailable, List sourcePeers) throws InterruptedIOException { + try { + final long minimumSnapshotIndex = getMinimumSnapshotIndex(firstAvailable); + final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest( + firstAvailable, minimumSnapshotIndex, sourcePeers); + getFollower().updateLastRpcSendTime(false); + final InstallSnapshotReplyProto reply = getServerRpc().installSnapshot(request); + getFollower().updateLastRpcResponseTime(); + return reply; + } catch (InterruptedIOException ioe) { + throw ioe; + } catch (Exception e) { + LOG.warn("{}: Failed to notify follower to install snapshot at {} with {} source 
peers", + this, firstAvailable, sourcePeers.size(), e); + handleException(e); + return null; + } + } + + private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException { + final TermIndex firstAvailable = shouldNotifyToInstallSnapshot(); + if (firstAvailable != null) { + final List sourcePeers = selectSnapshotSourcePeers(firstAvailable); + if (!sourcePeers.isEmpty()) { + LOG.info("{}: notify follower to install snapshot at {} from {} ranked follower sources", + this, firstAvailable, sourcePeers.size()); + return notifyInstallSnapshot(firstAvailable, sourcePeers); + } + } + // Fallback to the existing leader-driven transfer when no eligible source follower exists. + return installSnapshotFromLeader(snapshot); + } + + private void handleInstallSnapshotReply(InstallSnapshotReplyProto reply) { + switch (reply.getResult()) { + case NOT_LEADER: + onFollowerTerm(reply.getTerm()); + break; + case SNAPSHOT_INSTALLED: + case ALREADY_INSTALLED: + if (reply.getInstallSnapshotReplyBodyCase() == + InstallSnapshotReplyProto.InstallSnapshotReplyBodyCase.SNAPSHOTINDEX) { + final long installedSnapshotIndex = reply.getSnapshotIndex(); + getFollower().setSnapshotIndex(installedSnapshotIndex); + getLeaderState().onFollowerCommitIndex(getFollower(), installedSnapshotIndex); + } + getFollower().setAttemptedToInstallSnapshot(); + break; + case SUCCESS: + case SNAPSHOT_UNAVAILABLE: + case SNAPSHOT_EXPIRED: + getFollower().setAttemptedToInstallSnapshot(); + break; + default: + break; + } + } + @Override public void run() throws InterruptedException, IOException { while (isRunning()) { @@ -143,19 +205,7 @@ public void run() throws InterruptedException, IOException { final InstallSnapshotReplyProto r = installSnapshot(snapshot); if (r != null) { - switch (r.getResult()) { - case NOT_LEADER: - onFollowerTerm(r.getTerm()); - break; - case SUCCESS: - case SNAPSHOT_UNAVAILABLE: - case ALREADY_INSTALLED: - case SNAPSHOT_EXPIRED: - 
getFollower().setAttemptedToInstallSnapshot(); - break; - default: - break; - } + handleInstallSnapshotReply(r); } // otherwise if r is null, retry the snapshot installation } else { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java new file mode 100644 index 0000000000..cb5f8ea873 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/SnapshotSourceSelector.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.leader; + +import org.apache.ratis.proto.RaftProtos.RaftPeerProto; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.util.Timestamp; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Leader-side utility for selecting snapshot source followers. + * + *

<p>The returned list is ranked by suitability for serving snapshots to a lagging target: + * fully caught-up followers first, followed by followers with higher replication progress + * and fresher append-response signals.</p>
+ */ +public final class SnapshotSourceSelector { + private SnapshotSourceSelector() { + } + + /** + * Select and rank candidate source followers for snapshot download. + * If this method returns an empty list, callers should fall back to leader-sourced snapshot install. + * + *

<p>Selection rule: + * only followers with {@code matchIndex >= firstAvailableLogIndex - 1} are returned. + * </p>
+ * + *

<p>Ranking rule (highest preference first):
+ * <ol>
+ *   <li>Fully caught up with the leader (as inferred from {@code leaderLastEntry})</li>
+ *   <li>Higher {@link FollowerInfo#getMatchIndex()}</li>
+ *   <li>Higher {@link FollowerInfo#getCommitIndex()}</li>
+ *   <li>Fresher append-response timestamp</li>
+ * </ol>
+ * </p>
+ * + * @param followers all followers known to the leader. + * @param targetFollowerId the follower that needs the snapshot; it will be excluded from candidates. + * @param firstAvailableLogIndex leader's first available log index for catch-up. + * @param leaderLastEntry leader last entry used to infer whether a follower is fully caught up. + * @return ranked snapshot source followers; empty means "fall back to leader". + */ + public static List selectSourceFollowers( + Collection followers, RaftPeerId targetFollowerId, + long firstAvailableLogIndex, TermIndex leaderLastEntry) { + Objects.requireNonNull(followers, "followers == null"); + + // A source follower must be able to bridge the target to the leader's first available log. + // Without this lower bound, we may pick a follower that still cannot provide a usable snapshot. + final long minimumMatchIndex = firstAvailableLogIndex - 1; + final List ranked = followers.stream() + .filter(Objects::nonNull) + .filter(f -> targetFollowerId == null || !targetFollowerId.equals(f.getId())) + .filter(f -> f.getMatchIndex() >= minimumMatchIndex) + .sorted(newFollowerComparator(leaderLastEntry)) + .collect(Collectors.toList()); + return ranked.isEmpty() ? Collections.emptyList() : Collections.unmodifiableList(ranked); + } + + /** + * Convert ranked candidate followers to ranked peer protos for notification payloads. + * + * @see #selectSourceFollowers(Collection, RaftPeerId, long, TermIndex) + */ + public static List selectSourcePeers( + Collection followers, RaftPeerId targetFollowerId, + long firstAvailableLogIndex, TermIndex leaderLastEntry) { + final List peers = selectSourceFollowers( + followers, targetFollowerId, firstAvailableLogIndex, leaderLastEntry).stream() + .map(FollowerInfo::getPeer) + .filter(Objects::nonNull) + .map(peer -> peer.getRaftPeerProto()) + .collect(Collectors.toList()); + return peers.isEmpty() ? 
Collections.emptyList() : Collections.unmodifiableList(peers); + } + + /** + * Compare followers based on the following criteria one by one: + * - Fully caught up with the leader (as inferred from {@code leaderLastEntry}) + * - Higher {@link FollowerInfo#getMatchIndex()} + * - Higher {@link FollowerInfo#getCommitIndex()} + * - Fresher append-response timestamp + */ + private static Comparator newFollowerComparator(TermIndex leaderLastEntry) { + return Comparator.comparing(f -> isFullyCaughtUp(f, leaderLastEntry)) + .reversed() + .thenComparing(Comparator.comparingLong(FollowerInfo::getMatchIndex).reversed()) + .thenComparing(Comparator.comparingLong(FollowerInfo::getCommitIndex).reversed()) + .thenComparing(SnapshotSourceSelector::freshnessTimestamp, SnapshotSourceSelector::compareFreshness) + // Keep order deterministic when all ranking metrics are equal, we do not want it to change across runs. + .thenComparing(f -> f.getId().toString()); + } + + private static boolean isFullyCaughtUp(FollowerInfo follower, TermIndex leaderLastEntry) { + return leaderLastEntry != null && follower.getMatchIndex() >= leaderLastEntry.getIndex(); + } + + private static Timestamp freshnessTimestamp(FollowerInfo follower) { + final Timestamp appendResponse = follower.getLastRespondedAppendEntriesSendTime(); + return appendResponse != null ? appendResponse : follower.getLastRpcResponseTime(); + } + + private static int compareFreshness(Timestamp left, Timestamp right) { + if (left == right) { + return 0; + } else if (left == null) { + return 1; + } else if (right == null) { + return -1; + } + // More recent timestamp first. 
+ return right.compareTo(left); + } +} diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java index 931bf6317f..3ca1874427 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java @@ -28,6 +28,8 @@ import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.server.impl.PeerChanges; import org.apache.ratis.server.impl.RaftServerTestUtil; +import org.apache.ratis.server.leader.FollowerInfo; +import org.apache.ratis.server.leader.SnapshotSourceSelector; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.raftlog.segmented.LogSegmentPath; @@ -40,6 +42,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.Timestamp; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -50,14 +53,18 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongUnaryOperator; import java.util.function.Supplier; +import java.util.stream.Collectors; public abstract class InstallSnapshotNotificationTests extends BaseTest @@ -88,6 +95,18 @@ public abstract class InstallSnapshotNotificationTests> LAST_SOURCE_PEER_IDS = + new AtomicReference<>(Collections.emptyList()); + private static final AtomicLong 
LAST_MINIMUM_SNAPSHOT_INDEX = new AtomicLong(-1L); + + private enum SnapshotSourceFetchPolicy { + LEADER_ONLY, + FAIL_SOURCE_THEN_LEADER + } + + private static final AtomicReference SNAPSHOT_SOURCE_FETCH_POLICY = + new AtomicReference<>(SnapshotSourceFetchPolicy.LEADER_ONLY); private static class StateMachine4InstallSnapshotNotificationTests extends SimpleStateMachine4Testing { @@ -97,20 +116,37 @@ private static class StateMachine4InstallSnapshotNotificationTests extends Simpl public CompletableFuture notifyInstallSnapshotFromLeader( RaftProtos.RoleInfoProto roleInfoProto, TermIndex termIndex) { + return notifyInstallSnapshotFromLeader(roleInfoProto, termIndex, 0L, Collections.emptyList()); + } + + @Override + public CompletableFuture notifyInstallSnapshotFromLeader( + RaftProtos.RoleInfoProto roleInfoProto, TermIndex termIndex, + long minimumSnapshotIndex, List sourcePeers) { if (!roleInfoProto.getFollowerInfo().hasLeaderInfo()) { return JavaUtils.completeExceptionally(new IOException("Failed " + "notifyInstallSnapshotFromLeader due to missing leader info")); } numSnapshotRequests.incrementAndGet(); + LAST_MINIMUM_SNAPSHOT_INDEX.set(minimumSnapshotIndex); + LAST_SOURCE_PEER_IDS.set(sourcePeers.stream().map(p -> RaftPeerId.valueOf(p.getId()).toString()) + .collect(Collectors.toList())); final SingleFileSnapshotInfo leaderSnapshotInfo = (SingleFileSnapshotInfo) LEADER_SNAPSHOT_INFO_REF.get(); LOG.info("{}: leaderSnapshotInfo = {}", getId(), leaderSnapshotInfo); if (leaderSnapshotInfo == null) { - return super.notifyInstallSnapshotFromLeader(roleInfoProto, termIndex); + return super.notifyInstallSnapshotFromLeader(roleInfoProto, termIndex, minimumSnapshotIndex, sourcePeers); } Supplier supplier = () -> { try { + // Simulate a source failure before retrying with leader. + // We still return success after falling back to leader so that the test can verify retry behavior. 
+ if (!sourcePeers.isEmpty() + && SNAPSHOT_SOURCE_FETCH_POLICY.get() == SnapshotSourceFetchPolicy.FAIL_SOURCE_THEN_LEADER) { + numFallbackToLeaderFromSourceFailure.incrementAndGet(); + } + Path leaderSnapshotFile = leaderSnapshotInfo.getFile().getPath(); final File followerSnapshotFilePath = new File(getStateMachineDir(), leaderSnapshotFile.getFileName().toString()); @@ -165,6 +201,302 @@ public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult result, lon } + private static class TestFollowerInfo implements FollowerInfo { + private final String name; + private final RaftPeer peer; + private final long matchIndex; + private final long commitIndex; + private final Timestamp lastRpcResponseTime; + private final Timestamp lastRespondedAppendEntriesSendTime; + + TestFollowerInfo(String peerId, long matchIndex, long commitIndex, long appendResponseAgeMs) { + this.name = "test-" + peerId; + this.peer = RaftPeer.newBuilder().setId(peerId).build(); + this.matchIndex = matchIndex; + this.commitIndex = commitIndex; + final Timestamp now = Timestamp.currentTime(); + this.lastRpcResponseTime = now; + this.lastRespondedAppendEntriesSendTime = now.addTimeMs(-appendResponseAgeMs); + } + + @Override + public String getName() { + return name; + } + + @Override + public RaftPeerId getId() { + return peer.getId(); + } + + @Override + public RaftPeer getPeer() { + return peer; + } + + @Override + public long getMatchIndex() { + return matchIndex; + } + + @Override + public boolean updateMatchIndex(long newMatchIndex) { + return false; + } + + @Override + public long getCommitIndex() { + return commitIndex; + } + + @Override + public boolean updateCommitIndex(long newCommitIndex) { + return false; + } + + @Override + public long getSnapshotIndex() { + return 0L; + } + + @Override + public void setSnapshotIndex(long newSnapshotIndex) { + } + + @Override + public void setAttemptedToInstallSnapshot() { + } + + @Override + public boolean hasAttemptedToInstallSnapshot() { + 
return true; + } + + @Override + public long getNextIndex() { + return 0L; + } + + @Override + public void increaseNextIndex(long newNextIndex) { + } + + @Override + public void decreaseNextIndex(long newNextIndex) { + } + + @Override + public void setNextIndex(long newNextIndex) { + } + + @Override + public void updateNextIndex(long newNextIndex) { + } + + @Override + public void computeNextIndex(LongUnaryOperator op) { + } + + @Override + public Timestamp getLastRpcResponseTime() { + return lastRpcResponseTime; + } + + @Override + public Timestamp getLastRpcSendTime() { + return lastRpcResponseTime; + } + + @Override + public void updateLastRpcResponseTime() { + } + + @Override + public void updateLastRpcSendTime(boolean isHeartbeat) { + } + + @Override + public Timestamp getLastRpcTime() { + return Timestamp.latest(getLastRpcResponseTime(), getLastRpcSendTime()); + } + + @Override + public Timestamp getLastHeartbeatSendTime() { + return lastRpcResponseTime; + } + + @Override + public Timestamp getLastRespondedAppendEntriesSendTime() { + return lastRespondedAppendEntriesSendTime; + } + + @Override + public void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime) { + } + } + + @Test + public void testExactMatchSourceSelection() { + final List ranked = SnapshotSourceSelector.selectSourceFollowers( + Arrays.asList( + new TestFollowerInfo("exact", 200, 180, 10), + new TestFollowerInfo("near", 199, 190, 20), + new TestFollowerInfo("target", 205, 205, 5)), + RaftPeerId.valueOf("target"), + 150, + TermIndex.valueOf(5, 200)); + + final List ids = ranked.stream() + .map(FollowerInfo::getId) + .map(RaftPeerId::toString) + .collect(Collectors.toList()); + Assertions.assertEquals(Arrays.asList("exact", "near"), ids); + } + + @Test + public void testClosestFollowerSelectionByMatchIndexAndCommitIndex() { + final List ranked = SnapshotSourceSelector.selectSourceFollowers( + Arrays.asList( + new TestFollowerInfo("higherCommit", 320, 300, 20), + new 
TestFollowerInfo("lowerCommit", 320, 200, 10), + new TestFollowerInfo("lowerMatch", 319, 400, 5), + new TestFollowerInfo("belowThreshold", 198, 198, 5)), + RaftPeerId.valueOf("target"), + 200, // minimum acceptable match index is 199 + TermIndex.valueOf(8, 1000)); + + final List ids = ranked.stream() + .map(FollowerInfo::getId) + .map(RaftPeerId::toString) + .collect(Collectors.toList()); + Assertions.assertEquals(Arrays.asList("higherCommit", "lowerCommit", "lowerMatch"), ids); + } + + @Test + public void testFollowerSourceFailureRetryWithLeader() throws Exception { + final RaftProperties prop = getProperties(); + final boolean previousInstallSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(prop); + RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(prop, true); + try { + runWithNewCluster(2, this::testFollowerSourceFailureRetryWithLeader); + } finally { + SNAPSHOT_SOURCE_FETCH_POLICY.set(SnapshotSourceFetchPolicy.LEADER_ONLY); + RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(prop, previousInstallSnapshotEnabled); + } + } + + private void testFollowerSourceFailureRetryWithLeader(CLUSTER cluster) throws Exception { + LEADER_SNAPSHOT_INFO_REF.set(null); + numSnapshotRequests.set(0); + numFallbackToLeaderFromSourceFailure.set(0); + LAST_SOURCE_PEER_IDS.set(Collections.emptyList()); + LAST_MINIMUM_SNAPSHOT_INDEX.set(-1L); + SNAPSHOT_SOURCE_FETCH_POLICY.set(SnapshotSourceFetchPolicy.FAIL_SOURCE_THEN_LEADER); + + int i = 0; + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = leader.getId(); + + try (final RaftClient client = cluster.createClient(leaderId)) { + for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { + final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i)); + Assertions.assertTrue(reply.isSuccess()); + } + } + + final long nextIndex = leader.getRaftLog().getNextIndex(); + final List snapshotFiles = 
RaftSnapshotBaseTest.getSnapshotFiles(cluster, + nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex); + JavaUtils.attemptRepeatedly(() -> { + Assertions.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists)); + return null; + }, 10, ONE_SECOND, "snapshotFile.exist", LOG); + + final SnapshotInfo leaderSnapshotInfo = leader.getStateMachine().getLatestSnapshot(); + Assertions.assertNotNull(leaderSnapshotInfo); + Assertions.assertTrue(LEADER_SNAPSHOT_INFO_REF.compareAndSet(null, leaderSnapshotInfo)); + + final PeerChanges change = cluster.addNewPeers(1, true); + RaftServerTestUtil.runWithMinorityPeers(cluster, change.getPeersInNewConf(), cluster::setConfiguration); + RaftServerTestUtil.waitAndCheckNewConf(cluster, change.getPeersInNewConf(), 0, null); + + final RaftPeerId targetFollowerId = change.getAddedPeers().get(0).getId(); + final RaftServer.Division targetFollower = cluster.getDivision(targetFollowerId); + JavaUtils.attempt(() -> { + Assertions.assertEquals(leaderSnapshotInfo.getIndex(), + RaftServerTestUtil.getLatestInstalledSnapshotIndex(targetFollower)); + }, 10, ONE_SECOND, "targetFollowerInstalledSnapshot", LOG); + + // The notification contained follower sources and the state machine retried with leader after source failure. 
+ JavaUtils.attempt(() -> { + Assertions.assertTrue(numSnapshotRequests.get() >= 1); + Assertions.assertTrue(numFallbackToLeaderFromSourceFailure.get() >= 1); + Assertions.assertFalse(LAST_SOURCE_PEER_IDS.get().isEmpty()); + Assertions.assertTrue(LAST_MINIMUM_SNAPSHOT_INDEX.get() >= 0); + }, 10, ONE_SECOND, "sourceFailureRetryWithLeader", LOG); + } + + @Test + public void testNoCandidateFallbackToLeader() throws Exception { + final RaftProperties prop = getProperties(); + final boolean previousInstallSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(prop); + RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(prop, true); + try { + runWithNewCluster(1, this::testNoCandidateFallbackToLeader); + } finally { + SNAPSHOT_SOURCE_FETCH_POLICY.set(SnapshotSourceFetchPolicy.LEADER_ONLY); + RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(prop, previousInstallSnapshotEnabled); + } + } + + private void testNoCandidateFallbackToLeader(CLUSTER cluster) throws Exception { + LEADER_SNAPSHOT_INFO_REF.set(null); + numSnapshotRequests.set(0); + numFallbackToLeaderFromSourceFailure.set(0); + LAST_SOURCE_PEER_IDS.set(Collections.emptyList()); + LAST_MINIMUM_SNAPSHOT_INDEX.set(-1L); + SNAPSHOT_SOURCE_FETCH_POLICY.set(SnapshotSourceFetchPolicy.FAIL_SOURCE_THEN_LEADER); + + int i = 0; + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = leader.getId(); + + try (final RaftClient client = cluster.createClient(leaderId)) { + for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { + final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i)); + Assertions.assertTrue(reply.isSuccess()); + } + } + + final long nextIndex = leader.getRaftLog().getNextIndex(); + final List snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster, + nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex); + JavaUtils.attemptRepeatedly(() -> { + 
Assertions.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists)); + return null; + }, 10, ONE_SECOND, "snapshotFile.exist", LOG); + + final SnapshotInfo leaderSnapshotInfo = leader.getStateMachine().getLatestSnapshot(); + Assertions.assertNotNull(leaderSnapshotInfo); + Assertions.assertTrue(LEADER_SNAPSHOT_INFO_REF.compareAndSet(null, leaderSnapshotInfo)); + + final PeerChanges change = cluster.addNewPeers(1, true); + RaftServerTestUtil.runWithMinorityPeers(cluster, change.getPeersInNewConf(), cluster::setConfiguration); + RaftServerTestUtil.waitAndCheckNewConf(cluster, change.getPeersInNewConf(), 0, null); + + final RaftPeerId targetFollowerId = change.getAddedPeers().get(0).getId(); + final RaftServer.Division targetFollower = cluster.getDivision(targetFollowerId); + JavaUtils.attempt(() -> { + Assertions.assertEquals(leaderSnapshotInfo.getIndex(), + RaftServerTestUtil.getLatestInstalledSnapshotIndex(targetFollower)); + }, 10, ONE_SECOND, "leaderFallbackInstalledSnapshot", LOG); + + // There is no follower source candidate in a single-leader cluster. The leader should directly install snapshot. + Assertions.assertEquals(0, numSnapshotRequests.get()); + Assertions.assertEquals(0, numFallbackToLeaderFromSourceFailure.get()); + Assertions.assertEquals(Collections.emptyList(), LAST_SOURCE_PEER_IDS.get()); + } + /** * Basic test for install snapshot notification: start a one node cluster * (disable install snapshot option) and let it generate a snapshot. Then