Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

Expand Down Expand Up @@ -654,9 +653,14 @@ private static void printCustomCliOptions(

public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
boolean allowNonRestoredState =
commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
final String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
final Boolean allowNonRestoredState =
commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt())
? ConfigurationUtils.convertValue(
commandLine.getOptionValue(
SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt()),
Boolean.class)
: null;
final RecoveryClaimMode recoveryClaimMode;
if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
recoveryClaimMode =
Expand All @@ -672,7 +676,7 @@ public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLin
"The option '%s' is deprecated. Please use '%s' instead.%n",
SAVEPOINT_RESTORE_MODE.getLongOpt(), SAVEPOINT_CLAIM_MODE.getLongOpt());
} else {
recoveryClaimMode = StateRecoveryOptions.RESTORE_MODE.defaultValue();
recoveryClaimMode = null;
}
return SavepointRestoreSettings.forPath(
savepointPath, allowNonRestoredState, recoveryClaimMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.core.execution.RecoveryClaimMode;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Objects;
Expand All @@ -37,30 +38,31 @@ public class SavepointRestoreSettings implements Serializable {

/** No restore should happen. */
private static final SavepointRestoreSettings NONE =
new SavepointRestoreSettings(null, false, RecoveryClaimMode.NO_CLAIM);
new SavepointRestoreSettings(null, null, null);

/** Savepoint restore path. */
private final String restorePath;
private final @Nullable String restorePath;

/**
* Flag indicating whether non restored state is allowed if the savepoint contains state for an
* operator that is not part of the job.
*/
private final boolean allowNonRestoredState;
private final @Nullable Boolean allowNonRestoredState;

private final @Nonnull RecoveryClaimMode recoveryClaimMode;
private final @Nullable RecoveryClaimMode recoveryClaimMode;

/**
* Creates the restore settings.
*
* @param restorePath Savepoint restore path.
* @param allowNonRestoredState Ignore unmapped state.
* @param recoveryClaimMode how to restore from the savepoint
* @param allowNonRestoredState Ignore unmapped state, or {@code null} if not explicitly set.
* @param recoveryClaimMode how to restore from the savepoint, or {@code null} if not explicitly
* set.
*/
private SavepointRestoreSettings(
String restorePath,
boolean allowNonRestoredState,
@Nonnull RecoveryClaimMode recoveryClaimMode) {
@Nullable String restorePath,
@Nullable Boolean allowNonRestoredState,
@Nullable RecoveryClaimMode recoveryClaimMode) {
this.restorePath = restorePath;
this.allowNonRestoredState = allowNonRestoredState;
this.recoveryClaimMode = recoveryClaimMode;
Expand Down Expand Up @@ -92,12 +94,16 @@ public String getRestorePath() {
* that cannot be mapped back to the job.
*/
public boolean allowNonRestoredState() {
return allowNonRestoredState;
return allowNonRestoredState != null
? allowNonRestoredState
: StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.defaultValue();
}

/** Tells how to restore from the given savepoint. */
public @Nonnull RecoveryClaimMode getRecoveryClaimMode() {
return recoveryClaimMode;
public RecoveryClaimMode getRecoveryClaimMode() {
return recoveryClaimMode != null
? recoveryClaimMode
: StateRecoveryOptions.RESTORE_MODE.defaultValue();
}

@Override
Expand All @@ -110,17 +116,14 @@ public boolean equals(Object o) {
}

SavepointRestoreSettings that = (SavepointRestoreSettings) o;
return allowNonRestoredState == that.allowNonRestoredState
return Objects.equals(allowNonRestoredState, that.allowNonRestoredState)
&& Objects.equals(restorePath, that.restorePath)
&& Objects.equals(recoveryClaimMode, that.recoveryClaimMode);
}

@Override
public int hashCode() {
int result = restorePath != null ? restorePath.hashCode() : 0;
result = 31 * result + recoveryClaimMode.hashCode();
result = 31 * result + (allowNonRestoredState ? 1 : 0);
return result;
return Objects.hash(restorePath, recoveryClaimMode, allowNonRestoredState);
}

@Override
Expand All @@ -131,9 +134,9 @@ public String toString() {
+ restorePath
+ '\''
+ ", allowNonRestoredState="
+ allowNonRestoredState
+ allowNonRestoredState()
+ ", recoveryClaimMode="
+ recoveryClaimMode
+ getRecoveryClaimMode()
+ ')';
} else {
return "SavepointRestoreSettings.none()";
Expand All @@ -147,24 +150,24 @@ public static SavepointRestoreSettings none() {
}

public static SavepointRestoreSettings forPath(String savepointPath) {
return forPath(
savepointPath,
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.defaultValue());
return forPath(savepointPath, null);
}

public static SavepointRestoreSettings forPath(
String savepointPath, boolean allowNonRestoredState) {
checkNotNull(savepointPath, "Savepoint restore path.");
return new SavepointRestoreSettings(
savepointPath,
allowNonRestoredState,
StateRecoveryOptions.RESTORE_MODE.defaultValue());
String savepointPath, @Nullable Boolean allowNonRestoredState) {
return new SavepointRestoreSettings(savepointPath, allowNonRestoredState, null);
}

/**
* Creates restore settings where certain values may not have been explicitly set by the user.
* Values that are {@code null} will not be written to configuration by {@link
* #toConfiguration}, allowing downstream configuration (e.g., SQL SET statements or
* flink-conf.yaml) to take effect.
*/
public static SavepointRestoreSettings forPath(
String savepointPath,
boolean allowNonRestoredState,
@Nonnull RecoveryClaimMode recoveryClaimMode) {
@Nonnull String savepointPath,
@Nullable Boolean allowNonRestoredState,
@Nullable RecoveryClaimMode recoveryClaimMode) {
checkNotNull(savepointPath, "Savepoint restore path.");
return new SavepointRestoreSettings(
savepointPath, allowNonRestoredState, recoveryClaimMode);
Expand All @@ -176,11 +179,15 @@ public static SavepointRestoreSettings forPath(
public static void toConfiguration(
final SavepointRestoreSettings savepointRestoreSettings,
final Configuration configuration) {
configuration.set(
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
savepointRestoreSettings.allowNonRestoredState());
configuration.set(
StateRecoveryOptions.RESTORE_MODE, savepointRestoreSettings.getRecoveryClaimMode());
if (savepointRestoreSettings.allowNonRestoredState != null) {
configuration.set(
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
savepointRestoreSettings.allowNonRestoredState);
}
if (savepointRestoreSettings.recoveryClaimMode != null) {
configuration.set(
StateRecoveryOptions.RESTORE_MODE, savepointRestoreSettings.recoveryClaimMode);
}
final String savepointPath = savepointRestoreSettings.getRestorePath();
if (savepointPath != null) {
configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package org.apache.flink.runtime.jobgraph;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.RecoveryClaimMode;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

/** Tests for {@link SavepointRestoreSettings}. */
Expand All @@ -35,4 +38,71 @@ public void testEqualsWithDifferentRestoreMode() {
SavepointRestoreSettings.forPath("/tmp", false, RecoveryClaimMode.NO_CLAIM);
assertNotEquals(claimSettings, noClaimSettings);
}

@Test
void testToConfigurationWritesExplicitlySetValues() {
SavepointRestoreSettings settings =
SavepointRestoreSettings.forPath("/tmp/savepoint", true, RecoveryClaimMode.CLAIM);

Configuration configuration = new Configuration();
SavepointRestoreSettings.toConfiguration(settings, configuration);

assertThat(configuration.get(StateRecoveryOptions.SAVEPOINT_PATH))
.isEqualTo("/tmp/savepoint");
assertThat(configuration.get(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE))
.isTrue();
assertThat(configuration.get(StateRecoveryOptions.RESTORE_MODE))
.isEqualTo(RecoveryClaimMode.CLAIM);
}

@Test
void testToConfigurationSkipsNonExplicitlySetValues() {
SavepointRestoreSettings settings = SavepointRestoreSettings.forPath("/tmp/savepoint");

Configuration configuration = new Configuration();
SavepointRestoreSettings.toConfiguration(settings, configuration);

// Only the savepoint path should be written
assertThat(configuration.get(StateRecoveryOptions.SAVEPOINT_PATH))
.isEqualTo("/tmp/savepoint");
// These should NOT be written since they were not explicitly set (null)
assertThat(configuration.containsKey(StateRecoveryOptions.RESTORE_MODE.key())).isFalse();
assertThat(
configuration.containsKey(
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key()))
.isFalse();
}

@Test
void testToConfigurationWithPartialExplicitSettings() {
// Only recoveryClaimMode is explicitly set, allowNonRestoredState is null
SavepointRestoreSettings settings =
SavepointRestoreSettings.forPath(
"/tmp/savepoint", (Boolean) null, RecoveryClaimMode.CLAIM);

Configuration configuration = new Configuration();
SavepointRestoreSettings.toConfiguration(settings, configuration);

assertThat(configuration.get(StateRecoveryOptions.SAVEPOINT_PATH))
.isEqualTo("/tmp/savepoint");
assertThat(configuration.get(StateRecoveryOptions.RESTORE_MODE))
.isEqualTo(RecoveryClaimMode.CLAIM);
assertThat(
configuration.containsKey(
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key()))
.isFalse();
}

@Test
void testNoneSettingsDoNotWriteAnything() {
Configuration configuration = new Configuration();
SavepointRestoreSettings.toConfiguration(SavepointRestoreSettings.none(), configuration);

assertThat(configuration.containsKey(StateRecoveryOptions.SAVEPOINT_PATH.key())).isFalse();
assertThat(configuration.containsKey(StateRecoveryOptions.RESTORE_MODE.key())).isFalse();
assertThat(
configuration.containsKey(
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key()))
.isFalse();
}
}