5555import io .netty .channel .ChannelOption ;
5656import io .netty .channel .ChannelPipeline ;
5757import java .io .IOException ;
58+ import java .net .InetAddress ;
5859import java .net .InetSocketAddress ;
5960import java .net .ServerSocket ;
6061import java .net .SocketAddress ;
62+ import java .net .UnknownHostException ;
63+ import java .util .ArrayList ;
64+ import java .util .Collections ;
6165import java .util .List ;
6266import java .util .Map ;
6367import java .util .Optional ;
@@ -196,69 +200,173 @@ CompletionStage<DriverChannel> connect(
196200 }
197201
198202 connect (
199- endPoint ,
200- shardingInfo ,
201- shardId ,
202- options ,
203- nodeMetricUpdater ,
204- currentVersion ,
205- isNegotiating ,
206- attemptedVersions ,
207- resultFuture );
203+ new ConnectRequest (
204+ endPoint ,
205+ shardingInfo ,
206+ shardId ,
207+ options ,
208+ nodeMetricUpdater ,
209+ currentVersion ,
210+ isNegotiating ,
211+ attemptedVersions ,
212+ resultFuture ));
208213 return resultFuture ;
209214 }
210215
211- private void connect (
212- EndPoint endPoint ,
213- NodeShardingInfo shardingInfo ,
214- Integer shardId ,
215- DriverChannelOptions options ,
216- NodeMetricUpdater nodeMetricUpdater ,
217- ProtocolVersion currentVersion ,
218- boolean isNegotiating ,
219- List <ProtocolVersion > attemptedVersions ,
220- CompletableFuture <DriverChannel > resultFuture ) {
216+ /**
217+ * Bundles all per-connection-attempt state so it can be threaded through the decomposed connect
218+ * methods without a growing parameter list.
219+ */
220+ private static class ConnectRequest {
221+ final EndPoint endPoint ;
222+ final NodeShardingInfo shardingInfo ;
223+ final Integer shardId ;
224+ final DriverChannelOptions options ;
225+ final NodeMetricUpdater nodeMetricUpdater ;
226+ ProtocolVersion currentVersion ;
227+ boolean isNegotiating ;
228+ final List <ProtocolVersion > attemptedVersions ;
229+ final CompletableFuture <DriverChannel > resultFuture ;
230+
231+ ConnectRequest (
232+ EndPoint endPoint ,
233+ NodeShardingInfo shardingInfo ,
234+ Integer shardId ,
235+ DriverChannelOptions options ,
236+ NodeMetricUpdater nodeMetricUpdater ,
237+ ProtocolVersion currentVersion ,
238+ boolean isNegotiating ,
239+ List <ProtocolVersion > attemptedVersions ,
240+ CompletableFuture <DriverChannel > resultFuture ) {
241+ this .endPoint = endPoint ;
242+ this .shardingInfo = shardingInfo ;
243+ this .shardId = shardId ;
244+ this .options = options ;
245+ this .nodeMetricUpdater = nodeMetricUpdater ;
246+ this .currentVersion = currentVersion ;
247+ this .isNegotiating = isNegotiating ;
248+ this .attemptedVersions = attemptedVersions ;
249+ this .resultFuture = resultFuture ;
250+ }
251+ }
221252
222- SocketAddress resolvedAddress ;
253+ /**
254+ * Entry point for an actual connection attempt. Resolves the endpoint address — expanding
255+ * unresolved hostnames to all known IP addresses so that non-responsive individual IPs can be
256+ * skipped — then delegates iteration to {@link #tryNextAddress}.
257+ */
258+ private void connect (ConnectRequest request ) {
259+ SocketAddress raw ;
223260 try {
224- resolvedAddress = endPoint .resolve ();
261+ raw = request . endPoint .resolve ();
225262 } catch (Exception e ) {
226- resultFuture .completeExceptionally (e );
263+ request . resultFuture .completeExceptionally (e );
227264 return ;
228265 }
229266
230- NettyOptions nettyOptions = context .getNettyOptions ();
267+ List <InetSocketAddress > candidates ;
268+ if (raw instanceof InetSocketAddress ) {
269+ InetSocketAddress inetAddr = (InetSocketAddress ) raw ;
270+ if (inetAddr .isUnresolved ()) {
271+ // Hostname has not been resolved yet — expand it to all known IPs now so that we can
272+ // fall back to subsequent addresses if the first one is non-responsive.
273+ try {
274+ InetAddress [] all = InetAddress .getAllByName (inetAddr .getHostString ());
275+ candidates = new ArrayList <>(all .length );
276+ for (InetAddress addr : all ) {
277+ candidates .add (new InetSocketAddress (addr , inetAddr .getPort ()));
278+ }
279+ } catch (UnknownHostException e ) {
280+ request .resultFuture .completeExceptionally (e );
281+ return ;
282+ }
283+ } else {
284+ candidates = Collections .singletonList (inetAddr );
285+ }
286+ } else {
287+ // Non-inet address (e.g. Unix domain socket) — pass through as-is.
288+ candidates = Collections .singletonList (new InetSocketAddress (raw .toString (), 0 ));
289+ tryNextAddressRaw (request , raw );
290+ return ;
291+ }
292+
293+ tryNextAddress (request , candidates , 0 );
294+ }
295+
296+ /**
297+ * Iterates through the candidate addresses, calling {@link #connectToAddress} for each. If an
298+ * address fails for a reason other than protocol negotiation, the next candidate is tried. Only
299+ * when all candidates are exhausted is the overall {@code resultFuture} failed.
300+ */
301+ private void tryNextAddress (
302+ ConnectRequest request , List <InetSocketAddress > candidates , int index ) {
303+ InetSocketAddress address = candidates .get (index );
304+ connectToAddress (request , address )
305+ .whenComplete (
306+ (channel , error ) -> {
307+ if (error == null ) {
308+ // connectToAddress already completed resultFuture on success.
309+ } else if (index + 1 < candidates .size ()) {
310+ LOG .debug (
311+ "[{}] Failed to connect to {} ({}), trying next address" ,
312+ logPrefix ,
313+ address ,
314+ error .getMessage ());
315+ tryNextAddress (request , candidates , index + 1 );
316+ } else {
317+ // Note: might be completed already if the failure happened in initializer()
318+ request .resultFuture .completeExceptionally (error );
319+ }
320+ });
321+ }
231322
323+ /**
324+ * Performs a Netty bootstrap connect to a single, already-resolved {@link InetSocketAddress}.
325+ * Handles protocol version negotiation (downgrade retries) internally, staying on the same
326+ * address. Returns a {@link CompletableFuture} that completes with the {@link DriverChannel} on
327+ * success, or fails with the connection error so the caller can decide whether to try a different
328+ * address.
329+ */
330+ private CompletableFuture <DriverChannel > connectToAddress (
331+ ConnectRequest request , InetSocketAddress address ) {
332+ CompletableFuture <DriverChannel > addressFuture = new CompletableFuture <>();
333+
334+ NettyOptions nettyOptions = context .getNettyOptions ();
232335 Bootstrap bootstrap =
233336 new Bootstrap ()
234337 .group (nettyOptions .ioEventLoopGroup ())
235338 .channel (nettyOptions .channelClass ())
236339 .option (ChannelOption .ALLOCATOR , nettyOptions .allocator ())
237340 .handler (
238- initializer (endPoint , currentVersion , options , nodeMetricUpdater , resultFuture ));
239-
341+ initializer (
342+ request .endPoint ,
343+ request .currentVersion ,
344+ request .options ,
345+ request .nodeMetricUpdater ,
346+ request .resultFuture ));
240347 nettyOptions .afterBootstrapInitialized (bootstrap );
241348
242349 ChannelFuture connectFuture ;
243- if (shardId == null || shardingInfo == null ) {
244- if (shardId != null ) {
350+ if (request . shardId == null || request . shardingInfo == null ) {
351+ if (request . shardId != null ) {
245352 LOG .debug (
246353 "Requested connection to shard {} but shardingInfo is currently missing for Node at endpoint {}. Falling back to arbitrary local port." ,
247- shardId ,
248- endPoint );
354+ request . shardId ,
355+ request . endPoint );
249356 }
250- connectFuture = bootstrap .connect (resolvedAddress );
357+ connectFuture = bootstrap .connect (address );
251358 } else {
252359 int localPort =
253- PortAllocator .getNextAvailablePort (shardingInfo .getShardsCount (), shardId , context );
360+ PortAllocator .getNextAvailablePort (
361+ request .shardingInfo .getShardsCount (), request .shardId , context );
254362 if (localPort == -1 ) {
255363 LOG .warn (
256364 "Could not find free port for shard {} at {}. Falling back to arbitrary local port." ,
257- shardId ,
258- endPoint );
259- connectFuture = bootstrap .connect (resolvedAddress );
365+ request . shardId ,
366+ request . endPoint );
367+ connectFuture = bootstrap .connect (address );
260368 } else {
261- connectFuture = bootstrap .connect (resolvedAddress , new InetSocketAddress (localPort ));
369+ connectFuture = bootstrap .connect (address , new InetSocketAddress (localPort ));
262370 }
263371 }
264372
@@ -267,11 +375,12 @@ private void connect(
267375 if (connectFuture .isSuccess ()) {
268376 Channel channel = connectFuture .channel ();
269377 DriverChannel driverChannel =
270- new DriverChannel (endPoint , channel , context .getWriteCoalescer (), currentVersion );
378+ new DriverChannel (
379+ request .endPoint , channel , context .getWriteCoalescer (), request .currentVersion );
271380 // If this is the first successful connection, remember the protocol version and
272381 // cluster name for future connections.
273- if (isNegotiating ) {
274- ChannelFactory .this .protocolVersion = currentVersion ;
382+ if (request . isNegotiating ) {
383+ ChannelFactory .this .protocolVersion = request . currentVersion ;
275384 }
276385 if (ChannelFactory .this .clusterName == null ) {
277386 ChannelFactory .this .clusterName = driverChannel .getClusterName ();
@@ -294,39 +403,81 @@ private void connect(
294403 ConsistencyLevel .LOCAL_QUORUM .name ()));
295404 }
296405 }
297- resultFuture .complete (driverChannel );
406+ request .resultFuture .complete (driverChannel );
407+ addressFuture .complete (driverChannel );
298408 } else {
299409 Throwable error = connectFuture .cause ();
300- if (error instanceof UnsupportedProtocolVersionException && isNegotiating ) {
301- attemptedVersions .add (currentVersion );
410+ if (error instanceof UnsupportedProtocolVersionException && request .isNegotiating ) {
411+ // Protocol negotiation failure: downgrade and retry the SAME address.
412+ request .attemptedVersions .add (request .currentVersion );
302413 Optional <ProtocolVersion > downgraded =
303- context .getProtocolVersionRegistry ().downgrade (currentVersion );
414+ context .getProtocolVersionRegistry ().downgrade (request . currentVersion );
304415 if (downgraded .isPresent ()) {
305416 LOG .debug (
306417 "[{}] Failed to connect with protocol {}, retrying with {}" ,
307418 logPrefix ,
308- currentVersion ,
419+ request . currentVersion ,
309420 downgraded .get ());
310- connect (
311- endPoint ,
312- shardingInfo ,
313- shardId ,
314- options ,
315- nodeMetricUpdater ,
316- downgraded .get (),
317- true ,
318- attemptedVersions ,
319- resultFuture );
421+ request .currentVersion = downgraded .get ();
422+ connectToAddress (request , address )
423+ .whenComplete (
424+ (ch , err ) -> {
425+ if (err != null ) addressFuture .completeExceptionally (err );
426+ else addressFuture .complete (ch );
427+ });
320428 } else {
321- resultFuture . completeExceptionally (
429+ Throwable negotiationError =
322430 UnsupportedProtocolVersionException .forNegotiation (
323- endPoint , attemptedVersions ));
431+ request .endPoint , request .attemptedVersions );
432+ request .resultFuture .completeExceptionally (negotiationError );
433+ addressFuture .completeExceptionally (negotiationError );
324434 }
325435 } else {
326- // Note: might be completed already if the failure happened in initializer(), this is
327- // fine
328- resultFuture .completeExceptionally (error );
436+ addressFuture .completeExceptionally (error );
437+ }
438+ }
439+ });
440+
441+ return addressFuture ;
442+ }
443+
444+ /**
445+ * Handles the non-{@link InetSocketAddress} path (e.g. Unix domain sockets) by connecting
446+ * directly to the raw address without multi-address fallback.
447+ */
448+ private void tryNextAddressRaw (ConnectRequest request , SocketAddress rawAddress ) {
449+ NettyOptions nettyOptions = context .getNettyOptions ();
450+ Bootstrap bootstrap =
451+ new Bootstrap ()
452+ .group (nettyOptions .ioEventLoopGroup ())
453+ .channel (nettyOptions .channelClass ())
454+ .option (ChannelOption .ALLOCATOR , nettyOptions .allocator ())
455+ .handler (
456+ initializer (
457+ request .endPoint ,
458+ request .currentVersion ,
459+ request .options ,
460+ request .nodeMetricUpdater ,
461+ request .resultFuture ));
462+ nettyOptions .afterBootstrapInitialized (bootstrap );
463+
464+ ChannelFuture connectFuture = bootstrap .connect (rawAddress );
465+ connectFuture .addListener (
466+ cf -> {
467+ if (connectFuture .isSuccess ()) {
468+ Channel channel = connectFuture .channel ();
469+ DriverChannel driverChannel =
470+ new DriverChannel (
471+ request .endPoint , channel , context .getWriteCoalescer (), request .currentVersion );
472+ if (request .isNegotiating ) {
473+ ChannelFactory .this .protocolVersion = request .currentVersion ;
329474 }
475+ if (ChannelFactory .this .clusterName == null ) {
476+ ChannelFactory .this .clusterName = driverChannel .getClusterName ();
477+ }
478+ request .resultFuture .complete (driverChannel );
479+ } else {
480+ request .resultFuture .completeExceptionally (connectFuture .cause ());
330481 }
331482 });
332483 }
0 commit comments