[FLINK-39835][runtime] AsyncWaitOperator supports soft backpressure to avoid blocking unaligned checkpoint#28305
pltbkd wants to merge 2 commits into
…vailability plumbing
davidradl left a comment
My AI says:
- Race Condition in AsyncWaitOperator.outputCompletedElement()
  if (!downstreamAvailabilityProvider.isAvailable()) {
      downstreamAvailabilityProvider.getAvailableFuture().thenRun(...)
  }
  Between checking isAvailable() and getting the future, state can change. Should capture future first, then check isDone().
- Potential Memory Leak
  Deferred callbacks via thenRun() may accumulate if mailbox rejects execution. Silent failure in catch block doesn't clean up pending state.
- Performance Concern
  CompositeAvailabilityProvider.getAvailableFuture() creates new combined future on every call by iterating all providers - expensive for long chains.
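The "capture future first, then check isDone()" suggestion in the first item could look roughly like the sketch below. It is only an illustration: `AvailabilitySource`, `CaptureFutureFirst`, and `emitOrDefer` are hypothetical names, not code from this PR or from Flink, and the mailbox is modelled as a plain `java.util.concurrent.Executor`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical stand-in for an availability provider; not Flink's actual interface.
interface AvailabilitySource {
    CompletableFuture<?> getAvailableFuture();
}

final class CaptureFutureFirst {

    /**
     * Emits right away if downstream is available, otherwise defers the emit
     * to the mailbox once the captured future completes.
     *
     * @return true if the emit ran immediately, false if it was deferred
     */
    static boolean emitOrDefer(AvailabilitySource downstream, Executor mailbox, Runnable emit) {
        // Read the future exactly once, so the check and the callback
        // registration both refer to the same object.
        CompletableFuture<?> available = downstream.getAvailableFuture();
        if (available.isDone()) {
            emit.run();
            return true;
        }
        // If the future completes between isDone() and thenRun(), thenRun()
        // still fires the callback, so the wake-up is not lost.
        available.thenRun(() -> mailbox.execute(emit));
        return false;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> blocked = new CompletableFuture<>();
        // Runnable::run acts as a trivial "mailbox" that runs mails inline.
        boolean immediate =
                emitOrDefer(() -> blocked, Runnable::run, () -> System.out.println("emitted"));
        System.out.println("emitted immediately: " + immediate);
        blocked.complete(null); // downstream becomes available; the deferred emit runs
    }
}
```

Whether the original two-step check is actually racy depends on which threads may change availability, which this sketch does not model.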
📝 Code Quality Issues
- Inconsistent indentation in deferred callback
- Missing comprehensive JavaDoc with usage examples
- Magic numbers in tests (capacity=2) without explanation
- No null checks on availabilityHelper initialization
🔄 Complexity
Nested callback chain (thenRun → mailboxExecutor.execute → outputCompletedElement) creates hard-to-debug control flow. Consider state machine or explicit deferred queue.
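For illustration, the "explicit deferred queue" alternative mentioned above might be sketched as follows. `DeferredEmitter` and its methods are hypothetical names, and the sketch assumes that every call happens on a single thread (as it would on a task's mailbox thread); it is not the approach taken in this PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical single-threaded emitter that parks elements in an explicit
// queue while downstream is unavailable, instead of chaining callbacks.
final class DeferredEmitter<T> {
    private final Queue<T> deferred = new ArrayDeque<>();
    private final Consumer<T> output;
    private boolean downstreamAvailable = true;

    DeferredEmitter(Consumer<T> output) {
        this.output = output;
    }

    /** Emits directly when possible; otherwise queues the element in arrival order. */
    void emit(T element) {
        if (downstreamAvailable && deferred.isEmpty()) {
            output.accept(element);
        } else {
            deferred.add(element);
        }
    }

    /** Called when downstream availability changes; drains the queue on recovery. */
    void setDownstreamAvailable(boolean available) {
        downstreamAvailable = available;
        while (downstreamAvailable && !deferred.isEmpty()) {
            output.accept(deferred.poll());
        }
    }

    /** Number of elements currently waiting for downstream. */
    int pending() {
        return deferred.size();
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        DeferredEmitter<Integer> emitter = new DeferredEmitter<>(out::add);
        emitter.emit(1);
        emitter.setDownstreamAvailable(false);
        emitter.emit(2);
        emitter.emit(3);
        System.out.println("pending while blocked: " + emitter.pending());
        emitter.setDownstreamAvailable(true);
        System.out.println("emitted in order: " + out);
    }
}
```

The pending state is a single inspectable queue, which also makes the memory held during prolonged unavailability easy to measure.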
⚙️ Compatibility & Side Effects
Works with existing nodes: ✅ Yes - opt-in via instanceof checks
Side effects requiring documentation:
- Performance: Additional availability checks on hot path (every element)
- Latency: Elements delayed when downstream unavailable
- Memory: Deferred callbacks accumulate during prolonged unavailability
- Checkpoint timing: More in-flight data may be captured
- Non-deterministic timing: Deferred emissions maintain order but timing varies
Migration concerns:
- Custom backpressure implementations may conflict
- Monitoring/metrics need updates for deferred emissions
- Queue size tuning may need adjustment
Missing Documentation
- User guide: When/why soft backpressure activates
- Operator implementation guide for SupportsSoftBackpressure
- Performance tuning guide
- Migration impact guide
- Troubleshooting deferred emissions
Thanks for the review! I'd like to clarify some points, some of which may stem from AI reviewers occasionally missing certain Flink internal mechanisms, such as the mailbox.
IMO the only real concern is the performance issue, namely the availability check added on the hot path, given that most cases don't need to build a real composite availability. However, for async-fusion operators the bottleneck is probably on the outside, so the cost is relatively low. On the other hand, with soft backpressure supported, not only unaligned checkpoints but also system notifications, including canceling, failing over, and checkpoint notifications, are guaranteed to be executed in time, without the fear of being blocked by downstream processing. I think the cost is worth paying. What do you think? Thanks again for the review. Could you please take another look, manually or with AI, in light of these explanations?
What is the purpose of the change
When AsyncWaitOperator processes input, if the async queue is full, it yields and waits for a callback to complete to free up a slot. During this time, unaligned checkpoints cannot be processed.
Soft backpressure is used to ensure UC can be processed in time. In the non-async path, processInput produces data written to the ResultPartition. If the RP is full, processInput can be blocked by soft backpressure so that mails can be executed.
The same idea applies to AsyncWaitOperator: on the async path, processInput produces requests written to the AsyncWaitOperator's queue; when the queue is full, processInput should be blocked. Similarly, results from AsyncWaitOperator produce data written to downstream (e.g., RP); if downstream is full, AsyncWaitOperator's emitResult should also be blocked.
Brief change log
- SupportsSoftBackpressure interface extending AvailabilityProvider, allowing operators to declare soft-backpressure capability and receive a downstream AvailabilityProvider.
- instanceof SupportsSoftBackpressure and wires downstream availability to the operator; StreamTask feeds it into the mailbox soft-backpressure path.
- SupportsSoftBackpressure, checks downstream availability before emitting and re-yields if not available.
Verifying this change
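As a rough way to picture the behaviour listed in the change log, the toy model below has an operator that reports itself unavailable while its queue is full and holds back results while downstream is unavailable. All names (`ToyAvailabilityProvider`, `ToySoftBackpressure`, `setDownstreamAvailability`, `ToyAsyncOperator`) are assumptions made for this sketch and do not reflect the actual signatures in the PR. For simplicity, every queued request is treated as an already completed result.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified availability contract (not Flink's interface).
interface ToyAvailabilityProvider {
    CompletableFuture<?> getAvailableFuture();

    default boolean isAvailable() {
        return getAvailableFuture().isDone();
    }
}

// Hypothetical capability: the operator exposes its own availability and
// is handed the availability of whatever sits downstream of it.
interface ToySoftBackpressure extends ToyAvailabilityProvider {
    void setDownstreamAvailability(ToyAvailabilityProvider downstream);
}

final class ToyAsyncOperator implements ToySoftBackpressure {
    private final int capacity;
    private final Queue<String> queue = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();
    private ToyAvailabilityProvider downstream = () -> CompletableFuture.completedFuture(null);
    private CompletableFuture<Void> slotFree = CompletableFuture.completedFuture(null);

    ToyAsyncOperator(int capacity) {
        this.capacity = capacity;
    }

    @Override
    public void setDownstreamAvailability(ToyAvailabilityProvider downstream) {
        this.downstream = downstream;
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return slotFree; // pending while the queue is full
    }

    /** Accepts a request unless the queue is full; the caller waits on the future instead of blocking. */
    boolean tryAccept(String request) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.add(request);
        if (queue.size() == capacity) {
            slotFree = new CompletableFuture<>(); // signal "unavailable" upstream
        }
        return true;
    }

    /** Emits the oldest result only if downstream is currently available. */
    boolean tryEmitOne() {
        if (queue.isEmpty() || !downstream.isAvailable()) {
            return false;
        }
        emitted.add(queue.poll());
        slotFree.complete(null); // no-op if the future was already complete
        return true;
    }

    List<String> emitted() {
        return emitted;
    }
}
```

In this model the caller never blocks inside `tryAccept` or `tryEmitOne`; it is expected to return to its event loop and retry once the relevant future completes, which is the property the description above relies on for handling mails in time.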
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation