Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions docs/content.zh/docs/dev/table/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ FROM TenantKafka t
该功能默认启用。当满足以下所有条件时, regular join 将自动优化为 delta join。

1. 作业拓扑结构满足优化条件。具体可以查看[支持的功能和限制]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations)。
2. 源表所在的外部存储系统提供了可供 delta join 快速查询的索引信息。目前 [Apache Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) 已支持在 Flink 中提供表级别的索引信息,其上的表可作为 delta join 的源表。具体可参考 [Fluss 文档](https://fluss.apache.org/docs/engine-flink/delta-joins/#flink-version-support)。
2. 源表所在的外部存储系统提供了可供 delta join 快速查询的索引信息。目前 [Apache Fluss (Incubating)](https://fluss.apache.org/blog/fluss-open-source/) 已支持在 Flink 中提供表级别的索引信息,其上的表可作为 delta join 的源表。具体可参考 [Fluss 文档](https://fluss.apache.org/docs/engine-flink/delta-joins/#flink-version-support)。

<a name="working-principle"></a>

Expand Down Expand Up @@ -445,16 +445,23 @@ SET 'table.optimizer.delta-join.strategy' = 'NONE';

1. 支持 **INSERT-only** 的表作为源表。
2. 支持不带 **DELETE 操作**的 **CDC** 表作为源表。
3. 支持源表和 delta join 间包含 **project** 和 **filter** 算子。
3. 支持源表和 delta join 间包含 **projection** 和 **filter** 算子。
4. Delta join 算子内支持**缓存**。
5. 支持**级联 delta join**(cascaded delta join)—— 查询中符合条件的 join 节点从源表到结果表被依次转换为 delta join 节点。
6. 支持 delta join 后接 **lookup join**。
7. 支持在 delta join 与下游算子之间的 projection 和 filter 中使用**非确定性函数**。

然而,delta join 也存在几个**限制**,包含以下任何条件的作业无法优化为 delta join。

1. 表的**索引键**必须包含在 join 的**等值条件**中
2. 目前仅支持 **INNER JOIN**。
3. **下游节点**必须能够处理**冗余变更**。例如以 **UPSERT 模式**运行、不带 `upsertMaterialize` 的 sink 节点。
4. 当消费 **CDC 流**时,**join key** 必须是**主键**的一部分。
5. 当消费 **CDC 流**时,所有 **filter** 必须应用于 **upsert key** 上。
6. 所有 project 和 filter 都不能包含**非确定性函数**。
4. 当消费 **CDC 流**时,**join key** 必须是 **upsert key**<sup>[1]</sup> 的一部分。
5. 当消费 **CDC 流**时,所有 **filter** 必须应用于 **upsert key**<sup>[1]</sup> 上。
6. 源表和 delta join 之间的 projection 或 filter 不能包含**非确定性函数**。

{{< hint info >}}
[1] Flink 支持定义表级别的**不可变列**(immutable columns)约束来丰富 upsert key,从而使更多场景能够被优化为 delta join。不可变列约束声明某些列一旦为给定主键设置后便不可修改,不可变列会联合主键,作为一组新的 upsert key 传播给下游。该信息由外部存储系统提供。[Apache Fluss (Incubating)](https://fluss.apache.org/) 已在未来计划支持表级别的不可变列约束。
{{< /hint >}}

{{< top >}}
15 changes: 11 additions & 4 deletions docs/content/docs/dev/table/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ To mitigate these challenges, Flink introduces the delta join operator. The key
This feature is enabled by default. A regular join will be automatically optimized into a delta join when all the following conditions are met:

1. The sql pattern satisfies the optimization criteria. For details, please refer to [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations)
2. The external storage system of the source table provides index information for fast querying for delta joins. Currently, [Apache Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) has provided index information at the table level for Flink, allowing such tables to be used as source tables for delta joins. Please refer to the [Fluss documentation](https://fluss.apache.org/docs/engine-flink/delta-joins/#flink-version-support) for more details.
2. The external storage system of the source table provides index information for fast querying for delta joins. Currently, [Apache Fluss (Incubating)](https://fluss.apache.org/blog/fluss-open-source/) has provided index information at the table level for Flink, allowing such tables to be used as source tables for delta joins. Please refer to the [Fluss documentation](https://fluss.apache.org/docs/engine-flink/delta-joins/#flink-version-support) for more details.

### Working Principle

Expand Down Expand Up @@ -431,12 +431,19 @@ Delta joins are continuously evolving, and supports the following features curre
2. Support for **CDC** tables without **DELETE operations** as source tables.
3. Support for **projection** and **filter** operations between the source and the delta join.
4. Support for **caching** within the delta join operator.
5. Support for **cascaded delta joins** — eligible join nodes in a query are sequentially converted into delta join nodes from source to sink.
6. Support for **lookup join** after a delta join.
7. Support for **non-deterministic functions** in projections and filters between the delta join and downstream operators.

However, Delta Joins also have several **limitations**. Jobs containing any of the following conditions cannot be optimized into a delta join:

1. The **index key** of the table must be included in the join’s **equivalence conditions**.
2. Only **INNER JOIN** is currently supported.
3. The **downstream operator** must be able to handle **duplicate changes**, such as a sink operating in **UPSERT mode** without `upsertMaterialize`.
4. When consuming a **CDC stream**, the **join key** must be part of the **primary key**.
5. When consuming a **CDC stream**, all **filters** must be applied on the **upsert key**.
6. **Non-deterministic functions** are not allowed in filters or projections.
4. When consuming a **CDC stream**, the **join key** must be part of the **upsert key**<sup>[1]</sup>.
5. When consuming a **CDC stream**, all **filters** must be applied on the **upsert key**<sup>[1]</sup>.
6. **Non-deterministic functions** are not allowed in projections or filters between the source and the delta join.

{{< hint info >}}
[1] Flink supports defining a table-level **immutable columns** constraint to enrich the upsert key, enabling delta join optimization in more scenarios. The immutable columns constraint declares that certain columns, once set for a given primary key, cannot be modified. The immutable columns are combined with the primary key to form a new upsert key that is propagated to downstream operators. This information is provided by the external storage system. [Apache Fluss (Incubating)](https://fluss.apache.org/) is planning to support table-level immutable columns constraint in the future.
{{< /hint >}}