diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java index 5ad8c111262..cf92e007b2a 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java @@ -55,9 +55,13 @@ import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -196,69 +200,190 @@ CompletionStage connect( } connect( - endPoint, - shardingInfo, - shardId, - options, - nodeMetricUpdater, - currentVersion, - isNegotiating, - attemptedVersions, - resultFuture); + new ConnectRequest( + endPoint, + shardingInfo, + shardId, + options, + nodeMetricUpdater, + currentVersion, + isNegotiating, + attemptedVersions, + resultFuture)); return resultFuture; } - private void connect( - EndPoint endPoint, - NodeShardingInfo shardingInfo, - Integer shardId, - DriverChannelOptions options, - NodeMetricUpdater nodeMetricUpdater, - ProtocolVersion currentVersion, - boolean isNegotiating, - List attemptedVersions, - CompletableFuture resultFuture) { + /** + * Bundles all per-connection-attempt state so it can be threaded through the decomposed connect + * methods without a growing parameter list. + */ + private static class ConnectRequest { + final EndPoint endPoint; + final NodeShardingInfo shardingInfo; + final Integer shardId; + final DriverChannelOptions options; + final NodeMetricUpdater nodeMetricUpdater; + ProtocolVersion currentVersion; + boolean isNegotiating; + final List attemptedVersions; + final CompletableFuture resultFuture; - SocketAddress resolvedAddress; + ConnectRequest( + EndPoint endPoint, + NodeShardingInfo shardingInfo, + Integer shardId, + DriverChannelOptions options, + NodeMetricUpdater nodeMetricUpdater, + ProtocolVersion currentVersion, + boolean isNegotiating, + List attemptedVersions, + CompletableFuture resultFuture) { + this.endPoint = endPoint; + this.shardingInfo = shardingInfo; + this.shardId = shardId; + this.options = options; + this.nodeMetricUpdater = nodeMetricUpdater; + this.currentVersion = currentVersion; + this.isNegotiating = isNegotiating; + this.attemptedVersions = attemptedVersions; + this.resultFuture = resultFuture; + } + } + + /** + * Entry point for an actual connection attempt. Resolves the endpoint address — expanding + * unresolved hostnames to all known IP addresses so that non-responsive individual IPs can be + * skipped — then delegates iteration to {@link #tryNextAddress}. + */ + private void connect(ConnectRequest request) { + SocketAddress raw; try { - resolvedAddress = endPoint.resolve(); + raw = request.endPoint.resolve(); } catch (Exception e) { - resultFuture.completeExceptionally(e); + request.resultFuture.completeExceptionally(e); return; } - NettyOptions nettyOptions = context.getNettyOptions(); + List candidates; + if (raw instanceof InetSocketAddress) { + InetSocketAddress inetAddr = (InetSocketAddress) raw; + if (inetAddr.isUnresolved()) { + // Hostname has not been resolved yet — expand it to all known IPs now so that we can + // fall back to subsequent addresses if the first one is non-responsive. + try { + InetAddress[] all = InetAddress.getAllByName(inetAddr.getHostString()); + candidates = new ArrayList<>(all.length); + for (InetAddress addr : all) { + candidates.add(new InetSocketAddress(addr, inetAddr.getPort())); + } + } catch (UnknownHostException e) { + request.resultFuture.completeExceptionally(e); + return; + } + } else { + candidates = Collections.singletonList(inetAddr); + } + } else { + // Non-inet address (e.g. Unix domain socket) — pass through as-is. + tryNextAddressRaw(request, raw); + return; + } + + tryNextAddress(request, candidates, 0); + } + /** + * Iterates through the candidate addresses, calling {@link #connectToAddress} for each. If an + * address fails for a reason other than protocol negotiation, the next candidate is tried. Only + * when all candidates are exhausted is the overall {@code resultFuture} failed. + */ + private void tryNextAddress( + ConnectRequest request, List candidates, int index) { + InetSocketAddress address = candidates.get(index); + connectToAddress(request, address) + .whenComplete( + (channel, error) -> { + if (error == null) { + // Handshake succeeded on this address — propagate to the overall result. + request.resultFuture.complete(channel); + } else if (index + 1 < candidates.size()) { + LOG.debug( + "[{}] Failed to connect to {} ({}), trying next address", + logPrefix, + address, + error.getMessage()); + tryNextAddress(request, candidates, index + 1); + } else { + // Note: might be completed already if the failure happened in initializer() + request.resultFuture.completeExceptionally(error); + } + }); + } + + /** + * Performs a Netty bootstrap connect to a single, already-resolved {@link InetSocketAddress}. + * Handles protocol version negotiation (downgrade retries) internally, staying on the same + * address. + * + *

The returned {@code addressFuture} is wired as the initializer's {@code resultFuture}, so it + * is completed by {@link ProtocolInitHandler} after the full handshake — not at TCP-connect time. + * This lets {@link #tryNextAddress} distinguish a per-address TCP failure (try the next IP) from + * a successful protocol init (propagate to the overall {@code request.resultFuture}). + * + *

On TCP failure the listener rejects the {@code addressFuture} immediately, bypassing the + * handshake. On {@link UnsupportedProtocolVersionException} the same address is retried with a + * downgraded protocol, chaining back into the same {@code callerFuture}. + */ + private CompletableFuture connectToAddress( + ConnectRequest request, InetSocketAddress address) { + // callerFuture is what tryNextAddress observes. It is completed after protocol negotiation + // has fully settled (either success, downgrade-retry success, or unrecoverable failure). + CompletableFuture callerFuture = new CompletableFuture<>(); + + connectToAddressInternal(request, address, callerFuture); + return callerFuture; + } + + private void connectToAddressInternal( + ConnectRequest request, + InetSocketAddress address, + CompletableFuture callerFuture) { + NettyOptions nettyOptions = context.getNettyOptions(); Bootstrap bootstrap = new Bootstrap() .group(nettyOptions.ioEventLoopGroup()) .channel(nettyOptions.channelClass()) .option(ChannelOption.ALLOCATOR, nettyOptions.allocator()) .handler( - initializer(endPoint, currentVersion, options, nodeMetricUpdater, resultFuture)); - + initializer( + request.endPoint, + request.currentVersion, + request.options, + request.nodeMetricUpdater, + callerFuture)); nettyOptions.afterBootstrapInitialized(bootstrap); ChannelFuture connectFuture; - if (shardId == null || shardingInfo == null) { - if (shardId != null) { + if (request.shardId == null || request.shardingInfo == null) { + if (request.shardId != null) { LOG.debug( "Requested connection to shard {} but shardingInfo is currently missing for Node at endpoint {}. Falling back to arbitrary local port.", - shardId, - endPoint); + request.shardId, + request.endPoint); } - connectFuture = bootstrap.connect(resolvedAddress); + connectFuture = bootstrap.connect(address); } else { int localPort = - PortAllocator.getNextAvailablePort(shardingInfo.getShardsCount(), shardId, context); + PortAllocator.getNextAvailablePort( + request.shardingInfo.getShardsCount(), request.shardId, context); if (localPort == -1) { LOG.warn( "Could not find free port for shard {} at {}. Falling back to arbitrary local port.", - shardId, - endPoint); - connectFuture = bootstrap.connect(resolvedAddress); + request.shardId, + request.endPoint); + connectFuture = bootstrap.connect(address); } else { - connectFuture = bootstrap.connect(resolvedAddress, new InetSocketAddress(localPort)); + connectFuture = bootstrap.connect(address, new InetSocketAddress(localPort)); } } @@ -267,11 +392,92 @@ private void connect( if (connectFuture.isSuccess()) { Channel channel = connectFuture.channel(); DriverChannel driverChannel = - new DriverChannel(endPoint, channel, context.getWriteCoalescer(), currentVersion); - // If this is the first successful connection, remember the protocol version and - // cluster name for future connections. - if (isNegotiating) { - ChannelFactory.this.protocolVersion = currentVersion; + new DriverChannel( + request.endPoint, channel, context.getWriteCoalescer(), request.currentVersion); + if (request.isNegotiating) { + ChannelFactory.this.protocolVersion = request.currentVersion; + } + if (ChannelFactory.this.clusterName == null) { + ChannelFactory.this.clusterName = driverChannel.getClusterName(); + } + Map> supportedOptions = driverChannel.getOptions(); + if (ChannelFactory.this.productType == null && supportedOptions != null) { + List productTypes = supportedOptions.get("PRODUCT_TYPE"); + String productType = + productTypes != null && !productTypes.isEmpty() + ? productTypes.get(0) + : UNKNOWN_PRODUCT_TYPE; + ChannelFactory.this.productType = productType; + DriverConfig driverConfig = context.getConfig(); + if (driverConfig instanceof TypesafeDriverConfig + && productType.equals(DATASTAX_CLOUD_PRODUCT_TYPE)) { + ((TypesafeDriverConfig) driverConfig) + .overrideDefaults( + ImmutableMap.of( + DefaultDriverOption.REQUEST_CONSISTENCY, + ConsistencyLevel.LOCAL_QUORUM.name())); + } + } + callerFuture.complete(driverChannel); + } else { + Throwable error = connectFuture.cause(); + if (error instanceof UnsupportedProtocolVersionException && request.isNegotiating) { + request.attemptedVersions.add(request.currentVersion); + Optional downgraded = + context.getProtocolVersionRegistry().downgrade(request.currentVersion); + if (downgraded.isPresent()) { + LOG.debug( + "[{}] Failed to connect with protocol {}, retrying with {}", + logPrefix, + request.currentVersion, + downgraded.get()); + request.currentVersion = downgraded.get(); + connectToAddressInternal(request, address, callerFuture); + } else { + callerFuture.completeExceptionally( + UnsupportedProtocolVersionException.forNegotiation( + request.endPoint, request.attemptedVersions)); + } + } else { + // Note: might be completed already if the failure happened in initializer() + callerFuture.completeExceptionally(error); + } + } + }); + } + + /** + * Handles the non-{@link InetSocketAddress} path (e.g. Unix domain sockets, Netty local + * transport) by connecting directly to the raw address without multi-address fallback. Supports + * protocol-version negotiation (downgrade retries) the same way as {@link + * #connectToAddressInternal}. + */ + private void tryNextAddressRaw(ConnectRequest request, SocketAddress rawAddress) { + NettyOptions nettyOptions = context.getNettyOptions(); + Bootstrap bootstrap = + new Bootstrap() + .group(nettyOptions.ioEventLoopGroup()) + .channel(nettyOptions.channelClass()) + .option(ChannelOption.ALLOCATOR, nettyOptions.allocator()) + .handler( + initializer( + request.endPoint, + request.currentVersion, + request.options, + request.nodeMetricUpdater, + request.resultFuture)); + nettyOptions.afterBootstrapInitialized(bootstrap); + + ChannelFuture connectFuture = bootstrap.connect(rawAddress); + connectFuture.addListener( + cf -> { + if (connectFuture.isSuccess()) { + Channel channel = connectFuture.channel(); + DriverChannel driverChannel = + new DriverChannel( + request.endPoint, channel, context.getWriteCoalescer(), request.currentVersion); + if (request.isNegotiating) { + ChannelFactory.this.protocolVersion = request.currentVersion; } if (ChannelFactory.this.clusterName == null) { ChannelFactory.this.clusterName = driverChannel.getClusterName(); @@ -294,38 +500,29 @@ private void connect( ConsistencyLevel.LOCAL_QUORUM.name())); } } - resultFuture.complete(driverChannel); + request.resultFuture.complete(driverChannel); } else { Throwable error = connectFuture.cause(); - if (error instanceof UnsupportedProtocolVersionException && isNegotiating) { - attemptedVersions.add(currentVersion); + if (error instanceof UnsupportedProtocolVersionException && request.isNegotiating) { + request.attemptedVersions.add(request.currentVersion); Optional downgraded = - context.getProtocolVersionRegistry().downgrade(currentVersion); + context.getProtocolVersionRegistry().downgrade(request.currentVersion); if (downgraded.isPresent()) { LOG.debug( "[{}] Failed to connect with protocol {}, retrying with {}", logPrefix, - currentVersion, + request.currentVersion, downgraded.get()); - connect( - endPoint, - shardingInfo, - shardId, - options, - nodeMetricUpdater, - downgraded.get(), - true, - attemptedVersions, - resultFuture); + request.currentVersion = downgraded.get(); + tryNextAddressRaw(request, rawAddress); } else { - resultFuture.completeExceptionally( + request.resultFuture.completeExceptionally( UnsupportedProtocolVersionException.forNegotiation( - endPoint, attemptedVersions)); + request.endPoint, request.attemptedVersions)); } } else { - // Note: might be completed already if the failure happened in initializer(), this is - // fine - resultFuture.completeExceptionally(error); + // Note: might be completed already if the failure happened in initializer() + request.resultFuture.completeExceptionally(error); } } }); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java index 4e9eefebf63..8f6d8dcf3c2 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java @@ -201,6 +201,48 @@ public void run_replace_test_20_times() { } } + /** + * Verifies that the driver can connect to a cluster when the first DNS entry for the contact + * point hostname resolves to a non-responsive IP address (DRIVER-201). + * + *

With {@code RESOLVE_CONTACT_POINTS=false} (the default), the hostname is kept unresolved + * until connection time. Previously, only the first IP returned by DNS was tried, causing an + * {@code AllNodesFailedException} when that IP was unreachable. The fix expands the hostname to + * all IPs at connection time and tries each one in sequence. + */ + @Test + public void should_connect_when_first_dns_entry_is_non_responsive() { + // Use a 2-node cluster on 127.0.1.x. Node 11 (127.0.1.11) does not exist and is therefore + // non-responsive; nodes 1 and 2 are real. + try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(2).withIpPrefix("127.0.1.").build()) { + MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake"); + // First entry intentionally points to a non-existent/non-responsive address. + MultimapHostResolverProvider.addResolverEntry("test.cluster.fake", "127.0.1.11"); + MultimapHostResolverProvider.addResolverEntry( + "test.cluster.fake", ccmBridge.getNodeIpAddress(1)); + MultimapHostResolverProvider.addResolverEntry( + "test.cluster.fake", ccmBridge.getNodeIpAddress(2)); + ccmBridge.create(); + ccmBridge.start(); + + DriverConfigLoader loader = + new DefaultProgrammaticDriverConfigLoaderBuilder() + .withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false) + .withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), false) + .withStringList( + TypedDriverOption.CONTACT_POINTS.getRawOption(), + Collections.singletonList("test.cluster.fake:9042")) + .build(); + + // The session must open successfully despite the first DNS entry being unreachable. + try (CqlSession session = new CqlSessionBuilder().withConfigLoader(loader).build()) { + ResultSet rs = session.execute("select * from system.local where key='local'"); + assertThat(rs.one()).isNotNull(); + waitForAllNodesUp(session, 2); + } + } + } + // This is too long to run during CI, but is useful for manual investigations. @SuppressWarnings("unused") public void cannot_reconnect_with_resolved_socket() {