Skip to content

Commit 23eb256

Browse files
ARTEMIS-5956 Connection.close may leave leaked temporary destinations
co-author: Done in collaboration with Tim Bish
1 parent 6403e56 commit 23eb256

8 files changed

Lines changed: 149 additions & 6 deletions

File tree

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.activemq.artemis.api.core;
19+
20+
public class ActiveMQAddressHasBindingsException extends ActiveMQDeleteAddressException {
21+
22+
public ActiveMQAddressHasBindingsException() {
23+
super();
24+
}
25+
26+
public ActiveMQAddressHasBindingsException(String msg) {
27+
super(msg);
28+
}
29+
}

artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
/**
2020
* An operation failed because an address exists on the server.
2121
*/
22-
public final class ActiveMQDeleteAddressException extends ActiveMQException {
22+
public class ActiveMQDeleteAddressException extends ActiveMQException {
2323

2424
public ActiveMQDeleteAddressException() {
2525
super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR);

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
2222
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
2323
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
24+
import org.apache.activemq.artemis.api.core.ActiveMQAddressHasBindingsException;
2425
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException;
2526
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
26-
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
2727
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
2828
import org.apache.activemq.artemis.api.core.ActiveMQDivertDoesNotExistException;
2929
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException;
@@ -375,7 +375,7 @@ public interface ActiveMQMessageBundle {
375375
ActiveMQAddressExistsException addressAlreadyExists(SimpleString address);
376376

377377
@Message(id = 229205, value = "Address {} has bindings")
378-
ActiveMQDeleteAddressException addressHasBindings(SimpleString address);
378+
ActiveMQAddressHasBindingsException addressHasBindings(SimpleString address);
379379

380380
@Message(id = 229206, value = "Queue {} has invalid max consumer setting: {}")
381381
IllegalArgumentException invalidMaxConsumers(String queueName, int value);

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,11 @@ enum SERVER_STATE {
162162

163163
StorageManager getStorageManager();
164164

165+
/**
166+
* The executor responsible to remove temporary destinations.
167+
* */
168+
Executor getTransientQueueExecutor();
169+
165170
PagingManager getPagingManager();
166171

167172
PagingManager createPagingManager() throws Exception;

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
290290

291291
protected volatile ExecutorFactory executorFactory;
292292

293+
protected volatile Executor transientQueueExecutor;
294+
293295
private volatile ExecutorService ioExecutorPool;
294296

295297
private ReplayManager replayManager;
@@ -1776,6 +1778,11 @@ public StorageManager getStorageManager() {
17761778
return storageManager;
17771779
}
17781780

1781+
@Override
1782+
public Executor getTransientQueueExecutor() {
1783+
return transientQueueExecutor;
1784+
}
1785+
17791786
@Override
17801787
public ActiveMQSecurityManager getSecurityManager() {
17811788
return securityManager;
@@ -3271,6 +3278,8 @@ private void initializeExecutorServices() {
32713278
}
32723279
this.executorFactory = new OrderedExecutorFactory(threadPool);
32733280

3281+
this.transientQueueExecutor = executorFactory.getExecutor();
3282+
32743283
if (serviceRegistry.getIOExecutorService() == null) {
32753284
this.ioExecutorPool = new ActiveMQThreadPoolExecutor(0, maxIoThreads, THREAD_POOL_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, getThreadFactory("io"));
32763285
} else {

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
3838
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
3939
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
40+
import org.apache.activemq.artemis.api.core.ActiveMQAddressHasBindingsException;
4041
import org.apache.activemq.artemis.api.core.ActiveMQException;
4142
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
4243
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -814,7 +815,7 @@ private void handleTempResource(SimpleString name, boolean queue) {
814815
// not mean it will get deleted automatically when the session is closed. It is up to the user to delete the
815816
// resource when finished with it
816817

817-
TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server, name);
818+
TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server, name, sessionExecutor);
818819
if (remotingConnection instanceof TempResourceObserver observer) {
819820
cleaner.setObserver(observer);
820821
}
@@ -1163,26 +1164,43 @@ public String getSecurityDomain() {
11631164

11641165
public static class TempResourceCleanerUpper implements CloseListener, FailureListener {
11651166

1167+
private int retry = 0;
1168+
1169+
private final int MAX_RETRY = 5;
1170+
11661171
private final SimpleString resourceName;
11671172

11681173
private final ActiveMQServer server;
11691174

11701175
private TempResourceObserver observer;
11711176

1172-
public TempResourceCleanerUpper(final ActiveMQServer server, final SimpleString resourceName) {
1177+
private Executor sessionExecutor;
1178+
1179+
public TempResourceCleanerUpper(final ActiveMQServer server, final SimpleString resourceName, Executor sessionExecutor) {
11731180
this.server = server;
11741181
this.resourceName = resourceName;
1182+
this.sessionExecutor = sessionExecutor;
11751183
}
11761184

11771185
public void setObserver(TempResourceObserver observer) {
11781186
this.observer = observer;
11791187
}
11801188

11811189
private void run() {
1190+
sessionExecutor.execute(() -> {
1191+
// this needs to use the same executor as TransientQueueManagerImpl
1192+
// even though we retry failed executions
1193+
// we still use the same executor as the TransientQueueManagerImpl to minimize the number of retries
1194+
server.getTransientQueueExecutor().execute(this::done);
1195+
});
1196+
}
1197+
1198+
private void done() {
11821199
try {
11831200
logger.debug("deleting temporary resource {}", resourceName);
11841201
try {
11851202
Queue q = server.locateQueue(resourceName);
1203+
logger.debug("deleting queue {}", resourceName);
11861204
if (q != null && q.isTemporary()) {
11871205
AddressInfo a = server.getAddressInfo(q.getAddress());
11881206
server.destroyQueue(resourceName, null, false, false, a == null || a.isTemporary());
@@ -1196,12 +1214,26 @@ private void run() {
11961214
}
11971215
try {
11981216
AddressInfo a = server.getAddressInfo(resourceName);
1217+
logger.debug("deleting address with resource={}, address={}", resourceName, a);
11991218
if (a != null && a.isTemporary()) {
12001219
server.removeAddressInfo(resourceName, null);
12011220
if (observer != null) {
12021221
observer.tempAddressDeleted(resourceName);
12031222
}
12041223
}
1224+
} catch (ActiveMQAddressHasBindingsException e) {
1225+
// in a scenario where the consumer on a temporary and connection is being closed as part of the same event
1226+
// we could get on a situation where the remove of the queue is already scheduled in the executors
1227+
// but have not yet reached.
1228+
// It is not possible to serialize the calls on org.apache.activemq.artemis.core.server.impl.TransientQueueManagerImpl
1229+
// as that could lead to starvations and deadlocks.
1230+
// for that reason we can only retry in the executor's line
1231+
if (retry++ < MAX_RETRY) {
1232+
logger.debug("retrying deleteResource {}, retry={}", resourceName, retry);
1233+
TempResourceCleanerUpper.this.run();
1234+
} else {
1235+
logger.warn(e.getMessage(), e);
1236+
}
12051237
} catch (ActiveMQException e) {
12061238
// that's fine.. it can happen due to resource already been deleted
12071239
logger.debug(e.getMessage(), e);

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ private void doIt() {
5252
}
5353

5454
public TransientQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
55-
super(server.getExecutorFactory().getExecutor());
55+
// We have to use the same executor between here and ServerSessionImpl::TempResourceCleanerUpper
56+
super(server.getTransientQueueExecutor());
5657

5758
this.server = server;
5859

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.activemq.artemis.tests.integration.amqp;
1818

19+
import static org.junit.jupiter.api.Assertions.assertEquals;
1920
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
2021
import static org.junit.jupiter.api.Assertions.assertNotNull;
2122
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -24,6 +25,7 @@
2425
import java.util.concurrent.TimeUnit;
2526

2627
import javax.jms.Connection;
28+
import javax.jms.Destination;
2729
import javax.jms.MessageConsumer;
2830
import javax.jms.MessageProducer;
2931
import javax.jms.Session;
@@ -137,4 +139,69 @@ public void testDeleteTemporaryTopic() throws Exception {
137139
connection.close();
138140
}
139141
}
142+
143+
@Test
144+
@Timeout(20)
145+
public void testTemporaryTopicDeletedOnConnectionClosed() throws Exception {
146+
doTestTemporaryDestinationIsDeletedOnConnectionClosed(true, true);
147+
}
148+
149+
@Test
150+
@Timeout(20)
151+
public void testTemporaryQueueDeletedOnConnectionClosed() throws Exception {
152+
doTestTemporaryDestinationIsDeletedOnConnectionClosed(false, true);
153+
}
154+
155+
@Test
156+
@Timeout(20)
157+
public void testTemporaryTopicDeletedOnConnectionClosedWithoutExplicitConsumerClose() throws Exception {
158+
doTestTemporaryDestinationIsDeletedOnConnectionClosed(true, false);
159+
}
160+
161+
@Test
162+
@Timeout(20)
163+
public void testTemporaryQueueDeletedOnConnectionClosedWithoutExplicitConsumerClose() throws Exception {
164+
doTestTemporaryDestinationIsDeletedOnConnectionClosed(false, false);
165+
}
166+
167+
private void doTestTemporaryDestinationIsDeletedOnConnectionClosed(boolean topic, boolean closeConsumer) throws Exception {
168+
final String addressName;
169+
try (Connection connection = createConnection()) {
170+
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
171+
final Destination destination;
172+
173+
if (topic) {
174+
destination = session.createTemporaryTopic();
175+
176+
assertNotNull(destination);
177+
assertTrue(destination instanceof TemporaryTopic);
178+
179+
addressName = ((TemporaryTopic) destination).getTopicName();
180+
} else {
181+
destination = session.createTemporaryQueue();
182+
183+
assertNotNull(destination);
184+
assertTrue(destination instanceof TemporaryQueue);
185+
186+
addressName = ((TemporaryQueue) destination).getQueueName();
187+
}
188+
189+
logger.debug("Address being used is {}", addressName);
190+
191+
final MessageConsumer consumer = session.createConsumer(destination);
192+
193+
final AddressInfo addressView = getProxyToAddress(addressName);
194+
assertNotNull(addressView);
195+
196+
assertEquals(1, server.bindingQuery(addressView.getName()).getQueueNames().size());
197+
198+
if (closeConsumer) {
199+
consumer.close();
200+
}
201+
202+
}
203+
Wait.assertNull(() -> getProxyToAddress(addressName), TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50));
204+
}
205+
206+
140207
}

0 commit comments

Comments
 (0)