Skip to content

[FLINK-39835][runtime] AsyncWaitOperator supports soft backpressure to avoid blocking unaligned checkpoint#28305

Open
pltbkd wants to merge 2 commits into
apache:masterfrom
pltbkd:FLINK-39835
Open

[FLINK-39835][runtime] AsyncWaitOperator supports soft backpressure to avoid blocking unaligned checkpoint#28305
pltbkd wants to merge 2 commits into
apache:masterfrom
pltbkd:FLINK-39835

Conversation

@pltbkd
Copy link
Copy Markdown
Contributor

@pltbkd pltbkd commented Jun 3, 2026

What is the purpose of the change

When AsyncWaitOperator processes input, if the async queue is full, it yields and waits for a callback to complete to free up a slot. During this time, unaligned checkpoints cannot be processed.

Soft backpressure is used to ensure UC can be processed in time. In the non-async path, processInput produces data written to the ResultPartition. If the RP is full, processInput can be blocked by soft backpressure so that mails can be executed.

The same idea applies to AsyncWaitOperator: on the async path, processInput produces requests written to the AsyncWaitOperator's queue — when the queue is full, processInput should be blocked. Similarly, results from AsyncWaitOperator produce data written to downstream (e.g., RP) — if downstream is full, AsyncWaitOperator's emitResult should also be blocked.

Brief change log

  • Add SupportsSoftBackpressure interface extending AvailabilityProvider, allowing operators to declare soft-backpressure capability and receive a downstream AvailabilityProvider.
  • OperatorChain checks instanceof SupportsSoftBackpressure and wires downstream availability to the operator; StreamTask feeds it into the mailbox soft-backpressure path.
  • AsyncWaitOperator implements SupportsSoftBackpressure, checks downstream availability before emitting and re-yields if not available.

Verifying this change

  • StreamOperatorChainingTest verifies chain availability provider wiring for both head and chained SupportsSoftBackpressure operators.
  • CompositeAvailabilityProviderTest covers the AND semantics.
  • AsyncWaitOperatorTest covers soft-backpressure pause/resume under simulated downstream unavailability.
  • Ordered/UnorderedStreamElementQueueTest cover the emit-side availability hook.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Jun 3, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Copy Markdown
Contributor

@davidradl davidradl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My AI says:

  1. Race Condition in AsyncWaitOperator.outputCompletedElement()

if (!downstreamAvailabilityProvider.isAvailable()) {
downstreamAvailabilityProvider.getAvailableFuture().thenRun(...)
}

Between checking isAvailable() and getting the future, state can change. Should capture future first, then check isDone().

  1. Potential Memory Leak
    Deferred callbacks via thenRun() may accumulate if mailbox rejects execution. Silent failure in catch block doesn't clean up pending state.

  2. Performance Concern
    CompositeAvailabilityProvider.getAvailableFuture() creates new combined future on every call by iterating all providers - expensive for long chains.

📝 Code Quality Issues
Inconsistent indentation in deferred callback
Missing comprehensive JavaDoc with usage examples
Magic numbers in tests (capacity=2) without explanation
No null checks on availabilityHelper initialization
🔄 Complexity
Nested callback chain (thenRun → mailboxExecutor.execute → outputCompletedElement) creates hard-to-debug control flow. Consider state machine or explicit deferred queue.

⚙️ Compatibility & Side Effects
Works with existing nodes: ✅ Yes - opt-in via instanceof checks

Side effects requiring documentation:

Performance: Additional availability checks on hot path (every element)
Latency: Elements delayed when downstream unavailable
Memory: Deferred callbacks accumulate during prolonged unavailability
Checkpoint timing: More in-flight data may be captured
Non-deterministic timing: Deferred emissions maintain order but timing varies

Migration concerns:
Custom backpressure implementations may conflict
Monitoring/metrics need updates for deferred emissions
Queue size tuning may need adjustment

Missing Documentation
User guide: When/why soft backpressure activates
Operator implementation guide for SupportsSoftBackpressure
Performance tuning guide
Migration impact guide
Troubleshooting deferred emissions

@pltbkd
Copy link
Copy Markdown
Contributor Author

pltbkd commented Jun 5, 2026

My AI says:

  1. Race Condition in AsyncWaitOperator.outputCompletedElement()

if (!downstreamAvailabilityProvider.isAvailable()) { downstreamAvailabilityProvider.getAvailableFuture().thenRun(...) }

Between checking isAvailable() and getting the future, state can change. Should capture future first, then check isDone().

  1. Potential Memory Leak
    Deferred callbacks via thenRun() may accumulate if mailbox rejects execution. Silent failure in catch block doesn't clean up pending state.
  2. Performance Concern
    CompositeAvailabilityProvider.getAvailableFuture() creates new combined future on every call by iterating all providers - expensive for long chains.

📝 Code Quality Issues Inconsistent indentation in deferred callback Missing comprehensive JavaDoc with usage examples Magic numbers in tests (capacity=2) without explanation No null checks on availabilityHelper initialization 🔄 Complexity Nested callback chain (thenRun → mailboxExecutor.execute → outputCompletedElement) creates hard-to-debug control flow. Consider state machine or explicit deferred queue.

⚙️ Compatibility & Side Effects Works with existing nodes: ✅ Yes - opt-in via instanceof checks

Side effects requiring documentation:

Performance: Additional availability checks on hot path (every element) Latency: Elements delayed when downstream unavailable Memory: Deferred callbacks accumulate during prolonged unavailability Checkpoint timing: More in-flight data may be captured Non-deterministic timing: Deferred emissions maintain order but timing varies

Migration concerns: Custom backpressure implementations may conflict Monitoring/metrics need updates for deferred emissions Queue size tuning may need adjustment

Missing Documentation User guide: When/why soft backpressure activates Operator implementation guide for SupportsSoftBackpressure Performance tuning guide Migration impact guide Troubleshooting deferred emissions

Thanks for the review! I'd like to clarify some points, some of which may be because AI reviewers sometimes miss certain Flink internal mechanisms, like mailbox.

  1. In this case, outputCompletedElement is always called in the mailbox, so there's no race condition issue: if downstream is available, the status won't change; if the downstream is not available at that time, another check will be done in the deferred mail, and status changes won't be an issue.
  2. A SupportsSoftBackpressure operator only needs to respect its direct downstream availability, but doesn't need its downstreams'. The complexity doesn't increase along with the chain length but the forks, so in the majority of cases an operator has only one downstream, and the composite falls back to a single availability, which has an optimized path.
  3. The inflight requests are managed by the element queue, including the completed but not-yet-emitted results. So the memory leak will not happen. The deferred callbacks are bounded by the queue capacity and are idempotent — redundant mails simply re-check and no-op.
  4. The SupportsSoftBackpressure is marked @internal, we don't expect external users to implement the interface. So I suppose the comment and example of AsyncWaitOperator is clear enough and no user-visible documentations should be added.
  5. The deferred entries remain in the inflight queue and are checkpointed along with other inflight requests — just like completed-but-waiting entries in ordered mode. Only the original input elements are persisted; on recovery they are replayed through asyncInvoke regardless of prior completion state. This preserves exactly-once semantics.

IMO the only real concern is the performance issue, that adding an availability check on the hot path, given that most cases don't need to build a real composite availability. However, for async-fusion operators the bottleneck is probably on the outside, so the cost is relatively low. On the other hand, with the soft backpressure supported, not only unaligned checkpoint, but also system notifications including canceling, failing over, checkpoint notifications, etc., will be guaranteed executed in time, without the fear of being blocked by downstream processing. I think the cost is worth paid. What do you think?

Thanks again for the review. Could you please take another manual or AI review with the explanations?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants