Ext_proc filter #12792
…roceedWithoutModification to use in-process server
…eBuffered to use in-process server
…reNOTBuffered to use in-process server
…AreBuffered to use in-process server
…PlaneRequestIsDrained to use in-process server
…estsAreForwardedImmediately to use in-process server
…roc response but only sent a mode override. As per envoy ext-processing ext-proc cannot omit request_headers (even if empty) in response to a request_headers processing request.
… track close already called for handling immediate response.
…lizingExecutor to use real dataPlaneChannel
…ubHasCorrectDeadline to use real dataPlaneChannel
…xtProcStreamSendsMetadata to use real dataPlaneChannel
…sHeadersAndCallIsBuffered to use real dataPlaneChannel
…henMutationsAreAppliedAndCallIsActivated to use real dataPlaneChannel
…sActivatedImmediately to use real dataPlaneChannel and ServerInterceptor
…entToExtProc to use real dataPlaneChannel
…henMessagesAreDiscarded to use real dataPlaneChannel
…ExtProcAndSuperHalfCloseIsDeferred to use real dataPlaneChannel
…nSuperHalfCloseIsCalled to use real dataPlaneChannel
…False to use real dataPlaneChannel
…sTrue to use real dataPlaneChannel
…OnReady to use real dataPlaneChannel
…Ready to use real dataPlaneChannel
…stsAreForwardedImmediately to use real dataPlaneChannel
…sHeadersAndCallIsBuffered to use real dataPlaneChannel
…henMutationsAreAppliedAndCallIsActivated to use real dataPlaneChannel
…sActivatedImmediately to use real dataPlaneChannel
1. Core State Flags
The mechanism relies on three primary atomic flags to coordinate the transition:
* extProcStreamCompleted: Signals that the sidecar stream is gone. Once set, the filter stops sending new requests to the sidecar and enters a "transition" state.
* passThroughMode: Signals that the interceptor has finished flushing all previously buffered response events. Once set, all new data plane traffic (both requests and responses) bypasses the interception logic.
* notifiedApp: A guard that ensures the application receives exactly one onClose signal, even if the sidecar and the server both signal closure simultaneously.
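The exactly-once guarantee described for notifiedApp can be sketched with a compare-and-set on an atomic flag. This is a minimal illustration, not the PR's code: the class `CloseOnceGuard` and the method `tryNotifyClose` are hypothetical names, and the counter only stands in for delivering onClose to the application.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the filter's close path; not the PR's actual class. */
final class CloseOnceGuard {
  // Mirrors the notifiedApp flag described above.
  private final AtomicBoolean notifiedApp = new AtomicBoolean(false);
  // Counts deliveries so the exactly-once property can be observed.
  private final AtomicInteger closeCount = new AtomicInteger();

  /** Returns true only for the single caller that wins the race to close. */
  boolean tryNotifyClose() {
    if (!notifiedApp.compareAndSet(false, true)) {
      return false; // some other path (sidecar or server) already closed the call
    }
    closeCount.incrementAndGet(); // stands in for delivering onClose to the application
    return true;
  }

  int closeCount() {
    return closeCount.get();
  }
}
```

Because compareAndSet is atomic, the sidecar path and the server path can both call `tryNotifyClose()` concurrently and only one of them proceeds.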
2. When Event Buffering and Queueing Starts
The interceptor manages the "Interception Debt" by capturing and ordering data plane response callbacks:
* Headers (onHeaders): If the sidecar needs to see headers (SEND mode), the data plane headers are captured in savedHeaders. They are held back from the application until the sidecar responds or fails.
* Messages (onMessage):
* Sidecar Processing: If GRPC body mode is active, messages are intercepted and sent to the sidecar.
* Queueing: If the sidecar has terminated OR the mode is NONE, but headers are still intercepted (savedHeaders != null), messages are added to the savedMessages queue. This is a critical safety measure: delivering a message to the application before its headers is a gRPC protocol violation.
* Trailers & Status (onClose): If the server closes the RPC while the filter is still waiting on the sidecar or draining buffers, the status and trailers are captured in savedStatus and savedTrailers. They are held until all preceding headers and messages are delivered.
3. The Draining Lifecycle (Fail-Open or Graceful)
If the sidecar stream ends, the filter enters the Draining Phase. The sequence is strictly enforced within the unblockAfterStreamComplete() method to ensure protocol-compliant delivery:
1. Header Delivery: savedHeaders are delivered to the application first.
2. Message Draining: All messages in the savedMessages queue (those that arrived while headers or sidecar responses were pending) are polled and delivered sequentially via super.onMessage().
3. Ready Notification: onReady() is triggered to inform the application it can resume sending data.
4. Activation of Pass-Through: The passThroughMode flag is set to true. From this point forward, new events bypass the buffers.
5. Final Closure: Finally, savedStatus and savedTrailers are delivered to the application, concluding the RPC.
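The buffering and drain order above can be sketched as a small single-threaded model. It is an illustration under simplifying assumptions: events are plain strings rather than gRPC Metadata and Status, everything is buffered until pass-through is active, and the class name `DrainingListenerSketch` is hypothetical. Only the field names savedHeaders, savedMessages, savedStatus, and passThroughMode and the method name unblockAfterStreamComplete come from the description.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified, single-threaded model of the buffering listener; not the PR's class. */
final class DrainingListenerSketch {
  final List<String> delivered = new ArrayList<>(); // what the application observes
  private final Queue<String> savedMessages = new ArrayDeque<>();
  private String savedHeaders;
  private String savedStatus;
  private boolean passThroughMode;

  void onHeaders(String headers) {
    if (passThroughMode) {
      delivered.add("headers:" + headers);
    } else {
      savedHeaders = headers; // held until the sidecar responds or fails
    }
  }

  void onMessage(String message) {
    if (passThroughMode) {
      delivered.add("message:" + message);
    } else {
      savedMessages.add(message); // must not overtake the saved headers
    }
  }

  void onClose(String status) {
    if (passThroughMode) {
      delivered.add("close:" + status);
    } else {
      savedStatus = status; // held until headers and messages are delivered
    }
  }

  /** Drains in the documented order: headers, messages, ready, pass-through, close. */
  void unblockAfterStreamComplete() {
    if (savedHeaders != null) {
      delivered.add("headers:" + savedHeaders);
      savedHeaders = null;
    }
    String message;
    while ((message = savedMessages.poll()) != null) {
      delivered.add("message:" + message);
    }
    delivered.add("ready");
    passThroughMode = true;
    if (savedStatus != null) {
      delivered.add("close:" + savedStatus);
      savedStatus = null;
    }
  }
}
```

The real filter has to coordinate these steps across threads using the atomic flags from section 1; the model only shows the delivery order.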
4. Pass-Through Mode
Once the draining is complete and passThroughMode is active, the interceptor becomes "transparent." Every subsequent callback from the server (onMessage, onClose) or call from the application (sendMessage, halfClose) is immediately delegated to the underlying gRPC stream.
---
Summary of Benefits
* Strict Ordering across Callbacks: Guarantees that the application always receives events in the sequence: onHeaders -> onMessage -> onClose.
* Race Resilience: Prevents new data plane messages from "jumping the queue" ahead of buffered ones during the sidecar's termination.
* Protocol Compliance: The savedMessages buffer prevents "Message before Headers" errors when the server's data stream outruns the sidecar's header processing.
* Seamless Fail-Open: Provides a robust path for the RPC to continue without interruption if the external processor becomes unavailable.
Implement the previously missing draining of queued response messages when ext-proc-mutated headers unblock the data plane RPC. Also call the same unblockAfterExtProcStreamComplete method during "immediate response" handling.
1. Graceful Handshake: The filter now performs an explicit handshake with the sidecar during RPC termination. It sends a ResponseTrailers message or an empty ResponseBody with the end_of_stream_without_message bit set, and then waits for a corresponding terminal response from the sidecar before finalizing the data plane call.
2. Mode Override Removal: All support for ProcessingResponse.mode_override has been removed. This includes the internal handleModeOverride logic and the associated configuration fields (allow_mode_override, allowed_override_modes).
3. Test Suite Stabilization:
* Updated all mock sidecars to correctly acknowledge the new handshake protocol.
* Fixed synchronization issues in client and bidirectional streaming tests by introducing asynchronous server responses with small delays.
* Removed all tests that were specifically targeting the now-defunct dynamic mode override feature.
* Added explicit verification of sidecar stream completion using new latches (extProcCompletedLatch, extProcBidiCompletedLatch).
* Updated io.grpc.xds.internal.MatcherParser to support parsing envoy.type.matcher.v3.ListStringMatcher into an internal list of matchers.
* Implemented a nested HeaderForwardingRulesConfig class within ExternalProcessorFilter to encapsulate the allow/disallow logic.
* Refactored ExternalProcessorInterceptor.toHeaderMap to perform case-insensitive header name filtering according to the configured rules.
* Applied this filtering to all headers sent to the sidecar (initial request, response headers, and trailers).
* Fixed givenClientStreamingRpc and givenBidiStreamingRpc failures.
* Threading Fixes: Replaced complex thread pools with directExecutor() for components where appropriate and used dedicated single-thread executors for async responses. This resolved the IllegalStateException: call is closed and the resource leak issues (AssertionError: Resources could not be released in time).
Often there is no cause, but connect(), channel credentials, and call credentials failures on the control plane RPC can include a useful causal exception. This was triggered by seeing an error like below, but it didn't include the cause, which would have included HTTP error information from the failure fetching the credential.
```
UNAVAILABLE: Error retrieving LDS resource xdstp://traffic-director-c2p.xds.googleapis.com/envoy.config.listener.v3.Listener/bigtable.googleapis.com: UNAUTHENTICATED: Failed computing credential metadata nodeID: C2P-798500073
```
Metadata was accidentally being retained after the start of the call. That can be an overwhelming percentage of memory for an idle RPC; don't do that. The other changes are considerably smaller, but I happened to notice them and the changes are straightforward without magic numbers (e.g., there are many arrays that could be tuned). The regular interop server uses 4600 bytes per full duplex stream while idle, but much of that is Census recorded events hanging around. Keeping the Census integration but removing the Census impl (so a noop is used) drops that to 3000 bytes. This change brings that down to ~2450 bytes (which still includes state from TestServiceImpl). But there's very little Metadata in the interop tests, so absolute real-life savings would be much higher (but relative real-life savings may be lower, because the application will often have more state). The measurements were captured using a modified timeout_on_sleeping_server client that had 100,000 concurrent full duplex calls on one connection.
This does reduce the largest supported integer from just less than 2^32 to slightly more than 2^29, which does not seem a significant loss. It would previously produce a corrupted integer, which makes debugging annoying. Note that continuations can contain just zeros and should still be detected as resulting in overflow, without waiting for any eventual 1. We could leave the encoder supporting up to 2^32-1, but it just seems wrong to encode values that the same implementation couldn't decode. Noticed by @August829
Align DelayedClientCall.DelayedListener with ClientCallImpl's existing behavior for listener exceptions. When the application listener throws from onHeaders/onMessage/onReady, catch the Throwable, cancel the call with CANCELLED (cause = the throwable), and swallow subsequent callbacks. When onClose throws, log and continue, matching ClientCallImpl.closeObserver. If onClose arrives from the transport after a prior callback threw, override its status/trailers with the captured CANCELLED so a server-supplied OK can't mask the local failure.

Previously, a throw from the application listener escaped to the callExecutor's uncaught-exception handler. The real call was not cancelled and the transport kept delivering callbacks to an already broken listener. This differed from how the same bug behaves on a normal ClientCallImpl, and it was inconsistent in a timing-dependent way, depending on whether callbacks arrived before or after setCall + drain completed.

Trade-off: listener-callback throws are no longer visible to the executor's UncaughtExceptionHandler (they're attached as Status.cause instead). This matches ClientCallImpl and is the intended behavior.

Exception handling for the outer drainPendingCalls loop (realCall.sendMessage/request/halfClose/cancel) remains unaddressed; that TODO is preserved.

**Note:** This change only handles exceptions thrown by the application listener. I don't try to solve the problems that grpc#12737 is attempting to fix. My motivation is to fix the root cause behind bazelbuild/bazel#29316

---------

Co-authored-by: Kannan J <kannanjgithub@google.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
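The listener-exception behavior described in this commit can be sketched as a wrapper around the application listener. This is a simplified model, not the DelayedClientCall code: `SketchListener`, `SketchCall`, and `GuardedListener` are hypothetical types, statuses are plain strings, and only onMessage and onClose are modeled.

```java
/** Minimal listener interface standing in for ClientCall.Listener; hypothetical. */
interface SketchListener {
  void onMessage(String message);

  void onClose(String status, Throwable cause);
}

/** Hypothetical call handle; only cancel is modeled. */
interface SketchCall {
  void cancel(String reason, Throwable cause);
}

/** Wraps the application listener so that its exceptions cancel the call. */
final class GuardedListener implements SketchListener {
  private final SketchListener delegate;
  private final SketchCall call;
  private Throwable failure; // first exception thrown by the application listener

  GuardedListener(SketchListener delegate, SketchCall call) {
    this.delegate = delegate;
    this.call = call;
  }

  @Override
  public void onMessage(String message) {
    if (failure != null) {
      return; // swallow callbacks after the listener has already failed
    }
    try {
      delegate.onMessage(message);
    } catch (Throwable t) {
      failure = t;
      call.cancel("application listener threw", t);
    }
  }

  @Override
  public void onClose(String status, Throwable cause) {
    if (failure != null) {
      // A server-supplied OK must not mask the local failure.
      status = "CANCELLED";
      cause = failure;
    }
    try {
      delegate.onClose(status, cause);
    } catch (Throwable t) {
      // The real code logs here; the sketch simply continues.
    }
  }
}
```

In this model the exception never reaches an executor's uncaught-exception handler; it travels as the cause of the final status instead, which is the trade-off the commit message calls out.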
This was already being done for servlet, so it was just copied to jakarta.
This commit introduces a library for handling header mutations as specified by the xDS protocol. This library provides the core functionality for modifying request and response headers based on a set of rules. The main components of this library are:
- `HeaderMutator`: Applies header mutations to `Metadata` objects.
- `HeaderMutationFilter`: Filters header mutations based on a set of configurable rules, such as disallowing mutations of system headers.
- `HeaderMutations`: A value class that represents the set of mutations to be applied to request and response headers.
- `HeaderMutationDisallowedException`: An exception that is thrown when a disallowed header mutation is attempted.

This commit also includes comprehensive unit tests for the new library.
ExternalProcessorFilter.java
- Trailers-Only Detection: Added state tracking to ExtProcListener to identify when a gRPC "trailers-only" response occurs (i.e., when onClose is called without a preceding onHeaders).
- Protocol Compliance: Updated the state machine to send a RESPONSE_HEADERS message to the sidecar with the end_of_stream flag set for trailers-only responses. This satisfies the requirement that headers must be the first message in any response phase.
- Handshake Handling: Modified onNext to correctly apply sidecar mutations to gRPC trailers and terminate the interaction when a trailers-only handshake is completed.
- Robustness: Added null checks in header mutation logic to prevent NullPointerException during edge-case state transitions.
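The trailers-only detection can be sketched as one boolean of state. This is a minimal model, assuming the sidecar messages can be represented as strings; `TrailersOnlyTracker` and its fields are hypothetical names, and the real ExtProcListener also handles send modes and mutations that are omitted here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of what the listener sends to the sidecar in the response phase. */
final class TrailersOnlyTracker {
  final List<String> sentToSidecar = new ArrayList<>();
  private boolean headersSeen; // set once onHeaders has been observed

  void onHeaders() {
    headersSeen = true;
    sentToSidecar.add("RESPONSE_HEADERS");
  }

  void onClose() {
    if (!headersSeen) {
      // Trailers-only response: the trailers travel as headers with end_of_stream set,
      // so that headers remain the first message of the response phase.
      sentToSidecar.add("RESPONSE_HEADERS(end_of_stream=true)");
    } else {
      sentToSidecar.add("RESPONSE_TRAILERS");
    }
  }
}
```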
ExternalProcessorFilterTest.java
- Forward Compatibility: Updated the createBaseProto helper to default to SKIP mode for response headers and trailers. This ensures that the 60+ existing tests (which primarily focus on the request phase) continue to pass without being blocked by the new response handshake.
- Streaming Robustness: Refactored Category 11 (Client/Bidi Streaming) mock sidecars to handle and acknowledge the full sequence of protocol phases (including the newly added response phases).
- Category 15 (New): Added a dedicated test case givenTrailersOnly_whenResponseReceived_thenResponseHeadersSentWithEos which validates that:
1. Trailers are correctly sent to the sidecar as headers when the data plane server returns an immediate error.
2. The sidecar receives the end_of_stream signal.
3. Mutated trailers from the sidecar are correctly applied to the final RPC state.
Implemented a FIFO queue in ExternalProcessorFilter to ensure that responses from the external processor server arrive in the same order as the events sent by the filter, as required by gRFC A93. Added unit tests to verify that out-of-order responses correctly trigger a protocol error and fail the stream.
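The ordering check can be sketched with a queue of expected response phases. This is an illustrative model only: `ResponseOrderChecker`, its `Phase` enum, and the method names are assumptions, not the identifiers used in ExternalProcessorFilter.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical FIFO check that sidecar responses match the order of sent events. */
final class ResponseOrderChecker {
  enum Phase { REQUEST_HEADERS, REQUEST_BODY, RESPONSE_HEADERS, RESPONSE_BODY, RESPONSE_TRAILERS }

  private final Queue<Phase> pending = new ArrayDeque<>();

  /** Records that an event of the given phase was sent to the sidecar. */
  void onRequestSent(Phase phase) {
    pending.add(phase);
  }

  /**
   * Returns true if the response matches the oldest outstanding event.
   * A false result models a protocol error that should fail the stream.
   */
  boolean onResponseReceived(Phase phase) {
    Phase expected = pending.poll(); // null when nothing is outstanding
    return expected == phase;
  }
}
```

A response that arrives when nothing is outstanding is also rejected, because `poll()` returns null and never equals a real phase.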
- Added rejection of CONTINUE_AND_REPLACE status in HeadersResponse for both request and response headers, treating it as a stream failure.
- Fixed potential hangs by ensuring proceedWithClose() is called upon stream failure, especially in fail-open scenarios.
- Added explicit sidecar notification via requestStream.onError() upon detecting protocol errors to ensure robust stream termination.
- Added a new unit test category 17 in ExternalProcessorFilterTest to verify status enforcement.
Updated ExternalProcessorFilter to include the `protocol_config` field in the very first `ProcessingRequest` sent to the sidecar (RequestHeaders). The configuration includes the `request_body_mode` and `response_body_mode` derived from the filter's processing mode, as required by gRFC A93. Added a unit test in Category 4 to verify that `protocol_config` is correctly populated on the first message and omitted from all subsequent messages on the stream. Added tests verifying that the trailers HeaderSendMode defaults to SKIP when set to DEFAULT. nit: Renumbered out-of-order test categories.
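The "first message only" rule for `protocol_config` can be sketched with a single flag. This is a hypothetical model: `FirstMessageConfigSketch` and its nested `Request` class stand in for the real `ProcessingRequest` proto, and the config is represented as a string purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of outgoing sidecar requests; not the real proto types. */
final class FirstMessageConfigSketch {
  static final class Request {
    final String phase;
    final String protocolConfig; // null when the field is omitted

    Request(String phase, String protocolConfig) {
      this.phase = phase;
      this.protocolConfig = protocolConfig;
    }
  }

  final List<Request> sent = new ArrayList<>();
  private final String protocolConfig;
  private boolean firstSent;

  FirstMessageConfigSketch(String requestBodyMode, String responseBodyMode) {
    this.protocolConfig =
        "request_body_mode=" + requestBodyMode + ",response_body_mode=" + responseBodyMode;
  }

  /** Attaches the config to the first request on the stream and omits it afterwards. */
  void send(String phase) {
    sent.add(new Request(phase, firstSent ? null : protocolConfig));
    firstSent = true;
  }
}
```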
…ment rather than a granular merge of its fields.
# Conflicts: # xds/src/test/java/io/grpc/xds/internal/headermutations/HeaderMutationsTest.java
# Conflicts: # xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
Fix style and warning as errors.
| static final String TYPE_URL = | ||
| "type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor"; | ||
|
|
||
| final String filterInstanceName; |
Do we need filterInstanceName string? Maybe we just remove this?
| if (perRoute.hasOverrides()) { | ||
| return ExternalProcessorFilterConfig.create(perRoute.getOverrides(), context); | ||
| } | ||
| return ConfigOrError.fromError("ExtProcPerRoute must have overrides"); |
https://github.com/grpc/proposal/pull/484/changes#diff-5b8fb70550f5331ffbe7083143468b63ee8f27a4b6a464fe3f6713f9711b0addR435-R451 documents overrides as an optional field.
I don't think we are supposed to nack the resource on empty overrides. Maybe we need a notion of empty FilterConfig.
| config = over; | ||
| } | ||
| } | ||
| checkNotNull(config, "config"); |
Why do we need this?
If my understanding of the gRFC is correct, a successfully parsed regular config and a successfully parsed override config should for all practical purposes be infallible, and we need not handle errors.
That aside, what's the expected behavior here? I couldn't conclude it from the , I assume checkNotNull will crash the thread? Should we be returning null (which skips this interceptor), a FailingInterceptor (which always fails all calls), or just crash?
|
|
||
| ConfigOrError<ExternalProcessorFilterConfig> merged = | ||
| ExternalProcessorFilterConfig.create(mergedProtoBuilder.build(), parent.getFilterContext()); | ||
| checkNotNull(merged, "merged"); |
Same as above: I am not entirely sure why we would fail, given that validated configs should always be mergeable. Is this just defensive programming for a case we don't expect to happen?
| return merged.config; | ||
| } | ||
|
|
||
| static final class ExternalProcessorFilterConfig implements FilterConfig { |
Okay, this answers some initial questions that I had.
So, we've merged the override config and the regular config into the same object. This seems to have led to various null or error checks, because we store the original proto, then merge the proto with the override, and then recreate the object from the merged proto, which may be a fallible operation.
I briefly skimmed the other filters, and this seems to be the first override type whose message is different from the base type's message.
So, here's my recommendation:
- We need two FilterConfigs, one for base and one for override, each having its own distinct structure.
- Ideally, the FilterConfig should be independent of the proto, or at least not directly contain the entire ExternalProcessor or ExternalProcessorOverride proto.
- Create a merge function that doesn't reparse protos and merges them together. Maybe a builder pattern with AutoValue might be useful here.
| private final Optional<HeaderForwardingRulesConfig> forwardRulesConfig; | ||
| private final ImmutableList<String> requestAttributes; | ||
| private final boolean disableImmediateResponse; | ||
| private final long deferredCloseTimeoutNanos; |
nit: I am not sure if that's intentional, but there's a small inconsistency: I'd expect the failuremode.. and observability.. fields to be present here as well, like the others. I am assuming we plan to access them directly from the externalProcessor proto?
|
|
||
| static final class ExternalProcessorFilterConfig implements FilterConfig { | ||
|
|
||
| private final ExternalProcessor externalProcessor; |
nit: Maybe we need nullable annotations here? Or is it acceptable to rely on just the constructor call site?
| private final ImmutableList<String> requestAttributes; | ||
| private final boolean disableImmediateResponse; | ||
| private final long deferredCloseTimeoutNanos; | ||
| private final FilterContext filterContext; |
Why do we need it as a data member? It's only used in creation of GrpcServiceConfig and if we already have that we likely won't need to use this anymore.
| private final ImmutableList<Matchers.StringMatcher> disallowedHeaders; | ||
|
|
||
| HeaderForwardingRulesConfig( | ||
| @Nullable ImmutableList<Matchers.StringMatcher> allowedHeaders, |
go/java-practices/null#collection - Nullable collections are usually not preferable.
https://github.com/envoyproxy/envoy/blob/08f7c15da7f2e4077404829402bd8465536864f2/api/envoy/type/matcher/string.proto#L78-L80 seems to indicate that the number of elements is >= 1, so essentially I don't think we need to treat null and empty containers differently.
| return new HeaderForwardingRulesConfig(allowedHeaders, disallowedHeaders); | ||
| } | ||
|
|
||
| boolean isAllowed(String headerName) { |
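Can we simplify this somehow by preferring allowed? When combined with the nullability removal, this could probably become something like the following, which greatly reduces cyclomatic complexity.
boolean isAllowed(String headerName) {
  // Disallow rules are checked first and always win.
  for (Matchers.StringMatcher disallow : disallowedHeaders) {
    if (disallow.matches(headerName)) {
      return false;
    }
  }
  // An empty allow list means every header that is not disallowed is forwarded.
  if (allowedHeaders.isEmpty()) {
    return true;
  }
  for (Matchers.StringMatcher allow : allowedHeaders) {
    if (allow.matches(headerName)) {
      return true;
    }
  }
  return false;
}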
|
Reviewed approximately 500 LOC of source (not tests), which so far mostly covers the config part of things. Sending comments incrementally to kick-start progress, since it seems that I'll probably need about 3 days to review all of the source.
Implements ext_proc filter from A93 (internal design doc)
Metrics aren't implemented yet.
Includes commits from unmerged Filter API enhancements and channel caching PRs.
Only ExternalProcessorFilter.java, ExternalProcessorFilterTest.java, and the envoy xds proto import and generated code need to be reviewed.
Rebasing the commit history caused all received and merged commits to show my name as the committer; please ignore all commits for which I'm not shown as the author.