diff --git a/docs/content.zh/docs/sql/materialized-table/statements.md b/docs/content.zh/docs/sql/materialized-table/statements.md index 260027f6d0288..c99fbf7459c28 100644 --- a/docs/content.zh/docs/sql/materialized-table/statements.md +++ b/docs/content.zh/docs/sql/materialized-table/statements.md @@ -242,6 +242,7 @@ The `OR ALTER` clause provides create-or-update semantics: - **If the materialized table does not exist**: Creates a new materialized table with the specified options - **If the materialized table exists**: Modifies the query definition (behaves like `ALTER MATERIALIZED TABLE AS`) +- **If a regular table with the same name exists**: Converts it in place into a materialized table, when enabled (see [Converting a Table to a Materialized Table](#converting-a-table-to-a-materialized-table)) This is particularly useful in declarative deployment scenarios where you want to define the desired state without checking if the materialized table already exists. @@ -260,6 +261,63 @@ The operation updates the materialized table similarly to [ALTER MATERIALIZED TA See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details. +## Converting a Table to a Materialized Table + +This lets you adopt a materialized table on top of a table that already exists, without dropping and recreating it. + +`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table into a materialized table in place. The catalog object keeps its identity and underlying storage. Its kind becomes materialized table, and its schema, options, query definition, freshness, and refresh mode are taken from the conversion statement, exactly as for a newly created materialized table. After the conversion, a refresh job is launched just as it is for a newly created materialized table. + + +**Enabling conversion** + +Conversion is disabled by default. When the option is disabled, `CREATE OR ALTER MATERIALIZED TABLE` against a regular table is rejected. +To enable it, set the following option in the cluster configuration file `config.yaml`: + +```yaml +table.materialized-table.conversion-from-table.enabled: true +``` + +This is a cluster-wide setting: it must be set in the cluster configuration, and a session-level `SET` statement has no effect. + +**Schema, watermark, and primary key** + +The materialized table's schema is derived from the `CREATE OR ALTER MATERIALIZED TABLE` statement and its query, exactly as for a newly created materialized table. The source table's watermark and primary key are **not** carried over — declare them in the conversion statement if you want them. This keeps `CREATE OR ALTER MATERIALIZED TABLE` declarative and idempotent: running the same statement again produces the same materialized table. + +**Example** + +```sql +-- An existing regular table that you now want to maintain as a materialized table +CREATE TABLE user_spending ( + user_id BIGINT, + total_amount BIGINT +) WITH ( + 'connector' = '...' +); + +-- Convert it in place; from now on it is refreshed by the query +CREATE OR ALTER MATERIALIZED TABLE user_spending + AS SELECT + user_id, + SUM(amount) AS total_amount + FROM orders + GROUP BY user_id; +``` +The conversion is one-way and cannot be undone. You can still use the result like a regular table: suspend it to stop the refresh job, then query it. +From the example above: +```sql +-- Suspend the materialized table to stop the refresh job +ALTER MATERIALIZED TABLE user_spending SUSPEND; + +-- Use the materialized table as a regular table +SELECT * FROM user_spending; +``` + + +Note +- The catalog must support in-place conversion; catalogs that do not implement it reject the statement. +- Converting a view into a materialized table is not supported. +- If the refresh job (continuous mode) or refresh workflow (full mode) cannot be started, the conversion is **not** rolled back: the table is left as a materialized table in `SUSPENDED` status. Fix the underlying issue and `RESUME` it. + ## 示例 假定 `materialized-table.refresh-mode.freshness-threshold` 为 30 分钟。 diff --git a/docs/content/docs/sql/materialized-table/statements.md b/docs/content/docs/sql/materialized-table/statements.md index 5b0628219cb0d..fe904b96ada4c 100644 --- a/docs/content/docs/sql/materialized-table/statements.md +++ b/docs/content/docs/sql/materialized-table/statements.md @@ -242,6 +242,7 @@ The `OR ALTER` clause provides create-or-update semantics: - **If the materialized table does not exist**: Creates a new materialized table with the specified options - **If the materialized table exists**: Modifies the query definition (behaves like `ALTER MATERIALIZED TABLE AS`) +- **If a regular table with the same name exists**: Converts it in place into a materialized table, when enabled (see [Converting a Table to a Materialized Table](#converting-a-table-to-a-materialized-table)) This is particularly useful in declarative deployment scenarios where you want to define the desired state without checking if the materialized table already exists. @@ -260,6 +261,63 @@ The operation updates the materialized table similarly to [ALTER MATERIALIZED TA See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details. +## Converting a Table to a Materialized Table + +This lets you adopt a materialized table on top of a table that already exists, without dropping and recreating it. + +`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table into a materialized table in place. The catalog object keeps its identity and underlying storage. Its kind becomes materialized table, and its schema, options, query definition, freshness, and refresh mode are taken from the conversion statement, exactly as for a newly created materialized table. After the conversion, a refresh job is launched just as it is for a newly created materialized table. + + +**Enabling conversion** + +Conversion is disabled by default. When the option is disabled, `CREATE OR ALTER MATERIALIZED TABLE` against a regular table is rejected. +To enable it, set the following option in the cluster configuration file `config.yaml`: + +```yaml +table.materialized-table.conversion-from-table.enabled: true +``` + +This is a cluster-wide setting: it must be set in the cluster configuration, and a session-level `SET` statement has no effect. + +**Schema, watermark, and primary key** + +The materialized table's schema is derived from the `CREATE OR ALTER MATERIALIZED TABLE` statement and its query, exactly as for a newly created materialized table. The source table's watermark and primary key are **not** carried over — declare them in the conversion statement if you want them. This keeps `CREATE OR ALTER MATERIALIZED TABLE` declarative and idempotent: running the same statement again produces the same materialized table. + +**Example** + +```sql +-- An existing regular table that you now want to maintain as a materialized table +CREATE TABLE user_spending ( + user_id BIGINT, + total_amount BIGINT +) WITH ( + 'connector' = '...' +); + +-- Convert it in place; from now on it is refreshed by the query +CREATE OR ALTER MATERIALIZED TABLE user_spending + AS SELECT + user_id, + SUM(amount) AS total_amount + FROM orders + GROUP BY user_id; +``` +The conversion is one-way and cannot be undone. You can still use the result like a regular table: suspend it to stop the refresh job, then query it. +From the example above: +```sql +-- Suspend the materialized table to stop the refresh job +ALTER MATERIALIZED TABLE user_spending SUSPEND; + +-- Use the materialized table as a regular table +SELECT * FROM user_spending; +``` + + +Note +- The catalog must support in-place conversion; catalogs that do not implement it reject the statement. +- Converting a view into a materialized table is not supported. +- If the refresh job (continuous mode) or refresh workflow (full mode) cannot be started, the conversion is **not** rolled back: the table is left as a materialized table in `SUSPENDED` status. Fix the underlying issue and `RESUME` it. + ## Examples Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes. diff --git a/docs/layouts/shortcodes/generated/table_config_configuration.html b/docs/layouts/shortcodes/generated/table_config_configuration.html index 9c9229a96ebf1..9484d942bf081 100644 --- a/docs/layouts/shortcodes/generated/table_config_configuration.html +++ b/docs/layouts/shortcodes/generated/table_config_configuration.html @@ -68,6 +68,12 @@ String The local time zone defines current session time zone id. It is used when converting to/from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>. Internally, timestamps with local time zone are always represented in the UTC time zone. However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, TIME, or simply STRING), the session time zone is used during conversion. The input of option is either a full name such as "America/Los_Angeles", or a custom timezone id such as "GMT-08:00". + +
table.materialized-table.conversion-from-table.enabled

Batch Streaming + false + Boolean + If enabled, executing a CREATE OR ALTER MATERIALIZED TABLE on an existing regular table converts it in place to a materialized table, preserving identity and storage. This is a cluster-wide setting: it must be set in the cluster configuration (e.g. config.yaml); session-level SET has no effect. When disabled (the default), CREATE OR ALTER MATERIALIZED TABLE against a regular table is rejected. +
table.plan.compile.catalog-objects

Batch Streaming ALL diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py index 3e20aab37ca7d..5b14f867d3e23 100644 --- a/flink-python/pyflink/table/tests/test_catalog_completeness.py +++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py @@ -38,7 +38,7 @@ def java_class(cls): @classmethod def excluded_methods(cls): - # open/close are not needed in Python API as they are used internally + # open/close are not needed in Python API as they are used internally. return { 'open', 'close', @@ -51,7 +51,8 @@ def excluded_methods(cls): 'connectionExists', 'listConnections', 'createConnection', - 'alterConnection'} + 'alterConnection', + 'convertTableToMaterializedTable'} class CatalogDatabaseAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase): diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index 6555bd57d19b7..0ae8d2bb27f61 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -60,6 +60,7 @@ import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation; +import org.apache.flink.table.operations.materializedtable.ConvertTableToMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation; @@ -186,6 +187,9 @@ public ResultFetcher callMaterializedTableOperation( } else if (op instanceof AlterMaterializedTableChangeOperation) { return callAlterMaterializedTableChangeOperation( operationExecutor, handle, (AlterMaterializedTableChangeOperation) op); + } else if (op instanceof ConvertTableToMaterializedTableOperation) { + return callConvertTableToMaterializedTableOperation( + operationExecutor, handle, (ConvertTableToMaterializedTableOperation) op); } throw new SqlExecutionException( @@ -261,37 +265,15 @@ private void createMaterializedTableInFullMode( ResolvedCatalogMaterializedTable catalogMaterializedTable = createMaterializedTableOperation.getCatalogMaterializedTable(); - final IntervalFreshness freshness = catalogMaterializedTable.getDefinitionFreshness(); - String cronExpression = convertFreshnessToCron(freshness); - // create full refresh job - CreateRefreshWorkflow createRefreshWorkflow = - new CreatePeriodicRefreshWorkflow( - materializedTableIdentifier, - catalogMaterializedTable.getExpandedQuery(), - cronExpression, - getSessionInitializationConf(operationExecutor), - Collections.emptyMap(), - restEndpointUrl); - try { - RefreshHandler refreshHandler = - workflowScheduler.createRefreshWorkflow(createRefreshWorkflow); - RefreshHandlerSerializer refreshHandlerSerializer = - workflowScheduler.getRefreshHandlerSerializer(); - byte[] serializedRefreshHandler = refreshHandlerSerializer.serialize(refreshHandler); - - updateRefreshHandler( + createPeriodicRefreshWorkflow( operationExecutor, handle, materializedTableIdentifier, - catalogMaterializedTable, - RefreshStatus.ACTIVATED, - refreshHandler.asSummaryString(), - serializedRefreshHandler); + catalogMaterializedTable); } catch (Exception e) { - // drop materialized table if creating the refresh workflow encounters an exception. - // Thus, weak - // atomicity is guaranteed + // drop materialized table if creating the refresh workflow encounters an exception, so + // weak atomicity is guaranteed operationExecutor.callExecutableOperation( handle, new DropMaterializedTableOperation(materializedTableIdentifier, true)); throw new SqlExecutionException( @@ -302,6 +284,145 @@ private void createMaterializedTableInFullMode( } } + private ResultFetcher callConvertTableToMaterializedTableOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + ConvertTableToMaterializedTableOperation convertOperation) { + ResolvedCatalogMaterializedTable materializedTable = + convertOperation.getMaterializedTable(); + if (RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) { + convertTableToMaterializedTableInContinuousMode( + operationExecutor, handle, convertOperation); + } else { + convertTableToMaterializedTableInFullMode(operationExecutor, handle, convertOperation); + } + // Just return ok for unify different refresh job info of continuous and full mode, user + // should get the refresh job info via desc table. + return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false); + } + + private void convertTableToMaterializedTableInContinuousMode( + OperationExecutor operationExecutor, + OperationHandle handle, + ConvertTableToMaterializedTableOperation convertOperation) { + // swap the catalog entry from a regular table to a materialized table first + operationExecutor.callExecutableOperation(handle, convertOperation); + + ObjectIdentifier materializedTableIdentifier = convertOperation.getTableIdentifier(); + ResolvedCatalogMaterializedTable catalogMaterializedTable = + convertOperation.getMaterializedTable(); + + try { + executeContinuousRefreshJob( + operationExecutor, + handle, + catalogMaterializedTable, + materializedTableIdentifier, + Collections.emptyMap(), + Optional.empty()); + } catch (Exception e) { + suspendMaterializedTable( + operationExecutor, + handle, + materializedTableIdentifier, + catalogMaterializedTable); + throw new SqlExecutionException( + String.format( + "Failed to start the continuous refresh job when converting table %s to a materialized table. " + + "The table was converted and left in SUSPENDED status; resume it once the issue is resolved.", + materializedTableIdentifier), + e); + } + } + + private void convertTableToMaterializedTableInFullMode( + OperationExecutor operationExecutor, + OperationHandle handle, + ConvertTableToMaterializedTableOperation convertOperation) { + if (workflowScheduler == null) { + throw new SqlExecutionException( + "The workflow scheduler must be configured when converting a table to a materialized table in full refresh mode."); + } + // swap the catalog entry from a regular table to a materialized table first + operationExecutor.callExecutableOperation(handle, convertOperation); + + ObjectIdentifier materializedTableIdentifier = convertOperation.getTableIdentifier(); + ResolvedCatalogMaterializedTable catalogMaterializedTable = + convertOperation.getMaterializedTable(); + + try { + createPeriodicRefreshWorkflow( + operationExecutor, + handle, + materializedTableIdentifier, + catalogMaterializedTable); + } catch (Exception e) { + suspendMaterializedTable( + operationExecutor, + handle, + materializedTableIdentifier, + catalogMaterializedTable); + throw new SqlExecutionException( + String.format( + "Failed to create the refresh workflow when converting table %s to a materialized table. " + + "The table was converted and left in SUSPENDED status; resume it once the issue is resolved.", + materializedTableIdentifier), + e); + } + } + + /** + * Creates the periodic refresh workflow for a full-mode materialized table and records the + * resulting handler with {@code ACTIVATED} status. + */ + private void createPeriodicRefreshWorkflow( + OperationExecutor operationExecutor, + OperationHandle handle, + ObjectIdentifier materializedTableIdentifier, + ResolvedCatalogMaterializedTable catalogMaterializedTable) + throws Exception { + final IntervalFreshness freshness = catalogMaterializedTable.getDefinitionFreshness(); + final String cronExpression = convertFreshnessToCron(freshness); + final CreateRefreshWorkflow createRefreshWorkflow = + new CreatePeriodicRefreshWorkflow( + materializedTableIdentifier, + catalogMaterializedTable.getExpandedQuery(), + cronExpression, + getSessionInitializationConf(operationExecutor), + Collections.emptyMap(), + restEndpointUrl); + + final RefreshHandler refreshHandler = + workflowScheduler.createRefreshWorkflow(createRefreshWorkflow); + final RefreshHandlerSerializer refreshHandlerSerializer = + workflowScheduler.getRefreshHandlerSerializer(); + final byte[] serializedRefreshHandler = refreshHandlerSerializer.serialize(refreshHandler); + + updateRefreshHandler( + operationExecutor, + handle, + materializedTableIdentifier, + catalogMaterializedTable, + RefreshStatus.ACTIVATED, + refreshHandler.asSummaryString(), + serializedRefreshHandler); + } + + /** Sets a materialized table refresh status to {@code SUSPENDED}. */ + private void suspendMaterializedTable( + OperationExecutor operationExecutor, + OperationHandle handle, + ObjectIdentifier materializedTableIdentifier, + ResolvedCatalogMaterializedTable catalogMaterializedTable) { + AlterMaterializedTableChangeOperation alterOperation = + new AlterMaterializedTableChangeOperation( + materializedTableIdentifier, + oldTable -> + List.of(TableChange.modifyRefreshStatus(RefreshStatus.SUSPENDED)), + catalogMaterializedTable); + operationExecutor.callExecutableOperation(handle, alterOperation); + } + private ResultFetcher callAlterMaterializedTableSuspend( OperationExecutor operationExecutor, OperationHandle handle, diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableConversionITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableConversionITCase.java new file mode 100644 index 0000000000000..870648e9e9694 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableConversionITCase.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.session.SessionEnvironment; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension; +import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler; +import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; +import org.apache.flink.table.refresh.ContinuousRefreshHandler; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND; +import static org.apache.flink.table.factories.FactoryUtil.WORKFLOW_SCHEDULER_TYPE; +import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.executeStatement; +import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.getContinuousRefreshHandler; +import static org.apache.flink.table.gateway.service.MaterializedTableTestUtils.getTable; +import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination; +import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults; +import static org.apache.flink.test.util.TestUtils.waitUntilAllTasksAreRunning; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * ITCase for converting a regular table in place to a materialized table via CREATE OR ALTER + * MATERIALIZED TABLE. + * + *

This case runs against its own gateway with {@link + * TableConfigOptions#MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED} turned on, so that the + * default-disabled behavior stays covered by the other materialized table cases and the planner + * unit tests. + */ +class MaterializedTableConversionITCase { + + private static final String FILE_CATALOG_STORE = "file_store"; + private static final String TEST_CATALOG_PREFIX = "test_catalog"; + private static final String TEST_DEFAULT_DATABASE = "test_db"; + + private static final AtomicLong COUNTER = new AtomicLong(0); + + @RegisterExtension + @Order(1) + static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .build()); + + @RegisterExtension + @Order(2) + static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = + new SqlGatewayServiceExtension( + MaterializedTableConversionITCase::getGatewayConfiguration); + + @RegisterExtension + @Order(3) + static final TestExecutorExtension EXECUTOR_EXTENSION = + new TestExecutorExtension<>( + () -> + Executors.newCachedThreadPool( + new ExecutorThreadFactory( + "SqlGatewayService Test Pool", + IgnoreExceptionHandler.INSTANCE))); + + @RegisterExtension + @Order(4) + static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION = + new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService); + + private static SqlGatewayServiceImpl service; + private static SessionEnvironment defaultSessionEnvironment; + private static Path baseCatalogPath; + + private String fileSystemCatalogPath; + private String fileSystemCatalogName; + + private SessionHandle sessionHandle; + private RestClusterClient restClusterClient; + + private static Configuration getGatewayConfiguration() { + final Configuration configuration = + new Configuration(MINI_CLUSTER.getClientConfiguration()); + configuration.set( + TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED, true); + return configuration; + } + + @BeforeAll + static void setUp(@TempDir Path temporaryFolder) throws Exception { + service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService(); + + Path fileCatalogStore = temporaryFolder.resolve(FILE_CATALOG_STORE); + Files.createDirectory(fileCatalogStore); + Map catalogStoreOptions = new HashMap<>(); + catalogStoreOptions.put(TABLE_CATALOG_STORE_KIND.key(), "file"); + catalogStoreOptions.put("table.catalog-store.file.path", fileCatalogStore.toString()); + + baseCatalogPath = temporaryFolder.resolve(TEST_CATALOG_PREFIX); + Files.createDirectory(baseCatalogPath); + + Map workflowSchedulerConfig = new HashMap<>(); + workflowSchedulerConfig.put(WORKFLOW_SCHEDULER_TYPE.key(), "embedded"); + workflowSchedulerConfig.put( + "sql-gateway.endpoint.rest.address", + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress()); + workflowSchedulerConfig.put( + "sql-gateway.endpoint.rest.port", + String.valueOf(SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort())); + + defaultSessionEnvironment = + SessionEnvironment.newBuilder() + .addSessionConfig(catalogStoreOptions) + .addSessionConfig(workflowSchedulerConfig) + .setSessionEndpointVersion(MockedEndpointVersion.V1) + .build(); + } + + @BeforeEach + void before(@InjectClusterClient RestClusterClient injectClusterClient) throws Exception { + String randomStr = String.valueOf(COUNTER.incrementAndGet()); + Path fileCatalogPath = baseCatalogPath.resolve(randomStr); + Files.createDirectory(fileCatalogPath); + Files.createDirectory(fileCatalogPath.resolve(TEST_DEFAULT_DATABASE)); + + fileSystemCatalogPath = fileCatalogPath.toString(); + fileSystemCatalogName = TEST_CATALOG_PREFIX + randomStr; + sessionHandle = initializeSession(); + + restClusterClient = injectClusterClient; + } + + @Test + void testConvertTableToMaterializedTableInContinuousMode() throws Exception { + // pre-create a regular table that will be converted in place to a materialized table + String createRegularTableDDL = + "CREATE TABLE users_shops (\n" + + " user_id BIGINT,\n" + + " shop_id BIGINT,\n" + + " ds STRING,\n" + + " payed_buy_fee_sum BIGINT,\n" + + " pv INT NOT NULL\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'datagen',\n" + + " 'rows-per-second' = '10'\n" + + ")"; + OperationHandle createTableHandle = + executeStatement(service, sessionHandle, createRegularTableDDL); + awaitOperationTermination(service, sessionHandle, createTableHandle); + + String convertDDL = + "CREATE OR ALTER MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle convertHandle = executeStatement(service, sessionHandle, convertDDL); + awaitOperationTermination(service, sessionHandle, convertHandle); + + // the entry is now a materialized table with an active continuous refresh job + ObjectIdentifier userShopsIdentifier = getObjectIdentifier("users_shops"); + ResolvedCatalogMaterializedTable actualMaterializedTable = + getTable(service, sessionHandle, userShopsIdentifier); + assertThat(actualMaterializedTable.getRefreshMode()).isSameAs(RefreshMode.CONTINUOUS); + assertThat(actualMaterializedTable.getRefreshStatus()).isSameAs(RefreshStatus.ACTIVATED); + assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty(); + + ContinuousRefreshHandler activeRefreshHandler = + getContinuousRefreshHandler(actualMaterializedTable, getClass().getClassLoader()); + waitUntilAllTasksAreRunning( + restClusterClient, JobID.fromHexString(activeRefreshHandler.getJobId())); + + // verify the background refresh job is running + String describeJobDDL = String.format("DESCRIBE JOB '%s'", activeRefreshHandler.getJobId()); + OperationHandle describeJobHandle = + executeStatement(service, sessionHandle, describeJobDDL); + awaitOperationTermination(service, sessionHandle, describeJobHandle); + List jobResults = fetchAllResults(service, sessionHandle, describeJobHandle); + assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING"); + + dropMaterializedTable(userShopsIdentifier); + } + + @Test + void testConvertTableLeftSuspendedWhenRefreshJobFailsToStart() throws Exception { + String createRegularTableDDL = + "CREATE TABLE users_shops (\n" + + " user_id BIGINT,\n" + + " shop_id BIGINT,\n" + + " ds STRING,\n" + + " payed_buy_fee_sum BIGINT,\n" + + " pv INT NOT NULL\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'datagen',\n" + + " 'rows-per-second' = '10'\n" + + ")"; + OperationHandle createTableHandle = + executeStatement(service, sessionHandle, createRegularTableDDL); + awaitOperationTermination(service, sessionHandle, createTableHandle); + + // 'json' cannot encode the updating changelog produced by the GROUP BY, so the continuous + // refresh job fails to start after the catalog entry has already been swapped to a + // materialized table + String convertDDL = + "CREATE OR ALTER MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle convertHandle = executeStatement(service, sessionHandle, convertDDL); + + assertThatThrownBy(() -> awaitOperationTermination(service, sessionHandle, convertHandle)) + .cause() + .hasMessageContaining( + "Failed to start the continuous refresh job when converting table") + .hasMessageContaining("left in SUSPENDED status"); + + // the conversion is not rolled back: the table stays a materialized table but suspended + ObjectIdentifier userShopsIdentifier = getObjectIdentifier("users_shops"); + ResolvedCatalogMaterializedTable suspendedMaterializedTable = + getTable(service, sessionHandle, userShopsIdentifier); + assertThat(suspendedMaterializedTable.getRefreshMode()).isSameAs(RefreshMode.CONTINUOUS); + assertThat(suspendedMaterializedTable.getRefreshStatus()).isSameAs(RefreshStatus.SUSPENDED); + } + + private SessionHandle initializeSession() { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + String catalogDDL = + String.format( + "CREATE CATALOG %s\n" + + "WITH (\n" + + " 'type' = 'test-filesystem',\n" + + " 'path' = '%s',\n" + + " 'default-database' = '%s'\n" + + " )", + fileSystemCatalogName, fileSystemCatalogPath, TEST_DEFAULT_DATABASE); + service.configureSession(sessionHandle, catalogDDL, -1); + service.configureSession( + sessionHandle, String.format("USE CATALOG %s", fileSystemCatalogName), -1); + + String dataGenSource = + "CREATE TABLE datagenSource (\n" + + " order_id BIGINT,\n" + + " order_number VARCHAR(20),\n" + + " user_id BIGINT,\n" + + " shop_id BIGINT,\n" + + " product_id BIGINT,\n" + + " status BIGINT,\n" + + " order_type BIGINT,\n" + + " order_created_at TIMESTAMP,\n" + + " payment_amount_cents BIGINT\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'datagen',\n" + + " 'rows-per-second' = '10'\n" + + ")"; + service.configureSession(sessionHandle, dataGenSource, -1); + return sessionHandle; + } + + private ObjectIdentifier getObjectIdentifier(String name) { + return ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, name); + } + + private void dropMaterializedTable(ObjectIdentifier objectIdentifier) throws Exception { + String dropMaterializedTableDDL = + String.format( + "DROP MATERIALIZED TABLE %s", objectIdentifier.asSerializableString()); + OperationHandle dropHandle = + executeStatement(service, sessionHandle, dropMaterializedTableDDL); + awaitOperationTermination(service, sessionHandle, dropHandle); + } +} diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index c3a64b5c8fead..134015c565da5 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -1872,14 +1872,14 @@ private OperationHandle executeStatement( } private OperationHandle executeStatement(String statement) { - return executeStatement(statement, -1, new Configuration()); + return MaterializedTableTestUtils.executeStatement(service, sessionHandle, statement); } private ContinuousRefreshHandler getContinuousRefreshHandler( ResolvedCatalogMaterializedTable resolvedTable) throws IOException, ClassNotFoundException { - return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( - resolvedTable.getSerializedRefreshHandler(), getClass().getClassLoader()); + return MaterializedTableTestUtils.getContinuousRefreshHandler( + resolvedTable, getClass().getClassLoader()); } private List getAddedColumns(ResolvedSchema newSchema, ResolvedSchema oldSchema) { @@ -1934,7 +1934,7 @@ private JobKey getJobKey(ObjectIdentifier identifier) { } private ResolvedCatalogMaterializedTable getTable(ObjectIdentifier identifier) { - return (ResolvedCatalogMaterializedTable) service.getTable(sessionHandle, identifier); + return MaterializedTableTestUtils.getTable(service, sessionHandle, identifier); } private static diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableTestUtils.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableTestUtils.java new file mode 100644 index 0000000000000..66af0bd62029e --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableTestUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.gateway.api.SqlGatewayService; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.refresh.ContinuousRefreshHandler; +import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer; + +import java.io.IOException; + +/** Helpers shared between the materialized table gateway ITCases. */ +final class MaterializedTableTestUtils { + + private MaterializedTableTestUtils() {} + + static OperationHandle executeStatement( + SqlGatewayService service, SessionHandle sessionHandle, String statement) { + return service.executeStatement(sessionHandle, statement, -1, new Configuration()); + } + + static ResolvedCatalogMaterializedTable getTable( + SqlGatewayService service, SessionHandle sessionHandle, ObjectIdentifier identifier) { + return (ResolvedCatalogMaterializedTable) service.getTable(sessionHandle, identifier); + } + + static ContinuousRefreshHandler getContinuousRefreshHandler( + ResolvedCatalogMaterializedTable resolvedTable, ClassLoader classLoader) + throws IOException, ClassNotFoundException { + return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + resolvedTable.getSerializedRefreshHandler(), classLoader); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java index 3c59b56162f0d..173caa9092e3f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java @@ -145,6 +145,19 @@ private TableConfigOptions() {} + "If set this option to true and the underlying DynamicTableSink implements the SupportsStaging interface, " + "the statement is expected to be executed atomically, the behavior of which depends on the actual DynamicTableSink."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED = + key("table.materialized-table.conversion-from-table.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "If enabled, executing a CREATE OR ALTER MATERIALIZED TABLE on an existing regular table " + + "converts it in place to a materialized table, preserving identity and storage. " + + "This is a cluster-wide setting: it must be set in the cluster configuration " + + "(e.g. config.yaml); session-level SET has no effect. " + + "When disabled (the default), CREATE OR ALTER MATERIALIZED TABLE against a regular " + + "table is rejected."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) public static final ConfigOption> TABLE_COLUMN_EXPANSION_STRATEGY = diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index cd375bc9554bf..ca31e17ebf4e4 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -1404,6 +1404,44 @@ public void alterTable( "AlterTable"); } + /** + * Converts an existing regular table to a materialized table in place. Identity and storage are + * preserved; only the kind and the materialized-table specific metadata change. + * + * @param originalTable the existing regular table + * @param materializedTable the new materialized table definition + * @param changes describe the modification from originalTable to materializedTable + * @param objectIdentifier fully qualified path of the table being converted + */ + public void convertTableToMaterializedTable( + CatalogTable originalTable, + CatalogMaterializedTable materializedTable, + List changes, + ObjectIdentifier objectIdentifier) { + execute( + (catalog, path) -> { + final CatalogTable resolvedOriginal = + (CatalogTable) resolveCatalogBaseTable(originalTable); + final CatalogMaterializedTable resolvedMt = + (CatalogMaterializedTable) resolveCatalogBaseTable(materializedTable); + catalog.convertTableToMaterializedTable( + path, resolvedOriginal, resolvedMt, changes); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + AlterTableEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedMt, + false))); + }, + objectIdentifier, + false, + "ConvertTableToMaterializedTable"); + } + /** * Drops a table in a given fully qualified path. * diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java index bc678672d91d6..83e0688a5192d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java @@ -264,6 +264,30 @@ public void alterTable( } } + @Override + public void convertTableToMaterializedTable( + ObjectPath tablePath, + CatalogTable originalTable, + CatalogMaterializedTable materializedTable, + List tableChanges) + throws TableNotExistException { + checkNotNull(tablePath); + checkNotNull(originalTable); + checkNotNull(materializedTable); + + final CatalogBaseTable existing = tables.get(tablePath); + if (existing == null) { + throw new TableNotExistException(getName(), tablePath); + } + if (existing.getTableKind() != CatalogBaseTable.TableKind.TABLE) { + throw new CatalogException( + String.format( + "Cannot convert %s to a materialized table: existing entry has kind %s.", + tablePath.getFullName(), existing.getTableKind())); + } + tables.put(tablePath, materializedTable.copy()); + } + // ------ tables and views ------ @Override diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/ConvertTableToMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/ConvertTableToMaterializedTableOperation.java new file mode 100644 index 0000000000000..2e29695696bee --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/ConvertTableToMaterializedTableOperation.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.ExecutableOperation; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +/** + * Operation to describe an in-place conversion of an existing regular table to a materialized table + * via CREATE OR ALTER MATERIALIZED TABLE. + */ +@Internal +public class ConvertTableToMaterializedTableOperation + implements MaterializedTableOperation, ExecutableOperation { + + private final ObjectIdentifier tableIdentifier; + private final ResolvedCatalogTable originalTable; + private final ResolvedCatalogMaterializedTable materializedTable; + private final Function> + tableChangesForTable; + private List tableChanges; + + public ConvertTableToMaterializedTableOperation( + ObjectIdentifier tableIdentifier, + ResolvedCatalogTable originalTable, + ResolvedCatalogMaterializedTable materializedTable, + Function> tableChangesBuilder) { + this.tableIdentifier = tableIdentifier; + this.originalTable = originalTable; + this.materializedTable = materializedTable; + this.tableChangesForTable = tableChangesBuilder; + } + + @Override + public TableResultInternal execute(Context ctx) { + ctx.getCatalogManager() + .convertTableToMaterializedTable( + originalTable, materializedTable, getTableChanges(), tableIdentifier); + return TableResultImpl.TABLE_RESULT_OK; + } + + public ObjectIdentifier getTableIdentifier() { + return tableIdentifier; + } + + public ResolvedCatalogTable getOriginalTable() { + return originalTable; + } + + public ResolvedCatalogMaterializedTable getMaterializedTable() { + return materializedTable; + } + + public List getTableChanges() { + if (tableChanges == null) { + tableChanges = tableChangesForTable.apply(materializedTable); + } + return tableChanges; + } + + @Override + public String asSummaryString() { + final Map params = new LinkedHashMap<>(); + params.put("identifier", tableIdentifier); + params.put("materializedTable", materializedTable); + params.put("tableChanges", tableChangesForTable); + return OperationUtils.formatWithChildren( + "CONVERT TABLE TO MATERIALIZED TABLE", + params, + List.of(), + Operation::asSummaryString); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java index 403d94459fb02..ae656d789a600 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java @@ -18,7 +18,11 @@ package org.apache.flink.table.catalog; +import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus; import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase; import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary; @@ -34,6 +38,7 @@ import org.apache.flink.table.utils.TableEnvironmentMock; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -42,6 +47,7 @@ import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for GenericInMemoryCatalog. */ class GenericInMemoryCatalogTest extends CatalogTestBase { @@ -54,6 +60,66 @@ static void init() { // ------ tables ------ + @Nested + class ConvertTableToMaterializedTableTest { + @Test + void testConvertTableToMaterializedTable() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.createTable(path1, createTable(), false); + assertThat(catalog.getTable(path1).getTableKind()) + .isEqualTo(CatalogBaseTable.TableKind.TABLE); + + catalog.convertTableToMaterializedTable( + path1, createTable(), convertedMaterializedTable(), List.of()); + + final CatalogBaseTable converted = catalog.getTable(path1); + assertThat(converted).isInstanceOf(CatalogMaterializedTable.class); + assertThat(converted.getTableKind()) + .isEqualTo(CatalogBaseTable.TableKind.MATERIALIZED_TABLE); + } + + @Test + void testConvertTableToMaterializedTable_tableNotExist() throws Exception { + catalog.createDatabase(db1, createDb(), false); + assertThatThrownBy( + () -> + catalog.convertTableToMaterializedTable( + path1, + createTable(), + convertedMaterializedTable(), + List.of())) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testConvertTableToMaterializedTable_existingIsNotARegularTable() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.createTable(path1, convertedMaterializedTable(), false); + assertThatThrownBy( + () -> + catalog.convertTableToMaterializedTable( + path1, + createTable(), + convertedMaterializedTable(), + List.of())) + .isInstanceOf(CatalogException.class) + .hasMessageContaining("existing entry has kind"); + } + + private CatalogMaterializedTable convertedMaterializedTable() { + return CatalogMaterializedTable.newBuilder() + .schema(createTable().getUnresolvedSchema()) + .options(Map.of()) + .originalQuery("SELECT * FROM src") + .expandedQuery("SELECT * FROM `builtin`.`default`.`src`") + .freshness(IntervalFreshness.ofSecond(30)) + .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC) + .refreshMode(RefreshMode.CONTINUOUS) + .refreshStatus(RefreshStatus.INITIALIZING) + .build(); + } + } + @Test void testDropTable_partitionedTable() throws Exception { catalog.createDatabase(db1, createDb(), false); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index 3db7d7cf2ee8b..3418b193d45f4 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -383,6 +383,30 @@ default void alterTable( alterTable(tablePath, newTable, ignoreIfNotExists); } + /** + * Converts an existing {@link CatalogTable} to a {@link CatalogMaterializedTable} in place, + * preserving the catalog entry's identity and storage. + * + *

The default throws {@link UnsupportedOperationException}; catalogs that support in-place + * conversion override it. Launching the refresh job for the new materialized table is the + * executor's responsibility, not the catalog's. + * + * @param tableChanges structured delta between {@code originalTable} and {@code + * materializedTable}, useful for incremental storage migration. + * @throws CatalogException if the existing entry is not a {@link CatalogTable} + */ + default void convertTableToMaterializedTable( + ObjectPath tablePath, + CatalogTable originalTable, + CatalogMaterializedTable materializedTable, + List tableChanges) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException( + "Catalog " + + getClass().getName() + + " does not support converting tables to materialized tables."); + } + // ------ partitions ------ /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java index c1e35838ead63..fd6d1b759cd07 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java @@ -22,14 +22,16 @@ import org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateOrAlterMaterializedTable; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.SchemaResolver; import org.apache.flink.table.catalog.StartMode; @@ -38,6 +40,7 @@ import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.ConvertTableToMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.MaterializedTableChangeHandler; @@ -78,23 +81,29 @@ private Operation handleCreateOrAlter( final ObjectIdentifier identifier) { final Optional> resolvedBaseTable = context.getCatalogManager().getCatalogBaseTable(identifier); - return resolvedBaseTable - .map( - oldBaseTable -> { - if (oldBaseTable.getTableKind() != TableKind.MATERIALIZED_TABLE) { - throw new ValidationException( - String.format( - "Table %s is not a materialized table. Only materialized table support create or alter operation.", - identifier.asSummaryString())); - } - return handleAlter( - sqlCreateOrAlterMaterializedTable, - (ResolvedCatalogMaterializedTable) oldBaseTable, - context, - identifier); - }) - .orElseGet( - () -> handleCreate(sqlCreateOrAlterMaterializedTable, context, identifier)); + if (resolvedBaseTable.isEmpty()) { + return handleCreate(sqlCreateOrAlterMaterializedTable, context, identifier); + } + final ResolvedCatalogBaseTable oldBaseTable = resolvedBaseTable.get(); + switch (oldBaseTable.getTableKind()) { + case MATERIALIZED_TABLE: + return handleAlter( + sqlCreateOrAlterMaterializedTable, + (ResolvedCatalogMaterializedTable) oldBaseTable, + context, + identifier); + case TABLE: + return handleConvert( + sqlCreateOrAlterMaterializedTable, + context, + identifier, + (ResolvedCatalogTable) oldBaseTable); + default: + throw new ValidationException( + String.format( + "Table %s is not a materialized table. Only materialized table support create or alter operation.", + identifier.asSummaryString())); + } } private static boolean createOrAlterOperation( @@ -117,6 +126,84 @@ private Operation handleAlter( currentTable -> buildNewTable(currentTable, mergeContext, schemaResolver)); } + private Operation handleConvert( + final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterMaterializedTable, + final ConvertContext context, + final ObjectIdentifier identifier, + final ResolvedCatalogTable oldBaseTable) { + final boolean conversionEnabled = + context.getTableConfig() + .getRootConfiguration() + .get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED); + if (!conversionEnabled) { + throw new ValidationException( + String.format( + "Table %s is not a materialized table. Only materialized table support create or alter operation.", + identifier.asSummaryString())); + } + final MergeContext baseMergeContext = + getMergeContext(sqlCreateOrAlterMaterializedTable, context); + + final CatalogMaterializedTable newMaterializedTable = + CatalogMaterializedTable.newBuilder() + .schema(baseMergeContext.getMergedSchema()) + .comment(baseMergeContext.getMergedComment()) + .partitionKeys(baseMergeContext.getMergedPartitionKeys()) + .options(baseMergeContext.getMergedTableOptions()) + .originalQuery(baseMergeContext.getMergedOriginalQuery()) + .expandedQuery(baseMergeContext.getMergedExpandedQuery()) + .distribution(baseMergeContext.getMergedTableDistribution().orElse(null)) + .freshness(baseMergeContext.getMergedFreshness()) + .logicalRefreshMode(baseMergeContext.getMergedLogicalRefreshMode()) + .refreshMode(baseMergeContext.getMergedRefreshMode()) + .refreshStatus(RefreshStatus.INITIALIZING) + .startMode(baseMergeContext.getMergedStartMode()) + .build(); + final ResolvedCatalogMaterializedTable resolvedNewMaterializedTable = + context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable); + + return new ConvertTableToMaterializedTableOperation( + identifier, + oldBaseTable, + resolvedNewMaterializedTable, + resolvedCatalogMaterializedTable -> + buildConversionTableChanges( + oldBaseTable, + resolvedCatalogMaterializedTable, + baseMergeContext.hasSchemaDefinition(), + baseMergeContext.hasConstraintDefinition())); + } + + private List buildConversionTableChanges( + final ResolvedCatalogTable oldTable, + final ResolvedCatalogMaterializedTable newTable, + final boolean hasSchemaDefinition, + final boolean hasConstraintDefinition) { + final ResolvedSchema oldSchema = oldTable.getResolvedSchema(); + final ResolvedSchema newSchema = newTable.getResolvedSchema(); + final List changes = + new ArrayList<>( + MaterializedTableUtils.validateAndExtractColumnChanges( + oldSchema, newSchema, hasSchemaDefinition)); + + getConstraintChange(oldSchema, newSchema, hasConstraintDefinition).ifPresent(changes::add); + getWatermarkChange(oldSchema, newSchema, hasSchemaDefinition).ifPresent(changes::add); + + changes.addAll( + getQueryTableChanges( + null, null, newTable.getOriginalQuery(), newTable.getExpandedQuery())); + changes.addAll(getOptionsTableChanges(oldTable.getOptions(), newTable.getOptions())); + changes.addAll( + getDistributionTableChanges( + oldTable.getDistribution().orElse(null), + newTable.getDistribution().orElse(null))); + + newTable.getStartMode() + .ifPresent(newStartMode -> changes.add(TableChange.modifyStartMode(newStartMode))); + + return changes; + } + private CatalogMaterializedTable buildNewTable( final ResolvedCatalogMaterializedTable currentTable, final MergeContext mergeContext, @@ -161,9 +248,19 @@ private List buildTableChanges( final List changes = getSchemaTableChanges(mergeContext, schemaResolver, oldTable); - changes.addAll(getQueryTableChanges(mergeContext, oldTable)); - changes.addAll(getOptionsTableChanges(mergeContext, oldTable)); - changes.addAll(getDistributionTableChanges(mergeContext, oldTable)); + changes.addAll( + getQueryTableChanges( + oldTable.getOriginalQuery(), + oldTable.getExpandedQuery(), + mergeContext.getMergedOriginalQuery(), + mergeContext.getMergedExpandedQuery())); + changes.addAll( + getOptionsTableChanges( + oldTable.getOptions(), mergeContext.getMergedTableOptions())); + changes.addAll( + getDistributionTableChanges( + oldTable.getDistribution().orElse(null), + mergeContext.getMergedTableDistribution().orElse(null))); final RefreshMode oldRefreshMode = oldTable.getRefreshMode(); final RefreshMode newRefreshMode = mergeContext.getMergedRefreshMode(); @@ -188,10 +285,7 @@ private List buildTableChanges( } private List getDistributionTableChanges( - final MergeContext mergeContext, final ResolvedCatalogMaterializedTable oldTable) { - final TableDistribution oldDistribution = oldTable.getDistribution().orElse(null); - final TableDistribution newDistribution = - mergeContext.getMergedTableDistribution().orElse(null); + final TableDistribution oldDistribution, final TableDistribution newDistribution) { if (!Objects.equals(oldDistribution, newDistribution)) { if (oldDistribution == null) { return List.of(TableChange.add(newDistribution)); @@ -205,10 +299,8 @@ private List getDistributionTableChanges( } private List getOptionsTableChanges( - final MergeContext mergeContext, final ResolvedCatalogMaterializedTable oldTable) { + final Map oldOptions, final Map newOptions) { final List changes = new ArrayList<>(); - final Map oldOptions = oldTable.getOptions(); - final Map newOptions = mergeContext.getMergedTableOptions(); for (Map.Entry newOptionEntry : newOptions.entrySet()) { if (!newOptionEntry.getValue().equals(oldOptions.get(newOptionEntry.getKey()))) { @@ -225,15 +317,13 @@ private List getOptionsTableChanges( } private List getQueryTableChanges( - final MergeContext mergeContext, final ResolvedCatalogMaterializedTable oldTable) { - final String originalQuery = oldTable.getOriginalQuery(); - final String expandedQuery = oldTable.getExpandedQuery(); - if (!originalQuery.equals(mergeContext.getMergedOriginalQuery()) - || !expandedQuery.equals(mergeContext.getMergedExpandedQuery())) { - return List.of( - TableChange.modifyDefinitionQuery( - mergeContext.getMergedOriginalQuery(), - mergeContext.getMergedExpandedQuery())); + final String oldOriginalQuery, + final String oldExpandedQuery, + final String newOriginalQuery, + final String newExpandedQuery) { + if (!Objects.equals(oldOriginalQuery, newOriginalQuery) + || !Objects.equals(oldExpandedQuery, newExpandedQuery)) { + return List.of(TableChange.modifyDefinitionQuery(newOriginalQuery, newExpandedQuery)); } return List.of(); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java index 9ed19411a7a7c..01a25d488366b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java @@ -61,7 +61,7 @@ /** Test base for testing convert sql statement to operation. */ class SqlNodeToOperationConversionTestBase { private final boolean isStreamingMode = false; - private final TableConfig tableConfig = TableConfig.getDefault(); + protected final TableConfig tableConfig = TableConfig.getDefault(); protected final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default"); protected final CatalogManager catalogManager = CatalogManagerMocks.preparedCatalogManager() diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConvertTableToMaterializedTableTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConvertTableToMaterializedTableTest.java new file mode 100644 index 0000000000000..80060959a02d8 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConvertTableToMaterializedTableTest.java @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.expressions.SqlCallExpression; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.ConvertTableToMaterializedTableOperation; +import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; +import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for in-place conversion of a regular table to a materialized table via {@code CREATE OR + * ALTER MATERIALIZED TABLE}. + */ +class SqlNodeToOperationConvertTableToMaterializedTableTest + extends SqlNodeToOperationConversionTestBase { + + private static final String SOURCE_REGULAR_TABLE_NAME = "src_table"; + + @BeforeEach + void before() throws TableAlreadyExistException, DatabaseNotExistException { + super.before(); + sourceTable(SOURCE_REGULAR_TABLE_NAME).create(); + sourceTable("t1_with_ts").create(); + } + + @Nested + class OperationSelection { + private static final String EXISTING_MT_NAME = "existing_mt"; + + @Test + void missingTargetCreatesMaterializedTable() { + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE brand_new" + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b FROM t1"; + assertThat(parse(sql)).isInstanceOf(CreateMaterializedTableOperation.class); + } + + @Test + void existingMaterializedTableAlters() + throws TableAlreadyExistException, DatabaseNotExistException { + configureConversionEnabled(true); + createExistingMaterializedTable(); + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE " + + EXISTING_MT_NAME + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b FROM t1"; + assertThat(parse(sql)).isInstanceOf(FullAlterMaterializedTableOperation.class); + } + + @Test + void regularTableWithConversionDisabledIsRejected() { + configureConversionEnabled(false); + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE " + + SOURCE_REGULAR_TABLE_NAME + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b FROM t1"; + assertThatThrownBy(() -> parse(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("is not a materialized table"); + } + + @Test + void regularTableWithConversionEnabledIsConverted() { + configureConversionEnabled(true); + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE " + + SOURCE_REGULAR_TABLE_NAME + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b FROM t1"; + assertThat(parse(sql)).isInstanceOf(ConvertTableToMaterializedTableOperation.class); + } + + private void createExistingMaterializedTable() + throws TableAlreadyExistException, DatabaseNotExistException { + final String sql = + "CREATE MATERIALIZED TABLE existing_mt (\n" + + " CONSTRAINT pk1 PRIMARY KEY(a) NOT ENFORCED\n" + + ")\n" + + "FRESHNESS = INTERVAL '1' MINUTE\n" + + "AS SELECT a, b FROM t1"; + final Operation op = parse(sql); + assertThat(op).isInstanceOf(CreateMaterializedTableOperation.class); + final CatalogMaterializedTable mt = + ((CreateMaterializedTableOperation) op).getCatalogMaterializedTable(); + catalog.createTable( + new ObjectPath(catalogManager.getCurrentDatabase(), EXISTING_MT_NAME), + mt, + true); + } + } + + @Nested + class ConfigScope { + + @Test + void sessionOnlyEnableHasNoEffect() { + tableConfig.set( + TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED, true); + // root configuration left default (false) + + assertThatThrownBy(() -> parse(conversionSql())) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("is not a materialized table"); + } + + @Test + void clusterRootEnableAllowsConversion() { + configureConversionEnabled(true); + + assertThat(parse(conversionSql())) + .isInstanceOf(ConvertTableToMaterializedTableOperation.class); + } + + @Test + void bothSessionAndClusterEnabledAllowsConversion() { + tableConfig.set( + TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED, true); + configureConversionEnabled(true); + + assertThat(parse(conversionSql())) + .isInstanceOf(ConvertTableToMaterializedTableOperation.class); + } + + @Test + void neitherSessionNorClusterEnabledIsRejected() { + // nothing set + assertThatThrownBy(() -> parse(conversionSql())) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("is not a materialized table"); + } + + private String conversionSql() { + return "CREATE OR ALTER MATERIALIZED TABLE " + + SOURCE_REGULAR_TABLE_NAME + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b FROM t1"; + } + } + + @Nested + class WatermarkAndPrimaryKey { + + @BeforeEach + void enableConversion() { + configureConversionEnabled(true); + } + + @Test + void sourceWatermarkAndPrimaryKeyAreNotInherited() + throws TableAlreadyExistException, DatabaseNotExistException { + sourceTable("src_wm_pk").withWatermark().withPrimaryKey().create(); + + final ResolvedSchema newSchema = + convertedMaterializedTableSchema( + "CREATE OR ALTER MATERIALIZED TABLE src_wm_pk" + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b, ts FROM t1_with_ts"); + + assertThat(newSchema.getWatermarkSpecs()).isEmpty(); + assertThat(newSchema.getPrimaryKey()).isEmpty(); + } + + @Test + void columnListWithoutWatermarkOrPrimaryKeyDropsThem() + throws TableAlreadyExistException, DatabaseNotExistException { + sourceTable("src_list").withWatermark().withPrimaryKey().create(); + + final ResolvedSchema newSchema = + convertedMaterializedTableSchema( + "CREATE OR ALTER MATERIALIZED TABLE src_list (" + + " a BIGINT NOT NULL, b STRING, ts TIMESTAMP(3))" + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b, ts FROM t1_with_ts"); + + assertThat(newSchema.getWatermarkSpecs()).isEmpty(); + assertThat(newSchema.getPrimaryKey()).isEmpty(); + } + + @Test + void declaredWatermarkAndPrimaryKeyArePresent() + throws TableAlreadyExistException, DatabaseNotExistException { + sourceTable("src_neither").create(); + + final ResolvedSchema newSchema = + convertedMaterializedTableSchema( + "CREATE OR ALTER MATERIALIZED TABLE src_neither (" + + " a BIGINT NOT NULL, b STRING, ts TIMESTAMP(3)," + + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND," + + " PRIMARY KEY (a) NOT ENFORCED)" + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b, ts FROM t1_with_ts"); + + assertThat(newSchema.getWatermarkSpecs()).isNotEmpty(); + assertThat(newSchema.getPrimaryKey()).isPresent(); + } + + @Test + void declaredWatermarkOnlyDropsSourcePrimaryKey() + throws TableAlreadyExistException, DatabaseNotExistException { + sourceTable("src_pk_only").withPrimaryKey().create(); + + final ResolvedSchema newSchema = + convertedMaterializedTableSchema( + "CREATE OR ALTER MATERIALIZED TABLE src_pk_only (" + + " a BIGINT NOT NULL, b STRING, ts TIMESTAMP(3)," + + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)" + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b, ts FROM t1_with_ts"); + + assertThat(newSchema.getWatermarkSpecs()).isNotEmpty(); + assertThat(newSchema.getPrimaryKey()).isEmpty(); + } + + @Test + void declaredPrimaryKeyOnlyDropsSourceWatermark() + throws TableAlreadyExistException, DatabaseNotExistException { + sourceTable("src_wm_only").withWatermark().create(); + + final ResolvedSchema newSchema = + convertedMaterializedTableSchema( + "CREATE OR ALTER MATERIALIZED TABLE src_wm_only (" + + " a BIGINT NOT NULL, b STRING, ts TIMESTAMP(3)," + + " PRIMARY KEY (a) NOT ENFORCED)" + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b, ts FROM t1_with_ts"); + + assertThat(newSchema.getWatermarkSpecs()).isEmpty(); + assertThat(newSchema.getPrimaryKey()).isPresent(); + } + + @Test + void neitherSourceNorStatementDeclaresThem() + throws TableAlreadyExistException, DatabaseNotExistException { + sourceTable("src_plain").create(); + + final ResolvedSchema newSchema = + convertedMaterializedTableSchema( + "CREATE OR ALTER MATERIALIZED TABLE src_plain" + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b, ts FROM t1_with_ts"); + + assertThat(newSchema.getWatermarkSpecs()).isEmpty(); + assertThat(newSchema.getPrimaryKey()).isEmpty(); + } + + private ResolvedSchema convertedMaterializedTableSchema(String sql) { + return convertedOperation(sql).getMaterializedTable().getResolvedSchema(); + } + } + + @Nested + class QueryEvolution { + + @BeforeEach + void enableConversion() { + configureConversionEnabled(true); + } + + @Test + void queryAddsColumnNotInSourceAddsItToNewMaterializedTable() + throws TableAlreadyExistException, DatabaseNotExistException { + // src has columns a, b, ts + sourceTable("src_add_col").create(); + + final ConvertTableToMaterializedTableOperation op = + convertedOperation( + "CREATE OR ALTER MATERIALIZED TABLE src_add_col" + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b, ts, CAST('extra' AS STRING) AS c FROM t1_with_ts"); + + final ResolvedSchema newSchema = op.getMaterializedTable().getResolvedSchema(); + assertThat(newSchema.getColumnNames()).containsExactly("a", "b", "ts", "c"); + assertThat(op.getTableChanges()) + .anyMatch( + change -> + change instanceof TableChange.AddColumn + && "c" + .equals( + ((TableChange.AddColumn) change) + .getColumn() + .getName())); + } + + @Test + void queryDropsColumnFromSourceDropsItFromNewMaterializedTable() + throws TableAlreadyExistException, DatabaseNotExistException { + // src has columns a, b, ts + sourceTable("src_drop_col").create(); + + final ConvertTableToMaterializedTableOperation op = + convertedOperation( + "CREATE OR ALTER MATERIALIZED TABLE src_drop_col" + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b FROM t1_with_ts"); + + final ResolvedSchema newSchema = op.getMaterializedTable().getResolvedSchema(); + assertThat(newSchema.getColumnNames()).containsExactly("a", "b"); + assertThat(op.getTableChanges()) + .anyMatch( + change -> + change instanceof TableChange.DropColumn + && "ts" + .equals( + ((TableChange.DropColumn) change) + .getColumnName())); + } + + @Test + void queryRenamesColumnViaAliasIsModeledAsDropAndAdd() + throws TableAlreadyExistException, DatabaseNotExistException { + // src has columns a, b, ts + sourceTable("src_rename_col").create(); + + final ConvertTableToMaterializedTableOperation op = + convertedOperation( + "CREATE OR ALTER MATERIALIZED TABLE src_rename_col" + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT a, b, ts AS event_time FROM t1_with_ts"); + + final ResolvedSchema newSchema = op.getMaterializedTable().getResolvedSchema(); + assertThat(newSchema.getColumnNames()).containsExactly("a", "b", "event_time"); + assertThat(op.getTableChanges()) + .anyMatch( + change -> + change instanceof TableChange.DropColumn + && "ts" + .equals( + ((TableChange.DropColumn) change) + .getColumnName())) + .anyMatch( + change -> + change instanceof TableChange.AddColumn + && "event_time" + .equals( + ((TableChange.AddColumn) change) + .getColumn() + .getName())); + } + + @Test + void queryChangesColumnTypeIsModeledAsModifyPhysicalColumnType() + throws TableAlreadyExistException, DatabaseNotExistException { + // src has columns a BIGINT NOT NULL, b STRING, ts TIMESTAMP(3) + sourceTable("src_type_change").create(); + + final ConvertTableToMaterializedTableOperation op = + convertedOperation( + "CREATE OR ALTER MATERIALIZED TABLE src_type_change" + + " FRESHNESS = INTERVAL '1' MINUTE" + + " AS SELECT CAST(a AS INT) AS a, b, ts FROM t1_with_ts"); + + final ResolvedSchema newSchema = op.getMaterializedTable().getResolvedSchema(); + assertThat(newSchema.getColumn("a")) + .hasValueSatisfying( + column -> + assertThat(column.getDataType().getLogicalType().getTypeRoot()) + .isEqualTo(LogicalTypeRoot.INTEGER)); + assertThat(op.getTableChanges()) + .anyMatch(change -> change instanceof TableChange.ModifyPhysicalColumnType); + } + } + + // -------------------------------------------------------------------------------------------- + // Helpers + // -------------------------------------------------------------------------------------------- + + private void configureConversionEnabled(boolean enabled) { + final Configuration root = new Configuration(); + root.set(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED, enabled); + tableConfig.setRootConfiguration(root); + } + + private ConvertTableToMaterializedTableOperation convertedOperation(String sql) { + final Operation op = parse(sql); + assertThat(op).isInstanceOf(ConvertTableToMaterializedTableOperation.class); + return (ConvertTableToMaterializedTableOperation) op; + } + + private SourceTableBuilder sourceTable(String name) { + return new SourceTableBuilder(name); + } + + /** Fluent builder for registering a regular {@code (a, b, ts)} source table in the catalog. */ + private final class SourceTableBuilder { + + private final String name; + private boolean withWatermark; + private boolean withPrimaryKey; + + private SourceTableBuilder(String name) { + this.name = name; + } + + SourceTableBuilder withWatermark() { + this.withWatermark = true; + return this; + } + + SourceTableBuilder withPrimaryKey() { + this.withPrimaryKey = true; + return this; + } + + void create() throws TableAlreadyExistException, DatabaseNotExistException { + final Schema.Builder schema = Schema.newBuilder(); + schema.column("a", DataTypes.BIGINT().notNull()); + schema.column("b", DataTypes.STRING()); + schema.column("ts", DataTypes.TIMESTAMP(3)); + if (withWatermark) { + schema.watermark("ts", new SqlCallExpression("ts - INTERVAL '5' SECOND")); + } + if (withPrimaryKey) { + schema.primaryKeyNamed("pk_src", List.of("a")); + } + final Map options = new HashMap<>(); + options.put("connector", "COLLECTION"); + final CatalogTable table = + CatalogTable.newBuilder().schema(schema.build()).options(options).build(); + catalog.createTable( + new ObjectPath(catalogManager.getCurrentDatabase(), name), table, false); + } + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java index 787178367f25e..a268e08402ab1 100644 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java @@ -416,6 +416,40 @@ public void alterTable( } } + @Override + public void convertTableToMaterializedTable( + ObjectPath tablePath, + CatalogTable originalTable, + CatalogMaterializedTable materializedTable, + List tableChanges) + throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + Tuple4 tableSchemaInfo = + getTableJsonInfo(tablePath, materializedTable); + Path tableSchemaPath = tableSchemaInfo.f1; + String jsonSchema = tableSchemaInfo.f3; + try { + if (!fs.exists(tableSchemaPath)) { + throw new CatalogException( + String.format( + "Table %s schema file %s doesn't exist.", + tablePath, tableSchemaPath)); + } + // overwrite the schema file with the materialized table definition + Path tableSchemaFilePath = + tableSchemaFilePath(tableSchemaPath, tablePath.getObjectName()); + try (FSDataOutputStream os = + fs.create(tableSchemaFilePath, FileSystem.WriteMode.OVERWRITE)) { + os.write(jsonSchema.getBytes(StandardCharsets.UTF_8)); + } + } catch (IOException e) { + throw new CatalogException(String.format("Error converting table %s.", tablePath), e); + } + } + @Override public List listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException {