-
Notifications
You must be signed in to change notification settings - Fork 139
feat: Add E2E fallback to the spanner client. #4282
Changes from 9 commits
afdef61
d317412
02a1bb1
83ea935
9eed709
bd64b27
ca5b5b6
1140d7d
bd2c2e5
0350d8f
cfc8169
81655ed
98562f9
0659726
8fc77e7
6539620
2b5b889
f811a2b
fbef34d
66b9315
a26d6b5
66534c9
d8fcc4a
06760db
29dac67
2bd6884
70e5eb3
15e611e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,19 +51,24 @@ | |
| import com.google.api.gax.rpc.StatusCode; | ||
| import com.google.api.gax.rpc.StatusCode.Code; | ||
| import com.google.api.gax.rpc.StreamController; | ||
| import com.google.api.gax.rpc.TransportChannel; | ||
| import com.google.api.gax.rpc.TransportChannelProvider; | ||
| import com.google.api.gax.rpc.UnaryCallSettings; | ||
| import com.google.api.gax.rpc.UnaryCallable; | ||
| import com.google.api.gax.rpc.UnavailableException; | ||
| import com.google.api.gax.rpc.WatchdogProvider; | ||
| import com.google.api.pathtemplate.PathTemplate; | ||
| import com.google.auth.Credentials; | ||
| import com.google.cloud.RetryHelper; | ||
| import com.google.cloud.RetryHelper.RetryHelperException; | ||
| import com.google.cloud.grpc.GcpManagedChannel; | ||
| import com.google.cloud.grpc.GcpManagedChannelBuilder; | ||
| import com.google.cloud.grpc.GcpManagedChannelOptions; | ||
| import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; | ||
| import com.google.cloud.grpc.GrpcTransportOptions; | ||
| import com.google.cloud.grpc.fallback.GcpFallbackChannel; | ||
| import com.google.cloud.grpc.fallback.GcpFallbackChannelOptions; | ||
| import com.google.cloud.grpc.fallback.GcpFallbackOpenTelemetry; | ||
| import com.google.cloud.spanner.AdminRequestsPerMinuteExceededException; | ||
| import com.google.cloud.spanner.BackupId; | ||
| import com.google.cloud.spanner.ErrorCode; | ||
|
|
@@ -185,9 +190,16 @@ | |
| import com.google.spanner.v1.SpannerGrpc; | ||
| import com.google.spanner.v1.Transaction; | ||
| import io.grpc.CallCredentials; | ||
| import io.grpc.CallOptions; | ||
| import io.grpc.Channel; | ||
| import io.grpc.ClientCall; | ||
| import io.grpc.ClientInterceptor; | ||
| import io.grpc.Context; | ||
| import io.grpc.ForwardingChannelBuilder2; | ||
| import io.grpc.ManagedChannel; | ||
| import io.grpc.ManagedChannelBuilder; | ||
| import io.grpc.MethodDescriptor; | ||
| import io.grpc.auth.MoreCallCredentials; | ||
| import java.io.IOException; | ||
| import java.io.UnsupportedEncodingException; | ||
| import java.net.URLDecoder; | ||
|
|
@@ -214,6 +226,7 @@ | |
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
| import javax.annotation.Nullable; | ||
|
|
@@ -341,65 +354,100 @@ public GapicSpannerRpc(final SpannerOptions options) { | |
| this.isDynamicChannelPoolEnabled = options.isDynamicChannelPoolEnabled(); | ||
| this.baseGrpcCallContext = createBaseCallContext(); | ||
|
|
||
| boolean isEnableDirectAccess = options.isEnableDirectAccess(); | ||
|
|
||
| if (initializeStubs) { | ||
| // First check if SpannerOptions provides a TransportChannelProvider. Create one | ||
| // with information gathered from SpannerOptions if none is provided | ||
| CredentialsProvider credentialsProvider = | ||
| GrpcTransportOptions.setUpCredentialsProvider(options); | ||
|
|
||
| InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder = | ||
| InstantiatingGrpcChannelProvider.newBuilder() | ||
| .setChannelConfigurator(options.getChannelConfigurator()) | ||
| .setEndpoint(options.getEndpoint()) | ||
| .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) | ||
| .setMaxInboundMetadataSize(MAX_METADATA_SIZE) | ||
| .setPoolSize(options.getNumChannels()) | ||
|
|
||
| // Set a keepalive time of 120 seconds to help long running | ||
| // commit GRPC calls succeed | ||
| .setKeepAliveTimeDuration(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS)) | ||
|
|
||
| // Then check if SpannerOptions provides an InterceptorProvider. Create a default | ||
| // SpannerInterceptorProvider if none is provided | ||
| .setInterceptorProvider( | ||
| SpannerInterceptorProvider.create( | ||
| MoreObjects.firstNonNull( | ||
| options.getInterceptorProvider(), | ||
| SpannerInterceptorProvider.createDefault(options.getOpenTelemetry()))) | ||
| // This sets the trace context headers. | ||
| .withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry()) | ||
| // This sets the response compressor (Server -> Client). | ||
| .withEncoding(compressorName)) | ||
| .setHeaderProvider(headerProviderWithUserAgent) | ||
| .setAllowNonDefaultServiceAccount(true); | ||
| boolean isEnableDirectAccess = options.isEnableDirectAccess(); | ||
| if (isEnableDirectAccess) { | ||
| defaultChannelProviderBuilder.setAttemptDirectPath(true); | ||
| if (isEnableDirectPathBoundToken()) { | ||
| // This will let the credentials try to fetch a hard-bound access token if the runtime | ||
| // environment supports it. | ||
| defaultChannelProviderBuilder.setAllowHardBoundTokenTypes( | ||
| Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS)); | ||
| createChannelProviderBuilder(options, headerProviderWithUserAgent, isEnableDirectAccess); | ||
|
|
||
| if (options.getChannelProvider() == null | ||
| && isEnableDirectAccess | ||
| && isEnableGcpFallbackEnv()) { | ||
| InstantiatingGrpcChannelProvider.Builder cloudPathProviderBuilder = | ||
| createChannelProviderBuilder( | ||
| options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false); | ||
|
|
||
| final AtomicReference<ManagedChannelBuilder> cloudPathBuilderRef = new AtomicReference<>(); | ||
| cloudPathProviderBuilder.setChannelConfigurator( | ||
| builder -> { | ||
| if (options.getChannelConfigurator() != null) { | ||
| builder = options.getChannelConfigurator().apply(builder); | ||
| } | ||
| cloudPathBuilderRef.set(builder); | ||
| return builder; | ||
| }); | ||
|
|
||
| // Build the cloudPathProvider to extract the builder which will be provided to | ||
| // FallbackChannelBuilder. | ||
| try (TransportChannel ignored = cloudPathProviderBuilder.build().getTransportChannel()) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to always pre-emptively create a cloudpath TransportChannel and then close it again at startup. Are we OK with that? (I see that this is conditional on DirectPath being enabled, and fallback being enabled, which means that it probably means that it is not something that all clients will do, but it will increase startup cost both for the client and the server)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The other way i know to capture the builder is to intentionally throw an exception at the end of cloudPathProviderBuilder.setChannelConfigurator() and catch it during build(). However doing that seems a bit weird. Is there another way to do this i am missing? Note that this fallback is planned to be enabled when directpath is so will eventually be run by many clients. |
||
| } catch (Exception e) { | ||
| throw asSpannerException(e); | ||
| } | ||
| defaultChannelProviderBuilder.setAttemptDirectPathXds(); | ||
| } | ||
|
|
||
| options.enablegRPCMetrics(defaultChannelProviderBuilder); | ||
| ManagedChannelBuilder cloudPathBuilder = cloudPathBuilderRef.get(); | ||
| if (cloudPathBuilder == null) { | ||
| throw new IllegalStateException("CloudPath builder was not captured."); | ||
| } | ||
|
|
||
| if (options.isUseVirtualThreads()) { | ||
| ExecutorService executor = | ||
| tryCreateVirtualThreadPerTaskExecutor("spanner-virtual-grpc-executor"); | ||
| if (executor != null) { | ||
| defaultChannelProviderBuilder.setExecutor(executor); | ||
| try { | ||
| Credentials credentials = credentialsProvider.getCredentials(); | ||
| if (credentials != null) { | ||
| cloudPathBuilder.intercept( | ||
| new ClientInterceptor() { | ||
| @Override | ||
| public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( | ||
| MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { | ||
| return next.newCall( | ||
| method, | ||
| callOptions.withCallCredentials(MoreCallCredentials.from(credentials))); | ||
| } | ||
| }); | ||
| } | ||
| } catch (Exception e) { | ||
| throw asSpannerException(e); | ||
| } | ||
|
|
||
| defaultChannelProviderBuilder.setChannelConfigurator( | ||
| directPathBuilder -> { | ||
| if (options.getChannelConfigurator() != null) { | ||
| directPathBuilder = options.getChannelConfigurator().apply(directPathBuilder); | ||
| } | ||
|
|
||
| String jsonApiConfig = parseGrpcGcpApiConfig(); | ||
| GcpManagedChannelOptions gcpOptions = grpcGcpOptionsWithMetricsAndDcp(options); | ||
| if (gcpOptions == null) { | ||
| gcpOptions = GcpManagedChannelOptions.newBuilder().build(); | ||
| } | ||
|
|
||
| GcpManagedChannelBuilder primaryGcpBuilder = | ||
| GcpManagedChannelBuilder.forDelegateBuilder(directPathBuilder) | ||
| .withApiConfigJsonString(jsonApiConfig) | ||
| .withOptions(gcpOptions); | ||
|
|
||
| GcpManagedChannelBuilder fallbackGcpBuilder = | ||
| GcpManagedChannelBuilder.forDelegateBuilder(cloudPathBuilder) | ||
| .withApiConfigJsonString(jsonApiConfig) | ||
| .withOptions(gcpOptions); | ||
|
|
||
| GcpFallbackOpenTelemetry fallbackTelemetry = | ||
| GcpFallbackOpenTelemetry.newBuilder().withSdk(options.getOpenTelemetry()).build(); | ||
|
kinsaurralde marked this conversation as resolved.
Outdated
|
||
|
|
||
| return new FallbackChannelBuilder( | ||
| primaryGcpBuilder, | ||
| fallbackGcpBuilder, | ||
| createFallbackChannelOptions(fallbackTelemetry)); | ||
| }); | ||
| } | ||
| // If it is enabled in options uses the channel pool provided by the gRPC-GCP extension. | ||
| maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options); | ||
|
|
||
| // First check if SpannerOptions provides a TransportChannelProvider. Create one | ||
| // with information gathered from SpannerOptions if none is provided | ||
| TransportChannelProvider channelProvider = | ||
| MoreObjects.firstNonNull( | ||
| options.getChannelProvider(), defaultChannelProviderBuilder.build()); | ||
|
|
||
| CredentialsProvider credentialsProvider = | ||
| GrpcTransportOptions.setUpCredentialsProvider(options); | ||
|
|
||
| spannerWatchdog = | ||
| Executors.newSingleThreadScheduledExecutor( | ||
| new ThreadFactoryBuilder() | ||
|
|
@@ -563,6 +611,17 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla | |
| } | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| GcpFallbackChannelOptions createFallbackChannelOptions( | ||
| GcpFallbackOpenTelemetry fallbackTelemetry) { | ||
| return GcpFallbackChannelOptions.newBuilder() | ||
| .setPrimaryChannelName("directpath") | ||
| .setFallbackChannelName("cloudpath") | ||
| .setMinFailedCalls(1) | ||
|
kinsaurralde marked this conversation as resolved.
Outdated
|
||
| .setGcpFallbackOpenTelemetry(fallbackTelemetry) | ||
|
kinsaurralde marked this conversation as resolved.
|
||
| .build(); | ||
| } | ||
|
|
||
| private static String parseGrpcGcpApiConfig() { | ||
| try { | ||
| return Resources.toString( | ||
|
|
@@ -572,6 +631,60 @@ private static String parseGrpcGcpApiConfig() { | |
| } | ||
| } | ||
|
|
||
| private InstantiatingGrpcChannelProvider.Builder createChannelProviderBuilder( | ||
| final SpannerOptions options, | ||
| final HeaderProvider headerProviderWithUserAgent, | ||
| boolean isEnableDirectAccess) { | ||
| InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder = | ||
| InstantiatingGrpcChannelProvider.newBuilder() | ||
| .setChannelConfigurator(options.getChannelConfigurator()) | ||
| .setEndpoint(options.getEndpoint()) | ||
| .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) | ||
| .setMaxInboundMetadataSize(MAX_METADATA_SIZE) | ||
| .setPoolSize(options.getNumChannels()) | ||
|
|
||
| // Set a keepalive time of 120 seconds to help long running | ||
| // commit GRPC calls succeed | ||
| .setKeepAliveTimeDuration(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS)) | ||
|
|
||
| // Then check if SpannerOptions provides an InterceptorProvider. Create a default | ||
| // SpannerInterceptorProvider if none is provided | ||
| .setInterceptorProvider( | ||
| SpannerInterceptorProvider.create( | ||
| MoreObjects.firstNonNull( | ||
| options.getInterceptorProvider(), | ||
| SpannerInterceptorProvider.createDefault(options.getOpenTelemetry()))) | ||
| // This sets the trace context headers. | ||
| .withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry()) | ||
| // This sets the response compressor (Server -> Client). | ||
| .withEncoding(compressorName)) | ||
| .setHeaderProvider(headerProviderWithUserAgent) | ||
| .setAllowNonDefaultServiceAccount(true); | ||
| if (isEnableDirectAccess) { | ||
| defaultChannelProviderBuilder.setAttemptDirectPath(true); | ||
| defaultChannelProviderBuilder.setAttemptDirectPathXds(); | ||
| if (isEnableDirectPathBoundToken()) { | ||
| // This will let the credentials try to fetch a hard-bound access token if the runtime | ||
| // environment supports it. | ||
| defaultChannelProviderBuilder.setAllowHardBoundTokenTypes( | ||
| Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS)); | ||
| } | ||
| } | ||
|
|
||
| options.enablegRPCMetrics(defaultChannelProviderBuilder); | ||
|
|
||
| if (options.isUseVirtualThreads()) { | ||
| ExecutorService executor = | ||
| tryCreateVirtualThreadPerTaskExecutor("spanner-virtual-grpc-executor"); | ||
| if (executor != null) { | ||
| defaultChannelProviderBuilder.setExecutor(executor); | ||
| } | ||
| } | ||
| // If it is enabled in options uses the channel pool provided by the gRPC-GCP extension. | ||
| maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options); | ||
| return defaultChannelProviderBuilder; | ||
| } | ||
|
|
||
| // Enhance gRPC-GCP options with metrics and dynamic channel pool configuration. | ||
| private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerOptions options) { | ||
| GcpManagedChannelOptions grpcGcpOptions = | ||
|
|
@@ -715,6 +828,15 @@ public static boolean isEnableDirectPathBoundToken() { | |
| return !Boolean.parseBoolean(System.getenv("GOOGLE_SPANNER_DISABLE_DIRECT_ACCESS_BOUND_TOKEN")); | ||
| } | ||
|
|
||
| @VisibleForTesting static Boolean enableGcpFallbackEnv = null; | ||
|
|
||
| public static boolean isEnableGcpFallbackEnv() { | ||
| if (enableGcpFallbackEnv != null) { | ||
| return enableGcpFallbackEnv; | ||
| } | ||
| return Boolean.parseBoolean(System.getenv("GOOGLE_SPANNER_ENABLE_GCP_FALLBACK")); | ||
| } | ||
|
kinsaurralde marked this conversation as resolved.
Outdated
|
||
|
|
||
| private static final RetrySettings ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS = | ||
| RetrySettings.newBuilder() | ||
| .setInitialRetryDelayDuration(Duration.ofSeconds(5L)) | ||
|
|
@@ -2313,4 +2435,40 @@ private static Duration systemProperty(String name, int defaultValue) { | |
| String stringValue = System.getProperty(name, ""); | ||
| return Duration.ofSeconds(stringValue.isEmpty() ? defaultValue : Integer.parseInt(stringValue)); | ||
| } | ||
|
|
||
| // Wrapper class to build the GcpFallbackChannel using GAX's configuration | ||
| private static class FallbackChannelBuilder | ||
| extends ForwardingChannelBuilder2<FallbackChannelBuilder> { | ||
| private final GcpFallbackChannelOptions options; | ||
|
|
||
| private final GcpManagedChannelBuilder primaryGcpBuilder; | ||
| private final GcpManagedChannelBuilder fallbackGcpBuilder; | ||
|
|
||
| private FallbackChannelBuilder( | ||
| GcpManagedChannelBuilder primary, | ||
| GcpManagedChannelBuilder fallback, | ||
| GcpFallbackChannelOptions options) { | ||
| this.primaryGcpBuilder = primary; | ||
| this.fallbackGcpBuilder = fallback; | ||
| this.options = options; | ||
| } | ||
|
|
||
| /** | ||
| * Delegates all configuration calls (e.g., interceptors, userAgent) to the primary builder. | ||
| * This ensures the primary channel receives all of GAX's standard configuration. | ||
| */ | ||
| @Override | ||
| protected ManagedChannelBuilder<?> delegate() { | ||
| return primaryGcpBuilder; | ||
| } | ||
|
|
||
| /** | ||
| * Overrides the build method to return our custom GcpFallbackChannel instead of a standard gRPC | ||
| * channel. | ||
| */ | ||
| @Override | ||
| public ManagedChannel build() { | ||
| return new GcpFallbackChannel(options, primaryGcpBuilder, fallbackGcpBuilder); | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.