From 023049a645ec4812f795c57467e84239a6538802 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Tue, 10 Mar 2026 00:46:00 +0530 Subject: [PATCH 1/2] RATIS-2430. Write snapshot to temporary path until finished --- .../apache/ratis/server/impl/ServerState.java | 13 +++++++++---- .../impl/SnapshotInstallationHandler.java | 9 +++++---- .../ratis/server/storage/SnapshotManager.java | 19 +++++++++++++++---- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index bcf11baf7a..7fb3f47afd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -470,11 +470,16 @@ RaftStorageImpl getStorage() { return raftStorage.getUnchecked(); } - void installSnapshot(InstallSnapshotRequestProto request) throws IOException { + void appendSnapshot(InstallSnapshotRequestProto request) throws IOException { // TODO: verify that we need to install the snapshot - StateMachine sm = server.getStateMachine(); - sm.pause(); // pause the SM to prepare for install snapshot - snapshotManager.installSnapshot(request, sm); + snapshotManager.appendSnapshot(request, server.getStateMachine()); + } + + void finalizeSnapshot(InstallSnapshotRequestProto request) throws IOException { + final StateMachine sm = server.getStateMachine(); + sm.pause(); // pause the SM right before publishing the snapshot atomically + // TODO: if there is a failure here, we need to roll back the snapshot installation. 
+ snapshotManager.finalizeSnapshot(request); } private SnapshotInfo getLatestSnapshot() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 46b6aaf87f..57fce8adf8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -214,17 +214,18 @@ private CompletableFuture checkAndInstallSnapshot(Ins return future.thenApply(dummy -> reply); } - //TODO: We should only update State with installed snapshot once the request is done. - state.installSnapshot(request); - - final int expectedChunkIndex = nextChunkIndex.getAndIncrement(); + final int expectedChunkIndex = nextChunkIndex.get(); if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) { throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex() + " (the expected index is " + expectedChunkIndex + ")"); } + // Append chunks to a temporary location first. Publish only when done=true. 
+ state.appendSnapshot(request); + nextChunkIndex.incrementAndGet(); // update the committed index // re-load the state machine if this is the last chunk if (snapshotChunkRequest.getDone()) { + state.finalizeSnapshot(request); state.reloadStateMachine(lastIncluded); chunk0CallId.set(-1); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java index a96001b598..d2d44e3d40 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java @@ -99,12 +99,16 @@ private FileChannel open(FileChunkProto chunk, File tmpSnapshotFile) throws IOEx return out; } - public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine) throws IOException { + private File getSnapshotTmpDir(String requestId) { + return new File(this.snapshotTmpDir.get(), "snapshot-" + requestId); + } + + public void appendSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine) throws IOException { final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex(); // create a unique temporary directory - final File tmpDir = new File(this.snapshotTmpDir.get(), "snapshot-" + snapshotChunkRequest.getRequestId()); + final File tmpDir = getSnapshotTmpDir(snapshotChunkRequest.getRequestId()); FileUtils.createDirectories(tmpDir); tmpDir.deleteOnExit(); @@ -164,9 +168,16 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st } } - if (snapshotChunkRequest.getDone()) { - rename(tmpDir, snapshotDir.get()); + } + + public void finalizeSnapshot(InstallSnapshotRequestProto request) throws IOException { + final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = 
request.getSnapshotChunk(); + if (!snapshotChunkRequest.getDone()) { + throw new IOException("Cannot finalize incomplete snapshot request: " + + ServerStringUtils.toInstallSnapshotRequestString(request)); } + final File tmpDir = getSnapshotTmpDir(snapshotChunkRequest.getRequestId()); + rename(tmpDir, snapshotDir.get()); } private static void rename(File tmpDir, File stateMachineDir) throws IOException { From b79a9cda01e5f3d00c5ef39864c08e230fb16a9f Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Tue, 10 Mar 2026 10:44:44 +0530 Subject: [PATCH 2/2] Add new test class for testing partial/failing transfer --- .../server/storage/SnapshotManagerTest.java | 174 ++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java new file mode 100644 index 0000000000..593642698c --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.proto.RaftProtos.FileChunkProto; +import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.statemachine.SnapshotRetentionPolicy; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.StateMachineStorage; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.FileUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.security.MessageDigest; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SnapshotManagerTest { + private static final class TestRaftStorageDirectory implements RaftStorageDirectory { + private final File root; + + private TestRaftStorageDirectory(File root) { + this.root = root; + } + + @Override + public File getRoot() { + return root; + } + + @Override + public boolean isHealthy() { + return true; + } + } + + private static final StateMachineStorage EMPTY_STORAGE = new StateMachineStorage() { + @Override + public void init(RaftStorage raftStorage) { + } + + @Override + public SnapshotInfo getLatestSnapshot() { + return null; + } + + @Override + public void format() { + } + + @Override + public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) { + } + }; + + private static InstallSnapshotRequestProto newSnapshotRequest( + String requestId, int requestIndex, boolean requestDone, String filename, byte[] fullData, + int offset, int chunkSize, boolean chunkDone) throws Exception { + final FileChunkProto fileChunk = FileChunkProto.newBuilder() + .setFilename(filename) + 
.setTotalSize(fullData.length) + .setFileDigest(ByteString.copyFrom(md5(fullData))) + .setChunkIndex(requestIndex) + .setOffset(offset) + .setData(ByteString.copyFrom(fullData, offset, chunkSize)) + .setDone(chunkDone) + .build(); + + final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunk = + InstallSnapshotRequestProto.SnapshotChunkProto.newBuilder() + .setRequestId(requestId) + .setRequestIndex(requestIndex) + .setTermIndex(TermIndex.valueOf(1L, 10L).toProto()) + .addFileChunks(fileChunk) + .setTotalSize(fullData.length) + .setDone(requestDone) + .build(); + + return InstallSnapshotRequestProto.newBuilder() + .setSnapshotChunk(snapshotChunk) + .build(); + } + + private static byte[] md5(byte[] data) throws Exception { + return MessageDigest.getInstance("MD5").digest(data); + } + + @Test + public void testAppendOnlyAndFinalizePublish() throws Exception { + final File root = Files.createTempDirectory("snapshot-manager-test").toFile(); + try { + final SnapshotManager manager = new SnapshotManager( + RaftPeerId.valueOf("s1"), () -> new TestRaftStorageDirectory(root), EMPTY_STORAGE); + final StateMachine stateMachine = mock(StateMachine.class); + when(stateMachine.getLatestSnapshot()).thenReturn(null); + + final File stateMachineDir = new File(root, RaftStorageDirectory.STATE_MACHINE_DIR_NAME); + FileUtils.createDirectories(stateMachineDir); + final File oldSnapshot = new File(stateMachineDir, "old.snapshot"); + Files.write(oldSnapshot.toPath(), "old".getBytes(StandardCharsets.UTF_8)); + + final byte[] fullData = "0123456789abcdef".getBytes(StandardCharsets.UTF_8); + final File newSnapshot = new File(stateMachineDir, "new.snapshot"); + final String filename = new File( + RaftStorageDirectory.STATE_MACHINE_DIR_NAME, "new.snapshot").toString(); + final File tmpSnapshot = new File(new File( + new File(root, RaftStorageDirectory.TMP_DIR_NAME), "snapshot-request-1"), "new.snapshot"); + final InstallSnapshotRequestProto chunk0 = newSnapshotRequest( + 
"request-1", 0, false, filename, fullData, 0, 8, false); + final InstallSnapshotRequestProto chunk1 = newSnapshotRequest( + "request-1", 1, true, filename, fullData, 8, 8, true); + + manager.appendSnapshot(chunk0, stateMachine); + Assertions.assertTrue(oldSnapshot.exists()); + Assertions.assertTrue(tmpSnapshot.exists()); + Assertions.assertFalse(newSnapshot.exists()); + + manager.appendSnapshot(chunk1, stateMachine); + Assertions.assertTrue(oldSnapshot.exists()); + Assertions.assertTrue(tmpSnapshot.exists()); + Assertions.assertFalse(newSnapshot.exists()); + + manager.finalizeSnapshot(chunk1); + Assertions.assertFalse(oldSnapshot.exists()); + Assertions.assertFalse(tmpSnapshot.exists()); + Assertions.assertArrayEquals(fullData, Files.readAllBytes(newSnapshot.toPath())); + } finally { + FileUtils.deleteFully(root); + } + } + + @Test + public void testFinalizeSnapshotRejectsIncompleteRequest() throws Exception { + final File root = Files.createTempDirectory("snapshot-manager-test-incomplete").toFile(); + try { + final SnapshotManager manager = new SnapshotManager( + RaftPeerId.valueOf("s2"), () -> new TestRaftStorageDirectory(root), EMPTY_STORAGE); + + final InstallSnapshotRequestProto incomplete = newSnapshotRequest( + "request-2", 0, false, + new File(RaftStorageDirectory.STATE_MACHINE_DIR_NAME, "f.snapshot").toString(), + "abc".getBytes(StandardCharsets.UTF_8), 0, 3, true); + final IOException ioe = Assertions.assertThrows(IOException.class, + () -> manager.finalizeSnapshot(incomplete)); + Assertions.assertTrue(ioe.getMessage().contains("Cannot finalize incomplete snapshot request")); + } finally { + FileUtils.deleteFully(root); + } + } +}