[FLINK-39378][table] Add support for Context, timers and on_time to PTF test harness #28326

Open
autophagy wants to merge 1 commit into
apache:master from
autophagy:flink-39378-timers-2
@autophagy autophagy commented Jun 5, 2026

What is the purpose of the change

This PR adds support for Context, OnTimerContext, TimeContext, timer registration and firing, watermark management, and rowtime support to the PTF Test Harness.

When a PTF registers a timer, the timer is stored in the TimerManager, keyed by the partition row. When the user advances the watermark using
either setWatermark or setWatermarkForTable, the manager fires all pending timers at or below the watermark in a deterministic order.
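
To illustrate the firing rule described above, here is a minimal, self-contained sketch. It is not the TimerManager from this PR: the class name `TimerManagerSketch`, the methods `register` and `fireUpTo`, the use of a plain `String` as the partition key, and the ordering (timestamp, then partition key, then timer name) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch of a timer store; not the TimerManager from this PR. */
final class TimerManagerSketch {

    // timestamp -> partition key -> timer names. Sorted collections make the
    // iteration order (and so the firing order) deterministic.
    private final TreeMap<Long, TreeMap<String, TreeSet<String>>> timers = new TreeMap<>();

    /** Registers a named timer; registering the same key, name and time twice is a no-op. */
    void register(String partitionKey, String name, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new TreeMap<>())
                .computeIfAbsent(partitionKey, k -> new TreeSet<>())
                .add(name);
    }

    /** Removes and returns every timer with timestamp <= watermark as "key:name@timestamp". */
    List<String> fireUpTo(long watermark) {
        List<String> fired = new ArrayList<>();
        // headMap(watermark, true) is a live view of all entries at or below the watermark.
        Map<Long, TreeMap<String, TreeSet<String>>> due = timers.headMap(watermark, true);
        for (Map.Entry<Long, TreeMap<String, TreeSet<String>>> byTime : due.entrySet()) {
            for (Map.Entry<String, TreeSet<String>> byKey : byTime.getValue().entrySet()) {
                for (String name : byKey.getValue()) {
                    fired.add(byKey.getKey() + ":" + name + "@" + byTime.getKey());
                }
            }
        }
        due.clear(); // clearing the view removes the fired timers from the backing map
        return fired;
    }
}
```

In this sketch, timers above the watermark stay pending, and advancing to the same watermark a second time fires nothing because fired timers are removed.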

I moved the per-invocation state into an InvocationContext to make the separation of concerns a little easier to follow.

Rows emitted from test harness setups that configure an on_time column also contain the rowtime column. If the user tries to register a timer with PASS_COLUMNS_THROUGH
enabled, the registration is rejected at the point of registration (similar to live behavior, per the PROCESS_INVALID_PASS_THROUGH_TIMERS test on live).

Some open questions remain. The first is an edge case: if a user's onTimer registers a new timer at or before the current watermark, that timer fires immediately, registers another one,
which fires again, and so on. Should there be some sort of max depth check here?
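
One possible shape for such a guard, sketched under assumptions: this version caps the total number of firings per watermark advance rather than tracking a true recursion depth. The class `BoundedTimerFiringSketch`, the method `fireAll`, the `maxFirings` parameter, and the callback that stands in for onTimer are hypothetical and not part of the harness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongFunction;

/** Hypothetical guard against timers that keep re-registering at or below the watermark. */
final class BoundedTimerFiringSketch {

    /**
     * Fires all timers at or below the watermark, earliest first.
     * The onTimer callback stands in for the user's onTimer: it receives the firing
     * timestamp and returns the timestamps of any timers it registers.
     */
    static List<Long> fireAll(
            List<Long> initialTimers,
            long watermark,
            int maxFirings,
            LongFunction<List<Long>> onTimer) {
        PriorityQueue<Long> pending = new PriorityQueue<>(initialTimers);
        List<Long> fired = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            if (fired.size() >= maxFirings) {
                // Fail loudly instead of looping forever.
                throw new IllegalStateException(
                        "More than " + maxFirings + " timers fired for watermark " + watermark);
            }
            long timestamp = pending.poll();
            fired.add(timestamp);
            // Timers registered during onTimer may themselves be due already.
            pending.addAll(onTimer.apply(timestamp));
        }
        return fired;
    }
}
```

With a callback that always registers a timer five units later, the loop stops on its own once the next timer is past the watermark. With a callback that re-registers at the same timestamp, the cap turns a would-be infinite loop into an exception.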

The second is that when defining the on_time parameter in SQL, you pass in DESCRIPTOR(ts), but in the harness you just pass in the column name as a string. Would it be better to support withOnTime("DESCRIPTOR(ts)") to mirror SQL more closely, rather than withOnTimeColumn("ts")?
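
For comparison, a third option would be to accept both spellings and normalize them to a column name. The sketch below is an assumption for discussion only: `OnTimeSpecSketch` and `columnName` are hypothetical names, and the pattern handles only simple unquoted identifiers, not the full SQL DESCRIPTOR syntax.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that accepts either "ts" or "DESCRIPTOR(ts)". */
final class OnTimeSpecSketch {

    // Case-insensitive DESCRIPTOR(<identifier>) with optional whitespace.
    // Simplification: only plain identifiers, no quoting or multiple columns.
    private static final Pattern DESCRIPTOR =
            Pattern.compile("(?i)\\s*DESCRIPTOR\\s*\\(\\s*([A-Za-z_][A-Za-z0-9_]*)\\s*\\)\\s*");

    /** Returns the column name from either form of the on_time argument. */
    static String columnName(String spec) {
        Matcher m = DESCRIPTOR.matcher(spec);
        return m.matches() ? m.group(1) : spec.trim();
    }
}
```

Under this sketch, `columnName("DESCRIPTOR(ts)")` and `columnName("ts")` both yield `ts`, so either builder method could delegate to the same internal representation.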

Brief change log

  • Added support for Context, OnTimerContext, TimeContext in ProcessTableFunctionTestHarness
  • Added support for timer registration, watermark tracking and timer firing in ProcessTableFunctionTestHarness
  • Added an InvocationContext to capture per-invocation state to simplify the collector logic
  • Reworked deriving the output type from the system inference to account for on_time and uid
  • Fixed an issue in createStateConverter where the incorrect state converters were being used for Map/List state.

Verifying this change

This change added tests and can be verified as follows:

  • Added tests to ProcessFunctionTestHarnessesTest to cover timer firing, watermark advancement and context.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

2.1.156 (Claude Code)


flinkbot commented Jun 5, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@autophagy autophagy marked this pull request as ready for review June 5, 2026 11:28
@fhueske fhueske changed the title from "Timers" to "[FLINK-39378][table] Add support for Context, timers and on_time to PTF test harness" Jun 5, 2026