Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -301,19 +301,25 @@ ListenableFuture<Void> initAsyncWithConnection(Connection reusedConnection) {
maxConnections / shardsCount + (maxConnections % shardsCount > 0 ? 1 : 0);
int toCreate = shardsCount * connectionsPerShard;

this.connections = new List[shardsCount];
scheduledForCreation = new AtomicInteger[shardsCount];
open = new AtomicInteger[shardsCount];
trash = new Set[shardsCount];
pendingBorrows = new Queue[shardsCount];
List<Connection>[] connectionsByShard = new List[shardsCount];
AtomicInteger[] scheduledForCreationByShard = new AtomicInteger[shardsCount];
AtomicInteger[] openByShard = new AtomicInteger[shardsCount];
Set<Connection>[] trashByShard = new Set[shardsCount];
Queue<PendingBorrow>[] pendingBorrowsByShard = new Queue[shardsCount];
for (int i = 0; i < shardsCount; ++i) {
this.connections[i] = new CopyOnWriteArrayList<Connection>();
scheduledForCreation[i] = new AtomicInteger();
open[i] = new AtomicInteger();
trash[i] = new CopyOnWriteArraySet<Connection>();
pendingBorrows[i] = new ConcurrentLinkedQueue<PendingBorrow>();
connectionsByShard[i] = new CopyOnWriteArrayList<Connection>();
scheduledForCreationByShard[i] = new AtomicInteger();
openByShard[i] = new AtomicInteger();
trashByShard[i] = new CopyOnWriteArraySet<Connection>();
pendingBorrowsByShard[i] = new ConcurrentLinkedQueue<PendingBorrow>();
}

this.connections = connectionsByShard;
scheduledForCreation = scheduledForCreationByShard;
open = openByShard;
trash = trashByShard;
pendingBorrows = pendingBorrowsByShard;
Comment on lines +317 to +321
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race window still remains which may cause a partial initialization.


final List<Connection> connections = Lists.newArrayListWithCapacity(toCreate);
final List<ListenableFuture<Void>> connectionFutures =
Lists.newArrayListWithCapacity(2 * toCreate);
Expand Down Expand Up @@ -995,44 +1001,75 @@ final CloseFuture closeAsync() {
}

int opened() {
AtomicInteger[] open = this.open;
if (open == null) {
return 0;
}

int result = 0;
for (AtomicInteger o : open) {
result += o.get();
if (o != null) {
result += o.get();
}
}
return result;
}

int trashed() {
Set<Connection>[] trash = this.trash;
if (trash == null) {
return 0;
}

int size = 0;
for (final Set<Connection> shardConnections : trash) {
size += shardConnections.size();
if (shardConnections != null) {
size += shardConnections.size();
}
}
return size;
}

private List<CloseFuture> discardAvailableConnections() {
// Note: if this gets called before initialization has completed, both connections and trash
// will be empty,
// so this will return an empty list
List<Connection>[] connections = this.connections;
Set<Connection>[] trash = this.trash;
if (connections == null || trash == null) {
return new ArrayList<CloseFuture>(0);
}

// Note: if this gets called before initialization has completed, connections and trash might
// still be partially populated, so null entries are ignored and this still returns a valid list

int size = 0;
for (final Set<Connection> shardConnections : trash) {
size += shardConnections.size();
if (shardConnections != null) {
size += shardConnections.size();
}
}
for (final List<Connection> shardConnections : connections) {
size += shardConnections.size();
if (shardConnections != null) {
size += shardConnections.size();
}
}
List<CloseFuture> futures = new ArrayList<CloseFuture>(size);

for (final List<Connection> shardConnections : connections) {
if (shardConnections == null) {
continue;
}
for (final Connection connection : shardConnections) {
CloseFuture future = connection.closeAsync();
future.addListener(
new Runnable() {
@Override
public void run() {
if (connection.state.compareAndSet(OPEN, GONE)) {
open[connection.shardId()].decrementAndGet();
AtomicInteger[] open = HostConnectionPool.this.open;
AtomicInteger openCount =
open != null && connection.shardId() < open.length
? open[connection.shardId()]
: null;
if (openCount != null && connection.state.compareAndSet(OPEN, GONE)) {
openCount.decrementAndGet();
}
}
},
Expand All @@ -1043,6 +1080,9 @@ public void run() {

// Some connections in the trash might still be open if they hadn't reached their idle timeout
for (final Set<Connection> shardConnections : trash) {
if (shardConnections == null) {
continue;
}
for (final Connection connection : shardConnections) {
futures.add(connection.closeAsync());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,26 @@ public void requests_with_enqueued_borrow_requests_should_be_failed_when_pool_cl
}
}

@Test(groups = "short")
public void should_close_uninitialized_pool_without_npe() throws Exception {
Cluster cluster = createClusterBuilder().build();
try {
Session session = cluster.connect();
Host host = TestUtils.findHost(cluster, 1);
SessionManager sessionManager = (SessionManager) session;
HostConnectionPool pool = new HostConnectionPool(host, HostDistance.LOCAL, sessionManager);

CloseFuture closeFuture = pool.closeAsync();

closeFuture.get(5, TimeUnit.SECONDS);
assertThat(pool.isClosed()).isTrue();
assertThat(pool.opened()).isEqualTo(0);
assertThat(pool.trashed()).isEqualTo(0);
} finally {
cluster.close();
}
}

/**
* Validates that if the keyspace tied to the Session's pool state is different than the keyspace
* on the connection being used in dequeue that {@link Connection#setKeyspaceAsync(String)} is set
Expand Down
Loading