diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 5d99597b35715..1679ab2d02e2c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -20,7 +20,6 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigurationUtils; -import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.execution.RecoveryClaimMode; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -654,9 +653,14 @@ private static void printCustomCliOptions( public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) { if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) { - String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt()); - boolean allowNonRestoredState = - commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt()); + final String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt()); + final Boolean allowNonRestoredState = + commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt()) + ? ConfigurationUtils.convertValue( + commandLine.getOptionValue( + SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt()), + Boolean.class) + : null; final RecoveryClaimMode recoveryClaimMode; if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) { recoveryClaimMode = @@ -672,7 +676,7 @@ public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLin "The option '%s' is deprecated. Please use '%s' instead.%n", SAVEPOINT_RESTORE_MODE.getLongOpt(), SAVEPOINT_CLAIM_MODE.getLongOpt()); } else { - recoveryClaimMode = StateRecoveryOptions.RESTORE_MODE.defaultValue(); + recoveryClaimMode = null; } return SavepointRestoreSettings.forPath( savepointPath, allowNonRestoredState, recoveryClaimMode); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java index e7bdb2fdeadfd..8d1b1e7445775 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java @@ -24,6 +24,7 @@ import org.apache.flink.core.execution.RecoveryClaimMode; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.Serializable; import java.util.Objects; @@ -37,30 +38,31 @@ public class SavepointRestoreSettings implements Serializable { /** No restore should happen. */ private static final SavepointRestoreSettings NONE = - new SavepointRestoreSettings(null, false, RecoveryClaimMode.NO_CLAIM); + new SavepointRestoreSettings(null, null, null); /** Savepoint restore path. */ - private final String restorePath; + private final @Nullable String restorePath; /** * Flag indicating whether non restored state is allowed if the savepoint contains state for an * operator that is not part of the job. */ - private final boolean allowNonRestoredState; + private final @Nullable Boolean allowNonRestoredState; - private final @Nonnull RecoveryClaimMode recoveryClaimMode; + private final @Nullable RecoveryClaimMode recoveryClaimMode; /** * Creates the restore settings. * * @param restorePath Savepoint restore path. - * @param allowNonRestoredState Ignore unmapped state. - * @param recoveryClaimMode how to restore from the savepoint + * @param allowNonRestoredState Ignore unmapped state, or {@code null} if not explicitly set. + * @param recoveryClaimMode how to restore from the savepoint, or {@code null} if not explicitly + * set. */ private SavepointRestoreSettings( - String restorePath, - boolean allowNonRestoredState, - @Nonnull RecoveryClaimMode recoveryClaimMode) { + @Nullable String restorePath, + @Nullable Boolean allowNonRestoredState, + @Nullable RecoveryClaimMode recoveryClaimMode) { this.restorePath = restorePath; this.allowNonRestoredState = allowNonRestoredState; this.recoveryClaimMode = recoveryClaimMode; @@ -92,12 +94,16 @@ public String getRestorePath() { * that cannot be mapped back to the job. */ public boolean allowNonRestoredState() { - return allowNonRestoredState; + return allowNonRestoredState != null + ? allowNonRestoredState + : StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.defaultValue(); } /** Tells how to restore from the given savepoint. */ - public @Nonnull RecoveryClaimMode getRecoveryClaimMode() { - return recoveryClaimMode; + public RecoveryClaimMode getRecoveryClaimMode() { + return recoveryClaimMode != null + ? recoveryClaimMode + : StateRecoveryOptions.RESTORE_MODE.defaultValue(); } @Override @@ -110,17 +116,14 @@ public boolean equals(Object o) { } SavepointRestoreSettings that = (SavepointRestoreSettings) o; - return allowNonRestoredState == that.allowNonRestoredState + return Objects.equals(allowNonRestoredState, that.allowNonRestoredState) && Objects.equals(restorePath, that.restorePath) && Objects.equals(recoveryClaimMode, that.recoveryClaimMode); } @Override public int hashCode() { - int result = restorePath != null ? restorePath.hashCode() : 0; - result = 31 * result + recoveryClaimMode.hashCode(); - result = 31 * result + (allowNonRestoredState ? 1 : 0); - return result; + return Objects.hash(restorePath, recoveryClaimMode, allowNonRestoredState); } @Override @@ -131,9 +134,9 @@ public String toString() { + restorePath + '\'' + ", allowNonRestoredState=" - + allowNonRestoredState + + allowNonRestoredState() + ", recoveryClaimMode=" - + recoveryClaimMode + + getRecoveryClaimMode() + ')'; } else { return "SavepointRestoreSettings.none()"; @@ -147,24 +150,24 @@ public static SavepointRestoreSettings none() { } public static SavepointRestoreSettings forPath(String savepointPath) { - return forPath( - savepointPath, - StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.defaultValue()); + return forPath(savepointPath, null); } public static SavepointRestoreSettings forPath( - String savepointPath, boolean allowNonRestoredState) { - checkNotNull(savepointPath, "Savepoint restore path."); - return new SavepointRestoreSettings( - savepointPath, - allowNonRestoredState, - StateRecoveryOptions.RESTORE_MODE.defaultValue()); + String savepointPath, @Nullable Boolean allowNonRestoredState) { + return new SavepointRestoreSettings(savepointPath, allowNonRestoredState, null); } + /** + * Creates restore settings where certain values may not have been explicitly set by the user. + * Values that are {@code null} will not be written to configuration by {@link + * #toConfiguration}, allowing downstream configuration (e.g., SQL SET statements or + * flink-conf.yaml) to take effect. + */ public static SavepointRestoreSettings forPath( - String savepointPath, - boolean allowNonRestoredState, - @Nonnull RecoveryClaimMode recoveryClaimMode) { + @Nonnull String savepointPath, + @Nullable Boolean allowNonRestoredState, + @Nullable RecoveryClaimMode recoveryClaimMode) { checkNotNull(savepointPath, "Savepoint restore path."); return new SavepointRestoreSettings( savepointPath, allowNonRestoredState, recoveryClaimMode); @@ -176,11 +179,15 @@ public static SavepointRestoreSettings forPath( public static void toConfiguration( final SavepointRestoreSettings savepointRestoreSettings, final Configuration configuration) { - configuration.set( - StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, - savepointRestoreSettings.allowNonRestoredState()); - configuration.set( - StateRecoveryOptions.RESTORE_MODE, savepointRestoreSettings.getRecoveryClaimMode()); + if (savepointRestoreSettings.allowNonRestoredState != null) { + configuration.set( + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, + savepointRestoreSettings.allowNonRestoredState); + } + if (savepointRestoreSettings.recoveryClaimMode != null) { + configuration.set( + StateRecoveryOptions.RESTORE_MODE, savepointRestoreSettings.recoveryClaimMode); + } final String savepointPath = savepointRestoreSettings.getRestorePath(); if (savepointPath != null) { configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java index ba73417075ab8..859d636a1c3e9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java @@ -18,10 +18,13 @@ package org.apache.flink.runtime.jobgraph; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.execution.RecoveryClaimMode; import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertNotEquals; /** Tests for {@link SavepointRestoreSettings}. */ @@ -35,4 +38,71 @@ public void testEqualsWithDifferentRestoreMode() { SavepointRestoreSettings.forPath("/tmp", false, RecoveryClaimMode.NO_CLAIM); assertNotEquals(claimSettings, noClaimSettings); } + + @Test + void testToConfigurationWritesExplicitlySetValues() { + SavepointRestoreSettings settings = + SavepointRestoreSettings.forPath("/tmp/savepoint", true, RecoveryClaimMode.CLAIM); + + Configuration configuration = new Configuration(); + SavepointRestoreSettings.toConfiguration(settings, configuration); + + assertThat(configuration.get(StateRecoveryOptions.SAVEPOINT_PATH)) + .isEqualTo("/tmp/savepoint"); + assertThat(configuration.get(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)) + .isTrue(); + assertThat(configuration.get(StateRecoveryOptions.RESTORE_MODE)) + .isEqualTo(RecoveryClaimMode.CLAIM); + } + + @Test + void testToConfigurationSkipsNonExplicitlySetValues() { + SavepointRestoreSettings settings = SavepointRestoreSettings.forPath("/tmp/savepoint"); + + Configuration configuration = new Configuration(); + SavepointRestoreSettings.toConfiguration(settings, configuration); + + // Only the savepoint path should be written + assertThat(configuration.get(StateRecoveryOptions.SAVEPOINT_PATH)) + .isEqualTo("/tmp/savepoint"); + // These should NOT be written since they were not explicitly set (null) + assertThat(configuration.containsKey(StateRecoveryOptions.RESTORE_MODE.key())).isFalse(); + assertThat( + configuration.containsKey( + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key())) + .isFalse(); + } + + @Test + void testToConfigurationWithPartialExplicitSettings() { + // Only recoveryClaimMode is explicitly set, allowNonRestoredState is null + SavepointRestoreSettings settings = + SavepointRestoreSettings.forPath( + "/tmp/savepoint", (Boolean) null, RecoveryClaimMode.CLAIM); + + Configuration configuration = new Configuration(); + SavepointRestoreSettings.toConfiguration(settings, configuration); + + assertThat(configuration.get(StateRecoveryOptions.SAVEPOINT_PATH)) + .isEqualTo("/tmp/savepoint"); + assertThat(configuration.get(StateRecoveryOptions.RESTORE_MODE)) + .isEqualTo(RecoveryClaimMode.CLAIM); + assertThat( + configuration.containsKey( + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key())) + .isFalse(); + } + + @Test + void testNoneSettingsDoNotWriteAnything() { + Configuration configuration = new Configuration(); + SavepointRestoreSettings.toConfiguration(SavepointRestoreSettings.none(), configuration); + + assertThat(configuration.containsKey(StateRecoveryOptions.SAVEPOINT_PATH.key())).isFalse(); + assertThat(configuration.containsKey(StateRecoveryOptions.RESTORE_MODE.key())).isFalse(); + assertThat( + configuration.containsKey( + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key())) + .isFalse(); + } }