From 4f9a052cefb6ee2a4b0ca7063fddaea4c265c06c Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Fri, 10 Apr 2026 00:25:33 +0200 Subject: [PATCH 1/2] feat: optimize system table queries with column projection (DRIVER-368) Backport of the 4.x DefaultTopologyMonitor optimization to the 3.x driver's ControlConnection. On the first query to each system table (system.local, system.peers, system.peers_v2) the driver sends SELECT * to discover available columns. It caches the intersection with an internal *_COLUMNS_OF_INTEREST set; subsequent queries project only those columns, reducing bytes on the wire and deserialization work. Changes in ControlConnection: - LOCAL/PEERS/PEERS_V2_COLUMNS_OF_INTEREST ImmutableSet constants - Volatile Set cache fields (null = uninitialized sentinel) - intersectWithNeeded() returns null on empty intersection to prevent invalid empty-column projections - buildProjectedQuery() static helper - Cache reset in setNewConnection() and peers_v2 fallback path - Projected queries in refreshNodeListAndTokenMap(), selectPeersFuture(), and fetchNodeInfo() (system.local only; peer WHERE lookups use SELECT * for Scassandra compatibility) Test changes: - ScassandraCluster: stable per-instance hostIdByNodeCount field; primes projected full-scan queries; re-primes after node restart - ControlConnectionTest: resets column caches before clearing primes - ControlConnectionUnitTest: 16 pure unit tests covering constants, intersectWithNeeded, buildProjectedQuery, and cache field declarations --- .../driver/core/ControlConnection.java | 255 +++++++++++++-- .../driver/core/ControlConnectionTest.java | 3 + .../core/ControlConnectionUnitTest.java | 290 ++++++++++++++++++ .../driver/core/ScassandraCluster.java | 106 ++++++- 4 files changed, 632 insertions(+), 22 deletions(-) create mode 100644 driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index be8c7c88068..f0b905b6e36 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -34,6 +34,7 @@ import com.datastax.driver.core.utils.MoreFutures; import com.datastax.driver.core.utils.MoreObjects; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -86,6 +87,91 @@ class ControlConnection implements Connection.Owner { private static final String SELECT_SCHEMA_LOCAL = "SELECT schema_version, host_id FROM system.local WHERE key='local'"; + // IMPORTANT: Every column read from system.local rows — in updateInfo(), + // refreshNodeListAndTokenMap(), isValidPeer(), and DefaultEndPointFactory — MUST be listed here. + // If a new column read is added anywhere that consumes a system table row, add it to the + // appropriate set below, otherwise it will be silently excluded from projected queries. + @VisibleForTesting + static final ImmutableSet LOCAL_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "cluster_name", + "partitioner", + "data_center", + "rack", + "release_version", + "native_address", + "native_port", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "rpc_address", + "broadcast_address", + "broadcast_port", + "listen_address", + "listen_port", + "tokens", + "host_id", + "schema_version", + "workload", + "graph", + "dse_version"); + + // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. + // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), + // isValidPeer(), and DefaultEndPointFactory.create() from system.peers rows. + // Columns that are absent from the actual server schema are silently excluded by + // intersectWithNeeded(), so listing extra columns here is safe. + @VisibleForTesting + static final ImmutableSet PEERS_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "peer", + "peer_port", // peers_v2 column; harmless to list here — absent on peers, excluded safely + "rpc_address", + "data_center", + "rack", + "release_version", + "tokens", + "listen_address", + "listen_port", + "host_id", + "schema_version", + "native_address", // may appear on some server variants; guarded by contains() in code + "native_port", // same + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "workload", + "graph", + "dse_version"); + + // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. + // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), + // isValidPeer(), and DefaultEndPointFactory.create() from system.peers_v2 rows. + // Columns that are absent from the actual server schema are silently excluded by + // intersectWithNeeded(), so listing extra columns here is safe. + @VisibleForTesting + static final ImmutableSet PEERS_V2_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "peer", + "peer_port", + "native_address", + "native_port", + "data_center", + "rack", + "release_version", + "tokens", + "host_id", + "schema_version", + "workload", + "graph", + "dse_version", + "listen_address", + "listen_port", + "rpc_address", // legacy; guarded by contains() in code — harmless if absent + "native_transport_address", // same + "native_transport_port", // same + "native_transport_port_ssl"); // same + private static final VersionNumber _3_11 = VersionNumber.parse("3.11.0"); @VisibleForTesting @@ -102,10 +188,30 @@ class ControlConnection implements Connection.Owner { // from here on out. private volatile boolean isPeersV2 = true; + // Column projection caches. null = uninitialized: the first query to each system table issues + // SELECT * to discover which columns the server exposes, then subsequent queries project only + // the intersection of those columns with the corresponding *_COLUMNS_OF_INTEREST set. + // Reset to null on every new connection so that the new server's schema is re-discovered. + private volatile Set localColumns = null; + private volatile Set peersColumns = null; + private volatile Set peersV2Columns = null; + public ControlConnection(Cluster.Manager manager) { this.cluster = manager; } + /** + * Resets the projected-column caches so that the next query to each system table sends {@code + * SELECT *} and re-discovers available columns. Intended for use in tests that clear Scassandra + * primes between driver operations. + */ + @VisibleForTesting + void resetColumnCaches() { + localColumns = null; + peersColumns = null; + peersV2Columns = null; + } + // Only for the initial connection. Does not schedule retries if it fails void connect() throws UnsupportedProtocolVersionException { if (isShutdown) return; @@ -234,6 +340,11 @@ private void signalError() { private void setNewConnection(Connection newConnection) { Host.statesLogger.debug("[Control connection] established to {}", newConnection.endPoint); newConnection.setOwner(this); + // Reset column caches so the new connection triggers a fresh SELECT * to discover which + // columns the server exposes before switching to projected queries. + localColumns = null; + peersColumns = null; + peersV2Columns = null; Connection old = connectionRef.getAndSet(newConnection); if (old != null && !old.isClosed()) old.closeAsync().force(); } @@ -491,26 +602,54 @@ private Row fetchNodeInfo(Host host, Connection c) if (isConnectedHost || host.getBroadcastSocketAddress() != null) { String query; if (isConnectedHost) { - query = SELECT_LOCAL; + query = + localColumns == null + ? SELECT_LOCAL + : buildProjectedQuery("system.local", localColumns, "key='local'"); } else { InetSocketAddress broadcastAddress = host.getBroadcastSocketAddress(); - query = - isPeersV2 - ? SELECT_PEERS_V2 - + " WHERE peer='" - + broadcastAddress.getAddress().getHostAddress() - + "' AND peer_port=" - + broadcastAddress.getPort() - : SELECT_PEERS - + " WHERE peer='" - + broadcastAddress.getAddress().getHostAddress() - + "'"; + // Always use SELECT * for single-row WHERE lookups. Projected queries are only used for + // full-table scans via selectPeersFuture(), where the cache is guaranteed to be warm and + // every node has the projected full-scan prime registered. For WHERE lookups the control + // connection may query a node that was never restarted (and therefore still carries only + // the original SELECT * prime from init time), so projecting here risks a cache miss. + if (isPeersV2) { + String whereClause = + "peer='" + + broadcastAddress.getAddress().getHostAddress() + + "' AND peer_port=" + + broadcastAddress.getPort(); + query = SELECT_PEERS_V2 + " WHERE " + whereClause; + } else { + String whereClause = "peer='" + broadcastAddress.getAddress().getHostAddress() + "'"; + query = SELECT_PEERS + " WHERE " + whereClause; + } } DefaultResultSetFuture future = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(query)); c.write(future); - Row row = future.get().one(); + ResultSet rs = future.get(); + Row row = rs.one(); + // Populate the column cache on first successful WHERE lookup so that subsequent full-table + // scans via selectPeersFuture() can send projected queries. Only populate when a row is + // found: if the WHERE returned empty we are about to fall through to the full-scan path, + // which has its own cache-population logic; populating here on an empty result would cause + // selectPeersFuture() to send a projected query before it has verified the server supports + // the projected columns. if (row != null) { + if (isConnectedHost) { + if (localColumns == null) { + localColumns = intersectWithNeeded(rs, LOCAL_COLUMNS_OF_INTEREST); + } + } else if (isPeersV2) { + if (peersV2Columns == null) { + peersV2Columns = intersectWithNeeded(rs, PEERS_V2_COLUMNS_OF_INTEREST); + } + } else { + if (peersColumns == null) { + peersColumns = intersectWithNeeded(rs, PEERS_COLUMNS_OF_INTEREST); + } + } return row; } else { InetSocketAddress address = host.getBroadcastSocketAddress(); @@ -717,9 +856,13 @@ private static void updateLocationInfo( */ private ListenableFuture selectPeersFuture(final Connection connection) { if (isPeersV2) { + String peersV2Query = + peersV2Columns == null + ? SELECT_PEERS_V2 + : buildProjectedQuery("system.peers_v2", peersV2Columns, null); DefaultResultSetFuture peersV2Future = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS_V2)); + null, cluster.protocolVersion(), new Requests.Query(peersV2Query)); connection.write(peersV2Future); final SettableFuture peersFuture = SettableFuture.create(); // if peers v2 query fails, query peers table instead. @@ -729,6 +872,9 @@ private ListenableFuture selectPeersFuture(final Connection connectio @Override public void onSuccess(ResultSet result) { + if (peersV2Columns == null) { + peersV2Columns = intersectWithNeeded(result, PEERS_V2_COLUMNS_OF_INTEREST); + } peersFuture.set(result); } @@ -742,6 +888,9 @@ public void onFailure(Throwable t) { || (t instanceof ServerError && t.getMessage().contains("Unknown keyspace/cf pair (system.peers_v2)"))) { isPeersV2 = false; + // Also reset the peers cache so the first system.peers query issues SELECT * + // to discover which columns that table exposes on this server. + peersColumns = null; MoreFutures.propagateFuture(peersFuture, selectPeersFuture(connection)); } else { peersFuture.setException(t); @@ -751,14 +900,73 @@ public void onFailure(Throwable t) { MoreExecutors.directExecutor()); return peersFuture; } else { - DefaultResultSetFuture peersFuture = + String peersQuery = + peersColumns == null + ? SELECT_PEERS + : buildProjectedQuery("system.peers", peersColumns, null); + DefaultResultSetFuture rawFuture = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS)); - connection.write(peersFuture); + null, cluster.protocolVersion(), new Requests.Query(peersQuery)); + connection.write(rawFuture); + final SettableFuture peersFuture = SettableFuture.create(); + Futures.addCallback( + rawFuture, + new FutureCallback() { + @Override + public void onSuccess(ResultSet result) { + if (peersColumns == null) { + peersColumns = intersectWithNeeded(result, PEERS_COLUMNS_OF_INTEREST); + } + peersFuture.set(result); + } + + @Override + public void onFailure(Throwable t) { + peersFuture.setException(t); + } + }, + MoreExecutors.directExecutor()); return peersFuture; } } + /** + * Returns the intersection of the columns returned by the server (from {@code rs}) with the given + * {@code needed} set, or {@code null} if the intersection is empty. The result is used to cache + * projected column lists so subsequent queries fetch only what the driver actually reads. A + * {@code null} return keeps the cache in the "uninitialized" sentinel state, ensuring the driver + * continues issuing {@code SELECT *} rather than generating an invalid empty-column projection. + */ + @VisibleForTesting + static Set intersectWithNeeded(ResultSet rs, ImmutableSet needed) { + ImmutableSet.Builder result = ImmutableSet.builder(); + for (ColumnDefinitions.Definition def : rs.getColumnDefinitions()) { + if (needed.contains(def.getName())) { + result.add(def.getName()); + } + } + ImmutableSet built = result.build(); + return built.isEmpty() ? null : built; + } + + /** + * Builds a {@code SELECT col1, col2, ... FROM table [WHERE whereClause]} query string from the + * given projected column set. {@code whereClause} may be {@code null} for table-wide scans. + */ + @VisibleForTesting + static String buildProjectedQuery(String table, Set columns, String whereClause) { + StringBuilder sb = new StringBuilder("SELECT "); + boolean first = true; + for (String col : columns) { + if (!first) sb.append(", "); + sb.append(col); + first = false; + } + sb.append(" FROM ").append(table); + if (whereClause != null) sb.append(" WHERE ").append(whereClause); + return sb.toString(); + } + private void refreshNodeListAndTokenMap( final Connection connection, final Cluster.Manager cluster, @@ -772,9 +980,12 @@ private void refreshNodeListAndTokenMap( // Make sure we're up to date on nodes and tokens + String localQuery = + localColumns == null + ? SELECT_LOCAL + : buildProjectedQuery("system.local", localColumns, "key='local'"); DefaultResultSetFuture localFuture = - new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL)); + new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(localQuery)); ListenableFuture peersFuture = selectPeersFuture(connection); connection.write(localFuture); @@ -783,7 +994,11 @@ private void refreshNodeListAndTokenMap( Map> tokenMap = new HashMap>(); // Update cluster name, DC and rack for the one node we are connected to - Row localRow = localFuture.get().one(); + ResultSet localRs = localFuture.get(); + if (localColumns == null) { + localColumns = intersectWithNeeded(localRs, LOCAL_COLUMNS_OF_INTEREST); + } + Row localRow = localRs.one(); if (localRow == null) { throw new IllegalStateException( String.format( diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java index a925ecbb866..69f1bf9e281 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java @@ -456,6 +456,9 @@ public void should_fetch_whole_peers_table_if_broadcast_address_changed() .build(); scassandras.node(1).primingClient().clearAllPrimes(); + // Reset the column caches so the driver re-discovers columns via SELECT * rather than + // sending projected queries against the now-cleared Scassandra primes. + cluster.manager.controlConnection.resetColumnCaches(); // the driver will attempt to locate host2 in system.peers by its old broadcast address, and // that will fail diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java new file mode 100644 index 00000000000..3e2ad95357f --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java @@ -0,0 +1,290 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.Set; +import org.testng.annotations.Test; + +/** + * Pure unit tests for the column-projection helpers and caching fields added to {@link + * ControlConnection} by DRIVER-368. + * + *

These tests do not require a running Cassandra/Scylla node. For integration-level tests see + * {@link ControlConnectionTest}. + */ +public class ControlConnectionUnitTest { + + // --------------------------------------------------------------------------- + // *_COLUMNS_OF_INTEREST constants + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testLocalColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = ControlConnection.LOCAL_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "cluster_name", + "tokens", + "host_id", + "native_address", + "dse_version", + "rpc_address", + "schema_version", + "data_center", + "rack", + "release_version", + "partitioner"); + } + + @Test(groups = "unit") + public void testLocalColumnsOfInterestSize() { + // 21 columns as documented in the constant declaration + assertThat(ControlConnection.LOCAL_COLUMNS_OF_INTEREST).hasSize(21); + } + + @Test(groups = "unit") + public void testPeersColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = ControlConnection.PEERS_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "peer", + "peer_port", + "rpc_address", + "tokens", + "native_address", + "native_port", + "native_transport_address", + "data_center", + "rack", + "host_id", + "dse_version"); + } + + @Test(groups = "unit") + public void testPeersColumnsOfInterestSize() { + // 19 columns: original 16 + peer_port, native_address, native_port + assertThat(ControlConnection.PEERS_COLUMNS_OF_INTEREST).hasSize(19); + } + + @Test(groups = "unit") + public void testPeersV2ColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "peer", + "peer_port", + "native_address", + "native_port", + "rpc_address", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "data_center", + "rack", + "tokens", + "host_id", + "dse_version"); + } + + @Test(groups = "unit") + public void testPeersV2ColumnsOfInterestSize() { + // 19 columns: original 15 + rpc_address, native_transport_address/port/port_ssl + assertThat(ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST).hasSize(19); + } + + @Test(groups = "unit") + public void testPeersV2ContainsLegacyColumns() { + // rpc_address, native_transport_address/port/port_ssl are legacy columns the driver reads + // with contains() guards. They are included so they are not silently dropped if a server + // exposes them in peers_v2. + assertThat(ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST) + .contains( + "rpc_address", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl"); + } + + // --------------------------------------------------------------------------- + // intersectWithNeeded + // --------------------------------------------------------------------------- + + /** Helper: build a mock ResultSet whose column definitions contain exactly the given names. */ + private static ResultSet mockResultSetWithColumns(String... columnNames) { + ColumnDefinitions.Definition[] defs = new ColumnDefinitions.Definition[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + defs[i] = + new ColumnDefinitions.Definition("system", "local", columnNames[i], DataType.text()); + } + ColumnDefinitions colDefs = new ColumnDefinitions(defs, CodecRegistry.DEFAULT_INSTANCE); + + ResultSet rs = mock(ResultSet.class); + when(rs.getColumnDefinitions()).thenReturn(colDefs); + return rs; + } + + @Test(groups = "unit") + public void testIntersectWithNeededReturnsSupersetIntersection() { + // RS has all LOCAL columns plus some extras; result should be exactly LOCAL_COLUMNS_OF_INTEREST + ImmutableSet needed = ControlConnection.LOCAL_COLUMNS_OF_INTEREST; + String[] base = needed.asList().toArray(new String[0]); + // Append two extra columns not in the interest set + String[] extended = Arrays.copyOf(base, base.length + 2); + extended[base.length] = "extra_col_1"; + extended[base.length + 1] = "extra_col_2"; + + ResultSet rs = mockResultSetWithColumns(extended); + Set result = ControlConnection.intersectWithNeeded(rs, needed); + + assertThat(result).isEqualTo(needed); + assertThat(result).doesNotContain("extra_col_1", "extra_col_2"); + } + + @Test(groups = "unit") + public void testIntersectWithNeededHandlesSubset() { + // RS only exposes a subset of the needed columns + ImmutableSet needed = + ImmutableSet.of("cluster_name", "tokens", "host_id", "schema_version"); + ResultSet rs = mockResultSetWithColumns("cluster_name", "tokens"); + + Set result = ControlConnection.intersectWithNeeded(rs, needed); + + assertThat(result).containsOnly("cluster_name", "tokens"); + assertThat(result).hasSize(2); + } + + @Test(groups = "unit") + public void testIntersectWithNeededNoOverlapReturnsNull() { + // When no server columns match the needed set, the result should be null so the cache remains + // in the uninitialized sentinel state (avoids generating an empty-column SELECT projection). + ImmutableSet needed = ImmutableSet.of("cluster_name", "tokens"); + ResultSet rs = mockResultSetWithColumns("some_other_col", "another_col"); + + Set result = ControlConnection.intersectWithNeeded(rs, needed); + + assertThat(result).isNull(); + } + + @Test(groups = "unit") + public void testIntersectWithNeededEmptyResultSetReturnsNull() { + // An empty ResultSet has no column definitions, so the intersection is empty → null. + ImmutableSet needed = ControlConnection.LOCAL_COLUMNS_OF_INTEREST; + ResultSet rs = mockResultSetWithColumns(); + + Set result = ControlConnection.intersectWithNeeded(rs, needed); + + assertThat(result).isNull(); + } + + // --------------------------------------------------------------------------- + // buildProjectedQuery + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testBuildProjectedQueryWithWhereClause() { + Set columns = ImmutableSet.of("cluster_name", "host_id"); + String query = ControlConnection.buildProjectedQuery("system.local", columns, "key='local'"); + + assertThat(query).startsWith("SELECT "); + assertThat(query).contains("cluster_name"); + assertThat(query).contains("host_id"); + assertThat(query).contains(" FROM system.local"); + assertThat(query).contains(" WHERE key='local'"); + // Should not contain SELECT * + assertThat(query).doesNotContain("*"); + } + + @Test(groups = "unit") + public void testBuildProjectedQueryWithoutWhereClause() { + Set columns = ImmutableSet.of("peer", "rpc_address", "tokens"); + String query = ControlConnection.buildProjectedQuery("system.peers", columns, null); + + assertThat(query).startsWith("SELECT "); + assertThat(query).contains("peer"); + assertThat(query).contains("rpc_address"); + assertThat(query).contains("tokens"); + assertThat(query).contains(" FROM system.peers"); + assertThat(query).doesNotContain("WHERE"); + } + + @Test(groups = "unit") + public void testBuildProjectedQuerySingleColumn() { + Set columns = ImmutableSet.of("host_id"); + String query = ControlConnection.buildProjectedQuery("system.local", columns, null); + + assertThat(query).isEqualTo("SELECT host_id FROM system.local"); + } + + @Test(groups = "unit") + public void testBuildProjectedQueryAllColumnsPresent() { + // Every column in the needed set must appear as an exact identifier in the projected SELECT + // list. Use exact parsing to avoid false positives where one column name is a substring of + // another (e.g. "native_port" inside "native_transport_port"). + Set columns = ControlConnection.PEERS_COLUMNS_OF_INTEREST; + String query = ControlConnection.buildProjectedQuery("system.peers", columns, null); + Set selectedColumns = extractSelectedColumns(query); + + for (String col : columns) { + assertThat(selectedColumns).as("query should project column: " + col).contains(col); + } + assertThat(query).contains(" FROM system.peers"); + assertThat(query).doesNotContain("WHERE"); + } + + /** + * Parses the column identifiers from the {@code SELECT col1, col2, ... FROM ...} portion of a + * projected query string and returns them as a set of trimmed names. + */ + private Set extractSelectedColumns(String query) { + int selectStart = query.indexOf("SELECT "); + int fromStart = query.indexOf(" FROM "); + assertThat(selectStart).as("query should start with SELECT").isEqualTo(0); + assertThat(fromStart).as("query should contain FROM").isGreaterThan(selectStart); + String columnList = query.substring("SELECT ".length(), fromStart); + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (String col : columnList.split(",")) { + builder.add(col.trim()); + } + return builder.build(); + } + + // --------------------------------------------------------------------------- + // Cache fields: declared as volatile, private, instance-level Set + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testCacheFieldsAreVolatilePrivateInstanceSets() throws Exception { + for (String fieldName : new String[] {"localColumns", "peersColumns", "peersV2Columns"}) { + Field field = ControlConnection.class.getDeclaredField(fieldName); + int mods = field.getModifiers(); + + assertThat(Modifier.isVolatile(mods)).as(fieldName + " should be volatile").isTrue(); + assertThat(Modifier.isPrivate(mods)).as(fieldName + " should be private").isTrue(); + assertThat(Modifier.isStatic(mods)).as(fieldName + " must be an instance field").isFalse(); + assertThat(Set.class.isAssignableFrom(field.getType())) + .as(fieldName + " declared type should be Set") + .isTrue(); + } + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java index 83078670d7e..6d50ddfd57b 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java @@ -82,6 +82,13 @@ public class ScassandraCluster { private final boolean peersV2; + /** + * One stable UUID per node (keyed by 1-based nodeCount), computed once at construction so that + * system.local and system.peers rows for the same node always carry the same host_id regardless + * of which Scassandra process is being primed or how many times primeMetadata() is called. + */ + private final Map hostIdByNodeCount; + ScassandraCluster( Integer[] nodes, String ipPrefix, @@ -148,6 +155,17 @@ public class ScassandraCluster { instances = instanceListBuilder.build(); dcNodeMap = dcNodeMapBuilder.build(); + // Compute stable host_id UUIDs once so every primeMetadata() call uses the same values. + Map hostIds = new HashMap<>(); + int tempCount = 1; + for (Integer dc : new TreeSet(dcNodeMap.keySet())) { + for (int n = 0; n < dcNodeMap.get(dc).size(); n++) { + hostIds.put(tempCount, UUIDs.random()); + tempCount++; + } + } + this.hostIdByNodeCount = hostIds; + // Prime correct keyspace table based on C* version. String[] versionArray = this.cassandraVersion.split("\\.|-"); double major = Double.parseDouble(versionArray[0] + "." + versionArray[1]); @@ -357,6 +375,11 @@ public void start(Cluster cluster, int node) { logger.debug("Starting node {}.", node); Scassandra scassandra = node(node); scassandra.start(); + // Re-prime after restart: Scassandra loses all primes when its process restarts. + // Without re-priming, the driver may query an unprimed node (e.g. if the control + // connection temporarily reconnects to this host), get empty system table responses, + // and fail to bring the node back up within the allowed window. + primeMetadata(scassandra); assertThat(cluster).host(node).comesUpWithin(10, TimeUnit.SECONDS); } @@ -386,6 +409,7 @@ public List getTokensForDC(int dc) { private void primeMetadata(Scassandra node) { PrimingClient client = node.primingClient(); + int nodeCount = 1; ImmutableList.Builder> rows = ImmutableList.builder(); @@ -396,6 +420,7 @@ private void primeMetadata(Scassandra node) { for (int n = 0; n < nodesInDc.size(); n++) { InetSocketAddress binaryAddress = address(nodeCount); InetSocketAddress listenAddress = listenAddress(nodeCount); + java.util.UUID hostId = hostIdByNodeCount.get(nodeCount); nodeCount++; Scassandra peer = nodesInDc.get(n); if (node == peer) { // prime system.local. @@ -423,7 +448,7 @@ private void primeMetadata(Scassandra node) { "release_version", getPeerInfo(dc, n + 1, "release_version", cassandraVersion)); addPeerInfo(row, dc, n + 1, "tokens", ImmutableSet.of(tokens.get(n))); - addPeerInfo(row, dc, n + 1, "host_id", UUIDs.random()); + addPeerInfo(row, dc, n + 1, "host_id", hostId); addPeerInfo(row, dc, n + 1, "schema_version", schemaVersion); addPeerInfo(row, dc, n + 1, "graph", false); @@ -444,6 +469,18 @@ private void primeMetadata(Scassandra node) { .withRows(Collections.>singletonList(row)) .build()) .build()); + // Also prime the projected query that the driver sends after the cache is warm. + ColumnMetadata[] projectedLocal = + projectedColumnMetadata(SELECT_LOCAL, ControlConnection.LOCAL_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedLocal, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocal) + .withRows(Collections.>singletonList(row)) + .build()) + .build()); } else { addPeerInfo(row, dc, n + 1, "broadcast_port", listenAddress.getPort()); addPeerInfo(row, dc, n + 1, "listen_port", listenAddress.getPort()); @@ -456,6 +493,20 @@ private void primeMetadata(Scassandra node) { .withRows(Collections.>singletonList(row)) .build()) .build()); + // Also prime the projected query that the driver sends after the cache is warm. + ColumnMetadata[] projectedLocalV2 = + projectedColumnMetadata( + SELECT_LOCAL_V2, ControlConnection.LOCAL_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery( + projectedQueryString(projectedLocalV2, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocalV2) + .withRows(Collections.>singletonList(row)) + .build()) + .build()); } } else { // prime system.peers. Map row = Maps.newHashMap(); @@ -489,7 +540,6 @@ private void primeMetadata(Scassandra node) { addPeerInfo(row, dc, n + 1, "tokens", ImmutableSet.of(Long.toString(tokens.get(n)))); addPeerInfo(rowV2, dc, n + 1, "tokens", ImmutableSet.of(Long.toString(tokens.get(n)))); - java.util.UUID hostId = UUIDs.random(); addPeerInfo(row, dc, n + 1, "host_id", hostId); addPeerInfo(rowV2, dc, n + 1, "host_id", hostId); @@ -546,6 +596,14 @@ private void primeMetadata(Scassandra node) { .withQuery("SELECT * FROM system.peers") .withThen(then().withColumnTypes(SELECT_PEERS).withRows(rows.build()).build()) .build()); + // Also prime the projected full-scan that the driver sends after the cache is warm. + ColumnMetadata[] projectedPeersFullScan = + projectedColumnMetadata(SELECT_PEERS, ControlConnection.PEERS_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedPeersFullScan, "system.peers", null)) + .withThen(then().withColumnTypes(projectedPeersFullScan).withRows(rows.build()).build()) + .build()); // return invalid error for peers_v2, indicating the table doesn't exist. if (!peersV2) { @@ -560,6 +618,15 @@ private void primeMetadata(Scassandra node) { .withQuery("SELECT * FROM system.peers_v2") .withThen(then().withColumnTypes(SELECT_PEERS_V2).withRows(rowsV2.build()).build()) .build()); + // Also prime the projected full-scan for peers_v2. + ColumnMetadata[] projectedPeersV2FullScan = + projectedColumnMetadata(SELECT_PEERS_V2, ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedPeersV2FullScan, "system.peers_v2", null)) + .withThen( + then().withColumnTypes(projectedPeersV2FullScan).withRows(rowsV2.build()).build()) + .build()); } // Needed to ensure cluster_name matches what we expect on connection. @@ -751,6 +818,29 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu column("validator", TEXT), }; + /** Returns the subset of {@code full} whose names are in {@code interest}, preserving order. */ + private static ColumnMetadata[] projectedColumnMetadata( + ColumnMetadata[] full, Set interest) { + List result = new ArrayList<>(); + for (ColumnMetadata col : full) { + if (interest.contains(col.getName())) result.add(col); + } + return result.toArray(new ColumnMetadata[0]); + } + + /** Builds a projected SELECT query string from a ColumnMetadata array. */ + private static String projectedQueryString( + ColumnMetadata[] cols, String table, String whereClause) { + StringBuilder sb = new StringBuilder("SELECT "); + for (int i = 0; i < cols.length; i++) { + if (i > 0) sb.append(", "); + sb.append(cols[i].getName()); + } + sb.append(" FROM ").append(table); + if (whereClause != null) sb.append(" WHERE ").append(whereClause); + return sb.toString(); + } + // Primes a minimal system.local row on an Scassandra node. // We need a host_id so that the driver can store it in Metadata.hosts public static void primeSystemLocalRow(Scassandra scassandra) { @@ -767,6 +857,18 @@ public static void primeSystemLocalRow(Scassandra scassandra) { .withColumnTypes( localMetadata.toArray(new ColumnMetadata[localMetadata.size()])) .withRows(Collections.>singletonList(row)))); + // Also prime the projected query that the driver sends after the cache is warm. + ColumnMetadata[] projectedLocal = + projectedColumnMetadata(SELECT_LOCAL, ControlConnection.LOCAL_COLUMNS_OF_INTEREST); + scassandra + .primingClient() + .prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedLocal, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocal) + .withRows(Collections.>singletonList(row)))); } public static ScassandraClusterBuilder builder() { From db35501df203716dcd31ab2922ee428412db28c3 Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Fri, 10 Apr 2026 10:33:31 +0200 Subject: [PATCH 2/2] fix: reset column caches in tryConnect() before reconnection queries When the control connection reconnects to a new host, tryConnect() calls refreshNodeListAndTokenMap() before setNewConnection() is called. This meant the stale localColumns/peersColumns/peersV2Columns from the previous connection were used to build projected queries, which could fail if the new host (e.g. a Scassandra mock) only has the unfiltered SELECT * primed. Fix by resetting the three column caches to null at the start of the try block in tryConnect(), before the first query is issued. The reset in setNewConnection() remains as a safety net but is now redundant for the reconnection case. --- .../java/com/datastax/driver/core/ControlConnection.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index f0b905b6e36..a0529622880 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -437,6 +437,14 @@ private Connection tryConnect(Host host, boolean isInitialConnection) ProtocolEvent.Type.SCHEMA_CHANGE); connection.write(new Requests.Register(evs)); + // Reset column caches so refreshNodeListAndTokenMap() uses SELECT * to rediscover + // which columns this server exposes, rather than a projected query built for the + // previous connection's server. setNewConnection() also resets these, but it is + // called after tryConnect() returns — too late to protect the queries below. + localColumns = null; + peersColumns = null; + peersV2Columns = null; + // We need to refresh the node list first so we know about the cassandra version of // the node we're connecting to. // This will create the token map for the first time, but it will be incomplete