3434import com .datastax .driver .core .utils .MoreFutures ;
3535import com .datastax .driver .core .utils .MoreObjects ;
3636import com .google .common .annotations .VisibleForTesting ;
37+ import com .google .common .collect .ImmutableSet ;
3738import com .google .common .collect .Iterators ;
3839import com .google .common .util .concurrent .FutureCallback ;
3940import com .google .common .util .concurrent .Futures ;
@@ -86,6 +87,91 @@ class ControlConnection implements Connection.Owner {
8687 private static final String SELECT_SCHEMA_LOCAL =
8788 "SELECT schema_version, host_id FROM system.local WHERE key='local'" ;
8889
90+ // IMPORTANT: Every column read from system.local rows — in updateInfo(),
91+ // refreshNodeListAndTokenMap(), isValidPeer(), and DefaultEndPointFactory — MUST be listed here.
92+ // If a new column read is added anywhere that consumes a system table row, add it to the
93+ // appropriate set below, otherwise it will be silently excluded from projected queries.
94+ @ VisibleForTesting
95+ static final ImmutableSet <String > LOCAL_COLUMNS_OF_INTEREST =
96+ ImmutableSet .of (
97+ "cluster_name" ,
98+ "partitioner" ,
99+ "data_center" ,
100+ "rack" ,
101+ "release_version" ,
102+ "native_address" ,
103+ "native_port" ,
104+ "native_transport_address" ,
105+ "native_transport_port" ,
106+ "native_transport_port_ssl" ,
107+ "rpc_address" ,
108+ "broadcast_address" ,
109+ "broadcast_port" ,
110+ "listen_address" ,
111+ "listen_port" ,
112+ "tokens" ,
113+ "host_id" ,
114+ "schema_version" ,
115+ "workload" ,
116+ "graph" ,
117+ "dse_version" );
118+
119+ // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above.
120+ // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(),
121+ // isValidPeer(), and DefaultEndPointFactory.create() from system.peers rows.
122+ // Columns that are absent from the actual server schema are silently excluded by
123+ // intersectWithNeeded(), so listing extra columns here is safe.
124+ @ VisibleForTesting
125+ static final ImmutableSet <String > PEERS_COLUMNS_OF_INTEREST =
126+ ImmutableSet .of (
127+ "peer" ,
128+ "peer_port" , // peers_v2 column; harmless to list here — absent on peers, excluded safely
129+ "rpc_address" ,
130+ "data_center" ,
131+ "rack" ,
132+ "release_version" ,
133+ "tokens" ,
134+ "listen_address" ,
135+ "listen_port" ,
136+ "host_id" ,
137+ "schema_version" ,
138+ "native_address" , // may appear on some server variants; guarded by contains() in code
139+ "native_port" , // same
140+ "native_transport_address" ,
141+ "native_transport_port" ,
142+ "native_transport_port_ssl" ,
143+ "workload" ,
144+ "graph" ,
145+ "dse_version" );
146+
147+ // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above.
148+ // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(),
149+ // isValidPeer(), and DefaultEndPointFactory.create() from system.peers_v2 rows.
150+ // Columns that are absent from the actual server schema are silently excluded by
151+ // intersectWithNeeded(), so listing extra columns here is safe.
152+ @ VisibleForTesting
153+ static final ImmutableSet <String > PEERS_V2_COLUMNS_OF_INTEREST =
154+ ImmutableSet .of (
155+ "peer" ,
156+ "peer_port" ,
157+ "native_address" ,
158+ "native_port" ,
159+ "data_center" ,
160+ "rack" ,
161+ "release_version" ,
162+ "tokens" ,
163+ "host_id" ,
164+ "schema_version" ,
165+ "workload" ,
166+ "graph" ,
167+ "dse_version" ,
168+ "listen_address" ,
169+ "listen_port" ,
170+ "rpc_address" , // legacy; guarded by contains() in code — harmless if absent
171+ "native_transport_address" , // same
172+ "native_transport_port" , // same
173+ "native_transport_port_ssl" ); // same
174+
89175 private static final VersionNumber _3_11 = VersionNumber .parse ("3.11.0" );
90176
91177 @ VisibleForTesting
@@ -102,10 +188,30 @@ class ControlConnection implements Connection.Owner {
102188 // from here on out.
103189 private volatile boolean isPeersV2 = true ;
104190
191+ // Column projection caches. null = uninitialized: the first query to each system table issues
192+ // SELECT * to discover which columns the server exposes, then subsequent queries project only
193+ // the intersection of those columns with the corresponding *_COLUMNS_OF_INTEREST set.
194+ // Reset to null on every new connection so that the new server's schema is re-discovered.
195+ private volatile Set <String > localColumns = null ;
196+ private volatile Set <String > peersColumns = null ;
197+ private volatile Set <String > peersV2Columns = null ;
198+
105199 public ControlConnection (Cluster .Manager manager ) {
106200 this .cluster = manager ;
107201 }
108202
203+ /**
204+ * Resets the projected-column caches so that the next query to each system table sends {@code
205+ * SELECT *} and re-discovers available columns. Intended for use in tests that clear Scassandra
206+ * primes between driver operations.
207+ */
208+ @ VisibleForTesting
209+ void resetColumnCaches () {
210+ localColumns = null ;
211+ peersColumns = null ;
212+ peersV2Columns = null ;
213+ }
214+
109215 // Only for the initial connection. Does not schedule retries if it fails
110216 void connect () throws UnsupportedProtocolVersionException {
111217 if (isShutdown ) return ;
@@ -234,6 +340,11 @@ private void signalError() {
234340 private void setNewConnection (Connection newConnection ) {
235341 Host .statesLogger .debug ("[Control connection] established to {}" , newConnection .endPoint );
236342 newConnection .setOwner (this );
343+ // Reset column caches so the new connection triggers a fresh SELECT * to discover which
344+ // columns the server exposes before switching to projected queries.
345+ localColumns = null ;
346+ peersColumns = null ;
347+ peersV2Columns = null ;
237348 Connection old = connectionRef .getAndSet (newConnection );
238349 if (old != null && !old .isClosed ()) old .closeAsync ().force ();
239350 }
@@ -491,26 +602,54 @@ private Row fetchNodeInfo(Host host, Connection c)
491602 if (isConnectedHost || host .getBroadcastSocketAddress () != null ) {
492603 String query ;
493604 if (isConnectedHost ) {
494- query = SELECT_LOCAL ;
605+ query =
606+ localColumns == null
607+ ? SELECT_LOCAL
608+ : buildProjectedQuery ("system.local" , localColumns , "key='local'" );
495609 } else {
496610 InetSocketAddress broadcastAddress = host .getBroadcastSocketAddress ();
497- query =
498- isPeersV2
499- ? SELECT_PEERS_V2
500- + " WHERE peer='"
501- + broadcastAddress .getAddress ().getHostAddress ()
502- + "' AND peer_port="
503- + broadcastAddress .getPort ()
504- : SELECT_PEERS
505- + " WHERE peer='"
506- + broadcastAddress .getAddress ().getHostAddress ()
507- + "'" ;
611+ // Always use SELECT * for single-row WHERE lookups. Projected queries are only used for
612+ // full-table scans via selectPeersFuture(), where the cache is guaranteed to be warm and
613+ // every node has the projected full-scan prime registered. For WHERE lookups the control
614+ // connection may query a node that was never restarted (and therefore still carries only
615+ // the original SELECT * prime from init time), so projecting here risks a cache miss.
616+ if (isPeersV2 ) {
617+ String whereClause =
618+ "peer='"
619+ + broadcastAddress .getAddress ().getHostAddress ()
620+ + "' AND peer_port="
621+ + broadcastAddress .getPort ();
622+ query = SELECT_PEERS_V2 + " WHERE " + whereClause ;
623+ } else {
624+ String whereClause = "peer='" + broadcastAddress .getAddress ().getHostAddress () + "'" ;
625+ query = SELECT_PEERS + " WHERE " + whereClause ;
626+ }
508627 }
509628 DefaultResultSetFuture future =
510629 new DefaultResultSetFuture (null , cluster .protocolVersion (), new Requests .Query (query ));
511630 c .write (future );
512- Row row = future .get ().one ();
631+ ResultSet rs = future .get ();
632+ Row row = rs .one ();
633+ // Populate the column cache on first successful WHERE lookup so that subsequent full-table
634+ // scans via selectPeersFuture() can send projected queries. Only populate when a row is
635+ // found: if the WHERE returned empty we are about to fall through to the full-scan path,
636+ // which has its own cache-population logic; populating here on an empty result would cause
637+ // selectPeersFuture() to send a projected query before it has verified the server supports
638+ // the projected columns.
513639 if (row != null ) {
640+ if (isConnectedHost ) {
641+ if (localColumns == null ) {
642+ localColumns = intersectWithNeeded (rs , LOCAL_COLUMNS_OF_INTEREST );
643+ }
644+ } else if (isPeersV2 ) {
645+ if (peersV2Columns == null ) {
646+ peersV2Columns = intersectWithNeeded (rs , PEERS_V2_COLUMNS_OF_INTEREST );
647+ }
648+ } else {
649+ if (peersColumns == null ) {
650+ peersColumns = intersectWithNeeded (rs , PEERS_COLUMNS_OF_INTEREST );
651+ }
652+ }
514653 return row ;
515654 } else {
516655 InetSocketAddress address = host .getBroadcastSocketAddress ();
@@ -717,9 +856,13 @@ private static void updateLocationInfo(
717856 */
718857 private ListenableFuture <ResultSet > selectPeersFuture (final Connection connection ) {
719858 if (isPeersV2 ) {
859+ String peersV2Query =
860+ peersV2Columns == null
861+ ? SELECT_PEERS_V2
862+ : buildProjectedQuery ("system.peers_v2" , peersV2Columns , null );
720863 DefaultResultSetFuture peersV2Future =
721864 new DefaultResultSetFuture (
722- null , cluster .protocolVersion (), new Requests .Query (SELECT_PEERS_V2 ));
865+ null , cluster .protocolVersion (), new Requests .Query (peersV2Query ));
723866 connection .write (peersV2Future );
724867 final SettableFuture <ResultSet > peersFuture = SettableFuture .create ();
725868 // if peers v2 query fails, query peers table instead.
@@ -729,6 +872,9 @@ private ListenableFuture<ResultSet> selectPeersFuture(final Connection connectio
729872
730873 @ Override
731874 public void onSuccess (ResultSet result ) {
875+ if (peersV2Columns == null ) {
876+ peersV2Columns = intersectWithNeeded (result , PEERS_V2_COLUMNS_OF_INTEREST );
877+ }
732878 peersFuture .set (result );
733879 }
734880
@@ -742,6 +888,9 @@ public void onFailure(Throwable t) {
742888 || (t instanceof ServerError
743889 && t .getMessage ().contains ("Unknown keyspace/cf pair (system.peers_v2)" ))) {
744890 isPeersV2 = false ;
891+ // Also reset the peers cache so the first system.peers query issues SELECT *
892+ // to discover which columns that table exposes on this server.
893+ peersColumns = null ;
745894 MoreFutures .propagateFuture (peersFuture , selectPeersFuture (connection ));
746895 } else {
747896 peersFuture .setException (t );
@@ -751,14 +900,73 @@ public void onFailure(Throwable t) {
751900 MoreExecutors .directExecutor ());
752901 return peersFuture ;
753902 } else {
754- DefaultResultSetFuture peersFuture =
903+ String peersQuery =
904+ peersColumns == null
905+ ? SELECT_PEERS
906+ : buildProjectedQuery ("system.peers" , peersColumns , null );
907+ DefaultResultSetFuture rawFuture =
755908 new DefaultResultSetFuture (
756- null , cluster .protocolVersion (), new Requests .Query (SELECT_PEERS ));
757- connection .write (peersFuture );
909+ null , cluster .protocolVersion (), new Requests .Query (peersQuery ));
910+ connection .write (rawFuture );
911+ final SettableFuture <ResultSet > peersFuture = SettableFuture .create ();
912+ Futures .addCallback (
913+ rawFuture ,
914+ new FutureCallback <ResultSet >() {
915+ @ Override
916+ public void onSuccess (ResultSet result ) {
917+ if (peersColumns == null ) {
918+ peersColumns = intersectWithNeeded (result , PEERS_COLUMNS_OF_INTEREST );
919+ }
920+ peersFuture .set (result );
921+ }
922+
923+ @ Override
924+ public void onFailure (Throwable t ) {
925+ peersFuture .setException (t );
926+ }
927+ },
928+ MoreExecutors .directExecutor ());
758929 return peersFuture ;
759930 }
760931 }
761932
933+ /**
934+ * Returns the intersection of the columns returned by the server (from {@code rs}) with the given
935+ * {@code needed} set, or {@code null} if the intersection is empty. The result is used to cache
936+ * projected column lists so subsequent queries fetch only what the driver actually reads. A
937+ * {@code null} return keeps the cache in the "uninitialized" sentinel state, ensuring the driver
938+ * continues issuing {@code SELECT *} rather than generating an invalid empty-column projection.
939+ */
940+ @ VisibleForTesting
941+ static Set <String > intersectWithNeeded (ResultSet rs , ImmutableSet <String > needed ) {
942+ ImmutableSet .Builder <String > result = ImmutableSet .builder ();
943+ for (ColumnDefinitions .Definition def : rs .getColumnDefinitions ()) {
944+ if (needed .contains (def .getName ())) {
945+ result .add (def .getName ());
946+ }
947+ }
948+ ImmutableSet <String > built = result .build ();
949+ return built .isEmpty () ? null : built ;
950+ }
951+
952+ /**
953+ * Builds a {@code SELECT col1, col2, ... FROM table [WHERE whereClause]} query string from the
954+ * given projected column set. {@code whereClause} may be {@code null} for table-wide scans.
955+ */
956+ @ VisibleForTesting
957+ static String buildProjectedQuery (String table , Set <String > columns , String whereClause ) {
958+ StringBuilder sb = new StringBuilder ("SELECT " );
959+ boolean first = true ;
960+ for (String col : columns ) {
961+ if (!first ) sb .append (", " );
962+ sb .append (col );
963+ first = false ;
964+ }
965+ sb .append (" FROM " ).append (table );
966+ if (whereClause != null ) sb .append (" WHERE " ).append (whereClause );
967+ return sb .toString ();
968+ }
969+
762970 private void refreshNodeListAndTokenMap (
763971 final Connection connection ,
764972 final Cluster .Manager cluster ,
@@ -772,9 +980,12 @@ private void refreshNodeListAndTokenMap(
772980
773981 // Make sure we're up to date on nodes and tokens
774982
983+ String localQuery =
984+ localColumns == null
985+ ? SELECT_LOCAL
986+ : buildProjectedQuery ("system.local" , localColumns , "key='local'" );
775987 DefaultResultSetFuture localFuture =
776- new DefaultResultSetFuture (
777- null , cluster .protocolVersion (), new Requests .Query (SELECT_LOCAL ));
988+ new DefaultResultSetFuture (null , cluster .protocolVersion (), new Requests .Query (localQuery ));
778989 ListenableFuture <ResultSet > peersFuture = selectPeersFuture (connection );
779990 connection .write (localFuture );
780991
@@ -783,7 +994,11 @@ private void refreshNodeListAndTokenMap(
783994 Map <Host , Set <Token >> tokenMap = new HashMap <Host , Set <Token >>();
784995
785996 // Update cluster name, DC and rack for the one node we are connected to
786- Row localRow = localFuture .get ().one ();
997+ ResultSet localRs = localFuture .get ();
998+ if (localColumns == null ) {
999+ localColumns = intersectWithNeeded (localRs , LOCAL_COLUMNS_OF_INTEREST );
1000+ }
1001+ Row localRow = localRs .one ();
7871002 if (localRow == null ) {
7881003 throw new IllegalStateException (
7891004 String .format (
0 commit comments