diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 849e87175e346..db53bc9a7ea30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -435,10 +435,13 @@ public IgniteSnapshotManager(GridKernalContext ctx) { .order(ByteOrder.nativeOrder())); startSnpProc = new DistributedProcess<>(ctx, START_SNAPSHOT, this::initLocalSnapshotStartStage, - this::processLocalSnapshotStartStageResult, SnapshotStartDiscoveryMessage::new); + this::processLocalSnapshotStartStageResult, SnapshotStartDiscoveryMessage::new, + req -> req.incremental() ? + "Incremental snapshot creation is not allowed when rolling upgrade is enabled." : null); endSnpProc = new DistributedProcess<>(ctx, END_SNAPSHOT, this::initLocalSnapshotEndStage, - this::processLocalSnapshotEndStageResult, (reqId, req) -> new InitMessage<>(reqId, END_SNAPSHOT, req, true)); + this::processLocalSnapshotEndStageResult, (reqId, req) -> new InitMessage<>(reqId, END_SNAPSHOT, req, true), + req -> null); marsh = ctx.marshallerContext().jdkMarshaller(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java index 73298b6696e7d..cb73f4909ede5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java @@ -118,6 +118,26 @@ public DistributedProcess( Function> exec, CI3, Map> finish, BiFunction> initMsgFactory + ) { + this(ctx, type, exec, finish, initMsgFactory, + req -> "Failed to start distributed process " + type + ": rolling upgrade is enabled"); + } + + /** + * @param ctx Kernal context. + * @param type Process type. + * @param exec Execute action and returns future with the single node result to send to the coordinator. + * @param finish Finish process closure. Called on each node when all single nodes results received. + * @param initMsgFactory Factory which creates custom {@link InitMessage} for distributed process initialization. + * @param rollingUpgradeValidator Rolling upgrade validator. Returns rejection reason or {@code null} if the process is allowed. + */ + public DistributedProcess( + GridKernalContext ctx, + DistributedProcessType type, + Function> exec, + CI3, Map> finish, + BiFunction> initMsgFactory, + Function rollingUpgradeValidator ) { this.ctx = ctx; this.type = type; @@ -154,12 +174,14 @@ public DistributedProcess( try { IgniteInternalFuture fut; - if (ctx.rollingUpgrade().enabled()) { - fut = new GridFinishedFuture<>(new IgniteException("Failed to start distributed process " - + type + ": rolling upgrade is enabled")); - } + I req = (I)msg.request(); + + String rejectMsg = ctx.rollingUpgrade().enabled() ? rollingUpgradeValidator.apply(req) : null; + + if (rejectMsg != null) + fut = new GridFinishedFuture<>(new IgniteException(rejectMsg)); else - fut = exec.apply((I)msg.request()); + fut = exec.apply(req); fut.listen(() -> { if (fut.error() != null) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRollingUpgradeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRollingUpgradeTest.java index aa4b7e6fefde0..7ed91ebeeb39e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRollingUpgradeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRollingUpgradeTest.java @@ -27,7 +27,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; -import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; /** */ public class IgniteSnapshotRollingUpgradeTest extends GridCommonAbstractTest { @@ -36,7 +36,8 @@ public class IgniteSnapshotRollingUpgradeTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); cfg.setDataStorageConfiguration(new DataStorageConfiguration() - .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)) + .setWalCompactionEnabled(true)); return cfg; } @@ -48,28 +49,55 @@ public class IgniteSnapshotRollingUpgradeTest extends GridCommonAbstractTest { cleanPersistenceDir(); } - /** Tests that snapshot creation fails when rolling upgrade is enabled. */ + /** Tests that full snapshot creation is allowed when rolling upgrade is enabled. */ @Test - public void testSnapshotCreationFailsDuringRollingUpgrade() throws Exception { + public void testSnapshotCreationSucceedsDuringRollingUpgrade() throws Exception { IgniteEx srv = startGrid(0); srv.cluster().state(ClusterState.ACTIVE); - IgniteProductVersion curVer = srv.context().discovery().localNode().version(); + srv.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0); - IgniteProductVersion targetVer = IgniteProductVersion.fromString(curVer.major() - + "." + curVer.minor() - + "." + curVer.maintenance() + 1); + enableRollingUpgrade(srv); - srv.context().rollingUpgrade().enable(targetVer, false); + assertTrue(srv.context().rollingUpgrade().enabled()); + + srv.snapshot().createSnapshot("test").get(getTestTimeout()); + + assertTrue("Full snapshot was not created", + srv.context().cache().context().snapshotMgr().localSnapshotNames(null).contains("test")); + } + + /** Tests that incremental snapshot creation is blocked during rolling upgrade. */ + @Test + public void testIncrementalSnapshotCreationFailsDuringRollingUpgrade() throws Exception { + IgniteEx srv = startGrid(0); + + srv.cluster().state(ClusterState.ACTIVE); + + srv.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0); + + srv.snapshot().createSnapshot("test").get(getTestTimeout()); + + enableRollingUpgrade(srv); assertTrue(srv.context().rollingUpgrade().enabled()); - Throwable ex = assertThrowsWithCause( - () -> srv.snapshot().createSnapshot("test").get(getTestTimeout()), - IgniteException.class + assertThrowsAnyCause(log, + () -> srv.snapshot().createIncrementalSnapshot("test").get(getTestTimeout()), + IgniteException.class, + "Incremental snapshot creation is not allowed when rolling upgrade is enabled." ); + } + + /** Enables rolling upgrade. */ + private void enableRollingUpgrade(IgniteEx srv) throws Exception { + IgniteProductVersion curVer = srv.context().discovery().localNode().version(); + + IgniteProductVersion targetVer = IgniteProductVersion.fromString(curVer.major() + + "." + curVer.minor() + + "." + curVer.maintenance() + 1); - assertTrue(ex.getMessage().contains("Failed to start distributed process START_SNAPSHOT: rolling upgrade is enabled")); + srv.context().rollingUpgrade().enable(targetVer, false); } }