Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.api.core;

public class ActiveMQAddressHasBindingsException extends ActiveMQDeleteAddressException {

public ActiveMQAddressHasBindingsException() {
super();
}

public ActiveMQAddressHasBindingsException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* An operation failed because an address exists on the server.
*/
public final class ActiveMQDeleteAddressException extends ActiveMQException {
public class ActiveMQDeleteAddressException extends ActiveMQException {

public ActiveMQDeleteAddressException() {
super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressHasBindingsException;
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException;
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQDivertDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException;
Expand Down Expand Up @@ -375,7 +375,7 @@ public interface ActiveMQMessageBundle {
ActiveMQAddressExistsException addressAlreadyExists(SimpleString address);

@Message(id = 229205, value = "Address {} has bindings")
ActiveMQDeleteAddressException addressHasBindings(SimpleString address);
ActiveMQAddressHasBindingsException addressHasBindings(SimpleString address);

@Message(id = 229206, value = "Queue {} has invalid max consumer setting: {}")
IllegalArgumentException invalidMaxConsumers(String queueName, int value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ enum SERVER_STATE {

StorageManager getStorageManager();

/**
* The executor responsible to remove temporary destinations.
* */
Executor getTransientQueueExecutor();

PagingManager getPagingManager();

PagingManager createPagingManager() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {

protected volatile ExecutorFactory executorFactory;

protected volatile Executor transientQueueExecutor;

private volatile ExecutorService ioExecutorPool;

private ReplayManager replayManager;
Expand Down Expand Up @@ -1776,6 +1778,11 @@ public StorageManager getStorageManager() {
return storageManager;
}

@Override
public Executor getTransientQueueExecutor() {
return transientQueueExecutor;
}

@Override
public ActiveMQSecurityManager getSecurityManager() {
return securityManager;
Expand Down Expand Up @@ -3271,6 +3278,8 @@ private void initializeExecutorServices() {
}
this.executorFactory = new OrderedExecutorFactory(threadPool);

this.transientQueueExecutor = executorFactory.getExecutor();

if (serviceRegistry.getIOExecutorService() == null) {
this.ioExecutorPool = new ActiveMQThreadPoolExecutor(0, maxIoThreads, THREAD_POOL_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, getThreadFactory("io"));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressHasBindingsException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
Expand Down Expand Up @@ -814,7 +815,7 @@ private void handleTempResource(SimpleString name, boolean queue) {
// not mean it will get deleted automatically when the session is closed. It is up to the user to delete the
// resource when finished with it

TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server, name);
TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server, name, sessionExecutor);
if (remotingConnection instanceof TempResourceObserver observer) {
cleaner.setObserver(observer);
}
Expand Down Expand Up @@ -1163,26 +1164,43 @@ public String getSecurityDomain() {

public static class TempResourceCleanerUpper implements CloseListener, FailureListener {

private int retry = 0;

private final int MAX_RETRY = 5;

private final SimpleString resourceName;

private final ActiveMQServer server;

private TempResourceObserver observer;

public TempResourceCleanerUpper(final ActiveMQServer server, final SimpleString resourceName) {
private Executor sessionExecutor;

public TempResourceCleanerUpper(final ActiveMQServer server, final SimpleString resourceName, Executor sessionExecutor) {
this.server = server;
this.resourceName = resourceName;
this.sessionExecutor = sessionExecutor;
}

public void setObserver(TempResourceObserver observer) {
this.observer = observer;
}

private void run() {
sessionExecutor.execute(() -> {
// this needs to use the same executor as TransientQueueManagerImpl
// even though we retry failed executions
// we still use the same executor as the TransientQueueManagerImpl to minimize the number of retries
server.getTransientQueueExecutor().execute(this::done);
});
}

private void done() {
try {
logger.debug("deleting temporary resource {}", resourceName);
try {
Queue q = server.locateQueue(resourceName);
logger.debug("deleting queue {}", resourceName);
if (q != null && q.isTemporary()) {
AddressInfo a = server.getAddressInfo(q.getAddress());
server.destroyQueue(resourceName, null, false, false, a == null || a.isTemporary());
Expand All @@ -1196,12 +1214,26 @@ private void run() {
}
try {
AddressInfo a = server.getAddressInfo(resourceName);
logger.debug("deleting address with resource={}, address={}", resourceName, a);
if (a != null && a.isTemporary()) {
server.removeAddressInfo(resourceName, null);
if (observer != null) {
observer.tempAddressDeleted(resourceName);
}
}
} catch (ActiveMQAddressHasBindingsException e) {
// in a scenario where the consumer on a temporary and connection is being closed as part of the same event
// we could get on a situation where the remove of the queue is already scheduled in the executors
// but have not yet reached.
// It is not possible to serialize the calls on org.apache.activemq.artemis.core.server.impl.TransientQueueManagerImpl
// as that could lead to starvations and deadlocks.
// for that reason we can only retry in the executor's line
if (retry++ < MAX_RETRY) {
logger.debug("retrying deleteResource {}, retry={}", resourceName, retry);
TempResourceCleanerUpper.this.run();
} else {
logger.warn(e.getMessage(), e);
}
} catch (ActiveMQException e) {
// that's fine.. it can happen due to resource already been deleted
logger.debug(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ private void doIt() {
}

public TransientQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
super(server.getExecutorFactory().getExecutor());
// We have to use the same executor between here and ServerSessionImpl::TempResourceCleanerUpper
super(server.getTransientQueueExecutor());

this.server = server;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -24,6 +25,7 @@
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
Expand Down Expand Up @@ -137,4 +139,69 @@ public void testDeleteTemporaryTopic() throws Exception {
connection.close();
}
}

@Test
@Timeout(20)
public void testTemporaryTopicDeletedOnConnectionClosed() throws Exception {
doTestTemporaryDestinationIsDeletedOnConnectionClosed(true, true);
}

@Test
@Timeout(20)
public void testTemporaryQueueDeletedOnConnectionClosed() throws Exception {
doTestTemporaryDestinationIsDeletedOnConnectionClosed(false, true);
}

@Test
@Timeout(20)
public void testTemporaryTopicDeletedOnConnectionClosedWithoutExplicitConsumerClose() throws Exception {
doTestTemporaryDestinationIsDeletedOnConnectionClosed(true, false);
}

@Test
@Timeout(20)
public void testTemporaryQueueDeletedOnConnectionClosedWithoutExplicitConsumerClose() throws Exception {
doTestTemporaryDestinationIsDeletedOnConnectionClosed(false, false);
}

private void doTestTemporaryDestinationIsDeletedOnConnectionClosed(boolean topic, boolean closeConsumer) throws Exception {
final String addressName;
try (Connection connection = createConnection()) {
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination destination;

if (topic) {
destination = session.createTemporaryTopic();

assertNotNull(destination);
assertTrue(destination instanceof TemporaryTopic);

addressName = ((TemporaryTopic) destination).getTopicName();
} else {
destination = session.createTemporaryQueue();

assertNotNull(destination);
assertTrue(destination instanceof TemporaryQueue);

addressName = ((TemporaryQueue) destination).getQueueName();
}

logger.debug("Address being used is {}", addressName);

final MessageConsumer consumer = session.createConsumer(destination);

final AddressInfo addressView = getProxyToAddress(addressName);
assertNotNull(addressView);

assertEquals(1, server.bindingQuery(addressView.getName()).getQueueNames().size());

if (closeConsumer) {
consumer.close();
}

}
Wait.assertNull(() -> getProxyToAddress(addressName), TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50));
}


}