diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index ea8c4ecf04a..17b1cdce54f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -39,15 +39,18 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConstants; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; @@ -256,7 +259,16 @@ private SimpleString createTopicSubscription(boolean isDurable, queueName = SimpleString.of(UUID.randomUUID().toString()); } - session.getCoreSession().createQueue(QueueConfiguration.of(queueName).setAddress(address).setRoutingType(RoutingType.MULTICAST).setFilterString(selector).setDurable(false).setTemporary(true).setInternal(internalAddress)); + Queue queue = session.getCoreSession().createQueue(QueueConfiguration.of(queueName).setAddress(address).setRoutingType(RoutingType.MULTICAST).setFilterString(selector).setDurable(false).setTemporary(true).setInternal(internalAddress)); + + // Check if this is an advisory queue + if (String.valueOf(address).startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX)) { + PagingStore store = queue.getPagingStore(); + if (store != null) { // could be null on tests perhaps + // Advisory queues cannot be paged, we must enforce DROP + store.enforceAddressFullMessagePolicy(AddressFullMessagePolicy.DROP); + } + } } return queueName; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java index f6c3e86a322..3e4ee28c7e2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java @@ -17,11 +17,16 @@ package org.apache.activemq.artemis.tests.integration.openwire; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.AddressControl; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -77,6 +82,27 @@ public void testTempTopicLeak() throws Exception { } } + @Test + public void testAdvisoryEnforcedToDrop() throws Exception { + + try (AssertionLoggerHandler handler = new AssertionLoggerHandler()) { + handler.start(); + try (Connection connection = factory.createConnection()) { + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + TemporaryTopic temporaryTopic = session.createTemporaryTopic(); + assertNotNull(temporaryTopic); + + PagingStore store = server.getPagingManager().getPageStore(SimpleString.of("ActiveMQ.Advisory.TempTopic")); + assertNotNull(store); + assertEquals(AddressFullMessagePolicy.DROP, store.getAddressFullMessagePolicy()); + } + assertFalse(handler.hasLevel(AssertionLoggerHandler.LogLevel.WARN)); + } + } + private AddressControl assertNonNullAddressControl(String match) { AddressControl advisoryAddressControl = null; List addressResources = server.getManagementService().getAddressControls();