Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.nifi.logging.ComponentLog;

import java.io.IOException;
import java.net.SocketException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

/**
* Generic publisher of messages to AMQP-based messaging system. It is based on
Expand All @@ -32,17 +35,40 @@
final class AMQPPublisher extends AMQPWorker {

private final String connectionString;
private final boolean useConfirms;

/**
* Stores the broker's return reason when a message is published with mandatory=true
* but the broker cannot route it to any queue. Written by the AMQP I/O thread via
* {@link UndeliverableMessageLogger} and read by the publishing thread after
* {@link com.rabbitmq.client.Channel#waitForConfirms} synchronizes the two.
* Only populated when {@link #useConfirms} is true.
*/
private final AtomicReference<String> undeliverableReturnReason = new AtomicReference<>(null);

/**
* Creates an instance of this publisher
*
* @param connection instance of AMQP {@link Connection}
* @param useConfirms when true, enables RabbitMQ Publisher Confirms so that
* {@link #publish} waits for a broker ack/nack and reliably
* detects undeliverable messages; when false, the original
* fire-and-forget behaviour is used for maximum throughput
*/
AMQPPublisher(Connection connection, ComponentLog processorLog) {
AMQPPublisher(Connection connection, ComponentLog processorLog, boolean useConfirms) {
super(connection, processorLog);
this.useConfirms = useConfirms;
getChannel().addReturnListener(new UndeliverableMessageLogger());
this.connectionString = connection.toString();

if (useConfirms) {
try {
getChannel().confirmSelect();
} catch (final IOException e) {
throw new AMQPException("Failed to enable Publisher Confirms on AMQP channel", e);
}
}

processorLog.info("Successfully connected AMQPPublisher to {}", this.connectionString);
}

Expand All @@ -68,13 +94,43 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String
processorLog.debug("Successfully connected AMQPPublisher to {} and '{}' exchange with '{}' as a routing key.", this.connectionString, exchange, routingKey);
}

// Reset any stale return reason from a previous publish before sending.
undeliverableReturnReason.set(null);

try {
getChannel().basicPublish(exchange, routingKey, true, properties, bytes);
} catch (AlreadyClosedException | SocketException e) {
throw new AMQPRollbackException("Failed to publish message because the AMQP connection is lost or has been closed", e);
} catch (Exception e) {
throw new AMQPException("Failed to publish message to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e);
}

if (useConfirms) {
// Wait for the broker's publish confirm (ack/nack). Because the broker sends a basic.return
// frame BEFORE the corresponding confirm frame for mandatory messages it cannot route,
// UndeliverableMessageLogger.handleReturn() is guaranteed to have run by the time
// waitForConfirms() returns. This makes undeliverable-message detection reliable.
try {
if (!getChannel().waitForConfirms(5_000L)) {
throw new AMQPException("Broker negatively acknowledged (NACK) message published to Exchange '"
+ exchange + "' with Routing Key '" + routingKey + "'");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AMQPException("Interrupted while waiting for publish confirmation from broker", e);
} catch (TimeoutException e) {
throw new AMQPException("Timed out waiting for publish confirmation from broker for Exchange '"
+ exchange + "' with Routing Key '" + routingKey + "'", e);
} catch (ShutdownSignalException e) {
throw new AMQPException("Broker closed channel while waiting for publish confirmation — "
+ "Exchange '" + exchange + "' may not exist: " + e.getMessage(), e);
}

final String returnReason = undeliverableReturnReason.get();
if (returnReason != null) {
throw new AMQPException("Message returned as undeliverable by broker — " + returnReason);
}
}
}

@Override
Expand All @@ -83,23 +139,24 @@ public String toString() {
}

/**
* Listener to listen and WARN-log undeliverable messages which are returned
* back to the sender. Since in the current implementation messages are sent
* with 'mandatory' bit set, such messages must have final destination
* otherwise they are silently dropped which could cause a confusion
* especially during early stages of flow development. This implies that
* bindings between exchange -> routingKey -> queue must exist and are
* typically done by AMQP administrator. This logger simply helps to monitor
* for such conditions by logging such messages as warning. In the future
* this can be extended to provide other type of functionality (e.g., fail
* processor etc.)
* Listens for messages returned by the broker when they cannot be routed to any queue
* (mandatory=true publish with no matching binding).
*
* In {@link PublishAMQP.DeliveryGuarantee#AT_MOST_ONCE} mode (the default), this listener
* only logs a warning — matching the original behaviour.
*
* In {@link PublishAMQP.DeliveryGuarantee#AT_LEAST_ONCE} mode, the return reason is also
* stored in {@link #undeliverableReturnReason} so that {@link #publish} can detect it after
* {@code waitForConfirms()} synchronizes the two threads and throw an {@link AMQPException}
* to trigger REL_FAILURE routing.
*/
private final class UndeliverableMessageLogger implements ReturnListener {
@Override
public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) throws IOException {
String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey
+ "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + ".";
processorLog.warn(logMessage);
final String reason = "exchange='" + exchangeName + "' routingKey='" + routingKey
+ "' replyCode=" + replyCode + " replyText='" + replyText + "'";
undeliverableReturnReason.set(reason);
processorLog.warn("Message returned as undeliverable by broker: {}", reason);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
.name("Delivery Guarantee")
.description("Controls whether the processor waits for a publish confirmation (broker ack/nack) before routing the FlowFile. "
+ "\"At least once\" enables RabbitMQ Publisher Confirms: the processor blocks until the broker acknowledges the message, "
+ "and undeliverable messages (no matching queue binding) are reliably routed to 'failure'. "
+ "This prevents silent data loss at the cost of significantly higher latency, especially with remote brokers. "
+ "\"At most once\" uses the original fire-and-forget mode: the message is sent without waiting for confirmation. "
+ "Undeliverable messages are only logged as a warning and the FlowFile is still routed to 'success'. "
+ "This mode offers maximum throughput but provides no delivery guarantee.")
.required(true)
.allowableValues(DeliveryGuarantee.class)
.defaultValue(DeliveryGuarantee.AT_MOST_ONCE)
.build();
public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder()
.name("Headers Source")
.description("The source of the headers which will be applied to the published message.")
Expand Down Expand Up @@ -136,6 +149,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
Stream.of(
EXCHANGE,
ROUTING_KEY,
DELIVERY_GUARANTEE,
HEADERS_SOURCE,
HEADERS_PATTERN,
HEADER_SEPARATOR
Expand Down Expand Up @@ -207,7 +221,8 @@ public Set<Relationship> getRelationships() {

@Override
protected AMQPPublisher createAMQPWorker(final ProcessContext context, final Connection connection) {
return new AMQPPublisher(connection, getLogger());
final boolean useConfirms = DeliveryGuarantee.AT_LEAST_ONCE == context.getProperty(DELIVERY_GUARANTEE).asAllowableValue(DeliveryGuarantee.class);
return new AMQPPublisher(connection, getLogger(), useConfirms);
}

@Override
Expand Down Expand Up @@ -346,6 +361,42 @@ protected Character getHeaderSeparator(ProcessContext context, InputHeaderSource
};
}

public enum DeliveryGuarantee implements DescribedValue {

AT_MOST_ONCE("At most once",
"Fire-and-forget: message is sent without waiting for a broker acknowledgement. "
+ "Undeliverable messages (no matching queue binding) are logged as a warning and "
+ "the FlowFile is routed to 'success'. Offers maximum throughput."),
AT_LEAST_ONCE("At least once",
"Publisher Confirms are enabled: the processor blocks until the broker acknowledges "
+ "the message (ack or nack). Undeliverable messages are reliably detected and routed "
+ "to 'failure'. Prevents silent data loss at the cost of higher latency, particularly "
+ "with remote brokers.");

private final String displayName;
private final String description;

DeliveryGuarantee(final String displayName, final String description) {
this.displayName = displayName;
this.description = description;
}

@Override
public String getValue() {
return name();
}

@Override
public String getDisplayName() {
return displayName;
}

@Override
public String getDescription() {
return description;
}
}

public enum InputHeaderSource implements DescribedValue {

FLOWFILE_ATTRIBUTES("FlowFile Attributes", "Select FlowFile Attributes based on regular expression pattern for event headers. Key of the matching attribute will be used as header key"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,16 @@

public class AMQPPublisherTest {

@SuppressWarnings("resource")
@Test
public void failOnNullConnection() {
assertThrows(IllegalArgumentException.class, () -> new AMQPPublisher(null, null));
assertThrows(IllegalArgumentException.class, () -> new AMQPPublisher(null, null, false));
}

@Test
public void failPublishIfChannelClosed() {
assertThrows(AMQPRollbackException.class, () -> {
Connection conn = new TestConnection(null, null);
try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) {
try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), false)) {
conn.close();
sender.publish("oleg".getBytes(), null, "foo", "");
}
Expand All @@ -58,7 +57,7 @@ public void failPublishIfChannelClosed() {
public void failPublishIfChannelFails() {
assertThrows(AMQPException.class, () -> {
TestConnection conn = new TestConnection(null, null);
try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) {
try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), false)) {
((TestChannel) conn.createChannel()).corruptChannel();
sender.publish("oleg".getBytes(), null, "foo", "");
}
Expand All @@ -74,7 +73,7 @@ public void validateSuccessfulPublishingAndRouting() throws Exception {

Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);

try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class), false)) {
sender.publish("hello".getBytes(), null, "key1", "myExchange");
}

Expand All @@ -96,7 +95,7 @@ public void validateSuccessfulPublishingAndUndeliverableRoutingKey() throws Exce
ReturnListener retListener = mock(ReturnListener.class);
connection.createChannel().addReturnListener(retListener);

try (AMQPPublisher sender = new AMQPPublisher(connection, new MockComponentLog("foo", ""))) {
try (AMQPPublisher sender = new AMQPPublisher(connection, new MockComponentLog("foo", ""), false)) {
sender.publish("hello".getBytes(), null, "key1", "myExchange");
}

Expand All @@ -105,4 +104,66 @@ public void validateSuccessfulPublishingAndUndeliverableRoutingKey() throws Exce
connection.close();
}

/**
* Verifies that a {@link com.rabbitmq.client.ShutdownSignalException} thrown by
* {@code waitForConfirms()} (e.g., broker closes channel with 404 NOT_FOUND because the
* exchange does not exist) is converted to {@link AMQPException} so the FlowFile routes
* to REL_FAILURE instead of surfacing as an unhandled processor error.
*/
@Test
public void failPublishWhenBrokerClosesChannelDuringConfirm() {
assertThrows(AMQPException.class, () -> {
TestConnection conn = new TestConnection(null, null);
conn.getTestChannel().setSimulateShutdownOnConfirm(true);
try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), true)) {
sender.publish("hello".getBytes(), null, "foo", "");
}
});
}

@Test
public void failPublishWhenBrokerNacksMessage() {
assertThrows(AMQPException.class, () -> {
TestConnection conn = new TestConnection(null, null);
conn.getTestChannel().setSimulateNackOnConfirm(true);
try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class), true)) {
sender.publish("hello".getBytes(), null, "foo", "");
}
});
}

@Test
public void failPublishWhenMessageReturnedAsUndeliverable() {
assertThrows(AMQPException.class, () -> {
Map<String, List<String>> routingMap = new HashMap<>();
routingMap.put("key1", Arrays.asList("queue1"));
Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
exchangeToRoutingKeymap.put("myExchange", "key1");

TestConnection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
conn.getTestChannel().setSimulateSynchronousReturn(true);

try (AMQPPublisher sender = new AMQPPublisher(conn, new MockComponentLog("id", ""), true)) {
sender.publish("hello".getBytes(), null, "wrongKey", "myExchange");
}
});
}

@Test
public void succeedsPublishWhenMessageUndeliverableInAtMostOnceMode() throws Exception {
Map<String, List<String>> routingMap = new HashMap<>();
routingMap.put("key1", Arrays.asList("queue1"));
Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
exchangeToRoutingKeymap.put("myExchange", "key1");

TestConnection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
conn.getTestChannel().setSimulateSynchronousReturn(true);

try (AMQPPublisher sender = new AMQPPublisher(conn, new MockComponentLog("id", ""), false)) {
// In AT_MOST_ONCE mode, undeliverable messages only produce a warning — no exception
sender.publish("hello".getBytes(), null, "wrongKey", "myExchange");
}
conn.close();
}

}
Loading
Loading