Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
afdef61
Add GCPFallback support
kinsaurralde Dec 29, 2025
d317412
formatting
kinsaurralde Dec 29, 2025
02a1bb1
fix imports
kinsaurralde Jan 5, 2026
83ea935
Merge branch 'googleapis:main' into eef
kinsaurralde Jan 5, 2026
9eed709
Add fallback test to GapicSpannerRpcTest
kinsaurralde Jan 6, 2026
bd64b27
respond to comments
kinsaurralde Jan 8, 2026
ca5b5b6
fixes
kinsaurralde Jan 9, 2026
1140d7d
Merge branch 'googleapis:main' into eef
kinsaurralde Jan 9, 2026
bd2c2e5
Merge branch 'main' into eef
rahul2393 Jan 16, 2026
0350d8f
add minFailedCalls parameter to createFallbackChannelOptions
kinsaurralde Jan 16, 2026
cfc8169
Merge branch 'eef' of github.com:kinsaurralde/java-spanner into eef
kinsaurralde Jan 16, 2026
81655ed
limit collected eef metrics
kinsaurralde Jan 20, 2026
98562f9
Merge branch 'main' into eef
kinsaurralde Jan 20, 2026
0659726
Merge branch 'eef' of github.com:kinsaurralde/java-spanner into eef
kinsaurralde Jan 20, 2026
8fc77e7
Merge branch 'main' into eef
kinsaurralde Feb 2, 2026
6539620
fix test
kinsaurralde Feb 3, 2026
2b5b889
remove unintentional changes
kinsaurralde Feb 3, 2026
f811a2b
formatting fix
kinsaurralde Feb 3, 2026
fbef34d
Merge branch 'main' into eef
kinsaurralde Feb 3, 2026
66b9315
formatting fixes
kinsaurralde Feb 3, 2026
a26d6b5
Merge branch 'eef' of github.com:kinsaurralde/java-spanner into eef
kinsaurralde Feb 3, 2026
66534c9
formatting fixes
kinsaurralde Feb 3, 2026
d8fcc4a
Merge branch 'main' into eef
kinsaurralde Feb 5, 2026
06760db
Merge branch 'main' into eef
rahul2393 Feb 7, 2026
29dac67
Merge branch 'main' into eef
kinsaurralde Feb 9, 2026
2bd6884
merge fixes
kinsaurralde Feb 10, 2026
70e5eb3
address comments
kinsaurralde Feb 11, 2026
15e611e
Merge branch 'main' into eef
rahul2393 Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,24 @@
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.rpc.UnavailableException;
import com.google.api.gax.rpc.WatchdogProvider;
import com.google.api.pathtemplate.PathTemplate;
import com.google.auth.Credentials;
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.grpc.GcpManagedChannel;
import com.google.cloud.grpc.GcpManagedChannelBuilder;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.fallback.GcpFallbackChannel;
import com.google.cloud.grpc.fallback.GcpFallbackChannelOptions;
import com.google.cloud.grpc.fallback.GcpFallbackOpenTelemetry;
import com.google.cloud.spanner.AdminRequestsPerMinuteExceededException;
import com.google.cloud.spanner.BackupId;
import com.google.cloud.spanner.ErrorCode;
Expand Down Expand Up @@ -185,9 +190,16 @@
import com.google.spanner.v1.SpannerGrpc;
import com.google.spanner.v1.Transaction;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.ForwardingChannelBuilder2;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.auth.MoreCallCredentials;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
Expand All @@ -214,6 +226,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -341,65 +354,95 @@ public GapicSpannerRpc(final SpannerOptions options) {
this.isDynamicChannelPoolEnabled = options.isDynamicChannelPoolEnabled();
this.baseGrpcCallContext = createBaseCallContext();

boolean isEnableDirectAccess = options.isEnableDirectAccess();

if (initializeStubs) {
// First check if SpannerOptions provides a TransportChannelProvider. Create one
// with information gathered from SpannerOptions if none is provided
CredentialsProvider credentialsProvider =
GrpcTransportOptions.setUpCredentialsProvider(options);

InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder =
InstantiatingGrpcChannelProvider.newBuilder()
.setChannelConfigurator(options.getChannelConfigurator())
.setEndpoint(options.getEndpoint())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
.setKeepAliveTimeDuration(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))

// Then check if SpannerOptions provides an InterceptorProvider. Create a default
// SpannerInterceptorProvider if none is provided
.setInterceptorProvider(
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault(options.getOpenTelemetry())))
// This sets the trace context headers.
.withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry())
// This sets the response compressor (Server -> Client).
.withEncoding(compressorName))
.setHeaderProvider(headerProviderWithUserAgent)
.setAllowNonDefaultServiceAccount(true);
boolean isEnableDirectAccess = options.isEnableDirectAccess();
if (isEnableDirectAccess) {
defaultChannelProviderBuilder.setAttemptDirectPath(true);
if (isEnableDirectPathBoundToken()) {
// This will let the credentials try to fetch a hard-bound access token if the runtime
// environment supports it.
defaultChannelProviderBuilder.setAllowHardBoundTokenTypes(
Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS));
getDefaultChannelProviderBuilder(
options, headerProviderWithUserAgent, isEnableDirectAccess);

if (options.getChannelProvider() == null
Comment thread
kinsaurralde marked this conversation as resolved.
&& isEnableDirectAccess
&& isEnableGcpFallbackEnv()) {
InstantiatingGrpcChannelProvider.Builder cloudPathProviderBuilder =
getDefaultChannelProviderBuilder(
options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false);
cloudPathProviderBuilder.setAttemptDirectPath(false);
Comment thread
kinsaurralde marked this conversation as resolved.
Outdated

final AtomicReference<ManagedChannelBuilder> cloudPathBuilderRef = new AtomicReference<>();
cloudPathProviderBuilder.setChannelConfigurator(
builder -> {
cloudPathBuilderRef.set(builder);
return builder;
});

// Build the cloudPathProvider to extract the builder which will be provided to
// FallbackChannelBuilder.
try (TransportChannel ignored = cloudPathProviderBuilder.build().getTransportChannel()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to always pre-emptively create a cloudpath TransportChannel and then close it again at startup. Are we OK with that? (I see that this is conditional on DirectPath being enabled, and fallback being enabled, which means that it probably means that it is not something that all clients will do, but it will increase startup cost both for the client and the server)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other way i know to capture the builder is to intentionally throw an exception at the end of cloudPathProviderBuilder.setChannelConfigurator() and catch it during build(). However doing that seems a bit weird. Is there another way to do this i am missing?

Note that this fallback is planned to be enabled when directpath is so will eventually be run by many clients.

} catch (Exception e) {
throw asSpannerException(e);
}
defaultChannelProviderBuilder.setAttemptDirectPathXds();
}

options.enablegRPCMetrics(defaultChannelProviderBuilder);
ManagedChannelBuilder cloudPathBuilder = cloudPathBuilderRef.get();
if (cloudPathBuilder == null) {
throw new IllegalStateException("CloudPath builder was not captured.");
}

if (options.isUseVirtualThreads()) {
ExecutorService executor =
tryCreateVirtualThreadPerTaskExecutor("spanner-virtual-grpc-executor");
if (executor != null) {
defaultChannelProviderBuilder.setExecutor(executor);
try {
Credentials credentials = credentialsProvider.getCredentials();
if (credentials != null) {
cloudPathBuilder.intercept(
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return next.newCall(
method,
callOptions.withCallCredentials(MoreCallCredentials.from(credentials)));
}
});
}
} catch (Exception e) {
throw asSpannerException(e);
}

defaultChannelProviderBuilder.setChannelConfigurator(
directPathBuilder -> {
String jsonApiConfig = parseGrpcGcpApiConfig();
GcpManagedChannelOptions gcpOptions = options.getGrpcGcpOptions();
if (gcpOptions == null) {
gcpOptions = GcpManagedChannelOptions.newBuilder().build();
}

GcpManagedChannelBuilder primaryGcpBuilder =
GcpManagedChannelBuilder.forDelegateBuilder(directPathBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(gcpOptions);

GcpManagedChannelBuilder fallbackGcpBuilder =
GcpManagedChannelBuilder.forDelegateBuilder(cloudPathBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(gcpOptions);

GcpFallbackOpenTelemetry fallbackTelemetry =
GcpFallbackOpenTelemetry.newBuilder().withSdk(options.getOpenTelemetry()).build();
Comment thread
kinsaurralde marked this conversation as resolved.
Outdated

return new FallbackChannelBuilder(
primaryGcpBuilder,
fallbackGcpBuilder,
createFallbackChannelOptions(fallbackTelemetry));
});
}
// If it is enabled in options uses the channel pool provided by the gRPC-GCP extension.
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);

// First check if SpannerOptions provides a TransportChannelProvider. Create one
// with information gathered from SpannerOptions if none is provided
TransportChannelProvider channelProvider =
MoreObjects.firstNonNull(
options.getChannelProvider(), defaultChannelProviderBuilder.build());

CredentialsProvider credentialsProvider =
GrpcTransportOptions.setUpCredentialsProvider(options);

spannerWatchdog =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
Expand Down Expand Up @@ -563,6 +606,17 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
}
}

@VisibleForTesting
GcpFallbackChannelOptions createFallbackChannelOptions(
GcpFallbackOpenTelemetry fallbackTelemetry) {
return GcpFallbackChannelOptions.newBuilder()
.setPrimaryChannelName("directpath")
.setFallbackChannelName("cloudpath")
.setMinFailedCalls(1)
Comment thread
kinsaurralde marked this conversation as resolved.
Outdated
.setGcpFallbackOpenTelemetry(fallbackTelemetry)
Comment thread
kinsaurralde marked this conversation as resolved.
.build();
}

private static String parseGrpcGcpApiConfig() {
try {
return Resources.toString(
Expand All @@ -572,6 +626,60 @@ private static String parseGrpcGcpApiConfig() {
}
}

private InstantiatingGrpcChannelProvider.Builder getDefaultChannelProviderBuilder(
Comment thread
kinsaurralde marked this conversation as resolved.
Outdated
final SpannerOptions options,
final HeaderProvider headerProviderWithUserAgent,
boolean isEnableDirectAccess) {
InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder =
InstantiatingGrpcChannelProvider.newBuilder()
.setChannelConfigurator(options.getChannelConfigurator())
.setEndpoint(options.getEndpoint())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
.setKeepAliveTimeDuration(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))

// Then check if SpannerOptions provides an InterceptorProvider. Create a default
// SpannerInterceptorProvider if none is provided
.setInterceptorProvider(
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault(options.getOpenTelemetry())))
// This sets the trace context headers.
.withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry())
// This sets the response compressor (Server -> Client).
.withEncoding(compressorName))
.setHeaderProvider(headerProviderWithUserAgent)
.setAllowNonDefaultServiceAccount(true);
if (isEnableDirectAccess) {
defaultChannelProviderBuilder.setAttemptDirectPath(true);
defaultChannelProviderBuilder.setAttemptDirectPathXds();
if (isEnableDirectPathBoundToken()) {
// This will let the credentials try to fetch a hard-bound access token if the runtime
// environment supports it.
defaultChannelProviderBuilder.setAllowHardBoundTokenTypes(
Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS));
}
}

options.enablegRPCMetrics(defaultChannelProviderBuilder);

if (options.isUseVirtualThreads()) {
ExecutorService executor =
tryCreateVirtualThreadPerTaskExecutor("spanner-virtual-grpc-executor");
if (executor != null) {
defaultChannelProviderBuilder.setExecutor(executor);
}
}
// If it is enabled in options uses the channel pool provided by the gRPC-GCP extension.
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);
return defaultChannelProviderBuilder;
}

// Enhance gRPC-GCP options with metrics and dynamic channel pool configuration.
private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerOptions options) {
GcpManagedChannelOptions grpcGcpOptions =
Expand Down Expand Up @@ -715,6 +823,15 @@ public static boolean isEnableDirectPathBoundToken() {
return !Boolean.parseBoolean(System.getenv("GOOGLE_SPANNER_DISABLE_DIRECT_ACCESS_BOUND_TOKEN"));
}

@VisibleForTesting static Boolean enableGcpFallbackEnv = null;

public static boolean isEnableGcpFallbackEnv() {
if (enableGcpFallbackEnv != null) {
return enableGcpFallbackEnv;
}
return Boolean.parseBoolean(System.getenv("GOOGLE_SPANNER_ENABLE_GCP_FALLBACK"));
}
Comment thread
kinsaurralde marked this conversation as resolved.
Outdated

private static final RetrySettings ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setInitialRetryDelayDuration(Duration.ofSeconds(5L))
Expand Down Expand Up @@ -2313,4 +2430,40 @@ private static Duration systemProperty(String name, int defaultValue) {
String stringValue = System.getProperty(name, "");
return Duration.ofSeconds(stringValue.isEmpty() ? defaultValue : Integer.parseInt(stringValue));
}

// Wrapper class to build the GcpFallbackChannel using GAX's configuration
private static class FallbackChannelBuilder
extends ForwardingChannelBuilder2<FallbackChannelBuilder> {
private final GcpFallbackChannelOptions options;

private final GcpManagedChannelBuilder primaryGcpBuilder;
private final GcpManagedChannelBuilder fallbackGcpBuilder;

private FallbackChannelBuilder(
GcpManagedChannelBuilder primary,
GcpManagedChannelBuilder fallback,
GcpFallbackChannelOptions options) {
this.primaryGcpBuilder = primary;
this.fallbackGcpBuilder = fallback;
this.options = options;
}

/**
* Delegates all configuration calls (e.g., interceptors, userAgent) to the primary builder.
* This ensures the primary channel receives all of GAX's standard configuration.
*/
@Override
protected ManagedChannelBuilder<?> delegate() {
return primaryGcpBuilder;
}

/**
* Overrides the build method to return our custom GcpFallbackChannel instead of a standard gRPC
* channel.
*/
@Override
public ManagedChannel build() {
return new GcpFallbackChannel(options, primaryGcpBuilder, fallbackGcpBuilder);
}
}
}
Loading
Loading