Skip to content

Commit 312febb

Browse files
ARTEMIS-5069 / ARTEMIS-5968 Temporary queues are going through mirroring
Mirroring should ignore send / create / delete / acks for temporary queues
1 parent 6403e56 commit 312febb

11 files changed

Lines changed: 128 additions & 42 deletions

File tree

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
105105
}
106106

107107
@Override
108-
public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
108+
public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration queueConfiguration) throws Exception {
109109
for (MirrorController partition : partitions) {
110-
partition.deleteQueue(addressName, queueName);
110+
partition.deleteQueue(addressName, queueName, queueConfiguration);
111111
}
112112
}
113113

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,14 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception {
238238
}
239239
logger.trace("{} deleteAddress {}", server, addressInfo);
240240

241-
if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
241+
if (invalidTarget(getControllerInUse()) || addressInfo.isInternal() || addressInfo.isTemporary()) {
242+
if (logger.isTraceEnabled()) {
243+
logger.trace("ignoring deleteAddress for invalidTarget = {}, isInternal = {}, isTemporary = {}", invalidTarget(getControllerInUse()), addressInfo.isInternal(), addressInfo.isTemporary());
244+
}
242245
return;
243246
}
244247
if (ignoreAddress(addressInfo.getName())) {
248+
logger.trace("ignoring deleteAddress {} for ignoreAddress condition", addressInfo.getName());
245249
return;
246250
}
247251
if (deleteQueues) {
@@ -282,7 +286,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
282286
}
283287

284288
@Override
285-
public void deleteQueue(SimpleString address, SimpleString queue) throws Exception {
289+
public void deleteQueue(SimpleString address, SimpleString queue, QueueConfiguration queueConfiguration) throws Exception {
286290
if (!brokerConnection.isEnabled()) {
287291
return;
288292
}
@@ -298,6 +302,15 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti
298302
return;
299303
}
300304

305+
if (queueConfiguration != null) {
306+
if (queueConfiguration.isTemporary() || queueConfiguration.isInternal()) {
307+
if (logger.isTraceEnabled()) {
308+
logger.trace("deleteQueue {}/{} ignored for isTemporary = {} or isInternal = {}", address, queue, queueConfiguration.isTemporary(), queueConfiguration.isInternal());
309+
}
310+
return;
311+
}
312+
}
313+
301314
if (deleteQueues) {
302315
Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString());
303316
routeMirrorCommand(server, message);
@@ -355,8 +368,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
355368
}
356369
SimpleString address = context.getAddress(message);
357370

358-
if (context.isInternal()) {
359-
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
371+
if (context.isMirrorIgnore()) {
372+
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal or temporary queue", server);
360373
return;
361374
}
362375

@@ -587,9 +600,9 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
587600
return;
588601
}
589602

590-
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController()))) {
591-
if (logger.isDebugEnabled()) {
592-
logger.debug("preAcknowledge::{} rejecting preAcknowledge queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, ref.getQueue().getName(), ref);
603+
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || ref.getQueue().isTemporary() || ref.getQueue().isMirrorController()))) {
604+
if (logger.isTraceEnabled()) {
605+
logger.trace("ignoring preAcknowledge on ref {} for either internalQueue = {}, temporary = {}, isMirrorController = {}", ref, ref.getQueue().isInternalQueue(), ref.getQueue().isTemporary(), ref.getQueue().isMirrorController());
593606
}
594607
return;
595608
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnota
318318
String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, ADDRESS);
319319
String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, QUEUE);
320320

321-
deleteQueue(SimpleString.of(address), SimpleString.of(queueName));
321+
deleteQueue(SimpleString.of(address), SimpleString.of(queueName), null);
322322
} else if (eventType.equals(POST_ACK)) {
323323
String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, BROKER_ID);
324324

@@ -440,7 +440,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
440440
}
441441

442442
@Override
443-
public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
443+
public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration configuration) throws Exception {
444444
if (logger.isDebugEnabled()) {
445445
logger.debug("{} destroy queue {} on address = {} server {}", server, queueName, addressName, server.getIdentity());
446446
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
640640
}
641641

642642
@Override
643-
public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
643+
public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration configuration) throws Exception {
644644

645645
}
646646

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ public interface RoutingContext {
5454
boolean isMirrorIndividualRoute();
5555

5656
/**
57-
* return true if every queue routed is internal
57+
* return true if every queue routed is internal or temporary
5858
*/
59-
boolean isInternal();
59+
boolean isMirrorIgnore();
6060

6161
MirrorController getMirrorSource();
6262

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2609,7 +2609,7 @@ public void destroyQueue(final SimpleString queueName,
26092609
}
26102610

26112611
if (mirrorControllerService != null) {
2612-
mirrorControllerService.deleteQueue(queue.getAddress(), queue.getName());
2612+
mirrorControllerService.deleteQueue(queue.getAddress(), queue.getName(), queue.getQueueConfiguration());
26132613
}
26142614

26152615
queue.deleteQueue(removeConsumers);

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class RoutingContextImpl implements RoutingContext {
5959

6060
Boolean reusable = null;
6161

62-
Boolean internalOnly = null;
62+
Boolean mirrorIgnore = null;
6363

6464
boolean divertDisabled = false;
6565

@@ -130,10 +130,11 @@ public boolean isReusable() {
130130
}
131131

132132
@Override
133-
public boolean isInternal() {
134-
return internalOnly != null && internalOnly;
133+
public boolean isMirrorIgnore() {
134+
return mirrorIgnore != null && mirrorIgnore;
135135
}
136136

137+
137138
@Override
138139
public int getPreviousBindingsVersion() {
139140
return version;
@@ -177,7 +178,7 @@ public RoutingContextImpl clear() {
177178

178179
this.reusable = null;
179180

180-
this.internalOnly = null;
181+
this.mirrorIgnore = null;
181182

182183
// once we set to disabled, we keep it always disabled.
183184
// This is because the routing object used to route commands will disable this
@@ -211,11 +212,11 @@ public void addQueue(final SimpleString address, final Queue queue) {
211212
listing.getNonDurableQueues().add(queue);
212213
}
213214

214-
if (internalOnly == null) {
215-
internalOnly = queue.isInternalQueue();
215+
if (mirrorIgnore == null) {
216+
mirrorIgnore = queue.isInternalQueue() || queue.isTemporary();
216217
} else {
217-
// every queue added has to be internal only
218-
internalOnly = internalOnly && queue.isInternalQueue();
218+
// making sure that every queue added matches the mirrorIgnore
219+
mirrorIgnore = mirrorIgnore && (queue.isInternalQueue() || queue.isTemporary());
219220
}
220221

221222
queueCount++;

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ default boolean isRetryACK() {
3636
void addAddress(AddressInfo addressInfo) throws Exception;
3737
void deleteAddress(AddressInfo addressInfo) throws Exception;
3838
void createQueue(QueueConfiguration queueConfiguration) throws Exception;
39-
void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception;
39+
void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration queueConfiguration) throws Exception;
4040
void sendMessage(Transaction tx, Message message, RoutingContext context);
4141

4242
void postAcknowledge(MessageReference ref, AckReason reason) throws Exception;

artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -949,28 +949,28 @@ public void recheckRefCount(OperationContext context) {
949949
@Test
950950
public void testValidateInternal() {
951951
RoutingContext context = new RoutingContextImpl(new TransactionImpl(new NullStorageManager()));
952-
assertFalse(context.isInternal());
952+
assertFalse(context.isMirrorIgnore());
953953

954954
context.addQueue(SimpleString.of("t1"), new FakeQueueForRoutingContextTest("t1", true, true));
955-
assertTrue(context.isInternal());
955+
assertTrue(context.isMirrorIgnore());
956956

957957
context.addQueue(SimpleString.of("t2"), new FakeQueueForRoutingContextTest("t2", false, true));
958-
assertFalse(context.isInternal());
958+
assertFalse(context.isMirrorIgnore());
959959

960960
context.addQueue(SimpleString.of("t3"), new FakeQueueForRoutingContextTest("t3", true, true));
961-
assertFalse(context.isInternal());
961+
assertFalse(context.isMirrorIgnore());
962962

963963
context.clear();
964-
assertFalse(context.isInternal());
964+
assertFalse(context.isMirrorIgnore());
965965

966966
context.addQueue(SimpleString.of("t1"), new FakeQueueForRoutingContextTest("t1", true, true));
967-
assertTrue(context.isInternal());
967+
assertTrue(context.isMirrorIgnore());
968968

969969
context.addQueue(SimpleString.of("t2"), new FakeQueueForRoutingContextTest("t2", true, true));
970-
assertTrue(context.isInternal());
970+
assertTrue(context.isMirrorIgnore());
971971

972972
context.addQueue(SimpleString.of("t3"), new FakeQueueForRoutingContextTest("t3", true, true));
973-
assertTrue(context.isInternal());
973+
assertTrue(context.isMirrorIgnore());
974974
}
975975

976976
}

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java

Lines changed: 80 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import javax.jms.Session;
3333
import javax.jms.TemporaryQueue;
3434
import javax.jms.TextMessage;
35+
import javax.jms.Topic;
3536
import java.io.PrintStream;
3637
import java.net.URI;
3738
import java.util.Enumeration;
@@ -265,18 +266,89 @@ public void testSingleMessage(String protocol) throws Exception {
265266
session1.commit();
266267
}
267268

268-
try {
269-
connection1.close();
270-
} catch (Exception ignored) {
269+
connection1.close();
270+
connection2.close();
271+
272+
Wait.assertEquals(0L, queueOnServer1::getMessageCount, 5000, 100);
273+
Wait.assertEquals(0L, queueOnServer2::getMessageCount, 5000, 100);
274+
275+
server_2.stop();
276+
server.stop();
277+
}
278+
279+
@Test
280+
public void testNoTemporaryQueues() throws Exception {
281+
final String snfOnServer1Name = "$ACTIVEMQ_ARTEMIS_MIRROR_connectTowardsServer2";
282+
283+
server.getConfiguration().setAddressQueueScanPeriod(100);
284+
server.setIdentity("Server1");
285+
{
286+
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
287+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
288+
server.getConfiguration().addAMQPConnection(amqpConnection);
271289
}
290+
server.start();
272291

273-
try {
274-
connection2.close();
275-
} catch (Exception ignored) {
292+
server_2 = createServer(AMQP_PORT_2, false);
293+
server_2.setIdentity("Server2");
294+
295+
{
296+
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
297+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
298+
server_2.getConfiguration().addAMQPConnection(amqpConnection);
276299
}
300+
server_2.start();
277301

278-
Wait.assertEquals(0L, queueOnServer1::getMessageCount, 5000, 100);
279-
Wait.assertEquals(0L, queueOnServer2::getMessageCount, 5000, 100);
302+
303+
Wait.waitFor(() -> server.locateQueue(snfOnServer1Name) != null);
304+
org.apache.activemq.artemis.core.server.Queue snfOnServer1 = server.locateQueue(snfOnServer1Name);
305+
306+
ConnectionFactory factoryServer1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
307+
308+
String temporarytopicName;
309+
310+
try (Connection connectionServer1 = factoryServer1.createConnection()) {
311+
Session session = connectionServer1.createSession(true, Session.SESSION_TRANSACTED);
312+
Topic temporaryTopic = session.createTemporaryTopic();
313+
314+
MessageConsumer consumer = session.createSharedConsumer(temporaryTopic, "mySub");
315+
316+
temporarytopicName = temporaryTopic.getTopicName();
317+
318+
connectionServer1.start();
319+
320+
MessageProducer producer = session.createProducer(temporaryTopic);
321+
producer.send(session.createTextMessage());
322+
session.commit();
323+
324+
assertNotNull(consumer.receive(5000));
325+
326+
session.commit();
327+
328+
Wait.assertEquals(0L, snfOnServer1::getMessageCount, 5000, 100);
329+
330+
// stopping the server to validate things are not accumulating
331+
server_2.stop();
332+
333+
for (int i = 0; i < 100; i++) {
334+
// sends should not make into the SNF either
335+
producer.send(session.createTextMessage());
336+
}
337+
session.commit();
338+
// no temporary sends
339+
Wait.assertEquals(0L, snfOnServer1::getMessageCount, 5000, 100);
340+
341+
for (int i = 0; i < 100; i++) {
342+
assertNotNull(consumer.receive(5000));
343+
}
344+
session.commit();
345+
346+
// no temporary acks
347+
Wait.assertEquals(0L, snfOnServer1::getMessageCount, 5000, 100);
348+
}
349+
350+
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.of(temporarytopicName)) == null, 5000, 100);
351+
Wait.assertEquals(0L, snfOnServer1::getMessageCount, 5000, 100);
280352

281353
server_2.stop();
282354
server.stop();

0 commit comments

Comments
 (0)