Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions docs/content.zh/docs/sql/materialized-table/statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ The `OR ALTER` clause provides create-or-update semantics:

- **If the materialized table does not exist**: Creates a new materialized table with the specified options
- **If the materialized table exists**: Modifies the query definition (behaves like `ALTER MATERIALIZED TABLE AS`)
- **If a regular table with the same name exists**: Converts it in place into a materialized table, when enabled (see [Converting a Table to a Materialized Table](#converting-a-table-to-a-materialized-table))
Comment thread
raminqaf marked this conversation as resolved.

This is particularly useful in declarative deployment scenarios where you want to define the desired state without checking if the materialized table already exists.

Expand All @@ -260,6 +261,63 @@ The operation updates the materialized table similarly to [ALTER MATERIALIZED TA

See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.

## Converting a Table to a Materialized Table

This lets you adopt a materialized table on top of a table that already exists, without dropping and recreating it.

`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table into a materialized table in place. The catalog object keeps its identity and underlying storage. Its kind becomes materialized table, and its schema, options, query definition, freshness, and refresh mode are taken from the conversion statement, exactly as for a newly created materialized table. After the conversion, a refresh job is launched just as it is for a newly created materialized table.


**Enabling conversion**

Conversion is disabled by default. When the option is disabled, `CREATE OR ALTER MATERIALIZED TABLE` against a regular table is rejected.
To enable it, set the following option in the cluster configuration file `config.yaml`:

```yaml
table.materialized-table.conversion-from-table.enabled: true
```

This is a cluster-wide setting: it must be set in the cluster configuration, and a session-level `SET` statement has no effect.

**Schema, watermark, and primary key**

The materialized table's schema is derived from the `CREATE OR ALTER MATERIALIZED TABLE` statement and its query, exactly as for a newly created materialized table. The source table's watermark and primary key are **not** carried over — declare them in the conversion statement if you want them. This keeps `CREATE OR ALTER MATERIALIZED TABLE` declarative and idempotent: running the same statement again produces the same materialized table.

**Example**

```sql
-- An existing regular table that you now want to maintain as a materialized table
CREATE TABLE user_spending (
user_id BIGINT,
total_amount BIGINT
) WITH (
'connector' = '...'
);

-- Convert it in place; from now on it is refreshed by the query
CREATE OR ALTER MATERIALIZED TABLE user_spending
AS SELECT
user_id,
SUM(amount) AS total_amount
FROM orders
GROUP BY user_id;
```
The conversion is one-way and cannot be undone. You can still use the result like a regular table: suspend it to stop the refresh job, then query it.
From the example above:
```sql
-- Suspend the materialized table to stop the refresh job
ALTER MATERIALIZED TABLE user_spending SUSPEND;

-- Use the materialized table as a regular table
SELECT * FROM user_spending;
```


<span class="label label-danger">Note</span>
- The catalog must support in-place conversion; catalogs that do not implement it reject the statement.
- Converting a view into a materialized table is not supported.
- If the refresh job (continuous mode) or refresh workflow (full mode) cannot be started, the conversion is **not** rolled back: the table is left as a materialized table in `SUSPENDED` status. Fix the underlying issue and `RESUME` it.

## 示例

假定 `materialized-table.refresh-mode.freshness-threshold` 为 30 分钟。
Expand Down
58 changes: 58 additions & 0 deletions docs/content/docs/sql/materialized-table/statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ The `OR ALTER` clause provides create-or-update semantics:

- **If the materialized table does not exist**: Creates a new materialized table with the specified options
- **If the materialized table exists**: Modifies the query definition (behaves like `ALTER MATERIALIZED TABLE AS`)
- **If a regular table with the same name exists**: Converts it in place into a materialized table, when enabled (see [Converting a Table to a Materialized Table](#converting-a-table-to-a-materialized-table))

This is particularly useful in declarative deployment scenarios where you want to define the desired state without checking if the materialized table already exists.

Expand All @@ -260,6 +261,63 @@ The operation updates the materialized table similarly to [ALTER MATERIALIZED TA

See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.

## Converting a Table to a Materialized Table

This lets you adopt a materialized table on top of a table that already exists, without dropping and recreating it.

`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table into a materialized table in place. The catalog object keeps its identity and underlying storage. Its kind becomes materialized table, and its schema, options, query definition, freshness, and refresh mode are taken from the conversion statement, exactly as for a newly created materialized table. After the conversion, a refresh job is launched just as it is for a newly created materialized table.


**Enabling conversion**

Conversion is disabled by default. When the option is disabled, `CREATE OR ALTER MATERIALIZED TABLE` against a regular table is rejected.
To enable it, set the following option in the cluster configuration file `config.yaml`:

```yaml
table.materialized-table.conversion-from-table.enabled: true
```

This is a cluster-wide setting: it must be set in the cluster configuration, and a session-level `SET` statement has no effect.

**Schema, watermark, and primary key**

The materialized table's schema is derived from the `CREATE OR ALTER MATERIALIZED TABLE` statement and its query, exactly as for a newly created materialized table. The source table's watermark and primary key are **not** carried over — declare them in the conversion statement if you want them. This keeps `CREATE OR ALTER MATERIALIZED TABLE` declarative and idempotent: running the same statement again produces the same materialized table.

**Example**

```sql
-- An existing regular table that you now want to maintain as a materialized table
CREATE TABLE user_spending (
user_id BIGINT,
total_amount BIGINT
) WITH (
'connector' = '...'
);

-- Convert it in place; from now on it is refreshed by the query
CREATE OR ALTER MATERIALIZED TABLE user_spending
AS SELECT
user_id,
SUM(amount) AS total_amount
FROM orders
GROUP BY user_id;
```
The conversion is one-way and cannot be undone. You can still use the result like a regular table: suspend it to stop the refresh job, then query it.
From the example above:
```sql
-- Suspend the materialized table to stop the refresh job
ALTER MATERIALIZED TABLE user_spending SUSPEND;

-- Use the materialized table as a regular table
SELECT * FROM user_spending;
```


<span class="label label-danger">Note</span>
- The catalog must support in-place conversion; catalogs that do not implement it reject the statement.
- Converting a view into a materialized table is not supported.
- If the refresh job (continuous mode) or refresh workflow (full mode) cannot be started, the conversion is **not** rolled back: the table is left as a materialized table in `SUSPENDED` status. Fix the underlying issue and `RESUME` it.

## Examples

Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@
<td>String</td>
<td>The local time zone defines current session time zone id. It is used when converting to/from &lt;code&gt;TIMESTAMP WITH LOCAL TIME ZONE&lt;/code&gt;. Internally, timestamps with local time zone are always represented in the UTC time zone. However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, TIME, or simply STRING), the session time zone is used during conversion. The input of option is either a full name such as "America/Los_Angeles", or a custom timezone id such as "GMT-08:00".</td>
</tr>
<tr>
<td><h5>table.materialized-table.conversion-from-table.enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If enabled, executing a CREATE OR ALTER MATERIALIZED TABLE on an existing regular table converts it in place to a materialized table, preserving identity and storage. This is a cluster-wide setting: it must be set in the cluster configuration (e.g. config.yaml); session-level SET has no effect. When disabled (the default), CREATE OR ALTER MATERIALIZED TABLE against a regular table is rejected.</td>
</tr>
<tr>
<td><h5>table.plan.compile.catalog-objects</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">ALL</td>
Expand Down
5 changes: 3 additions & 2 deletions flink-python/pyflink/table/tests/test_catalog_completeness.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def java_class(cls):

@classmethod
def excluded_methods(cls):
# open/close are not needed in Python API as they are used internally
# open/close are not needed in Python API as they are used internally.
return {
'open',
'close',
Expand All @@ -51,7 +51,8 @@ def excluded_methods(cls):
'connectionExists',
'listConnections',
'createConnection',
'alterConnection'}
'alterConnection',
'convertTableToMaterializedTable'}


class CatalogDatabaseAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
Expand Down
Loading