-
Notifications
You must be signed in to change notification settings - Fork 134
feat: add initial opentelemetry tracing to big query HTTP requests #4126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f5fa8ea
4faaa82
4202fb3
0372db5
de0ee8a
68704bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,286 @@ | ||
| /* | ||
| * Copyright 2026 Google LLC | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.google.cloud.bigquery.spi.v2; | ||
|
|
||
| import com.google.api.client.http.*; | ||
| import com.google.api.core.InternalApi; | ||
| import com.google.cloud.bigquery.telemetry.BigQueryTelemetryTracer; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import io.opentelemetry.api.common.AttributeKey; | ||
| import io.opentelemetry.api.trace.Span; | ||
| import io.opentelemetry.api.trace.StatusCode; | ||
| import io.opentelemetry.api.trace.Tracer; | ||
| import java.io.IOException; | ||
| import java.net.URI; | ||
| import java.net.URISyntaxException; | ||
| import java.util.Arrays; | ||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.Set; | ||
|
|
||
| /** | ||
| * HttpRequestInitializer that wraps a delegate initializer, intercepts all HTTP requests, adds | ||
| * OpenTelemetry tracing and then invokes delegate interceptor. | ||
| */ | ||
| @InternalApi | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's mark all new classes as |
||
| public class HttpTracingRequestInitializer implements HttpRequestInitializer { | ||
|
|
||
| // HTTP Specific Telemetry Attributes | ||
| public static final AttributeKey<String> HTTP_REQUEST_METHOD = | ||
| AttributeKey.stringKey("http.request.method"); | ||
| public static final AttributeKey<String> URL_FULL = AttributeKey.stringKey("url.full"); | ||
| public static final AttributeKey<String> URL_TEMPLATE = AttributeKey.stringKey("url.template"); | ||
| public static final AttributeKey<String> URL_DOMAIN = AttributeKey.stringKey("url.domain"); | ||
| public static final AttributeKey<Long> HTTP_RESPONSE_STATUS_CODE = | ||
| AttributeKey.longKey("http.response.status_code"); | ||
| public static final AttributeKey<Long> HTTP_REQUEST_RESEND_COUNT = | ||
| AttributeKey.longKey("http.request.resend_count"); | ||
| public static final AttributeKey<Long> HTTP_REQUEST_BODY_SIZE = | ||
| AttributeKey.longKey("http.request.body.size"); | ||
| public static final AttributeKey<Long> HTTP_RESPONSE_BODY_SIZE = | ||
| AttributeKey.longKey("http.response.body.size"); | ||
|
|
||
| @VisibleForTesting static final String HTTP_RPC_SYSTEM_NAME = "http"; | ||
|
|
||
| private static final String REDACTED_VALUE = "REDACTED"; | ||
| // Required by OpenTelemetry semantic conventions: | ||
| // https://opentelemetry.io/docs/specs/semconv/registry/attributes/url/#url-full | ||
| private static final Set<String> SENSITIVE_QUERY_KEYS = | ||
| Collections.unmodifiableSet( | ||
| new HashSet<>( | ||
| Arrays.asList( | ||
| "AWSAccessKeyId", | ||
| "Signature", | ||
| "sig", | ||
| "X-Goog-Signature", | ||
| // Google uses this as a key in resumable uploads. | ||
| "upload_id"))); | ||
|
|
||
| private final HttpRequestInitializer delegate; | ||
| private final Tracer tracer; | ||
| private final String clientRootUrl; | ||
|
|
||
| public HttpTracingRequestInitializer( | ||
| HttpRequestInitializer delegate, Tracer tracer, String clientRootUrl) { | ||
| this.delegate = delegate; | ||
| this.tracer = tracer; | ||
| this.clientRootUrl = clientRootUrl; | ||
| } | ||
|
|
||
| @Override | ||
| public void initialize(HttpRequest request) throws IOException { | ||
| if (delegate != null) { | ||
| delegate.initialize(request); | ||
| } | ||
|
|
||
| if (tracer == null) { | ||
| return; | ||
| } | ||
|
|
||
| String httpMethod = request.getRequestMethod(); | ||
| String url = request.getUrl().build(); | ||
| String host = request.getUrl().getHost(); | ||
| Integer port = request.getUrl().getPort(); | ||
|
|
||
| Span span = createHttpTraceSpan(httpMethod, url, host, port); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the span is started here during |
||
|
|
||
| HttpResponseInterceptor originalInterceptor = request.getResponseInterceptor(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are exactly one of these guaranteed to run to close the span? |
||
| request.setResponseInterceptor( | ||
| response -> { | ||
| addCommonResponseAttributesToSpan( | ||
| request, response, span, httpMethod, response.getStatusCode()); | ||
| span.setStatus(StatusCode.OK); | ||
|
|
||
| try { | ||
| if (originalInterceptor != null) { | ||
| originalInterceptor.interceptResponse(response); | ||
| } | ||
| } catch (IOException e) { | ||
| addExceptionToSpan(e, span); | ||
| throw e; | ||
| } finally { | ||
| span.end(); | ||
| } | ||
| }); | ||
|
|
||
| HttpUnsuccessfulResponseHandler originalHandler = request.getUnsuccessfulResponseHandler(); | ||
| request.setUnsuccessfulResponseHandler( | ||
| (request1, response, supportsRetry) -> { | ||
| int statusCode = response.getStatusCode(); | ||
| addCommonResponseAttributesToSpan(request, response, span, httpMethod, statusCode); | ||
| addErrorResponseToSpan(response, span, statusCode); | ||
| try { | ||
| if (originalHandler != null) { | ||
| return originalHandler.handleResponse(request1, response, supportsRetry); | ||
| } | ||
| return false; | ||
| } catch (IOException e) { | ||
| addExceptionToSpan(e, span); | ||
| throw e; | ||
| } finally { | ||
| span.end(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this result in ending the span before a retry can occur? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this happen twice if the request is retried? |
||
| } | ||
| }); | ||
| } | ||
|
|
||
| /** Initial HTTP trace span creation with basic attributes from request */ | ||
| private Span createHttpTraceSpan(String httpMethod, String url, String host, Integer port) { | ||
| // TODO: add url template && resource name | ||
| // TODO: appropriately determine span name using: {method} {url.template} or {method} | ||
| Span span = | ||
| BigQueryTelemetryTracer.newSpanBuilder(tracer, httpMethod) | ||
| .setAttribute(HTTP_REQUEST_METHOD, httpMethod) | ||
| .setAttribute(URL_FULL, sanitizeUrlFull(url)) | ||
| .setAttribute(BigQueryTelemetryTracer.SERVER_ADDRESS, host) | ||
| .setAttribute(URL_DOMAIN, resolveUrlDomain(host)) | ||
| .setAttribute(BigQueryTelemetryTracer.RPC_SYSTEM_NAME, HTTP_RPC_SYSTEM_NAME) | ||
| .startSpan(); | ||
| if (port != null && port > 0) { | ||
| span.setAttribute(BigQueryTelemetryTracer.SERVER_PORT, port.longValue()); | ||
| } | ||
| return span; | ||
| } | ||
|
|
||
| private String resolveUrlDomain(String requestHost) { | ||
| if (clientRootUrl != null) { | ||
| try { | ||
| String configuredHost = new GenericUrl(clientRootUrl).getHost(); | ||
| if (configuredHost != null && !configuredHost.isEmpty()) { | ||
| return configuredHost; | ||
| } | ||
| } catch (IllegalArgumentException ex) { | ||
| // Ignore malformed configured root URL and fall back to request host. | ||
| } | ||
| } | ||
| return requestHost; | ||
| } | ||
|
|
||
| private static void addCommonResponseAttributesToSpan( | ||
| HttpRequest request, HttpResponse response, Span span, String httpMethod, int statusCode) { | ||
| // We add request body size/update request method after we receive response as they sometimes | ||
| // the data is | ||
| // not available until after the http request execution | ||
| addRequestBodySizeToSpan(request, span); | ||
| checkForUpdatedRequestMethod(response, httpMethod, span); | ||
|
|
||
| addResponseBodySizeToSpan(response, span); | ||
| span.setAttribute(HTTP_RESPONSE_STATUS_CODE, statusCode); | ||
| } | ||
|
|
||
| private static void addExceptionToSpan(IOException e, Span span) { | ||
| span.recordException(e); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What exactly does Same question for
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. recordException adds an event type "exception" to a span (does not affect attributes). See screenshot. For setStatus, it sets the overall status of the span (also not an attribute), default is Unset. See screenshot. Let me know if you want me to flag either of these for confirmation from Wes. |
||
| String message = e.getMessage(); | ||
| String statusMessage = message != null ? message : e.getClass().getName(); | ||
| span.setAttribute(BigQueryTelemetryTracer.EXCEPTION_TYPE, e.getClass().getName()); | ||
| span.setAttribute(BigQueryTelemetryTracer.ERROR_TYPE, e.getClass().getSimpleName()); | ||
| span.setAttribute(BigQueryTelemetryTracer.STATUS_MESSAGE, statusMessage); | ||
| span.setStatus(StatusCode.ERROR, statusMessage); | ||
| } | ||
|
|
||
| private static void addErrorResponseToSpan(HttpResponse response, Span span, int statusCode) { | ||
| String errorMessage = "HTTP " + statusCode; | ||
| try { | ||
| String statusMessage = response.getStatusMessage(); | ||
| if (statusMessage != null && !statusMessage.isEmpty()) { | ||
| errorMessage = statusMessage; | ||
| } | ||
| } catch (Exception ex) { | ||
| // Ignore | ||
| } | ||
| span.setAttribute(BigQueryTelemetryTracer.STATUS_MESSAGE, errorMessage); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any data privacy concerns with exporting the error message? I assume BigQuery error messages could potentially contain sensitive details like SQL query snippets, table names, or row data. |
||
| span.setAttribute(BigQueryTelemetryTracer.ERROR_TYPE, String.valueOf(statusCode)); | ||
| span.setStatus(StatusCode.ERROR, errorMessage); | ||
| } | ||
|
|
||
| private static void addRequestBodySizeToSpan(HttpRequest request, Span span) { | ||
| try { | ||
| long contentLength = request.getContent().getLength(); | ||
| if (contentLength > 0) { | ||
| span.setAttribute(HTTP_REQUEST_BODY_SIZE, contentLength); | ||
| } | ||
| } catch (Exception e) { | ||
| // Ignore - body size not available | ||
| } | ||
| } | ||
|
|
||
| private static void addResponseBodySizeToSpan(HttpResponse response, Span span) { | ||
| try { | ||
| long contentLength = response.getHeaders().getContentLength(); | ||
| if (contentLength > 0) { | ||
| span.setAttribute(HTTP_RESPONSE_BODY_SIZE, contentLength); | ||
| } | ||
| } catch (Exception e) { | ||
| // Ignore - body size not available | ||
| } | ||
| } | ||
|
|
||
| private static void checkForUpdatedRequestMethod( | ||
| HttpResponse response, String httpMethod, Span span) { | ||
| String actualMethod = response.getRequest().getRequestMethod(); | ||
| if (actualMethod != null && httpMethod == null) { | ||
| span.updateName(actualMethod); | ||
| span.setAttribute(HTTP_REQUEST_METHOD, actualMethod); | ||
| } | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| static String sanitizeUrlFull(String url) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems a lot of logic and could easily make mistakes. Did Rust do similar things?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is Rusts's logic, so in function I would say is comparable. I will try to come up with a simpler/idiomatic way to scrub the query params There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you use (a clone of) request.getUrl() instead of string manip? |
||
| try { | ||
| URI uri = new URI(url); | ||
| String sanitizedUserInfo = | ||
| uri.getRawUserInfo() != null ? REDACTED_VALUE + ":" + REDACTED_VALUE : null; | ||
| String sanitizedQuery = redactSensitiveQueryValues(uri.getRawQuery()); | ||
| URI sanitizedUri = | ||
| new URI( | ||
| uri.getScheme(), | ||
| sanitizedUserInfo, | ||
| uri.getHost(), | ||
| uri.getPort(), | ||
| uri.getRawPath(), | ||
| sanitizedQuery, | ||
| uri.getRawFragment()); | ||
| return sanitizedUri.toString(); | ||
| } catch (URISyntaxException | IllegalArgumentException ex) { | ||
| return url; | ||
| } | ||
| } | ||
|
|
||
| private static String redactSensitiveQueryValues(String rawQuery) { | ||
| if (rawQuery == null || rawQuery.isEmpty()) { | ||
| return rawQuery; | ||
| } | ||
|
|
||
| String[] params = rawQuery.split("&", -1); | ||
| for (int i = 0; i < params.length; i++) { | ||
| String param = params[i]; | ||
| int equalsIndex = param.indexOf('='); | ||
| String key = equalsIndex >= 0 ? param.substring(0, equalsIndex) : param; | ||
| if (SENSITIVE_QUERY_KEYS.contains(key)) { | ||
| params[i] = key + "=" + REDACTED_VALUE; | ||
| } | ||
| } | ||
|
|
||
| StringBuilder redactedQuery = new StringBuilder(); | ||
| for (int i = 0; i < params.length; i++) { | ||
| if (i > 0) { | ||
| redactedQuery.append('&'); | ||
| } | ||
| redactedQuery.append(params[i]); | ||
| } | ||
| return redactedQuery.toString(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| /* | ||
| * Copyright 2026 Google LLC | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.google.cloud.bigquery.telemetry; | ||
|
|
||
| import com.google.api.core.InternalApi; | ||
| import io.opentelemetry.api.common.AttributeKey; | ||
| import io.opentelemetry.api.trace.SpanBuilder; | ||
| import io.opentelemetry.api.trace.SpanKind; | ||
| import io.opentelemetry.api.trace.Tracer; | ||
|
|
||
| /** | ||
| * General BigQuery Telemetry class that stores generic telemetry attributes and any associated | ||
| * logic to calculate. | ||
| */ | ||
| @InternalApi | ||
| public final class BigQueryTelemetryTracer { | ||
|
|
||
| private BigQueryTelemetryTracer() {} | ||
|
|
||
| public static final String BQ_GCP_CLIENT_SERVICE = "bigquery"; | ||
| public static final String BQ_GCP_CLIENT_REPO = "googleapis/java-bigquery"; | ||
| public static final String BQ_GCP_CLIENT_ARTIFACT = "google-cloud-bigquery"; | ||
| public static final String BQ_GCP_CLIENT_LANGUAGE = "java"; | ||
|
|
||
| // Common GCP Attributes | ||
| public static final AttributeKey<String> GCP_CLIENT_SERVICE = | ||
| AttributeKey.stringKey("gcp.client.service"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's OK to hardcode these keys in Bigquery for now, we may want to use them from Gax once they are stabilized. |
||
| public static final AttributeKey<String> GCP_CLIENT_VERSION = | ||
| AttributeKey.stringKey("gcp.client.version"); | ||
| public static final AttributeKey<String> GCP_CLIENT_REPO = | ||
| AttributeKey.stringKey("gcp.client.repo"); | ||
| public static final AttributeKey<String> GCP_CLIENT_ARTIFACT = | ||
| AttributeKey.stringKey("gcp.client.artifact"); | ||
| public static final AttributeKey<String> GCP_CLIENT_LANGUAGE = | ||
| AttributeKey.stringKey("gcp.client.language"); | ||
| public static final AttributeKey<String> GCP_RESOURCE_NAME = | ||
| AttributeKey.stringKey("gcp.resource.name"); | ||
| public static final AttributeKey<String> RPC_SYSTEM_NAME = | ||
| AttributeKey.stringKey("rpc.system.name"); | ||
|
|
||
| // Common Error Attributes | ||
| public static final AttributeKey<String> ERROR_TYPE = AttributeKey.stringKey("error.type"); | ||
| public static final AttributeKey<String> EXCEPTION_TYPE = | ||
| AttributeKey.stringKey("exception.type"); | ||
| public static final AttributeKey<String> STATUS_MESSAGE = | ||
| AttributeKey.stringKey("status.message"); | ||
|
|
||
| // Common Server Attributes | ||
| public static final AttributeKey<String> SERVER_ADDRESS = | ||
| AttributeKey.stringKey("server.address"); | ||
| public static final AttributeKey<Long> SERVER_PORT = AttributeKey.longKey("server.port"); | ||
|
|
||
| public static SpanBuilder newSpanBuilder(Tracer tracer, String spanName) { | ||
| return tracer | ||
| .spanBuilder(spanName) | ||
| .setSpanKind(SpanKind.CLIENT) | ||
| .setAttribute(GCP_CLIENT_SERVICE, BQ_GCP_CLIENT_SERVICE) | ||
| .setAttribute(GCP_CLIENT_REPO, BQ_GCP_CLIENT_REPO) | ||
| .setAttribute(GCP_CLIENT_ARTIFACT, BQ_GCP_CLIENT_ARTIFACT) | ||
| .setAttribute(GCP_CLIENT_LANGUAGE, BQ_GCP_CLIENT_LANGUAGE); | ||
| // TODO: add version | ||
| } | ||
ldetmer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to put more thoughts about how to enable this feature.
This seems to be reusing the existing options which may or may not be a good practice. Because 1. Existing customers will also automatically get new Spans which they may not want. 2. Customers will get two different Spans which are not related at this moment.
Can we understand more about the use cases of the existing Spans?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to Blake's concerns. I'm leaning against reusing the existing options for the reasons he listed. Would it be possible to have something like
isHttpTelemetryTracingEnabled()to focus on the network-level tracing?