diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java index f8f6b8f8a588..63e0514df972 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java @@ -244,8 +244,10 @@ public final void onTrigger(final ProcessContext context, final ProcessSession s context.yield(); closeResource(resource); } catch (Exception e) { - getLogger().error("Processor failure", e); + getLogger().error("Processor failure, dropping the client", e); + session.rollback(); context.yield(); + closeResource(resource); } } @@ -284,8 +286,8 @@ private void closeResource(AMQPResource resource) { private AMQPResource createResource(final ProcessContext context) { Connection connection = null; try { - ExecutorService executor = Executors.newSingleThreadExecutor(BasicThreadFactory.builder() - .namingPattern("AMQP Consumer: " + getIdentifier()) + final ExecutorService executor = Executors.newSingleThreadExecutor(BasicThreadFactory.builder() + .namingPattern("AMQP Client: " + getIdentifier() + "-%d") .build()); connection = createConnection(context, executor); T worker = createAMQPWorker(context, connection);