[FnApi Java] Add support for separate named data streams to provide bundle isolation#38863
[FnApi Java] Add support for separate named data streams to provide bundle isolation#38863scwhittle wants to merge 3 commits into
Conversation
…undle isolation. This is advertised to the runner via a new NAMED_DATA_STREAMS protocol capability. The runner is then free to assign bundles to named data streams as it chooses to isolate bundle processing from each other. Instead of single data stream from the sdk, the sdk will create a data stream for each name. The benefit of doing so is that the multiplexing currently performed on data stream messages being received allows a slow bundle to fill up buffers and block the shared stream. With separate named streams, bundles on other data streams have separate grpc flow control from the blocked stream and are not affected.
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces support for named data streams in the FnApi, enabling runners to isolate bundle processing by utilizing separate gRPC streams. This architecture prevents slow bundles from causing head-of-line blocking on shared streams by providing dedicated flow control. The changes include protocol updates, a refactoring of the outbound data aggregator to handle instruction lifecycles, and improved documentation for stream metadata. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces support for named data streams in the Apache Beam Fn API, allowing the runner to request specific data stream IDs during bundle processing. It refactors BeamFnDataOutboundAggregator to be reusable across multiple bundles using explicit instruction lifecycle methods, and updates BeamFnDataGrpcClient to multiplex connections based on both the API service descriptor and the data stream ID. The review feedback identifies several critical issues: a potential NullPointerException in the background flushing thread of BeamFnDataOutboundAggregator if a flush occurs after an instruction finishes, the need to reset bytesWrittenSinceFlush at the end of each bundle, and the importance of normalizing dataStreamId in MultiplexerKey to prevent duplicate multiplexers. Additionally, minor documentation copy-paste errors in beam_fn_api.proto and grammatical issues in FnHarness.java should be addressed.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| Elements.Builder elements = convertBufferForTransmission(); | ||
| if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) { | ||
| outboundObserver.onNext(elements.build()); | ||
| checkNotNull(outboundObserver).onNext(elements.build()); |
There was a problem hiding this comment.
Since outboundObserver is now mutable and set to null when an instruction finishes, the background flushing thread can encounter a null observer if a flush occurs after finishInstruction() is called. Instead of throwing a NullPointerException via checkNotNull, we should defensively check if the observer is non-null before calling onNext.
StreamObserver<Elements> observer = outboundObserver;
if (observer != null) {
observer.onNext(elements.build());
}| // This is the end of the bundle so we reset state to prepare for future bundles. | ||
| if (collectElementsIfNoFlushes && !hasFlushedForBundle) { | ||
| return bufferedElements.build(); | ||
| } | ||
| outboundObserver.onNext(bufferedElements.build()); | ||
| // This is now at the end of a bundle, so we reset hasFlushedForBundle to prepare for new | ||
| // bundles. | ||
| checkNotNull(outboundObserver).onNext(bufferedElements.build()); | ||
| hasFlushedForBundle = false; | ||
| return null; |
There was a problem hiding this comment.
Since BeamFnDataOutboundAggregator is now reused across multiple bundles, we must reset bytesWrittenSinceFlush to 0 at the end of each bundle. Otherwise, bytesWrittenSinceFlush will remain non-zero, causing the background flushing thread to attempt a flush after the instruction has finished, which leads to NullPointerException or other unexpected behaviors.
// This is the end of the bundle so we reset state to prepare for future bundles.
bytesWrittenSinceFlush = 0L;
if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
return bufferedElements.build();
}
checkNotNull(outboundObserver).onNext(bufferedElements.build());
hasFlushedForBundle = false;
return null;| private MultiplexerKey( | ||
| Endpoints.ApiServiceDescriptor apiServiceDescriptor, String dataStreamId) { | ||
| this.apiServiceDescriptor = apiServiceDescriptor; | ||
| this.dataStreamId = dataStreamId; | ||
| } |
There was a problem hiding this comment.
To prevent creating duplicate BeamFnDataGrpcMultiplexer instances and separate gRPC connections for null and empty ("") stream IDs, we should normalize dataStreamId to a consistent value (e.g., empty string) in the MultiplexerKey constructor.
| private MultiplexerKey( | |
| Endpoints.ApiServiceDescriptor apiServiceDescriptor, String dataStreamId) { | |
| this.apiServiceDescriptor = apiServiceDescriptor; | |
| this.dataStreamId = dataStreamId; | |
| } | |
| private MultiplexerKey( | |
| Endpoints.ApiServiceDescriptor apiServiceDescriptor, @Nullable String dataStreamId) { | |
| this.apiServiceDescriptor = apiServiceDescriptor; | |
| this.dataStreamId = dataStreamId == null ? "" : dataStreamId; | |
| } |
| Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = | ||
| getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR)); | ||
| getApiServiceDescriptor( | ||
| checkNotNull( | ||
| environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR), | ||
| "LOGGING_API_SERVICE_DESCRIPTOR env var be set.")); | ||
| Endpoints.ApiServiceDescriptor controlApiServiceDescriptor = | ||
| getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR)); | ||
| getApiServiceDescriptor( | ||
| checkNotNull( | ||
| environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR), | ||
| "CONTROL_API_SERVICE_DESCRIPTOR env var be set.")); | ||
|
|
||
| @Nullable String envVar = environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR); | ||
| Endpoints.ApiServiceDescriptor statusApiServiceDescriptor = | ||
| environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null | ||
| ? null | ||
| : getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR)); | ||
| String id = environmentVarGetter.apply(HARNESS_ID); | ||
| (envVar == null) ? null : getApiServiceDescriptor(envVar); | ||
| String id = checkNotNull(environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env var be set."); |
There was a problem hiding this comment.
The error messages in checkNotNull are missing the word "must" (e.g., "LOGGING_API_SERVICE_DESCRIPTOR env var be set."). They should be updated to "... env var must be set." for grammatical correctness and clarity.
| Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = | |
| getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR)); | |
| getApiServiceDescriptor( | |
| checkNotNull( | |
| environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR), | |
| "LOGGING_API_SERVICE_DESCRIPTOR env var be set.")); | |
| Endpoints.ApiServiceDescriptor controlApiServiceDescriptor = | |
| getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR)); | |
| getApiServiceDescriptor( | |
| checkNotNull( | |
| environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR), | |
| "CONTROL_API_SERVICE_DESCRIPTOR env var be set.")); | |
| @Nullable String envVar = environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR); | |
| Endpoints.ApiServiceDescriptor statusApiServiceDescriptor = | |
| environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null | |
| ? null | |
| : getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR)); | |
| String id = environmentVarGetter.apply(HARNESS_ID); | |
| (envVar == null) ? null : getApiServiceDescriptor(envVar); | |
| String id = checkNotNull(environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env var be set."); | |
| Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = | |
| getApiServiceDescriptor( | |
| checkNotNull( | |
| environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR), | |
| "LOGGING_API_SERVICE_DESCRIPTOR env var must be set.")); | |
| Endpoints.ApiServiceDescriptor controlApiServiceDescriptor = | |
| getApiServiceDescriptor( | |
| checkNotNull( | |
| environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR), | |
| "CONTROL_API_SERVICE_DESCRIPTOR env var must be set.")); | |
| @Nullable String envVar = environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR); | |
| Endpoints.ApiServiceDescriptor statusApiServiceDescriptor = | |
| (envVar == null) ? null : getApiServiceDescriptor(envVar); | |
| String id = | |
| checkNotNull( | |
| environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env var must be set."); |
| // Used to get/append/clear state stored by the runner on behalf of the SDK. | ||
| // | ||
| // Header metadata has the specified keys pairs: | ||
| // - "worker_id": the id of the sdk |
There was a problem hiding this comment.
The comment block contains copy-pasted text from the BeamFnState service ("Used to get/append/clear state...") which is incorrect for BeamFnLogging. Additionally, there is a typo "specified keys pairs" which should be "specified key-value pairs".
| // Used to get/append/clear state stored by the runner on behalf of the SDK. | |
| // | |
| // Header metadata has the specified keys pairs: | |
| // - "worker_id": the id of the sdk | |
| // Header metadata has the specified key-value pairs: | |
| // - "worker_id": the id of the sdk |
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
This is advertised to the runner via a new NAMED_DATA_STREAMS protocol capability. The runner is then free to assign bundles to named data streams as it chooses to isolate bundle processing from each other. Instead of single data stream from the sdk, the sdk will create a data stream for each name. The benefit of doing so is that the multiplexing currently performed on data stream messages being received allows a slow bundle to fill up buffers and block the shared stream. With separate named streams, bundles on other data streams have separate grpc flow control from the blocked stream and are not affected.
While making changes:
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.