diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressHasBindingsException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressHasBindingsException.java new file mode 100644 index 00000000000..a023b68c3aa --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressHasBindingsException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core; + +public class ActiveMQAddressHasBindingsException extends ActiveMQDeleteAddressException { + + public ActiveMQAddressHasBindingsException() { + super(); + } + + public ActiveMQAddressHasBindingsException(String msg) { + super(msg); + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java index 9c8030649c4..c3dfcfccbb8 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java @@ -19,7 +19,7 @@ /** * An operation failed because an address exists on the server. */ -public final class ActiveMQDeleteAddressException extends ActiveMQException { +public class ActiveMQDeleteAddressException extends ActiveMQException { public ActiveMQDeleteAddressException() { super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index cbe1417f9d5..02b532341dd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -21,9 +21,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; +import org.apache.activemq.artemis.api.core.ActiveMQAddressHasBindingsException; import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException; -import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException; import org.apache.activemq.artemis.api.core.ActiveMQDivertDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException; @@ -375,7 +375,7 @@ public interface ActiveMQMessageBundle { ActiveMQAddressExistsException addressAlreadyExists(SimpleString address); @Message(id = 229205, value = "Address {} has bindings") - ActiveMQDeleteAddressException addressHasBindings(SimpleString address); + ActiveMQAddressHasBindingsException addressHasBindings(SimpleString address); @Message(id = 229206, value = "Queue {} has invalid max consumer setting: {}") IllegalArgumentException invalidMaxConsumers(String queueName, int value); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 3158483591a..0cf0bff50ca 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -162,6 +162,11 @@ enum SERVER_STATE { StorageManager getStorageManager(); + /** + * The executor responsible to remove temporary destinations. + * */ + Executor getTransientQueueExecutor(); + PagingManager getPagingManager(); PagingManager createPagingManager() throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index cf56456fc3c..875597cd365 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -290,6 +290,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { protected volatile ExecutorFactory executorFactory; + protected volatile Executor transientQueueExecutor; + private volatile ExecutorService ioExecutorPool; private ReplayManager replayManager; @@ -1776,6 +1778,11 @@ public StorageManager getStorageManager() { return storageManager; } + @Override + public Executor getTransientQueueExecutor() { + return transientQueueExecutor; + } + @Override public ActiveMQSecurityManager getSecurityManager() { return securityManager; @@ -3271,6 +3278,8 @@ private void initializeExecutorServices() { } this.executorFactory = new OrderedExecutorFactory(threadPool); + this.transientQueueExecutor = executorFactory.getExecutor(); + if (serviceRegistry.getIOExecutorService() == null) { this.ioExecutorPool = new ActiveMQThreadPoolExecutor(0, maxIoThreads, THREAD_POOL_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, getThreadFactory("io")); } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 6931d08773f..4ec39ed1bfa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; +import org.apache.activemq.artemis.api.core.ActiveMQAddressHasBindingsException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; @@ -814,7 +815,7 @@ private void handleTempResource(SimpleString name, boolean queue) { // not mean it will get deleted automatically when the session is closed. It is up to the user to delete the // resource when finished with it - TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server, name); + TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server, name, sessionExecutor); if (remotingConnection instanceof TempResourceObserver observer) { cleaner.setObserver(observer); } @@ -1163,15 +1164,22 @@ public String getSecurityDomain() { public static class TempResourceCleanerUpper implements CloseListener, FailureListener { + private int retry = 0; + + private final int MAX_RETRY = 5; + private final SimpleString resourceName; private final ActiveMQServer server; private TempResourceObserver observer; - public TempResourceCleanerUpper(final ActiveMQServer server, final SimpleString resourceName) { + private Executor sessionExecutor; + + public TempResourceCleanerUpper(final ActiveMQServer server, final SimpleString resourceName, Executor sessionExecutor) { this.server = server; this.resourceName = resourceName; + this.sessionExecutor = sessionExecutor; } public void setObserver(TempResourceObserver observer) { @@ -1179,10 +1187,20 @@ public void setObserver(TempResourceObserver observer) { } private void run() { + sessionExecutor.execute(() -> { + // this needs to use the same executor as TransientQueueManagerImpl + // even though we retry failed executions + // we still use the same executor as the TransientQueueManagerImpl to minimize the number of retries + server.getTransientQueueExecutor().execute(this::done); + }); + } + + private void done() { try { logger.debug("deleting temporary resource {}", resourceName); try { Queue q = server.locateQueue(resourceName); + logger.debug("deleting queue {}", resourceName); if (q != null && q.isTemporary()) { AddressInfo a = server.getAddressInfo(q.getAddress()); server.destroyQueue(resourceName, null, false, false, a == null || a.isTemporary()); @@ -1196,12 +1214,26 @@ private void run() { } try { AddressInfo a = server.getAddressInfo(resourceName); + logger.debug("deleting address with resource={}, address={}", resourceName, a); if (a != null && a.isTemporary()) { server.removeAddressInfo(resourceName, null); if (observer != null) { observer.tempAddressDeleted(resourceName); } } + } catch (ActiveMQAddressHasBindingsException e) { + // in a scenario where the consumer on a temporary and connection is being closed as part of the same event + // we could get on a situation where the remove of the queue is already scheduled in the executors + // but have not yet reached. + // It is not possible to serialize the calls on org.apache.activemq.artemis.core.server.impl.TransientQueueManagerImpl + // as that could lead to starvations and deadlocks. + // for that reason we can only retry in the executor's line + if (retry++ < MAX_RETRY) { + logger.debug("retrying deleteResource {}, retry={}", resourceName, retry); + TempResourceCleanerUpper.this.run(); + } else { + logger.warn(e.getMessage(), e); + } } catch (ActiveMQException e) { // that's fine.. it can happen due to resource already been deleted logger.debug(e.getMessage(), e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java index 1d8118311fe..df480c2e559 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java @@ -52,7 +52,8 @@ private void doIt() { } public TransientQueueManagerImpl(ActiveMQServer server, SimpleString queueName) { - super(server.getExecutorFactory().getExecutor()); + // We have to use the same executor between here and ServerSessionImpl::TempResourceCleanerUpper + super(server.getTransientQueueExecutor()); this.server = server; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java index 362d6fc32a1..06bfc6d100a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -24,6 +25,7 @@ import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -137,4 +139,69 @@ public void testDeleteTemporaryTopic() throws Exception { connection.close(); } } + + @Test + @Timeout(20) + public void testTemporaryTopicDeletedOnConnectionClosed() throws Exception { + doTestTemporaryDestinationIsDeletedOnConnectionClosed(true, true); + } + + @Test + @Timeout(20) + public void testTemporaryQueueDeletedOnConnectionClosed() throws Exception { + doTestTemporaryDestinationIsDeletedOnConnectionClosed(false, true); + } + + @Test + @Timeout(20) + public void testTemporaryTopicDeletedOnConnectionClosedWithoutExplicitConsumerClose() throws Exception { + doTestTemporaryDestinationIsDeletedOnConnectionClosed(true, false); + } + + @Test + @Timeout(20) + public void testTemporaryQueueDeletedOnConnectionClosedWithoutExplicitConsumerClose() throws Exception { + doTestTemporaryDestinationIsDeletedOnConnectionClosed(false, false); + } + + private void doTestTemporaryDestinationIsDeletedOnConnectionClosed(boolean topic, boolean closeConsumer) throws Exception { + final String addressName; + try (Connection connection = createConnection()) { + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Destination destination; + + if (topic) { + destination = session.createTemporaryTopic(); + + assertNotNull(destination); + assertTrue(destination instanceof TemporaryTopic); + + addressName = ((TemporaryTopic) destination).getTopicName(); + } else { + destination = session.createTemporaryQueue(); + + assertNotNull(destination); + assertTrue(destination instanceof TemporaryQueue); + + addressName = ((TemporaryQueue) destination).getQueueName(); + } + + logger.debug("Address being used is {}", addressName); + + final MessageConsumer consumer = session.createConsumer(destination); + + final AddressInfo addressView = getProxyToAddress(addressName); + assertNotNull(addressView); + + assertEquals(1, server.bindingQuery(addressView.getName()).getQueueNames().size()); + + if (closeConsumer) { + consumer.close(); + } + + } + Wait.assertNull(() -> getProxyToAddress(addressName), TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)); + } + + }