Implement Asynchronous wrapper for DoFn in Java SDK#38609
Implement Asynchronous wrapper for DoFn in Java SDK#38609tejasiyer-dev wants to merge 5 commits into
Conversation
…he Apache Beam Java SDK (apache#38529)
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces an asynchronous wrapper for Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces AsyncDoFn, a wrapper designed to convert synchronous DoFn operations into asynchronous ones within Apache Beam pipelines. It includes a core implementation using a thread pool and persistent state for buffering, along with a comprehensive test suite. The review identified several critical issues: a performance bottleneck caused by global map scans during timer firing, potential data loss due to ignored timestamps in the output receiver, and logic bugs in the reconciliation loop that could lead to redundant task rescheduling and quadratic complexity. Additionally, the use of a single static lock for all instances may cause thread contention.
| public void output(T output) { | ||
| outputs.add(output); | ||
| } | ||
|
|
||
| @Override | ||
| public void outputWithTimestamp(T output, Instant timestamp) { | ||
| outputs.add(output); | ||
| } |
There was a problem hiding this comment.
The AccumulatingOutputReceiver ignores timestamps, which can lead to data loss or incorrect windowing if the runner's default timestamp (e.g., the timer's firing time) falls outside the element's window. Both output and outputWithTimestamp should preserve the relevant timestamp of the element being processed or the explicitly provided timestamp.
There was a problem hiding this comment.
@gemini-code-assist how would you best fix this issue?
There was a problem hiding this comment.
To address the issue of timestamp loss in AccumulatingOutputReceiver, you need to capture the timestamp provided by the DoFn during execution and propagate it when emitting the results. Since OutputReceiver does not natively support timestamps, you should store the output as a TimestampedValue container.
However, please note that AsyncDoFn is a DoFn wrapper, and in the onTimer method, you only have access to OnTimerContext, which does not provide a ProcessContext to emit with a timestamp. If the downstream DoFn expects specific windowing or timestamp behavior, this limitation might require a more fundamental change to how AsyncDoFn handles output emission (e.g., by using a MultiOutputReceiver or ensuring the runner supports timestamped output in the timer context).
If you are certain the downstream receiver supports it, you can cast the receiver to a type that supports timestamped output, but this is generally not recommended as it breaks the OutputReceiver abstraction. A safer approach is to ensure the syncFn itself handles the windowing/timestamping logic if possible, or to use a different mechanism to emit the results.
| public void output(T output) { | |
| outputs.add(output); | |
| } | |
| @Override | |
| public void outputWithTimestamp(T output, Instant timestamp) { | |
| outputs.add(output); | |
| } | |
| private static class TimestampedValue<T> { | |
| final T value; | |
| final Instant timestamp; | |
| TimestampedValue(T value, Instant timestamp) { | |
| this.value = value; | |
| this.timestamp = timestamp; | |
| } | |
| } | |
| private static class AccumulatingOutputReceiver<T> implements OutputReceiver<T> { | |
| private final List<TimestampedValue<T>> outputs = Collections.synchronizedList(new ArrayList<>()); | |
| @Override | |
| public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) { | |
| return org.apache.beam.sdk.values.WindowedValues.<T>builder() | |
| .setValue(value) | |
| .setTimestamp(Instant.now()) | |
| .setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE)) | |
| .setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING) | |
| .setReceiver(windowedValue -> outputs.add(new TimestampedValue<>(windowedValue.getValue(), windowedValue.getTimestamp()))); | |
| } | |
| @Override | |
| public void output(T output) { | |
| outputs.add(new TimestampedValue<>(output, Instant.now())); | |
| } | |
| @Override | |
| public void outputWithTimestamp(T output, Instant timestamp) { | |
| outputs.add(new TimestampedValue<>(output, timestamp)); | |
| } | |
| public List<TimestampedValue<T>> getOutputs() { | |
| return outputs; | |
| } | |
| } |
There was a problem hiding this comment.
The issue of timestamp loss in AccumulatingOutputReceiver remains in the current implementation. The output and outputWithTimestamp methods still do not preserve the original element's timestamp, which is critical for correct windowing behavior in Beam. To resolve this, you should store the output along with its timestamp (e.g., using a TimestampedValue container) and emit it with the correct timestamp when the results are reconciled. The previous suggestion provides a robust way to handle this by wrapping the output values.
| public void output(T output) { | |
| outputs.add(output); | |
| } | |
| @Override | |
| public void outputWithTimestamp(T output, Instant timestamp) { | |
| outputs.add(output); | |
| } | |
| private static class TimestampedValue<T> { | |
| final T value; | |
| final Instant timestamp; | |
| TimestampedValue(T value, Instant timestamp) { | |
| this.value = value; | |
| this.timestamp = timestamp; | |
| } | |
| } | |
| private static class AccumulatingOutputReceiver<T> implements OutputReceiver<T> { | |
| private final List<TimestampedValue<T>> outputs = Collections.synchronizedList(new ArrayList<>()); | |
| @Override | |
| public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) { | |
| return org.apache.beam.sdk.values.WindowedValues.<T>builder() | |
| .setValue(value) | |
| .setTimestamp(Instant.now()) | |
| .setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE)) | |
| .setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING) | |
| .setReceiver(windowedValue -> outputs.add(new TimestampedValue<>(windowedValue.getValue(), windowedValue.getTimestamp()))); | |
| } | |
| @Override | |
| public void output(T output) { | |
| outputs.add(new TimestampedValue<>(output, Instant.now())); | |
| } | |
| @Override | |
| public void outputWithTimestamp(T output, Instant timestamp) { | |
| outputs.add(new TimestampedValue<>(output, timestamp)); | |
| } | |
| public List<TimestampedValue<T>> getOutputs() { | |
| return outputs; | |
| } | |
| } |
| private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer = | ||
| new ConcurrentHashMap<>(); | ||
|
|
||
| private static final ReentrantLock lock = new ReentrantLock(); |
There was a problem hiding this comment.
The use of a single static ReentrantLock across all instances of AsyncDoFn can lead to significant thread contention if multiple asynchronous transforms are used in the same pipeline. Since the registries are already partitioned by uuid, consider using a per-transform lock or leveraging ConcurrentHashMap atomic operations to reduce global contention.
There was a problem hiding this comment.
I kept a single static class lock because it matches the exact synchronization model used by Python's AsyncWrapper (_lock = RLock()).
If you prefer, we can change this to a per-transform lock or leverage ConcurrentHashMap to reduce global thread contention, but I wanted to keep it aligned with the Python SDK first.
…emoved O(N) global activeElements scan. Fixed logic bug where duplicate elements were incorrectly marked for rescheduling. Optimized lookups by converting finishedItems from a list to a HashSet.
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
…ng sleep. Added two more tests to match Python SDK. Fixed formatting issues.
…ixes. Spot Bugs potential fixes.
… that preserves deterministic jitter behavior and avoids pressure on garbage collector (apache#38529)
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces AsyncDoFn, a wrapper class designed to process elements asynchronously using a background thread pool, along with a comprehensive test suite AsyncDoFnTest. The code review highlights several critical concurrency and correctness issues in the implementation. Specifically, the wrapped syncFn is shared across background threads without being thread-safe, exceptions during async execution are caught and result in silent data loss, and tearing down cloned instances can prematurely shut down shared thread pools. Additionally, the reviewer noted a potential division-by-zero bug if timerFrequency is non-positive, and recommended documenting the class's limitations in the Javadoc.
| @TimerId("timer") | ||
| private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); | ||
|
|
||
| private final DoFn<InputT, OutputT> syncFn; |
There was a problem hiding this comment.
Critical Concurrency Issue: Shared syncFn Instance Across Background Threads\n\nThe syncFn instance is shared globally across all background threads in the executor pool. If parallelism > 1, multiple threads will concurrently invoke processElement, startBundle, and finishBundle on the exact same syncFn instance.\n\nSince standard DoFns in Apache Beam are not thread-safe and often maintain state (especially during bundle lifecycle methods), this will lead to severe race conditions, state corruption, and unpredictable runtime failures.\n\n#### Recommended Solution:\nUse a ThreadLocal to clone syncFn per thread using SerializableUtils.clone(syncFn). This ensures each thread has its own isolated, thread-safe instance of the DoFn.\n\njava\nprivate transient ThreadLocal<DoFn<InputT, OutputT>> localSyncFn;\nprivate transient ConcurrentHashMap.KeySetView<DoFnInvoker<InputT, OutputT>, Boolean> localInvokers;\n\n\nIn @Setup, initialize the ThreadLocal and track the cloned invokers:\njava\nthis.localInvokers = ConcurrentHashMap.newKeySet();\nthis.localSyncFn = ThreadLocal.withInitial(() -> {\n DoFn<InputT, OutputT> clone = SerializableUtils.clone(syncFn);\n DoFnInvoker<InputT, OutputT> invoker = DoFnInvokers.invokerFor(clone);\n localInvokers.add(invoker);\n invoker.invokeSetup(new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {\n @Override\n public PipelineOptions pipelineOptions() {\n return options;\n }\n @Override\n public String getErrorContext() {\n return \"AsyncDoFn/Setup\";\n }\n });\n return clone;\n});\n\n\nIn @Teardown, invoke teardown on all cloned invokers:\njava\nif (localInvokers != null) {\n for (DoFnInvoker<InputT, OutputT> invoker : localInvokers) {\n try {\n invoker.invokeTeardown();\n } catch (Exception e) {\n LOG.warn(\"Error tearing down cloned DoFn\", e);\n }\n }\n}\n
| } catch (Exception e) { | ||
| LOG.error("Error executing async task for element {}", element, e); | ||
| finishedItems.add(element); | ||
| finishedElementIds.add(elementId); | ||
| activeElements.remove(elementId); | ||
| } | ||
| } else { |
There was a problem hiding this comment.
Critical Correctness Issue: Silent Data Loss on Exception\n\nWhen an exception is thrown during the execution of the async task, it is caught, logged, and then the element is added to finishedItems and removed from the persistent state (toProcessState).\n\nThis causes silent data loss because the failed element is permanently discarded and the pipeline continues as if it succeeded. In Apache Beam, exceptions should be propagated to fail the bundle/timer so that the runner can retry the execution.
} catch (Exception e) {\n LOG.error(\"Error executing async task for element {}\", element, e);\n throw new RuntimeException(\"Error executing async task for element \" + element, e);\n }| ExecutorService threadPool; | ||
| lock.lock(); | ||
| try { | ||
| threadPool = pool.remove(uuid); | ||
| processingElements.remove(uuid); | ||
| itemsInBuffer.remove(uuid); | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
|
|
There was a problem hiding this comment.
High Severity Bug: Shared Resource Teardown Race Condition\n\nSince runners clone DoFn instances on the same worker node, multiple active instances of AsyncDoFn with the same uuid will share the same ExecutorService and static registries.\n\nWhen any of these cloned instances is torn down, its @Teardown method is called, which immediately removes the shared threadPool from the static map and shuts it down. This will cause all other active instances sharing the same uuid to fail with IllegalStateException when they attempt to access the thread pool or processing elements map.\n\n#### Solution:\nImplement reference counting to ensure that the shared resources are only removed and shut down when the last active instance of AsyncDoFn for a given uuid is torn down.
ExecutorService threadPool = null;\n lock.lock();\n try {\n AtomicInteger refCount = refCounts.get(uuid);\n if (refCount != null && refCount.decrementAndGet() == 0) {\n refCounts.remove(uuid);\n threadPool = pool.remove(uuid);\n processingElements.remove(uuid);\n itemsInBuffer.remove(uuid);\n }\n } finally {\n lock.unlock();\n }| private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer = | ||
| new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Declare a static reference counter map to track active instances of AsyncDoFn per uuid for safe teardown.
private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer =\n new ConcurrentHashMap<>();\n private static final ConcurrentHashMap<String, AtomicInteger> refCounts =\n new ConcurrentHashMap<>();| lock.lock(); | ||
| try { | ||
| pool.computeIfAbsent(uuid, k -> Executors.newFixedThreadPool(parallelism)); | ||
| processingElements.computeIfAbsent(uuid, k -> new ConcurrentHashMap<>()); | ||
| itemsInBuffer.computeIfAbsent(uuid, k -> new AtomicInteger(0)); | ||
| } finally { | ||
| lock.unlock(); | ||
| } |
There was a problem hiding this comment.
Increment the reference count for the current uuid during @Setup to ensure safe teardown.
lock.lock();\n try {\n pool.computeIfAbsent(uuid, k -> Executors.newFixedThreadPool(parallelism));\n processingElements.computeIfAbsent(uuid, k -> new ConcurrentHashMap<>());\n itemsInBuffer.computeIfAbsent(uuid, k -> new AtomicInteger(0));\n refCounts.computeIfAbsent(uuid, k -> new AtomicInteger(0)).incrementAndGet();\n } finally {\n lock.unlock();\n }| static void resetState() { | ||
| lock.lock(); | ||
| try { | ||
| for (Map.Entry<String, ExecutorService> entry : pool.entrySet()) { | ||
| entry.getValue().shutdownNow(); | ||
| } | ||
| pool.clear(); | ||
| processingElements.clear(); | ||
| itemsInBuffer.clear(); | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Clear the static refCounts map when resetting state.
static void resetState() {\n lock.lock();\n try {\n for (Map.Entry<String, ExecutorService> entry : pool.entrySet()) {\n entry.getValue().shutdownNow();\n }\n pool.clear();\n processingElements.clear();\n itemsInBuffer.clear();\n refCounts.clear();\n } finally {\n lock.unlock();\n }\n }| this.parallelism = parallelism; | ||
| this.timerFrequency = timerFrequency; |
There was a problem hiding this comment.
High Severity Bug: Division by Zero / Infinite Loop\n\nIf timerFrequency is set to Duration.ZERO or a negative duration, timerFrequencySec will be 0.0 or negative. This will cause a division by zero in nextTimeToFire, resulting in Double.NaN or Double.POSITIVE_INFINITY, which ultimately sets the timer to Instant.ofEpochMilli(0). This causes the timer to fire immediately and repeatedly in an infinite loop, hanging the pipeline.\n\nWe should validate that timerFrequency is strictly positive in the constructor.
this.parallelism = parallelism;\n if (timerFrequency.getMillis() <= 0) {\n throw new IllegalArgumentException(\"timerFrequency must be greater than zero\");\n }\n this.timerFrequency = timerFrequency;| /** | ||
| * Class that wraps a dofn and converts it from one which process elements synchronously to one | ||
| * which processes them asynchronously. | ||
| * | ||
| * <p>For synchronous dofns the default settings mean that many (100s) of elements will be processed | ||
| * in parallel and that processing an element will block all other work on that key. In addition | ||
| * runners are optimized for latencies less than a few seconds and longer operations can result in | ||
| * high retry rates. Async should be considered when the default parallelism is not correct and/or | ||
| * items are expected to take longer than a few seconds to process. | ||
| */ |
There was a problem hiding this comment.
Medium Severity: Document Limitations and Requirements in Javadoc\n\nThe class Javadoc should clearly document the requirements and limitations of AsyncDoFn to prevent users from encountering hard-to-debug runtime issues:\n1. Thread-Safety: The wrapped DoFn must be thread-safe because its methods are invoked concurrently by multiple background threads.\n2. No Multi-Output Support: Tagged outputs and MultiOutputReceiver are not supported.\n3. Bundle Lifecycle: startBundle and finishBundle are invoked per element, so any batching or aggregation logic in them will not behave as expected.
/**\n * Class that wraps a {@link DoFn} and converts it from one which processes elements synchronously to one\n * which processes them asynchronously.\n *\n * <p>For synchronous DoFns, the default settings mean that many (100s) of elements will be processed\n * in parallel and that processing an element will block all other work on that key. In addition,\n * runners are optimized for latencies less than a few seconds, and longer operations can result in\n * high retry rates. Async should be considered when the default parallelism is not correct and/or\n * items are expected to take longer than a few seconds to process.\n *\n * <h3>Limitations & Requirements:</h3>\n * <ul>\n * <li><b>Thread-Safety:</b> The wrapped {@code DoFn} must be thread-safe because its methods\n * (including {@code processElement}) will be invoked concurrently by multiple background threads.</li>\n * <li><b>No Multi-Output Support:</b> Tagged outputs and {@code MultiOutputReceiver} are not supported.\n * Attempting to use them will result in an {@link UnsupportedOperationException}.</li>\n * <li><b>Bundle Lifecycle:</b> {@code startBundle} and {@code finishBundle} are invoked per element\n * within the background tasks, meaning any batching or aggregation logic implemented in them\n * will not function as expected.</li>\n * </ul>\n */
fixes #38529
R: @AMOOOMA
This PR introduces AsyncDoFn and AsyncDoFnTest to the Apache Beam Java SDK.
AsyncDoFn acts as an execution wrapper around a standard synchronous DoFn, offloading element processing to a background thread pool. Decoupling the runner's event loop (main thread) from high-latency, I/O-heavy element processing (background threads) prevents synchronous blocking, implements backpressure, and significantly increases pipeline throughput.
1. Ingestion & Local Deduplication (Main Thread)
2. Backpressure & Capacity Check (Main Thread)
3. Task Creation & Durable State Writing (Main Thread)
When capacity is available, the main thread performs the following steps sequentially:
4. Background Execution (Background Worker Threads)
5. Timer Reconciliation & Cleanup (Main Thread)
When the @Timer fires for Key K, the main thread executes a synchronous reconciliation cycle:
Early Exit: If BagState for Key K is empty, it exits immediately to free up CPU.
State Reconciliation: Iterates through the elements listed in BagState:
Timer Reset: If any elements remain unfinished, a new timer is scheduled for the next check cycle.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.