Skip to content

NIFI-15935 Clean up AMQP resources after processor failures#11243

Open
ing-mattioni wants to merge 1 commit into
apache:mainfrom
ing-mattioni:NIFI-15935-amqp-processor-cleanup
Open

NIFI-15935 Clean up AMQP resources after processor failures#11243
ing-mattioni wants to merge 1 commit into
apache:mainfrom
ing-mattioni:NIFI-15935-amqp-processor-cleanup

Conversation

@ing-mattioni
Copy link
Copy Markdown

Summary

NIFI-15935

This pull request updates AbstractAMQPProcessor to clean up AMQP resources more consistently after unexpected processor failures.

When processResource() throws an unexpected exception, the processor currently logs the failure and yields the context. This change also rolls back the ProcessSession and closes the AMQP resource, ensuring that the client is dropped and recreated instead of potentially remaining in an inconsistent state.

This change also ensures that the AMQP executor service is shut down when resource creation fails after the executor has been created.

Changes

  • Rolls back the ProcessSession after unexpected processor failures
  • Closes the AMQP resource after unexpected processor failures
  • Shuts down the AMQP executor service when resource creation fails
  • Updates AMQP client executor thread naming to use a generic client name

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Module-level verification performed:

.\mvnw.cmd -pl :nifi-amqp-processors -am test

Additional verification performed:

.\mvnw.cmd -pl :nifi-amqp-processors -am -P contrib-check install -DskipTests

Licensing

  • New dependencies are compatible with the Apache License 2.0
  • New dependencies are documented in applicable LICENSE and NOTICE files

No new dependencies were added.

Documentation

  • Documentation formatting appears as expected in rendered files

No user-facing documentation files were changed.

Copy link
Copy Markdown
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for proposing improvements @ing-mattioni.

The try-catch around session.rollback() is not correct and should be removed.

Closing the ExecutorService also appears to be incorrect.

The goal of improving resource handling is good, but this needs to be reconsidered for the broader implications on exception handling.

@ing-mattioni ing-mattioni force-pushed the NIFI-15935-amqp-processor-cleanup branch from 0676182 to 0801a3b Compare May 18, 2026 08:58
@ing-mattioni
Copy link
Copy Markdown
Author

Thanks for proposing improvements @ing-mattioni.

The try-catch around session.rollback() is not correct and should be removed.

Closing the ExecutorService also appears to be incorrect.

The goal of improving resource handling is good, but this needs to be reconsidered for the broader implications on exception handling.

Updated based on the review. The rollback call is no longer wrapped in a try/catch, and the resource creation failure path no longer closes the ExecutorService directly. The generic exception path still rolls back the session and drops the AMQP client resource.

@ing-mattioni
Copy link
Copy Markdown
Author

Thank you @exceptionfactory , I made the following changes:

  • Removed the try/catch wrapper around session.rollback().
  • Removed direct executor.shutdownNow() from the resource-creation failure path.
  • Kept the improved generic failure handling that rolls back the session, yields, and drops the AMQP client resource.

Comment on lines 246 to 251
} catch (Exception e) {
getLogger().error("Processor failure", e);
getLogger().error("Processor failure, dropping the client", e);
session.rollback();
context.yield();
closeResource(resource);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ing-mattioni The original idea was that all client-related exceptions are wrapped into AMQP*Exception in processResource() method and the caller needs to close/recreate the resources only for those exception types but not for generic exceptions.

In case of a generic exception the client should be fine. So I'm afraid recreating the client will not help and the rolled back FlowFile will fail again and again causing an infinite loop.

Could you please give an example of a generic error when close + createResource() helps?

Copy link
Copy Markdown
Contributor

@turcsanyip turcsanyip May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I think I understand what I missed here. When processResource() throws a generic exception, the client is still alive but it is no longer available for the processor because resourceQueue.offer(resource) will not be called due to the exception.

So closing the client seems to be right way. In this case, basic.nack in NIFI-15892 is not really needed because it just redelivers the message to the client that cannot work anymore and must be closed. The real remedy is that the client gets closed and all locally buffered (unacked) messages will be discarded.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@turcsanyip so, if I understood correctly, you want to merge this PR to close both this issue and NIFI-15892, while not merging the PR opened for NIFI-15982 because the basic.nack is not really needed. Is it correct?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ing-mattioni Yes, correct. Given that basic.nack call does not really contribute to the fix, I would not merge that PR if you and @exceptionfactory agree.

The real fix is to close the client so that all pending (unacknowledged) messages become available for the broker to redeliver to the new client or other existing clients. The negative acknowledgment happens too early, and the messages may be redelivered to the same client that is about to be closed. Closing the client is simply enough and handles both the failed messages (that could be nacked optionally) and the ones in the client's local buffer (unprocessed yet but already fetched from the broker).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@turcsanyip agreed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants