Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 109 additions & 3 deletions docs/content.zh/docs/dev/table/functions/ptfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2465,6 +2465,115 @@ void testStateMutation() throws Exception {
{{< /tab >}}
{{< /tabs >}}

#### Testing with Timers and Context

The harness supports the `Context` parameter, timer registration via `TimeContext`, and `onTimer`
callbacks. Use `.withOnTimeColumn()` to configure the event time column and `.setWatermark()` to
advance watermarks and fire eligible timers.

{{< tabs "timer-testing" >}}
{{< tab "Java" >}}
```java
// A PTF that registers a named timer 5 seconds after each event, and emits when it fires.
@DataTypeHint("ROW<message STRING>")
public class TimerPTF extends ProcessTableFunction<Row> {
public void eval(
Context ctx,
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME})
Row input) {
String name = input.getFieldAs("name");
TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
timeCtx.registerOnTime("timeout-" + name, timeCtx.time().plus(Duration.ofSeconds(5)));
collect(Row.of("registered-" + name));
}

public void onTimer(OnTimerContext ctx) {
collect(Row.of("timer-fired-" + ctx.currentTimer()));
}
}

@Test
void testTimerRegistrationAndFiring() throws Exception {
try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(TimerPTF.class)
.withTableArgument("input",
DataTypes.of("ROW<partition STRING, name STRING, ts TIMESTAMP(3)>"))
.withPartitionBy("input", "partition")
.withOnTimeColumn("ts")
.build()) {

harness.processElement(Row.of("P1", "Alice", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));

// Verify the timer was registered
assertThat(harness.getPendingTimers()).hasSize(1);
assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice");

// Advance watermark past the timer's timestamp to fire it
harness.clearOutput();
harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));

assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "timer-fired-timeout-Alice", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));

assertThat(harness.getPendingTimers()).isEmpty();
assertThat(harness.getFiredTimers()).hasSize(1);
}
}
```
{{< /tab >}}
{{< /tabs >}}

**Timers with State**: State persisted during `eval()` is accessible in `onTimer()`:

{{< tabs "timer-state-testing" >}}
{{< tab "Java" >}}
```java
@DataTypeHint("ROW<message STRING>")
public class TimerWithStatePTF extends ProcessTableFunction<Row> {
public static class CounterState {
public long count = 0L;
}

public void eval(
Context ctx,
@StateHint CounterState state,
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME})
Row input) {
state.count++;
TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
timeCtx.registerOnTime("check", timeCtx.time().plus(Duration.ofSeconds(5)));
}

public void onTimer(OnTimerContext ctx, @StateHint CounterState state) {
collect(Row.of("count=" + state.count));
}
}

@Test
void testTimerWithState() throws Exception {
try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(TimerWithStatePTF.class)
.withTableArgument("input",
DataTypes.of("ROW<partition STRING, ts TIMESTAMP(3)>"))
.withPartitionBy("input", "partition")
.withOnTimeColumn("ts")
.build()) {

harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));

harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));
assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));
}
}
```
{{< /tab >}}
{{< /tabs >}}

#### Optional Partitioning

For PTFs with `OPTIONAL_PARTITION_BY`, you can omit `withPartitionBy()` during harness setup. The
Expand Down Expand Up @@ -2582,8 +2691,5 @@ void testPOJO() throws Exception {

### PTF Features Unsupported by the TestHarness

- `Context` parameter
- Timers (`onTimer`)
- `on_time` / `rowtime`
- Update traits (`SUPPORTS_UPDATES`, `REQUIRE_UPDATE_BEFORE`)
- State TTL (state is supported but TTL expiration is not yet implemented)
112 changes: 109 additions & 3 deletions docs/content/docs/dev/table/functions/ptfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2468,6 +2468,115 @@ void testStateMutation() throws Exception {
{{< /tab >}}
{{< /tabs >}}

#### Testing with Timers and Context

The harness supports the `Context` parameter, timer registration via `TimeContext`, and `onTimer`
callbacks. Use `.withOnTimeColumn()` to configure the event time column and `.setWatermark()` to
advance watermarks and fire eligible timers.

{{< tabs "timer-testing" >}}
{{< tab "Java" >}}
```java
// A PTF that registers a named timer 5 seconds after each event, and emits when it fires.
@DataTypeHint("ROW<message STRING>")
public class TimerPTF extends ProcessTableFunction<Row> {
public void eval(
Context ctx,
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME})
Row input) {
String name = input.getFieldAs("name");
TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
timeCtx.registerOnTime("timeout-" + name, timeCtx.time().plus(Duration.ofSeconds(5)));
collect(Row.of("registered-" + name));
}

public void onTimer(OnTimerContext ctx) {
collect(Row.of("timer-fired-" + ctx.currentTimer()));
}
}

@Test
void testTimerRegistrationAndFiring() throws Exception {
try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(TimerPTF.class)
.withTableArgument("input",
DataTypes.of("ROW<partition STRING, name STRING, ts TIMESTAMP(3)>"))
.withPartitionBy("input", "partition")
.withOnTimeColumn("ts")
.build()) {

harness.processElement(Row.of("P1", "Alice", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));

// Verify the timer was registered
assertThat(harness.getPendingTimers()).hasSize(1);
assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice");

// Advance watermark past the timer's timestamp to fire it
harness.clearOutput();
harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));

assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "timer-fired-timeout-Alice", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));

assertThat(harness.getPendingTimers()).isEmpty();
assertThat(harness.getFiredTimers()).hasSize(1);
}
}
```
{{< /tab >}}
{{< /tabs >}}

**Timers with State**: State persisted during `eval()` is accessible in `onTimer()`:

{{< tabs "timer-state-testing" >}}
{{< tab "Java" >}}
```java
@DataTypeHint("ROW<message STRING>")
public class TimerWithStatePTF extends ProcessTableFunction<Row> {
public static class CounterState {
public long count = 0L;
}

public void eval(
Context ctx,
@StateHint CounterState state,
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME})
Row input) {
state.count++;
TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
timeCtx.registerOnTime("check", timeCtx.time().plus(Duration.ofSeconds(5)));
}

public void onTimer(OnTimerContext ctx, @StateHint CounterState state) {
collect(Row.of("count=" + state.count));
}
}

@Test
void testTimerWithState() throws Exception {
try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(TimerWithStatePTF.class)
.withTableArgument("input",
DataTypes.of("ROW<partition STRING, ts TIMESTAMP(3)>"))
.withPartitionBy("input", "partition")
.withOnTimeColumn("ts")
.build()) {

harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));

harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));
assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));
}
}
```
{{< /tab >}}
{{< /tabs >}}

#### Optional Partitioning

For PTFs with `OPTIONAL_PARTITION_BY`, you can omit `withPartitionBy()` during harness setup. The
Expand Down Expand Up @@ -2585,8 +2694,5 @@ void testPOJO() throws Exception {

### PTF Features Unsupported by the TestHarness

- `Context` parameter
- Timers (`onTimer`)
- `on_time` / `rowtime`
- Update traits (`SUPPORTS_UPDATES`, `REQUIRE_UPDATE_BEFORE`)
- State TTL (state is supported but TTL expiration is not yet implemented)
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.functions;

import org.apache.flink.types.Row;

import javax.annotation.Nullable;

/** Captures the per-invocation state for an eval() or onTimer() call in the test harness. */
class InvocationContext {
final Row partitionKey;
@Nullable final Row row;
@Nullable final String tableArgumentName;
@Nullable final Timer firingTimer;

private InvocationContext(
Row partitionKey,
@Nullable Row row,
@Nullable String tableArgumentName,
@Nullable Timer firingTimer) {
this.partitionKey = partitionKey;
this.row = row;
this.tableArgumentName = tableArgumentName;
this.firingTimer = firingTimer;
}

static InvocationContext forEval(Row partitionKey, Row row, String tableArgumentName) {
return new InvocationContext(partitionKey, row, tableArgumentName, null);
}

static InvocationContext forTimer(Timer timer) {
return new InvocationContext(timer.partitionKey, null, null, timer);
}

boolean isTimerInvocation() {
return firingTimer != null;
}

boolean isEvalInvocation() {
return tableArgumentName != null;
}
}
Loading