Skip to content

[FnApi Java] Add support for separate named data streams to provide bundle isolation#38863

Open
scwhittle wants to merge 3 commits into
apache:masterfrom
scwhittle:named_streams
Open

[FnApi Java] Add support for separate named data streams to provide bundle isolation#38863
scwhittle wants to merge 3 commits into
apache:masterfrom
scwhittle:named_streams

Conversation

@scwhittle

@scwhittle scwhittle commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

This is advertised to the runner via a new NAMED_DATA_STREAMS protocol capability. The runner is then free to assign bundles to named data streams as it chooses to isolate bundle processing from each other. Instead of single data stream from the sdk, the sdk will create a data stream for each name. The benefit of doing so is that the multiplexing currently performed on data stream messages being received allows a slow bundle to fill up buffers and block the shared stream. With separate named streams, bundles on other data streams have separate grpc flow control from the blocked stream and are not affected.

While making changes:

  • remove some nullness suppressions
  • document the existing metadata headers used on fnapi streams

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@scwhittle scwhittle changed the title [FnApi] Add support for a new name field to ApiServiceDescriptor under StandardProtocols.API_SERVICE_DESCRIPTOR_NAME [FnApi] Add support for a sending header metadata when connecting to ApiServiceDescriptor under StandardProtocols.API_SERVICE_DESCRIPTOR_HEADER_METADATA Jun 9, 2026
@scwhittle scwhittle changed the title [FnApi] Add support for a sending header metadata when connecting to ApiServiceDescriptor under StandardProtocols.API_SERVICE_DESCRIPTOR_HEADER_METADATA [FnApi Java] Add support for separate named data streams to provide bundle isolation Jun 12, 2026
…undle isolation.

This is advertised to the runner via a new NAMED_DATA_STREAMS protocol capability. The runner is then free to assign bundles to named data streams as it chooses
to isolate bundle processing from each other. Instead of single data stream
from the sdk, the sdk will create a data stream for each name.  The benefit of
doing so is that the multiplexing currently performed on data stream messages being received allows a slow bundle to fill up buffers and block the shared stream.  With separate named streams, bundles on other data streams have separate grpc flow control from the blocked stream and are not affected.
@scwhittle scwhittle marked this pull request as ready for review June 12, 2026 13:37
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces support for named data streams in the FnApi, enabling runners to isolate bundle processing by utilizing separate gRPC streams. This architecture prevents slow bundles from causing head-of-line blocking on shared streams by providing dedicated flow control. The changes include protocol updates, a refactoring of the outbound data aggregator to handle instruction lifecycles, and improved documentation for stream metadata.

Highlights

  • Protocol Changes: Added the NAMED_DATA_STREAMS protocol capability and a data_stream_id field to the FnApi to support bundle isolation.
  • API Refactoring: Refactored BeamFnDataOutboundAggregator to explicitly manage instruction lifecycles using prepareForInstruction and finishInstruction methods.
  • Code Cleanup and Documentation: Removed several nullness suppressions and added comprehensive documentation for metadata headers used in FnApi streams.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for named data streams in the Apache Beam Fn API, allowing the runner to request specific data stream IDs during bundle processing. It refactors BeamFnDataOutboundAggregator to be reusable across multiple bundles using explicit instruction lifecycle methods, and updates BeamFnDataGrpcClient to multiplex connections based on both the API service descriptor and the data stream ID. The review feedback identifies several critical issues: a potential NullPointerException in the background flushing thread of BeamFnDataOutboundAggregator if a flush occurs after an instruction finishes, the need to reset bytesWrittenSinceFlush at the end of each bundle, and the importance of normalizing dataStreamId in MultiplexerKey to prevent duplicate multiplexers. Additionally, minor documentation copy-paste errors in beam_fn_api.proto and grammatical issues in FnHarness.java should be addressed.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Elements.Builder elements = convertBufferForTransmission();
if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) {
outboundObserver.onNext(elements.build());
checkNotNull(outboundObserver).onNext(elements.build());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Since outboundObserver is now mutable and set to null when an instruction finishes, the background flushing thread can encounter a null observer if a flush occurs after finishInstruction() is called. Instead of throwing a NullPointerException via checkNotNull, we should defensively check if the observer is non-null before calling onNext.

      StreamObserver<Elements> observer = outboundObserver;
      if (observer != null) {
        observer.onNext(elements.build());
      }

Comment on lines +242 to 248
// This is the end of the bundle so we reset state to prepare for future bundles.
if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
return bufferedElements.build();
}
outboundObserver.onNext(bufferedElements.build());
// This is now at the end of a bundle, so we reset hasFlushedForBundle to prepare for new
// bundles.
checkNotNull(outboundObserver).onNext(bufferedElements.build());
hasFlushedForBundle = false;
return null;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Since BeamFnDataOutboundAggregator is now reused across multiple bundles, we must reset bytesWrittenSinceFlush to 0 at the end of each bundle. Otherwise, bytesWrittenSinceFlush will remain non-zero, causing the background flushing thread to attempt a flush after the instruction has finished, which leads to NullPointerException or other unexpected behaviors.

    // This is the end of the bundle so we reset state to prepare for future bundles.
    bytesWrittenSinceFlush = 0L;
    if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
      return bufferedElements.build();
    }
    checkNotNull(outboundObserver).onNext(bufferedElements.build());
    hasFlushedForBundle = false;
    return null;

Comment on lines +53 to +57
private MultiplexerKey(
Endpoints.ApiServiceDescriptor apiServiceDescriptor, String dataStreamId) {
this.apiServiceDescriptor = apiServiceDescriptor;
this.dataStreamId = dataStreamId;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To prevent creating duplicate BeamFnDataGrpcMultiplexer instances and separate gRPC connections for null and empty ("") stream IDs, we should normalize dataStreamId to a consistent value (e.g., empty string) in the MultiplexerKey constructor.

Suggested change
private MultiplexerKey(
Endpoints.ApiServiceDescriptor apiServiceDescriptor, String dataStreamId) {
this.apiServiceDescriptor = apiServiceDescriptor;
this.dataStreamId = dataStreamId;
}
private MultiplexerKey(
Endpoints.ApiServiceDescriptor apiServiceDescriptor, @Nullable String dataStreamId) {
this.apiServiceDescriptor = apiServiceDescriptor;
this.dataStreamId = dataStreamId == null ? "" : dataStreamId;
}

Comment on lines 149 to +163
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
getApiServiceDescriptor(
checkNotNull(
environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR),
"LOGGING_API_SERVICE_DESCRIPTOR env var be set."));
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
getApiServiceDescriptor(
checkNotNull(
environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR),
"CONTROL_API_SERVICE_DESCRIPTOR env var be set."));

@Nullable String envVar = environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR);
Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
? null
: getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
String id = environmentVarGetter.apply(HARNESS_ID);
(envVar == null) ? null : getApiServiceDescriptor(envVar);
String id = checkNotNull(environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env var be set.");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error messages in checkNotNull are missing the word "must" (e.g., "LOGGING_API_SERVICE_DESCRIPTOR env var be set."). They should be updated to "... env var must be set." for grammatical correctness and clarity.

Suggested change
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
getApiServiceDescriptor(
checkNotNull(
environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR),
"LOGGING_API_SERVICE_DESCRIPTOR env var be set."));
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
getApiServiceDescriptor(
checkNotNull(
environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR),
"CONTROL_API_SERVICE_DESCRIPTOR env var be set."));
@Nullable String envVar = environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR);
Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
? null
: getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
String id = environmentVarGetter.apply(HARNESS_ID);
(envVar == null) ? null : getApiServiceDescriptor(envVar);
String id = checkNotNull(environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env var be set.");
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
getApiServiceDescriptor(
checkNotNull(
environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR),
"LOGGING_API_SERVICE_DESCRIPTOR env var must be set."));
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(
checkNotNull(
environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR),
"CONTROL_API_SERVICE_DESCRIPTOR env var must be set."));
@Nullable String envVar = environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR);
Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
(envVar == null) ? null : getApiServiceDescriptor(envVar);
String id =
checkNotNull(
environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env var must be set.");

Comment on lines +1320 to +1323
// Used to get/append/clear state stored by the runner on behalf of the SDK.
//
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment block contains copy-pasted text from the BeamFnState service ("Used to get/append/clear state...") which is incorrect for BeamFnLogging. Additionally, there is a typo "specified keys pairs" which should be "specified key-value pairs".

Suggested change
// Used to get/append/clear state stored by the runner on behalf of the SDK.
//
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
// Header metadata has the specified key-value pairs:
// - "worker_id": the id of the sdk

@github-actions

Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant