Skip to content

Conversation

@sandeep-mst
Copy link
Contributor

Fixes #25201

Motivation

Fix a re-entrancy bug in ProducerImpl.failPendingMessages. While executing on the timer, failPendingMessages invokes sendComplete(ex) on pending messages, which can synchronously trigger a retry from client code. The subsequent pendingMessages.clear() then removes the newly enqueued retry operation, leaving the retry’s CompletableFuture unresolved and the client in a limbo state.

Modifications

Updated failPendingMessages to first drain the pendingMessages queue into a local list and clear the queue before iterating. This prevents re-entrant retries triggered during sendComplete from being inadvertently cleared.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • org.apache.pulsar.client.impl.ProducerImplTest#testFailPendingMessagesSyncRetry

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:
cognitree#29

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Feb 3, 2026
@lhotari
Copy link
Member

lhotari commented Feb 4, 2026

great work @sandeep-mst .

Regarding the tests, would it be possible to add a reproducer similar as described in #25201? (test without mocking the client, just using MockedPulsarServiceBaseTest base class)

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, great work @sandeep-mst . Just a wish to add a test mentioned in the description.

@sandeep-mst
Copy link
Contributor Author

sandeep-mst commented Feb 4, 2026

LGTM, great work @sandeep-mst . Just a wish to add a test mentioned in the description.

Will add it if possible. That test will go to pulsar-broker module. Should I remove the currently added test afterwards?

@lhotari
Copy link
Member

lhotari commented Feb 4, 2026

LGTM, great work @sandeep-mst . Just a wish to add a test mentioned in the description.

Will add it if possible. That test will go to pulsar-broker module. Should I remove the currently added test afterwards?

I think the current unit tests are fine. The additional test would be a bit like an integration test. In Pulsar, the tests which use the MockedPulsarServiceBaseTest base class are in most cases integration tests although we call them "unit tests" in CI.

Comment on lines +88 to +90
Field pendingField = ProducerImpl.class.getDeclaredField("pendingMessages");
pendingField.setAccessible(true);
pendingField.set(producer, pendingQueue);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In many cases we use org.apache.commons.lang3.reflect.FieldUtils or add a getter to the field (could use @Getter(value = AccessLevel.PACKAGE, onMethod_ = @__({@VisibleForTesting})) to add package protected getter in Lombok.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Producer synchronous retries can cause retry sendAsync future to never complete

4 participants