Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.datastax.driver.core.utils.MoreFutures;
import com.datastax.driver.core.utils.MoreObjects;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -86,6 +87,91 @@ class ControlConnection implements Connection.Owner {
private static final String SELECT_SCHEMA_LOCAL =
"SELECT schema_version, host_id FROM system.local WHERE key='local'";

// IMPORTANT: Every column read from system.local rows — in updateInfo(),
// refreshNodeListAndTokenMap(), isValidPeer(), and DefaultEndPointFactory — MUST be listed here.
// If a new column read is added anywhere that consumes a system table row, add it to the
// appropriate set below, otherwise it will be silently excluded from projected queries.
@VisibleForTesting
static final ImmutableSet<String> LOCAL_COLUMNS_OF_INTEREST =
ImmutableSet.of(
"cluster_name",
"partitioner",
"data_center",
"rack",
"release_version",
"native_address",
"native_port",
"native_transport_address",
"native_transport_port",
"native_transport_port_ssl",
"rpc_address",
"broadcast_address",
"broadcast_port",
"listen_address",
"listen_port",
"tokens",
"host_id",
"schema_version",
"workload",
"graph",
"dse_version");

// IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above.
// Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(),
// isValidPeer(), and DefaultEndPointFactory.create() from system.peers rows.
// Columns that are absent from the actual server schema are silently excluded by
// intersectWithNeeded(), so listing extra columns here is safe.
@VisibleForTesting
static final ImmutableSet<String> PEERS_COLUMNS_OF_INTEREST =
ImmutableSet.of(
"peer",
"peer_port", // peers_v2 column; harmless to list here — absent on peers, excluded safely
"rpc_address",
"data_center",
"rack",
"release_version",
"tokens",
"listen_address",
"listen_port",
"host_id",
"schema_version",
"native_address", // may appear on some server variants; guarded by contains() in code
"native_port", // same
"native_transport_address",
"native_transport_port",
"native_transport_port_ssl",
"workload",
"graph",
"dse_version");

// IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above.
// Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(),
// isValidPeer(), and DefaultEndPointFactory.create() from system.peers_v2 rows.
// Columns that are absent from the actual server schema are silently excluded by
// intersectWithNeeded(), so listing extra columns here is safe.
@VisibleForTesting
static final ImmutableSet<String> PEERS_V2_COLUMNS_OF_INTEREST =
ImmutableSet.of(
"peer",
"peer_port",
"native_address",
"native_port",
"data_center",
"rack",
"release_version",
"tokens",
"host_id",
"schema_version",
"workload",
"graph",
"dse_version",
"listen_address",
"listen_port",
"rpc_address", // legacy; guarded by contains() in code — harmless if absent
"native_transport_address", // same
"native_transport_port", // same
"native_transport_port_ssl"); // same

private static final VersionNumber _3_11 = VersionNumber.parse("3.11.0");

@VisibleForTesting
Expand All @@ -102,10 +188,30 @@ class ControlConnection implements Connection.Owner {
// from here on out.
private volatile boolean isPeersV2 = true;

// Column projection caches. null = uninitialized: the first query to each system table issues
// SELECT * to discover which columns the server exposes, then subsequent queries project only
// the intersection of those columns with the corresponding *_COLUMNS_OF_INTEREST set.
// Reset to null on every new connection so that the new server's schema is re-discovered.
private volatile Set<String> localColumns = null;
private volatile Set<String> peersColumns = null;
private volatile Set<String> peersV2Columns = null;

public ControlConnection(Cluster.Manager manager) {
this.cluster = manager;
}

/**
* Resets the projected-column caches so that the next query to each system table sends {@code
* SELECT *} and re-discovers available columns. Intended for use in tests that clear Scassandra
* primes between driver operations.
*/
@VisibleForTesting
void resetColumnCaches() {
localColumns = null;
peersColumns = null;
peersV2Columns = null;
}

// Only for the initial connection. Does not schedule retries if it fails
void connect() throws UnsupportedProtocolVersionException {
if (isShutdown) return;
Expand Down Expand Up @@ -234,6 +340,11 @@ private void signalError() {
private void setNewConnection(Connection newConnection) {
Host.statesLogger.debug("[Control connection] established to {}", newConnection.endPoint);
newConnection.setOwner(this);
// Reset column caches so the new connection triggers a fresh SELECT * to discover which
// columns the server exposes before switching to projected queries.
localColumns = null;
peersColumns = null;
peersV2Columns = null;
Connection old = connectionRef.getAndSet(newConnection);
if (old != null && !old.isClosed()) old.closeAsync().force();
}
Expand Down Expand Up @@ -326,6 +437,14 @@ private Connection tryConnect(Host host, boolean isInitialConnection)
ProtocolEvent.Type.SCHEMA_CHANGE);
connection.write(new Requests.Register(evs));

// Reset column caches so refreshNodeListAndTokenMap() uses SELECT * to rediscover
// which columns this server exposes, rather than a projected query built for the
// previous connection's server. setNewConnection() also resets these, but it is
// called after tryConnect() returns — too late to protect the queries below.
localColumns = null;
peersColumns = null;
peersV2Columns = null;

// We need to refresh the node list first so we know about the cassandra version of
// the node we're connecting to.
// This will create the token map for the first time, but it will be incomplete
Expand Down Expand Up @@ -491,26 +610,54 @@ private Row fetchNodeInfo(Host host, Connection c)
if (isConnectedHost || host.getBroadcastSocketAddress() != null) {
String query;
if (isConnectedHost) {
query = SELECT_LOCAL;
query =
localColumns == null
? SELECT_LOCAL
: buildProjectedQuery("system.local", localColumns, "key='local'");
} else {
InetSocketAddress broadcastAddress = host.getBroadcastSocketAddress();
query =
isPeersV2
? SELECT_PEERS_V2
+ " WHERE peer='"
+ broadcastAddress.getAddress().getHostAddress()
+ "' AND peer_port="
+ broadcastAddress.getPort()
: SELECT_PEERS
+ " WHERE peer='"
+ broadcastAddress.getAddress().getHostAddress()
+ "'";
// Always use SELECT * for single-row WHERE lookups. Projected queries are only used for
// full-table scans via selectPeersFuture(), where the cache is guaranteed to be warm and
// every node has the projected full-scan prime registered. For WHERE lookups the control
// connection may query a node that was never restarted (and therefore still carries only
// the original SELECT * prime from init time), so projecting here risks a cache miss.
if (isPeersV2) {
String whereClause =
"peer='"
+ broadcastAddress.getAddress().getHostAddress()
+ "' AND peer_port="
+ broadcastAddress.getPort();
query = SELECT_PEERS_V2 + " WHERE " + whereClause;
} else {
String whereClause = "peer='" + broadcastAddress.getAddress().getHostAddress() + "'";
query = SELECT_PEERS + " WHERE " + whereClause;
}
}
DefaultResultSetFuture future =
new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(query));
c.write(future);
Row row = future.get().one();
ResultSet rs = future.get();
Row row = rs.one();
// Populate the column cache on first successful WHERE lookup so that subsequent full-table
// scans via selectPeersFuture() can send projected queries. Only populate when a row is
// found: if the WHERE returned empty we are about to fall through to the full-scan path,
// which has its own cache-population logic; populating here on an empty result would cause
// selectPeersFuture() to send a projected query before it has verified the server supports
// the projected columns.
if (row != null) {
if (isConnectedHost) {
if (localColumns == null) {
localColumns = intersectWithNeeded(rs, LOCAL_COLUMNS_OF_INTEREST);
}
} else if (isPeersV2) {
if (peersV2Columns == null) {
peersV2Columns = intersectWithNeeded(rs, PEERS_V2_COLUMNS_OF_INTEREST);
}
} else {
if (peersColumns == null) {
peersColumns = intersectWithNeeded(rs, PEERS_COLUMNS_OF_INTEREST);
}
}
return row;
} else {
InetSocketAddress address = host.getBroadcastSocketAddress();
Expand Down Expand Up @@ -717,9 +864,13 @@ private static void updateLocationInfo(
*/
private ListenableFuture<ResultSet> selectPeersFuture(final Connection connection) {
if (isPeersV2) {
String peersV2Query =
peersV2Columns == null
? SELECT_PEERS_V2
: buildProjectedQuery("system.peers_v2", peersV2Columns, null);
DefaultResultSetFuture peersV2Future =
new DefaultResultSetFuture(
null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS_V2));
null, cluster.protocolVersion(), new Requests.Query(peersV2Query));
connection.write(peersV2Future);
final SettableFuture<ResultSet> peersFuture = SettableFuture.create();
// if peers v2 query fails, query peers table instead.
Expand All @@ -729,6 +880,9 @@ private ListenableFuture<ResultSet> selectPeersFuture(final Connection connectio

@Override
public void onSuccess(ResultSet result) {
if (peersV2Columns == null) {
peersV2Columns = intersectWithNeeded(result, PEERS_V2_COLUMNS_OF_INTEREST);
}
peersFuture.set(result);
}

Expand All @@ -742,6 +896,9 @@ public void onFailure(Throwable t) {
|| (t instanceof ServerError
&& t.getMessage().contains("Unknown keyspace/cf pair (system.peers_v2)"))) {
isPeersV2 = false;
// Also reset the peers cache so the first system.peers query issues SELECT *
// to discover which columns that table exposes on this server.
peersColumns = null;
MoreFutures.propagateFuture(peersFuture, selectPeersFuture(connection));
} else {
peersFuture.setException(t);
Expand All @@ -751,14 +908,73 @@ public void onFailure(Throwable t) {
MoreExecutors.directExecutor());
return peersFuture;
} else {
DefaultResultSetFuture peersFuture =
String peersQuery =
peersColumns == null
? SELECT_PEERS
: buildProjectedQuery("system.peers", peersColumns, null);
DefaultResultSetFuture rawFuture =
new DefaultResultSetFuture(
null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS));
connection.write(peersFuture);
null, cluster.protocolVersion(), new Requests.Query(peersQuery));
connection.write(rawFuture);
final SettableFuture<ResultSet> peersFuture = SettableFuture.create();
Futures.addCallback(
rawFuture,
new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
if (peersColumns == null) {
peersColumns = intersectWithNeeded(result, PEERS_COLUMNS_OF_INTEREST);
}
peersFuture.set(result);
}

@Override
public void onFailure(Throwable t) {
peersFuture.setException(t);
}
},
MoreExecutors.directExecutor());
return peersFuture;
}
}

/**
* Returns the intersection of the columns returned by the server (from {@code rs}) with the given
* {@code needed} set, or {@code null} if the intersection is empty. The result is used to cache
* projected column lists so subsequent queries fetch only what the driver actually reads. A
* {@code null} return keeps the cache in the "uninitialized" sentinel state, ensuring the driver
* continues issuing {@code SELECT *} rather than generating an invalid empty-column projection.
*/
@VisibleForTesting
static Set<String> intersectWithNeeded(ResultSet rs, ImmutableSet<String> needed) {
ImmutableSet.Builder<String> result = ImmutableSet.builder();
for (ColumnDefinitions.Definition def : rs.getColumnDefinitions()) {
if (needed.contains(def.getName())) {
result.add(def.getName());
}
}
ImmutableSet<String> built = result.build();
return built.isEmpty() ? null : built;
}

/**
* Builds a {@code SELECT col1, col2, ... FROM table [WHERE whereClause]} query string from the
* given projected column set. {@code whereClause} may be {@code null} for table-wide scans.
*/
@VisibleForTesting
static String buildProjectedQuery(String table, Set<String> columns, String whereClause) {
StringBuilder sb = new StringBuilder("SELECT ");
boolean first = true;
for (String col : columns) {
if (!first) sb.append(", ");
sb.append(col);
first = false;
}
sb.append(" FROM ").append(table);
if (whereClause != null) sb.append(" WHERE ").append(whereClause);
return sb.toString();
}
Comment on lines +965 to +976
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buildProjectedQuery() will produce an invalid CQL statement if columns is empty (it becomes SELECT FROM ...). Since intersectWithNeeded() can legitimately return an empty set if the server/prime doesn't expose any of the needed columns, this can turn a schema mismatch into a confusing syntax error and break subsequent refreshes once the empty set is cached. Add a guard to fall back to SELECT * (or keep the cache as null) when columns.isEmpty().

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. intersectWithNeeded() now returns null instead of an empty set when no server columns overlap the needed set. Since null is the uninitialised sentinel value for the cache fields, this keeps the cache in the "needs discovery" state and ensures buildProjectedQuery() is never called with an empty column list — the driver will continue issuing SELECT * until it gets a non-empty intersection.


private void refreshNodeListAndTokenMap(
final Connection connection,
final Cluster.Manager cluster,
Expand All @@ -772,9 +988,12 @@ private void refreshNodeListAndTokenMap(

// Make sure we're up to date on nodes and tokens

String localQuery =
localColumns == null
? SELECT_LOCAL
: buildProjectedQuery("system.local", localColumns, "key='local'");
DefaultResultSetFuture localFuture =
new DefaultResultSetFuture(
null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL));
new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(localQuery));
ListenableFuture<ResultSet> peersFuture = selectPeersFuture(connection);
connection.write(localFuture);

Expand All @@ -783,7 +1002,11 @@ private void refreshNodeListAndTokenMap(
Map<Host, Set<Token>> tokenMap = new HashMap<Host, Set<Token>>();

// Update cluster name, DC and rack for the one node we are connected to
Row localRow = localFuture.get().one();
ResultSet localRs = localFuture.get();
if (localColumns == null) {
localColumns = intersectWithNeeded(localRs, LOCAL_COLUMNS_OF_INTEREST);
}
Row localRow = localRs.one();
if (localRow == null) {
throw new IllegalStateException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ public void should_fetch_whole_peers_table_if_broadcast_address_changed()
.build();

scassandras.node(1).primingClient().clearAllPrimes();
// Reset the column caches so the driver re-discovers columns via SELECT * rather than
// sending projected queries against the now-cleared Scassandra primes.
cluster.manager.controlConnection.resetColumnCaches();

// the driver will attempt to locate host2 in system.peers by its old broadcast address, and
// that will fail
Expand Down
Loading
Loading