[FLINK-39839][table] Supports timeout in AsyncTableFunction #28310

Open
yuchengxin wants to merge 3 commits into apache:master from yuchengxin:timeout_for_async_table_function

yuchengxin (Contributor) commented Jun 4, 2026


What is the purpose of the change

This pull request introduces a user-defined timeout handler for AsyncTableFunction. When an async invocation exceeds the configured timeout, the framework now dispatches to a matching
timeout method on the function so users can return a fallback row or surface a domain-specific exception, instead of unconditionally failing the record (or the job) with the default
TimeoutException. The mechanism is wired through both call sites that consume AsyncTableFunction today — async lookup join and async correlate (table-function lateral join).

Brief change log

  • AsyncTableFunction: documented the timeout convention (signature parity with eval, synchronous completion on the mailbox thread, exception transparency, overload resolution,
    empty-collection fallback semantics).
  • UserDefinedFunctionHelper: added validation so that, when a timeout method is present, its visibility, modifiers and parameter list (first param is a CompletableFuture with the same
    generic type as eval, remaining params are lookup keys with the same types and order) are checked at registration time.
  • Codegen path: extended BridgingFunctionGenUtil, FunctionCallCodeGenerator, FunctionCodeGenerator, AsyncCorrelateCodeGenerator, AsyncCodeGenerator and LookupJoinCodeGenerator to:
    • resolve the matching timeout overload against the current call site's lookup-key types,
    • emit a timeout(...) dispatch on the generated async function,
    • fail fast during planning with a ValidationException (including FQN, expected signature and actual candidates) when a timeout method exists with a non-assignable signature.
  • Runtime:
    • DelegatingAsyncTableResultFuture invokes the user-supplied timeout handler on the mailbox thread, enforces synchronous completion (future.isDone() check after the call; otherwise
      short-circuits with IllegalStateException), and propagates synchronous exceptions to the downstream ResultFuture.
    • AsyncLookupJoinRunner / AsyncCorrelateRunner route timeout events through the new handler and preserve INNER/LEFT OUTER semantics for empty-collection completions.
  • Default behavior (no timeout method) is unchanged: the framework keeps emitting the original TimeoutException.
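
The convention and the synchronous-completion guard described above can be illustrated with a minimal, Flink-free sketch. Everything in it is an assumption made for illustration: `UserLookup`, `BrokenLookup`, and `onTimeout` are hypothetical names, the guard only approximates what the change log attributes to DelegatingAsyncTableResultFuture, and a real function would extend AsyncTableFunction rather than stand alone.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

// Hypothetical, Flink-free sketch of the timeout convention; all names are illustrative.
class TimeoutConventionSketch {

    // Stand-in for a user function. In Flink it would extend AsyncTableFunction.
    static class UserLookup {
        // Async entry point. Left empty here to mimic a backend that never answers.
        public void eval(CompletableFuture<Collection<String>> future, Integer key) {}

        // Timeout handler: same parameter list as eval, completes the future before returning.
        public void timeout(CompletableFuture<Collection<String>> future, Integer key) {
            future.complete(Collections.singletonList("fallback-" + key));
        }
    }

    // Violates the convention: returns without completing the future.
    static class BrokenLookup extends UserLookup {
        @Override
        public void timeout(CompletableFuture<Collection<String>> future, Integer key) {}
    }

    // Assumed shape of the framework-side guard, not the actual runtime code.
    static Collection<String> onTimeout(UserLookup fn, Integer key) {
        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        fn.timeout(future, key); // a synchronous exception would propagate to the caller
        if (!future.isDone()) {
            throw new IllegalStateException(
                    "timeout handler returned without completing the future");
        }
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(onTimeout(new UserLookup(), 7));
        try {
            onTimeout(new BrokenLookup(), 7);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this sketch a handler that defers completion to another thread or another async call is rejected in the same way as `BrokenLookup`, because the future is still incomplete when the handler returns.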

Verifying this change

This change added tests and can be verified as follows:

  • Unit tests
    • UserDefinedFunctionHelperTest: validates positive cases plus rejection of bad timeout signatures (wrong arity, mismatched key types, missing CompletableFuture, non-public, static,
      etc.).
    • DelegatingAsyncTableResultFutureTest: covers synchronous-complete, synchronous-throw, and the "did not complete the future before returning" guard.
    • AsyncCorrelateRunnerTest / AsyncLookupJoinRunnerTimeoutTest: cover dispatch into the user timeout handler, fallback-row emission, exception propagation, and overload resolution against
      multiple eval/timeout pairs.
  • IT tests
    • AsyncTableFunctionTimeoutITTest, AsyncCorrelateTimeoutITTest, AsyncLookupJoinTimeoutITTest, backed by a new TimeoutAsyncLookupTableFactory, exercise the full plan→codegen→runtime
      path for both lookup join (INNER and LEFT OUTER) and async correlate, asserting that fallback rows, NULL-padded rows and user-thrown exceptions surface as documented.
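
As a rough picture of the signature checks that the unit tests exercise, the following sketch validates a timeout method by reflection. It is not the UserDefinedFunctionHelper code: `validate` and the sample classes are hypothetical, and the sketch requires an exact parameter-list match with some eval overload, whereas the actual planner check is described in terms of assignability.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of registration-time checks on a timeout method.
class TimeoutSignatureCheckSketch {

    static class GoodFn {
        public void eval(CompletableFuture<Collection<String>> f, Integer key) {}
        public void timeout(CompletableFuture<Collection<String>> f, Integer key) {}
    }

    static class WrongKeyFn {
        public void eval(CompletableFuture<Collection<String>> f, Integer key) {}
        public void timeout(CompletableFuture<Collection<String>> f, String key) {}
    }

    static class StaticFn {
        public void eval(CompletableFuture<Collection<String>> f, Integer key) {}
        public static void timeout(CompletableFuture<Collection<String>> f, Integer key) {}
    }

    static class NoTimeoutFn {
        public void eval(CompletableFuture<Collection<String>> f, Integer key) {}
    }

    // Returns null when the class passes, otherwise a short reason for rejection.
    static String validate(Class<?> fn) {
        Method[] methods = fn.getDeclaredMethods();
        for (Method t : methods) {
            if (!t.getName().equals("timeout")) {
                continue;
            }
            int mods = t.getModifiers();
            if (!Modifier.isPublic(mods) || Modifier.isStatic(mods)) {
                return "timeout must be public and non-static";
            }
            Class<?>[] raw = t.getParameterTypes();
            if (raw.length == 0 || raw[0] != CompletableFuture.class) {
                return "first parameter must be a CompletableFuture";
            }
            // Generic types are compared too, so the future's element type must match eval.
            boolean hasMatchingEval = false;
            for (Method e : methods) {
                if (e.getName().equals("eval")
                        && Arrays.equals(e.getGenericParameterTypes(),
                                t.getGenericParameterTypes())) {
                    hasMatchingEval = true;
                }
            }
            if (!hasMatchingEval) {
                return "no eval overload with the same parameter list";
            }
        }
        return null; // no timeout method at all is also valid: default behavior applies
    }

    public static void main(String[] args) {
        System.out.println(validate(GoodFn.class));
        System.out.println(validate(WrongKeyFn.class));
        System.out.println(validate(StaticFn.class));
        System.out.println(validate(NoTimeoutFn.class));
    }
}
```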

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes (AsyncTableFunction, additive: a new optional timeout convention is recognized; existing subclasses
    without a timeout method are unaffected)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes (only when a timeout method is declared; the dispatch sits on the existing async-timeout path and runs at most once per
    timed-out record)
  • Anything that affects deployment or recovery (JobManager, Checkpointing, K8s/Yarn, ZooKeeper): no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs (the timeout convention, constraints and dispatch/error behavior are documented on AsyncTableFunction, with a worked example). User-facing
    docs for AsyncTableFunction lookup-join timeouts can be added in a follow-up doc-only commit if the reviewers prefer.

Was generative AI tooling used to co-author this PR?

  • Yes — Claude Code (claude-opus-4-7)

Generated-by: Claude Code (claude-opus-4-7)

flinkbot (Collaborator) commented Jun 4, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

yuchengxin force-pushed the timeout_for_async_table_function branch from fe32aff to 618a847 on June 4, 2026 08:59

Co-Authored-By: Claude Code <noreply@anthropic.com>
AI-Model: claude-opus-4-7

AI-Contributed/Feature: 0/0
AI-Contributed/UT: 19/19

The AsyncTableFunction section in the Chinese UDF docs was still
verbatim English even though the surrounding sections (in particular
AsyncScalarFunction directly above) had already been translated. Bring
this section in line by translating the prose, headings, and inline
code comments while preserving every {{< ref >}} shortcode and Java
identifier. The Chinese phrasing is reworked into idiomatic Chinese
rather than a word-for-word rendering, keeping the technical meaning
identical to the English original.

Co-Authored-By: Claude Code <noreply@anthropic.com>
AI-Model: claude-opus-4-7

AI-Contributed/Feature: 32/32
AI-Contributed/UT: 0/0

…Function

The new "Custom Timeout Handling" subsection covers:
  - when the handler is invoked vs. the default TimeoutException;
  - the required signature parity with eval(), and how overloads are
    resolved;
  - the synchronous-completion requirement enforced by codegen
    (future.isDone() check after the call), and why issuing another
    async call or spawning a thread inside the handler is unsafe;
  - exception transparency (sync throws are forwarded);
  - the framework guarantees on top of the convention: missing handler
    falls back to TimeoutException, incompatible signatures fail fast
    at planning with a ValidationException, and emptyList() drops the
    row for an INNER lookup join while padding NULL for a LEFT OUTER
    lookup join;
  - a worked example mirroring the BackgroundFunction sample directly
    above.

Both EN and ZH copies of the section receive the same content; the
Chinese version is rewritten in idiomatic Chinese rather than translated
literally.

Co-Authored-By: Claude Code <noreply@anthropic.com>
AI-Model: claude-opus-4-7

AI-Contributed/Feature: 99/99
AI-Contributed/UT: 0/0

yuchengxin force-pushed the timeout_for_async_table_function branch from a8ae168 to d5e1117 on June 5, 2026 03:30

yuchengxin (Contributor, Author) commented

@flinkbot run azure
