-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][client] Fix producer synchronous retry handling in failPendingMessages method #25207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Show resolved
Hide resolved
|
great work @sandeep-mst . Regarding the tests, would it be possible to add a reproducer similar as described in #25201? (test without mocking the client, just using |
a28f9d5 to
85be95a
Compare
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, great work @sandeep-mst . Just a wish to add a test mentioned in the description.
Will add it if possible. That test will go to |
I think the current unit tests are fine. The additional test would be a bit like an integration test. In Pulsar, the tests which use the |
| Field pendingField = ProducerImpl.class.getDeclaredField("pendingMessages"); | ||
| pendingField.setAccessible(true); | ||
| pendingField.set(producer, pendingQueue); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In many cases we use org.apache.commons.lang3.reflect.FieldUtils or add a getter to the field (could use @Getter(value = AccessLevel.PACKAGE, onMethod_ = @__({@VisibleForTesting})) to add package protected getter in Lombok.
Fixes #25201
Motivation
Fix a re-entrancy bug in
ProducerImpl.failPendingMessages. While executing on the timer,failPendingMessagesinvokessendComplete(ex)on pending messages, which can synchronously trigger a retry from client code. The subsequentpendingMessages.clear()then removes the newly enqueued retry operation, leaving the retry’s CompletableFuture unresolved and the client in a limbo state.Modifications
Updated
failPendingMessagesto first drain thependingMessagesqueue into a local list and clear the queue before iterating. This prevents re-entrant retries triggered during sendComplete from being inadvertently cleared.Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository:
cognitree#29