diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java index 27a64bbb1ffb..fdb1b6ff7207 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java @@ -20,10 +20,13 @@ import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ReturnListener; +import com.rabbitmq.client.ShutdownSignalException; import org.apache.nifi.logging.ComponentLog; import java.io.IOException; import java.net.SocketException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; /** * Generic publisher of messages to AMQP-based messaging system. It is based on @@ -32,17 +35,40 @@ final class AMQPPublisher extends AMQPWorker { private final String connectionString; + private final boolean useConfirms; + + /** + * Stores the broker's return reason when a message is published with mandatory=true + * but the broker cannot route it to any queue. Written by the AMQP I/O thread via + * {@link UndeliverableMessageLogger} and read by the publishing thread after + * {@link com.rabbitmq.client.Channel#waitForConfirms} synchronizes the two. + * Only populated when {@link #useConfirms} is true. + */ + private final AtomicReference undeliverableReturnReason = new AtomicReference<>(null); /** * Creates an instance of this publisher * * @param connection instance of AMQP {@link Connection} + * @param useConfirms when true, enables RabbitMQ Publisher Confirms so that + * {@link #publish} waits for a broker ack/nack and reliably + * detects undeliverable messages; when false, the original + * fire-and-forget behaviour is used for maximum throughput */ - AMQPPublisher(Connection connection, ComponentLog processorLog) { + AMQPPublisher(Connection connection, ComponentLog processorLog, boolean useConfirms) { super(connection, processorLog); + this.useConfirms = useConfirms; getChannel().addReturnListener(new UndeliverableMessageLogger()); this.connectionString = connection.toString(); + if (useConfirms) { + try { + getChannel().confirmSelect(); + } catch (final IOException e) { + throw new AMQPException("Failed to enable Publisher Confirms on AMQP channel", e); + } + } + processorLog.info("Successfully connected AMQPPublisher to {}", this.connectionString); } @@ -68,6 +94,9 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String processorLog.debug("Successfully connected AMQPPublisher to {} and '{}' exchange with '{}' as a routing key.", this.connectionString, exchange, routingKey); } + // Reset any stale return reason from a previous publish before sending. + undeliverableReturnReason.set(null); + try { getChannel().basicPublish(exchange, routingKey, true, properties, bytes); } catch (AlreadyClosedException | SocketException e) { @@ -75,6 +104,33 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String } catch (Exception e) { throw new AMQPException("Failed to publish message to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e); } + + if (useConfirms) { + // Wait for the broker's publish confirm (ack/nack). Because the broker sends a basic.return + // frame BEFORE the corresponding confirm frame for mandatory messages it cannot route, + // UndeliverableMessageLogger.handleReturn() is guaranteed to have run by the time + // waitForConfirms() returns. This makes undeliverable-message detection reliable. + try { + if (!getChannel().waitForConfirms(5_000L)) { + throw new AMQPException("Broker negatively acknowledged (NACK) message published to Exchange '" + + exchange + "' with Routing Key '" + routingKey + "'"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AMQPException("Interrupted while waiting for publish confirmation from broker", e); + } catch (TimeoutException e) { + throw new AMQPException("Timed out waiting for publish confirmation from broker for Exchange '" + + exchange + "' with Routing Key '" + routingKey + "'", e); + } catch (ShutdownSignalException e) { + throw new AMQPException("Broker closed channel while waiting for publish confirmation — " + + "Exchange '" + exchange + "' may not exist: " + e.getMessage(), e); + } + + final String returnReason = undeliverableReturnReason.get(); + if (returnReason != null) { + throw new AMQPException(returnReason); + } + } } @Override @@ -83,23 +139,26 @@ public String toString() { } /** - * Listener to listen and WARN-log undeliverable messages which are returned - * back to the sender. Since in the current implementation messages are sent - * with 'mandatory' bit set, such messages must have final destination - * otherwise they are silently dropped which could cause a confusion - * especially during early stages of flow development. This implies that - * bindings between exchange -> routingKey -> queue must exist and are - * typically done by AMQP administrator. This logger simply helps to monitor - * for such conditions by logging such messages as warning. In the future - * this can be extended to provide other type of functionality (e.g., fail - * processor etc.) + * Listens for messages returned by the broker when they cannot be routed to any queue + * (mandatory=true publish with no matching binding). + * + * In {@link PublishAMQP.DeliveryGuarantee#AT_MOST_ONCE} mode (the default), this listener + * only logs a warning — matching the original behaviour. + * + * In {@link PublishAMQP.DeliveryGuarantee#AT_LEAST_ONCE} mode, the return reason is also + * stored in {@link #undeliverableReturnReason} so that {@link #publish} can detect it after + * {@code waitForConfirms()} synchronizes the two threads and throw an {@link AMQPException} + * to trigger REL_FAILURE routing. */ private final class UndeliverableMessageLogger implements ReturnListener { @Override public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) throws IOException { - String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey - + "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + "."; - processorLog.warn(logMessage); + final String reason = "Message returned as undeliverable by broker: exchange='" + exchangeName + + "' routingKey='" + routingKey + "' replyCode=" + replyCode + " replyText='" + replyText + "'"; + if (useConfirms) { + undeliverableReturnReason.set(reason); + } + processorLog.warn(reason); } } } diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java index 97bc07a6f710..20ffe02f1744 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java @@ -95,6 +95,19 @@ public class PublishAMQP extends AbstractAMQPProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() + .name("Delivery Guarantee") + .description("Controls whether the processor waits for a publish confirmation (broker ack/nack) before routing the FlowFile. " + + "\"At least once\" enables RabbitMQ Publisher Confirms: the processor blocks until the broker acknowledges the message, " + + "and undeliverable messages (no matching queue binding) are reliably routed to 'failure'. " + + "This prevents silent data loss at the cost of significantly higher latency, especially with remote brokers. " + + "\"At most once\" uses the original fire-and-forget mode: the message is sent without waiting for confirmation. " + + "Undeliverable messages are only logged as a warning and the FlowFile is still routed to 'success'. " + + "This mode offers maximum throughput but provides no delivery guarantee.") + .required(true) + .allowableValues(DeliveryGuarantee.class) + .defaultValue(DeliveryGuarantee.AT_MOST_ONCE) + .build(); public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder() .name("Headers Source") .description("The source of the headers which will be applied to the published message.") @@ -136,6 +149,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { Stream.of( EXCHANGE, ROUTING_KEY, + DELIVERY_GUARANTEE, HEADERS_SOURCE, HEADERS_PATTERN, HEADER_SEPARATOR @@ -207,7 +221,8 @@ public Set getRelationships() { @Override protected AMQPPublisher createAMQPWorker(final ProcessContext context, final Connection connection) { - return new AMQPPublisher(connection, getLogger()); + final boolean useConfirms = DeliveryGuarantee.AT_LEAST_ONCE == context.getProperty(DELIVERY_GUARANTEE).asAllowableValue(DeliveryGuarantee.class); + return new AMQPPublisher(connection, getLogger(), useConfirms); } @Override @@ -346,6 +361,42 @@ protected Character getHeaderSeparator(ProcessContext context, InputHeaderSource }; } + public enum DeliveryGuarantee implements DescribedValue { + + AT_MOST_ONCE("At most once", + "Fire-and-forget: message is sent without waiting for a broker acknowledgement. " + + "Undeliverable messages (no matching queue binding) are logged as a warning and " + + "the FlowFile is routed to 'success'. Offers maximum throughput."), + AT_LEAST_ONCE("At least once", + "Publisher Confirms are enabled: the processor blocks until the broker acknowledges " + + "the message (ack or nack). Undeliverable messages are reliably detected and routed " + + "to 'failure'. Prevents silent data loss at the cost of higher latency, particularly " + + "with remote brokers."); + + private final String displayName; + private final String description; + + DeliveryGuarantee(final String displayName, final String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + } + public enum InputHeaderSource implements DescribedValue { FLOWFILE_ATTRIBUTES("FlowFile Attributes", "Select FlowFile Attributes based on regular expression pattern for event headers. Key of the matching attribute will be used as header key"), diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java index 84d7c5528f58..24fa039b07a0 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java @@ -37,17 +37,16 @@ public class AMQPPublisherTest { - @SuppressWarnings("resource") @Test public void failOnNullConnection() { - assertThrows(IllegalArgumentException.class, () -> new AMQPPublisher(null, null)); + assertThrows(IllegalArgumentException.class, () -> new AMQPPublisher(null, null, false)); } @Test public void failPublishIfChannelClosed() { assertThrows(AMQPRollbackException.class, () -> { Connection conn = new TestConnection(null, null); - try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), false)) { conn.close(); sender.publish("oleg".getBytes(), null, "foo", ""); } @@ -58,7 +57,7 @@ public void failPublishIfChannelClosed() { public void failPublishIfChannelFails() { assertThrows(AMQPException.class, () -> { TestConnection conn = new TestConnection(null, null); - try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), false)) { ((TestChannel) conn.createChannel()).corruptChannel(); sender.publish("oleg".getBytes(), null, "foo", ""); } @@ -74,7 +73,7 @@ public void validateSuccessfulPublishingAndRouting() throws Exception { Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), null, "key1", "myExchange"); } @@ -96,7 +95,7 @@ public void validateSuccessfulPublishingAndUndeliverableRoutingKey() throws Exce ReturnListener retListener = mock(ReturnListener.class); connection.createChannel().addReturnListener(retListener); - try (AMQPPublisher sender = new AMQPPublisher(connection, new MockComponentLog("foo", ""))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, new MockComponentLog("foo", ""), false)) { sender.publish("hello".getBytes(), null, "key1", "myExchange"); } @@ -105,4 +104,66 @@ public void validateSuccessfulPublishingAndUndeliverableRoutingKey() throws Exce connection.close(); } + /** + * Verifies that a {@link com.rabbitmq.client.ShutdownSignalException} thrown by + * {@code waitForConfirms()} (e.g., broker closes channel with 404 NOT_FOUND because the + * exchange does not exist) is converted to {@link AMQPException} so the FlowFile routes + * to REL_FAILURE instead of surfacing as an unhandled processor error. + */ + @Test + public void failPublishWhenBrokerClosesChannelDuringConfirmInAtLeastOnce() { + assertThrows(AMQPException.class, () -> { + TestConnection conn = new TestConnection(null, null); + conn.getTestChannel().setSimulateShutdownOnConfirm(true); + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), true)) { + sender.publish("hello".getBytes(), null, "foo", ""); + } + }); + } + + @Test + public void failPublishWhenBrokerNacksMessageInAtLeastOnce() { + assertThrows(AMQPException.class, () -> { + TestConnection conn = new TestConnection(null, null); + conn.getTestChannel().setSimulateNackOnConfirm(true); + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), true)) { + sender.publish("hello".getBytes(), null, "foo", ""); + } + }); + } + + @Test + public void failPublishWhenMessageReturnedAsUndeliverableInAtLeastOnce() { + assertThrows(AMQPException.class, () -> { + Map> routingMap = new HashMap<>(); + routingMap.put("key1", Arrays.asList("queue1")); + Map exchangeToRoutingKeymap = new HashMap<>(); + exchangeToRoutingKeymap.put("myExchange", "key1"); + + TestConnection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); + conn.getTestChannel().setSimulateSynchronousReturn(true); + + try (AMQPPublisher sender = new AMQPPublisher(conn, new MockComponentLog("id", ""), true)) { + sender.publish("hello".getBytes(), null, "wrongKey", "myExchange"); + } + }); + } + + @Test + public void succeedsPublishWhenMessageUndeliverableInAtMostOnceMode() throws Exception { + Map> routingMap = new HashMap<>(); + routingMap.put("key1", Arrays.asList("queue1")); + Map exchangeToRoutingKeymap = new HashMap<>(); + exchangeToRoutingKeymap.put("myExchange", "key1"); + + TestConnection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); + conn.getTestChannel().setSimulateSynchronousReturn(true); + + try (AMQPPublisher sender = new AMQPPublisher(conn, new MockComponentLog("id", ""), false)) { + // In AT_MOST_ONCE mode, undeliverable messages only produce a warning — no exception + sender.publish("hello".getBytes(), null, "wrongKey", "myExchange"); + } + conn.close(); + } + } diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java index 6da28a4dbff7..1081c517ccdd 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -56,7 +56,7 @@ public void testMessageAcked() throws TimeoutException, IOException { final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); @@ -87,7 +87,7 @@ public void testBatchSizeAffectsAcks() throws TimeoutException, IOException { final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); @@ -118,7 +118,7 @@ public void testConsumerStopped() throws TimeoutException, IOException { final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); sender.publish("good-bye".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); @@ -153,7 +153,7 @@ public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -184,7 +184,7 @@ public void validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransf final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -219,7 +219,7 @@ public void validateHeaderWithFlowFileAttributeForHeaderFormatParameterConsumeAn final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -251,7 +251,7 @@ public void validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransfer final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -288,7 +288,7 @@ public void validateHeaderWithRemoveCurlyBracesParameterConsumeAndTransferToSucc final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -318,7 +318,7 @@ public void validateHeaderWithRemoveCurlyBracesAndValueSeparatorForHeaderParamet final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); @@ -352,7 +352,7 @@ public void validateHeaderWithoutParameterConsumeAndTransferToSuccess() throws E final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) { sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java index e2a2f0697aed..b77d9f0f29db 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -252,6 +252,38 @@ void testMigration() { assertEquals(expectedRemoved, propertyMigrationResult.getPropertiesRemoved()); } + /** + * When the broker closes the channel with a 404 (exchange not found), the FlowFile + * must route to REL_FAILURE — not cause an unhandled processor exception. + */ + @Test + public void validateFlowFileRoutedToFailureWhenBrokerClosesChannel() { + final LocalPublishAMQP proc = new LocalPublishAMQP(); + final TestRunner testRunner = TestRunners.newTestRunner(proc); + setConnectionProperties(testRunner); + testRunner.setProperty(PublishAMQP.DELIVERY_GUARANTEE, PublishAMQP.DeliveryGuarantee.AT_LEAST_ONCE); + proc.getTestChannel().setSimulateShutdownOnConfirm(true); + + testRunner.enqueue("Hello Joe".getBytes()); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PublishAMQP.REL_FAILURE); + } + + @Test + public void validateFlowFileRoutedToFailureOnBrokerNack() { + final LocalPublishAMQP proc = new LocalPublishAMQP(); + final TestRunner testRunner = TestRunners.newTestRunner(proc); + setConnectionProperties(testRunner); + testRunner.setProperty(PublishAMQP.DELIVERY_GUARANTEE, PublishAMQP.DeliveryGuarantee.AT_LEAST_ONCE); + proc.getTestChannel().setSimulateNackOnConfirm(true); + + testRunner.enqueue("Hello Joe".getBytes()); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PublishAMQP.REL_FAILURE); + } + private void setConnectionProperties(TestRunner runner) { runner.setProperty(PublishAMQP.BROKERS, "injvm:5672"); runner.setProperty(PublishAMQP.USER, "user"); @@ -278,5 +310,9 @@ protected Connection createConnection(ProcessContext context, ExecutorService ex public Connection getConnection() { return connection; } + + public TestChannel getTestChannel() { + return connection.getTestChannel(); + } } } diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java index ada7f18958fa..20c1ccb77745 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java @@ -78,6 +78,9 @@ class TestChannel implements Channel { private final BitSet acknowledgments = new BitSet(); private final BitSet nacks = new BitSet(); private int prefetchCount = 0; + private boolean simulateShutdownOnConfirm = false; + private boolean simulateNackOnConfirm = false; + private boolean simulateSynchronousReturn = false; public TestChannel(Map exchangeToRoutingKeyMappings, Map> routingKeyToQueueMappings) { @@ -100,6 +103,24 @@ void corruptChannel() { this.corrupted = true; } + /** Causes the next {@link #waitForConfirms(long)} call to throw {@link ShutdownSignalException}, + * simulating the broker closing the channel (e.g., exchange not found, 404 NOT_FOUND). */ + void setSimulateShutdownOnConfirm(boolean simulate) { + this.simulateShutdownOnConfirm = simulate; + } + + /** Causes the next {@link #waitForConfirms(long)} call to return {@code false}, + * simulating the broker sending a NACK for the published message. */ + void setSimulateNackOnConfirm(boolean simulate) { + this.simulateNackOnConfirm = simulate; + } + + /** When {@code true}, return listeners are invoked synchronously inside + * {@link #basicPublish} rather than asynchronously, making tests deterministic. */ + void setSimulateSynchronousReturn(boolean simulate) { + this.simulateSynchronousReturn = simulate; + } + void setConnection(Connection connection) { this.connection = connection; } @@ -283,15 +304,23 @@ public void basicPublish(final String exchange, final String routingKey, boolean private void discard(final String exchange, final String routingKey, boolean mandatory, final BasicProperties props, final byte[] body) { - // NO ROUTE. Invoke return listener async + // NO ROUTE. Invoke return listener — synchronously when simulating for tests, async otherwise. for (final ReturnListener listener : returnListeners) { - this.executorService.execute(() -> { + if (simulateSynchronousReturn) { try { listener.handleReturn(-9, "Rejecting", exchange, routingKey, props, body); } catch (Exception e) { throw new IllegalStateException("Failed to send return message", e); } - }); + } else { + this.executorService.execute(() -> { + try { + listener.handleReturn(-9, "Rejecting", exchange, routingKey, props, body); + } catch (Exception e) { + throw new IllegalStateException("Failed to send return message", e); + } + }); + } } } @@ -582,7 +611,7 @@ public RollbackOk txRollback() throws IOException { @Override public com.rabbitmq.client.AMQP.Confirm.SelectOk confirmSelect() throws IOException { - throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + return null; // no-op: publisher confirms enabled for testing } @Override @@ -597,7 +626,10 @@ public boolean waitForConfirms() throws InterruptedException { @Override public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException { - throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + if (simulateShutdownOnConfirm) { + throw new ShutdownSignalException(false, false, null, this); + } + return !simulateNackOnConfirm; } @Override diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java index 996c00dd8ceb..10adb50438a3 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java @@ -122,6 +122,10 @@ public Channel createChannel() throws IOException { return this.channel; } + public TestChannel getTestChannel() { + return this.channel; + } + @Override public Channel createChannel(int channelNumber) throws IOException { throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");