Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ message RemoteGrpcPort {
service BeamFnControl {
// Instructions sent by the runner to the SDK requesting different types
// of work.
//
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
rpc Control(
// A stream of responses to instructions the SDK was asked to be
// performed.
Expand All @@ -130,6 +133,9 @@ service BeamFnControl {

// Used to get the full process bundle descriptors for bundles one
// is asked to process.
//
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
rpc GetProcessBundleDescriptor(GetProcessBundleDescriptorRequest) returns (
ProcessBundleDescriptor) {}
}
Expand Down Expand Up @@ -416,14 +422,21 @@ message ProcessBundleRequest {
// at https://s.apache.org/beam-fn-api-control-data-embedding.
Elements elements = 3;

// indicates that the runner has no stare for the keys in this bundle
// Indicates that the runner has no state for the keys in this bundle
// so SDk can safely begin stateful processing with a locally-generated
// initial empty state
// initial empty state.
bool has_no_state = 4;

// indicates that the runner will never process another bundle for the keys
// Indicates that the runner will never process another bundle for the keys
// in this bundle so state need not be included in the bundle commit.
bool only_bundle_for_keys = 5;

// (Optional) If non-empty, the ID of the data stream to use for this bundle.
// See comments at BeamFnData.Data for more details.
//
// The runner should only populate this field if the sdk advertises the
// beam:protocol:named_data_streams:v1 capability.
string data_stream_id = 6;
}

message ProcessBundleResponse {
Expand Down Expand Up @@ -835,6 +848,11 @@ message Elements {
// Stable
service BeamFnData {
// Used to send data between harnesses.
//
// Header metadata has the specified keys pairs:
// - "worker_id": value is the id of the sdk
// - "data_stream_id": value is the id of the data stream, distinguishing it from other data streams from the same
// sdk. This field should only be populated if requested in a received ProcessBundleRequest from the runner.
rpc Data(
// A stream of data representing input.
stream Elements)
Expand Down Expand Up @@ -900,6 +918,9 @@ message StateResponse {

service BeamFnState {
// Used to get/append/clear state stored by the runner on behalf of the SDK.
//
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
rpc State(
// A stream of state instructions requested of the runner.
stream StateRequest)
Expand Down Expand Up @@ -1295,6 +1316,11 @@ message LogControl {}
service BeamFnLogging {
// Allows for the SDK to emit log entries which the runner can
// associate with the active job.
//
// Used to get/append/clear state stored by the runner on behalf of the SDK.
//
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
Comment on lines +1320 to +1323

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment block contains copy-pasted text from the BeamFnState service ("Used to get/append/clear state...") which is incorrect for BeamFnLogging. Additionally, there is a typo "specified keys pairs" which should be "specified key-value pairs".

Suggested change
// Used to get/append/clear state stored by the runner on behalf of the SDK.
//
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
// Header metadata has the specified key-value pairs:
// - "worker_id": the id of the sdk

rpc Logging(
// A stream of log entries batched into lists emitted by the SDK harness.
stream LogEntry.List)
Expand Down Expand Up @@ -1356,6 +1382,8 @@ message WorkerStatusResponse {

// API for SDKs to report debug-related statuses to runner during pipeline execution.
service BeamFnWorkerStatus {
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
rpc WorkerStatus (stream WorkerStatusResponse)
returns (stream WorkerStatusRequest) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,10 @@ message StandardProtocols {
// Indicates whether the SDK supports multimap state.
MULTIMAP_STATE = 12
[(beam_urn) = "beam:protocol:multimap_state:v1"];

// Indicates whether the SDK supports data stream ids being requested by the runner in
// ProcessBundleRequests.
NAMED_DATA_STREAMS = 13 [(beam_urn) = "beam:protocol:named_data_streams:v1"];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public ActiveBundle newBundle(
ImmutableMap.Builder<LogicalEndpoint, FnDataReceiver<?>> receiverBuilder =
ImmutableMap.builder();
BeamFnDataOutboundAggregator beamFnDataOutboundAggregator =
fnApiDataService.createOutboundAggregator(() -> bundleId, false);
fnApiDataService.createOutboundAggregator(bundleId, false);
for (RemoteInputDestination remoteInput : remoteInputs) {
LogicalEndpoint endpoint = LogicalEndpoint.data(bundleId, remoteInput.getPTransformId());
receiverBuilder.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.fnexecution.data;

import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
Expand Down Expand Up @@ -69,5 +68,5 @@ public interface FnDataService {
* <p>The returned aggregator is not thread safe.
*/
BeamFnDataOutboundAggregator createOutboundAggregator(
Supplier<String> processBundleRequestIdSupplier, boolean collectElementsIfNoFlushes);
String processBundleId, boolean collectElementsIfNoFlushes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
Expand Down Expand Up @@ -175,13 +174,13 @@ public void unregisterReceiver(String instructionId) {

@Override
public BeamFnDataOutboundAggregator createOutboundAggregator(
Supplier<String> processBundleRequestIdSupplier, boolean collectElementsIfNoFlushes) {
String instructionId, boolean collectElementsIfNoFlushes) {
try {
return new BeamFnDataOutboundAggregator(
options,
processBundleRequestIdSupplier,
connectedClient.get(3, TimeUnit.MINUTES).getOutboundObserver(),
collectElementsIfNoFlushes);
BeamFnDataOutboundAggregator aggregator =
new BeamFnDataOutboundAggregator(options, collectElementsIfNoFlushes);
aggregator.prepareForInstruction(
instructionId, connectedClient.get(3, TimeUnit.MINUTES).getOutboundObserver());
return aggregator;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testMessageReceivedBySingleClientWhenThereAreMultipleClients() throw
for (int i = 0; i < 3; ++i) {
final String instructionId = Integer.toString(i);
BeamFnDataOutboundAggregator aggregator =
service.createOutboundAggregator(() -> instructionId, false);
service.createOutboundAggregator(instructionId, false);
aggregator.start();
FnDataReceiver<WindowedValue<String>> consumer =
aggregator.registerOutputDataLocation(TRANSFORM_ID, CODER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class BeamFnDataGrpcMultiplexer implements AutoCloseable {
private final Cache</*instructionId=*/ String, /*unused=*/ Boolean> poisonedInstructionIds;

private static class PoisonedException extends RuntimeException {
public PoisonedException() {
private PoisonedException() {
super("Instruction poisoned");
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.fn.data;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -28,7 +31,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
Expand Down Expand Up @@ -56,7 +58,7 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
// The calling thread that invokes sendBufferedDataAndFinishOutboundStreams synchronizes on
// The calling thread that invokes sendOrCollectBufferedDataAndFinishOutboundStreams synchronizes on
// flushLock effectively making the periodic flushing no longer read or mutate hasFlushedForBundle
// and allowing the calling thread to read and mutate hasFlushedForBundle safely without needing to
// create another memory barrier. Also note that flush is always invoked when synchronizing on
Expand All @@ -72,33 +74,56 @@ public class BeamFnDataOutboundAggregator {
private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);
private final int sizeLimit;
private final long timeLimit;
private final Supplier<String> processBundleRequestIdSupplier;
private String instructionId;
@VisibleForTesting final Map<String, Receiver<?>> outputDataReceivers;
@VisibleForTesting final Map<TimerEndpoint, Receiver<?>> outputTimersReceivers;
private final StreamObserver<Elements> outboundObserver;
@Nullable private StreamObserver<Elements> outboundObserver;
@Nullable @VisibleForTesting ScheduledFuture<?> flushFuture;
private long bytesWrittenSinceFlush;
private final Object flushLock;
private final boolean collectElementsIfNoFlushes;
private boolean hasFlushedForBundle;

public BeamFnDataOutboundAggregator(
PipelineOptions options,
Supplier<String> processBundleRequestIdSupplier,
StreamObserver<Elements> outboundObserver,
boolean collectElementsIfNoFlushes) {
public BeamFnDataOutboundAggregator(PipelineOptions options, boolean collectElementsIfNoFlushes) {
this.sizeLimit = getSizeLimit(options);
this.timeLimit = getTimeLimit(options);
this.collectElementsIfNoFlushes = collectElementsIfNoFlushes;
this.outputDataReceivers = new HashMap<>();
this.outputTimersReceivers = new HashMap<>();
this.outboundObserver = outboundObserver;
this.processBundleRequestIdSupplier = processBundleRequestIdSupplier;
this.bytesWrittenSinceFlush = 0L;
this.flushLock = new Object();
this.hasFlushedForBundle = false;
}

public void prepareForInstruction(
String instructionId, StreamObserver<Elements> outboundObserver) {
if (timeLimit > 0) {
synchronized (flushLock) {
checkState(this.instructionId == null && this.outboundObserver == null);
this.instructionId = instructionId;
this.outboundObserver = outboundObserver;
}
} else {
checkState(this.instructionId == null && this.outboundObserver == null);
this.instructionId = instructionId;
this.outboundObserver = outboundObserver;
}
}

public void finishInstruction() {
if (timeLimit > 0) {
synchronized (flushLock) {
checkState(this.instructionId != null && this.outboundObserver != null);
this.instructionId = null;
this.outboundObserver = null;
}
} else {
checkState(this.instructionId != null && this.outboundObserver != null);
this.instructionId = null;
this.outboundObserver = null;
}
}

/** Starts the flushing daemon thread if data_buffer_time_limit_ms is set. */
public void start() {
if (timeLimit > 0 && this.flushFuture == null) {
Expand Down Expand Up @@ -166,7 +191,7 @@ private void flushInternal() {
}
Elements.Builder elements = convertBufferForTransmission();
if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) {
outboundObserver.onNext(elements.build());
checkNotNull(outboundObserver).onNext(elements.build());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Since outboundObserver is now mutable and set to null when an instruction finishes, the background flushing thread can encounter a null observer if a flush occurs after finishInstruction() is called. Instead of throwing a NullPointerException via checkNotNull, we should defensively check if the observer is non-null before calling onNext.

      StreamObserver<Elements> observer = outboundObserver;
      if (observer != null) {
        observer.onNext(elements.build());
      }

}
hasFlushedForBundle = true;
}
Expand All @@ -177,6 +202,7 @@ private void flushInternal() {
* collectElementsIfNoFlushes=true, and there was no previous flush in this bundle, otherwise
* returns null.
*/
@Nullable
public Elements sendOrCollectBufferedDataAndFinishOutboundStreams() {
if (outputTimersReceivers.isEmpty() && outputDataReceivers.isEmpty()) {
return null;
Expand All @@ -191,14 +217,14 @@ public Elements sendOrCollectBufferedDataAndFinishOutboundStreams() {
}
LOG.debug(
"Closing streams for instruction {} and outbound data {} and timers {}.",
processBundleRequestIdSupplier.get(),
instructionId,
outputDataReceivers,
outputTimersReceivers);
for (Map.Entry<String, Receiver<?>> entry : outputDataReceivers.entrySet()) {
String pTransformId = entry.getKey();
bufferedElements
.addDataBuilder()
.setInstructionId(processBundleRequestIdSupplier.get())
.setInstructionId(instructionId)
.setTransformId(pTransformId)
.setIsLast(true);
entry.getValue().resetStats();
Expand All @@ -207,30 +233,29 @@ public Elements sendOrCollectBufferedDataAndFinishOutboundStreams() {
TimerEndpoint timerKey = entry.getKey();
bufferedElements
.addTimersBuilder()
.setInstructionId(processBundleRequestIdSupplier.get())
.setInstructionId(instructionId)
.setTransformId(timerKey.pTransformId)
.setTimerFamilyId(timerKey.timerFamilyId)
.setIsLast(true);
entry.getValue().resetStats();
}
// This is the end of the bundle so we reset state to prepare for future bundles.
if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
return bufferedElements.build();
}
outboundObserver.onNext(bufferedElements.build());
// This is now at the end of a bundle, so we reset hasFlushedForBundle to prepare for new
// bundles.
checkNotNull(outboundObserver).onNext(bufferedElements.build());
hasFlushedForBundle = false;
return null;
Comment on lines +242 to 248

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Since BeamFnDataOutboundAggregator is now reused across multiple bundles, we must reset bytesWrittenSinceFlush to 0 at the end of each bundle. Otherwise, bytesWrittenSinceFlush will remain non-zero, causing the background flushing thread to attempt a flush after the instruction has finished, which leads to NullPointerException or other unexpected behaviors.

    // This is the end of the bundle so we reset state to prepare for future bundles.
    bytesWrittenSinceFlush = 0L;
    if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
      return bufferedElements.build();
    }
    checkNotNull(outboundObserver).onNext(bufferedElements.build());
    hasFlushedForBundle = false;
    return null;

}

// Send the elements to the StreamObserver associated with this aggregator.
public void sendElements(Elements elements) {
outboundObserver.onNext(elements);
checkNotNull(outboundObserver).onNext(elements);
}

public void discard() {
if (flushFuture != null) {
flushFuture.cancel(true);
flushFuture.cancel(false);
}
}

Expand All @@ -243,7 +268,7 @@ private Elements.Builder convertBufferForTransmission() {
ByteString bytes = entry.getValue().toByteStringAndResetBuffer();
bufferedElements
.addDataBuilder()
.setInstructionId(processBundleRequestIdSupplier.get())
.setInstructionId(instructionId)
.setTransformId(entry.getKey())
.setData(bytes);
}
Expand All @@ -254,7 +279,7 @@ private Elements.Builder convertBufferForTransmission() {
ByteString bytes = entry.getValue().toByteStringAndResetBuffer();
bufferedElements
.addTimersBuilder()
.setInstructionId(processBundleRequestIdSupplier.get())
.setInstructionId(instructionId)
.setTransformId(entry.getKey().pTransformId)
.setTimerFamilyId(entry.getKey().timerFamilyId)
.setTimers(bytes);
Expand Down Expand Up @@ -353,10 +378,12 @@ public void accept(T input) throws Exception {
}
}

@VisibleForTesting
public long getByteCount() {
return perBundleByteCount;
}

@VisibleForTesting
public long getElementCount() {
return perBundleElementCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ public static Set<String> getJavaCapabilities() {
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.SDK_CONSUMING_RECEIVED_DATA));
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.ORDERED_LIST_STATE));
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.MULTIMAP_STATE));
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.NAMED_DATA_STREAMS));
return capabilities.build();
}

Expand Down
Loading
Loading