Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 65 additions & 16 deletions docs/content.zh/docs/dev/table/functions/udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1234,28 +1234,28 @@ env.sqlQuery(

{{< top >}}

Asynchronous Table Functions
异步表值函数
----------------

Similar to `AsyncScalarFunction`, there also exists a `AsyncTableFunction` for returning multiple row results rather than a single scalar value. Similarly, this is most useful when interacting with external systems (for example when enriching stream events with data stored in a database).
`AsyncTableFunction` 与 `AsyncScalarFunction` 类似,区别在于它返回的是多行结果而不是单个标量值。它最适合的场景同样是与外部系统打交道——例如访问数据库,做流上的维表打宽。

Asynchronous interaction with an external system means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher streaming throughput.
异步访问外部系统意味着同一个函数实例可以并发地发出多个请求、并发地接收响应;等待时间因此可以与其他请求的发送、响应的接收相互重叠。可以让等待开销分摊到多次请求上。在大多数场景下,这都会显著提高流处理的吞吐量。

#### Defining an AsyncTableFunction
#### 定义 AsyncTableFunction

A user-defined asynchronous table function maps zero, one, or multiple scalar values to zero, one, or multiple Rows. Any data type listed in the [data types section]({{< ref "docs/sql/reference/data-types" >}}) can be used as a parameter or return type of an evaluation method.
用户自定义的异步表值函数接受零个、一个或多个参数,每次输入会输出零行、一行或多行结果。[数据类型部分]({{< ref "docs/sql/reference/data-types" >}}) 中列出的任何数据类型都可以作为求值方法的参数或返回类型。

In order to define an asynchronous table function, extend the base class `AsyncTableFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function.
要定义一个异步表值函数,需要继承 `org.apache.flink.table.functions` 中的基类 `AsyncTableFunction`,并实现一个或多个名为 `eval(...)` 的求值方法。`eval` 的第一个参数必须是 `CompletableFuture<...>`,用于把结果回写给框架;之后的参数即对外暴露的函数实参。

The number of outstanding calls to `eval` may be configured by [`table.exec.async-table.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-table-max-concurrent-operations" >}}).
`eval` 的并发调用数量上限可以通过 [`table.exec.async-table.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-table-max-concurrent-operations" >}}) 进行配置。

#### Asynchronous Semantics
While calls to an `AsyncTableFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for their use-case.
#### 异步语义
`AsyncTableFunction` 的并发调用,其完成顺序可能与输入顺序不一致;但为了保证语义正确,框架仍会按输入顺序把函数输出交给查询下游的算子。即便如此,数据本身仍可能反映实际的完成顺序(例如携带获取时间戳);这一点是否会影响业务,需要使用方自行判断。

#### Error Handling
The primary way for a user to indicate an error is to call `completableFuture.completeExceptionally(throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-table.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-table-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-table.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-table-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-table.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-table-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-table.timeout`]({{< ref "docs/dev/table/config#table-exec-async-table-timeout" >}}) expires (including all retry attempts), the job will fail.
#### 错误处理
上报错误的主要方式是调用 `CompletableFuture.completeExceptionally(Throwable)`;此外,如果框架在调用 `eval` 时本身就抛出了异常,同样会被视为出错。出错后系统会按照重试策略处理,该策略由 [`table.exec.async-table.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-table-retry-strategy" >}}) 控制:若设为 `NO_RETRY`,作业直接失败;若设为 `FIXED_DELAY`,则会在等待 [`table.exec.async-table.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-table-retry-delay" >}}) 之后再次发起调用。当失败次数达到 [`table.exec.async-table.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-table-max-attempts" >}}) 上限,或总耗时(含全部重试)超过 [`table.exec.async-table.timeout`]({{< ref "docs/dev/table/config#table-exec-async-table-timeout" >}}),作业仍会失败。

The following example shows how to do work on a thread pool in the background, though any libraries exposing an async interface may be directly used to complete the `CompletableFuture` from a callback. See the [Implementation Guide](#implementation-guide) for more details.
下面的示例演示了如何借助一个后台线程池完成异步任务;当然,任何暴露异步接口的库都可以直接拿来在回调中完成 `CompletableFuture`。更多细节请参阅[实现指南](#implementation-guide)

```java
import org.apache.flink.table.api.*;
Expand Down Expand Up @@ -1298,24 +1298,73 @@ TableEnvironment env = TableEnvironment.create(...);
env.getConfig().set("table.exec.async-table.max-concurrent-operations", "5");
env.getConfig().set("table.exec.async-table.timeout", "1m");

// call function "inline" without registration in Table API
// Table API 中"内联"调用函数,无需注册
env.from("MyTable")
.joinLateral(call(BackgroundFunction.class, $("myField")))
.select($("*"))

// register function
// 注册函数
env.createTemporarySystemFunction("BackgroundFunction", BackgroundFunction.class);

// call registered function in Table API
// Table API 中调用已注册的函数
env.from("MyTable")
.joinLateral(call("BackgroundFunction", $("myField")))
.select($("*"))

// call registered function in SQL
// SQL 中调用已注册的函数
env.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(BackgroundFunction(myField))");

```

#### 自定义超时处理

默认情况下,如果某次 `eval` 调用在 [`table.exec.async-table.timeout`]({{< ref "docs/dev/table/config#table-exec-async-table-timeout" >}}) 之内仍未完成 `CompletableFuture`,且配置的重试也已用尽,框架会抛出 `java.util.concurrent.TimeoutException` 并让作业失败。如果希望覆盖这一行为,可以在 `AsyncTableFunction` 子类中声明一个与之匹配的 `timeout(...)` 方法,由它返回一行兜底数据,或由用户自定义超时处理。这通常在使用`AsyncTableFunction`发送请求调用远程LLM集群时很有用,众所周知,请求远程LLM集群出现超时的概率相对而言更高,相当多的用户希望超时不要直接导致作业失败,而是有自定义的兜底行为,`timeout`方法为此而生。

`timeout` 方法需要**同时**满足以下条件:

- **方法声明。** 必须是 `public` 实例方法(不能是 `static`),方法名固定为 `timeout`。
- **签名要与 `eval` 对齐。** 参数列表需要与对应的 `eval` 严格一致:第一个参数是 `CompletableFuture`,其泛型类型必须与 `eval` **完全相同**;其余参数的类型和顺序也必须与 `eval` **完全相同**。`timeout` 支持重载——每一个需要兜底逻辑的 `eval` 重载都可以单独声明一个对应的 `timeout`。
- **同步完成(强制要求)。** 该回调会在算子的 mailbox 线程上运行,**必须**在方法返回前完成 future。方法返回后,框架会立即检查 `future.isDone()`;如果尚未完成,会强制以 `IllegalStateException` 把 future 完成。也就是说:
- **不要**在方法体内再发起一次异步调用(例如又一次重试或二次查询),并指望它的回调最终把 future 完成——等回调真正触发时,框架已经放弃了这条记录。
- **不要**启动新线程异步去完成 future,理由同上。
- 方法体只该做一件事:给出一个轻量的兜底结果,比如返回一个常量行、一个 NULL 行、空集合,或者用业务自定义的异常调用 `completeExceptionally(...)`。
- **抛异常是安全的。** 直接在方法体内同步抛出异常没有问题:框架会把异常转交给下游的 `ResultFuture`,效果等价于显式调用 `future.completeExceptionally(thrown)`。

在以上约束之外,框架还会在以下场景保证稳定的行为:

- **未声明 `timeout` 方法。** 沿用框架默认的 `TimeoutException`,不会引发代码生成阶段的失败。
- **签名不兼容。** 如果 `timeout` 方法可见性合法,但参数列表无法和当前调用点的实参类型对应,框架会在规划期(代码生成阶段)以 `ValidationException` 直接失败;错误信息中会包含函数的全限定类名,并同时给出期望签名与实际签名。
- **空集合兜底。** 调用 `future.complete(Collections.emptyList())` 时:对 INNER lookup join 而言该行会被丢弃;对 LEFT OUTER lookup join 而言,右表字段会被填充为 NULL。

下面这段示例在 `BackgroundFunction` 的基础上加了一个 `timeout` 回调,展示如何同步地完成 future 并返回一行兜底数据:

```java
import org.apache.flink.table.functions.AsyncTableFunction;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public static class BackgroundFunctionWithTimeout extends AsyncTableFunction<Long> {

public void eval(CompletableFuture<Collection<Long>> future, Integer waitMax) {
// ...逻辑与上面的 BackgroundFunction 一致:把异步任务派发出去,
// 然后在回调线程中完成 `future`。
}

// 参数列表必须与 eval() 对齐:CompletableFuture 的泛型相同,
// 后续参数(Integer waitMax)的类型和顺序也相同。
// 方法体必须同步完成 `future`——不要再发起异步调用,
// 也不要启动新线程异步去完成它;这个回调被触发时,
// 算子已经放弃了这条记录。
public void timeout(CompletableFuture<Collection<Long>> future, Integer waitMax) {
future.complete(Collections.singletonList(-1L));
// 也可以选择用业务自定义异常代替默认的 TimeoutException:
// future.completeExceptionally(new MyLookupTimeoutException(waitMax));
}
}
```

{{< top >}}

聚合函数
Expand Down
50 changes: 50 additions & 0 deletions docs/content/docs/dev/table/functions/udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,56 @@ env.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(BackgroundFunction(myField))"

```

#### Custom Timeout Handling

By default, when an `eval` invocation fails to complete the `CompletableFuture` within [`table.exec.async-table.timeout`]({{< ref "docs/dev/table/config#table-exec-async-table-timeout" >}}) (after exhausting any configured retries), the framework surfaces a `java.util.concurrent.TimeoutException` and fails the job. To override this behavior, you can declare a matching `timeout(...)` method in your `AsyncTableFunction` subclass, which returns a fallback row or implements custom timeout handling logic. This is particularly useful when using `AsyncTableFunction` to send requests to a remote LLM cluster. As is well known, requests to remote LLM clusters are relatively more prone to timeouts, and many users prefer that a timeout does not directly cause the job to fail, but instead triggers custom fallback behavior. The timeout method is designed precisely for this purpose.

The `timeout` method must satisfy **all** of the following:

- **Declaration.** Public, non-static, named `timeout`.
- **Signature parity with `eval`.** The parameter list mirrors the matching `eval`: the first parameter is a `CompletableFuture` with the **same** generic type as in `eval`; the remaining parameters are the lookup keys with the **same** types and order. Overloads are supported — declare one `timeout` per `eval` overload that needs a fallback.
- **Synchronous completion (enforced).** The handler runs on the operator's mailbox thread and **must** complete the future before it returns. The framework checks `future.isDone()` immediately after the call and, if not, forces completion with `IllegalStateException`. Concretely:
- Do **not** issue another async call (e.g. a retry, a secondary lookup) and rely on its callback to complete the future — by the time the callback fires, the framework has already short-circuited this record.
- Do **not** spawn a thread that completes the future asynchronously, for the same reason.
- The body should be a pure, cheap fallback: a constant row, a NULL row, an empty collection, or `completeExceptionally(...)` with a user-defined exception.
- **Exception transparency.** Throwing synchronously from the body is safe: the throw is forwarded to the downstream `ResultFuture`, equivalent to calling `future.completeExceptionally(thrown)`.

Additional behavior the framework guarantees on top of the convention:

- **No `timeout` method** → the default `TimeoutException` fires; no codegen failure.
- **Incompatible signature** → if a `timeout` method exists with valid visibility but its parameter list is not assignable from the call site's lookup-key types, the framework fails fast during planning with a `ValidationException` whose message includes the function's fully qualified class name and the expected / actual signatures.
- **Empty-collection fallback** → `future.complete(Collections.emptyList())` drops the row for an INNER lookup join and pads the right side with NULL for a LEFT OUTER lookup join.

The following example extends `BackgroundFunction` with a `timeout` handler that completes the future synchronously with a fallback row:

```java
import org.apache.flink.table.functions.AsyncTableFunction;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public static class BackgroundFunctionWithTimeout extends AsyncTableFunction<Long> {

public void eval(CompletableFuture<Collection<Long>> future, Integer waitMax) {
// ...same as BackgroundFunction above — kicks off async work and
// completes `future` from the callback thread.
}

// The parameter list MUST mirror eval(): same CompletableFuture generic
// type, and the same trailing argument list (Integer waitMax).
// The body MUST complete `future` synchronously — do NOT issue another
// async call or spawn a thread to complete it later, because the operator
// has already abandoned this record by the time this handler runs.
public void timeout(CompletableFuture<Collection<Long>> future, Integer waitMax) {
future.complete(Collections.singletonList(-1L));
// Alternatively, surface a domain-specific error instead of the default
// TimeoutException:
// future.completeExceptionally(new MyLookupTimeoutException(waitMax));
}
}
```

{{< top >}}

Aggregate Functions
Expand Down
Loading