Skip to content

Commit 30f51d1

Browse files
ARTEMIS-5956 Connection.close may leave leaked temporary destinations
1 parent 6403e56 commit 30f51d1

3 files changed

Lines changed: 54 additions & 5 deletions

File tree

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -814,7 +814,7 @@ private void handleTempResource(SimpleString name, boolean queue) {
814814
// not mean it will get deleted automatically when the session is closed. It is up to the user to delete the
815815
// resource when finished with it
816816

817-
TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server, name);
817+
TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server, name, sessionExecutor, context);
818818
if (remotingConnection instanceof TempResourceObserver observer) {
819819
cleaner.setObserver(observer);
820820
}
@@ -1161,24 +1161,42 @@ public String getSecurityDomain() {
11611161
return securityDomain;
11621162
}
11631163

1164-
public static class TempResourceCleanerUpper implements CloseListener, FailureListener {
1164+
public static class TempResourceCleanerUpper implements CloseListener, FailureListener, IOCallback {
1165+
1166+
final Executor sessionExecutor;
1167+
1168+
final OperationContext operationContext;
11651169

11661170
private final SimpleString resourceName;
11671171

11681172
private final ActiveMQServer server;
11691173

11701174
private TempResourceObserver observer;
11711175

1172-
public TempResourceCleanerUpper(final ActiveMQServer server, final SimpleString resourceName) {
1176+
public TempResourceCleanerUpper(final ActiveMQServer server, final SimpleString resourceName, final Executor sessionExecutor, OperationContext context) {
11731177
this.server = server;
11741178
this.resourceName = resourceName;
1179+
this.sessionExecutor = sessionExecutor;
1180+
this.operationContext = context;
11751181
}
11761182

11771183
public void setObserver(TempResourceObserver observer) {
11781184
this.observer = observer;
11791185
}
11801186

11811187
private void run() {
1188+
sessionExecutor.execute(() -> {
1189+
operationContext.executeOnCompletion(TempResourceCleanerUpper.this);
1190+
});
1191+
}
1192+
1193+
1194+
@Override
1195+
public void onError(int errorCode, String errorMessage) {
1196+
}
1197+
1198+
@Override
1199+
public void done() {
11821200
try {
11831201
logger.debug("deleting temporary resource {}", resourceName);
11841202
try {
@@ -1197,12 +1215,13 @@ private void run() {
11971215
try {
11981216
AddressInfo a = server.getAddressInfo(resourceName);
11991217
if (a != null && a.isTemporary()) {
1200-
server.removeAddressInfo(resourceName, null);
1218+
server.removeAddressInfo(resourceName, null, false);
12011219
if (observer != null) {
12021220
observer.tempAddressDeleted(resourceName);
12031221
}
12041222
}
12051223
} catch (ActiveMQException e) {
1224+
logger.warn(e.getMessage(), e);
12061225
// that's fine.. it can happen due to resource already been deleted
12071226
logger.debug(e.getMessage(), e);
12081227
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ private void doIt() {
5252
}
5353

5454
public TransientQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
55-
super(server.getExecutorFactory().getExecutor());
55+
/* We cannot use a separate executor to delete transient queues as that would
56+
* introduce races on the ServerSessionImpl::TempQueueCleaner. */
57+
super();
5658

5759
this.server = server;
5860

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
import javax.jms.TemporaryQueue;
3131
import javax.jms.TemporaryTopic;
3232
import javax.jms.TextMessage;
33+
import javax.jms.Topic;
3334

35+
import org.apache.activemq.artemis.api.core.SimpleString;
3436
import org.apache.activemq.artemis.core.server.Queue;
3537
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
3638
import org.apache.activemq.artemis.tests.util.Wait;
@@ -137,4 +139,30 @@ public void testDeleteTemporaryTopic() throws Exception {
137139
connection.close();
138140
}
139141
}
142+
143+
@Test
144+
public void testAttackDeleteTopicOnClose() throws Exception {
145+
// you need to attack open / close many times to reproduce the race I was after when the test was written
146+
for (int attack = 0; attack < 100; attack++) {
147+
String temporarytopicName;
148+
149+
try (Connection connection = createConnection()) {
150+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
151+
Topic temporaryTopic = session.createTemporaryTopic();
152+
153+
MessageProducer producer = session.createProducer(temporaryTopic);
154+
155+
MessageConsumer consumer = session.createSharedConsumer(temporaryTopic, "mySub");
156+
connection.start();
157+
158+
temporarytopicName = temporaryTopic.getTopicName();
159+
160+
producer.send(session.createMessage());
161+
assertNotNull(consumer.receive(5000));
162+
}
163+
164+
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.of(temporarytopicName)) == null, 5000, 100);
165+
}
166+
}
167+
140168
}

0 commit comments

Comments
 (0)