|
39 | 39 | import org.apache.activemq.artemis.api.core.SimpleString; |
40 | 40 | import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; |
41 | 41 | import org.apache.activemq.artemis.core.io.IOCallback; |
| 42 | +import org.apache.activemq.artemis.core.paging.PagingStore; |
42 | 43 | import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; |
43 | 44 | import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConstants; |
44 | 45 | import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; |
45 | 46 | import org.apache.activemq.artemis.core.server.MessageReference; |
| 47 | +import org.apache.activemq.artemis.core.server.Queue; |
46 | 48 | import org.apache.activemq.artemis.core.server.QueueQueryResult; |
47 | 49 | import org.apache.activemq.artemis.core.server.ServerConsumer; |
48 | 50 | import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; |
49 | 51 | import org.apache.activemq.artemis.core.server.impl.QueueImpl; |
50 | 52 | import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; |
| 53 | +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; |
51 | 54 | import org.apache.activemq.artemis.core.settings.impl.AddressSettings; |
52 | 55 | import org.apache.activemq.artemis.core.transaction.Transaction; |
53 | 56 | import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; |
@@ -256,7 +259,16 @@ private SimpleString createTopicSubscription(boolean isDurable, |
256 | 259 | queueName = SimpleString.of(UUID.randomUUID().toString()); |
257 | 260 | } |
258 | 261 |
|
259 | | - session.getCoreSession().createQueue(QueueConfiguration.of(queueName).setAddress(address).setRoutingType(RoutingType.MULTICAST).setFilterString(selector).setDurable(false).setTemporary(true).setInternal(internalAddress)); |
| 262 | + Queue queue = session.getCoreSession().createQueue(QueueConfiguration.of(queueName).setAddress(address).setRoutingType(RoutingType.MULTICAST).setFilterString(selector).setDurable(false).setTemporary(true).setInternal(internalAddress)); |
| 263 | + |
| 264 | + // Check if this is an advisory queue |
| 265 | + if (String.valueOf(address).startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX)) { |
| 266 | + PagingStore store = queue.getPagingStore(); |
| 267 | + if (store != null) { // could be null on tests perhaps |
| 268 | + // Advisory queues cannot be paged, we must enforce DROP |
| 269 | + store.enforceAddressFullMessagePolicy(AddressFullMessagePolicy.DROP); |
| 270 | + } |
| 271 | + } |
260 | 272 | } |
261 | 273 |
|
262 | 274 | return queueName; |
|
0 commit comments