5555import io .netty .channel .ChannelOption ;
5656import io .netty .channel .ChannelPipeline ;
5757import java .io .IOException ;
58+ import java .net .InetAddress ;
5859import java .net .InetSocketAddress ;
5960import java .net .ServerSocket ;
6061import java .net .SocketAddress ;
62+ import java .net .UnknownHostException ;
63+ import java .util .ArrayList ;
64+ import java .util .Collections ;
6165import java .util .List ;
6266import java .util .Map ;
6367import java .util .Optional ;
@@ -196,69 +200,190 @@ CompletionStage<DriverChannel> connect(
196200 }
197201
198202 connect (
199- endPoint ,
200- shardingInfo ,
201- shardId ,
202- options ,
203- nodeMetricUpdater ,
204- currentVersion ,
205- isNegotiating ,
206- attemptedVersions ,
207- resultFuture );
203+ new ConnectRequest (
204+ endPoint ,
205+ shardingInfo ,
206+ shardId ,
207+ options ,
208+ nodeMetricUpdater ,
209+ currentVersion ,
210+ isNegotiating ,
211+ attemptedVersions ,
212+ resultFuture ));
208213 return resultFuture ;
209214 }
210215
211- private void connect (
212- EndPoint endPoint ,
213- NodeShardingInfo shardingInfo ,
214- Integer shardId ,
215- DriverChannelOptions options ,
216- NodeMetricUpdater nodeMetricUpdater ,
217- ProtocolVersion currentVersion ,
218- boolean isNegotiating ,
219- List <ProtocolVersion > attemptedVersions ,
220- CompletableFuture <DriverChannel > resultFuture ) {
216+ /**
217+ * Bundles all per-connection-attempt state so it can be threaded through the decomposed connect
218+ * methods without a growing parameter list.
219+ */
220+ private static class ConnectRequest {
221+ final EndPoint endPoint ;
222+ final NodeShardingInfo shardingInfo ;
223+ final Integer shardId ;
224+ final DriverChannelOptions options ;
225+ final NodeMetricUpdater nodeMetricUpdater ;
226+ ProtocolVersion currentVersion ;
227+ boolean isNegotiating ;
228+ final List <ProtocolVersion > attemptedVersions ;
229+ final CompletableFuture <DriverChannel > resultFuture ;
221230
222- SocketAddress resolvedAddress ;
231+ ConnectRequest (
232+ EndPoint endPoint ,
233+ NodeShardingInfo shardingInfo ,
234+ Integer shardId ,
235+ DriverChannelOptions options ,
236+ NodeMetricUpdater nodeMetricUpdater ,
237+ ProtocolVersion currentVersion ,
238+ boolean isNegotiating ,
239+ List <ProtocolVersion > attemptedVersions ,
240+ CompletableFuture <DriverChannel > resultFuture ) {
241+ this .endPoint = endPoint ;
242+ this .shardingInfo = shardingInfo ;
243+ this .shardId = shardId ;
244+ this .options = options ;
245+ this .nodeMetricUpdater = nodeMetricUpdater ;
246+ this .currentVersion = currentVersion ;
247+ this .isNegotiating = isNegotiating ;
248+ this .attemptedVersions = attemptedVersions ;
249+ this .resultFuture = resultFuture ;
250+ }
251+ }
252+
253+ /**
254+ * Entry point for an actual connection attempt. Resolves the endpoint address — expanding
255+ * unresolved hostnames to all known IP addresses so that non-responsive individual IPs can be
256+ * skipped — then delegates iteration to {@link #tryNextAddress}.
257+ */
258+ private void connect (ConnectRequest request ) {
259+ SocketAddress raw ;
223260 try {
224- resolvedAddress = endPoint .resolve ();
261+ raw = request . endPoint .resolve ();
225262 } catch (Exception e ) {
226- resultFuture .completeExceptionally (e );
263+ request . resultFuture .completeExceptionally (e );
227264 return ;
228265 }
229266
230- NettyOptions nettyOptions = context .getNettyOptions ();
267+ List <InetSocketAddress > candidates ;
268+ if (raw instanceof InetSocketAddress ) {
269+ InetSocketAddress inetAddr = (InetSocketAddress ) raw ;
270+ if (inetAddr .isUnresolved ()) {
271+ // Hostname has not been resolved yet — expand it to all known IPs now so that we can
272+ // fall back to subsequent addresses if the first one is non-responsive.
273+ try {
274+ InetAddress [] all = InetAddress .getAllByName (inetAddr .getHostString ());
275+ candidates = new ArrayList <>(all .length );
276+ for (InetAddress addr : all ) {
277+ candidates .add (new InetSocketAddress (addr , inetAddr .getPort ()));
278+ }
279+ } catch (UnknownHostException e ) {
280+ request .resultFuture .completeExceptionally (e );
281+ return ;
282+ }
283+ } else {
284+ candidates = Collections .singletonList (inetAddr );
285+ }
286+ } else {
287+ // Non-inet address (e.g. Unix domain socket) — pass through as-is.
288+ tryNextAddressRaw (request , raw );
289+ return ;
290+ }
291+
292+ tryNextAddress (request , candidates , 0 );
293+ }
231294
295+ /**
296+ * Iterates through the candidate addresses, calling {@link #connectToAddress} for each. If an
297+ * address fails for a reason other than protocol negotiation, the next candidate is tried. Only
298+ * when all candidates are exhausted is the overall {@code resultFuture} failed.
299+ */
300+ private void tryNextAddress (
301+ ConnectRequest request , List <InetSocketAddress > candidates , int index ) {
302+ InetSocketAddress address = candidates .get (index );
303+ connectToAddress (request , address )
304+ .whenComplete (
305+ (channel , error ) -> {
306+ if (error == null ) {
307+ // Handshake succeeded on this address — propagate to the overall result.
308+ request .resultFuture .complete (channel );
309+ } else if (index + 1 < candidates .size ()) {
310+ LOG .debug (
311+ "[{}] Failed to connect to {} ({}), trying next address" ,
312+ logPrefix ,
313+ address ,
314+ error .getMessage ());
315+ tryNextAddress (request , candidates , index + 1 );
316+ } else {
317+ // Note: might be completed already if the failure happened in initializer()
318+ request .resultFuture .completeExceptionally (error );
319+ }
320+ });
321+ }
322+
323+ /**
324+ * Performs a Netty bootstrap connect to a single, already-resolved {@link InetSocketAddress}.
325+ * Handles protocol version negotiation (downgrade retries) internally, staying on the same
326+ * address.
327+ *
328+ * <p>The returned {@code addressFuture} is wired as the initializer's {@code resultFuture}, so it
329+ * is completed by {@link ProtocolInitHandler} after the full handshake — not at TCP-connect time.
330+ * This lets {@link #tryNextAddress} distinguish a per-address TCP failure (try the next IP) from
331+ * a successful protocol init (propagate to the overall {@code request.resultFuture}).
332+ *
333+ * <p>On TCP failure the listener rejects the {@code addressFuture} immediately, bypassing the
334+ * handshake. On {@link UnsupportedProtocolVersionException} the same address is retried with a
335+ * downgraded protocol, chaining back into the same {@code callerFuture}.
336+ */
337+ private CompletableFuture <DriverChannel > connectToAddress (
338+ ConnectRequest request , InetSocketAddress address ) {
339+ // callerFuture is what tryNextAddress observes. It is completed after protocol negotiation
340+ // has fully settled (either success, downgrade-retry success, or unrecoverable failure).
341+ CompletableFuture <DriverChannel > callerFuture = new CompletableFuture <>();
342+
343+ connectToAddressInternal (request , address , callerFuture );
344+ return callerFuture ;
345+ }
346+
347+ private void connectToAddressInternal (
348+ ConnectRequest request ,
349+ InetSocketAddress address ,
350+ CompletableFuture <DriverChannel > callerFuture ) {
351+ NettyOptions nettyOptions = context .getNettyOptions ();
232352 Bootstrap bootstrap =
233353 new Bootstrap ()
234354 .group (nettyOptions .ioEventLoopGroup ())
235355 .channel (nettyOptions .channelClass ())
236356 .option (ChannelOption .ALLOCATOR , nettyOptions .allocator ())
237357 .handler (
238- initializer (endPoint , currentVersion , options , nodeMetricUpdater , resultFuture ));
239-
358+ initializer (
359+ request .endPoint ,
360+ request .currentVersion ,
361+ request .options ,
362+ request .nodeMetricUpdater ,
363+ callerFuture ));
240364 nettyOptions .afterBootstrapInitialized (bootstrap );
241365
242366 ChannelFuture connectFuture ;
243- if (shardId == null || shardingInfo == null ) {
244- if (shardId != null ) {
367+ if (request . shardId == null || request . shardingInfo == null ) {
368+ if (request . shardId != null ) {
245369 LOG .debug (
246370 "Requested connection to shard {} but shardingInfo is currently missing for Node at endpoint {}. Falling back to arbitrary local port." ,
247- shardId ,
248- endPoint );
371+ request . shardId ,
372+ request . endPoint );
249373 }
250- connectFuture = bootstrap .connect (resolvedAddress );
374+ connectFuture = bootstrap .connect (address );
251375 } else {
252376 int localPort =
253- PortAllocator .getNextAvailablePort (shardingInfo .getShardsCount (), shardId , context );
377+ PortAllocator .getNextAvailablePort (
378+ request .shardingInfo .getShardsCount (), request .shardId , context );
254379 if (localPort == -1 ) {
255380 LOG .warn (
256381 "Could not find free port for shard {} at {}. Falling back to arbitrary local port." ,
257- shardId ,
258- endPoint );
259- connectFuture = bootstrap .connect (resolvedAddress );
382+ request . shardId ,
383+ request . endPoint );
384+ connectFuture = bootstrap .connect (address );
260385 } else {
261- connectFuture = bootstrap .connect (resolvedAddress , new InetSocketAddress (localPort ));
386+ connectFuture = bootstrap .connect (address , new InetSocketAddress (localPort ));
262387 }
263388 }
264389
@@ -267,11 +392,92 @@ private void connect(
267392 if (connectFuture .isSuccess ()) {
268393 Channel channel = connectFuture .channel ();
269394 DriverChannel driverChannel =
270- new DriverChannel (endPoint , channel , context .getWriteCoalescer (), currentVersion );
271- // If this is the first successful connection, remember the protocol version and
272- // cluster name for future connections.
273- if (isNegotiating ) {
274- ChannelFactory .this .protocolVersion = currentVersion ;
395+ new DriverChannel (
396+ request .endPoint , channel , context .getWriteCoalescer (), request .currentVersion );
397+ if (request .isNegotiating ) {
398+ ChannelFactory .this .protocolVersion = request .currentVersion ;
399+ }
400+ if (ChannelFactory .this .clusterName == null ) {
401+ ChannelFactory .this .clusterName = driverChannel .getClusterName ();
402+ }
403+ Map <String , List <String >> supportedOptions = driverChannel .getOptions ();
404+ if (ChannelFactory .this .productType == null && supportedOptions != null ) {
405+ List <String > productTypes = supportedOptions .get ("PRODUCT_TYPE" );
406+ String productType =
407+ productTypes != null && !productTypes .isEmpty ()
408+ ? productTypes .get (0 )
409+ : UNKNOWN_PRODUCT_TYPE ;
410+ ChannelFactory .this .productType = productType ;
411+ DriverConfig driverConfig = context .getConfig ();
412+ if (driverConfig instanceof TypesafeDriverConfig
413+ && productType .equals (DATASTAX_CLOUD_PRODUCT_TYPE )) {
414+ ((TypesafeDriverConfig ) driverConfig )
415+ .overrideDefaults (
416+ ImmutableMap .of (
417+ DefaultDriverOption .REQUEST_CONSISTENCY ,
418+ ConsistencyLevel .LOCAL_QUORUM .name ()));
419+ }
420+ }
421+ callerFuture .complete (driverChannel );
422+ } else {
423+ Throwable error = connectFuture .cause ();
424+ if (error instanceof UnsupportedProtocolVersionException && request .isNegotiating ) {
425+ request .attemptedVersions .add (request .currentVersion );
426+ Optional <ProtocolVersion > downgraded =
427+ context .getProtocolVersionRegistry ().downgrade (request .currentVersion );
428+ if (downgraded .isPresent ()) {
429+ LOG .debug (
430+ "[{}] Failed to connect with protocol {}, retrying with {}" ,
431+ logPrefix ,
432+ request .currentVersion ,
433+ downgraded .get ());
434+ request .currentVersion = downgraded .get ();
435+ connectToAddressInternal (request , address , callerFuture );
436+ } else {
437+ callerFuture .completeExceptionally (
438+ UnsupportedProtocolVersionException .forNegotiation (
439+ request .endPoint , request .attemptedVersions ));
440+ }
441+ } else {
442+ // Note: might be completed already if the failure happened in initializer()
443+ callerFuture .completeExceptionally (error );
444+ }
445+ }
446+ });
447+ }
448+
449+ /**
450+ * Handles the non-{@link InetSocketAddress} path (e.g. Unix domain sockets, Netty local
451+ * transport) by connecting directly to the raw address without multi-address fallback. Supports
452+ * protocol-version negotiation (downgrade retries) the same way as {@link
453+ * #connectToAddressInternal}.
454+ */
455+ private void tryNextAddressRaw (ConnectRequest request , SocketAddress rawAddress ) {
456+ NettyOptions nettyOptions = context .getNettyOptions ();
457+ Bootstrap bootstrap =
458+ new Bootstrap ()
459+ .group (nettyOptions .ioEventLoopGroup ())
460+ .channel (nettyOptions .channelClass ())
461+ .option (ChannelOption .ALLOCATOR , nettyOptions .allocator ())
462+ .handler (
463+ initializer (
464+ request .endPoint ,
465+ request .currentVersion ,
466+ request .options ,
467+ request .nodeMetricUpdater ,
468+ request .resultFuture ));
469+ nettyOptions .afterBootstrapInitialized (bootstrap );
470+
471+ ChannelFuture connectFuture = bootstrap .connect (rawAddress );
472+ connectFuture .addListener (
473+ cf -> {
474+ if (connectFuture .isSuccess ()) {
475+ Channel channel = connectFuture .channel ();
476+ DriverChannel driverChannel =
477+ new DriverChannel (
478+ request .endPoint , channel , context .getWriteCoalescer (), request .currentVersion );
479+ if (request .isNegotiating ) {
480+ ChannelFactory .this .protocolVersion = request .currentVersion ;
275481 }
276482 if (ChannelFactory .this .clusterName == null ) {
277483 ChannelFactory .this .clusterName = driverChannel .getClusterName ();
@@ -294,38 +500,29 @@ private void connect(
294500 ConsistencyLevel .LOCAL_QUORUM .name ()));
295501 }
296502 }
297- resultFuture .complete (driverChannel );
503+ request . resultFuture .complete (driverChannel );
298504 } else {
299505 Throwable error = connectFuture .cause ();
300- if (error instanceof UnsupportedProtocolVersionException && isNegotiating ) {
301- attemptedVersions .add (currentVersion );
506+ if (error instanceof UnsupportedProtocolVersionException && request . isNegotiating ) {
507+ request . attemptedVersions .add (request . currentVersion );
302508 Optional <ProtocolVersion > downgraded =
303- context .getProtocolVersionRegistry ().downgrade (currentVersion );
509+ context .getProtocolVersionRegistry ().downgrade (request . currentVersion );
304510 if (downgraded .isPresent ()) {
305511 LOG .debug (
306512 "[{}] Failed to connect with protocol {}, retrying with {}" ,
307513 logPrefix ,
308- currentVersion ,
514+ request . currentVersion ,
309515 downgraded .get ());
310- connect (
311- endPoint ,
312- shardingInfo ,
313- shardId ,
314- options ,
315- nodeMetricUpdater ,
316- downgraded .get (),
317- true ,
318- attemptedVersions ,
319- resultFuture );
516+ request .currentVersion = downgraded .get ();
517+ tryNextAddressRaw (request , rawAddress );
320518 } else {
321- resultFuture .completeExceptionally (
519+ request . resultFuture .completeExceptionally (
322520 UnsupportedProtocolVersionException .forNegotiation (
323- endPoint , attemptedVersions ));
521+ request . endPoint , request . attemptedVersions ));
324522 }
325523 } else {
326- // Note: might be completed already if the failure happened in initializer(), this is
327- // fine
328- resultFuture .completeExceptionally (error );
524+ // Note: might be completed already if the failure happened in initializer()
525+ request .resultFuture .completeExceptionally (error );
329526 }
330527 }
331528 });
0 commit comments