Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,13 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -196,69 +200,190 @@ CompletionStage<DriverChannel> connect(
}

connect(
endPoint,
shardingInfo,
shardId,
options,
nodeMetricUpdater,
currentVersion,
isNegotiating,
attemptedVersions,
resultFuture);
new ConnectRequest(
endPoint,
shardingInfo,
shardId,
options,
nodeMetricUpdater,
currentVersion,
isNegotiating,
attemptedVersions,
resultFuture));
return resultFuture;
}

private void connect(
EndPoint endPoint,
NodeShardingInfo shardingInfo,
Integer shardId,
DriverChannelOptions options,
NodeMetricUpdater nodeMetricUpdater,
ProtocolVersion currentVersion,
boolean isNegotiating,
List<ProtocolVersion> attemptedVersions,
CompletableFuture<DriverChannel> resultFuture) {
/**
* Bundles all per-connection-attempt state so it can be threaded through the decomposed connect
* methods without a growing parameter list.
*/
private static class ConnectRequest {
final EndPoint endPoint;
final NodeShardingInfo shardingInfo;
final Integer shardId;
final DriverChannelOptions options;
final NodeMetricUpdater nodeMetricUpdater;
ProtocolVersion currentVersion;
boolean isNegotiating;
final List<ProtocolVersion> attemptedVersions;
final CompletableFuture<DriverChannel> resultFuture;

SocketAddress resolvedAddress;
ConnectRequest(
EndPoint endPoint,
NodeShardingInfo shardingInfo,
Integer shardId,
DriverChannelOptions options,
NodeMetricUpdater nodeMetricUpdater,
ProtocolVersion currentVersion,
boolean isNegotiating,
List<ProtocolVersion> attemptedVersions,
CompletableFuture<DriverChannel> resultFuture) {
this.endPoint = endPoint;
this.shardingInfo = shardingInfo;
this.shardId = shardId;
this.options = options;
this.nodeMetricUpdater = nodeMetricUpdater;
this.currentVersion = currentVersion;
this.isNegotiating = isNegotiating;
this.attemptedVersions = attemptedVersions;
this.resultFuture = resultFuture;
}
}

/**
* Entry point for an actual connection attempt. Resolves the endpoint address — expanding
* unresolved hostnames to all known IP addresses so that non-responsive individual IPs can be
* skipped — then delegates iteration to {@link #tryNextAddress}.
*/
private void connect(ConnectRequest request) {
SocketAddress raw;
try {
resolvedAddress = endPoint.resolve();
raw = request.endPoint.resolve();
} catch (Exception e) {
resultFuture.completeExceptionally(e);
request.resultFuture.completeExceptionally(e);
return;
}

NettyOptions nettyOptions = context.getNettyOptions();
List<InetSocketAddress> candidates;
if (raw instanceof InetSocketAddress) {
InetSocketAddress inetAddr = (InetSocketAddress) raw;
if (inetAddr.isUnresolved()) {
// Hostname has not been resolved yet — expand it to all known IPs now so that we can
// fall back to subsequent addresses if the first one is non-responsive.
try {
InetAddress[] all = InetAddress.getAllByName(inetAddr.getHostString());
candidates = new ArrayList<>(all.length);
for (InetAddress addr : all) {
candidates.add(new InetSocketAddress(addr, inetAddr.getPort()));
}
} catch (UnknownHostException e) {
request.resultFuture.completeExceptionally(e);
return;
}
} else {
candidates = Collections.singletonList(inetAddr);
}
} else {
// Non-inet address (e.g. Unix domain socket) — pass through as-is.
tryNextAddressRaw(request, raw);
return;
}

tryNextAddress(request, candidates, 0);
}

/**
* Iterates through the candidate addresses, calling {@link #connectToAddress} for each. If an
* address fails for a reason other than protocol negotiation, the next candidate is tried. Only
* when all candidates are exhausted is the overall {@code resultFuture} failed.
*/
private void tryNextAddress(
ConnectRequest request, List<InetSocketAddress> candidates, int index) {
InetSocketAddress address = candidates.get(index);
connectToAddress(request, address)
.whenComplete(
(channel, error) -> {
if (error == null) {
// Handshake succeeded on this address — propagate to the overall result.
request.resultFuture.complete(channel);
} else if (index + 1 < candidates.size()) {
LOG.debug(
"[{}] Failed to connect to {} ({}), trying next address",
logPrefix,
address,
error.getMessage());
tryNextAddress(request, candidates, index + 1);
} else {
// Note: might be completed already if the failure happened in initializer()
request.resultFuture.completeExceptionally(error);
}
});
}

/**
* Performs a Netty bootstrap connect to a single, already-resolved {@link InetSocketAddress}.
* Handles protocol version negotiation (downgrade retries) internally, staying on the same
* address.
*
* <p>The returned {@code addressFuture} is wired as the initializer's {@code resultFuture}, so it
* is completed by {@link ProtocolInitHandler} after the full handshake — not at TCP-connect time.
* This lets {@link #tryNextAddress} distinguish a per-address TCP failure (try the next IP) from
* a successful protocol init (propagate to the overall {@code request.resultFuture}).
*
* <p>On TCP failure the listener rejects the {@code addressFuture} immediately, bypassing the
* handshake. On {@link UnsupportedProtocolVersionException} the same address is retried with a
* downgraded protocol, chaining back into the same {@code callerFuture}.
*/
private CompletableFuture<DriverChannel> connectToAddress(
ConnectRequest request, InetSocketAddress address) {
// callerFuture is what tryNextAddress observes. It is completed after protocol negotiation
// has fully settled (either success, downgrade-retry success, or unrecoverable failure).
CompletableFuture<DriverChannel> callerFuture = new CompletableFuture<>();

connectToAddressInternal(request, address, callerFuture);
return callerFuture;
}

private void connectToAddressInternal(
ConnectRequest request,
InetSocketAddress address,
CompletableFuture<DriverChannel> callerFuture) {
NettyOptions nettyOptions = context.getNettyOptions();
Bootstrap bootstrap =
new Bootstrap()
.group(nettyOptions.ioEventLoopGroup())
.channel(nettyOptions.channelClass())
.option(ChannelOption.ALLOCATOR, nettyOptions.allocator())
.handler(
initializer(endPoint, currentVersion, options, nodeMetricUpdater, resultFuture));

initializer(
request.endPoint,
request.currentVersion,
request.options,
request.nodeMetricUpdater,
callerFuture));
nettyOptions.afterBootstrapInitialized(bootstrap);

ChannelFuture connectFuture;
if (shardId == null || shardingInfo == null) {
if (shardId != null) {
if (request.shardId == null || request.shardingInfo == null) {
if (request.shardId != null) {
LOG.debug(
"Requested connection to shard {} but shardingInfo is currently missing for Node at endpoint {}. Falling back to arbitrary local port.",
shardId,
endPoint);
request.shardId,
request.endPoint);
}
connectFuture = bootstrap.connect(resolvedAddress);
connectFuture = bootstrap.connect(address);
} else {
int localPort =
PortAllocator.getNextAvailablePort(shardingInfo.getShardsCount(), shardId, context);
PortAllocator.getNextAvailablePort(
request.shardingInfo.getShardsCount(), request.shardId, context);
if (localPort == -1) {
LOG.warn(
"Could not find free port for shard {} at {}. Falling back to arbitrary local port.",
shardId,
endPoint);
connectFuture = bootstrap.connect(resolvedAddress);
request.shardId,
request.endPoint);
connectFuture = bootstrap.connect(address);
} else {
connectFuture = bootstrap.connect(resolvedAddress, new InetSocketAddress(localPort));
connectFuture = bootstrap.connect(address, new InetSocketAddress(localPort));
}
}

Expand All @@ -267,11 +392,92 @@ private void connect(
if (connectFuture.isSuccess()) {
Channel channel = connectFuture.channel();
DriverChannel driverChannel =
new DriverChannel(endPoint, channel, context.getWriteCoalescer(), currentVersion);
// If this is the first successful connection, remember the protocol version and
// cluster name for future connections.
if (isNegotiating) {
ChannelFactory.this.protocolVersion = currentVersion;
new DriverChannel(
request.endPoint, channel, context.getWriteCoalescer(), request.currentVersion);
if (request.isNegotiating) {
ChannelFactory.this.protocolVersion = request.currentVersion;
}
if (ChannelFactory.this.clusterName == null) {
ChannelFactory.this.clusterName = driverChannel.getClusterName();
}
Map<String, List<String>> supportedOptions = driverChannel.getOptions();
if (ChannelFactory.this.productType == null && supportedOptions != null) {
List<String> productTypes = supportedOptions.get("PRODUCT_TYPE");
String productType =
productTypes != null && !productTypes.isEmpty()
? productTypes.get(0)
: UNKNOWN_PRODUCT_TYPE;
ChannelFactory.this.productType = productType;
DriverConfig driverConfig = context.getConfig();
if (driverConfig instanceof TypesafeDriverConfig
&& productType.equals(DATASTAX_CLOUD_PRODUCT_TYPE)) {
((TypesafeDriverConfig) driverConfig)
.overrideDefaults(
ImmutableMap.of(
DefaultDriverOption.REQUEST_CONSISTENCY,
ConsistencyLevel.LOCAL_QUORUM.name()));
}
}
callerFuture.complete(driverChannel);
} else {
Throwable error = connectFuture.cause();
if (error instanceof UnsupportedProtocolVersionException && request.isNegotiating) {
request.attemptedVersions.add(request.currentVersion);
Optional<ProtocolVersion> downgraded =
context.getProtocolVersionRegistry().downgrade(request.currentVersion);
if (downgraded.isPresent()) {
LOG.debug(
"[{}] Failed to connect with protocol {}, retrying with {}",
logPrefix,
request.currentVersion,
downgraded.get());
request.currentVersion = downgraded.get();
connectToAddressInternal(request, address, callerFuture);
} else {
callerFuture.completeExceptionally(
UnsupportedProtocolVersionException.forNegotiation(
request.endPoint, request.attemptedVersions));
}
} else {
// Note: might be completed already if the failure happened in initializer()
callerFuture.completeExceptionally(error);
}
}
});
}

/**
* Handles the non-{@link InetSocketAddress} path (e.g. Unix domain sockets, Netty local
* transport) by connecting directly to the raw address without multi-address fallback. Supports
* protocol-version negotiation (downgrade retries) the same way as {@link
* #connectToAddressInternal}.
*/
private void tryNextAddressRaw(ConnectRequest request, SocketAddress rawAddress) {
NettyOptions nettyOptions = context.getNettyOptions();
Bootstrap bootstrap =
new Bootstrap()
.group(nettyOptions.ioEventLoopGroup())
.channel(nettyOptions.channelClass())
.option(ChannelOption.ALLOCATOR, nettyOptions.allocator())
.handler(
initializer(
request.endPoint,
request.currentVersion,
request.options,
request.nodeMetricUpdater,
request.resultFuture));
nettyOptions.afterBootstrapInitialized(bootstrap);

ChannelFuture connectFuture = bootstrap.connect(rawAddress);
connectFuture.addListener(
cf -> {
if (connectFuture.isSuccess()) {
Channel channel = connectFuture.channel();
DriverChannel driverChannel =
new DriverChannel(
request.endPoint, channel, context.getWriteCoalescer(), request.currentVersion);
if (request.isNegotiating) {
ChannelFactory.this.protocolVersion = request.currentVersion;
}
if (ChannelFactory.this.clusterName == null) {
ChannelFactory.this.clusterName = driverChannel.getClusterName();
Expand All @@ -294,38 +500,29 @@ private void connect(
ConsistencyLevel.LOCAL_QUORUM.name()));
}
}
resultFuture.complete(driverChannel);
request.resultFuture.complete(driverChannel);
} else {
Throwable error = connectFuture.cause();
if (error instanceof UnsupportedProtocolVersionException && isNegotiating) {
attemptedVersions.add(currentVersion);
if (error instanceof UnsupportedProtocolVersionException && request.isNegotiating) {
request.attemptedVersions.add(request.currentVersion);
Optional<ProtocolVersion> downgraded =
context.getProtocolVersionRegistry().downgrade(currentVersion);
context.getProtocolVersionRegistry().downgrade(request.currentVersion);
if (downgraded.isPresent()) {
LOG.debug(
"[{}] Failed to connect with protocol {}, retrying with {}",
logPrefix,
currentVersion,
request.currentVersion,
downgraded.get());
connect(
endPoint,
shardingInfo,
shardId,
options,
nodeMetricUpdater,
downgraded.get(),
true,
attemptedVersions,
resultFuture);
request.currentVersion = downgraded.get();
tryNextAddressRaw(request, rawAddress);
} else {
resultFuture.completeExceptionally(
request.resultFuture.completeExceptionally(
UnsupportedProtocolVersionException.forNegotiation(
endPoint, attemptedVersions));
request.endPoint, request.attemptedVersions));
}
} else {
// Note: might be completed already if the failure happened in initializer(), this is
// fine
resultFuture.completeExceptionally(error);
// Note: might be completed already if the failure happened in initializer()
request.resultFuture.completeExceptionally(error);
}
}
});
Expand Down
Loading
Loading