Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
}

@Override
public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration queueConfiguration) throws Exception {
for (MirrorController partition : partitions) {
partition.deleteQueue(addressName, queueName);
partition.deleteQueue(addressName, queueName, queueConfiguration);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,14 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception {
}
logger.trace("{} deleteAddress {}", server, addressInfo);

if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
if (invalidTarget(getControllerInUse()) || addressInfo.isInternal() || addressInfo.isTemporary()) {
if (logger.isTraceEnabled()) {
logger.trace("ignoring deleteAddress for invalidTarget = {}, isInternal = {}, isTemporary = {}", invalidTarget(getControllerInUse()), addressInfo.isInternal(), addressInfo.isTemporary());
}
return;
}
if (ignoreAddress(addressInfo.getName())) {
logger.trace("ignoring deleteAddress {} for ignoreAddress condition", addressInfo.getName());
return;
}
if (deleteQueues) {
Expand Down Expand Up @@ -282,7 +286,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
}

@Override
public void deleteQueue(SimpleString address, SimpleString queue) throws Exception {
public void deleteQueue(SimpleString address, SimpleString queue, QueueConfiguration queueConfiguration) throws Exception {
if (!brokerConnection.isEnabled()) {
return;
}
Expand All @@ -298,6 +302,15 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti
return;
}

if (queueConfiguration != null) {
if (queueConfiguration.isTemporary() || queueConfiguration.isInternal()) {
if (logger.isTraceEnabled()) {
logger.trace("deleteQueue {}/{} ignored for isTemporary = {} or isInternal = {}", address, queue, queueConfiguration.isTemporary(), queueConfiguration.isInternal());
}
return;
}
}

if (deleteQueues) {
Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString());
routeMirrorCommand(server, message);
Expand Down Expand Up @@ -355,8 +368,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
}
SimpleString address = context.getAddress(message);

if (context.isInternal()) {
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
if (context.isMirrorIgnore()) {
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal or temporary queue", server);
return;
}

Expand Down Expand Up @@ -587,9 +600,9 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
return;
}

if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController()))) {
if (logger.isDebugEnabled()) {
logger.debug("preAcknowledge::{} rejecting preAcknowledge queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, ref.getQueue().getName(), ref);
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || ref.getQueue().isTemporary() || ref.getQueue().isMirrorController()))) {
if (logger.isTraceEnabled()) {
logger.trace("ignoring preAcknowledge on ref {} for either internalQueue = {}, temporary = {}, isMirrorController = {}", ref, ref.getQueue().isInternalQueue(), ref.getQueue().isTemporary(), ref.getQueue().isMirrorController());
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnota
String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, ADDRESS);
String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, QUEUE);

deleteQueue(SimpleString.of(address), SimpleString.of(queueName));
deleteQueue(SimpleString.of(address), SimpleString.of(queueName), null);
} else if (eventType.equals(POST_ACK)) {
String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, BROKER_ID);

Expand Down Expand Up @@ -440,7 +440,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
}

@Override
public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration configuration) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("{} destroy queue {} on address = {} server {}", server, queueName, addressName, server.getIdentity());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
}

@Override
public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration configuration) throws Exception {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public interface RoutingContext {
boolean isMirrorIndividualRoute();

/**
* return true if every queue routed is internal
* return true if every queue routed is internal or temporary
*/
boolean isInternal();
boolean isMirrorIgnore();

MirrorController getMirrorSource();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2616,7 +2616,7 @@ public void destroyQueue(final SimpleString queueName,
}

if (mirrorControllerService != null) {
mirrorControllerService.deleteQueue(queue.getAddress(), queue.getName());
mirrorControllerService.deleteQueue(queue.getAddress(), queue.getName(), queue.getQueueConfiguration());
}

queue.deleteQueue(removeConsumers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class RoutingContextImpl implements RoutingContext {

Boolean reusable = null;

Boolean internalOnly = null;
Boolean mirrorIgnore = null;

boolean divertDisabled = false;

Expand Down Expand Up @@ -130,10 +130,11 @@ public boolean isReusable() {
}

@Override
public boolean isInternal() {
return internalOnly != null && internalOnly;
public boolean isMirrorIgnore() {
return mirrorIgnore != null && mirrorIgnore;
}


@Override
public int getPreviousBindingsVersion() {
return version;
Expand Down Expand Up @@ -177,7 +178,7 @@ public RoutingContextImpl clear() {

this.reusable = null;

this.internalOnly = null;
this.mirrorIgnore = null;

// once we set to disabled, we keep it always disabled.
// This is because the routing object used to route commands will disable this
Expand Down Expand Up @@ -211,11 +212,11 @@ public void addQueue(final SimpleString address, final Queue queue) {
listing.getNonDurableQueues().add(queue);
}

if (internalOnly == null) {
internalOnly = queue.isInternalQueue();
if (mirrorIgnore == null) {
mirrorIgnore = queue.isInternalQueue() || queue.isTemporary();
} else {
// every queue added has to be internal only
internalOnly = internalOnly && queue.isInternalQueue();
// making sure that every queue added matches the mirrorIgnore
mirrorIgnore = mirrorIgnore && (queue.isInternalQueue() || queue.isTemporary());
}

queueCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ default boolean isRetryACK() {
void addAddress(AddressInfo addressInfo) throws Exception;
void deleteAddress(AddressInfo addressInfo) throws Exception;
void createQueue(QueueConfiguration queueConfiguration) throws Exception;
void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception;
void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration queueConfiguration) throws Exception;
void sendMessage(Transaction tx, Message message, RoutingContext context);

void postAcknowledge(MessageReference ref, AckReason reason) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -949,28 +949,28 @@ public void recheckRefCount(OperationContext context) {
@Test
public void testValidateInternal() {
RoutingContext context = new RoutingContextImpl(new TransactionImpl(new NullStorageManager()));
assertFalse(context.isInternal());
assertFalse(context.isMirrorIgnore());

context.addQueue(SimpleString.of("t1"), new FakeQueueForRoutingContextTest("t1", true, true));
assertTrue(context.isInternal());
assertTrue(context.isMirrorIgnore());

context.addQueue(SimpleString.of("t2"), new FakeQueueForRoutingContextTest("t2", false, true));
assertFalse(context.isInternal());
assertFalse(context.isMirrorIgnore());

context.addQueue(SimpleString.of("t3"), new FakeQueueForRoutingContextTest("t3", true, true));
assertFalse(context.isInternal());
assertFalse(context.isMirrorIgnore());

context.clear();
assertFalse(context.isInternal());
assertFalse(context.isMirrorIgnore());

context.addQueue(SimpleString.of("t1"), new FakeQueueForRoutingContextTest("t1", true, true));
assertTrue(context.isInternal());
assertTrue(context.isMirrorIgnore());

context.addQueue(SimpleString.of("t2"), new FakeQueueForRoutingContextTest("t2", true, true));
assertTrue(context.isInternal());
assertTrue(context.isMirrorIgnore());

context.addQueue(SimpleString.of("t3"), new FakeQueueForRoutingContextTest("t3", true, true));
assertTrue(context.isInternal());
assertTrue(context.isMirrorIgnore());
}

}
Loading