[FLINK-38939][runtime] Pause sources until the 1st checkpoint to prioritize processing recovered records#27440
Conversation
1996fanrui
left a comment
There was a problem hiding this comment.
Hey @rkhachatryan , thanks for the PR, I have several questions about this approach.
Pause sources until the 1st checkpoint to prioritize processing recovered records
Don't pull any data from sources until the first checkpoint is triggered.
If so, the source does not work even if all recovered buffers are consumed, right?
Let me understand the existing issue and the current approach:
- The task will be switched from INITIALIZATION to RUNNING once all recovered input buffers and output buffers are consumed.
- The recovered buffer of some input channels are fully consumed, and there are some new buffers is coming. The recovered buffer of rest of channels are not fully consumed.
Issue: If task starts consume new buffers before all recovered buffer are consumed, it will be switched to running later.
IIUC,, the purpose of pause source is avoid new buffers are generated. Is it correct?
If so, I do not think it works perfect since new buffers can be generated from the recovered buffers of upstream task. Of course, pause source could avoid new buffers from outside of flink during recovery.
Blocking channels whose recovered buffers are fully consumed maybe more fine-grained that pausing source, it allows task consumes recovered buffers before new buffers, as well as the upstream tasks and source are not blocked as early as possible.
Also, FLIP-547 part 4.6 will introduce fine-grained blocking mechanism. Not sure whether pausing source is still needed if new mechanism will be introduced in the near future?
Looking forward to your opinion, thanks
|
Yes @1996fanrui, you're right. The purpose of this change is to prevent new input records from delaying the switch of the downstream tasks to RUNNING. In a sense, this is a lightweight alternative to FLIP-547. |
1996fanrui
left a comment
There was a problem hiding this comment.
Thanks @rkhachatryan for the comment!
Sounds make sense for considering this approach as a lightweight alternative first. I only left one comment, please take a look when you are available, thanks
2bb4a3a to
b4ff432
Compare
Efrat19
left a comment
There was a problem hiding this comment.
Thank you for these valuable contributions
|
Thanks for the reviews! |
|
Reopening to address test failures |
d876c70 to
533aff7
Compare
|
@flinkbot run azure |
…er is received This allows to prioritize processing of recovered records (when recovering from an unaligned checkpoint)
…f checkpointing interval The check doesn't make sense because checkpointing might be disabled before recovery; or there might be a manual checkpoint.
…CoordinatorConfiguration
…g for a checkpoint
With more frequent checkpoints, ids can be duplicated in RestoreUpgradedJobITCase. This change adds a sipmle deduplication before the assertion.
|
Rebased. Previous test failures were unrelated:
I believe I've addressed all the feedback, thanks again for the reviews. Will be merging this PR. |
… watermark After FLINK-38939 / apache#27440, if the source operator was stopped while waiting for the first checkpoint then the output needs to be initialized so final watermark can be emitted; otherwise, final checkpoint might fail with java.lang.IllegalStateException This commit fixes the issue by calling initializeMainOutput if necessary.
… watermark After FLINK-38939 / #27440, if the source operator was stopped while waiting for the first checkpoint then the output needs to be initialized so final watermark can be emitted; otherwise, final checkpoint might fail with java.lang.IllegalStateException This commit fixes the issue by calling initializeMainOutput if necessary.
… watermark After FLINK-38939 / apache#27440, if the source operator was stopped while waiting for the first checkpoint then the output needs to be initialized so final watermark can be emitted; otherwise, final checkpoint might fail with java.lang.IllegalStateException This commit fixes the issue by calling initializeMainOutput if necessary.
No description provided.