responseObserver) {
+ return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(getProcessMethod(), responseObserver);
+ }
+ }
+
+ /**
+ * Base class for the server implementation of the service ExternalProcessor.
+ *
+ * A service that can access and modify HTTP requests and responses
+ * as part of a filter chain.
+ * The overall external processing protocol works like this:
+ * 1. The data plane sends to the service information about the HTTP request.
+ * 2. The service sends back a ProcessingResponse message that directs
+ * the data plane to either stop processing, continue without it, or send
+ * it the next chunk of the message body.
+ * 3. If so requested, the data plane sends the server the message body in
+ * chunks, or the entire body at once. In either case, the server may send
+ * back a ProcessingResponse for each message it receives, or wait for
+ * a certain amount of body chunks received before streaming back the
+ * ProcessingResponse messages.
+ * 4. If so requested, the data plane sends the server the HTTP trailers,
+ * and the server sends back a ProcessingResponse.
+ * 5. At this point, request processing is done, and we pick up again
+ * at step 1 when the data plane receives a response from the upstream
+ * server.
+ * 6. At any point above, if the server closes the gRPC stream cleanly,
+ * then the data plane proceeds without consulting the server.
+ * 7. At any point above, if the server closes the gRPC stream with an error,
+ * then the data plane returns a 500 error to the client, unless the filter
+ * was configured to ignore errors.
+ * In other words, the process is a request/response conversation, but
+ * using a gRPC stream to make it easier for the server to
+ * maintain state.
+ *
+ */
+ public static abstract class ExternalProcessorImplBase
+ implements io.grpc.BindableService, AsyncService {
+
+ @java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
+ return ExternalProcessorGrpc.bindService(this);
+ }
+ }
+
+ /**
+ * A stub to allow clients to do asynchronous rpc calls to service ExternalProcessor.
+ *
+ * A service that can access and modify HTTP requests and responses
+ * as part of a filter chain.
+ * The overall external processing protocol works like this:
+ * 1. The data plane sends to the service information about the HTTP request.
+ * 2. The service sends back a ProcessingResponse message that directs
+ * the data plane to either stop processing, continue without it, or send
+ * it the next chunk of the message body.
+ * 3. If so requested, the data plane sends the server the message body in
+ * chunks, or the entire body at once. In either case, the server may send
+ * back a ProcessingResponse for each message it receives, or wait for
+ * a certain amount of body chunks received before streaming back the
+ * ProcessingResponse messages.
+ * 4. If so requested, the data plane sends the server the HTTP trailers,
+ * and the server sends back a ProcessingResponse.
+ * 5. At this point, request processing is done, and we pick up again
+ * at step 1 when the data plane receives a response from the upstream
+ * server.
+ * 6. At any point above, if the server closes the gRPC stream cleanly,
+ * then the data plane proceeds without consulting the server.
+ * 7. At any point above, if the server closes the gRPC stream with an error,
+ * then the data plane returns a 500 error to the client, unless the filter
+ * was configured to ignore errors.
+ * In other words, the process is a request/response conversation, but
+ * using a gRPC stream to make it easier for the server to
+ * maintain state.
+ *
+ */
+ public static final class ExternalProcessorStub
+ extends io.grpc.stub.AbstractAsyncStub {
+ private ExternalProcessorStub(
+ io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+ super(channel, callOptions);
+ }
+
+ @java.lang.Override
+ protected ExternalProcessorStub build(
+ io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+ return new ExternalProcessorStub(channel, callOptions);
+ }
+
+ /**
+ *
+ * This begins the bidirectional stream that the data plane will use to
+ * give the server control over what the filter does. The actual
+ * protocol is described by the ProcessingRequest and ProcessingResponse
+ * messages below.
+ *
+ */
+ public io.grpc.stub.StreamObserver process(
+ io.grpc.stub.StreamObserver responseObserver) {
+ return io.grpc.stub.ClientCalls.asyncBidiStreamingCall(
+ getChannel().newCall(getProcessMethod(), getCallOptions()), responseObserver);
+ }
+ }
+
+ /**
+ * A stub to allow clients to do synchronous rpc calls to service ExternalProcessor.
+ *
+ * A service that can access and modify HTTP requests and responses
+ * as part of a filter chain.
+ * The overall external processing protocol works like this:
+ * 1. The data plane sends to the service information about the HTTP request.
+ * 2. The service sends back a ProcessingResponse message that directs
+ * the data plane to either stop processing, continue without it, or send
+ * it the next chunk of the message body.
+ * 3. If so requested, the data plane sends the server the message body in
+ * chunks, or the entire body at once. In either case, the server may send
+ * back a ProcessingResponse for each message it receives, or wait for
+ * a certain amount of body chunks received before streaming back the
+ * ProcessingResponse messages.
+ * 4. If so requested, the data plane sends the server the HTTP trailers,
+ * and the server sends back a ProcessingResponse.
+ * 5. At this point, request processing is done, and we pick up again
+ * at step 1 when the data plane receives a response from the upstream
+ * server.
+ * 6. At any point above, if the server closes the gRPC stream cleanly,
+ * then the data plane proceeds without consulting the server.
+ * 7. At any point above, if the server closes the gRPC stream with an error,
+ * then the data plane returns a 500 error to the client, unless the filter
+ * was configured to ignore errors.
+ * In other words, the process is a request/response conversation, but
+ * using a gRPC stream to make it easier for the server to
+ * maintain state.
+ *
+ */
+ public static final class ExternalProcessorBlockingV2Stub
+ extends io.grpc.stub.AbstractBlockingStub {
+ private ExternalProcessorBlockingV2Stub(
+ io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+ super(channel, callOptions);
+ }
+
+ @java.lang.Override
+ protected ExternalProcessorBlockingV2Stub build(
+ io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+ return new ExternalProcessorBlockingV2Stub(channel, callOptions);
+ }
+
+ /**
+ *
+ * This begins the bidirectional stream that the data plane will use to
+ * give the server control over what the filter does. The actual
+ * protocol is described by the ProcessingRequest and ProcessingResponse
+ * messages below.
+ *
+ */
+ @io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
+ public io.grpc.stub.BlockingClientCall
+ process() {
+ return io.grpc.stub.ClientCalls.blockingBidiStreamingCall(
+ getChannel(), getProcessMethod(), getCallOptions());
+ }
+ }
+
+ /**
+ * A stub to allow clients to do limited synchronous rpc calls to service ExternalProcessor.
+ *
+ * A service that can access and modify HTTP requests and responses
+ * as part of a filter chain.
+ * The overall external processing protocol works like this:
+ * 1. The data plane sends to the service information about the HTTP request.
+ * 2. The service sends back a ProcessingResponse message that directs
+ * the data plane to either stop processing, continue without it, or send
+ * it the next chunk of the message body.
+ * 3. If so requested, the data plane sends the server the message body in
+ * chunks, or the entire body at once. In either case, the server may send
+ * back a ProcessingResponse for each message it receives, or wait for
+ * a certain amount of body chunks received before streaming back the
+ * ProcessingResponse messages.
+ * 4. If so requested, the data plane sends the server the HTTP trailers,
+ * and the server sends back a ProcessingResponse.
+ * 5. At this point, request processing is done, and we pick up again
+ * at step 1 when the data plane receives a response from the upstream
+ * server.
+ * 6. At any point above, if the server closes the gRPC stream cleanly,
+ * then the data plane proceeds without consulting the server.
+ * 7. At any point above, if the server closes the gRPC stream with an error,
+ * then the data plane returns a 500 error to the client, unless the filter
+ * was configured to ignore errors.
+ * In other words, the process is a request/response conversation, but
+ * using a gRPC stream to make it easier for the server to
+ * maintain state.
+ *
+ */
+ public static final class ExternalProcessorBlockingStub
+ extends io.grpc.stub.AbstractBlockingStub {
+ private ExternalProcessorBlockingStub(
+ io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+ super(channel, callOptions);
+ }
+
+ @java.lang.Override
+ protected ExternalProcessorBlockingStub build(
+ io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+ return new ExternalProcessorBlockingStub(channel, callOptions);
+ }
+ }
+
+ /**
+ * A stub to allow clients to do ListenableFuture-style rpc calls to service ExternalProcessor.
+ *
+ * A service that can access and modify HTTP requests and responses
+ * as part of a filter chain.
+ * The overall external processing protocol works like this:
+ * 1. The data plane sends to the service information about the HTTP request.
+ * 2. The service sends back a ProcessingResponse message that directs
+ * the data plane to either stop processing, continue without it, or send
+ * it the next chunk of the message body.
+ * 3. If so requested, the data plane sends the server the message body in
+ * chunks, or the entire body at once. In either case, the server may send
+ * back a ProcessingResponse for each message it receives, or wait for
+ * a certain amount of body chunks received before streaming back the
+ * ProcessingResponse messages.
+ * 4. If so requested, the data plane sends the server the HTTP trailers,
+ * and the server sends back a ProcessingResponse.
+ * 5. At this point, request processing is done, and we pick up again
+ * at step 1 when the data plane receives a response from the upstream
+ * server.
+ * 6. At any point above, if the server closes the gRPC stream cleanly,
+ * then the data plane proceeds without consulting the server.
+ * 7. At any point above, if the server closes the gRPC stream with an error,
+ * then the data plane returns a 500 error to the client, unless the filter
+ * was configured to ignore errors.
+ * In other words, the process is a request/response conversation, but
+ * using a gRPC stream to make it easier for the server to
+ * maintain state.
+ *
+ */
+ public static final class ExternalProcessorFutureStub
+ extends io.grpc.stub.AbstractFutureStub {
+ private ExternalProcessorFutureStub(
+ io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+ super(channel, callOptions);
+ }
+
+ @java.lang.Override
+ protected ExternalProcessorFutureStub build(
+ io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+ return new ExternalProcessorFutureStub(channel, callOptions);
+ }
+ }
+
+ private static final int METHODID_PROCESS = 0;
+
+ private static final class MethodHandlers implements
+ io.grpc.stub.ServerCalls.UnaryMethod,
+ io.grpc.stub.ServerCalls.ServerStreamingMethod,
+ io.grpc.stub.ServerCalls.ClientStreamingMethod,
+ io.grpc.stub.ServerCalls.BidiStreamingMethod {
+ private final AsyncService serviceImpl;
+ private final int methodId;
+
+ MethodHandlers(AsyncService serviceImpl, int methodId) {
+ this.serviceImpl = serviceImpl;
+ this.methodId = methodId;
+ }
+
+ @java.lang.Override
+ @java.lang.SuppressWarnings("unchecked")
+ public void invoke(Req request, io.grpc.stub.StreamObserver responseObserver) {
+ switch (methodId) {
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ @java.lang.Override
+ @java.lang.SuppressWarnings("unchecked")
+ public io.grpc.stub.StreamObserver invoke(
+ io.grpc.stub.StreamObserver responseObserver) {
+ switch (methodId) {
+ case METHODID_PROCESS:
+ return (io.grpc.stub.StreamObserver) serviceImpl.process(
+ (io.grpc.stub.StreamObserver) responseObserver);
+ default:
+ throw new AssertionError();
+ }
+ }
+ }
+
+ public static final io.grpc.ServerServiceDefinition bindService(AsyncService service) {
+ return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
+ .addMethod(
+ getProcessMethod(),
+ io.grpc.stub.ServerCalls.asyncBidiStreamingCall(
+ new MethodHandlers<
+ io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest,
+ io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse>(
+ service, METHODID_PROCESS)))
+ .build();
+ }
+
+ private static abstract class ExternalProcessorBaseDescriptorSupplier
+ implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
+ ExternalProcessorBaseDescriptorSupplier() {}
+
+ @java.lang.Override
+ public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
+ return io.envoyproxy.envoy.service.ext_proc.v3.ExternalProcessorProto.getDescriptor();
+ }
+
+ @java.lang.Override
+ public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
+ return getFileDescriptor().findServiceByName("ExternalProcessor");
+ }
+ }
+
+ private static final class ExternalProcessorFileDescriptorSupplier
+ extends ExternalProcessorBaseDescriptorSupplier {
+ ExternalProcessorFileDescriptorSupplier() {}
+ }
+
+ private static final class ExternalProcessorMethodDescriptorSupplier
+ extends ExternalProcessorBaseDescriptorSupplier
+ implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
+ private final java.lang.String methodName;
+
+ ExternalProcessorMethodDescriptorSupplier(java.lang.String methodName) {
+ this.methodName = methodName;
+ }
+
+ @java.lang.Override
+ public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
+ return getServiceDescriptor().findMethodByName(methodName);
+ }
+ }
+
+ private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
+
+ public static io.grpc.ServiceDescriptor getServiceDescriptor() {
+ io.grpc.ServiceDescriptor result = serviceDescriptor;
+ if (result == null) {
+ synchronized (ExternalProcessorGrpc.class) {
+ result = serviceDescriptor;
+ if (result == null) {
+ serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
+ .setSchemaDescriptor(new ExternalProcessorFileDescriptorSupplier())
+ .addMethod(getProcessMethod())
+ .build();
+ }
+ }
+ }
+ return result;
+ }
+}
diff --git a/xds/src/main/java/io/grpc/xds/ExtAuthzConfigParser.java b/xds/src/main/java/io/grpc/xds/ExtAuthzConfigParser.java
deleted file mode 100644
index 853e8a5c03a..00000000000
--- a/xds/src/main/java/io/grpc/xds/ExtAuthzConfigParser.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2025 The gRPC Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.grpc.xds;
-
-import com.google.common.collect.ImmutableList;
-import io.envoyproxy.envoy.extensions.filters.http.ext_authz.v3.ExtAuthz;
-import io.grpc.internal.GrpcUtil;
-import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
-import io.grpc.xds.client.Bootstrapper.ServerInfo;
-import io.grpc.xds.internal.MatcherParser;
-import io.grpc.xds.internal.extauthz.ExtAuthzConfig;
-import io.grpc.xds.internal.extauthz.ExtAuthzParseException;
-import io.grpc.xds.internal.grpcservice.GrpcServiceConfig;
-import io.grpc.xds.internal.grpcservice.GrpcServiceParseException;
-import io.grpc.xds.internal.headermutations.HeaderMutationRulesParseException;
-import io.grpc.xds.internal.headermutations.HeaderMutationRulesParser;
-
-
-/**
- * Parser for {@link io.envoyproxy.envoy.extensions.filters.http.ext_authz.v3.ExtAuthz}.
- */
-final class ExtAuthzConfigParser {
-
- private ExtAuthzConfigParser() {}
-
- /**
- * Parses the {@link io.envoyproxy.envoy.extensions.filters.http.ext_authz.v3.ExtAuthz} proto to
- * create an {@link ExtAuthzConfig} instance.
- *
- * @param extAuthzProto The ext_authz proto to parse.
- * @return An {@link ExtAuthzConfig} instance.
- * @throws ExtAuthzParseException if the proto is invalid or contains unsupported features.
- */
- public static ExtAuthzConfig parse(
- ExtAuthz extAuthzProto, BootstrapInfo bootstrapInfo, ServerInfo serverInfo)
- throws ExtAuthzParseException {
- if (!extAuthzProto.hasGrpcService()) {
- throw new ExtAuthzParseException(
- "unsupported ExtAuthz service type: only grpc_service is supported");
- }
- GrpcServiceConfig grpcServiceConfig;
- try {
- grpcServiceConfig =
- GrpcServiceConfigParser.parse(extAuthzProto.getGrpcService(), bootstrapInfo, serverInfo);
- } catch (GrpcServiceParseException e) {
- throw new ExtAuthzParseException("Failed to parse GrpcService config: " + e.getMessage(), e);
- }
- ExtAuthzConfig.Builder builder = ExtAuthzConfig.builder().grpcService(grpcServiceConfig)
- .failureModeAllow(extAuthzProto.getFailureModeAllow())
- .failureModeAllowHeaderAdd(extAuthzProto.getFailureModeAllowHeaderAdd())
- .includePeerCertificate(extAuthzProto.getIncludePeerCertificate())
- .denyAtDisable(extAuthzProto.getDenyAtDisable().getDefaultValue().getValue());
-
- if (extAuthzProto.hasFilterEnabled()) {
- try {
- builder.filterEnabled(
- MatcherParser.parseFractionMatcher(extAuthzProto.getFilterEnabled().getDefaultValue()));
- } catch (IllegalArgumentException e) {
- throw new ExtAuthzParseException(e.getMessage());
- }
- }
-
- if (extAuthzProto.hasStatusOnError()) {
- builder.statusOnError(
- GrpcUtil.httpStatusToGrpcStatus(extAuthzProto.getStatusOnError().getCodeValue()));
- }
-
- if (extAuthzProto.hasAllowedHeaders()) {
- builder.allowedHeaders(extAuthzProto.getAllowedHeaders().getPatternsList().stream()
- .map(MatcherParser::parseStringMatcher).collect(ImmutableList.toImmutableList()));
- }
-
- if (extAuthzProto.hasDisallowedHeaders()) {
- builder.disallowedHeaders(extAuthzProto.getDisallowedHeaders().getPatternsList().stream()
- .map(MatcherParser::parseStringMatcher).collect(ImmutableList.toImmutableList()));
- }
-
- if (extAuthzProto.hasDecoderHeaderMutationRules()) {
- try {
- builder.decoderHeaderMutationRules(
- HeaderMutationRulesParser.parse(extAuthzProto.getDecoderHeaderMutationRules()));
- } catch (HeaderMutationRulesParseException e) {
- throw new ExtAuthzParseException(e.getMessage(), e);
- }
- }
-
- return builder.build();
- }
-}
diff --git a/xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java b/xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
new file mode 100644
index 00000000000..ab695f8b76e
--- /dev/null
+++ b/xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
@@ -0,0 +1,1621 @@
+/*
+ * Copyright 2024 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.BaseEncoding;
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Duration;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Value;
+import com.google.protobuf.util.Durations;
+import io.envoyproxy.envoy.config.core.v3.GrpcService;
+import io.envoyproxy.envoy.config.core.v3.HeaderMap;
+import io.envoyproxy.envoy.extensions.filters.http.ext_proc.v3.ExtProcOverrides;
+import io.envoyproxy.envoy.extensions.filters.http.ext_proc.v3.ExtProcPerRoute;
+import io.envoyproxy.envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor;
+import io.envoyproxy.envoy.extensions.filters.http.ext_proc.v3.HeaderForwardingRules;
+import io.envoyproxy.envoy.extensions.filters.http.ext_proc.v3.ProcessingMode;
+import io.envoyproxy.envoy.service.ext_proc.v3.BodyMutation;
+import io.envoyproxy.envoy.service.ext_proc.v3.BodyResponse;
+import io.envoyproxy.envoy.service.ext_proc.v3.CommonResponse;
+import io.envoyproxy.envoy.service.ext_proc.v3.ExternalProcessorGrpc;
+import io.envoyproxy.envoy.service.ext_proc.v3.HeaderMutation;
+import io.envoyproxy.envoy.service.ext_proc.v3.HttpBody;
+import io.envoyproxy.envoy.service.ext_proc.v3.HttpHeaders;
+import io.envoyproxy.envoy.service.ext_proc.v3.HttpTrailers;
+import io.envoyproxy.envoy.service.ext_proc.v3.ImmediateResponse;
+import io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest;
+import io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse;
+import io.envoyproxy.envoy.service.ext_proc.v3.ProtocolConfiguration;
+import io.envoyproxy.envoy.service.ext_proc.v3.StreamedBodyResponse;
+import io.grpc.Attributes;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.Deadline;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.internal.DelayedClientCall;
+import io.grpc.internal.SerializingExecutor;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import io.grpc.xds.internal.MatcherParser;
+import io.grpc.xds.internal.Matchers;
+import io.grpc.xds.internal.grpcservice.CachedChannelManager;
+import io.grpc.xds.internal.grpcservice.GrpcServiceConfig;
+import io.grpc.xds.internal.grpcservice.GrpcServiceParseException;
+import io.grpc.xds.internal.grpcservice.HeaderValue;
+import io.grpc.xds.internal.headermutations.HeaderMutationDisallowedException;
+import io.grpc.xds.internal.headermutations.HeaderMutationFilter;
+import io.grpc.xds.internal.headermutations.HeaderMutationRulesConfig;
+import io.grpc.xds.internal.headermutations.HeaderMutationRulesParseException;
+import io.grpc.xds.internal.headermutations.HeaderMutationRulesParser;
+import io.grpc.xds.internal.headermutations.HeaderMutations;
+import io.grpc.xds.internal.headermutations.HeaderMutator;
+import io.grpc.xds.internal.headermutations.HeaderValueOption;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+
+/**
+ * Filter for external processing as per gRFC A93.
+ */
+public class ExternalProcessorFilter implements Filter {
+ static final String TYPE_URL =
+ "type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor";
+
+ private final CachedChannelManager cachedChannelManager;
+
+ public ExternalProcessorFilter(String name) {
+ this(name, new CachedChannelManager());
+ }
+
+ ExternalProcessorFilter(String name, CachedChannelManager cachedChannelManager) {
+ this.cachedChannelManager = checkNotNull(cachedChannelManager, "cachedChannelManager");
+ }
+
+ @Override
+ public void close() {
+ cachedChannelManager.close();
+ }
+
+ static final class Provider implements Filter.Provider {
+ @Override
+ public String[] typeUrls() {
+ return new String[]{TYPE_URL};
+ }
+
+ @Override
+ public boolean isClientFilter() {
+ return true;
+ }
+
+ @Override
+ public ExternalProcessorFilter newInstance(String name) {
+ return new ExternalProcessorFilter(name);
+ }
+
+ @Override
+ public ConfigOrError parseFilterConfig(
+ Message rawProtoMessage, FilterContext context) {
+ if (!(rawProtoMessage instanceof Any)) {
+ return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
+ }
+ ExternalProcessor externalProcessor;
+ try {
+ externalProcessor = ((Any) rawProtoMessage).unpack(ExternalProcessor.class);
+ } catch (InvalidProtocolBufferException e) {
+ return ConfigOrError.fromError("Invalid proto: " + e);
+ }
+
+ return ExternalProcessorFilterConfig.create(externalProcessor, context);
+ }
+
+ @Override
+ public ConfigOrError parseFilterConfigOverride(
+ Message rawProtoMessage, FilterContext context) {
+ if (!(rawProtoMessage instanceof Any)) {
+ return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
+ }
+ ExtProcPerRoute perRoute;
+ try {
+ perRoute = ((Any) rawProtoMessage).unpack(ExtProcPerRoute.class);
+ } catch (InvalidProtocolBufferException e) {
+ return ConfigOrError.fromError("Invalid proto: " + e);
+ }
+ ExtProcOverrides overrides = perRoute.hasOverrides()
+ ? perRoute.getOverrides() : ExtProcOverrides.getDefaultInstance();
+ return ExternalProcessorFilterOverrideConfig.create(overrides, context);
+ }
+ }
+
+ @Nullable
+ @Override
+ public ClientInterceptor buildClientInterceptor(FilterConfig filterConfig,
+ @Nullable FilterConfig overrideConfig, ScheduledExecutorService scheduler) {
+ ExternalProcessorFilterConfig extProcFilterConfig =
+ (ExternalProcessorFilterConfig) filterConfig;
+ if (overrideConfig != null) {
+ extProcFilterConfig = mergeConfigs(extProcFilterConfig,
+ (ExternalProcessorFilterOverrideConfig) overrideConfig);
+ }
+ return new ExternalProcessorInterceptor(extProcFilterConfig, cachedChannelManager, scheduler);
+ }
+
+ private static ExternalProcessorFilterConfig mergeConfigs(
+ ExternalProcessorFilterConfig extProcFilterConfig,
+ ExternalProcessorFilterOverrideConfig extProcFilterConfigOverride) {
+ ExternalProcessor parentProto = extProcFilterConfig.getExternalProcessor();
+ ExternalProcessor.Builder mergedProtoBuilder = parentProto.toBuilder();
+
+ if (extProcFilterConfigOverride.hasProcessingMode()) {
+ mergedProtoBuilder.setProcessingMode(extProcFilterConfigOverride.getProcessingMode());
+ }
+
+ if (extProcFilterConfigOverride.hasRequestAttributes()) {
+ mergedProtoBuilder.clearRequestAttributes()
+ .addAllRequestAttributes(extProcFilterConfigOverride.getRequestAttributesList());
+ }
+ if (extProcFilterConfigOverride.hasResponseAttributes()) {
+ mergedProtoBuilder.clearResponseAttributes()
+ .addAllResponseAttributes(extProcFilterConfigOverride.getResponseAttributesList());
+ }
+ if (extProcFilterConfigOverride.hasGrpcService()) {
+ mergedProtoBuilder.setGrpcService(extProcFilterConfigOverride.getGrpcService());
+ }
+
+ if (extProcFilterConfigOverride.hasFailureModeAllow()) {
+ mergedProtoBuilder.setFailureModeAllow(extProcFilterConfigOverride.getFailureModeAllow());
+ }
+
+ GrpcServiceConfig grpcServiceConfig =
+ extProcFilterConfigOverride.getGrpcServiceConfig() != null
+ ? extProcFilterConfigOverride.getGrpcServiceConfig()
+ : extProcFilterConfig.getGrpcServiceConfig();
+
+ return new ExternalProcessorFilterConfig(
+ mergedProtoBuilder.build(),
+ grpcServiceConfig,
+ extProcFilterConfig.getMutationRulesConfig(),
+ extProcFilterConfig.getForwardRulesConfig());
+ }
+
+ private static ConfigOrError> parseAndValidate(
+ ProcessingMode mode, GrpcService grpcService, boolean isParent, FilterContext context) {
+ if (mode.getRequestBodyMode() != ProcessingMode.BodySendMode.GRPC
+ && mode.getRequestBodyMode() != ProcessingMode.BodySendMode.NONE) {
+ return ConfigOrError.fromError("Invalid request_body_mode: " + mode.getRequestBodyMode()
+ + ". Only GRPC and NONE are supported.");
+ }
+ if (mode.getResponseBodyMode() != ProcessingMode.BodySendMode.GRPC
+ && mode.getResponseBodyMode() != ProcessingMode.BodySendMode.NONE) {
+ return ConfigOrError.fromError("Invalid response_body_mode: " + mode.getResponseBodyMode()
+ + ". Only GRPC and NONE are supported.");
+ }
+
+ try {
+ GrpcServiceConfig grpcServiceConfig = null;
+ if (grpcService != null && grpcService.hasGoogleGrpc()) {
+ grpcServiceConfig = GrpcServiceConfigParser.parse(
+ grpcService, context.bootstrapInfo(), context.serverInfo());
+ } else if (isParent) {
+ return ConfigOrError.fromError("Error parsing GrpcService config: "
+ + "Unsupported: GrpcService must have GoogleGrpc, got: " + grpcService);
+ }
+ return ConfigOrError.fromConfig(Optional.ofNullable(grpcServiceConfig));
+ } catch (GrpcServiceParseException e) {
+ return ConfigOrError.fromError("Error parsing GrpcService config: " + e.getMessage());
+ }
+ }
+
+ static final class ExternalProcessorFilterConfig implements FilterConfig {
+
+ private final ExternalProcessor externalProcessor;
+ private final GrpcServiceConfig grpcServiceConfig;
+ private final Optional mutationRulesConfig;
+ private final Optional forwardRulesConfig;
+
+ static ConfigOrError create(
+ ExternalProcessor externalProcessor, FilterContext context) {
+ ProcessingMode mode = externalProcessor.getProcessingMode();
+ GrpcService grpcService = externalProcessor.getGrpcService();
+ HeaderMutationRulesConfig mutationRulesConfig = null;
+ HeaderForwardingRulesConfig forwardRulesConfig = null;
+
+ if (externalProcessor.hasMutationRules()) {
+ try {
+ mutationRulesConfig =
+ HeaderMutationRulesParser.parse(externalProcessor.getMutationRules());
+ } catch (HeaderMutationRulesParseException e) {
+ return ConfigOrError.fromError("Error parsing HeaderMutationRules: " + e.getMessage());
+ }
+ }
+
+ if (externalProcessor.hasForwardRules()) {
+ forwardRulesConfig =
+ HeaderForwardingRulesConfig.create(externalProcessor.getForwardRules());
+ }
+
+ if (externalProcessor.hasDeferredCloseTimeout()) {
+ Duration deferredCloseTimeout = externalProcessor.getDeferredCloseTimeout();
+ try {
+ Durations.checkValid(deferredCloseTimeout);
+ } catch (IllegalArgumentException e) {
+ return ConfigOrError.fromError("Invalid deferred_close_timeout: " + e.getMessage());
+ }
+ long deferredCloseTimeoutNanos = Durations.toNanos(deferredCloseTimeout);
+ if (deferredCloseTimeoutNanos <= 0) {
+ return ConfigOrError.fromError("deferred_close_timeout must be positive");
+ }
+ }
+
+ ConfigOrError> parsed =
+ parseAndValidate(mode, grpcService, true, context);
+ if (parsed.errorDetail != null) {
+ return ConfigOrError.fromError(parsed.errorDetail);
+ }
+
+ return ConfigOrError.fromConfig(new ExternalProcessorFilterConfig(
+ externalProcessor, parsed.config.orElse(null),
+ Optional.ofNullable(mutationRulesConfig),
+ Optional.ofNullable(forwardRulesConfig)));
+ }
+
+ ExternalProcessorFilterConfig(
+ ExternalProcessor externalProcessor,
+ GrpcServiceConfig grpcServiceConfig,
+ Optional mutationRulesConfig,
+ Optional forwardRulesConfig) {
+ this.externalProcessor = checkNotNull(externalProcessor, "externalProcessor");
+ this.grpcServiceConfig = grpcServiceConfig;
+ this.mutationRulesConfig = mutationRulesConfig;
+ this.forwardRulesConfig = forwardRulesConfig;
+ }
+
+ @Override
+ public String typeUrl() {
+ return TYPE_URL;
+ }
+
+ ExternalProcessor getExternalProcessor() {
+ return externalProcessor;
+ }
+
+ GrpcServiceConfig getGrpcServiceConfig() {
+ return grpcServiceConfig;
+ }
+
+ Optional getMutationRulesConfig() {
+ return mutationRulesConfig;
+ }
+
+ Optional getForwardRulesConfig() {
+ return forwardRulesConfig;
+ }
+
+ ImmutableList getRequestAttributes() {
+ return ImmutableList.copyOf(externalProcessor.getRequestAttributesList());
+ }
+
+ boolean getDisableImmediateResponse() {
+ return externalProcessor.getDisableImmediateResponse();
+ }
+
+ long getDeferredCloseTimeoutNanos() {
+ if (externalProcessor.hasDeferredCloseTimeout()) {
+ return Durations.toNanos(externalProcessor.getDeferredCloseTimeout());
+ }
+ return TimeUnit.SECONDS.toNanos(5);
+ }
+
+ boolean getObservabilityMode() {
+ return externalProcessor.getObservabilityMode();
+ }
+
+ boolean getFailureModeAllow() {
+ return externalProcessor.getFailureModeAllow();
+ }
+ }
+
+ static final class ExternalProcessorFilterOverrideConfig implements FilterConfig {
+ private final ExtProcOverrides extProcOverrides;
+ private final GrpcServiceConfig grpcServiceConfig;
+
+ static ConfigOrError create(
+ ExtProcOverrides overrides, FilterContext context) {
+ ConfigOrError> parsed =
+ parseAndValidate(
+ overrides.getProcessingMode(), overrides.getGrpcService(), false, context);
+ if (parsed.errorDetail != null) {
+ return ConfigOrError.fromError(parsed.errorDetail);
+ }
+ return ConfigOrError.fromConfig(
+ new ExternalProcessorFilterOverrideConfig(overrides, parsed.config.orElse(null)));
+ }
+
+ ExternalProcessorFilterOverrideConfig(
+ ExtProcOverrides extProcOverrides, GrpcServiceConfig grpcServiceConfig) {
+ this.extProcOverrides = checkNotNull(extProcOverrides, "extProcOverrides");
+ this.grpcServiceConfig = grpcServiceConfig;
+ }
+
+ @Override
+ public String typeUrl() {
+ return TYPE_URL;
+ }
+
+ boolean hasProcessingMode() {
+ return extProcOverrides.hasProcessingMode();
+ }
+
+ ProcessingMode getProcessingMode() {
+ return extProcOverrides.getProcessingMode();
+ }
+
+ boolean hasRequestAttributes() {
+ return extProcOverrides.getRequestAttributesCount() > 0;
+ }
+
+ List getRequestAttributesList() {
+ return extProcOverrides.getRequestAttributesList();
+ }
+
+ boolean hasResponseAttributes() {
+ return extProcOverrides.getResponseAttributesCount() > 0;
+ }
+
+ List getResponseAttributesList() {
+ return extProcOverrides.getResponseAttributesList();
+ }
+
+ boolean hasGrpcService() {
+ return extProcOverrides.hasGrpcService();
+ }
+
+ GrpcService getGrpcService() {
+ return extProcOverrides.getGrpcService();
+ }
+
+ boolean hasFailureModeAllow() {
+ return extProcOverrides.hasFailureModeAllow();
+ }
+
+ boolean getFailureModeAllow() {
+ return extProcOverrides.hasFailureModeAllow()
+ && extProcOverrides.getFailureModeAllow().getValue();
+ }
+
+ GrpcServiceConfig getGrpcServiceConfig() {
+ return grpcServiceConfig;
+ }
+ }
+
+ static final class HeaderForwardingRulesConfig {
+ private final ImmutableList allowedHeaders;
+ private final ImmutableList disallowedHeaders;
+
+ HeaderForwardingRulesConfig(
+ ImmutableList allowedHeaders,
+ ImmutableList disallowedHeaders) {
+ this.allowedHeaders = checkNotNull(allowedHeaders, "allowedHeaders");
+ this.disallowedHeaders = checkNotNull(disallowedHeaders, "disallowedHeaders");
+ }
+
+ static HeaderForwardingRulesConfig create(HeaderForwardingRules proto) {
+ ImmutableList allowedHeaders = ImmutableList.of();
+ if (proto.hasAllowedHeaders()) {
+ allowedHeaders = MatcherParser.parseListStringMatcher(proto.getAllowedHeaders());
+ }
+ ImmutableList disallowedHeaders = ImmutableList.of();
+ if (proto.hasDisallowedHeaders()) {
+ disallowedHeaders = MatcherParser.parseListStringMatcher(proto.getDisallowedHeaders());
+ }
+ return new HeaderForwardingRulesConfig(allowedHeaders, disallowedHeaders);
+ }
+
+ boolean isAllowed(String headerName) {
+ String lowerHeaderName = headerName.toLowerCase(Locale.ROOT);
+ if (!allowedHeaders.isEmpty()) {
+ boolean matched = false;
+ for (Matchers.StringMatcher matcher : allowedHeaders) {
+ if (matcher.matches(lowerHeaderName)) {
+ matched = true;
+ break;
+ }
+ }
+ if (!matched) {
+ return false;
+ }
+ }
+ if (!disallowedHeaders.isEmpty()) {
+ for (Matchers.StringMatcher matcher : disallowedHeaders) {
+ if (matcher.matches(lowerHeaderName)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+
+ static final class ExternalProcessorInterceptor implements ClientInterceptor {
+ private final CachedChannelManager cachedChannelManager;
+ private final ExternalProcessorFilterConfig filterConfig;
+ private final ScheduledExecutorService scheduler;
+
+ private static final MethodDescriptor.Marshaller RAW_MARSHALLER =
+ new MethodDescriptor.Marshaller() {
+ @Override
+ public InputStream stream(InputStream value) {
+ return value;
+ }
+
+ @Override
+ public InputStream parse(InputStream stream) {
+ return stream;
+ }
+ };
+
+ ExternalProcessorInterceptor(ExternalProcessorFilterConfig filterConfig,
+ CachedChannelManager cachedChannelManager,
+ ScheduledExecutorService scheduler) {
+ this.filterConfig = filterConfig;
+ this.cachedChannelManager = checkNotNull(cachedChannelManager, "cachedChannelManager");
+ this.scheduler = checkNotNull(scheduler, "scheduler");
+ }
+
+ @VisibleForTesting
+ ExternalProcessorFilterConfig getFilterConfig() {
+ return filterConfig;
+ }
+
+ @Override
+ public ClientCall interceptCall(
+ MethodDescriptor method,
+ CallOptions callOptions,
+ Channel next) {
+ SerializingExecutor serializingExecutor = new SerializingExecutor(callOptions.getExecutor());
+
+ ExternalProcessorGrpc.ExternalProcessorStub extProcStub = ExternalProcessorGrpc.newStub(
+ cachedChannelManager.getChannel(filterConfig.grpcServiceConfig))
+ .withExecutor(serializingExecutor);
+
+ if (filterConfig.grpcServiceConfig.timeout() != null
+ && filterConfig.grpcServiceConfig.timeout().isPresent()) {
+ long timeoutNanos = filterConfig.grpcServiceConfig.timeout().get().toNanos();
+ if (timeoutNanos > 0) {
+ extProcStub = extProcStub.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ ImmutableList initialMetadata = filterConfig.grpcServiceConfig.initialMetadata();
+ extProcStub = extProcStub.withInterceptors(new ClientInterceptor() {
+ @Override
+ public ClientCall interceptCall(
+ MethodDescriptor extMethod,
+ CallOptions extCallOptions,
+ Channel extNext) {
+ return new SimpleForwardingClientCall(
+ extNext.newCall(extMethod, extCallOptions)) {
+ @Override
+ public void start(Listener responseListener, Metadata headers) {
+ for (HeaderValue headerValue : initialMetadata) {
+ String key = headerValue.key();
+ if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+ if (headerValue.rawValue().isPresent()) {
+ Metadata.Key metadataKey =
+ Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER);
+ headers.put(metadataKey, headerValue.rawValue().get().toByteArray());
+ }
+ } else {
+ if (headerValue.value().isPresent()) {
+ Metadata.Key metadataKey =
+ Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
+ headers.put(metadataKey, headerValue.value().get());
+ }
+ }
+ }
+ super.start(responseListener, headers);
+ }
+ };
+ }
+ });
+
+ MethodDescriptor rawMethod =
+ method.toBuilder(RAW_MARSHALLER, RAW_MARSHALLER).build();
+ ClientCall rawCall = next.newCall(rawMethod, callOptions);
+
+ // Create a local subclass instance to buffer outbound actions
+ DataPlaneDelayedCall delayedCall =
+ new DataPlaneDelayedCall<>(
+ serializingExecutor, scheduler, callOptions.getDeadline());
+
+ DataPlaneClientCall dataPlaneCall = new DataPlaneClientCall(
+ delayedCall, rawCall, extProcStub, filterConfig, filterConfig.getMutationRulesConfig(),
+ scheduler, method, next);
+
+ return new ClientCall() {
+ @Override
+ public void start(Listener responseListener, Metadata headers) {
+ dataPlaneCall.start(new Listener() {
+ @Override
+ public void onHeaders(Metadata headers) {
+ responseListener.onHeaders(headers);
+ }
+
+ @Override
+ public void onMessage(InputStream message) {
+ responseListener.onMessage(method.getResponseMarshaller().parse(message));
+ }
+
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ responseListener.onClose(status, trailers);
+ }
+
+ @Override
+ public void onReady() {
+ responseListener.onReady();
+ }
+ }, headers);
+ }
+
+ @Override
+ public void request(int numMessages) {
+ dataPlaneCall.request(numMessages);
+ }
+
+ @Override
+ public void cancel(@Nullable String message, @Nullable Throwable cause) {
+ dataPlaneCall.cancel(message, cause);
+ }
+
+ @Override
+ public void halfClose() {
+ dataPlaneCall.halfClose();
+ }
+
+ @Override
+ public void sendMessage(ReqT message) {
+ dataPlaneCall.sendMessage(method.getRequestMarshaller().stream(message));
+ }
+
+ @Override
+ public boolean isReady() {
+ return dataPlaneCall.isReady();
+ }
+
+ @Override
+ public void setMessageCompression(boolean enabled) {
+ dataPlaneCall.setMessageCompression(enabled);
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ return dataPlaneCall.getAttributes();
+ }
+ };
+ }
+
+ // --- SHARED UTILITY METHODS ---
+ private static HeaderMap toHeaderMap(
+ Metadata metadata, Optional forwardRules) {
+ HeaderMap.Builder builder =
+ HeaderMap.newBuilder();
+
+ for (String key : metadata.keys()) {
+ if (forwardRules.isPresent() && !forwardRules.get().isAllowed(key)) {
+ continue;
+ }
+ // Skip binary headers for this basic mapping
+ if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+ Metadata.Key binKey = Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER);
+ Iterable values = metadata.getAll(binKey);
+ if (values != null) {
+ for (byte[] binValue : values) {
+ String encoded = BaseEncoding.base64().encode(binValue);
+ io.envoyproxy.envoy.config.core.v3.HeaderValue headerValue =
+ io.envoyproxy.envoy.config.core.v3.HeaderValue.newBuilder()
+ .setKey(key.toLowerCase(Locale.ROOT))
+ .setValue(encoded)
+ .build();
+ builder.addHeaders(headerValue);
+ }
+ }
+ } else {
+ Metadata.Key asciiKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
+ Iterable values = metadata.getAll(asciiKey);
+ if (values != null) {
+ for (String value : values) {
+ io.envoyproxy.envoy.config.core.v3.HeaderValue headerValue =
+ io.envoyproxy.envoy.config.core.v3.HeaderValue.newBuilder()
+ .setKey(key.toLowerCase(Locale.ROOT))
+ .setValue(value)
+ .build();
+ builder.addHeaders(headerValue);
+ }
+ }
+ }
+ }
+ return builder.build();
+ }
+
+ private static ImmutableMap collectAttributes(
+ ImmutableList requestedAttributes,
+ MethodDescriptor, ?> method,
+ Channel channel,
+ Metadata headers) {
+ if (requestedAttributes.isEmpty()) {
+ return ImmutableMap.of();
+ }
+ ImmutableMap.Builder attributes = ImmutableMap.builder();
+ for (String attr : requestedAttributes) {
+ switch (attr) {
+ case "request.path":
+ case "request.url_path":
+ attributes.put(attr, encodeAttribute("/" + method.getFullMethodName()));
+ break;
+ case "request.host":
+ attributes.put(attr, encodeAttribute(channel.authority()));
+ break;
+ case "request.method":
+ attributes.put(attr, encodeAttribute("POST"));
+ break;
+ case "request.headers":
+ attributes.put(attr, encodeHeaders(headers));
+ break;
+ case "request.referer":
+ String referer = getHeaderValue(headers, "referer");
+ if (referer != null) {
+ attributes.put(attr, encodeAttribute(referer));
+ }
+ break;
+ case "request.useragent":
+ String ua = getHeaderValue(headers, "user-agent");
+ if (ua != null) {
+ attributes.put(attr, encodeAttribute(ua));
+ }
+ break;
+ case "request.id":
+ String id = getHeaderValue(headers, "x-request-id");
+ if (id != null) {
+ attributes.put(attr, encodeAttribute(id));
+ }
+ break;
+ case "request.query":
+ attributes.put(attr, encodeAttribute(""));
+ break;
+ default:
+ // "Not set" attributes or unrecognized ones (already validated) are skipped.
+ break;
+ }
+ }
+ return attributes.buildOrThrow();
+ }
+
+ private static Struct encodeAttribute(String value) {
+ return Struct.newBuilder()
+ .putFields("", Value.newBuilder().setStringValue(value).build())
+ .build();
+ }
+
+ private static Struct encodeHeaders(Metadata headers) {
+ Struct.Builder builder = Struct.newBuilder();
+ for (String key : headers.keys()) {
+ String value = getHeaderValue(headers, key);
+ if (value != null) {
+ builder.putFields(key.toLowerCase(Locale.ROOT),
+ Value.newBuilder().setStringValue(value).build());
+ }
+ }
+ return builder.build();
+ }
+
+ @Nullable
+ private static String getHeaderValue(Metadata headers, String headerName) {
+ if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+ Metadata.Key key;
+ try {
+ key = Metadata.Key.of(headerName, Metadata.BINARY_BYTE_MARSHALLER);
+ } catch (IllegalArgumentException e) {
+ return null;
+ }
+ Iterable values = headers.getAll(key);
+ if (values == null) {
+ return null;
+ }
+ java.util.List encoded = new ArrayList<>();
+ for (byte[] v : values) {
+ encoded.add(BaseEncoding.base64().omitPadding().encode(v));
+ }
+ return com.google.common.base.Joiner.on(",").join(encoded);
+ }
+ Metadata.Key key;
+ try {
+ key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
+ } catch (IllegalArgumentException e) {
+ return null;
+ }
+ Iterable values = headers.getAll(key);
+ return values == null ? null : com.google.common.base.Joiner.on(",").join(values);
+ }
+
+ /**
+ * A local subclass to expose the protected constructor of DelayedClientCall.
+ */
+ private static class DataPlaneDelayedCall extends DelayedClientCall {
+ DataPlaneDelayedCall(
+ Executor executor, ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
+ super(executor, scheduler, deadline);
+ }
+ }
+
+ /**
+ * Handles the bidirectional stream with the External Processor.
+ * Buffers the actual RPC start until the Ext Proc header response is received.
+ */
+ private static class DataPlaneClientCall
+ extends SimpleForwardingClientCall {
+ private enum EventType {
+ REQUEST_HEADERS,
+ REQUEST_BODY,
+ RESPONSE_HEADERS,
+ RESPONSE_BODY,
+ RESPONSE_TRAILERS
+ }
+
+ private final ExternalProcessorGrpc.ExternalProcessorStub stub;
+ private final ExternalProcessorFilterConfig config;
+ private final ClientCall rawCall;
+ private final DataPlaneDelayedCall delayedCall;
+ private final ScheduledExecutorService scheduler;
+ private final Object streamLock = new Object();
+ private final Queue expectedResponses = new ConcurrentLinkedQueue<>();
+ private volatile ClientCallStreamObserver extProcClientCallRequestObserver;
+ private final Queue pendingProcessingRequests =
+ new ConcurrentLinkedQueue<>();
+ private volatile DataPlaneListener wrappedListener;
+ private final HeaderMutationFilter mutationFilter;
+ private final HeaderMutator mutator = HeaderMutator.create();
+ private final AtomicInteger pendingRequests = new AtomicInteger(0);
+ private final ProcessingMode currentProcessingMode;
+ private final MethodDescriptor, ?> method;
+ private final Channel channel;
+
+ private volatile Metadata requestHeaders;
+ final AtomicBoolean activated = new AtomicBoolean(false);
+ final AtomicBoolean extProcStreamFailed = new AtomicBoolean(false);
+ final AtomicBoolean extProcStreamCompleted = new AtomicBoolean(false);
+ final AtomicBoolean passThroughMode = new AtomicBoolean(false);
+ final AtomicBoolean notifiedApp = new AtomicBoolean(false);
+ final AtomicBoolean drainingExtProcStream = new AtomicBoolean(false);
+ final AtomicBoolean halfClosed = new AtomicBoolean(false);
+ final AtomicBoolean requestSideClosed = new AtomicBoolean(false);
+ final AtomicBoolean isProcessingTrailers = new AtomicBoolean(false);
+
+ protected DataPlaneClientCall(
+ DataPlaneDelayedCall delayedCall,
+ ClientCall rawCall,
+ ExternalProcessorGrpc.ExternalProcessorStub stub,
+ ExternalProcessorFilterConfig config,
+ Optional mutationRulesConfig,
+ ScheduledExecutorService scheduler,
+ MethodDescriptor, ?> method,
+ Channel channel) {
+ super(delayedCall);
+ this.delayedCall = delayedCall;
+ this.rawCall = rawCall;
+ this.stub = stub;
+ this.config = config;
+ this.currentProcessingMode = config.getExternalProcessor().getProcessingMode();
+ this.mutationFilter = new HeaderMutationFilter(mutationRulesConfig);
+ this.scheduler = scheduler;
+ this.method = method;
+ this.channel = channel;
+ }
+
+ private void activateCall() {
+ if (extProcStreamFailed.get() || !activated.compareAndSet(false, true)) {
+ return;
+ }
+ Runnable toRun = delayedCall.setCall(rawCall);
+ if (toRun != null) {
+ toRun.run();
+ }
+ drainPendingRequests();
+ onReadyNotify();
+ }
+
+ private boolean checkCompressionSupport(BodyResponse bodyResponse) {
+ if (bodyResponse.hasResponse() && bodyResponse.getResponse().hasBodyMutation()) {
+ BodyMutation mutation =
+ bodyResponse.getResponse().getBodyMutation();
+ if (mutation.hasStreamedResponse()
+ && mutation.getStreamedResponse().getGrpcMessageCompressed()) {
+ StatusRuntimeException ex = Status.UNAVAILABLE
+ .withDescription("gRPC message compression not supported in ext_proc")
+ .asRuntimeException();
+ if (!extProcStreamCompleted.get() && extProcClientCallRequestObserver != null) {
+ extProcClientCallRequestObserver.onError(ex);
+ }
+ activateCall();
+ extProcStreamFailed.set(true);
+ delayedCall.cancel("gRPC message compression not supported in ext_proc", ex);
+ closeExtProcStream();
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void applyHeaderMutations(Metadata metadata,
+ HeaderMutation mutation)
+ throws HeaderMutationDisallowedException {
+ if (metadata == null) {
+ return;
+ }
+ ImmutableList.Builder headersToModify = ImmutableList.builder();
+ for (io.envoyproxy.envoy.config.core.v3.HeaderValueOption protoOption
+ : mutation.getSetHeadersList()) {
+ io.envoyproxy.envoy.config.core.v3.HeaderValue protoHeader = protoOption.getHeader();
+ HeaderValue headerValue;
+ if (protoHeader.getKey().endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+ headerValue = HeaderValue.create(protoHeader.getKey(),
+ ByteString.copyFrom(
+ BaseEncoding.base64().decode(protoHeader.getValue())));
+ } else {
+ headerValue = HeaderValue.create(protoHeader.getKey(), protoHeader.getValue());
+ }
+ headersToModify.add(HeaderValueOption.create(
+ headerValue,
+ HeaderValueOption.HeaderAppendAction.valueOf(protoOption.getAppendAction().name()),
+ protoOption.getKeepEmptyValue()));
+ }
+
+ HeaderMutations mutations = HeaderMutations.create(
+ headersToModify.build(),
+ ImmutableList.copyOf(mutation.getRemoveHeadersList()));
+
+ HeaderMutations filteredMutations = mutationFilter.filter(mutations);
+ mutator.applyMutations(filteredMutations, metadata);
+ }
+
+ @Override
+ public void start(Listener responseListener, Metadata headers) {
+ this.requestHeaders = headers;
+ this.wrappedListener = new DataPlaneListener(responseListener, rawCall, this);
+
+ // DelayedClientCall.start will buffer the listener and headers until setCall is called.
+ super.start(wrappedListener, headers);
+
+ stub.process(new ClientResponseObserver() {
+ @Override
+ public void beforeStart(ClientCallStreamObserver requestStream) {
+ synchronized (streamLock) {
+ extProcClientCallRequestObserver = requestStream;
+ while (!pendingProcessingRequests.isEmpty()) {
+ requestStream.onNext(pendingProcessingRequests.poll());
+ }
+ }
+ requestStream.setOnReadyHandler(DataPlaneClientCall.this::onExtProcStreamReady);
+ }
+
+ @Override
+ public void onNext(ProcessingResponse response) {
+ try {
+ if (response.hasImmediateResponse()) {
+ if (config.getDisableImmediateResponse()) {
+ onError(Status.UNAVAILABLE
+ .withDescription(
+ "Immediate response is disabled but received from external processor")
+ .asRuntimeException());
+ return;
+ }
+ handleImmediateResponse(response.getImmediateResponse(), wrappedListener);
+ return;
+ }
+
+ if (config.getObservabilityMode()) {
+ return;
+ }
+
+ EventType expected = expectedResponses.peek();
+ EventType received = null;
+ if (response.hasRequestHeaders()) {
+ received = EventType.REQUEST_HEADERS;
+ } else if (response.hasRequestBody()) {
+ received = EventType.REQUEST_BODY;
+ } else if (response.hasResponseHeaders()) {
+ received = EventType.RESPONSE_HEADERS;
+ } else if (response.hasResponseBody()) {
+ received = EventType.RESPONSE_BODY;
+ } else if (response.hasResponseTrailers()) {
+ received = EventType.RESPONSE_TRAILERS;
+ }
+
+ if (received != null) {
+ if (expected == null || expected != received) {
+ onError(Status.UNAVAILABLE
+ .withDescription("Protocol error: received response out of order. Expected: "
+ + expected + ", Received: " + received)
+ .asRuntimeException());
+ return;
+ }
+ expectedResponses.poll();
+ }
+
+ if (response.getRequestDrain()) {
+ drainingExtProcStream.set(true);
+ halfCloseExtProcStream();
+ activateCall();
+ }
+
+ // 1. Client Headers
+ if (response.hasRequestHeaders()) {
+ if (response.getRequestHeaders().hasResponse()) {
+ if (response.getRequestHeaders().getResponse().getStatus()
+ == CommonResponse.ResponseStatus.CONTINUE_AND_REPLACE) {
+ onError(Status.UNAVAILABLE
+ .withDescription("CONTINUE_AND_REPLACE is not supported")
+ .asRuntimeException());
+ return;
+ }
+ applyHeaderMutations(
+ requestHeaders,
+ response.getRequestHeaders().getResponse().getHeaderMutation());
+ }
+ activateCall();
+ }
+ // 2. Client Message (Request Body)
+ else if (response.hasRequestBody()) {
+ if (checkCompressionSupport(response.getRequestBody())) {
+ handleRequestBodyResponse(response.getRequestBody());
+ }
+ }
+ // 3. Client Trailers
+ else if (response.hasRequestTrailers()) {
+ wrappedListener.proceedWithClose();
+ }
+ // 4. Server Headers
+ else if (response.hasResponseHeaders()) {
+ if (response.getResponseHeaders().hasResponse()) {
+ if (response.getResponseHeaders().getResponse().getStatus()
+ == CommonResponse.ResponseStatus.CONTINUE_AND_REPLACE) {
+ onError(Status.UNAVAILABLE
+ .withDescription("CONTINUE_AND_REPLACE is not supported")
+ .asRuntimeException());
+ return;
+ }
+ Metadata target = wrappedListener.trailersOnly.get()
+ ? wrappedListener.savedTrailers : wrappedListener.savedHeaders;
+ applyHeaderMutations(
+ target, response.getResponseHeaders().getResponse().getHeaderMutation());
+ }
+ if (wrappedListener.trailersOnly.get()) {
+ wrappedListener.proceedWithClose();
+ } else {
+ wrappedListener.proceedWithHeaders();
+ }
+ }
+ // 5. Server Message (Response Body)
+ else if (response.hasResponseBody()) {
+ if (checkCompressionSupport(response.getResponseBody())) {
+ handleResponseBodyResponse(response.getResponseBody(), wrappedListener);
+ }
+ }
+ // 6. Response Trailers
+ else if (response.hasResponseTrailers()) {
+ if (response.getResponseTrailers().hasHeaderMutation()) {
+ applyHeaderMutations(
+ wrappedListener.savedTrailers,
+ response.getResponseTrailers().getHeaderMutation()
+ );
+ }
+ }
+
+ checkEndOfStream(response);
+ } catch (Throwable t) {
+ onError(t);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (extProcStreamCompleted.compareAndSet(false, true)) {
+ synchronized (streamLock) {
+ if (extProcClientCallRequestObserver != null) {
+ extProcClientCallRequestObserver.onError(t);
+ extProcClientCallRequestObserver = null;
+ }
+ }
+ if (config.getFailureModeAllow()) {
+ handleFailOpen(wrappedListener);
+ } else {
+ extProcStreamFailed.set(true);
+ String message = "External processor stream failed";
+ delayedCall.cancel(message, t);
+ wrappedListener.proceedWithClose();
+ }
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ if (extProcStreamCompleted.compareAndSet(false, true)) {
+ drainingExtProcStream.set(false);
+ handleFailOpen(wrappedListener);
+ }
+ }
+ });
+
+ boolean sendRequestHeaders =
+ currentProcessingMode.getRequestHeaderMode() == ProcessingMode.HeaderSendMode.SEND
+ || currentProcessingMode.getRequestHeaderMode()
+ == ProcessingMode.HeaderSendMode.DEFAULT;
+
+ if (sendRequestHeaders) {
+ sendToExtProc(ProcessingRequest.newBuilder()
+ .setRequestHeaders(HttpHeaders.newBuilder()
+ .setHeaders(toHeaderMap(headers, config.getForwardRulesConfig()))
+ .setEndOfStream(false)
+ .build())
+ .putAllAttributes(
+ collectAttributes(config.getRequestAttributes(), method, channel, headers))
+ .setProtocolConfig(ProtocolConfiguration.newBuilder()
+ .setRequestBodyMode(currentProcessingMode.getRequestBodyMode())
+ .setResponseBodyMode(currentProcessingMode.getResponseBodyMode())
+ .build())
+ .build());
+ }
+
+ if (config.getObservabilityMode() || !sendRequestHeaders) {
+ activateCall();
+ }
+ }
+
+ private void sendToExtProc(ProcessingRequest request) {
+ synchronized (streamLock) {
+ if (extProcStreamCompleted.get()) {
+ return;
+ }
+
+ if (request.hasRequestHeaders()) {
+ expectedResponses.add(EventType.REQUEST_HEADERS);
+ } else if (request.hasRequestBody()) {
+ expectedResponses.add(EventType.REQUEST_BODY);
+ } else if (request.hasResponseHeaders()) {
+ expectedResponses.add(EventType.RESPONSE_HEADERS);
+ } else if (request.hasResponseBody()) {
+ expectedResponses.add(EventType.RESPONSE_BODY);
+ } else if (request.hasResponseTrailers()) {
+ expectedResponses.add(EventType.RESPONSE_TRAILERS);
+ }
+
+ if (extProcClientCallRequestObserver != null) {
+ extProcClientCallRequestObserver.onNext(request);
+ } else {
+ pendingProcessingRequests.add(request);
+ }
+ }
+ }
+
+ private void onExtProcStreamReady() {
+ drainPendingRequests();
+ onReadyNotify();
+ }
+
+ private void drainPendingRequests() {
+ int toRequest = pendingRequests.getAndSet(0);
+ if (toRequest > 0) {
+ super.request(toRequest);
+ }
+ }
+
+ private void closeExtProcStream() {
+ synchronized (streamLock) {
+ if (extProcStreamCompleted.compareAndSet(false, true)) {
+ if (extProcClientCallRequestObserver != null) {
+ extProcClientCallRequestObserver.onCompleted();
+ }
+ }
+ }
+ }
+
+ private void halfCloseExtProcStream() {
+ synchronized (streamLock) {
+ if (!extProcStreamCompleted.get() && extProcClientCallRequestObserver != null) {
+ extProcClientCallRequestObserver.onCompleted();
+ }
+ }
+ }
+
+ private void onReadyNotify() {
+ wrappedListener.onReadyNotify();
+ }
+
+ private boolean isSidecarReady() {
+ if (extProcStreamCompleted.get()) {
+ return true;
+ }
+ if (drainingExtProcStream.get()) {
+ return false;
+ }
+ synchronized (streamLock) {
+ ClientCallStreamObserver observer = extProcClientCallRequestObserver;
+ return observer != null && observer.isReady();
+ }
+ }
+
+ @Override
+ public boolean isReady() {
+ if (passThroughMode.get()) {
+ return super.isReady();
+ }
+ if (extProcStreamCompleted.get()) {
+ return super.isReady();
+ }
+ if (!activated.get() && !config.getObservabilityMode()) {
+ return false;
+ }
+ boolean sidecarReady = isSidecarReady();
+ if (config.getObservabilityMode()) {
+ return super.isReady() && sidecarReady;
+ }
+ return sidecarReady;
+ }
+
+ @Override
+ public void request(int numMessages) {
+ if (passThroughMode.get() || extProcStreamCompleted.get()) {
+ super.request(numMessages);
+ return;
+ }
+ if (!isSidecarReady()) {
+ pendingRequests.addAndGet(numMessages);
+ return;
+ }
+ super.request(numMessages);
+ }
+
+ @Override
+ public void sendMessage(InputStream message) {
+ if (requestSideClosed.get()) {
+ // External processor already closed the request stream. Discard further messages.
+ return;
+ }
+
+ if (passThroughMode.get() || extProcStreamCompleted.get()) {
+ super.sendMessage(message);
+ return;
+ }
+
+ if (currentProcessingMode.getRequestBodyMode() == ProcessingMode.BodySendMode.NONE) {
+ super.sendMessage(message);
+ return;
+ }
+
+ // Mode is GRPC
+ try {
+ byte[] bodyBytes = ByteStreams.toByteArray(message);
+ sendToExtProc(ProcessingRequest.newBuilder()
+ .setRequestBody(HttpBody.newBuilder()
+ .setBody(ByteString.copyFrom(bodyBytes))
+ .setEndOfStream(false)
+ .build())
+ .build());
+
+ if (config.getObservabilityMode()) {
+ super.sendMessage(new ByteArrayInputStream(bodyBytes));
+ }
+ } catch (IOException e) {
+ rawCall.cancel("Failed to serialize message for External Processor", e);
+ }
+ }
+
+ @Override
+ public void halfClose() {
+ halfClosed.set(true);
+ if (passThroughMode.get() || extProcStreamCompleted.get()) {
+ if (requestSideClosed.compareAndSet(false, true)) {
+ super.halfClose();
+ }
+ return;
+ }
+
+ if (currentProcessingMode.getRequestBodyMode() == ProcessingMode.BodySendMode.NONE) {
+ if (requestSideClosed.compareAndSet(false, true)) {
+ super.halfClose();
+ }
+ return;
+ }
+
+ // Mode is GRPC
+ sendToExtProc(ProcessingRequest.newBuilder()
+ .setRequestBody(HttpBody.newBuilder()
+ .setEndOfStreamWithoutMessage(true)
+ .build())
+ .build());
+
+ // Defer super.halfClose() until ext-proc response signals end_of_stream.
+ }
+
+ @Override
+ public void cancel(@Nullable String message, @Nullable Throwable cause) {
+ synchronized (streamLock) {
+ if (!extProcStreamCompleted.get() && extProcClientCallRequestObserver != null) {
+ extProcClientCallRequestObserver.onError(
+ Status.CANCELLED
+ .withDescription(message)
+ .withCause(cause)
+ .asRuntimeException());
+ }
+ }
+ super.cancel(message, cause);
+ }
+
+ private void handleRequestBodyResponse(BodyResponse bodyResponse) {
+ if (bodyResponse.hasResponse() && bodyResponse.getResponse().hasBodyMutation()) {
+ BodyMutation mutation = bodyResponse.getResponse().getBodyMutation();
+ if (mutation.hasStreamedResponse()) {
+ StreamedBodyResponse streamed = mutation.getStreamedResponse();
+ if (!streamed.getBody().isEmpty()) {
+ super.sendMessage(streamed.getBody().newInput());
+ }
+ }
+ }
+ // If the application already half-closed, and we just received a response from
+ // the sidecar for the last part of the request body, we can now half-close the data plane.
+ if (halfClosed.get()) {
+ if (requestSideClosed.compareAndSet(false, true)) {
+ super.halfClose();
+ }
+ }
+ }
+
+ private void handleResponseBodyResponse(
+ BodyResponse bodyResponse, DataPlaneListener listener) {
+ if (bodyResponse.hasResponse() && bodyResponse.getResponse().hasBodyMutation()) {
+ BodyMutation mutation = bodyResponse.getResponse().getBodyMutation();
+ if (mutation.hasStreamedResponse()) {
+ StreamedBodyResponse streamed = mutation.getStreamedResponse();
+ if (!streamed.getBody().isEmpty()) {
+ listener.onExternalBody(streamed.getBody());
+ }
+ if (streamed.getEndOfStream() || streamed.getEndOfStreamWithoutMessage()) {
+ listener.proceedWithClose();
+ }
+ }
+ }
+ }
+
+ private void handleImmediateResponse(ImmediateResponse immediate, DataPlaneListener listener)
+ throws HeaderMutationDisallowedException {
+ Status status = Status.fromCodeValue(immediate.getGrpcStatus().getStatus());
+ if (!immediate.getDetails().isEmpty()) {
+ status = status.withDescription(immediate.getDetails());
+ }
+
+ Metadata trailers = new Metadata();
+ if (immediate.hasHeaders()) {
+ applyHeaderMutations(trailers, immediate.getHeaders());
+ }
+
+ // ImmediateResponse should take precedence over any other closure
+ // if it arrives before the app is notified.
+ listener.savedStatus = status;
+ listener.savedTrailers = trailers;
+
+ if (isProcessingTrailers.get()) {
+ // If sent in response to a server trailers event, sets the status and optionally
+ // headers to be included in the trailers.
+ listener.unblockAfterStreamComplete();
+ } else {
+ // If sent in response to any other event, it will cause the data plane RPC to
+ // immediately fail with the specified status as if it were an out-of-band
+ // cancellation.
+ rawCall.cancel(status.getDescription(), null);
+ listener.unblockAfterStreamComplete();
+ }
+ closeExtProcStream();
+ }
+
+ private void handleFailOpen(DataPlaneListener listener) {
+ activateCall();
+ listener.unblockAfterStreamComplete();
+ closeExtProcStream();
+ }
+
+ private void checkEndOfStream(ProcessingResponse response) {
+ boolean terminal = false;
+ if (response.hasResponseTrailers()) {
+ terminal = true;
+ } else if (response.hasResponseHeaders() && wrappedListener.trailersOnly.get()) {
+ terminal = true;
+ }
+
+ if (terminal) {
+ wrappedListener.unblockAfterStreamComplete();
+ closeExtProcStream();
+ }
+ }
+ }
+
+ private static class DataPlaneListener extends ClientCall.Listener {
+ private final ClientCall.Listener delegate;
+ private final ClientCall, ?> rawCall;
+ private final DataPlaneClientCall dataPlaneClientCall;
+ private final Queue savedMessages = new ConcurrentLinkedQueue<>();
+ private volatile Metadata savedHeaders;
+ private volatile Metadata savedTrailers;
+ private volatile Status savedStatus;
+ private final AtomicBoolean terminationTriggered = new AtomicBoolean(false);
+ private final AtomicBoolean responseHeadersSent = new AtomicBoolean(false);
+ private final AtomicBoolean trailersOnly = new AtomicBoolean(false);
+
+ protected DataPlaneListener(
+ ClientCall.Listener delegate,
+ ClientCall, ?> rawCall,
+ DataPlaneClientCall dataPlaneClientCall) {
+ this.delegate = checkNotNull(delegate, "delegate");
+ this.rawCall = rawCall;
+ this.dataPlaneClientCall = dataPlaneClientCall;
+ }
+
+ @Override
+ public void onReady() {
+ dataPlaneClientCall.drainPendingRequests();
+ onReadyNotify();
+ }
+
+ void onReadyNotify() {
+ delegate.onReady();
+ }
+
+ @Override
+ public void onHeaders(Metadata headers) {
+ responseHeadersSent.set(true);
+ boolean sendResponseHeaders =
+ dataPlaneClientCall.currentProcessingMode.getResponseHeaderMode()
+ == ProcessingMode.HeaderSendMode.SEND
+ || dataPlaneClientCall.currentProcessingMode.getResponseHeaderMode()
+ == ProcessingMode.HeaderSendMode.DEFAULT;
+
+ if (dataPlaneClientCall.passThroughMode.get()
+ || dataPlaneClientCall.extProcStreamCompleted.get()
+ || !sendResponseHeaders) {
+ delegate.onHeaders(headers);
+ return;
+ }
+
+ this.savedHeaders = headers;
+ dataPlaneClientCall.sendToExtProc(ProcessingRequest.newBuilder()
+ .setResponseHeaders(HttpHeaders.newBuilder()
+ .setHeaders(
+ toHeaderMap(headers, dataPlaneClientCall.config.getForwardRulesConfig()))
+ .build())
+ .build());
+
+ if (dataPlaneClientCall.config.getObservabilityMode()) {
+ proceedWithHeaders();
+ }
+ }
+
+ void proceedWithHeaders() {
+ if (savedHeaders != null) {
+ delegate.onHeaders(savedHeaders);
+ savedHeaders = null;
+ InputStream msg;
+ while ((msg = savedMessages.poll()) != null) {
+ onMessage(msg);
+ }
+ onReadyNotify();
+ if (savedStatus != null) {
+ triggerCloseHandshake();
+ }
+ }
+ }
+
+ @Override
+ public void onMessage(InputStream message) {
+ if (dataPlaneClientCall.passThroughMode.get()) {
+ delegate.onMessage(message);
+ return;
+ }
+
+ if (savedHeaders != null) {
+ savedMessages.add(message);
+ return;
+ }
+
+ if (dataPlaneClientCall.extProcStreamCompleted.get()
+ || dataPlaneClientCall.currentProcessingMode.getResponseBodyMode()
+ != ProcessingMode.BodySendMode.GRPC) {
+ delegate.onMessage(message);
+ return;
+ }
+
+ try {
+ byte[] bodyBytes = ByteStreams.toByteArray(message);
+ sendResponseBodyToExtProc(bodyBytes, false);
+
+ if (dataPlaneClientCall.config.getObservabilityMode()) {
+ delegate.onMessage(new ByteArrayInputStream(bodyBytes));
+ }
+ } catch (IOException e) {
+ rawCall.cancel("Failed to read server response", e);
+ }
+ }
+
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ if (dataPlaneClientCall.extProcStreamFailed.get()) {
+ if (dataPlaneClientCall.notifiedApp.compareAndSet(false, true)) {
+ delegate.onClose(Status.UNAVAILABLE.withDescription("External processor stream failed")
+ .withCause(status.getCause()), new Metadata());
+ }
+ return;
+ }
+ if (dataPlaneClientCall.passThroughMode.get()) {
+ if (dataPlaneClientCall.notifiedApp.compareAndSet(false, true)) {
+ delegate.onClose(status, trailers);
+ }
+ return;
+ }
+
+ this.savedStatus = status;
+ this.savedTrailers = trailers;
+
+ if (dataPlaneClientCall.extProcStreamCompleted.get()) {
+ proceedWithClose();
+ return;
+ }
+
+ if (savedHeaders != null) {
+ return;
+ }
+
+ if (!responseHeadersSent.get()) {
+ trailersOnly.set(true);
+ }
+
+ triggerCloseHandshake();
+
+ if (dataPlaneClientCall.config.getObservabilityMode()) {
+ proceedWithClose();
+ @SuppressWarnings("unused")
+ ScheduledFuture> unused = dataPlaneClientCall.scheduler.schedule(
+ dataPlaneClientCall::closeExtProcStream,
+ dataPlaneClientCall.config.getDeferredCloseTimeoutNanos(),
+ TimeUnit.NANOSECONDS);
+ }
+ }
+
+ private void triggerCloseHandshake() {
+ if (dataPlaneClientCall.extProcStreamCompleted.get()
+ || !terminationTriggered.compareAndSet(false, true)) {
+ return;
+ }
+
+ if (trailersOnly.get()) {
+ dataPlaneClientCall.sendToExtProc(ProcessingRequest.newBuilder()
+ .setResponseHeaders(HttpHeaders.newBuilder()
+ .setHeaders(
+ toHeaderMap(
+ savedTrailers,
+ dataPlaneClientCall.config.getForwardRulesConfig()))
+ .setEndOfStream(true)
+ .build())
+ .build());
+ return;
+ }
+
+ boolean sendResponseTrailers =
+ dataPlaneClientCall.currentProcessingMode.getResponseTrailerMode()
+ == ProcessingMode.HeaderSendMode.SEND;
+
+ if (sendResponseTrailers) {
+ dataPlaneClientCall.isProcessingTrailers.set(true);
+ dataPlaneClientCall.sendToExtProc(ProcessingRequest.newBuilder()
+ .setResponseTrailers(HttpTrailers.newBuilder()
+ .setTrailers(
+ toHeaderMap(
+ savedTrailers,
+ dataPlaneClientCall.config.getForwardRulesConfig()))
+ .build())
+ .build());
+ } else {
+ // Send EOS signal via empty body
+ dataPlaneClientCall.sendToExtProc(ProcessingRequest.newBuilder()
+ .setResponseBody(HttpBody.newBuilder()
+ .setEndOfStreamWithoutMessage(true)
+ .build())
+ .build());
+
+ if (dataPlaneClientCall.config.getObservabilityMode()) {
+ // In observability mode we don't wait for handshake response
+ proceedWithClose();
+ }
+ }
+ }
+
+ private void sendResponseBodyToExtProc(@Nullable byte[] bodyBytes, boolean endOfStream) {
+ if (dataPlaneClientCall.extProcStreamCompleted.get()
+ || dataPlaneClientCall.currentProcessingMode.getResponseBodyMode()
+ != ProcessingMode.BodySendMode.GRPC) {
+ return;
+ }
+
+ HttpBody.Builder bodyBuilder =
+ HttpBody.newBuilder();
+ if (bodyBytes != null) {
+ bodyBuilder.setBody(ByteString.copyFrom(bodyBytes));
+ }
+ bodyBuilder.setEndOfStream(endOfStream);
+
+ dataPlaneClientCall.sendToExtProc(ProcessingRequest.newBuilder()
+ .setResponseBody(bodyBuilder.build())
+ .build());
+ }
+
+ void proceedWithClose() {
+ if (savedStatus != null) {
+ if (dataPlaneClientCall.notifiedApp.compareAndSet(false, true)) {
+ delegate.onClose(savedStatus, savedTrailers);
+ }
+ savedStatus = null;
+ savedTrailers = null;
+ }
+ }
+
+ void onExternalBody(ByteString body) {
+ delegate.onMessage(body.newInput());
+ }
+
+ void unblockAfterStreamComplete() {
+ proceedWithHeaders();
+ dataPlaneClientCall.passThroughMode.set(true);
+ proceedWithClose();
+ }
+ }
+ }
+}
diff --git a/xds/src/main/java/io/grpc/xds/FaultFilter.java b/xds/src/main/java/io/grpc/xds/FaultFilter.java
index 0f3bb5b0557..e0533889d74 100644
--- a/xds/src/main/java/io/grpc/xds/FaultFilter.java
+++ b/xds/src/main/java/io/grpc/xds/FaultFilter.java
@@ -104,7 +104,8 @@ public FaultFilter newInstance(String name) {
}
@Override
- public ConfigOrError parseFilterConfig(Message rawProtoMessage) {
+ public ConfigOrError parseFilterConfig(
+ Message rawProtoMessage, FilterContext context) {
HTTPFault httpFaultProto;
if (!(rawProtoMessage instanceof Any)) {
return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
@@ -119,8 +120,9 @@ public ConfigOrError parseFilterConfig(Message rawProtoMessage) {
}
@Override
- public ConfigOrError parseFilterConfigOverride(Message rawProtoMessage) {
- return parseFilterConfig(rawProtoMessage);
+ public ConfigOrError parseFilterConfigOverride(
+ Message rawProtoMessage, FilterContext context) {
+ return parseFilterConfig(rawProtoMessage, context);
}
private static ConfigOrError parseHttpFault(HTTPFault httpFault) {
diff --git a/xds/src/main/java/io/grpc/xds/Filter.java b/xds/src/main/java/io/grpc/xds/Filter.java
index 416d929becf..d70b3063a50 100644
--- a/xds/src/main/java/io/grpc/xds/Filter.java
+++ b/xds/src/main/java/io/grpc/xds/Filter.java
@@ -16,10 +16,14 @@
package io.grpc.xds;
+
+import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.protobuf.Message;
import io.grpc.ClientInterceptor;
import io.grpc.ServerInterceptor;
+import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
+import io.grpc.xds.client.Bootstrapper.ServerInfo;
import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
@@ -93,13 +97,15 @@ default boolean isServerFilter() {
* Parses the top-level filter config from raw proto message. The message may be either a {@link
* com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
*/
- ConfigOrError extends FilterConfig> parseFilterConfig(Message rawProtoMessage);
+ ConfigOrError extends FilterConfig> parseFilterConfig(
+ Message rawProtoMessage, FilterContext context);
/**
* Parses the per-filter override filter config from raw proto message. The message may be
* either a {@link com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
*/
- ConfigOrError extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage);
+ ConfigOrError extends FilterConfig> parseFilterConfigOverride(
+ Message rawProtoMessage, FilterContext context);
}
/** Uses the FilterConfigs produced above to produce an HTTP filter interceptor for clients. */
@@ -125,6 +131,27 @@ default ServerInterceptor buildServerInterceptor(
@Override
default void close() {}
+ /** Context carrying dynamic metadata for a filter. */
+ @AutoValue
+ abstract static class FilterContext {
+ abstract BootstrapInfo bootstrapInfo();
+
+ abstract ServerInfo serverInfo();
+
+ static Builder builder() {
+ return new AutoValue_Filter_FilterContext.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder bootstrapInfo(BootstrapInfo info);
+
+ abstract Builder serverInfo(ServerInfo info);
+
+ abstract FilterContext build();
+ }
+ }
+
/** Filter config with instance name. */
final class NamedFilterConfig {
// filter instance name
diff --git a/xds/src/main/java/io/grpc/xds/FilterRegistry.java b/xds/src/main/java/io/grpc/xds/FilterRegistry.java
index da3a59fe8c1..29241fc1da8 100644
--- a/xds/src/main/java/io/grpc/xds/FilterRegistry.java
+++ b/xds/src/main/java/io/grpc/xds/FilterRegistry.java
@@ -17,6 +17,7 @@
package io.grpc.xds;
import com.google.common.annotations.VisibleForTesting;
+import io.grpc.internal.GrpcUtil;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
@@ -39,6 +40,9 @@ static synchronized FilterRegistry getDefaultRegistry() {
new RouterFilter.Provider(),
new RbacFilter.Provider(),
new GcpAuthenticationFilter.Provider());
+ if (GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_EXT_PROC_ON_CLIENT", false)) {
+ instance.register(new ExternalProcessorFilter.Provider());
+ }
}
return instance;
}
diff --git a/xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java b/xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java
index 8ec02f4f809..78d20edec46 100644
--- a/xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java
+++ b/xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java
@@ -86,7 +86,8 @@ public GcpAuthenticationFilter newInstance(String name) {
}
@Override
- public ConfigOrError parseFilterConfig(Message rawProtoMessage) {
+ public ConfigOrError parseFilterConfig(
+ Message rawProtoMessage, FilterContext context) {
GcpAuthnFilterConfig gcpAuthnProto;
if (!(rawProtoMessage instanceof Any)) {
return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
@@ -121,8 +122,8 @@ public ConfigOrError parseFilterConfig(Message rawProto
@Override
public ConfigOrError parseFilterConfigOverride(
- Message rawProtoMessage) {
- return parseFilterConfig(rawProtoMessage);
+ Message rawProtoMessage, FilterContext context) {
+ return parseFilterConfig(rawProtoMessage, context);
}
}
diff --git a/xds/src/main/java/io/grpc/xds/RbacFilter.java b/xds/src/main/java/io/grpc/xds/RbacFilter.java
index 91df1e68802..035bfd06607 100644
--- a/xds/src/main/java/io/grpc/xds/RbacFilter.java
+++ b/xds/src/main/java/io/grpc/xds/RbacFilter.java
@@ -94,7 +94,8 @@ public RbacFilter newInstance(String name) {
}
@Override
- public ConfigOrError parseFilterConfig(Message rawProtoMessage) {
+ public ConfigOrError parseFilterConfig(
+ Message rawProtoMessage, FilterContext context) {
RBAC rbacProto;
if (!(rawProtoMessage instanceof Any)) {
return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
@@ -109,7 +110,8 @@ public ConfigOrError parseFilterConfig(Message rawProtoMessage) {
}
@Override
- public ConfigOrError parseFilterConfigOverride(Message rawProtoMessage) {
+ public ConfigOrError parseFilterConfigOverride(
+ Message rawProtoMessage, FilterContext context) {
RBACPerRoute rbacPerRoute;
if (!(rawProtoMessage instanceof Any)) {
return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
diff --git a/xds/src/main/java/io/grpc/xds/RouterFilter.java b/xds/src/main/java/io/grpc/xds/RouterFilter.java
index 504c4213149..c80e57c9010 100644
--- a/xds/src/main/java/io/grpc/xds/RouterFilter.java
+++ b/xds/src/main/java/io/grpc/xds/RouterFilter.java
@@ -61,13 +61,14 @@ public RouterFilter newInstance(String name) {
}
@Override
- public ConfigOrError extends FilterConfig> parseFilterConfig(Message rawProtoMessage) {
+ public ConfigOrError extends FilterConfig> parseFilterConfig(
+ Message rawProtoMessage, FilterContext context) {
return ConfigOrError.fromConfig(ROUTER_CONFIG);
}
@Override
public ConfigOrError extends FilterConfig> parseFilterConfigOverride(
- Message rawProtoMessage) {
+ Message rawProtoMessage, FilterContext context) {
return ConfigOrError.fromError("Router Filter should not have override config");
}
}
diff --git a/xds/src/main/java/io/grpc/xds/XdsListenerResource.java b/xds/src/main/java/io/grpc/xds/XdsListenerResource.java
index 041b659b4c3..4bf1b0066c2 100644
--- a/xds/src/main/java/io/grpc/xds/XdsListenerResource.java
+++ b/xds/src/main/java/io/grpc/xds/XdsListenerResource.java
@@ -527,7 +527,7 @@ static io.grpc.xds.HttpConnectionManager parseHttpConnectionManager(
"HttpConnectionManager contains duplicate HttpFilter: " + filterName);
}
StructOrError filterConfig =
- parseHttpFilter(httpFilter, filterRegistry, isForClient);
+ parseHttpFilter(httpFilter, filterRegistry, isForClient, args);
if ((i == proto.getHttpFiltersCount() - 1)
&& (filterConfig == null || !isTerminalFilter(filterConfig.getStruct()))) {
throw new ResourceInvalidException("The last HttpFilter must be a terminal filter: "
@@ -581,7 +581,8 @@ private static boolean isTerminalFilter(Filter.FilterConfig filterConfig) {
@Nullable // Returns null if the filter is optional but not supported.
static StructOrError parseHttpFilter(
io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter
- httpFilter, FilterRegistry filterRegistry, boolean isForClient) {
+ httpFilter, FilterRegistry filterRegistry, boolean isForClient,
+ XdsResourceType.Args args) {
String filterName = httpFilter.getName();
boolean isOptional = httpFilter.getIsOptional();
if (!httpFilter.hasTypedConfig()) {
@@ -616,7 +617,13 @@ static StructOrError parseHttpFilter(
"HttpFilter [" + filterName + "](" + typeUrl + ") is required but unsupported for " + (
isForClient ? "client" : "server"));
}
- ConfigOrError extends FilterConfig> filterConfig = provider.parseFilterConfig(rawConfig);
+
+ Filter.FilterContext filterContext = Filter.FilterContext.builder()
+ .bootstrapInfo(args.getBootstrapInfo())
+ .serverInfo(args.getServerInfo())
+ .build();
+ ConfigOrError extends FilterConfig> filterConfig =
+ provider.parseFilterConfig(rawConfig, filterContext);
if (filterConfig.errorDetail != null) {
return StructOrError.fromError(
"Invalid filter config for HttpFilter [" + filterName + "]: " + filterConfig.errorDetail);
diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
index 69b0b824433..78550582d27 100644
--- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
+++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
@@ -110,7 +110,7 @@ final class XdsNameResolver extends NameResolver {
@Nullable
private final String targetAuthority;
private final String serviceAuthority;
- // Encoded version of the service authority as per
+ // Encoded version of the service authority as per
// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
private final String encodedServiceAuthority;
private final String overrideAuthority;
@@ -154,8 +154,8 @@ final class XdsNameResolver extends NameResolver {
this(target, targetAuthority, name, overrideAuthority, serviceConfigParser,
syncContext, scheduler,
bootstrapOverride == null
- ? SharedXdsClientPoolProvider.getDefaultProvider()
- : new SharedXdsClientPoolProvider(),
+ ? SharedXdsClientPoolProvider.getDefaultProvider()
+ : new SharedXdsClientPoolProvider(),
ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride,
metricRecorder, nameResolverArgs);
}
@@ -172,8 +172,8 @@ final class XdsNameResolver extends NameResolver {
// The name might have multiple slashes so encode it before verifying.
serviceAuthority = checkNotNull(name, "name");
- this.encodedServiceAuthority =
- GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority));
+ this.encodedServiceAuthority =
+ GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority));
this.overrideAuthority = overrideAuthority;
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
@@ -234,7 +234,7 @@ public void start(Listener2 listener) {
}
String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
- ) {
+ ) {
listener.onError(Status.INVALID_ARGUMENT.withDescription(
"invalid listener resource URI for service authority: " + serviceAuthority));
return;
@@ -921,8 +921,8 @@ private void cleanUpRoutes(Status error) {
// the config selector handles the error message itself.
listener.onResult2(ResolutionResult.newBuilder()
.setAttributes(Attributes.newBuilder()
- .set(InternalConfigSelector.KEY, configSelector)
- .build())
+ .set(InternalConfigSelector.KEY, configSelector)
+ .build())
.setServiceConfig(emptyServiceConfig)
.build());
}
diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
index 51b1ff49bf0..c395c363acb 100644
--- a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
+++ b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
@@ -63,7 +63,7 @@ public XdsNameResolverProvider() {
}
private XdsNameResolverProvider(String scheme,
- @Nullable Map bootstrapOverride) {
+ @Nullable Map bootstrapOverride) {
this.scheme = checkNotNull(scheme, "scheme");
this.bootstrapOverride = bootstrapOverride;
}
@@ -73,7 +73,7 @@ private XdsNameResolverProvider(String scheme,
* and bootstrap.
*/
public static XdsNameResolverProvider createForTest(String scheme,
- @Nullable Map bootstrapOverride) {
+ @Nullable Map bootstrapOverride) {
return new XdsNameResolverProvider(scheme, bootstrapOverride);
}
diff --git a/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java b/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java
index 24ec0659b42..890a2936861 100644
--- a/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java
+++ b/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java
@@ -198,7 +198,7 @@ private static StructOrError parseVirtualHost(
routes.add(route.getStruct());
}
StructOrError