Skip to content

[FLINK-39781][table] Add LateralSnapshotJoinOperator with two-phase LOAD/JOIN execution#28329

Draft
fhueske wants to merge 1 commit into
apache:masterfrom
confluentinc:fhueske-FLINK-39781-Add-LateralSnapshotJoinOperator
Draft

[FLINK-39781][table] Add LateralSnapshotJoinOperator with two-phase LOAD/JOIN execution#28329
fhueske wants to merge 1 commit into
apache:masterfrom
confluentinc:fhueske-FLINK-39781-Add-LateralSnapshotJoinOperator

Conversation

@fhueske
Copy link
Copy Markdown
Contributor

@fhueske fhueske commented Jun 5, 2026

What is the purpose of the change

Stateful keyed two-input operator backing LATERAL SNAPSHOT joins.

  • Two phases (LOAD/JOIN) gated by an operator UnionListState; mixed-phase rescale collapses to LOAD. Flip triggers: build-side WM reaches loadCompletedTime, or a processing-time idle-timeout fallback.
  • Probe records are buffered in LOAD and joined on flip. Build-side changes are buffered per-key and applied lazily in event-time order on the next per-key access once the build-side WM advanced, preserving atomic -U/+U visibility.
  • Watermarks: build-side absorbed; probe-side held back in LOAD, forwarded in JOIN. NULL equi-keys filtered via JoinConditionWithNullFilters.
  • State TTL via keyed processing-time timers with a post-flip grace window; rearms are amortized so effective time-to-eviction is in [1.0×, 1.5×] stateTtlMs.

Brief change log

  • add LateralSnapshotJoinOperator according to FLIP-579

Verifying this change

  • LateralSnapshotJoinOperatorTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no (operator is not accessible yet)
  • If yes, how is the feature documented? n/a

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Co-authored-by: Claude Code 2.1.148 (Opus 4.7)

…OAD/JOIN execution

Stateful keyed two-input operator backing LATERAL SNAPSHOT joins.

* Two phases (LOAD/JOIN) gated by an operator UnionListState; mixed-phase
  rescale collapses to LOAD. Flip triggers: build-side WM reaches
  loadCompletedTime, or a processing-time idle-timeout fallback.
* Probe records are buffered in LOAD and joined on flip. Build-side changes
  are buffered per-key and applied lazily in event-time order on the next
  per-key access once the build-side WM advanced, preserving atomic -U/+U
  visibility.
* Watermarks: build-side absorbed; probe-side held back in LOAD, forwarded
  in JOIN. NULL equi-keys filtered via JoinConditionWithNullFilters.
* State TTL via keyed processing-time timers with a post-flip grace window;
  rearms are amortized so effective time-to-eviction is in
  [1.0×, 1.5×] stateTtlMs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@fhueske
Copy link
Copy Markdown
Contributor Author

fhueske commented Jun 5, 2026

⚠️ PR is not ready for review yet.

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Jun 5, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants