diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperator.java
new file mode 100644
index 0000000000000..a54ad6a7219f5
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperator.java
@@ -0,0 +1,1008 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.snapshot;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.DefaultOpenContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
+import org.apache.flink.table.runtime.operators.metrics.SimpleGauge;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Stream operator implementing the {@code LATERAL SNAPSHOT} processing-time temporal table join.
+ *
+ *
The operator runs in two phases, LOAD and JOIN:
+ *
+ *
+ * LOAD: build-side (input2 / right) changes are buffered in {@code buildChangeBuffer} and
+ * applied lazily to a per-key multi-set in {@code buildTableState} on the next per-key access
+ * (build- or probe-side) once the build-side watermark has advanced past the buffer's tag, or
+ * at the flip. Probe-side (input1 / left) records are buffered in {@code probeBuffer} until
+ * the configured flip point is reached on the build-side watermark, at which point a per-key
+ * event-time timer drains the buffered probes and joins them with the materialized build-side
+ * state.
+ * JOIN: probe-side records are joined immediately with the current build-side state.
+ * Build-side updates are buffered in {@code buildChangeBuffer} and applied lazily on next
+ * per-key access once the build-side watermark has advanced past the buffer's tag. This
+ * preserves atomic update visibility across {@code -U}/{@code +U} pairs.
+ *
+ *
+ * In both phases the buffered build-side changelog is applied in event-time order: changes are
+ * sorted by the build-side row-time attribute ({@code buildRowtimeIndex}), and for equal row-times
+ * retractions ({@code -U}/{@code -D}) are applied before accumulations ({@code +U}/{@code +I}). See
+ * {@link #applyBufferedChanges()}.
+ *
+ *
Watermark forwarding rules:
+ *
+ *
+ * Build-side watermarks are never forwarded downstream.
+ * Probe-side watermarks are held back during LOAD and forwarded during JOIN phase.
+ *
+ *
+ * The flip from LOAD to JOIN phase is triggered by either:
+ *
+ *
+ * the build-side watermark reaching {@code loadCompletedTime} (event-time gate), or
+ * the {@code loadCompletedIdleTimeoutMs} processing-time timer firing without any build-side
+ * watermark advance.
+ *
+ *
+ * State TTL is applied during JOIN phase and is implemented with keyed processing-time timers
+ * (matching the semantics of Flink's standard {@code StateTtlConfig}).
+ *
+ *
Metrics (FLIP-579): the operator exposes the following operator-scoped metrics.
+ *
+ *
+ * {@code numProbeSideRecordsBuffered} (gauge): probe-side records currently buffered during
+ * LOAD.
+ * {@code numBuildSideRecordsBuffered} (gauge): build-side changes currently buffered awaiting
+ * event-time-ordered application (in either phase).
+ * {@code numStateTtlEvictions} (counter): keys evicted from state by TTL.
+ * {@code currentBuildSideWatermark} (gauge): latest build-side watermark observed (never
+ * forwarded; used as the flip condition).
+ * {@code currentProbeSideWatermark} (gauge): latest probe-side watermark observed.
+ * {@code currentPhase} (gauge): current phase as an ordinal ({@code 0 = LOAD}, {@code 1 =
+ * JOIN}, {@code -1} if not yet initialized).
+ * {@code maxJoinFanOut} (gauge): maximum number of joined records emitted for a single
+ * probe-side record.
+ * {@code avgJoinFanOut} (gauge): average number of joined records emitted per probe-side
+ * record.
+ * {@code numUnmatchedProbeRecords} (counter): probe-side records without a match (only
+ * meaningful for INNER joins; always 0 for LEFT joins, which emit a null-padded result).
+ * {@code numUnmatchedBuildRetractions} (counter): build-side retractions ({@code -D}/{@code
+ * -U}) received for a row not present in state.
+ *
+ *
+ * The two buffer-occupancy gauges are backed by in-memory counters that are not checkpointed. On
+ * restore (including rescale) they are rebuilt from the restored keyed buffer state so they reflect
+ * the true occupancy of this subtask; see {@link #recomputeBufferCountsFromState()}. The counters
+ * are additionally clamped at 0 as defensive guarding.
+ */
+@Internal
+public class LateralSnapshotJoinOperator extends AbstractStreamOperator
+ implements TwoInputStreamOperator, Triggerable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(LateralSnapshotJoinOperator.class);
+
+ private static final String OPERATOR_PHASE_STATE_NAME = "lateral-snapshot-phase";
+
+ @VisibleForTesting static final String BUILD_TABLE_STATE_NAME = "build-table";
+ @VisibleForTesting static final String BUILD_CHANGE_BUFFER_STATE_NAME = "build-change-buffer";
+ @VisibleForTesting static final String BUFFERED_AT_WM_STATE_NAME = "buffered-at-wm";
+ @VisibleForTesting static final String PROBE_BUFFER_STATE_NAME = "probe-buffer";
+
+ private static final String TTL_EXPIRY_STATE_NAME = "ttl-expiry";
+
+ private static final String TIMER_SERVICE_NAME = "lateral-snapshot-timers";
+
+ @VisibleForTesting static final String NS_FLIP = "flip";
+ @VisibleForTesting static final String NS_TTL = "ttl";
+
+ /**
+ * Event-time timestamp at which the per-key {@code probeBuffer} flip join timer is registered.
+ * Any non-{@code MIN_VALUE} watermark advance fires it.
+ */
+ @VisibleForTesting static final long FLIP_JOIN_TIMER_TS = 1L;
+
+ // -------------------------- metric names --------------------------
+
+ /** Gauge: probe-side records currently buffered during LOAD. */
+ @VisibleForTesting static final String M_NUM_PROBE_BUFFERED = "numProbeSideRecordsBuffered";
+
+ /** Gauge: build-side changes currently buffered during JOIN. */
+ @VisibleForTesting static final String M_NUM_BUILD_BUFFERED = "numBuildSideRecordsBuffered";
+
+ /** Counter: keys evicted from state by TTL. */
+ @VisibleForTesting static final String M_NUM_STATE_TTL_EVICTIONS = "numStateTtlEvictions";
+
+ /** Gauge: latest build-side watermark observed. */
+ @VisibleForTesting static final String M_CURRENT_BUILD_WM = "currentBuildSideWatermark";
+
+ /** Gauge: latest probe-side watermark observed. */
+ @VisibleForTesting static final String M_CURRENT_PROBE_WM = "currentProbeSideWatermark";
+
+ /** Gauge: current phase ordinal, 0 = LOAD, 1 = JOIN. */
+ @VisibleForTesting static final String M_CURRENT_PHASE = "currentPhase";
+
+ /** Gauge: max joined records emitted for a probe-side record. */
+ @VisibleForTesting static final String M_MAX_JOIN_FAN_OUT = "maxJoinFanOut";
+
+ /** Gauge: average joined records emitted per probe-side record. */
+ @VisibleForTesting static final String M_AVG_JOIN_FAN_OUT = "avgJoinFanOut";
+
+ /** Counter: probe-side records without a match. */
+ @VisibleForTesting static final String M_NUM_UNMATCHED_PROBE = "numUnmatchedProbeRecords";
+
+ /** Counter: build-side retractions for a row not present in state. */
+ @VisibleForTesting
+ static final String M_NUM_UNMATCHED_BUILD_RETRACTIONS = "numUnmatchedBuildRetractions";
+
+ /** Two-phase state machine. */
+ public enum Phase {
+ LOAD,
+ JOIN
+ }
+
+ // -------------------------- ctor args --------------------------
+
+ private final boolean isLeftOuterJoin;
+
+ private final InternalTypeInfo leftType;
+ private final InternalTypeInfo rightType;
+
+ /** Field index of the build-side (right) row-time attribute. */
+ private final int buildRowtimeIndex;
+
+ private final GeneratedJoinCondition generatedJoinCondition;
+
+ /**
+ * Per-equi-key flag indicating whether rows with a NULL in that key position must be filtered
+ * before the join condition runs (SQL semantics: {@code NULL = NULL} is not true).
+ */
+ private final boolean[] filterNullKeys;
+
+ /**
+ * Timestamp at which the build-side watermark must arrive for the operator to flip from {@code
+ * LOAD} to JOIN.
+ */
+ private final Long loadCompletedTime;
+
+ /**
+ * Processing-time idle timeout duration (millis) on build-side watermarks. When configured, the
+ * operator flips to JOIN if no build-side watermark advance is seen for this duration.
+ */
+ @Nullable private final Long loadCompletedIdleTimeoutMs;
+
+ /**
+ * State TTL (millis) to clean up any keyed state during JOIN phase. We schedule TTL timers
+ * maxStateTtlMs ahead and check on minStateTtlMs before scheduling a new timer. This avoids
+ * rescheduling timers on every state access while still ensuring that keyed state is evicted
+ * after at most maxStateTtlMs of key inactivity during JOIN phase. If minStateTtlMs is set to
+ * 0, state TTL is disabled.
+ */
+ private final long minStateTtlMs;
+
+ private final long maxStateTtlMs;
+
+ // -------------------------- transient runtime --------------------------
+
+ private transient JoinConditionWithNullFilters joinCondition;
+ private transient GenericRowData nullPaddedBuild;
+ private transient TimestampedCollector collector;
+
+ private transient InternalTimerService timerService;
+
+ private transient Phase phase;
+
+ /**
+ * Processing-time wall clock at which the operator transitioned from {@link Phase#LOAD} to
+ * {@link Phase#JOIN}. {@code null} while still in LOAD. Used by the TTL handler to reschedule
+ * state TTL timers that fire too early.
+ */
+ @Nullable private transient Long flipProcTime;
+
+ /** Highest build-side watermark observed. */
+ private transient long currentBuildSideWm;
+
+ /** Latest probe-side watermark observed. */
+ private transient long currentProbeSideWm;
+
+ /** Non-keyed processing-time idle-flip timer. */
+ @Nullable private transient ScheduledFuture> idleFlipTimer;
+
+ // -------------------------- keyed state --------------------------
+
+ /** Build-side table as multi-set: row → reference count. */
+ private transient MapState buildTableState;
+
+ /** Buffer for build-side changes during JOIN to ensure atomic updates. */
+ private transient ListState buildChangeBuffer;
+
+ /** Build-side watermark to ensure atomic application of build changes during JOIN. */
+ private transient ValueState bufferedAtWmState;
+
+ /** Buffer for probe-side records during LOAD. */
+ private transient ListState probeBuffer;
+
+ /** Most recently registered TTL timer deadline; used to advance TTL timer. */
+ private transient ValueState ttlExpiryState;
+
+ // -------------------------- operator state --------------------------
+
+ private transient ListState operatorPhaseState;
+
+ // -------------------------- metrics --------------------------
+
+ private transient Counter numStateTtlEvictions;
+ private transient Counter numUnmatchedProbeRecords;
+ private transient Counter numUnmatchedBuildRetractions;
+
+ private transient SimpleGauge probeBufferedGauge;
+ private transient SimpleGauge buildBufferedGauge;
+ private transient SimpleGauge buildWmGauge;
+ private transient SimpleGauge probeWmGauge;
+ private transient SimpleGauge maxFanOutGauge;
+ private transient SimpleGauge avgFanOutGauge;
+ private transient Gauge phaseGauge;
+
+ /** Backing accumulators for the push-model gauges (in-memory, not persisted, best-effort). */
+ private transient long probeBufferedCount;
+
+ private transient long buildBufferedCount;
+ private transient long maxJoinFanOut;
+ private transient long totalJoinFanOut;
+ private transient long totalProbeJoins;
+
+ public LateralSnapshotJoinOperator(
+ boolean isLeftOuterJoin,
+ InternalTypeInfo leftType,
+ InternalTypeInfo rightType,
+ int buildRowtimeIndex,
+ GeneratedJoinCondition generatedJoinCondition,
+ boolean[] filterNullKeys,
+ Long loadCompletedTime,
+ @Nullable Long loadCompletedIdleTimeoutMs,
+ @Nullable Long stateTtlMs) {
+ this.isLeftOuterJoin = isLeftOuterJoin;
+ this.leftType = Preconditions.checkNotNull(leftType);
+ this.rightType = Preconditions.checkNotNull(rightType);
+ this.buildRowtimeIndex = buildRowtimeIndex;
+ if (buildRowtimeIndex < 0) {
+ throw new IllegalArgumentException("buildRowtimeIndex must be non-negative");
+ }
+ this.generatedJoinCondition = Preconditions.checkNotNull(generatedJoinCondition);
+ this.filterNullKeys = Preconditions.checkNotNull(filterNullKeys);
+ this.loadCompletedTime = Preconditions.checkNotNull(loadCompletedTime);
+ if (this.loadCompletedTime < 0) {
+ throw new IllegalArgumentException("loadCompletedTime must be non-negative");
+ }
+ this.loadCompletedIdleTimeoutMs = loadCompletedIdleTimeoutMs;
+ if (this.loadCompletedIdleTimeoutMs != null && this.loadCompletedIdleTimeoutMs < 0) {
+ throw new IllegalArgumentException("loadCompletedIdleTimeoutMs must be non-negative");
+ }
+ this.minStateTtlMs = stateTtlMs == null ? 0 : stateTtlMs;
+ if (this.minStateTtlMs < 0) {
+ throw new IllegalArgumentException("stateTtlMs must be non-negative");
+ }
+ // maxStateTtlMs is 1.5x of minStateTtlMs
+ this.maxStateTtlMs = this.minStateTtlMs + this.minStateTtlMs / 2;
+ }
+
+ // -------------------------- lifecycle --------------------------
+
+ @Override
+ public boolean useInterruptibleTimers(ReadableConfig config) {
+ return true;
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+
+ // Operator state only — keyed state and timer services are initialized in open()
+ operatorPhaseState =
+ context.getOperatorStateStore()
+ .getUnionListState(
+ new ListStateDescriptor<>(
+ OPERATOR_PHASE_STATE_NAME, StringSerializer.INSTANCE));
+
+ // any LOAD entry → LOAD; empty (fresh start) → LOAD; else JOIN
+ boolean phaseStateExists = false;
+ boolean anyTaskInLoad = false;
+ for (String phase : operatorPhaseState.get()) {
+ phaseStateExists = true;
+ if (Phase.LOAD.name().equals(phase)) {
+ anyTaskInLoad = true;
+ break;
+ }
+ }
+ // we are in LOAD phase if no phaseState exists (no savepoint/checkpoint) or any task was
+ // still in LOAD phase (not all tasks transitioned to JOIN phaase).
+ phase = (!phaseStateExists || anyTaskInLoad) ? Phase.LOAD : Phase.JOIN;
+
+ // When restored into JOIN, anchor flipProcTime on the current wall clock so the TTL
+ // handler's post-flip grace window restarts from now.
+ flipProcTime =
+ phase == Phase.JOIN ? getProcessingTimeService().getCurrentProcessingTime() : null;
+
+ currentBuildSideWm = Long.MIN_VALUE;
+ currentProbeSideWm = Long.MIN_VALUE;
+
+ // Initialize metric counters
+ probeBufferedCount = 0L;
+ buildBufferedCount = 0L;
+ maxJoinFanOut = 0L;
+ totalJoinFanOut = 0L;
+ totalProbeJoins = 0L;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ buildTableState =
+ getRuntimeContext()
+ .getMapState(
+ new MapStateDescriptor<>(
+ BUILD_TABLE_STATE_NAME, rightType, Types.LONG));
+ buildChangeBuffer =
+ getRuntimeContext()
+ .getListState(
+ new ListStateDescriptor<>(
+ BUILD_CHANGE_BUFFER_STATE_NAME, rightType));
+ bufferedAtWmState =
+ getRuntimeContext()
+ .getState(
+ new ValueStateDescriptor<>(BUFFERED_AT_WM_STATE_NAME, Types.LONG));
+ probeBuffer =
+ getRuntimeContext()
+ .getListState(new ListStateDescriptor<>(PROBE_BUFFER_STATE_NAME, leftType));
+ ttlExpiryState =
+ getRuntimeContext()
+ .getState(new ValueStateDescriptor<>(TTL_EXPIRY_STATE_NAME, Types.LONG));
+
+ // Wrap the codegen'd condition with a null-key filter so SQL semantics are honored for
+ // equi-keys whose values may be NULL.
+ final JoinCondition rawCondition =
+ generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
+ joinCondition = new JoinConditionWithNullFilters(rawCondition, filterNullKeys, this);
+ joinCondition.setRuntimeContext(getRuntimeContext());
+ joinCondition.open(DefaultOpenContext.INSTANCE);
+
+ nullPaddedBuild = new GenericRowData(rightType.toRowType().getFieldCount());
+ collector = new TimestampedCollector<>(output);
+
+ timerService = getInternalTimerService(TIMER_SERVICE_NAME, StringSerializer.INSTANCE, this);
+
+ // Register metrics
+ final MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ numStateTtlEvictions = metricGroup.counter(M_NUM_STATE_TTL_EVICTIONS);
+ numUnmatchedProbeRecords = metricGroup.counter(M_NUM_UNMATCHED_PROBE);
+ numUnmatchedBuildRetractions = metricGroup.counter(M_NUM_UNMATCHED_BUILD_RETRACTIONS);
+
+ // Rebuild the buffer counts from restored keyed state so the gauges reflect the
+ // records buffered in this subtask after a restore/rescale.
+ recomputeBufferCountsFromState();
+
+ probeBufferedGauge = new SimpleGauge<>(probeBufferedCount);
+ buildBufferedGauge = new SimpleGauge<>(buildBufferedCount);
+ buildWmGauge = new SimpleGauge<>(currentBuildSideWm);
+ probeWmGauge = new SimpleGauge<>(currentProbeSideWm);
+ maxFanOutGauge = new SimpleGauge<>(maxJoinFanOut);
+ avgFanOutGauge = new SimpleGauge<>(0.0d);
+
+ metricGroup.gauge(M_NUM_PROBE_BUFFERED, probeBufferedGauge);
+ metricGroup.gauge(M_NUM_BUILD_BUFFERED, buildBufferedGauge);
+ metricGroup.gauge(M_CURRENT_BUILD_WM, buildWmGauge);
+ metricGroup.gauge(M_CURRENT_PROBE_WM, probeWmGauge);
+ metricGroup.gauge(M_MAX_JOIN_FAN_OUT, maxFanOutGauge);
+ metricGroup.gauge(M_AVG_JOIN_FAN_OUT, avgFanOutGauge);
+ phaseGauge = () -> phase == null ? -1 : phase.ordinal();
+ metricGroup.gauge(M_CURRENT_PHASE, phaseGauge);
+
+ // Mark the build-side input (index 1) as permanently idle in the inherited
+ // combinedWatermark accounting. This operator never forwards build-side WMs nor
+ // build-side idle status: it absorbs both.
+ combinedWatermark.updateStatus(1, true);
+
+ // Register the load-completed idle-timeout timer if it is configured.
+ if (phase == Phase.LOAD && loadCompletedIdleTimeoutMs != null) {
+ scheduleIdleFlipTimer();
+ }
+ }
+
+ /**
+ * Recomputes the in-memory buffer counter ({@link #probeBufferedCount}, {@link
+ * #buildBufferedCount}) for the corresponding metrics by scanning the restored keyed buffer
+ * state.
+ */
+ private void recomputeBufferCountsFromState() throws Exception {
+ final KeyedStateBackend backend = getKeyedStateBackend();
+
+ probeBufferedCount = 0L;
+ backend.applyToAllKeys(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ new ListStateDescriptor<>(PROBE_BUFFER_STATE_NAME, leftType),
+ (key, state) -> {
+ for (RowData ignored : state.get()) {
+ probeBufferedCount++;
+ }
+ });
+
+ buildBufferedCount = 0L;
+ backend.applyToAllKeys(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ new ListStateDescriptor<>(BUILD_CHANGE_BUFFER_STATE_NAME, rightType),
+ (key, state) -> {
+ for (RowData ignored : state.get()) {
+ buildBufferedCount++;
+ }
+ });
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ operatorPhaseState.update(List.of(phase.name()));
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (idleFlipTimer != null) {
+ idleFlipTimer.cancel(false);
+ idleFlipTimer = null;
+ }
+ if (joinCondition != null) {
+ joinCondition.close();
+ }
+ super.close();
+ }
+
+ // -------------------------- elements --------------------------
+
+ @Override
+ public void processElement1(StreamRecord element) throws Exception {
+ RowData probe = element.getValue();
+ // Apply any buffered build-side changes if the build-side watermark has advanced.
+ applyBufferedChangesIfReady();
+ if (phase == Phase.LOAD) {
+ probeBuffer.add(probe);
+ timerService.registerEventTimeTimer(NS_FLIP, FLIP_JOIN_TIMER_TS);
+ probeBufferedGauge.update(++probeBufferedCount);
+ } else {
+ joinProbeRow(probe);
+ }
+ refreshStateTtl();
+ }
+
+ @Override
+ public void processElement2(StreamRecord element) throws Exception {
+ RowData build = element.getValue();
+ // Apply any buffered build-side changes if the build-side watermark has advanced.
+ applyBufferedChangesIfReady();
+ // Buffer the new change, tagged with the current build-side watermark.
+ buildChangeBuffer.add(build);
+ bufferedAtWmState.update(currentBuildSideWm);
+ buildBufferedGauge.update(++buildBufferedCount);
+ refreshStateTtl();
+ }
+
+ // -------------------------- watermarks --------------------------
+
+ @Override
+ public void processWatermark1(Watermark mark) throws Exception {
+ // Probe-side watermark: held back during LOAD (neither advances the timer service nor is
+ // forwarded), forwarded during JOIN.
+ if (phase == Phase.JOIN) {
+ super.processWatermark1(mark);
+ }
+ currentProbeSideWm = Math.max(currentProbeSideWm, mark.getTimestamp());
+ probeWmGauge.update(currentProbeSideWm);
+ }
+
+ @Override
+ public void processWatermark2(Watermark mark) throws Exception {
+ // Build-side watermark: NEVER forwarded; never advances the timer service.
+ long ts = mark.getTimestamp();
+ currentBuildSideWm = Math.max(currentBuildSideWm, ts);
+ buildWmGauge.update(currentBuildSideWm);
+ if (phase == Phase.LOAD) {
+ if (currentBuildSideWm >= loadCompletedTime) {
+ // we reached the flip point. Transition to JOIN phase.
+ transitionToJoinPhase();
+ } else if (loadCompletedIdleTimeoutMs != null) {
+ // we got a new build-side wm. Reschedule the idle timer (if it was configured)
+ rescheduleIdleFlipTimer();
+ }
+ }
+ }
+
+ @Override
+ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index)
+ throws Exception {
+ if (index == 1) {
+ // Build-side idle status is absorbed entirely. partial[1] is initialized idle in
+ // open() and stays that way regardless of source-side toggles, so combined accounting
+ // is always driven by the probe side alone.
+ return;
+ }
+ if (phase == Phase.LOAD) {
+ // During LOAD, nothing is emitted downstream — neither watermarks nor status. But we
+ // do track partial[0]'s idle bit so that, after the flip, the operator has an accurate
+ // view of the probe-side's idle state.
+ combinedWatermark.updateStatus(0, watermarkStatus.isIdle());
+ return;
+ }
+ super.processWatermarkStatus(watermarkStatus, index);
+ }
+
+ // -------------------------- timers --------------------------
+
+ @Override
+ public void onEventTime(InternalTimer timer) throws Exception {
+ String ns = timer.getNamespace();
+ // the NS_FLIP timers are fired when the operator transitions from LOAD to JOIN phase.
+ if (NS_FLIP.equals(ns)) {
+ // If a recovery happened before, there might be buffered build-side changes.
+ // Apply them before joining the buffered probe-side records.
+ applyBufferedChanges();
+ // Join each buffered probe row.
+ long drained = 0;
+ for (RowData p : probeBuffer.get()) {
+ joinProbeRow(p);
+ drained++;
+ }
+ probeBuffer.clear();
+ probeBufferedCount = Math.max(0, probeBufferedCount - drained);
+ probeBufferedGauge.update(probeBufferedCount);
+ }
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer timer) throws Exception {
+ // TTL timers run on processing time so semantics match Flink's standard StateTtlConfig.
+ if (!NS_TTL.equals(timer.getNamespace())) {
+ return;
+ }
+ if (minStateTtlMs == 0) {
+ // TTL wasn't configured and shouldn't have registered any timers.
+ return;
+ }
+ Long deadline = ttlExpiryState.value();
+ if (deadline == null || timer.getTimestamp() != deadline) {
+ return; // stale timer fire
+ }
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ // check if we need to reschedule the ttl timer. This is necessary if
+ // a) we're still in LOAD phase, or
+ // b) if we're in JOIN but the flip happened less than stateTtlMs ago.
+ if (phase == Phase.LOAD || (flipProcTime != null && now < flipProcTime + minStateTtlMs)) {
+ // set the new TTL timer maxStateTtlMs ahead
+ long newDeadline =
+ phase == Phase.LOAD ? now + maxStateTtlMs : flipProcTime + maxStateTtlMs;
+ timerService.registerProcessingTimeTimer(NS_TTL, newDeadline);
+ ttlExpiryState.update(newDeadline);
+ return;
+ }
+ // clear all per-key state
+ buildTableState.clear();
+ // Discount any build-side changes still buffered for this key before clearing.
+ long evictedBuffered = 0;
+ for (RowData ignored : buildChangeBuffer.get()) {
+ evictedBuffered++;
+ }
+ buildBufferedCount = Math.max(0, buildBufferedCount - evictedBuffered);
+ buildBufferedGauge.update(buildBufferedCount);
+ buildChangeBuffer.clear();
+ bufferedAtWmState.clear();
+ ttlExpiryState.clear();
+ // probeBuffer should be empty because we are in JOIN phase, but clear out just in case.
+ // hence, we're also not updating the probeBufferedGauge.
+ probeBuffer.clear();
+ numStateTtlEvictions.inc();
+ }
+
+ /**
+ * Registers the load-completion idle-timeout timer. No-op when the timeout is not configured.
+ */
+ private void scheduleIdleFlipTimer() {
+ if (loadCompletedIdleTimeoutMs == null) {
+ return;
+ }
+ long deadline =
+ getProcessingTimeService().getCurrentProcessingTime() + loadCompletedIdleTimeoutMs;
+ idleFlipTimer =
+ getProcessingTimeService().registerTimer(deadline, t -> transitionToJoinPhase());
+ }
+
+ /** Updates the idle flip timer. */
+ private void rescheduleIdleFlipTimer() {
+ cancelIdleFlipTimer();
+ scheduleIdleFlipTimer();
+ }
+
+ /** Deactivates the currently registered idle flip timer. */
+ private void cancelIdleFlipTimer() {
+ if (idleFlipTimer != null) {
+ idleFlipTimer.cancel(false);
+ idleFlipTimer = null;
+ }
+ }
+
+ // -------------------------- core logic --------------------------
+
+ /**
+ * Transition from LOAD to JOIN.
+ *
+ * Invocation context : This method runs in a NON-KEYED context. The two callers are
+ * (a) {@link #processWatermark2}, which is invoked by the framework without a key context, and
+ * (b) {@link #idleFlipTimer}, which fires from the operator-level processing-time service.
+ * Therefore {@code flipToJoinPhase()} itself must not access keyed state. Per-key work (the
+ * buffered probe flush) is delegated to {@link #onEventTime} via {@code timeServiceManager
+ * .advanceWatermark(...)} below — that path establishes the correct key context for each fired
+ * timer before invoking the callback.
+ */
+ private void transitionToJoinPhase() throws Exception {
+ if (phase == Phase.JOIN) {
+ return;
+ }
+ phase = Phase.JOIN;
+ // Record the flip wall-clock so the TTL handler can grant a grace period of
+ // stateTtlMs after the flip before any build-only key becomes eligible for eviction.
+ // Without this anchor, keys loaded long before the flip would be evicted as soon as the
+ // first TTL fire after the flip happens.
+ flipProcTime = getProcessingTimeService().getCurrentProcessingTime();
+ // disable idle flip timer
+ cancelIdleFlipTimer();
+ // Fire all per-key flip timers (TS=1) so any probes buffered during LOAD are joined.
+ long advanceTo = Math.max(currentProbeSideWm, FLIP_JOIN_TIMER_TS);
+ if (timeServiceManager != null) {
+ timeServiceManager.advanceWatermark(new Watermark(advanceTo));
+ }
+ // Track the idle status which is cleared by updateWatermark()
+ boolean probeIdleAtFlip = combinedWatermark.isIdle();
+ // Emit the last observed probe-side wm downstream
+ if (currentProbeSideWm != Long.MIN_VALUE) {
+ combinedWatermark.updateWatermark(0, currentProbeSideWm);
+ output.emitWatermark(new Watermark(currentProbeSideWm));
+ // reset the idle status
+ combinedWatermark.updateStatus(0, probeIdleAtFlip);
+ }
+ // If the probe-side was idle at flip time, propagate the idle status downstream
+ if (probeIdleAtFlip) {
+ output.emitWatermarkStatus(WatermarkStatus.IDLE);
+ }
+ }
+
+ /**
+ * Joins a probe-side row against the current build-side table and applies the join predicate.
+ * Returns a null-padded result if the row doesn't match any build-side row and this is a LEFT
+ * OUTER join.
+ */
+ private void joinProbeRow(RowData probe) throws Exception {
+ boolean matched = false;
+ // Number of result rows emitted for this probe row (the join fan-out).
+ long fanOut = 0;
+ for (Map.Entry entry : buildTableState.entries()) {
+ RowData buildRow = entry.getKey();
+ long count = entry.getValue();
+ if (joinCondition.apply(probe, buildRow)) {
+ matched = true;
+ // Each emitted record uses a fresh JoinedRowData wrapper.
+ // Reusing a row object here is unsafe when subsequent collects mutate it.
+ for (long i = 0; i < count; i++) {
+ JoinedRowData out = new JoinedRowData();
+ out.replace(probe, buildRow);
+ out.setRowKind(RowKind.INSERT);
+ collector.collect(out);
+ fanOut++;
+ }
+ }
+ }
+ if (!matched && isLeftOuterJoin) {
+ // No join match, emit a null-padded LEFT OUTER join result
+ JoinedRowData out = new JoinedRowData();
+ out.replace(probe, nullPaddedBuild);
+ out.setRowKind(RowKind.INSERT);
+ collector.collect(out);
+ fanOut++;
+ }
+ if (!matched && !isLeftOuterJoin) {
+ numUnmatchedProbeRecords.inc();
+ }
+ // Update fan-out statistics (fan-out = number of result rows emitted for this probe).
+ totalProbeJoins++;
+ totalJoinFanOut += fanOut;
+ if (fanOut > maxJoinFanOut) {
+ maxJoinFanOut = fanOut;
+ maxFanOutGauge.update(maxJoinFanOut);
+ }
+ avgFanOutGauge.update(((double) totalJoinFanOut) / totalProbeJoins);
+ }
+
+ /**
+ * Applies the buffered build-side changes if the build-side watermark advanced since last
+ * buffer application. This ensures that we apply buffered changes atomically once their
+ * corresponding build-side WM is passed.
+ */
+ private void applyBufferedChangesIfReady() throws Exception {
+ Long bufferedAt = bufferedAtWmState.value();
+ if (bufferedAt == null) {
+ // Nothing buffered for this key.
+ return;
+ }
+ if (currentBuildSideWm > bufferedAt) {
+ // the build-side wm advanced. Buffered changes can be applied atomically now.
+ applyBufferedChanges();
+ } else if (phase == Phase.JOIN && currentBuildSideWm == Long.MIN_VALUE) {
+ // JOIN-phase fallback: no build-side watermark has ever been observed by this subtask.
+ // This happens
+ // (a) after a recovery (e.g. JOIN-phase rescaled into this subtask), or
+ // (b) when the operator flipped via the idle-timeout fallback and no build-side
+ // watermark arrived afterwards.
+ // In either case we apply the buffered changes now because we do not know when the next
+ // build-side WM will arrive (if it ever will).
+ applyBufferedChanges();
+ }
+ }
+
+ /**
+ * Apply all buffered build-side changes for the current key to {@code buildTableState} in
+ * event-time order, then clear the buffer state.
+ */
+ private void applyBufferedChanges() throws Exception {
+ List changes = new ArrayList<>();
+ for (RowData c : buildChangeBuffer.get()) {
+ changes.add(c);
+ }
+ // Apply in event-time order; read the row-time before applyBuildChange mutates the kind.
+ changes.sort(this::compareBuildChanges);
+ for (RowData c : changes) {
+ applyBuildChange(c);
+ }
+ buildChangeBuffer.clear();
+ bufferedAtWmState.clear();
+ buildBufferedCount = Math.max(0, buildBufferedCount - changes.size());
+ buildBufferedGauge.update(buildBufferedCount);
+ }
+
+ /**
+ * Orders build-side changelog entries for deterministic, event-time-ordered application:
+ * ascending by the build-side row-time attribute, then retractions ({@code -U}/{@code -D})
+ * before accumulations ({@code +U}/{@code +I}) for entries sharing a row-time.
+ */
+ private int compareBuildChanges(RowData a, RowData b) {
+ int byTime = Long.compare(a.getLong(buildRowtimeIndex), b.getLong(buildRowtimeIndex));
+ if (byTime != 0) {
+ return byTime;
+ }
+ return Integer.compare(retractionRank(a), retractionRank(b));
+ }
+
+ /** Returns {@code 0} for retracting changes ({@code -U}/{@code -D}), {@code 1} otherwise. */
+ private static int retractionRank(RowData change) {
+ RowKind kind = change.getRowKind();
+ return (kind == RowKind.UPDATE_BEFORE || kind == RowKind.DELETE) ? 0 : 1;
+ }
+
+ /**
+ * Apply a build-side change record directly to the build-table multi-set.
+ *
+ * MUTATES the input row's {@link RowKind} to {@link RowKind#INSERT} to normalize the key
+ * used for {@code buildTableState} lookups. The caller must not rely on the original kind after
+ * this call returns. The mutation avoids a per-record copy on a hot path.
+ */
+ private void applyBuildChange(RowData change) throws Exception {
+ RowKind changeType = change.getRowKind();
+ change.setRowKind(RowKind.INSERT);
+ Long currentCnt = buildTableState.get(change);
+ if (changeType == RowKind.INSERT || changeType == RowKind.UPDATE_AFTER) {
+ // +I / +U
+ buildTableState.put(change, currentCnt == null ? 1L : currentCnt + 1L);
+ } else {
+ // -D / -U
+ if (currentCnt == null || currentCnt <= 0L) {
+ numUnmatchedBuildRetractions.inc();
+ LOG.warn("Received {} for build row not present in state — ignoring.", changeType);
+ return;
+ }
+ if (currentCnt == 1L) {
+ buildTableState.remove(change);
+ } else {
+ buildTableState.put(change, currentCnt - 1L);
+ }
+ }
+ }
+
+ /** If state TTL is configured, refreshes the state TTL timer if needed. */
+ private void refreshStateTtl() throws Exception {
+ if (minStateTtlMs == 0) {
+ // Nothing to do when state TTL is not configured.
+ return;
+ }
+ // We register it at maxStateTtlMs to avoid rearming the timer on every access.
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ long refreshThreshold = now + minStateTtlMs;
+ Long currentTtlTimer = ttlExpiryState.value();
+ if (currentTtlTimer != null && currentTtlTimer >= refreshThreshold) {
+ // Existing timer still covers at least one full stateTtlMs — leave it in place.
+ return;
+ }
+ if (currentTtlTimer != null) {
+ // Remove the current timer before setting a new one.
+ timerService.deleteProcessingTimeTimer(NS_TTL, currentTtlTimer);
+ }
+ long newDeadline = now + maxStateTtlMs;
+ timerService.registerProcessingTimeTimer(NS_TTL, newDeadline);
+ ttlExpiryState.update(newDeadline);
+ }
+
+ // -------------------------- accessors (testing) --------------------------
+
+ @VisibleForTesting
+ Phase getPhase() {
+ return phase;
+ }
+
+ @VisibleForTesting
+ long getCurrentBuildSideWm() {
+ return currentBuildSideWm;
+ }
+
+ @VisibleForTesting
+ long getCurrentProbeSideWm() {
+ return currentProbeSideWm;
+ }
+
+ @Nullable
+ @VisibleForTesting
+ Long getFlipProcTime() {
+ return flipProcTime;
+ }
+
+ @VisibleForTesting
+ boolean isIdleFlipTimerActive() {
+ return idleFlipTimer != null;
+ }
+
+ @VisibleForTesting
+ MapState getBuildTableState() {
+ return buildTableState;
+ }
+
+ @VisibleForTesting
+ ListState getBuildChangeBuffer() {
+ return buildChangeBuffer;
+ }
+
+ @VisibleForTesting
+ ValueState getBufferedAtWmState() {
+ return bufferedAtWmState;
+ }
+
+ @VisibleForTesting
+ ListState getProbeBuffer() {
+ return probeBuffer;
+ }
+
+ @VisibleForTesting
+ ValueState getTtlExpiryState() {
+ return ttlExpiryState;
+ }
+
+ // -------------------------- accessors (metrics, testing) --------------------------
+
+ @VisibleForTesting
+ Counter getNumStateTtlEvictions() {
+ return numStateTtlEvictions;
+ }
+
+ @VisibleForTesting
+ Counter getNumUnmatchedProbeRecords() {
+ return numUnmatchedProbeRecords;
+ }
+
+ @VisibleForTesting
+ Counter getNumUnmatchedBuildRetractions() {
+ return numUnmatchedBuildRetractions;
+ }
+
+ @VisibleForTesting
+ SimpleGauge getNumProbeSideRecordsBufferedGauge() {
+ return probeBufferedGauge;
+ }
+
+ @VisibleForTesting
+ SimpleGauge getNumBuildSideRecordsBufferedGauge() {
+ return buildBufferedGauge;
+ }
+
+ @VisibleForTesting
+ SimpleGauge getCurrentBuildSideWatermarkGauge() {
+ return buildWmGauge;
+ }
+
+ @VisibleForTesting
+ SimpleGauge getCurrentProbeSideWatermarkGauge() {
+ return probeWmGauge;
+ }
+
+ @VisibleForTesting
+ SimpleGauge getMaxJoinFanOutGauge() {
+ return maxFanOutGauge;
+ }
+
+ @VisibleForTesting
+ SimpleGauge getAvgJoinFanOutGauge() {
+ return avgFanOutGauge;
+ }
+
+ @VisibleForTesting
+ Gauge getCurrentPhaseGauge() {
+ return phaseGauge;
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperatorTest.java
new file mode 100644
index 0000000000000..1ae05a5a4084f
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperatorTest.java
@@ -0,0 +1,1666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.snapshot;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.Phase;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.BUILD_CHANGE_BUFFER_STATE_NAME;
+import static org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.BUILD_TABLE_STATE_NAME;
+import static org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.PROBE_BUFFER_STATE_NAME;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.within;
+
+/** Harness tests for {@link LateralSnapshotJoinOperator}. */
+class LateralSnapshotJoinOperatorTest {
+
+ // ----------------------------------------------------------------- Schema
+
+ /** Probe row schema: (id BIGINT, key VARCHAR, val VARCHAR). */
+ private static final InternalTypeInfo PROBE_TYPE =
+ InternalTypeInfo.ofFields(
+ new BigIntType(), VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
+
+ /** Build row schema: (key VARCHAR, val VARCHAR, rt BIGINT). */
+ private static final InternalTypeInfo BUILD_TYPE =
+ InternalTypeInfo.ofFields(
+ VarCharType.STRING_TYPE, VarCharType.STRING_TYPE, new BigIntType());
+
+ /** Joined output schema: probe ++ build = (id, pKey, pVal, bKey, bVal, bRt). */
+ private static final LogicalType[] OUTPUT_TYPES = {
+ new BigIntType(),
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ new BigIntType()
+ };
+
+ /** Probe key column index (key VARCHAR is at field 1). */
+ private static final int PROBE_KEY_IDX = 1;
+
+ /** Build key column index (key VARCHAR is at field 0). */
+ private static final int BUILD_KEY_IDX = 0;
+
+ /** Build row-time column index (rt BIGINT is at field 2). */
+ private static final int BUILD_RT_IDX = 2;
+
+ private static final InternalTypeInfo KEY_TYPE =
+ InternalTypeInfo.ofFields(VarCharType.STRING_TYPE);
+
+ private static final KeySelector PROBE_KEY_SELECTOR =
+ nullSafeStringKeySelector(PROBE_KEY_IDX);
+ private static final KeySelector BUILD_KEY_SELECTOR =
+ nullSafeStringKeySelector(BUILD_KEY_IDX);
+
+ private static final RowDataHarnessAssertor JOINED_ASSERTOR =
+ new RowDataHarnessAssertor(OUTPUT_TYPES);
+
+ // ----------------------------------------------------------------- Join conditions
+
+ /** Trivial join condition that always matches (equality is enforced by partitioning). */
+ private static final String ALWAYS_TRUE_JOIN_FUNC_CODE =
+ "public class LateralSnapshotJoinConditionStub extends "
+ + "org.apache.flink.api.common.functions.AbstractRichFunction "
+ + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n"
+ + " public LateralSnapshotJoinConditionStub(Object[] reference) {}\n"
+ + " @Override public boolean apply("
+ + " org.apache.flink.table.data.RowData in1,"
+ + " org.apache.flink.table.data.RowData in2) { return true; }\n"
+ + "}\n";
+
+ /**
+ * Join condition that only matches when the probe value (field 2) equals {@code "match"}. Used
+ * to verify that the codegen'd condition is actually invoked at join time.
+ */
+ private static final String MATCH_VAL_JOIN_FUNC_CODE =
+ "public class LateralSnapshotJoinConditionMatchVal extends "
+ + "org.apache.flink.api.common.functions.AbstractRichFunction "
+ + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n"
+ + " public LateralSnapshotJoinConditionMatchVal(Object[] reference) {}\n"
+ + " @Override public boolean apply("
+ + " org.apache.flink.table.data.RowData in1,"
+ + " org.apache.flink.table.data.RowData in2) {\n"
+ + " if (in1.isNullAt(2)) { return false; }\n"
+ + " return \"match\".equals(in1.getString(2).toString());\n"
+ + " }\n"
+ + "}\n";
+
+ private static GeneratedJoinCondition newTrueCondition() {
+ return new GeneratedJoinCondition(
+ "LateralSnapshotJoinConditionStub", ALWAYS_TRUE_JOIN_FUNC_CODE, new Object[0]);
+ }
+
+ private static GeneratedJoinCondition newMatchValCondition() {
+ return new GeneratedJoinCondition(
+ "LateralSnapshotJoinConditionMatchVal", MATCH_VAL_JOIN_FUNC_CODE, new Object[0]);
+ }
+
+ // ----------------------------------------------------------------- Operator / harness
+ // factories
+
+ private static LateralSnapshotJoinOperator newOperator(
+ boolean isLeftOuterJoin,
+ GeneratedJoinCondition joinCondition,
+ boolean[] filterNullKeys,
+ Long loadCompletedTime,
+ Long loadCompletedIdleTimeoutMs,
+ Long stateTtlMs) {
+
+ return new LateralSnapshotJoinOperator(
+ isLeftOuterJoin,
+ PROBE_TYPE,
+ BUILD_TYPE,
+ BUILD_RT_IDX,
+ joinCondition,
+ filterNullKeys,
+ loadCompletedTime,
+ loadCompletedIdleTimeoutMs,
+ stateTtlMs);
+ }
+
+ private static LateralSnapshotJoinOperator newOperator(
+ boolean isLeftOuterJoin,
+ Long loadCompletedTime,
+ Long loadCompletedIdleTimeoutMs,
+ Long stateTtlMs) {
+
+ return newOperator(
+ isLeftOuterJoin,
+ newTrueCondition(),
+ new boolean[] {true},
+ loadCompletedTime,
+ loadCompletedIdleTimeoutMs,
+ stateTtlMs);
+ }
+
+ private static KeyedTwoInputStreamOperatorTestHarness
+ newHarness(LateralSnapshotJoinOperator op) throws Exception {
+ return new KeyedTwoInputStreamOperatorTestHarness<>(
+ op, PROBE_KEY_SELECTOR, BUILD_KEY_SELECTOR, KEY_TYPE);
+ }
+
+ private static KeySelector nullSafeStringKeySelector(final int keyIdx) {
+ return value -> {
+ BinaryRowData ret = new BinaryRowData(1);
+ BinaryRowWriter writer = new BinaryRowWriter(ret);
+ if (value.isNullAt(keyIdx)) {
+ writer.setNullAt(0);
+ } else {
+ writer.writeString(0, value.getString(keyIdx));
+ }
+ writer.complete();
+ return ret;
+ };
+ }
+
+ // ----------------------------------------------------------------- LOAD phase
+
+ @Test
+ void loadPhaseBuildSideChangeProcessing() throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ // During LOAD, build-side changes are buffered and later applied in event-time order.
+ // -D for a never-inserted (key, value) pair is defensively ignored.
+ addBuildChange(h, deleteRecord("k1", "ghost", 5L));
+ // Two identical records (same key/val/row-time) → count(k1, v1, 20) = 2.
+ addBuildChange(h, insertRecord("k1", "v1", 20L));
+ addBuildChange(h, insertRecord("k1", "v1", 20L));
+ // Earlier row-time than v1, but arrives later.
+ addBuildChange(h, insertRecord("k1", "v2", 10L));
+
+ // Still LOAD: changes are buffered, nothing applied or emitted yet.
+ assertPhase(op, Phase.LOAD);
+ assertThat(h.getOutput()).isEmpty();
+ assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(4);
+ assertThat(buildTableKeys(h)).isEmpty();
+
+ // Advance the build watermark (still below the flip point) and access k1 again. The
+ // access drains the buffered batch in event-time order before buffering the new change.
+ addBuildWm(h, 50L);
+ addBuildChange(h, insertRecord("k1", "v3", 30L));
+ // TODO: also add updateBefore and updateAfter changes, to ensure that these are handled
+ // correctly
+
+ assertPhase(op, Phase.LOAD);
+ // Applied in row-time order: -D(ghost)@5 (ignored), +I(v2)@10, +I(v1)@20 ×2.
+ assertThat(buildTableKeys(h)).containsExactly("k1");
+ assertThat(buildTableForKey(h, op, "k1"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 2L, "v2", 1L));
+ // The triggering v3 change is now buffered, awaiting the next drain.
+ assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(1);
+ }
+ }
+
+ @Test
+ void loadPhaseProbeSideInputProcessing() throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+
+ addBuildChange(h, insertRecord("k1", "build1", 1L));
+ addBuildWm(h, 20L);
+ addProbeRecord(h, 1L, "k1", "probe-load-1");
+ addProbeRecord(h, 2L, "k1", "probe-load-2");
+ addProbeWm(h, 50L);
+
+ assertPhase(op, Phase.LOAD);
+ // No output (records buffered, watermarks held back).
+ assertThat(h.getOutput()).isEmpty();
+
+ assertThat(op.getCurrentProbeSideWm()).isEqualTo(50L);
+ assertThat(probeBufferKeys(h)).containsExactly("k1");
+ assertThat(probeBufferForKey(h, op, "k1")).hasSize(2);
+ // Build-side change was applied to the build table.
+ assertThat(bufferedChangesForKey(h, op, "k1")).isEmpty();
+ assertThat(buildTableForKey(h, op, "k1"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("build1", 1L));
+ }
+ }
+
+ // ----------------------------------------------------------------- Flip / transition
+
+ @ParameterizedTest(name = "leftOuter={0}, wmFlip={1}")
+ @CsvSource({"true, true", "true, false", "false, true", "false, false"})
+ void flipDrainsProbeBufferAndJoins(boolean leftOuter, boolean wmFlip) throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(leftOuter, 100L, 200L, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.setProcessingTime(0);
+ h.open();
+ // LOAD: build state and buffered probes (one without a matching build row).
+ // k1 is a multi-set: count(v1)=2, count(v2)=1; k2 is a single row.
+ addBuildChange(h, insertRecord("k1", "build-k1-v1", 1L));
+ addBuildChange(h, insertRecord("k1", "build-k1-v1", 1L));
+ addBuildChange(h, insertRecord("k1", "build-k1-v2", 1L));
+ addBuildChange(h, insertRecord("k2", "build-k2", 1L));
+ // LOAD: add probe-side records
+ addProbeRecord(h, 1L, "k1", "probe-1");
+ addProbeRecord(h, 2L, "k2", "probe-2");
+ addProbeRecord(h, 3L, "k2", "probe-3");
+ addProbeRecord(h, 4L, "k3", "probe-no-match");
+ addProbeWm(h, 80L);
+ assertPhase(op, Phase.LOAD);
+
+ // assert that probe-side buffer is filled
+ assertThat(probeBufferForKey(h, op, "k1")).hasSize(1);
+ assertThat(probeBufferForKey(h, op, "k2")).hasSize(2);
+ assertThat(probeBufferForKey(h, op, "k3")).hasSize(1);
+ // idle-timeout timer is armed while in LOAD
+ assertThat(op.isIdleFlipTimerActive()).isTrue();
+
+ // trigger flip from LOAD to JOIN
+ if (wmFlip) {
+ // build WM crosses loadCompletedTime.
+ addBuildWm(h, 100L);
+ } else {
+ // proc-time exceeds idle timeout
+ h.setProcessingTime(200);
+ }
+ assertPhase(op, Phase.JOIN);
+ // idle-timeout timer is removed on flip (canceled by a WM flip, fired by an idle flip)
+ assertThat(op.isIdleFlipTimerActive()).isFalse();
+
+ // probe k1 joins k1's multi-set (count-respecting: 2x v1 + 1x v2 = three rows);
+ // probe k2 (2 rows) joins a single k2 build row;
+ // probe k3 doesn't have a matching build row. INNER: no output, LEFT OUTER: null-padded
+ assertWatermarkForwardedAfterRecords(h.getOutput(), 80L);
+ stripWatermarksAndStatusesFromOutput(h);
+ if (leftOuter) {
+ JOINED_ASSERTOR.shouldEmitAll(
+ h,
+ row(1L, "k1", "probe-1", "k1", "build-k1-v1", 1L),
+ row(1L, "k1", "probe-1", "k1", "build-k1-v1", 1L),
+ row(1L, "k1", "probe-1", "k1", "build-k1-v2", 1L),
+ row(2L, "k2", "probe-2", "k2", "build-k2", 1L),
+ row(3L, "k2", "probe-3", "k2", "build-k2", 1L),
+ row(4L, "k3", "probe-no-match", null, null, null));
+ } else {
+ JOINED_ASSERTOR.shouldEmitAll(
+ h,
+ row(1L, "k1", "probe-1", "k1", "build-k1-v1", 1L),
+ row(1L, "k1", "probe-1", "k1", "build-k1-v1", 1L),
+ row(1L, "k1", "probe-1", "k1", "build-k1-v2", 1L),
+ row(2L, "k2", "probe-2", "k2", "build-k2", 1L),
+ row(3L, "k2", "probe-3", "k2", "build-k2", 1L));
+ }
+ // Probe buffer drained on flip; build table preserved (k1 keeps multi-set counts).
+ assertThat(probeBufferKeys(h)).isEmpty();
+ assertThat(buildTableForKey(h, op, "k1"))
+ .containsExactlyInAnyOrderEntriesOf(
+ Map.of("build-k1-v1", 2L, "build-k1-v2", 1L));
+ assertThat(buildTableForKey(h, op, "k2"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("build-k2", 1L));
+ }
+ }
+
+ @Test
+ void idleTimerRearmsOnBuildWatermark() throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 1000L, 100L, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.setProcessingTime(10);
+ h.open();
+ h.setProcessingTime(60);
+ // Build WM advances → re-arm.
+ addBuildWm(h, 10L);
+ // Original idle deadline was 10+100=110. Re-armed to 60+100=160.
+ h.setProcessingTime(159);
+ assertPhase(op, Phase.LOAD);
+ h.setProcessingTime(160);
+ assertPhase(op, Phase.JOIN);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void flipJoiningInvokesCodeGeneratedJoinCondition(boolean leftOuter) throws Exception {
+ LateralSnapshotJoinOperator op =
+ newOperator(
+ leftOuter, newMatchValCondition(), new boolean[] {true}, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addProbeRecord(h, 1L, "k1", "match");
+ addProbeRecord(h, 2L, "k1", "skip");
+ addProbeWm(h, 120L);
+ addBuildWm(h, 100L);
+
+ assertWatermarkForwardedAfterRecords(h.getOutput(), 120L);
+ stripWatermarksAndStatusesFromOutput(h);
+ if (leftOuter) {
+ JOINED_ASSERTOR.shouldEmitAll(
+ h,
+ row(1L, "k1", "match", "k1", "v1", 1L),
+ row(2L, "k1", "skip", null, null, null));
+ } else {
+ JOINED_ASSERTOR.shouldEmitAll(h, row(1L, "k1", "match", "k1", "v1", 1L));
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void flipJoiningCompositeEquiKeys(boolean leftOuter) throws Exception {
+ // Probe schema (kA VARCHAR, kB VARCHAR, val VARCHAR); build schema additionally carries a
+ // row-time attribute (kA VARCHAR, kB VARCHAR, val VARCHAR, rt BIGINT) at index 3.
+ InternalTypeInfo probeType =
+ InternalTypeInfo.ofFields(
+ VarCharType.STRING_TYPE, VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
+ InternalTypeInfo buildType =
+ InternalTypeInfo.ofFields(
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ new BigIntType());
+ final int buildRowtimeIndex = 3;
+ // Compose the composite key as a BinaryRowData with both key fields.
+ KeySelector selector =
+ value -> {
+ BinaryRowData ret = new BinaryRowData(2);
+ BinaryRowWriter writer = new BinaryRowWriter(ret);
+ if (value.isNullAt(0)) {
+ writer.setNullAt(0);
+ } else {
+ writer.writeString(0, value.getString(0));
+ }
+ if (value.isNullAt(1)) {
+ writer.setNullAt(1);
+ } else {
+ writer.writeString(1, value.getString(1));
+ }
+ writer.complete();
+ return ret;
+ };
+ InternalTypeInfo compositeKeyType =
+ InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
+
+ LateralSnapshotJoinOperator op =
+ new LateralSnapshotJoinOperator(
+ leftOuter,
+ probeType,
+ buildType,
+ buildRowtimeIndex,
+ newTrueCondition(),
+ new boolean[] {true, true},
+ 100L,
+ null,
+ null);
+
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ op, selector, selector, compositeKeyType)) {
+ h.open();
+ // build rows: two identical rows for key a-1, a different row for key a-1, one row for
+ // key a-2.
+ addBuildChange(h, insertRecord("a", "1", "b-a-1-1", 1L));
+ addBuildChange(h, insertRecord("a", "1", "b-a-1-1", 1L));
+ addBuildChange(h, insertRecord("a", "1", "b-a-1-2", 1L));
+ addBuildChange(h, insertRecord("a", "2", "b-a-2-1", 1L));
+ // probes: matching composite, non-matching composite.
+ h.processElement1(insertRecord("a", "1", "p-a-1"));
+ h.processElement1(insertRecord("a", "9", "p-a-9"));
+ h.processElement1(insertRecord("b", "1", "p-b-1"));
+ h.processElement1(insertRecord("b", "9", "p-b-9"));
+ // flip to JOIN phase
+ addBuildWm(h, 100L);
+
+ LogicalType[] outTypes = {
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ new BigIntType()
+ };
+ RowDataHarnessAssertor compositeAssertor = new RowDataHarnessAssertor(outTypes);
+
+ stripWatermarksAndStatusesFromOutput(h);
+ if (leftOuter) {
+ compositeAssertor.shouldEmitAll(
+ h,
+ compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-1", 1L),
+ compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-1", 1L),
+ compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-2", 1L),
+ compKeyRow("a", "9", "p-a-9", null, null, null, null),
+ compKeyRow("b", "1", "p-b-1", null, null, null, null),
+ compKeyRow("b", "9", "p-b-9", null, null, null, null));
+ } else {
+ compositeAssertor.shouldEmitAll(
+ h,
+ compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-1", 1L),
+ compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-1", 1L),
+ compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-2", 1L));
+ }
+ }
+ }
+
+ // ----------------------------------------------------------------- JOIN phase
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void joinPhaseImmediateInnerJoin(boolean leftOuter) throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(leftOuter, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildWm(h, 100L);
+ assertPhase(op, Phase.JOIN);
+
+ // First probe WM
+ addProbeWm(h, 10L);
+ assertWatermarkForwardedAfterRecords(h.getOutput(), 10L);
+
+ // First probe — joined immediately.
+ addProbeRecord(h, 1L, "k1", "probe-immediate-1");
+ stripWatermarksAndStatusesFromOutput(h);
+ JOINED_ASSERTOR.shouldEmitAll(h, row(1L, "k1", "probe-immediate-1", "k1", "v1", 1L));
+
+ // Another probe WM
+ addProbeWm(h, 20L);
+ assertWatermarkForwardedAfterRecords(h.getOutput(), 20L);
+
+ // Second probe for same key — joined immediately.
+ addProbeRecord(h, 2L, "k1", "probe-immediate-2");
+ stripWatermarksAndStatusesFromOutput(h);
+ JOINED_ASSERTOR.shouldEmitAll(h, row(2L, "k1", "probe-immediate-2", "k1", "v1", 1L));
+
+ // Probe for non-existent key — no output (INNER).
+ addProbeRecord(h, 3L, "k2", "probe-no-match");
+ stripWatermarksAndStatusesFromOutput(h);
+ if (leftOuter) {
+ JOINED_ASSERTOR.shouldEmitAll(h, row(3L, "k2", "probe-no-match", null, null, null));
+ } else {
+ assertThat(h.extractOutputStreamRecords()).isEmpty();
+ }
+
+ // one more probe WM
+ addProbeWm(h, 30L);
+ assertWatermarkForwardedAfterRecords(h.getOutput(), 30L);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void joinPhaseBuildSideChangeApplication(boolean appliedByBuild) throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ // Two identical records for k1 (count 2) at row-time 1, buffered during LOAD.
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildWm(h, 100L);
+ assertPhase(op, Phase.JOIN);
+
+ // Buffer four changes for k1
+ addBuildChange(h, deleteRecord("k1", "v1", 1L));
+ addBuildChange(h, insertRecord("k1", "v2", 102L));
+ addBuildChange(h, updateBeforeRecord("k1", "v1", 1L));
+ addBuildChange(h, updateAfterRecord("k1", "v3", 103L));
+ // Buffer one change for k2.
+ addBuildChange(h, insertRecord("k2", "v1", 101L));
+
+ // assert number of buffered changes
+ assertThat(op.getCurrentBuildSideWm()).isEqualTo(100L);
+ assertThat(bufferedAtWmFor(h, op, "k1")).isEqualTo(100L);
+ assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(4);
+ assertThat(bufferedAtWmFor(h, op, "k2")).isEqualTo(100L);
+ assertThat(bufferedChangesForKey(h, op, "k2")).hasSize(1);
+ assertThat(buildTableForKey(h, op, "k1")).isEqualTo(Map.of("v1", 2L));
+
+ // Probe record with no build WM advance - changes are not applied yet
+ addProbeRecord(h, 1L, "k1", "p-1");
+ JOINED_ASSERTOR.shouldEmitAll(
+ h, row(1L, "k1", "p-1", "k1", "v1", 1L), row(1L, "k1", "p-1", "k1", "v1", 1L));
+
+ // increment build-side WM
+ addBuildWm(h, 110L);
+
+ // assert that all changes are still buffered
+ assertThat(op.getCurrentBuildSideWm()).isEqualTo(110L);
+ assertThat(bufferedAtWmFor(h, op, "k1")).isEqualTo(100L);
+ assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(4);
+ assertThat(bufferedAtWmFor(h, op, "k2")).isEqualTo(100L);
+ assertThat(bufferedChangesForKey(h, op, "k2")).hasSize(1);
+
+ // trigger application of k1 changes by build or probe-side input
+ if (!appliedByBuild) {
+ addBuildChange(h, insertRecord("k1", "v4", 111L));
+ // assert that changes have been applied and removed from buffer
+ // the triggering change is appended to the buffer
+ assertThat(bufferedAtWmFor(h, op, "k1")).isEqualTo(110L);
+ assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(1);
+ assertThat(buildTableForKey(h, op, "k1")).isEqualTo(Map.of("v2", 1L, "v3", 1L));
+ // assert empty output
+ assertThat(h.getOutput()).isEmpty();
+ } else {
+ addProbeRecord(h, 2L, "k1", "p-2");
+ // assert that changes have been applied and removed from buffer
+ assertThat(bufferedAtWmFor(h, op, "k1")).isNull();
+ assertThat(bufferedChangesForKey(h, op, "k1")).isEmpty();
+ assertThat(buildTableForKey(h, op, "k1"))
+ .isEqualTo(Map.of("v2", 1L, "v3", 1L)); // /
+ // assert join results (v2 carries row-time 2, v3 carries row-time 1)
+ JOINED_ASSERTOR.shouldEmitAll(
+ h,
+ row(2L, "k1", "p-2", "k1", "v2", 102L),
+ row(2L, "k1", "p-2", "k1", "v3", 103L));
+ }
+
+ // assert that k2 change is still buffered
+ assertThat(bufferedAtWmFor(h, op, "k2")).isEqualTo(100L);
+ assertThat(bufferedChangesForKey(h, op, "k2")).hasSize(1);
+ // apply k2 change and join
+ addProbeRecord(h, 3L, "k2", "p-3");
+ assertThat(bufferedAtWmFor(h, op, "k2")).isNull();
+ assertThat(bufferedChangesForKey(h, op, "k2")).isEmpty();
+ assertThat(buildTableForKey(h, op, "k2")).isEqualTo(Map.of("v1", 1L));
+ // assert join results
+ JOINED_ASSERTOR.shouldEmitAll(h, row(3L, "k2", "p-3", "k2", "v1", 101L));
+ }
+ }
+
+ @Test
+ void joinPhaseWmForwardingLogic() throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildWm(h, 100L); // flip
+ assertPhase(op, Phase.JOIN);
+
+ // Build-side WMs after flip are not forwarded.
+ addBuildWm(h, 200L);
+ assertThat(extractWatermarks(h.getOutput())).isEmpty();
+
+ // Probe-side WMs in JOIN are forwarded.
+ addProbeWm(h, 150L);
+ assertThat(extractWatermarks(h.getOutput())).containsExactly(new Watermark(150));
+ h.getOutput().clear();
+
+ // another build-side WM
+ addBuildWm(h, 300L);
+ assertThat(extractWatermarks(h.getOutput())).isEmpty();
+
+ addProbeWm(h, 250L);
+ assertThat(extractWatermarks(h.getOutput())).containsExactly(new Watermark(250));
+ }
+ }
+
+ /**
+ * For buffered build-side changes sharing the same row-time, retractions are applied before
+ * accumulations.
+ */
+ @Test
+ void joinPhaseAppliesRetractionsBeforeAccumulationsAtEqualRowTime() throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ // Drive into JOIN with empty build state for k1/k2.
+ addBuildWm(h, 100L);
+ assertPhase(op, Phase.JOIN);
+
+ // Buffer accumulation-first at row-time 105 on absent rows:
+ // k1: +I then -D ; k2: +U then -U ; k3: +U then -U
+ addBuildChange(h, insertRecord("k1", "v1", 105L));
+ addBuildChange(h, deleteRecord("k1", "v1", 105L));
+ addBuildChange(h, updateAfterRecord("k2", "v1", 105L));
+ addBuildChange(h, updateBeforeRecord("k2", "v1", 105L));
+
+ // Advance build WM, then access each key to drain its buffer in event-time order.
+ addBuildWm(h, 200L);
+ addProbeRecord(h, 1L, "k1", "p1");
+ addProbeRecord(h, 2L, "k2", "p2");
+
+ assertThat(buildTableForKey(h, op, "k1"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 1L));
+ assertThat(buildTableForKey(h, op, "k2"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 1L));
+ stripWatermarksAndStatusesFromOutput(h);
+ JOINED_ASSERTOR.shouldEmitAll(
+ h,
+ row(1L, "k1", "p1", "k1", "v1", 105L),
+ row(2L, "k2", "p2", "k2", "v1", 105L));
+ }
+ }
+
+ // ----------------------------------------------------------------- NULL keys
+
+ @ParameterizedTest
+ @CsvSource({"true, true", "true, false", "false, true", "false, false"})
+ void joinRespectsNullKeysFilter(boolean leftOuter, boolean filterNullKey) throws Exception {
+ LateralSnapshotJoinOperator op =
+ newOperator(
+ leftOuter,
+ newTrueCondition(),
+ new boolean[] {filterNullKey},
+ 100L,
+ null,
+ null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ addBuildChange(h, insertRecord(null, "v_null", 1L));
+ addProbeRecord(h, 1L, null, "p_null");
+ addBuildWm(h, 100L);
+
+ // test joining during LOAD -> JOIN transition
+ if (filterNullKey) {
+ if (leftOuter) {
+ JOINED_ASSERTOR.shouldEmitAll(h, row(1L, null, "p_null", null, null, null));
+ } else {
+ assertThat(h.getOutput()).isEmpty();
+ }
+ } else {
+ JOINED_ASSERTOR.shouldEmitAll(h, row(1L, null, "p_null", null, "v_null", 1L));
+ }
+
+ // test joining in JOIN phase
+ addProbeRecord(h, 2L, null, "p_null");
+ if (filterNullKey) {
+ if (leftOuter) {
+ JOINED_ASSERTOR.shouldEmitAll(h, row(2L, null, "p_null", null, null, null));
+ } else {
+ assertThat(h.getOutput()).isEmpty();
+ }
+ } else {
+ JOINED_ASSERTOR.shouldEmitAll(h, row(2L, null, "p_null", null, "v_null", 1L));
+ }
+ }
+ }
+
+ // ----------------------------------------------------------------- Watermark status
+
+ @Test
+ void buildSideWmAndWmStatusForwarding() throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ // probe-side WM
+ addProbeWm(h, 70L);
+
+ // LOAD phase: build-side becomes idle then active again. WM is advanced.
+ h.processWatermarkStatus2(WatermarkStatus.IDLE);
+ h.processWatermarkStatus2(WatermarkStatus.ACTIVE);
+ addBuildWm(h, 50L);
+ // assert that no WMs or WM statuses are emitted in LOAD.
+ assertThat(h.getOutput()).isEmpty();
+
+ // Drive into JOIN
+ addBuildWm(h, 100L);
+ assertPhase(op, Phase.JOIN);
+
+ // assert that only the probe-side WM was forwarded after the transition
+ assertThat(h.getOutput()).containsExactly(new Watermark(70L));
+ h.getOutput().clear();
+
+ // JOIN phase: build-side becomes idle then active again. WM is advanced
+ h.processWatermarkStatus2(WatermarkStatus.IDLE);
+ h.processWatermarkStatus2(WatermarkStatus.ACTIVE);
+ addBuildWm(h, 200L);
+
+ // No records, WMs, or WM statuses emitted in JOIN.
+ assertThat(h.getOutput()).isEmpty();
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void probeSideWmAndWmStatusForwarding(boolean probeIdleDuringLoad) throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ addProbeWm(h, 25L);
+ addProbeWm(h, 50L);
+ // Probe-side WM statuses received during LOAD.
+ h.processWatermarkStatus1(WatermarkStatus.IDLE);
+ if (!probeIdleDuringLoad) {
+ h.processWatermarkStatus1(WatermarkStatus.ACTIVE);
+ }
+ // Absorbed during LOAD — nothing emitted.
+ assertThat(extractWatermarks(h.getOutput())).isEmpty();
+ assertThat(extractWatermarkStatuses(h.getOutput())).isEmpty();
+
+ // Flip to JOIN phase — last probe WM emitted.
+ addBuildWm(h, 100L);
+ assertPhase(op, Phase.JOIN);
+
+ if (probeIdleDuringLoad) {
+ assertThat(extractWatermarkStatuses(h.getOutput()))
+ .containsExactly(WatermarkStatus.IDLE);
+ } else {
+ assertThat(extractWatermarkStatuses(h.getOutput())).isEmpty();
+ }
+ assertThat(extractWatermarks(h.getOutput())).containsExactly(new Watermark(50));
+ h.getOutput().clear();
+
+ // WMs and WM status updates received during JOIN are forwarded.
+ addProbeWm(h, 150L);
+ assertThat(h.getOutput()).containsExactly(new Watermark(150));
+ h.getOutput().clear();
+ // set probe-side to idle
+ h.processWatermarkStatus1(WatermarkStatus.IDLE);
+ assertThat(h.getOutput()).containsExactly(WatermarkStatus.IDLE);
+ h.getOutput().clear();
+ // set probe-side to active
+ h.processWatermarkStatus1(WatermarkStatus.ACTIVE);
+ assertThat(h.getOutput()).containsExactly(WatermarkStatus.ACTIVE);
+ h.getOutput().clear();
+ // emit another watermark
+ addProbeWm(h, 200L);
+ assertThat(h.getOutput()).containsExactly(new Watermark(200));
+ h.getOutput().clear();
+ }
+ }
+
+ // ----------------------------------------------------------------- State TTL
+
+ @Test
+ void stateTtlRefreshesOnAccessAndEvictsInactiveKeys() throws Exception {
+ // stateTtlMs = 100. Timers are registered at 1.5 × stateTtlMs, so the deadline for access
+ // at t=0 is 150.
+ LateralSnapshotJoinOperator op = newOperator(false, 50L, null, 100L);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.setProcessingTime(0);
+ h.open();
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildChange(h, insertRecord("k2", "v1", 1L));
+ addBuildChange(h, insertRecord("k3", "v1", 1L));
+ addBuildChange(h, insertRecord("k4", "v1", 1L));
+
+ // State is NOT evicted during LOAD even after the deadline passes (TTL fires are
+ // rescheduled past the LOAD phase).
+ h.setProcessingTime(200);
+ assertThat(buildStateKeys(h)).containsExactly("k1", "k2", "k3", "k4");
+
+ // flip to JOIN
+ addBuildWm(h, 50L);
+ assertPhase(op, Phase.JOIN);
+
+ // Touch k1, k2, k3 to reset their TTL in JOIN; leave k4 alone.
+ h.setProcessingTime(275);
+ assertThat(buildStateKeys(h)).containsExactly("k1", "k2", "k3", "k4");
+ addBuildChange(h, insertRecord("k1", "v2", 2L));
+ addBuildChange(h, insertRecord("k2", "v2", 2L));
+ addBuildChange(h, insertRecord("k3", "v2", 2L));
+
+ // k4 evicted: it wasn't accessed since proc-time (0) and we flipped to JOIN at (200)
+ h.setProcessingTime(350);
+ assertThat(buildStateKeys(h)).containsExactly("k1", "k2", "k3");
+
+ // Update build state for k1 and k2; leave k3 alone
+ addBuildWm(h, 60L);
+ h.setProcessingTime(400);
+ assertThat(buildStateKeys(h)).containsExactly("k1", "k2", "k3");
+ addBuildChange(h, insertRecord("k1", "v3", 3L));
+ addBuildChange(h, insertRecord("k2", "v3", 3L));
+
+ // k3 evicted, not accessed since (275)
+ addBuildWm(h, 70L);
+ h.setProcessingTime(475);
+ assertThat(buildStateKeys(h)).containsExactly("k1", "k2");
+ // Access k1 from probe side to reset its TTL again; leave k2 alone
+ addProbeRecord(h, 1L, "k1", "p1");
+
+ // k2 evicted, not accessed since (400)
+ h.setProcessingTime(550);
+ assertThat(buildStateKeys(h)).containsExactly("k1");
+
+ // k1 finally evicted.
+ h.setProcessingTime(700);
+ assertThat(buildStateKeys(h)).isEmpty();
+
+ // The probe joined against the (k1) multi-set state at proc-time 475 — entries v1,
+ // v2, v3.
+ stripWatermarksAndStatusesFromOutput(h);
+ JOINED_ASSERTOR.shouldEmitAll(
+ h,
+ row(1L, "k1", "p1", "k1", "v1", 1L),
+ row(1L, "k1", "p1", "k1", "v2", 2L),
+ row(1L, "k1", "p1", "k1", "v3", 3L));
+ }
+ }
+
+ @Test
+ void stateTtlClearsAllPerKeyState() throws Exception {
+ // stateTtlMs = 100. Timers are registered at 1.5 × stateTtlMs.
+ LateralSnapshotJoinOperator op = newOperator(false, 50L, null, 100L);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.setProcessingTime(0);
+ h.open();
+ // Buffered during LOAD
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildWm(h, 50L); // flip to JOIN
+ assertPhase(op, Phase.JOIN);
+
+ // Buffer a change in JOIN that does NOT drain (no further WM advance / access). This
+ // populates the change buffer and the buffered-at tag while the build table keeps {v1}.
+ addBuildChange(h, insertRecord("k1", "v2"));
+ assertThat(buildTableForKey(h, op, "k1")).isEqualTo(Map.of("v1", 1L));
+ assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(1);
+ assertThat(bufferedAtWmFor(h, op, "k1")).isEqualTo(50L);
+ assertThat(ttlExpiryFor(h, op, "k1")).isEqualTo(150L);
+
+ // Fire the TTL timer (well past the eviction deadline).
+ h.setProcessingTime(1000);
+
+ // Every per-key state object is cleared, not just the build table.
+ assertThat(buildTableForKey(h, op, "k1")).isEmpty();
+ assertThat(bufferedChangesForKey(h, op, "k1")).isEmpty();
+ assertThat(bufferedAtWmFor(h, op, "k1")).isNull();
+ assertThat(probeBufferForKey(h, op, "k1")).isEmpty();
+ assertThat(ttlExpiryFor(h, op, "k1")).isNull();
+ }
+ }
+
+ @Test
+ void stateTtlRestoreResetsFlipProcTime() throws Exception {
+ // stateTtlMs = 50. Build write at t=0 arms TTL at 75. Flip at t=0 → flipProcTime=0.
+ // Original grace ends at 0+50=50.
+ LateralSnapshotJoinOperator op1 = newOperator(false, 100L, null, 50L);
+ OperatorSubtaskState state;
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op1)) {
+ h.setProcessingTime(0);
+ h.open();
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildWm(h, 100L);
+ assertThat(op1.getFlipProcTime()).isEqualTo(0L);
+ state = h.snapshot(0L, 0L);
+ }
+
+ // Restart at t=30 — flipProcTime is re-anchored to 30; new grace ends at 30+50=80.
+ LateralSnapshotJoinOperator op2 = newOperator(false, 100L, null, 50L);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op2)) {
+ h.setProcessingTime(30);
+ h.initializeState(state);
+ h.open();
+ assertThat(op2.getFlipProcTime()).isEqualTo(30L);
+
+ // At t=75 the recovered TTL timer fires. Grace check: now=75 < flipProcTime+stateTtlMs
+ // = 80 → reschedule rather than evict.
+ h.setProcessingTime(75);
+
+ // k1 still present because the grace window was re-anchored.
+ assertThat(buildStateKeys(h)).containsExactly("k1");
+
+ // advance proc-time to 105 to evict state
+ h.setProcessingTime(105);
+ assertThat(buildStateKeys(h)).isEmpty();
+ }
+ }
+
+ // ----------------------------------------------------------------- Snapshot / restore
+
+ @Test
+ void restoreFromLoadPhaseSnapshot() throws Exception {
+ LateralSnapshotJoinOperator op1 = newOperator(false, 100L, null, null);
+ OperatorSubtaskState state;
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op1)) {
+ h.open();
+ // Build-side multi-set with a duplicate count, plus a buffered
+ // probe.
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildChange(h, insertRecord("k1", "v2", 2L));
+ addBuildWm(h, 10L);
+ addBuildChange(h, insertRecord("k1", "v3", 3L));
+ addProbeRecord(h, 1L, "k1", "p1");
+ assertPhase(op1, Phase.LOAD);
+ assertThat(probeBufferKeys(h)).containsExactly("k1");
+ state = h.snapshot(0L, 0L);
+ }
+
+ LateralSnapshotJoinOperator op2 = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op2)) {
+ h.initializeState(state);
+ h.open();
+ assertPhase(op2, Phase.LOAD);
+ // Buffered probe and build-table multi-set (with counts) preserved across restore.
+ assertThat(probeBufferKeys(h)).containsExactly("k1");
+ assertThat(buildTableForKey(h, op2, "k1"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 2L, "v2", 1L));
+ assertThat(bufferedChangesForKey(h, op2, "k1")).hasSize(1);
+ // Gauge is rebuilt from restored keyed state: one probe buffered for k1.
+ assertThat(op2.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(1L);
+
+ // Trigger flip; the buffered probe is joined post-restore, count-respecting against the
+ // restored multi-set (2× v1 + 1× v2 = three rows).
+ addProbeWm(h, 50L);
+ addBuildWm(h, 100L);
+ assertPhase(op2, Phase.JOIN);
+ assertThat(probeBufferKeys(h)).isEmpty();
+ // Draining the restored probe and build buffers brings both gauges back to 0.
+ assertThat(op2.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(0L);
+ assertThat(op2.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(0L);
+ assertThat(buildTableForKey(h, op2, "k1"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 2L, "v2", 1L, "v3", 1L));
+
+ stripWatermarksAndStatusesFromOutput(h);
+ JOINED_ASSERTOR.shouldEmitAll(
+ h,
+ row(1L, "k1", "p1", "k1", "v1", 1L),
+ row(1L, "k1", "p1", "k1", "v1", 1L),
+ row(1L, "k1", "p1", "k1", "v2", 2L),
+ row(1L, "k1", "p1", "k1", "v3", 3L));
+ }
+ }
+
+ @Test
+ void restoreFromMixedPhaseSnapshot() throws Exception {
+ // Subtask A: drive into JOIN with a buffered change for k1.
+ LateralSnapshotJoinOperator opA = newOperator(false, 100L, null, null);
+ OperatorSubtaskState stateA;
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(opA)) {
+ h.open();
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildWm(h, 100L);
+ assertPhase(opA, Phase.JOIN);
+ // Buffer a build-side change
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ stateA = h.snapshot(0L, 0L);
+ }
+
+ // Subtask B: stay in LOAD (no flip-triggering build WM).
+ LateralSnapshotJoinOperator opB = newOperator(false, 100L, null, null);
+ OperatorSubtaskState stateB;
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(opB)) {
+ h.open();
+ addProbeRecord(h, 1L, "k2", "p1");
+ assertPhase(opB, Phase.LOAD);
+ stateB = h.snapshot(0L, 0L);
+ }
+
+ OperatorSubtaskState combined =
+ AbstractStreamOperatorTestHarness.repackageState(stateA, stateB);
+
+ // Restore the combined state — phase must be LOAD because some subtask was LOAD.
+ LateralSnapshotJoinOperator opC = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(opC)) {
+ h.initializeState(combined);
+ h.open();
+ assertPhase(opC, Phase.LOAD);
+
+ // assert state: A's build table ({v1:1}) plus A's buffered change and B's probe.
+ assertThat(buildTableKeys(h)).containsExactly("k1");
+ assertThat(buildTableForKey(h, opC, "k1"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 1L));
+ assertThat(bufferedChangesForKey(h, opC, "k1")).hasSize(1);
+ assertThat(probeBufferKeys(h)).containsExactly("k2");
+
+ // During LOAD the current build WM is still MIN_VALUE, so the recovered buffer (tagged
+ // at the pre-restore WM) is not drained; the new change is appended to it.
+ addBuildChange(h, insertRecord("k1", "v2", 2L));
+ assertThat(bufferedChangesForKey(h, opC, "k1")).hasSize(2);
+ assertThat(buildTableForKey(h, opC, "k1"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 1L));
+
+ // Trigger flip
+ addBuildWm(h, 100L);
+ assertPhase(opC, Phase.JOIN);
+
+ // Access k1: the buffered changes drain in event-time order, then the probe joins.
+ addProbeRecord(h, 1L, "k1", "p1");
+ assertThat(bufferedChangesForKey(h, opC, "k1")).isEmpty();
+ assertThat(buildTableForKey(h, opC, "k1"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 2L, "v2", 1L));
+ stripWatermarksAndStatusesFromOutput(h);
+ JOINED_ASSERTOR.shouldEmitAll(
+ h,
+ row(1L, "k1", "p1", "k1", "v1", 1L),
+ row(1L, "k1", "p1", "k1", "v1", 1L),
+ row(1L, "k1", "p1", "k1", "v2", 2L));
+ }
+ }
+
+ @Test
+ void restoreFromJoinPhaseSnapshot() throws Exception {
+ LateralSnapshotJoinOperator op1 = newOperator(false, 100L, null, null);
+ OperatorSubtaskState state;
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op1)) {
+ h.open();
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildChange(h, insertRecord("k2", "v1", 1L));
+ addBuildWm(h, 100L);
+ assertPhase(op1, Phase.JOIN);
+ // Buffer a -U/+U pair for each key (tagged at bufferedAt = 100).
+ addBuildChange(h, updateBeforeRecord("k1", "v1", 1L));
+ addBuildChange(h, updateAfterRecord("k1", "v2", 101L));
+ addBuildChange(h, updateBeforeRecord("k2", "v1", 1L));
+ addBuildChange(h, updateAfterRecord("k2", "v2", 101L));
+ state = h.snapshot(0L, 0L);
+ }
+
+ LateralSnapshotJoinOperator op2 = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op2)) {
+ h.initializeState(state);
+ h.open();
+ assertPhase(op2, Phase.JOIN);
+ assertThat(op2.getCurrentBuildSideWm()).isEqualTo(Long.MIN_VALUE);
+ assertThat(bufferedChangesForKey(h, op2, "k1")).hasSize(2);
+ assertThat(bufferedChangesForKey(h, op2, "k2")).hasSize(2);
+ // Gauge is rebuilt from restored keyed state: four build-side changes buffered
+ // (two for k1, two for k2).
+ assertThat(op2.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(4L);
+
+ // k2: accessed while no build WM has arrived since restore → eager drain.
+ addProbeRecord(h, 1L, "k2", "p-k2");
+ assertThat(bufferedChangesForKey(h, op2, "k2")).isEmpty();
+ // Draining k2's two restored changes leaves k1's two still buffered.
+ assertThat(op2.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(2L);
+ assertThat(buildTableForKey(h, op2, "k2"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("v2", 1L));
+
+ // k1: still buffered (eager drain of k2 left latestBuildSideWm at MIN_VALUE); advance
+ // the build WM, then access → normal WM-gated drain.
+ assertThat(bufferedChangesForKey(h, op2, "k1")).hasSize(2);
+ addBuildWm(h, 200L);
+ addProbeRecord(h, 2L, "k1", "p-k1");
+ assertThat(bufferedChangesForKey(h, op2, "k1")).isEmpty();
+ // All restored changes drained; gauge back to 0.
+ assertThat(op2.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(0L);
+ assertThat(buildTableForKey(h, op2, "k1"))
+ .containsExactlyInAnyOrderEntriesOf(Map.of("v2", 1L));
+
+ stripWatermarksAndStatusesFromOutput(h);
+ JOINED_ASSERTOR.shouldEmitAll(
+ h,
+ row(1L, "k2", "p-k2", "k2", "v2", 101L),
+ row(2L, "k1", "p-k1", "k1", "v2", 101L));
+ }
+ }
+
+ @Test
+ void restoreRearmsIdleFlipTimer() throws Exception {
+ LateralSnapshotJoinOperator op1 = newOperator(false, 1000L, 100L, null);
+ OperatorSubtaskState state;
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op1)) {
+ h.setProcessingTime(50);
+ h.open();
+ assertPhase(op1, Phase.LOAD);
+ state = h.snapshot(0L, 0L);
+ }
+
+ LateralSnapshotJoinOperator op2 = newOperator(false, 1000L, 100L, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op2)) {
+ h.setProcessingTime(150);
+ h.initializeState(state);
+ h.open();
+ assertPhase(op2, Phase.LOAD);
+ // Re-armed at open()'s proc-time + idleTimeout = 150 + 100 = 250.
+ h.setProcessingTime(249);
+ assertPhase(op2, Phase.LOAD);
+ h.setProcessingTime(250);
+ assertPhase(op2, Phase.JOIN);
+ }
+ }
+
+ // ----------------------------------------------------------------- Metrics
+
+ @Test
+ void currentPhaseGaugeReflectsPhase() throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ // LOAD = ordinal 0.
+ assertThat(op.getCurrentPhaseGauge().getValue()).isEqualTo(0);
+
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildWm(h, 100L);
+ assertPhase(op, Phase.JOIN);
+ // JOIN = ordinal 1.
+ assertThat(op.getCurrentPhaseGauge().getValue()).isEqualTo(1);
+ }
+ }
+
+ @Test
+ void probeBufferedGaugeTracksLoadBufferingAndDrainsOnFlip() throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ assertThat(op.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(0L);
+
+ addProbeRecord(h, 1L, "k1", "p1");
+ assertThat(op.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(1L);
+ addProbeRecord(h, 2L, "k2", "p2");
+ assertThat(op.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(2L);
+ addProbeRecord(h, 3L, "k1", "p3");
+ assertThat(op.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(3L);
+
+ // Flip drains all buffered probes across keys.
+ addBuildWm(h, 100L);
+ assertPhase(op, Phase.JOIN);
+ assertThat(op.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(0L);
+ }
+ }
+
+ @Test
+ void buildBufferedGaugeTracksJoinBufferingAndDrains() throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ // Flip to JOIN with an empty build state (no build records during LOAD).
+ addBuildWm(h, 100L);
+ assertPhase(op, Phase.JOIN);
+ assertThat(op.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(0L);
+
+ // JOIN-phase build changes are buffered (their tag equals the current build WM).
+ addBuildChange(h, insertRecord("k1", "v2", 101L));
+ addBuildChange(h, insertRecord("k1", "v3", 101L));
+ assertThat(op.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(2L);
+ addBuildChange(h, insertRecord("k2", "v1", 102L));
+ assertThat(op.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(3L);
+
+ // Advance build WM, then drain each key by accessing it from the probe side.
+ addBuildWm(h, 110L);
+ addProbeRecord(h, 1L, "k1", "p1");
+ assertThat(op.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(1L);
+ addProbeRecord(h, 2L, "k2", "p2");
+ assertThat(op.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(0L);
+ }
+ }
+
+ @Test
+ void watermarkGaugesTrackBuildAndProbeWatermarks() throws Exception {
+ // High loadCompletedTime so the operator stays in LOAD for the first build WMs.
+ LateralSnapshotJoinOperator op = newOperator(false, 1000L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ assertThat(op.getCurrentBuildSideWatermarkGauge().getValue()).isEqualTo(Long.MIN_VALUE);
+ assertThat(op.getCurrentProbeSideWatermarkGauge().getValue()).isEqualTo(Long.MIN_VALUE);
+
+ // LOAD phase: both watermarks are tracked even though nothing is forwarded.
+ addBuildWm(h, 50L);
+ assertThat(op.getCurrentBuildSideWatermarkGauge().getValue()).isEqualTo(50L);
+ addProbeWm(h, 70L);
+ assertThat(op.getCurrentProbeSideWatermarkGauge().getValue()).isEqualTo(70L);
+ assertPhase(op, Phase.LOAD);
+
+ // Flip to JOIN.
+ addBuildWm(h, 1000L);
+ assertPhase(op, Phase.JOIN);
+ assertThat(op.getCurrentBuildSideWatermarkGauge().getValue()).isEqualTo(1000L);
+
+ // JOIN phase: probe WM gauge keeps tracking (guards the dedicated currentProbeWm
+ // field).
+ addProbeWm(h, 1500L);
+ assertThat(op.getCurrentProbeSideWatermarkGauge().getValue()).isEqualTo(1500L);
+ addBuildWm(h, 1100L);
+ assertThat(op.getCurrentBuildSideWatermarkGauge().getValue()).isEqualTo(1100L);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void fanOutGaugesTrackMaxAndAverage(boolean leftOuter) throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(leftOuter, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ // k1 multi-set: v1 x2, v2 x1 → fan-out 3; k2 single row → fan-out 1.
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildChange(h, insertRecord("k1", "v2", 1L));
+ addBuildChange(h, insertRecord("k2", "v1", 1L));
+ // a probe record during load
+ addProbeRecord(h, 1L, "k2", "p1"); // fan-out 1
+ // a probe record that does not match
+ addProbeRecord(h, 2L, "k3", "p2"); // fan-out INNER: 0, LEFT OUTER: 1
+ addBuildWm(h, 100L);
+ assertPhase(op, Phase.JOIN);
+
+ // check merge stats after transition
+ assertThat(op.getMaxJoinFanOutGauge().getValue()).isEqualTo(1L);
+ if (leftOuter) {
+ assertThat(op.getAvgJoinFanOutGauge().getValue()).isEqualTo(2d / 2);
+ assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(0L);
+ } else {
+ assertThat(op.getAvgJoinFanOutGauge().getValue()).isCloseTo(1d / 2, within(1e-9));
+ assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(1L);
+ }
+
+ // join probe in JOIN phase
+ addProbeRecord(h, 3L, "k1", "p3"); // fan-out 3
+ assertThat(op.getMaxJoinFanOutGauge().getValue()).isEqualTo(3L);
+ if (leftOuter) {
+ assertThat(op.getAvgJoinFanOutGauge().getValue()).isEqualTo(5d / 3, within(1e-9));
+ assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(0L);
+ } else {
+ assertThat(op.getAvgJoinFanOutGauge().getValue()).isCloseTo(4d / 3, within(1e-9));
+ assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(1L);
+ }
+
+ addProbeRecord(h, 3L, "k3", "no-match");
+ assertThat(op.getMaxJoinFanOutGauge().getValue()).isEqualTo(3L);
+ if (leftOuter) {
+ // fan-out 1 (LEFT OUTER, null-padded)
+ assertThat(op.getAvgJoinFanOutGauge().getValue()).isCloseTo(6.0d / 4, within(1e-9));
+ assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(0L);
+ } else {
+ // fan-out 0 (INNER, unmatched)
+ assertThat(op.getAvgJoinFanOutGauge().getValue()).isCloseTo(4.0d / 4, within(1e-9));
+ assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(2L);
+ }
+ }
+ }
+
+ @Test
+ void numUnmatchedBuildRetractionsCountsAbsentRowRetractions() throws Exception {
+ LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.open();
+ // LOAD: a -D for a never-inserted row is counted.
+ addBuildChange(h, deleteRecord("k1", "ghost", 1L));
+ addBuildChange(h, insertRecord("k1", "v1", 2L));
+ assertThat(op.getNumUnmatchedBuildRetractions().getCount()).isEqualTo(0L);
+
+ // Add a real row for k1
+ addBuildChange(h, insertRecord("k1", "v1", 3L));
+ // Flip to JOIN
+ addBuildWm(h, 100L);
+ assertPhase(op, Phase.JOIN);
+
+ // Access k1 → drains [-D(ghost)@1, +I(v1)@2]. The -D hits an absent row and is counted.
+ addBuildChange(h, deleteRecord("k1", "ghost2", 101L));
+ assertThat(op.getNumUnmatchedBuildRetractions().getCount()).isEqualTo(1L);
+
+ // Advance WM + access → drains -D(ghost2)@3 on an absent row.
+ addBuildWm(h, 110L);
+ addProbeRecord(h, 1L, "k1", "p1");
+ assertThat(op.getNumUnmatchedBuildRetractions().getCount()).isEqualTo(2L);
+
+ // Deleting an existing row (matching row-time) does not change the metric.
+ addBuildChange(h, deleteRecord("k1", "v1", 2L));
+ addBuildWm(h, 120L);
+ addProbeRecord(h, 2L, "k1", "p2");
+ assertThat(op.getNumUnmatchedBuildRetractions().getCount()).isEqualTo(2L);
+ }
+ }
+
+ @Test
+ void numStateTtlEvictionsCountsEvictedKeys() throws Exception {
+ // stateTtlMs = 100 → timers registered at 1.5 × = 150.
+ LateralSnapshotJoinOperator op = newOperator(false, 50L, null, 100L);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(op)) {
+ h.setProcessingTime(0);
+ h.open();
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildChange(h, insertRecord("k2", "v1", 1L));
+ addBuildWm(h, 50L); // flip to JOIN at proc-time 0
+ assertPhase(op, Phase.JOIN);
+ assertThat(op.getNumStateTtlEvictions().getCount()).isEqualTo(0L);
+
+ // Neither key is touched again; both evict once their TTL timer (150) fires.
+ h.setProcessingTime(150);
+ assertThat(buildTableKeys(h)).isEmpty();
+ assertThat(op.getNumStateTtlEvictions().getCount()).isEqualTo(2L);
+ }
+ }
+
+ @Test
+ void bufferGaugesAreRebuiltFromStateOnRestore() throws Exception {
+ // Snapshot in LOAD with probes buffered across multiple keys, and in JOIN with build-side
+ // changes buffered across multiple keys, then assert both gauges are rebuilt from the
+ // restored keyed state (the in-memory tallies start at 0 in the restored operator).
+
+ // --- LOAD-phase snapshot: three buffered probes spread over two keys. ---
+ LateralSnapshotJoinOperator loadOp = newOperator(false, 100L, null, null);
+ OperatorSubtaskState loadState;
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(loadOp)) {
+ h.open();
+ addProbeRecord(h, 1L, "k1", "p1");
+ addProbeRecord(h, 2L, "k2", "p2");
+ addProbeRecord(h, 3L, "k1", "p3");
+ assertPhase(loadOp, Phase.LOAD);
+ assertThat(loadOp.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(3L);
+ loadState = h.snapshot(0L, 0L);
+ }
+
+ LateralSnapshotJoinOperator loadOp2 = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(loadOp2)) {
+ h.initializeState(loadState);
+ h.open();
+ // Rebuilt from restored state: all three probes, across both keys.
+ assertThat(loadOp2.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(3L);
+ // The build gauge stays 0 — nothing was buffered on the build side.
+ assertThat(loadOp2.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(0L);
+ }
+
+ // --- JOIN-phase snapshot: three buffered build-side changes spread over two keys. ---
+ LateralSnapshotJoinOperator joinOp = newOperator(false, 100L, null, null);
+ OperatorSubtaskState joinState;
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(joinOp)) {
+ h.open();
+ addBuildChange(h, insertRecord("k1", "v1", 1L));
+ addBuildWm(h, 100L);
+ assertPhase(joinOp, Phase.JOIN);
+ // The v2 access drains the LOAD insert (v1); v2, v3 and k2/v1 remain buffered.
+ addBuildChange(h, insertRecord("k1", "v2", 101L));
+ addBuildChange(h, insertRecord("k1", "v3", 101L));
+ addBuildChange(h, insertRecord("k2", "v1", 101L));
+ assertThat(joinOp.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(3L);
+ joinState = h.snapshot(0L, 0L);
+ }
+
+ LateralSnapshotJoinOperator joinOp2 = newOperator(false, 100L, null, null);
+ try (KeyedTwoInputStreamOperatorTestHarness h =
+ newHarness(joinOp2)) {
+ h.initializeState(joinState);
+ h.open();
+ assertPhase(joinOp2, Phase.JOIN);
+ // Rebuilt from restored state: all three buffered changes, across both keys.
+ assertThat(joinOp2.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(3L);
+ // The probe gauge stays 0 — nothing was buffered on the probe side.
+ assertThat(joinOp2.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(0L);
+ }
+ }
+
+ // ----------------------------------------------------------------- Helpers
+
+ /** Sends a build-side change record (any {@link RowKind}) to the harness. */
+ private static void addBuildChange(
+ KeyedTwoInputStreamOperatorTestHarness h,
+ StreamRecord record)
+ throws Exception {
+ h.processElement2(record);
+ }
+
+ /** Sends a probe-side {@link RowKind#INSERT} record with the default probe schema. */
+ private static void addProbeRecord(
+ KeyedTwoInputStreamOperatorTestHarness h,
+ Long id,
+ String key,
+ String val)
+ throws Exception {
+ h.processElement1(insertRecord(id, key, val));
+ }
+
+ /** Sends a build-side watermark to the harness (input 2). */
+ private static void addBuildWm(
+ KeyedTwoInputStreamOperatorTestHarness h, long ts)
+ throws Exception {
+ h.processWatermark2(new Watermark(ts));
+ }
+
+ /** Sends a probe-side watermark to the harness (input 1). */
+ private static void addProbeWm(
+ KeyedTwoInputStreamOperatorTestHarness h, long ts)
+ throws Exception {
+ h.processWatermark1(new Watermark(ts));
+ }
+
+ /**
+ * Builds an expected joined row. The operator only ever emits {@link RowKind#INSERT}, so we
+ * don't accept a rowkind parameter.
+ */
+ private static GenericRowData row(
+ Long id, String pKey, String pVal, String bKey, String bVal, Long bRt) {
+ GenericRowData r = new GenericRowData(OUTPUT_TYPES.length);
+ r.setField(0, id);
+ r.setField(1, pKey == null ? null : StringData.fromString(pKey));
+ r.setField(2, pVal == null ? null : StringData.fromString(pVal));
+ r.setField(3, bKey == null ? null : StringData.fromString(bKey));
+ r.setField(4, bVal == null ? null : StringData.fromString(bVal));
+ r.setField(5, bRt);
+ r.setRowKind(RowKind.INSERT);
+ return r;
+ }
+
+ /**
+ * Builds an expected joined row for a composite (two-field) equi-key on both sides: probe
+ * {@code (kA, kB, val)} concatenated with build {@code (kA, kB, val)}.
+ */
+ private static GenericRowData compKeyRow(
+ String pKa, String pKb, String pVal, String bKa, String bKb, String bVal, Long bRt) {
+ GenericRowData r = new GenericRowData(7);
+ r.setField(0, pKa == null ? null : StringData.fromString(pKa));
+ r.setField(1, pKb == null ? null : StringData.fromString(pKb));
+ r.setField(2, pVal == null ? null : StringData.fromString(pVal));
+ r.setField(3, bKa == null ? null : StringData.fromString(bKa));
+ r.setField(4, bKb == null ? null : StringData.fromString(bKb));
+ r.setField(5, bVal == null ? null : StringData.fromString(bVal));
+ r.setField(6, bRt);
+ r.setRowKind(RowKind.INSERT);
+ return r;
+ }
+
+ private static void assertPhase(LateralSnapshotJoinOperator op, Phase expected) {
+ assertThat(op.getPhase()).isEqualTo(expected);
+ }
+
+ private static BinaryRowData stringKey(String key) {
+ BinaryRowData k = new BinaryRowData(1);
+ BinaryRowWriter w = new BinaryRowWriter(k);
+ if (key == null) {
+ w.setNullAt(0);
+ } else {
+ w.writeString(0, StringData.fromString(key));
+ }
+ w.complete();
+ return k;
+ }
+
+ private static List buildTableKeys(
+ KeyedTwoInputStreamOperatorTestHarness h)
+ throws Exception {
+ return h.getOperator()
+ .getKeyedStateBackend()
+ .getKeys(BUILD_TABLE_STATE_NAME, VoidNamespace.INSTANCE)
+ .map(r -> ((BinaryRowData) r).getString(0).toString())
+ .sorted()
+ .toList();
+ }
+
+ private static List probeBufferKeys(
+ KeyedTwoInputStreamOperatorTestHarness h)
+ throws Exception {
+ return h.getOperator()
+ .getKeyedStateBackend()
+ .getKeys(PROBE_BUFFER_STATE_NAME, VoidNamespace.INSTANCE)
+ .map(r -> ((BinaryRowData) r).getString(0).toString())
+ .sorted()
+ .toList();
+ }
+
+ /**
+ * Returns the keys that currently hold any build-side state — either a materialized build-table
+ * entry or a buffered (not-yet-applied) build-side change. TTL eviction clears both, so this
+ * reflects whether a key is still "alive", regardless of whether its buffered changes have been
+ * drained into the build table yet.
+ */
+ private static List buildStateKeys(
+ KeyedTwoInputStreamOperatorTestHarness h)
+ throws Exception {
+ java.util.SortedSet keys = new java.util.TreeSet<>();
+ h.getOperator()
+ .getKeyedStateBackend()
+ .getKeys(BUILD_TABLE_STATE_NAME, VoidNamespace.INSTANCE)
+ .forEach(r -> keys.add(((BinaryRowData) r).getString(0).toString()));
+ h.getOperator()
+ .getKeyedStateBackend()
+ .getKeys(BUILD_CHANGE_BUFFER_STATE_NAME, VoidNamespace.INSTANCE)
+ .forEach(r -> keys.add(((BinaryRowData) r).getString(0).toString()));
+ return new ArrayList<>(keys);
+ }
+
+ /**
+ * Returns the build-table multi-set for the given key as {@code build-val → count}. Assumes the
+ * schema's value field is at index 1.
+ */
+ private static Map buildTableForKey(
+ KeyedTwoInputStreamOperatorTestHarness h,
+ LateralSnapshotJoinOperator op,
+ String key)
+ throws Exception {
+ h.getOperator().setCurrentKey(stringKey(key));
+ Map result = new LinkedHashMap<>();
+ for (Map.Entry e : op.getBuildTableState().entries()) {
+ result.put(e.getKey().getString(1).toString(), e.getValue());
+ }
+ return result;
+ }
+
+ private static List probeBufferForKey(
+ KeyedTwoInputStreamOperatorTestHarness h,
+ LateralSnapshotJoinOperator op,
+ String key)
+ throws Exception {
+ h.getOperator().setCurrentKey(stringKey(key));
+ List result = new ArrayList<>();
+ for (RowData r : op.getProbeBuffer().get()) {
+ result.add(r);
+ }
+ return result;
+ }
+
+ private static List bufferedChangesForKey(
+ KeyedTwoInputStreamOperatorTestHarness h,
+ LateralSnapshotJoinOperator op,
+ String key)
+ throws Exception {
+ h.getOperator().setCurrentKey(stringKey(key));
+ List result = new ArrayList<>();
+ for (RowData r : op.getBuildChangeBuffer().get()) {
+ result.add(r);
+ }
+ return result;
+ }
+
+ private static Long bufferedAtWmFor(
+ KeyedTwoInputStreamOperatorTestHarness h,
+ LateralSnapshotJoinOperator op,
+ String key)
+ throws Exception {
+ h.getOperator().setCurrentKey(stringKey(key));
+ return op.getBufferedAtWmState().value();
+ }
+
+ private static Long ttlExpiryFor(
+ KeyedTwoInputStreamOperatorTestHarness h,
+ LateralSnapshotJoinOperator op,
+ String key)
+ throws Exception {
+ h.getOperator().setCurrentKey(stringKey(key));
+ return op.getTtlExpiryState().value();
+ }
+
+ /** Drops watermarks and watermark statuses from the harness output queue, in place. */
+ private static void stripWatermarksAndStatusesFromOutput(
+ KeyedTwoInputStreamOperatorTestHarness h) {
+ h.getOutput().removeIf(o -> o instanceof Watermark || o instanceof WatermarkStatus);
+ }
+
+ private static List stripWatermarks(ConcurrentLinkedQueue output) {
+ return output.stream().filter(o -> !(o instanceof Watermark)).toList();
+ }
+
+ private static List extractWatermarks(ConcurrentLinkedQueue output) {
+ return output.stream().filter(o -> o instanceof Watermark).map(w -> (Watermark) w).toList();
+ }
+
+ /**
+ * Asserts that exactly one watermark equal to {@code expected} was emitted and that no {@link
+ * StreamRecord} follows it. A forwarded watermark must be released only after the records that
+ * logically precede it (e.g. probes drained on flip) have been emitted.
+ */
+ private static void assertWatermarkForwardedAfterRecords(
+ ConcurrentLinkedQueue output, long expectedTs) {
+ Watermark expectedWatermark = new Watermark(expectedTs);
+ List elements = List.copyOf(output);
+ assertThat(extractWatermarks(output)).containsExactly(expectedWatermark);
+ int wmIndex = elements.indexOf(expectedWatermark);
+ assertThat(elements.subList(wmIndex + 1, elements.size()))
+ .as("no records may be emitted after watermark %s", expectedWatermark)
+ .noneMatch(o -> o instanceof StreamRecord);
+ }
+
+ private static List extractWatermarkStatuses(
+ ConcurrentLinkedQueue output) {
+ return output.stream()
+ .filter(o -> o instanceof WatermarkStatus)
+ .map(w -> (WatermarkStatus) w)
+ .toList();
+ }
+}