diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 5f6c584cced90..7ab9d777cfb65 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -43,6 +43,7 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/**
@@ -1080,6 +1081,34 @@ void createTemporarySystemFunction(
*/
Table from(String path);
+ /**
+ * Reads a registered table and applies dynamic options, returning the corresponding {@link
+ * Table}.
+ *
+ *
Dynamic options override the table's static options defined at creation time (DDL).
+ * This is the Table API equivalent of SQL's {@code OPTIONS} hint:
+ *
+ *
{@code
+ * // Table API (this method)
+ * Table tab = tableEnv.from("kafka_table1", Map.of("scan.startup.mode", "earliest-offset"));
+ *
+ * // Equivalent SQL
+ * // SELECT * FROM kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') * /
+ * }
+ *
+ * The configuration option {@code table.dynamic-table-options.enabled} must be set to
+ * {@code true} (the default) for dynamic options to take effect.
+ *
+ *
Note: Dynamic options cannot be applied to views.
+ *
+ * @param path The path of a table API object to scan.
+ * @param dynamicOptions A map of option key-value pairs to override on the table.
+ * @return The {@link Table} object describing the pipeline for further transformations.
+ * @throws ValidationException if the table is not found, is a view, or dynamic options are
+ * disabled.
+ */
+ Table from(String path, Map dynamicOptions);
+
/**
* Returns a {@link Table} backed by the given {@link TableDescriptor descriptor}.
*
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 496369e78f269..49e74a65f285b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -673,6 +673,48 @@ public Table from(String path) {
"Table %s was not found.", unresolvedIdentifier)));
}
+ @Override
+ public Table from(String path, Map dynamicOptions) {
+ Preconditions.checkNotNull(dynamicOptions, "Dynamic options must not be null.");
+
+ if (!tableConfig.get(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED)) {
+ throw new ValidationException(
+ String.format(
+ "Dynamic table options are disabled. Set '%s' to 'true' to enable them.",
+ TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED.key()));
+ }
+
+ UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
+ ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+ ContextResolvedTable contextResolvedTable =
+ catalogManager
+ .getTable(tableIdentifier)
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ String.format(
+ "Table %s was not found.",
+ unresolvedIdentifier)));
+
+ if (dynamicOptions.isEmpty()) {
+ return createTable(new SourceQueryOperation(contextResolvedTable));
+ }
+
+ if (contextResolvedTable.getResolvedTable().getTableKind()
+ == CatalogBaseTable.TableKind.VIEW) {
+ throw new ValidationException(
+ String.format(
+ "View '%s' cannot be enriched with dynamic options.",
+ unresolvedIdentifier));
+ }
+
+ Map mergedOptions =
+ new HashMap<>(contextResolvedTable.getResolvedTable().getOptions());
+ mergedOptions.putAll(dynamicOptions);
+ ContextResolvedTable withOptions = contextResolvedTable.copy(mergedOptions);
+ return createTable(new SourceQueryOperation(withOptions));
+ }
+
@Override
public Table from(TableDescriptor descriptor) {
Preconditions.checkNotNull(descriptor, "Table descriptor must not be null.");
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
index eabecbd905ab6..9d34c63a88cab 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
@@ -257,6 +257,80 @@ void testListModels() {
assertThat(tEnv.listModels()).containsExactly("M1", "M2");
}
+ @Test
+ void testFromWithDynamicOptions() {
+ tEnv.createTable("T", TEST_DESCRIPTOR);
+
+ final Table table = tEnv.from("T", Map.of("b", "Override"));
+
+ assertThat(table.getQueryOperation())
+ .asInstanceOf(type(SourceQueryOperation.class))
+ .extracting(SourceQueryOperation::getContextResolvedTable)
+ .satisfies(
+ crt -> {
+ assertThat(crt.getResolvedTable().getOptions())
+ .contains(entry("a", "Test"), entry("b", "Override"));
+ });
+ }
+
+ @Test
+ void testFromWithDynamicOptionsOverridesExisting() {
+ tEnv.createTable("T", TEST_DESCRIPTOR);
+
+ final Table table = tEnv.from("T", Map.of("a", "Overridden"));
+
+ assertThat(table.getQueryOperation())
+ .asInstanceOf(type(SourceQueryOperation.class))
+ .extracting(SourceQueryOperation::getContextResolvedTable)
+ .satisfies(
+ crt -> {
+ assertThat(crt.getResolvedTable().getOptions())
+ .containsEntry("a", "Overridden");
+ });
+ }
+
+ @Test
+ void testFromWithEmptyDynamicOptions() {
+ tEnv.createTable("T", TEST_DESCRIPTOR);
+
+ final Table table = tEnv.from("T", Map.of());
+
+ assertThat(table.getQueryOperation())
+ .asInstanceOf(type(SourceQueryOperation.class))
+ .extracting(SourceQueryOperation::getContextResolvedTable)
+ .satisfies(
+ crt -> {
+ assertThat(crt.getResolvedTable().getOptions())
+ .containsEntry("a", "Test");
+ });
+ }
+
+ @Test
+ void testFromWithDynamicOptionsOnViewThrows() {
+ tEnv.createTable("T", TEST_DESCRIPTOR);
+ tEnv.createView("V", tEnv.from("T"));
+
+ assertThatThrownBy(() -> tEnv.from("V", Map.of("key", "value")))
+ .isInstanceOf(ValidationException.class);
+ }
+
+ @Test
+ void testFromWithDynamicOptionsDisabledThrows() {
+ tEnv.getConfig().set("table.dynamic-table-options.enabled", "false");
+ tEnv.createTable("T", TEST_DESCRIPTOR);
+
+ assertThatThrownBy(() -> tEnv.from("T", Map.of("key", "value")))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("Dynamic table options are disabled");
+ }
+
+ @Test
+ void testFromWithDynamicOptionsTableNotFound() {
+ assertThatThrownBy(() -> tEnv.from("NonExistent", Map.of("key", "value")))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("was not found");
+ }
+
private static void assertCreateTableFromDescriptor(
TableEnvironmentMock tEnv, Schema schema, boolean ignoreIfExists)
throws org.apache.flink.table.catalog.exceptions.TableNotExistException {
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FailingTableApiTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FailingTableApiTestStep.java
index ab91f9550258b..2e335149a3a7b 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FailingTableApiTestStep.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FailingTableApiTestStep.java
@@ -30,6 +30,7 @@
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.util.Preconditions;
+import java.util.Map;
import java.util.function.Function;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -104,6 +105,11 @@ public Table sqlQuery(String query) {
return env.sqlQuery(query);
}
+ @Override
+ public Table from(String path, Map dynamicOptions) {
+ return null;
+ }
+
@Override
public Model fromModel(String modelPath) {
return env.fromModel(modelPath);
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java
index 176e067addb10..c15b268048cc4 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java
@@ -28,6 +28,7 @@
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.types.AbstractDataType;
+import java.util.Map;
import java.util.function.Function;
/** Test step for execution of a Table API. Similar to {@link SqlTestStep}. */
@@ -84,6 +85,11 @@ public Table sqlQuery(String query) {
return env.sqlQuery(query);
}
+ @Override
+ public Table from(String path, Map dynamicOptions) {
+ return env.from(path, dynamicOptions);
+ }
+
@Override
public Model fromModel(String modelPath) {
return env.fromModel(modelPath);
@@ -137,6 +143,8 @@ public interface TableEnvAccessor {
/** See {@link TableEnvironment#sqlQuery(String)}. */
Table sqlQuery(String query);
+ Table from(String path, Map dynamicOptions);
+
/** See {@link TableEnvironment#fromModel(String)}. */
Model fromModel(String modelPath);