[FLINK-XXXXX][table] Refactor changelog inference program#28308

Draft
bvarghese1 wants to merge 6 commits into apache:master from bvarghese1:refactor_changelog_inference_program

Conversation

@bvarghese1
Contributor

What is the purpose of the change

(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)

Brief change log

(for example:)

  • The TaskInfo is stored in the blob store on job creation time as a persistent artifact
  • Deployments RPC transmits only the blob storage reference
  • TaskManagers retrieve the TaskInfo from the blob cache

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

… util

- The following helper methods are pulled out into a new ChangelogModeInferenceUtils.java
    - getModifyKindSet
    - getDeleteKind
    - isNonUpsertKeyCondition
- The Scala program now delegates these three methods to the Java util
- This commit makes no functional change, and ChangelogModeInferenceTest remains unchanged
- Move the following PTF changelog helper methods to ChangelogModeInferenceUtils.java
    - toChangelogMode
    - ptfRequiresUpdateBefore
    - extractPtfTableArgComponents
    - toPtfChangelogContext
    - queryPtfChangelogMode
    - verifyPtfTableArgsForUpdates
- The Scala program keeps thin forwarders until the migration is complete
@flinkbot
Collaborator

flinkbot commented Jun 3, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

Contributor

@gustavodemorais gustavodemorais left a comment

I agree the file's readability isn't great. We've now split the logic for update, modify, and delete across three files. When working in FlinkChangelogModeInferenceProgram I usually jump to one node and want all of its logic in one place, so I'd suggest other paths:

  1. Convert to Java - definitely +1.
  2. Move each node's logic into a registry (Map<Class, NodeHandler> or visitor-per-node). This keeps a node's logic together across traits and drops the instanceof ladder entirely, so "what does node X do" becomes a single lookup.
  3. Give repeated branches a name - many nodes just forward or do full-delete-if-updates.
  4. Name specific branches for readability, e.g. visitWithFallback(child, preferred, fallback) instead of Optional.or(() -> visit(child, fallback)).
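
To make suggestions 2 and 4 concrete, here is a minimal sketch of what a handler registry with a named fallback helper might look like. Everything in it is hypothetical: `Node`, `Source`, `Calc`, `Trait`, `NodeHandler`, and `NodeRegistrySketch` are stand-ins invented for illustration, not classes from the Flink planner, and the string results only mark which path was taken.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch; none of these types are Flink's actual planner classes. */
final class NodeRegistrySketch {

    /** Stand-in for a physical plan node. */
    interface Node {}

    static final class Source implements Node {
        final boolean producesUpdates;
        Source(boolean producesUpdates) { this.producesUpdates = producesUpdates; }
    }

    static final class Calc implements Node {
        final Node input;
        Calc(Node input) { this.input = input; }
    }

    /** Stand-in for the trait requested from a node (assumed names). */
    enum Trait { INSERT_ONLY, ALLOW_UPDATES }

    /** All logic for one node type; an empty result means the trait cannot be satisfied. */
    interface NodeHandler<N extends Node> {
        Optional<String> visit(N node, Trait required, NodeRegistrySketch registry);
    }

    private final Map<Class<?>, NodeHandler<?>> handlers = new HashMap<>();

    <N extends Node> NodeRegistrySketch register(Class<N> type, NodeHandler<N> handler) {
        handlers.put(type, handler);
        return this;
    }

    /** A single map lookup on the exact node class replaces the instanceof ladder. */
    @SuppressWarnings("unchecked")
    Optional<String> visit(Node node, Trait required) {
        NodeHandler<Node> handler = (NodeHandler<Node>) handlers.get(node.getClass());
        if (handler == null) {
            throw new UnsupportedOperationException(
                    "No handler for " + node.getClass().getSimpleName());
        }
        return handler.visit(node, required, this);
    }

    /** Named form of visit(child, preferred).or(() -> visit(child, fallback)). */
    Optional<String> visitWithFallback(Node child, Trait preferred, Trait fallback) {
        return visit(child, preferred).or(() -> visit(child, fallback));
    }

    /** Registers one handler per node type, so each node's logic lives in one place. */
    static NodeRegistrySketch defaultRegistry() {
        return new NodeRegistrySketch()
                .register(Source.class, (source, required, registry) ->
                        source.producesUpdates && required == Trait.INSERT_ONLY
                                ? Optional.empty()
                                : Optional.of("Source[" + required + "]"))
                .register(Calc.class, (calc, required, registry) ->
                        registry.visit(calc.input, required)
                                .map(child -> "Calc(" + child + ")"));
    }

    public static void main(String[] args) {
        NodeRegistrySketch registry = defaultRegistry();
        Node plan = new Calc(new Source(true));
        // INSERT_ONLY cannot be satisfied by an updating source, so the fallback is used.
        System.out.println(
                registry.visitWithFallback(plan, Trait.INSERT_ONLY, Trait.ALLOW_UPDATES));
    }
}
```

The lookup here is keyed on the exact class, so a real version would need to decide how subclasses and missing handlers are treated.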

Another option is splitting the instanceof ladder into one small private method per node (visitSink, visitJoin, visitCalc), so the top-level visit() becomes a short dispatch and each handler reads on its own. We already use this pattern in StreamNonDeterministicUpdatePlanVisitor. I lean toward the registry first, but I'm glad to hear other opinions.
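
For comparison, the method-per-node alternative could be sketched roughly as below. The node classes and the `DispatchSketch` name are hypothetical placeholders, the handlers just return strings to show the traversal, and this is not a copy of how StreamNonDeterministicUpdatePlanVisitor is written.

```java
/** Hypothetical sketch; node types and results are illustrative, not Flink's. */
final class DispatchSketch {

    /** Stand-in for a physical plan node. */
    interface Node {}

    static final class Scan implements Node {
        final String table;
        Scan(String table) { this.table = table; }
    }

    static final class Calc implements Node {
        final Node input;
        Calc(Node input) { this.input = input; }
    }

    static final class Join implements Node {
        final Node left;
        final Node right;
        Join(Node left, Node right) { this.left = left; this.right = right; }
    }

    static final class Sink implements Node {
        final Node input;
        Sink(Node input) { this.input = input; }
    }

    /** Top-level visit() is only a dispatch; no node logic lives here. */
    String visit(Node node) {
        if (node instanceof Sink) {
            return visitSink((Sink) node);
        } else if (node instanceof Join) {
            return visitJoin((Join) node);
        } else if (node instanceof Calc) {
            return visitCalc((Calc) node);
        } else if (node instanceof Scan) {
            return visitScan((Scan) node);
        }
        throw new UnsupportedOperationException(
                "Unsupported node: " + node.getClass().getSimpleName());
    }

    // One small private method per node type, each readable on its own.

    private String visitSink(Sink sink) {
        return "Sink(" + visit(sink.input) + ")";
    }

    private String visitJoin(Join join) {
        return "Join(" + visit(join.left) + ", " + visit(join.right) + ")";
    }

    private String visitCalc(Calc calc) {
        return "Calc(" + visit(calc.input) + ")";
    }

    private String visitScan(Scan scan) {
        return "Scan[" + scan.table + "]";
    }

    public static void main(String[] args) {
        Node plan = new Sink(new Join(new Calc(new Scan("orders")), new Scan("users")));
        System.out.println(new DispatchSketch().visit(plan));
    }
}
```

Compared with a registry, this keeps the instanceof chain but shrinks it to one line per node, which is a smaller change to review.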

@bvarghese1
Contributor Author

Hi @gustavodemorais, yes, that's definitely the plan :-). But I want to do this refactoring in two phases. The first PR simply converts the program to Java, with a few newly introduced classes like the ones in this PR. The second PR will move each node into a registry (this is more involved, and I did not want to complicate this initial PR).
