From 0801a3b4ed42e855e936a17eb13e11e1764e8018 Mon Sep 17 00:00:00 2001 From: Claudio Mattioni Date: Wed, 13 May 2026 12:31:19 +0200 Subject: [PATCH] NIFI-15935 Clean up AMQP resources after processor failures --- .../nifi/amqp/processors/AbstractAMQPProcessor.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java index f8f6b8f8a588..63e0514df972 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java @@ -244,8 +244,10 @@ public final void onTrigger(final ProcessContext context, final ProcessSession s context.yield(); closeResource(resource); } catch (Exception e) { - getLogger().error("Processor failure", e); + getLogger().error("Processor failure, dropping the client", e); + session.rollback(); context.yield(); + closeResource(resource); } } @@ -284,8 +286,8 @@ private void closeResource(AMQPResource resource) { private AMQPResource createResource(final ProcessContext context) { Connection connection = null; try { - ExecutorService executor = Executors.newSingleThreadExecutor(BasicThreadFactory.builder() - .namingPattern("AMQP Consumer: " + getIdentifier()) + final ExecutorService executor = Executors.newSingleThreadExecutor(BasicThreadFactory.builder() + .namingPattern("AMQP Client: " + getIdentifier() + "-%d") .build()); connection = createConnection(context, executor); T worker = createAMQPWorker(context, connection);