From 758d17e3ee87609854bb6a40d4e19e118ef8cdd3 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 9 Apr 2026 14:46:48 -0400 Subject: [PATCH] Fix pool close race during initialization --- .../driver/core/HostConnectionPool.java | 78 ++++++++++++++----- .../driver/core/HostConnectionPoolTest.java | 20 +++++ 2 files changed, 79 insertions(+), 19 deletions(-) diff --git a/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java b/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java index a27a70de265..e5a00e9c2a9 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java +++ b/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java @@ -301,19 +301,25 @@ ListenableFuture initAsyncWithConnection(Connection reusedConnection) { maxConnections / shardsCount + (maxConnections % shardsCount > 0 ? 1 : 0); int toCreate = shardsCount * connectionsPerShard; - this.connections = new List[shardsCount]; - scheduledForCreation = new AtomicInteger[shardsCount]; - open = new AtomicInteger[shardsCount]; - trash = new Set[shardsCount]; - pendingBorrows = new Queue[shardsCount]; + List[] connectionsByShard = new List[shardsCount]; + AtomicInteger[] scheduledForCreationByShard = new AtomicInteger[shardsCount]; + AtomicInteger[] openByShard = new AtomicInteger[shardsCount]; + Set[] trashByShard = new Set[shardsCount]; + Queue[] pendingBorrowsByShard = new Queue[shardsCount]; for (int i = 0; i < shardsCount; ++i) { - this.connections[i] = new CopyOnWriteArrayList(); - scheduledForCreation[i] = new AtomicInteger(); - open[i] = new AtomicInteger(); - trash[i] = new CopyOnWriteArraySet(); - pendingBorrows[i] = new ConcurrentLinkedQueue(); + connectionsByShard[i] = new CopyOnWriteArrayList(); + scheduledForCreationByShard[i] = new AtomicInteger(); + openByShard[i] = new AtomicInteger(); + trashByShard[i] = new CopyOnWriteArraySet(); + pendingBorrowsByShard[i] = new ConcurrentLinkedQueue(); } + this.connections = connectionsByShard; + scheduledForCreation = scheduledForCreationByShard; + open = openByShard; + trash = trashByShard; + pendingBorrows = pendingBorrowsByShard; + final List connections = Lists.newArrayListWithCapacity(toCreate); final List> connectionFutures = Lists.newArrayListWithCapacity(2 * toCreate); @@ -995,44 +1001,75 @@ final CloseFuture closeAsync() { } int opened() { + AtomicInteger[] open = this.open; + if (open == null) { + return 0; + } + int result = 0; for (AtomicInteger o : open) { - result += o.get(); + if (o != null) { + result += o.get(); + } } return result; } int trashed() { + Set[] trash = this.trash; + if (trash == null) { + return 0; + } + int size = 0; for (final Set shardConnections : trash) { - size += shardConnections.size(); + if (shardConnections != null) { + size += shardConnections.size(); + } } return size; } private List discardAvailableConnections() { - // Note: if this gets called before initialization has completed, both connections and trash - // will be empty, - // so this will return an empty list + List[] connections = this.connections; + Set[] trash = this.trash; + if (connections == null || trash == null) { + return new ArrayList(0); + } + + // Note: if this gets called before initialization has completed, connections and trash might + // still be partially populated, so null entries are ignored and this still returns a valid list int size = 0; for (final Set shardConnections : trash) { - size += shardConnections.size(); + if (shardConnections != null) { + size += shardConnections.size(); + } } for (final List shardConnections : connections) { - size += shardConnections.size(); + if (shardConnections != null) { + size += shardConnections.size(); + } } List futures = new ArrayList(size); for (final List shardConnections : connections) { + if (shardConnections == null) { + continue; + } for (final Connection connection : shardConnections) { CloseFuture future = connection.closeAsync(); future.addListener( new Runnable() { @Override public void run() { - if (connection.state.compareAndSet(OPEN, GONE)) { - open[connection.shardId()].decrementAndGet(); + AtomicInteger[] open = HostConnectionPool.this.open; + AtomicInteger openCount = + open != null && connection.shardId() < open.length + ? open[connection.shardId()] + : null; + if (openCount != null && connection.state.compareAndSet(OPEN, GONE)) { + openCount.decrementAndGet(); } } }, @@ -1043,6 +1080,9 @@ public void run() { // Some connections in the trash might still be open if they hadn't reached their idle timeout for (final Set shardConnections : trash) { + if (shardConnections == null) { + continue; + } for (final Connection connection : shardConnections) { futures.add(connection.closeAsync()); } diff --git a/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java b/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java index 6147b7d0f09..88964995887 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java @@ -300,6 +300,26 @@ public void requests_with_enqueued_borrow_requests_should_be_failed_when_pool_cl } } + @Test(groups = "short") + public void should_close_uninitialized_pool_without_npe() throws Exception { + Cluster cluster = createClusterBuilder().build(); + try { + Session session = cluster.connect(); + Host host = TestUtils.findHost(cluster, 1); + SessionManager sessionManager = (SessionManager) session; + HostConnectionPool pool = new HostConnectionPool(host, HostDistance.LOCAL, sessionManager); + + CloseFuture closeFuture = pool.closeAsync(); + + closeFuture.get(5, TimeUnit.SECONDS); + assertThat(pool.isClosed()).isTrue(); + assertThat(pool.opened()).isEqualTo(0); + assertThat(pool.trashed()).isEqualTo(0); + } finally { + cluster.close(); + } + } + /** * Validates that if the keyspace tied to the Session's pool state is different than the keyspace * on the connection being used in dequeue that {@link Connection#setKeyspaceAsync(String)} is set