Skip to content

[FLINK-38939][runtime] Pause sources until the 1st checkpoint to prioritize processing recovered records#27440

Merged
rkhachatryan merged 7 commits intoapache:masterfrom
rkhachatryan:f38939
Feb 18, 2026
Merged

[FLINK-38939][runtime] Pause sources until the 1st checkpoint to prioritize processing recovered records#27440
rkhachatryan merged 7 commits intoapache:masterfrom
rkhachatryan:f38939

Conversation

@rkhachatryan
Copy link
Copy Markdown
Contributor

No description provided.

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Jan 19, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@rkhachatryan rkhachatryan marked this pull request as ready for review January 19, 2026 12:54
Comment thread docs/layouts/shortcodes/generated/pipeline_configuration.html Outdated
@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Jan 20, 2026
Copy link
Copy Markdown
Member

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @rkhachatryan , thanks for the PR, I have several questions about this approach.

Pause sources until the 1st checkpoint to prioritize processing recovered records
Don't pull any data from sources until the first checkpoint is triggered.

If so, the source does not work even if all recovered buffers are consumed, right?


Let me understand the existing issue and the current approach:

  1. The task will be switched from INITIALIZATION to RUNNING once all recovered input buffers and output buffers are consumed.
  2. The recovered buffer of some input channels are fully consumed, and there are some new buffers is coming. The recovered buffer of rest of channels are not fully consumed.

Issue: If task starts consume new buffers before all recovered buffer are consumed, it will be switched to running later.

IIUC,, the purpose of pause source is avoid new buffers are generated. Is it correct?

If so, I do not think it works perfect since new buffers can be generated from the recovered buffers of upstream task. Of course, pause source could avoid new buffers from outside of flink during recovery.

Blocking channels whose recovered buffers are fully consumed maybe more fine-grained that pausing source, it allows task consumes recovered buffers before new buffers, as well as the upstream tasks and source are not blocked as early as possible.

Also, FLIP-547 part 4.6 will introduce fine-grained blocking mechanism. Not sure whether pausing source is still needed if new mechanism will be introduced in the near future?

Looking forward to your opinion, thanks

@rkhachatryan
Copy link
Copy Markdown
Contributor Author

Yes @1996fanrui, you're right.

The purpose of this change is to prevent new input records from delaying the switch of the downstream tasks to RUNNING.
This doesn't help in every case; an example where it is useful is two JOINed sources, where one of the channels has a lot of checkpointed buffers; and the other one is fast in producing new data (from source).

In a sense, this is a lightweight alternative to FLIP-547.
To my understanding the timeline of implementing and stabilizing FLIP-547 is relatively long (PCMIIW), so this feature still makes sense.

Copy link
Copy Markdown
Member

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rkhachatryan for the comment!

Sounds make sense for considering this approach as a lightweight alternative first. I only left one comment, please take a look when you are available, thanks

Comment thread flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java Outdated
@1996fanrui 1996fanrui self-assigned this Jan 23, 2026
Copy link
Copy Markdown
Member

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM assuming CI is green

@rkhachatryan rkhachatryan force-pushed the f38939 branch 3 times, most recently from 2bb4a3a to b4ff432 Compare January 24, 2026 17:18
Copy link
Copy Markdown
Contributor

@Efrat19 Efrat19 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for these valuable contributions

@rkhachatryan
Copy link
Copy Markdown
Contributor Author

Thanks for the reviews!
I'm closing this PR in favor of #27589 - it includes this PR commits

@rkhachatryan
Copy link
Copy Markdown
Contributor Author

Reopening to address test failures

@rkhachatryan rkhachatryan reopened this Feb 13, 2026
@rkhachatryan rkhachatryan force-pushed the f38939 branch 3 times, most recently from d876c70 to 533aff7 Compare February 17, 2026 23:36
@rkhachatryan
Copy link
Copy Markdown
Contributor Author

@flinkbot run azure

…er is received

This allows to prioritize processing of recovered records
(when recovering from an unaligned checkpoint)
…f checkpointing interval

The check doesn't make sense because checkpointing might be disabled
before recovery; or there might be a manual checkpoint.
With more frequent checkpoints, ids can be duplicated in RestoreUpgradedJobITCase.
This change adds a sipmle deduplication before the assertion.
@rkhachatryan
Copy link
Copy Markdown
Contributor Author

Rebased.

Previous test failures were unrelated:

I believe I've addressed all the feedback, thanks again for the reviews.

Will be merging this PR.

@rkhachatryan rkhachatryan merged commit ae0c1e6 into apache:master Feb 18, 2026
@rkhachatryan rkhachatryan deleted the f38939 branch February 18, 2026 23:14
rkhachatryan added a commit to rkhachatryan/flink that referenced this pull request Feb 26, 2026
… watermark

After FLINK-38939 / apache#27440, if the source operator was stopped while waiting for
the first checkpoint then the output needs to be initialized so final watermark
can be emitted; otherwise, final checkpoint might fail with java.lang.IllegalStateException

This commit fixes the issue by calling initializeMainOutput if necessary.
rkhachatryan added a commit that referenced this pull request Feb 27, 2026
… watermark

After FLINK-38939 / #27440, if the source operator was stopped while waiting for
the first checkpoint then the output needs to be initialized so final watermark
can be emitted; otherwise, final checkpoint might fail with java.lang.IllegalStateException

This commit fixes the issue by calling initializeMainOutput if necessary.
Shekharrajak pushed a commit to Shekharrajak/flink that referenced this pull request Apr 2, 2026
… watermark

After FLINK-38939 / apache#27440, if the source operator was stopped while waiting for
the first checkpoint then the output needs to be initialized so final watermark
can be emitted; otherwise, final checkpoint might fail with java.lang.IllegalStateException

This commit fixes the issue by calling initializeMainOutput if necessary.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants