NIFI-15935 Clean up AMQP resources after processor failures#11243
NIFI-15935 Clean up AMQP resources after processor failures#11243ing-mattioni wants to merge 1 commit into
Conversation
exceptionfactory
left a comment
There was a problem hiding this comment.
Thanks for proposing improvements @ing-mattioni.
The try-catch around session.rollback() is not correct and should be removed.
Closing the ExecutorService also appears to be incorrect.
The goal of improving resource handling is good, but this needs to be reconsidered for the broader implications on exception handling.
0676182 to
0801a3b
Compare
Updated based on the review. The rollback call is no longer wrapped in a try/catch, and the resource creation failure path no longer closes the ExecutorService directly. The generic exception path still rolls back the session and drops the AMQP client resource. |
|
Thank you @exceptionfactory , I made the following changes:
|
| } catch (Exception e) { | ||
| getLogger().error("Processor failure", e); | ||
| getLogger().error("Processor failure, dropping the client", e); | ||
| session.rollback(); | ||
| context.yield(); | ||
| closeResource(resource); | ||
| } |
There was a problem hiding this comment.
@ing-mattioni The original idea was that all client-related exceptions are wrapped into AMQP*Exception in processResource() method and the caller needs to close/recreate the resources only for those exception types but not for generic exceptions.
In case of a generic exception the client should be fine. So I'm afraid recreating the client will not help and the rolled back FlowFile will fail again and again causing an infinite loop.
Could you please give an example of a generic error when close + createResource() helps?
There was a problem hiding this comment.
OK, I think I understand what I missed here. When processResource() throws a generic exception, the client is still alive but it is no longer available for the processor because resourceQueue.offer(resource) will not be called due to the exception.
So closing the client seems to be right way. In this case, basic.nack in NIFI-15892 is not really needed because it just redelivers the message to the client that cannot work anymore and must be closed. The real remedy is that the client gets closed and all locally buffered (unacked) messages will be discarded.
There was a problem hiding this comment.
@turcsanyip so, if I understood correctly, you want to merge this PR to close both this issue and NIFI-15892, while not merging the PR opened for NIFI-15982 because the basic.nack is not really needed. Is it correct?
There was a problem hiding this comment.
@ing-mattioni Yes, correct. Given that basic.nack call does not really contribute to the fix, I would not merge that PR if you and @exceptionfactory agree.
The real fix is to close the client so that all pending (unacknowledged) messages become available for the broker to redeliver to the new client or other existing clients. The negative acknowledgment happens too early, and the messages may be redelivered to the same client that is about to be closed. Closing the client is simply enough and handles both the failed messages (that could be nacked optionally) and the ones in the client's local buffer (unprocessed yet but already fetched from the broker).
Summary
NIFI-15935
This pull request updates
AbstractAMQPProcessorto clean up AMQP resources more consistently after unexpected processor failures.When
processResource()throws an unexpected exception, the processor currently logs the failure and yields the context. This change also rolls back theProcessSessionand closes the AMQP resource, ensuring that the client is dropped and recreated instead of potentially remaining in an inconsistent state.This change also ensures that the AMQP executor service is shut down when resource creation fails after the executor has been created.
Changes
ProcessSessionafter unexpected processor failuresTracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000VerifiedstatusPull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkModule-level verification performed:
.\mvnw.cmd -pl :nifi-amqp-processors -am testAdditional verification performed:
.\mvnw.cmd -pl :nifi-amqp-processors -am -P contrib-check install -DskipTestsLicensing
LICENSEandNOTICEfilesNo new dependencies were added.
Documentation
No user-facing documentation files were changed.