From d2878195a70a2db44e071988138a25ca36d59430 Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Wed, 27 May 2026 14:29:21 +0530 Subject: [PATCH] FLINK-39764 Support dynamic table options in Table API --- .../flink/table/api/TableEnvironment.java | 29 ++++++++ .../api/internal/TableEnvironmentImpl.java | 42 +++++++++++ .../flink/table/api/TableEnvironmentTest.java | 74 +++++++++++++++++++ .../test/program/FailingTableApiTestStep.java | 6 ++ .../table/test/program/TableApiTestStep.java | 8 ++ 5 files changed, 159 insertions(+) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index 5f6c584cced90..7ab9d777cfb65 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -43,6 +43,7 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; /** @@ -1080,6 +1081,34 @@ void createTemporarySystemFunction( */ Table from(String path); + /** + * Reads a registered table and applies dynamic options, returning the corresponding {@link + * Table}. + * + *

Dynamic options override the table's static options defined at creation time (DDL). + * This is the Table API equivalent of SQL's {@code OPTIONS} hint: + * + *

{@code
+     * // Table API (this method)
+     * Table tab = tableEnv.from("kafka_table1", Map.of("scan.startup.mode", "earliest-offset"));
+     *
+     * // Equivalent SQL
+     * // SELECT * FROM kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') * /
+     * }
+ * + *

The configuration option {@code table.dynamic-table-options.enabled} must be set to + * {@code true} (the default) for dynamic options to take effect. + * + *

Note: Dynamic options cannot be applied to views. + * + * @param path The path of a table API object to scan. + * @param dynamicOptions A map of option key-value pairs to override on the table. + * @return The {@link Table} object describing the pipeline for further transformations. + * @throws ValidationException if the table is not found, is a view, or dynamic options are + * disabled. + */ + Table from(String path, Map dynamicOptions); + /** * Returns a {@link Table} backed by the given {@link TableDescriptor descriptor}. * diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 496369e78f269..49e74a65f285b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -673,6 +673,48 @@ public Table from(String path) { "Table %s was not found.", unresolvedIdentifier))); } + @Override + public Table from(String path, Map dynamicOptions) { + Preconditions.checkNotNull(dynamicOptions, "Dynamic options must not be null."); + + if (!tableConfig.get(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED)) { + throw new ValidationException( + String.format( + "Dynamic table options are disabled. Set '%s' to 'true' to enable them.", + TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED.key())); + } + + UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); + ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + ContextResolvedTable contextResolvedTable = + catalogManager + .getTable(tableIdentifier) + .orElseThrow( + () -> + new ValidationException( + String.format( + "Table %s was not found.", + unresolvedIdentifier))); + + if (dynamicOptions.isEmpty()) { + return createTable(new SourceQueryOperation(contextResolvedTable)); + } + + if (contextResolvedTable.getResolvedTable().getTableKind() + == CatalogBaseTable.TableKind.VIEW) { + throw new ValidationException( + String.format( + "View '%s' cannot be enriched with dynamic options.", + unresolvedIdentifier)); + } + + Map mergedOptions = + new HashMap<>(contextResolvedTable.getResolvedTable().getOptions()); + mergedOptions.putAll(dynamicOptions); + ContextResolvedTable withOptions = contextResolvedTable.copy(mergedOptions); + return createTable(new SourceQueryOperation(withOptions)); + } + @Override public Table from(TableDescriptor descriptor) { Preconditions.checkNotNull(descriptor, "Table descriptor must not be null."); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java index eabecbd905ab6..9d34c63a88cab 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java @@ -257,6 +257,80 @@ void testListModels() { assertThat(tEnv.listModels()).containsExactly("M1", "M2"); } + @Test + void testFromWithDynamicOptions() { + tEnv.createTable("T", TEST_DESCRIPTOR); + + final Table table = tEnv.from("T", Map.of("b", "Override")); + + assertThat(table.getQueryOperation()) + .asInstanceOf(type(SourceQueryOperation.class)) + .extracting(SourceQueryOperation::getContextResolvedTable) + .satisfies( + crt -> { + assertThat(crt.getResolvedTable().getOptions()) + .contains(entry("a", "Test"), entry("b", "Override")); + }); + } + + @Test + void testFromWithDynamicOptionsOverridesExisting() { + tEnv.createTable("T", TEST_DESCRIPTOR); + + final Table table = tEnv.from("T", Map.of("a", "Overridden")); + + assertThat(table.getQueryOperation()) + .asInstanceOf(type(SourceQueryOperation.class)) + .extracting(SourceQueryOperation::getContextResolvedTable) + .satisfies( + crt -> { + assertThat(crt.getResolvedTable().getOptions()) + .containsEntry("a", "Overridden"); + }); + } + + @Test + void testFromWithEmptyDynamicOptions() { + tEnv.createTable("T", TEST_DESCRIPTOR); + + final Table table = tEnv.from("T", Map.of()); + + assertThat(table.getQueryOperation()) + .asInstanceOf(type(SourceQueryOperation.class)) + .extracting(SourceQueryOperation::getContextResolvedTable) + .satisfies( + crt -> { + assertThat(crt.getResolvedTable().getOptions()) + .containsEntry("a", "Test"); + }); + } + + @Test + void testFromWithDynamicOptionsOnViewThrows() { + tEnv.createTable("T", TEST_DESCRIPTOR); + tEnv.createView("V", tEnv.from("T")); + + assertThatThrownBy(() -> tEnv.from("V", Map.of("key", "value"))) + .isInstanceOf(ValidationException.class); + } + + @Test + void testFromWithDynamicOptionsDisabledThrows() { + tEnv.getConfig().set("table.dynamic-table-options.enabled", "false"); + tEnv.createTable("T", TEST_DESCRIPTOR); + + assertThatThrownBy(() -> tEnv.from("T", Map.of("key", "value"))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Dynamic table options are disabled"); + } + + @Test + void testFromWithDynamicOptionsTableNotFound() { + assertThatThrownBy(() -> tEnv.from("NonExistent", Map.of("key", "value"))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("was not found"); + } + private static void assertCreateTableFromDescriptor( TableEnvironmentMock tEnv, Schema schema, boolean ignoreIfExists) throws org.apache.flink.table.catalog.exceptions.TableNotExistException { diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FailingTableApiTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FailingTableApiTestStep.java index ab91f9550258b..2e335149a3a7b 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FailingTableApiTestStep.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FailingTableApiTestStep.java @@ -30,6 +30,7 @@ import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.util.Preconditions; +import java.util.Map; import java.util.function.Function; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; @@ -104,6 +105,11 @@ public Table sqlQuery(String query) { return env.sqlQuery(query); } + @Override + public Table from(String path, Map dynamicOptions) { + return null; + } + @Override public Model fromModel(String modelPath) { return env.fromModel(modelPath); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java index 176e067addb10..c15b268048cc4 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java @@ -28,6 +28,7 @@ import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.types.AbstractDataType; +import java.util.Map; import java.util.function.Function; /** Test step for execution of a Table API. Similar to {@link SqlTestStep}. */ @@ -84,6 +85,11 @@ public Table sqlQuery(String query) { return env.sqlQuery(query); } + @Override + public Table from(String path, Map dynamicOptions) { + return env.from(path, dynamicOptions); + } + @Override public Model fromModel(String modelPath) { return env.fromModel(modelPath); @@ -137,6 +143,8 @@ public interface TableEnvAccessor { /** See {@link TableEnvironment#sqlQuery(String)}. */ Table sqlQuery(String query); + Table from(String path, Map dynamicOptions); + /** See {@link TableEnvironment#fromModel(String)}. */ Model fromModel(String modelPath);