-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[FnApi Java] Add support for separate named data streams to provide bundle isolation #38863
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,9 @@ | |
| */ | ||
| package org.apache.beam.sdk.fn.data; | ||
|
|
||
| import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; | ||
| import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
|
|
@@ -28,7 +31,6 @@ | |
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.ScheduledFuture; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.function.Supplier; | ||
| import javax.annotation.Nullable; | ||
| import javax.annotation.concurrent.NotThreadSafe; | ||
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; | ||
|
|
@@ -56,7 +58,7 @@ | |
| @SuppressWarnings({ | ||
| "nullness" // TODO(https://github.com/apache/beam/issues/20497) | ||
| }) | ||
| // The calling thread that invokes sendBufferedDataAndFinishOutboundStreams synchronizes on | ||
| // The calling thread that invokes sendOrCollectBufferedDataAndFinishOutboundStreams synchronizes on | ||
| // flushLock effectively making the periodic flushing no longer read or mutate hasFlushedForBundle | ||
| // and allowing the calling thread to read and mutate hasFlushedForBundle safely without needing to | ||
| // create another memory barrier. Also note that flush is always invoked when synchronizing on | ||
|
|
@@ -72,33 +74,56 @@ public class BeamFnDataOutboundAggregator { | |
| private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class); | ||
| private final int sizeLimit; | ||
| private final long timeLimit; | ||
| private final Supplier<String> processBundleRequestIdSupplier; | ||
| private String instructionId; | ||
| @VisibleForTesting final Map<String, Receiver<?>> outputDataReceivers; | ||
| @VisibleForTesting final Map<TimerEndpoint, Receiver<?>> outputTimersReceivers; | ||
| private final StreamObserver<Elements> outboundObserver; | ||
| @Nullable private StreamObserver<Elements> outboundObserver; | ||
| @Nullable @VisibleForTesting ScheduledFuture<?> flushFuture; | ||
| private long bytesWrittenSinceFlush; | ||
| private final Object flushLock; | ||
| private final boolean collectElementsIfNoFlushes; | ||
| private boolean hasFlushedForBundle; | ||
|
|
||
| public BeamFnDataOutboundAggregator( | ||
| PipelineOptions options, | ||
| Supplier<String> processBundleRequestIdSupplier, | ||
| StreamObserver<Elements> outboundObserver, | ||
| boolean collectElementsIfNoFlushes) { | ||
| public BeamFnDataOutboundAggregator(PipelineOptions options, boolean collectElementsIfNoFlushes) { | ||
| this.sizeLimit = getSizeLimit(options); | ||
| this.timeLimit = getTimeLimit(options); | ||
| this.collectElementsIfNoFlushes = collectElementsIfNoFlushes; | ||
| this.outputDataReceivers = new HashMap<>(); | ||
| this.outputTimersReceivers = new HashMap<>(); | ||
| this.outboundObserver = outboundObserver; | ||
| this.processBundleRequestIdSupplier = processBundleRequestIdSupplier; | ||
| this.bytesWrittenSinceFlush = 0L; | ||
| this.flushLock = new Object(); | ||
| this.hasFlushedForBundle = false; | ||
| } | ||
|
|
||
| public void prepareForInstruction( | ||
| String instructionId, StreamObserver<Elements> outboundObserver) { | ||
| if (timeLimit > 0) { | ||
| synchronized (flushLock) { | ||
| checkState(this.instructionId == null && this.outboundObserver == null); | ||
| this.instructionId = instructionId; | ||
| this.outboundObserver = outboundObserver; | ||
| } | ||
| } else { | ||
| checkState(this.instructionId == null && this.outboundObserver == null); | ||
| this.instructionId = instructionId; | ||
| this.outboundObserver = outboundObserver; | ||
| } | ||
| } | ||
|
|
||
| public void finishInstruction() { | ||
| if (timeLimit > 0) { | ||
| synchronized (flushLock) { | ||
| checkState(this.instructionId != null && this.outboundObserver != null); | ||
| this.instructionId = null; | ||
| this.outboundObserver = null; | ||
| } | ||
| } else { | ||
| checkState(this.instructionId != null && this.outboundObserver != null); | ||
| this.instructionId = null; | ||
| this.outboundObserver = null; | ||
| } | ||
| } | ||
|
|
||
| /** Starts the flushing daemon thread if data_buffer_time_limit_ms is set. */ | ||
| public void start() { | ||
| if (timeLimit > 0 && this.flushFuture == null) { | ||
|
|
@@ -166,7 +191,7 @@ private void flushInternal() { | |
| } | ||
| Elements.Builder elements = convertBufferForTransmission(); | ||
| if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) { | ||
| outboundObserver.onNext(elements.build()); | ||
| checkNotNull(outboundObserver).onNext(elements.build()); | ||
|
Contributor
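There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since `outboundObserver` is now `@Nullable` and is reset to null by `finishInstruction()`, consider reading it into a local variable and null-checking it before calling `onNext`, instead of using `checkNotNull`:
StreamObserver<Elements> observer = outboundObserver;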
if (observer != null) {
observer.onNext(elements.build());
} |
||
| } | ||
| hasFlushedForBundle = true; | ||
| } | ||
|
|
@@ -177,6 +202,7 @@ private void flushInternal() { | |
| * collectElementsIfNoFlushes=true, and there was no previous flush in this bundle, otherwise | ||
| * returns null. | ||
| */ | ||
| @Nullable | ||
| public Elements sendOrCollectBufferedDataAndFinishOutboundStreams() { | ||
| if (outputTimersReceivers.isEmpty() && outputDataReceivers.isEmpty()) { | ||
| return null; | ||
|
|
@@ -191,14 +217,14 @@ public Elements sendOrCollectBufferedDataAndFinishOutboundStreams() { | |
| } | ||
| LOG.debug( | ||
| "Closing streams for instruction {} and outbound data {} and timers {}.", | ||
| processBundleRequestIdSupplier.get(), | ||
| instructionId, | ||
| outputDataReceivers, | ||
| outputTimersReceivers); | ||
| for (Map.Entry<String, Receiver<?>> entry : outputDataReceivers.entrySet()) { | ||
| String pTransformId = entry.getKey(); | ||
| bufferedElements | ||
| .addDataBuilder() | ||
| .setInstructionId(processBundleRequestIdSupplier.get()) | ||
| .setInstructionId(instructionId) | ||
| .setTransformId(pTransformId) | ||
| .setIsLast(true); | ||
| entry.getValue().resetStats(); | ||
|
|
@@ -207,30 +233,29 @@ public Elements sendOrCollectBufferedDataAndFinishOutboundStreams() { | |
| TimerEndpoint timerKey = entry.getKey(); | ||
| bufferedElements | ||
| .addTimersBuilder() | ||
| .setInstructionId(processBundleRequestIdSupplier.get()) | ||
| .setInstructionId(instructionId) | ||
| .setTransformId(timerKey.pTransformId) | ||
| .setTimerFamilyId(timerKey.timerFamilyId) | ||
| .setIsLast(true); | ||
| entry.getValue().resetStats(); | ||
| } | ||
| // This is the end of the bundle so we reset state to prepare for future bundles. | ||
| if (collectElementsIfNoFlushes && !hasFlushedForBundle) { | ||
| return bufferedElements.build(); | ||
| } | ||
| outboundObserver.onNext(bufferedElements.build()); | ||
| // This is now at the end of a bundle, so we reset hasFlushedForBundle to prepare for new | ||
| // bundles. | ||
| checkNotNull(outboundObserver).onNext(bufferedElements.build()); | ||
| hasFlushedForBundle = false; | ||
| return null; | ||
|
Comment on lines
+242
to
248
Contributor
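There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since this is the end of the bundle, consider also resetting `bytesWrittenSinceFlush` before the early return, so that the reset happens on both the collect path and the send path:
// This is the end of the bundle so we reset state to prepare for future bundles.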
bytesWrittenSinceFlush = 0L;
if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
return bufferedElements.build();
}
checkNotNull(outboundObserver).onNext(bufferedElements.build());
hasFlushedForBundle = false;
return null; |
||
| } | ||
|
|
||
| // Send the elements to the StreamObserver associated with this aggregator. | ||
| public void sendElements(Elements elements) { | ||
| outboundObserver.onNext(elements); | ||
| checkNotNull(outboundObserver).onNext(elements); | ||
| } | ||
|
|
||
| public void discard() { | ||
| if (flushFuture != null) { | ||
| flushFuture.cancel(true); | ||
| flushFuture.cancel(false); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -243,7 +268,7 @@ private Elements.Builder convertBufferForTransmission() { | |
| ByteString bytes = entry.getValue().toByteStringAndResetBuffer(); | ||
| bufferedElements | ||
| .addDataBuilder() | ||
| .setInstructionId(processBundleRequestIdSupplier.get()) | ||
| .setInstructionId(instructionId) | ||
| .setTransformId(entry.getKey()) | ||
| .setData(bytes); | ||
| } | ||
|
|
@@ -254,7 +279,7 @@ private Elements.Builder convertBufferForTransmission() { | |
| ByteString bytes = entry.getValue().toByteStringAndResetBuffer(); | ||
| bufferedElements | ||
| .addTimersBuilder() | ||
| .setInstructionId(processBundleRequestIdSupplier.get()) | ||
| .setInstructionId(instructionId) | ||
| .setTransformId(entry.getKey().pTransformId) | ||
| .setTimerFamilyId(entry.getKey().timerFamilyId) | ||
| .setTimers(bytes); | ||
|
|
@@ -353,10 +378,12 @@ public void accept(T input) throws Exception { | |
| } | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public long getByteCount() { | ||
| return perBundleByteCount; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public long getElementCount() { | ||
| return perBundleElementCount; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment block contains copy-pasted text from the
`BeamFnState` service ("Used to get/append/clear state...") which is incorrect for `BeamFnLogging`. Additionally, there is a typo "specified keys pairs" which should be "specified key-value pairs".