Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,13 @@ public IgniteSnapshotManager(GridKernalContext ctx) {
.order(ByteOrder.nativeOrder()));

startSnpProc = new DistributedProcess<>(ctx, START_SNAPSHOT, this::initLocalSnapshotStartStage,
this::processLocalSnapshotStartStageResult, SnapshotStartDiscoveryMessage::new);
this::processLocalSnapshotStartStageResult, SnapshotStartDiscoveryMessage::new,
req -> req.incremental() ?
"Incremental snapshot creation is not allowed when rolling upgrade is enabled." : null);

endSnpProc = new DistributedProcess<>(ctx, END_SNAPSHOT, this::initLocalSnapshotEndStage,
this::processLocalSnapshotEndStageResult, (reqId, req) -> new InitMessage<>(reqId, END_SNAPSHOT, req, true));
this::processLocalSnapshotEndStageResult, (reqId, req) -> new InitMessage<>(reqId, END_SNAPSHOT, req, true),
req -> null);

marsh = ctx.marshallerContext().jdkMarshaller();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,26 @@ public DistributedProcess(
Function<I, IgniteInternalFuture<R>> exec,
CI3<UUID, Map<UUID, R>, Map<UUID, Throwable>> finish,
BiFunction<UUID, I, ? extends InitMessage<I>> initMsgFactory
) {
this(ctx, type, exec, finish, initMsgFactory,
req -> "Failed to start distributed process " + type + ": rolling upgrade is enabled");
}

/**
* @param ctx Kernal context.
* @param type Process type.
* @param exec Execute action and returns future with the single node result to send to the coordinator.
* @param finish Finish process closure. Called on each node when all single nodes results received.
* @param initMsgFactory Factory which creates custom {@link InitMessage} for distributed process initialization.
* @param rollingUpgradeValidator Rolling upgrade validator. Returns rejection reason or {@code null} if the process is allowed.
*/
public DistributedProcess(
GridKernalContext ctx,
DistributedProcessType type,
Function<I, IgniteInternalFuture<R>> exec,
CI3<UUID, Map<UUID, R>, Map<UUID, Throwable>> finish,
BiFunction<UUID, I, ? extends InitMessage<I>> initMsgFactory,
Function<I, String> rollingUpgradeValidator
) {
this.ctx = ctx;
this.type = type;
Expand Down Expand Up @@ -154,12 +174,14 @@ public DistributedProcess(
try {
IgniteInternalFuture<R> fut;

if (ctx.rollingUpgrade().enabled()) {
fut = new GridFinishedFuture<>(new IgniteException("Failed to start distributed process "
+ type + ": rolling upgrade is enabled"));
}
I req = (I)msg.request();

String rejectMsg = ctx.rollingUpgrade().enabled() ? rollingUpgradeValidator.apply(req) : null;

if (rejectMsg != null)
fut = new GridFinishedFuture<>(new IgniteException(rejectMsg));
else
fut = exec.apply((I)msg.request());
fut = exec.apply(req);

fut.listen(() -> {
if (fut.error() != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;

/** */
public class IgniteSnapshotRollingUpgradeTest extends GridCommonAbstractTest {
Expand All @@ -36,7 +36,8 @@ public class IgniteSnapshotRollingUpgradeTest extends GridCommonAbstractTest {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)));
.setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
.setWalCompactionEnabled(true));

return cfg;
}
Expand All @@ -48,28 +49,55 @@ public class IgniteSnapshotRollingUpgradeTest extends GridCommonAbstractTest {
cleanPersistenceDir();
}

/** Tests that snapshot creation fails when rolling upgrade is enabled. */
/** Tests that full snapshot creation is allowed when rolling upgrade is enabled. */
@Test
public void testSnapshotCreationFailsDuringRollingUpgrade() throws Exception {
public void testSnapshotCreationSucceedsDuringRollingUpgrade() throws Exception {
IgniteEx srv = startGrid(0);

srv.cluster().state(ClusterState.ACTIVE);

IgniteProductVersion curVer = srv.context().discovery().localNode().version();
srv.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0);

IgniteProductVersion targetVer = IgniteProductVersion.fromString(curVer.major()
+ "." + curVer.minor()
+ "." + curVer.maintenance() + 1);
enableRollingUpgrade(srv);

srv.context().rollingUpgrade().enable(targetVer, false);
assertTrue(srv.context().rollingUpgrade().enabled());

srv.snapshot().createSnapshot("test").get(getTestTimeout());

assertTrue("Full snapshot was not created",
srv.context().cache().context().snapshotMgr().localSnapshotNames(null).contains("test"));
}

/** Tests that incremental snapshot creation is blocked during rolling upgrade. */
@Test
public void testIncrementalSnapshotCreationFailsDuringRollingUpgrade() throws Exception {
IgniteEx srv = startGrid(0);

srv.cluster().state(ClusterState.ACTIVE);

srv.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0);

srv.snapshot().createSnapshot("test").get(getTestTimeout());

enableRollingUpgrade(srv);

assertTrue(srv.context().rollingUpgrade().enabled());

Throwable ex = assertThrowsWithCause(
() -> srv.snapshot().createSnapshot("test").get(getTestTimeout()),
IgniteException.class
assertThrowsAnyCause(log,
() -> srv.snapshot().createIncrementalSnapshot("test").get(getTestTimeout()),
IgniteException.class,
"Incremental snapshot creation is not allowed when rolling upgrade is enabled."
);
}

/** Enables rolling upgrade. */
private void enableRollingUpgrade(IgniteEx srv) throws Exception {
IgniteProductVersion curVer = srv.context().discovery().localNode().version();

IgniteProductVersion targetVer = IgniteProductVersion.fromString(curVer.major()
+ "." + curVer.minor()
+ "." + curVer.maintenance() + 1);

assertTrue(ex.getMessage().contains("Failed to start distributed process START_SNAPSHOT: rolling upgrade is enabled"));
srv.context().rollingUpgrade().enable(targetVer, false);
}
}