Skip to content

Commit 22f32d4

Browse files
committed
GH-1277 SafeNotifyService threads leak in CuratorFrameworkImpl
CURATOR-495 introduced a new runSafeService field in the CuratorFrameworkImpl class. This field is either initialized with an external executor supplied via the builder, or created internally within the class. However, the CuratorFrameworkImpl#close method never shuts this executor down, so by default the threads started by each instance linger until the VM exits. Worse, if a thread factory that produces non-daemon threads is passed to the framework implementation via the builder, the VM never exits, because the single-thread executor is never stopped.
1 parent 3bc3ea1 commit 22f32d4

2 files changed

Lines changed: 73 additions & 2 deletions

File tree

curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public final class CuratorFrameworkImpl extends CuratorFrameworkBase {
100100
private final EnsembleTracker ensembleTracker;
101101
private final SchemaSet schemaSet;
102102
private final Executor runSafeService;
103+
private boolean isExternalRunSafeService = false;
103104
private final ZookeeperCompatibility zookeeperCompatibility;
104105

105106
private volatile ExecutorService executorService;
@@ -192,8 +193,11 @@ public void process(WatchedEvent watchedEvent) {
192193
zookeeperCompatibility = builder.getZookeeperCompatibility();
193194
}
194195

195-
private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder) {
196-
if (builder.getRunSafeService() != null) {
196+
private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder)
197+
{
198+
if ( builder.getRunSafeService() != null )
199+
{
200+
isExternalRunSafeService = true;
197201
return builder.getRunSafeService();
198202
}
199203
ThreadFactory threadFactory = builder.getThreadFactory();
@@ -383,6 +387,19 @@ public void close() {
383387
}
384388
}
385389

390+
if (!isExternalRunSafeService) {
391+
((ExecutorService) runSafeService).shutdownNow();
392+
try
393+
{
394+
((ExecutorService) runSafeService).awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
395+
}
396+
catch ( InterruptedException e )
397+
{
398+
// Interrupted while interrupting; I give up.
399+
Thread.currentThread().interrupt();
400+
}
401+
}
402+
386403
if (ensembleTracker != null) {
387404
ensembleTracker.close();
388405
}

curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,21 @@
2525
import com.google.common.collect.Lists;
2626
import com.google.common.collect.Queues;
2727
import java.util.ArrayList;
28+
import java.util.Arrays;
2829
import java.util.List;
2930
import java.util.concurrent.BlockingQueue;
3031
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.Executor;
33+
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Executors;
35+
import java.util.concurrent.ScheduledExecutorService;
3136
import java.util.concurrent.TimeUnit;
3237
import java.util.concurrent.atomic.AtomicBoolean;
3338
import java.util.concurrent.atomic.AtomicLong;
3439
import java.util.concurrent.atomic.AtomicReference;
40+
import java.util.stream.Stream;
41+
42+
import io.netty.util.concurrent.DefaultThreadFactory;
3543
import org.apache.curator.framework.CuratorFramework;
3644
import org.apache.curator.framework.CuratorFrameworkFactory;
3745
import org.apache.curator.framework.api.ACLProvider;
@@ -45,6 +53,7 @@
4553
import org.apache.curator.test.BaseClassForTests;
4654
import org.apache.curator.test.Timing;
4755
import org.apache.curator.utils.CloseableUtils;
56+
import org.apache.curator.utils.ThreadUtils;
4857
import org.apache.zookeeper.KeeperException.Code;
4958
import org.apache.zookeeper.data.ACL;
5059
import org.junit.jupiter.api.Test;
@@ -306,4 +315,49 @@ public void listen(OperationAndData<?> data) {
306315
CloseableUtils.closeQuietly(client);
307316
}
308317
}
318+
319+
/**
 * Verifies that closing a client built without an external run-safe executor
 * leaves no live thread whose name contains "SafeNotifyService".
 *
 * The test starts a client, submits a no-op through runSafe, asserts that such
 * a thread exists, closes the client, and then asserts that none remains.
 */
// NOTE(review): there is no try/finally here, so the client is not closed if the
// first assertion fails; the preceding test in this file uses CloseableUtils.closeQuietly.
@Test
320+
public void testCloseShutsDownInternalRunSafeService() {
321+
Timing timing = new Timing();
322+
CuratorFramework client = CuratorFrameworkFactory.newClient(
323+
server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
324+
client.start();
325+
// Submit a no-op so a run-safe thread is expected to exist before the first check.
client.runSafe(() -> {});
326+
assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("SafeNotifyService")));
327+
328+
client.close();
329+
330+
// After close() no thread with that name fragment may be left.
assertTrue(enumerateThreads().noneMatch(t -> t.getName().contains("SafeNotifyService")));
331+
}
332+
333+
/**
 * Verifies that closing a client does not stop an executor that was handed in
 * through the builder's runSafeService option.
 *
 * The test asserts that a thread whose name contains "ExternalSafeNotifyService"
 * is live both before and after client.close(), then shuts the executor down
 * itself and waits up to 10 seconds for it to terminate.
 *
 * @throws Exception if waiting for the executor's termination is interrupted
 */
// NOTE(review): without try/finally, neither the client nor externalRunner is
// released if an earlier assertion fails.
@Test
334+
public void testCloseLeavesExternalRunSafeServiceRunning() throws Exception {
335+
Timing timing = new Timing();
336+
// NOTE(review): the second DefaultThreadFactory argument is presumably the daemon flag - confirm.
ExecutorService externalRunner =
337+
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("ExternalSafeNotifyService", true));
338+
CuratorFramework client = CuratorFrameworkFactory.builder()
339+
.connectString(server.getConnectString())
340+
.sessionTimeoutMs(timing.session())
341+
.connectionTimeoutMs(timing.connection())
342+
.retryPolicy(new RetryOneTime(1))
343+
.maxCloseWaitMs(timing.forWaiting().milliseconds())
344+
.runSafeService(externalRunner)
345+
.build();
346+
client.start();
347+
// Submit a no-op so the external executor is expected to have a live thread before the first check.
client.runSafe(() -> {});
348+
assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("ExternalSafeNotifyService")));
349+
350+
client.close();
351+
352+
// The externally supplied executor's thread must still be present after close().
assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("ExternalSafeNotifyService")));
353+
354+
// The test owns the executor, so it shuts it down itself.
externalRunner.shutdownNow();
355+
assertTrue(externalRunner.awaitTermination(10, TimeUnit.SECONDS));
356+
}
357+
358+
private static Stream<Thread> enumerateThreads() {
359+
Thread[] threads = new Thread[Thread.activeCount()];
360+
Thread.enumerate(threads);
361+
return Arrays.stream(threads);
362+
}
309363
}

0 commit comments

Comments
 (0)