From c42a90f5dfde15d71e7e0288f942377f21e66f5a Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 18 Mar 2026 14:58:05 -0400 Subject: [PATCH] ARTEMIS-5069 / ARTEMIS-5068 Temporary queues are going through mirroring Mirroring should ignore send / create / delete / acks for temporary queues --- .../AMQPMirrorControllerAggregation.java | 4 +- .../mirror/AMQPMirrorControllerSource.java | 27 ++- .../mirror/AMQPMirrorControllerTarget.java | 4 +- .../amqp/connect/mirror/AckManager.java | 2 +- .../artemis/core/server/RoutingContext.java | 4 +- .../core/server/impl/ActiveMQServerImpl.java | 2 +- .../core/server/impl/RoutingContextImpl.java | 17 +- .../core/server/mirror/MirrorController.java | 2 +- .../core/server/impl/RoutingContextTest.java | 16 +- .../amqp/connect/BrokerInSyncTest.java | 156 +++++++++++++++--- .../amqp/connect/StopDuringMirrorTest.java | 4 +- 11 files changed, 180 insertions(+), 58 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java index b86318dc050..b208be947b6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java @@ -105,9 +105,9 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception } @Override - public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception { + public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration queueConfiguration) throws Exception { for (MirrorController partition : partitions) { - partition.deleteQueue(addressName, queueName); + partition.deleteQueue(addressName, queueName, queueConfiguration); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 9bd23b5b9c2..b9e018c6e48 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -238,10 +238,14 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception { } logger.trace("{} deleteAddress {}", server, addressInfo); - if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) { + if (invalidTarget(getControllerInUse()) || addressInfo.isInternal() || addressInfo.isTemporary()) { + if (logger.isTraceEnabled()) { + logger.trace("ignoring deleteAddress for invalidTarget = {}, isInternal = {}, isTemporary = {}", invalidTarget(getControllerInUse()), addressInfo.isInternal(), addressInfo.isTemporary()); + } return; } if (ignoreAddress(addressInfo.getName())) { + logger.trace("ignoring deleteAddress {} for ignoreAddress condition", addressInfo.getName()); return; } if (deleteQueues) { @@ -282,7 +286,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception } @Override - public void deleteQueue(SimpleString address, SimpleString queue) throws Exception { + public void deleteQueue(SimpleString address, SimpleString queue, QueueConfiguration queueConfiguration) throws Exception { if (!brokerConnection.isEnabled()) { return; } @@ -298,6 +302,15 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti return; } + if (queueConfiguration != null) { + if (queueConfiguration.isTemporary() || queueConfiguration.isInternal()) { + if (logger.isTraceEnabled()) { + logger.trace("deleteQueue {}/{} ignored for isTemporary = {} or isInternal = {}", address, queue, queueConfiguration.isTemporary(), queueConfiguration.isInternal()); + } + return; + } + } + if (deleteQueues) { Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString()); routeMirrorCommand(server, message); @@ -355,8 +368,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context) } SimpleString address = context.getAddress(message); - if (context.isInternal()) { - logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server); + if (context.isMirrorIgnore()) { + logger.trace("sendMessage::server {} is discarding send to avoid sending to internal or temporary queue", server); return; } @@ -587,9 +600,9 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin return; } - if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController()))) { - if (logger.isDebugEnabled()) { - logger.debug("preAcknowledge::{} rejecting preAcknowledge queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, ref.getQueue().getName(), ref); + if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || ref.getQueue().isTemporary() || ref.getQueue().isMirrorController()))) { + if (logger.isTraceEnabled()) { + logger.trace("ignoring preAcknowledge on ref {} for either internalQueue = {}, temporary = {}, isMirrorController = {}", ref, ref.getQueue().isInternalQueue(), ref.getQueue().isTemporary(), ref.getQueue().isMirrorController()); } return; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index e1e3020c559..f8ccb77bede 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -318,7 +318,7 @@ protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnota String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, ADDRESS); String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, QUEUE); - deleteQueue(SimpleString.of(address), SimpleString.of(queueName)); + deleteQueue(SimpleString.of(address), SimpleString.of(queueName), null); } else if (eventType.equals(POST_ACK)) { String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, BROKER_ID); @@ -440,7 +440,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception } @Override - public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception { + public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration configuration) throws Exception { if (logger.isDebugEnabled()) { logger.debug("{} destroy queue {} on address = {} server {}", server, queueName, addressName, server.getIdentity()); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index a7f1b8654f8..d26747fd201 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -640,7 +640,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception } @Override - public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception { + public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration configuration) throws Exception { } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java index 44101dc8f6e..fe9c4f3f1fe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java @@ -54,9 +54,9 @@ public interface RoutingContext { boolean isMirrorIndividualRoute(); /** - * return true if every queue routed is internal + * return true if every queue routed is internal or temporary */ - boolean isInternal(); + boolean isMirrorIgnore(); MirrorController getMirrorSource(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 875597cd365..c3db8beead4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2616,7 +2616,7 @@ public void destroyQueue(final SimpleString queueName, } if (mirrorControllerService != null) { - mirrorControllerService.deleteQueue(queue.getAddress(), queue.getName()); + mirrorControllerService.deleteQueue(queue.getAddress(), queue.getName(), queue.getQueueConfiguration()); } queue.deleteQueue(removeConsumers); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java index 3898248a0c8..1f7a7a9dc6c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java @@ -59,7 +59,7 @@ public class RoutingContextImpl implements RoutingContext { Boolean reusable = null; - Boolean internalOnly = null; + Boolean mirrorIgnore = null; boolean divertDisabled = false; @@ -130,10 +130,11 @@ public boolean isReusable() { } @Override - public boolean isInternal() { - return internalOnly != null && internalOnly; + public boolean isMirrorIgnore() { + return mirrorIgnore != null && mirrorIgnore; } + @Override public int getPreviousBindingsVersion() { return version; @@ -177,7 +178,7 @@ public RoutingContextImpl clear() { this.reusable = null; - this.internalOnly = null; + this.mirrorIgnore = null; // once we set to disabled, we keep it always disabled. // This is because the routing object used to route commands will disable this @@ -211,11 +212,11 @@ public void addQueue(final SimpleString address, final Queue queue) { listing.getNonDurableQueues().add(queue); } - if (internalOnly == null) { - internalOnly = queue.isInternalQueue(); + if (mirrorIgnore == null) { + mirrorIgnore = queue.isInternalQueue() || queue.isTemporary(); } else { - // every queue added has to be internal only - internalOnly = internalOnly && queue.isInternalQueue(); + // making sure that every queue added matches the mirrorIgnore + mirrorIgnore = mirrorIgnore && (queue.isInternalQueue() || queue.isTemporary()); } queueCount++; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java index fb194cca6e1..d4a22763d14 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java @@ -36,7 +36,7 @@ default boolean isRetryACK() { void addAddress(AddressInfo addressInfo) throws Exception; void deleteAddress(AddressInfo addressInfo) throws Exception; void createQueue(QueueConfiguration queueConfiguration) throws Exception; - void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception; + void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration queueConfiguration) throws Exception; void sendMessage(Transaction tx, Message message, RoutingContext context); void postAcknowledge(MessageReference ref, AckReason reason) throws Exception; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java index e3816a489f2..f943650e966 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java @@ -949,28 +949,28 @@ public void recheckRefCount(OperationContext context) { @Test public void testValidateInternal() { RoutingContext context = new RoutingContextImpl(new TransactionImpl(new NullStorageManager())); - assertFalse(context.isInternal()); + assertFalse(context.isMirrorIgnore()); context.addQueue(SimpleString.of("t1"), new FakeQueueForRoutingContextTest("t1", true, true)); - assertTrue(context.isInternal()); + assertTrue(context.isMirrorIgnore()); context.addQueue(SimpleString.of("t2"), new FakeQueueForRoutingContextTest("t2", false, true)); - assertFalse(context.isInternal()); + assertFalse(context.isMirrorIgnore()); context.addQueue(SimpleString.of("t3"), new FakeQueueForRoutingContextTest("t3", true, true)); - assertFalse(context.isInternal()); + assertFalse(context.isMirrorIgnore()); context.clear(); - assertFalse(context.isInternal()); + assertFalse(context.isMirrorIgnore()); context.addQueue(SimpleString.of("t1"), new FakeQueueForRoutingContextTest("t1", true, true)); - assertTrue(context.isInternal()); + assertTrue(context.isMirrorIgnore()); context.addQueue(SimpleString.of("t2"), new FakeQueueForRoutingContextTest("t2", true, true)); - assertTrue(context.isInternal()); + assertTrue(context.isMirrorIgnore()); context.addQueue(SimpleString.of("t3"), new FakeQueueForRoutingContextTest("t3", true, true)); - assertTrue(context.isInternal()); + assertTrue(context.isMirrorIgnore()); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java index 1696a108af0..3f5f67efadb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java @@ -26,11 +26,13 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import java.io.PrintStream; import java.net.URI; @@ -116,7 +118,7 @@ protected ActiveMQServer createServer() throws Exception { public void testSyncOnCreateQueues() throws Exception { server.setIdentity("Server1"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server.getConfiguration().addAMQPConnection(amqpConnection); } @@ -126,7 +128,7 @@ public void testSyncOnCreateQueues() throws Exception { server_2.setIdentity("Server2"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server_2.getConfiguration().addAMQPConnection(amqpConnection); } @@ -265,23 +267,129 @@ public void testSingleMessage(String protocol) throws Exception { session1.commit(); } - try { - connection1.close(); - } catch (Exception ignored) { + connection1.close(); + connection2.close(); + + Wait.assertEquals(0L, queueOnServer1::getMessageCount, 5000, 100); + Wait.assertEquals(0L, queueOnServer2::getMessageCount, 5000, 100); + + server_2.stop(); + server.stop(); + } + + @Test + public void testNoTemporaryAddressesOrQueues() throws Exception { + final String snfOnServer1Name = "$ACTIVEMQ_ARTEMIS_MIRROR_connectTowardsServer2"; + + server.getConfiguration().setAddressQueueScanPeriod(100); + server.setIdentity("Server1"); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); } - try { - connection2.close(); - } catch (Exception ignored) { + server.start(); + server.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(QueueConfiguration.of(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("Server2"); + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); + server_2.getConfiguration().addAMQPConnection(amqpConnection); } + server_2.start(); + server_2.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + server_2.createQueue(QueueConfiguration.of(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); - Wait.assertEquals(0L, queueOnServer1::getMessageCount, 5000, 100); - Wait.assertEquals(0L, queueOnServer2::getMessageCount, 5000, 100); + + Wait.waitFor(() -> server.locateQueue(snfOnServer1Name) != null); + + ConnectionFactory factoryServer1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + // exercising a legal mirror operation to make sure things are working properly + try (Connection connection = factoryServer1.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + producer.send(session.createMessage()); + session.commit(); + assertNotNull(consumer.receive(5000)); + session.commit(); + } + + org.apache.activemq.artemis.core.server.Queue snfOnServer1 = server.locateQueue(snfOnServer1Name); + Wait.assertEquals(0L, snfOnServer1::getMessageCount, 5000, 100); + + + // stopping the server to let things accumulate so we can assert the SNF + server_2.stop(); + + tempAddressSendAndReceive(factoryServer1, snfOnServer1, true); + tempAddressSendAndReceive(factoryServer1, snfOnServer1, false); server_2.stop(); server.stop(); } + private void tempAddressSendAndReceive(ConnectionFactory factoryServer1, org.apache.activemq.artemis.core.server.Queue snfQueue, boolean useTopic) throws Exception { + String addressName; + try (Connection connectionServer1 = factoryServer1.createConnection()) { + Session session = connectionServer1.createSession(true, Session.SESSION_TRANSACTED); + Destination destination; + + if (useTopic) { + TemporaryTopic topic = session.createTemporaryTopic(); + destination = topic; + addressName = topic.getTopicName(); + } else { + TemporaryQueue queue = session.createTemporaryQueue(); + destination = queue; + addressName = queue.getQueueName(); + } + + + MessageConsumer consumer = session.createConsumer(destination); + + connectionServer1.start(); + + MessageProducer producer = session.createProducer(destination); + producer.send(session.createTextMessage()); + session.commit(); + + assertNotNull(consumer.receive(5000)); + + session.commit(); + + Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100); + + // stopping the server to validate things are not accumulating + + for (int i = 0; i < 100; i++) { + // sends should not make into the SNF either + producer.send(session.createTextMessage()); + } + session.commit(); + // no temporary sends + Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100); + + for (int i = 0; i < 100; i++) { + assertNotNull(consumer.receive(5000)); + } + session.commit(); + + // no temporary acks + Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100); + } + + Wait.assertTrue(() -> server.getAddressInfo(SimpleString.of(addressName)) == null, 5000, 100); + Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100); + } private void checkProperties(Connection connection, javax.jms.Message message) throws Exception { try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { @@ -325,7 +433,7 @@ public void testExpiry() throws Exception { private void internalExpiry(boolean useReaper) throws Exception { server.setIdentity("Server1"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server.getConfiguration().addAMQPConnection(amqpConnection); } @@ -429,7 +537,7 @@ private void internalExpiry(boolean useReaper) throws Exception { public void testDLA() throws Exception { server.setIdentity("Server1"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server.getConfiguration().addAMQPConnection(amqpConnection); } @@ -545,7 +653,7 @@ public SimpleString getFilterString() { public void testCreateInternalQueue() throws Exception { server.setIdentity("Server1"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server.getConfiguration().addAMQPConnection(amqpConnection); } @@ -567,7 +675,7 @@ public void testCreateInternalQueue() throws Exception { server_2.setIdentity("Server2"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server_2.getConfiguration().addAMQPConnection(amqpConnection); } @@ -609,7 +717,7 @@ public void testLVQ() throws Exception { server.setIdentity("Server1"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server.getConfiguration().addAMQPConnection(amqpConnection); } @@ -636,7 +744,7 @@ public void testLVQ() throws Exception { server_2.setIdentity("Server2"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server_2.getConfiguration().addAMQPConnection(amqpConnection); } @@ -685,7 +793,7 @@ public void testSyncData() throws Exception { int NUMBER_OF_MESSAGES = 100; server.setIdentity("Server1"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server.getConfiguration().addAMQPConnection(amqpConnection); } @@ -695,7 +803,7 @@ public void testSyncData() throws Exception { server_2.setIdentity("Server2"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server_2.getConfiguration().addAMQPConnection(amqpConnection); } @@ -803,7 +911,7 @@ public void testStats() throws Exception { int NUMBER_OF_MESSAGES = 1; server.setIdentity("Server1"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server.getConfiguration().addAMQPConnection(amqpConnection); } @@ -813,7 +921,7 @@ public void testStats() throws Exception { server_2.setIdentity("Server2"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server_2.getConfiguration().addAMQPConnection(amqpConnection); } @@ -911,7 +1019,7 @@ public void testSyncDataNoSuppliedID() throws Exception { server.setIdentity("Server1"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server.getConfiguration().addAMQPConnection(amqpConnection); } @@ -921,7 +1029,7 @@ public void testSyncDataNoSuppliedID() throws Exception { server_2.setIdentity("Server2"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server_2.getConfiguration().addAMQPConnection(amqpConnection); } @@ -998,7 +1106,7 @@ public void testLargeMessageInSync() throws Exception { String queueName = "testSyncLargeMessage"; server.setIdentity("Server1"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server.getConfiguration().addAMQPConnection(amqpConnection); } @@ -1008,7 +1116,7 @@ public void testLargeMessageInSync() throws Exception { server_2.setIdentity("Server2"); { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); server_2.getConfiguration().addAMQPConnection(amqpConnection); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java index f4df4e7bf7e..732a7f9c5c3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java @@ -181,8 +181,8 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception } @Override - public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception { - target.deleteQueue(addressName, queueName); + public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration configuration) throws Exception { + target.deleteQueue(addressName, queueName, configuration); } @Override