Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,10 @@ public final void onTrigger(final ProcessContext context, final ProcessSession s
context.yield();
closeResource(resource);
} catch (Exception e) {
getLogger().error("Processor failure", e);
getLogger().error("Processor failure, dropping the client", e);
session.rollback();
context.yield();
closeResource(resource);
}
Comment on lines 246 to 251
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ing-mattioni The original idea was that all client-related exceptions are wrapped into AMQP*Exception in processResource() method and the caller needs to close/recreate the resources only for those exception types but not for generic exceptions.

In case of a generic exception the client should be fine. So I'm afraid recreating the client will not help and the rolled back FlowFile will fail again and again causing an infinite loop.

Could you please give an example of a generic error when close + createResource() helps?

Copy link
Copy Markdown
Contributor

@turcsanyip turcsanyip May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I think I understand what I missed here. When processResource() throws a generic exception, the client is still alive but it is no longer available for the processor because resourceQueue.offer(resource) will not be called due to the exception.

So closing the client seems to be right way. In this case, basic.nack in NIFI-15892 is not really needed because it just redelivers the message to the client that cannot work anymore and must be closed. The real remedy is that the client gets closed and all locally buffered (unacked) messages will be discarded.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@turcsanyip so, if I understood correctly, you want to merge this PR to close both this issue and NIFI-15892, while not merging the PR opened for NIFI-15982 because the basic.nack is not really needed. Is it correct?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ing-mattioni Yes, correct. Given that basic.nack call does not really contribute to the fix, I would not merge that PR if you and @exceptionfactory agree.

The real fix is to close the client so that all pending (unacknowledged) messages become available for the broker to redeliver to the new client or other existing clients. The negative acknowledgment happens too early, and the messages may be redelivered to the same client that is about to be closed. Closing the client is simply enough and handles both the failed messages (that could be nacked optionally) and the ones in the client's local buffer (unprocessed yet but already fetched from the broker).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@turcsanyip agreed

}

Expand Down Expand Up @@ -284,8 +286,8 @@ private void closeResource(AMQPResource<T> resource) {
private AMQPResource<T> createResource(final ProcessContext context) {
Connection connection = null;
try {
ExecutorService executor = Executors.newSingleThreadExecutor(BasicThreadFactory.builder()
.namingPattern("AMQP Consumer: " + getIdentifier())
final ExecutorService executor = Executors.newSingleThreadExecutor(BasicThreadFactory.builder()
.namingPattern("AMQP Client: " + getIdentifier() + "-%d")
.build());
connection = createConnection(context, executor);
T worker = createAMQPWorker(context, connection);
Expand Down
Loading