[Python] Add UnboundedSource SDF wrapper (#19137)#38724
Conversation
Brings Java's ``UnboundedSource`` / ``UnboundedReader`` / ``CheckpointMark`` abstractions to the Python SDK as a Splittable-DoFn wrapper runnable on the portable Fn API (DirectRunner / FnApiRunner). Wires the new source type into ``iobase.Read.expand()`` so ``p | beam.io.Read(my_unbounded_source)`` dispatches alongside the existing ``BoundedSource`` branch. Loosely inspired by Java's ``Read.UnboundedSourceAsSDFWrapperFn``; the streaming-SDF template followed for the process loop / watermark / defer plumbing is ``apache_beam.transforms.periodicsequence``. addresses apache#19137 What's added ------------ ``sdks/python/apache_beam/io/unbounded_source.py`` Public ABCs (``CheckpointMark``, ``UnboundedReader``, ``UnboundedSource``, ``ReadFromUnboundedSource``) plus the SDF wrapper internals (``_UnboundedSourceRestriction``, ``_UnboundedSourceRestrictionCoder``, ``_UnboundedSourceRestrictionTracker``, ``_UnboundedSourceRestrictionProvider``). ``sdks/python/apache_beam/io/unbounded_source_test.py`` 42 deterministic tests covering ABC contracts, restriction coder round-trip, tracker state machine (claim / split / EOF / no-data / check_done / progress / is_bounded), finalize idempotency, fan-out via ``source.split``, source-watermark vs. record-event-time, finalize vs. resume channel separation, tracker-internal exception close on reader-method failures, DoFn generator close (unit + integration with downstream raising ``Map``), cloudpickle round-trip, circular import in three subprocess orderings, and an end-to-end DirectRunner pipeline. What's changed in iobase.py --------------------------- * ``Read.expand`` gains an ``UnboundedSource`` branch (function-local lazy import to break the iobase <-> unbounded_source cycle) that delegates to ``ReadFromUnboundedSource``. * ``Read.to_runner_api_parameter`` widens the source ``isinstance`` to ``(BoundedSource, UnboundedSource)``, writing ``READ.urn`` + ``ReadPayload(is_bounded=UNBOUNDED)``. Decode rides the existing ``PICKLED_SOURCE`` URN on ``SourceBase``. Runner-side ``IsBounded.UNBOUNDED`` dispatch in ``bundle_processor.IMPULSE_READ_TRANSFORM`` remains W2 -- execution flows through the composite's expanded ``Impulse | Map | SDF-ParDo``. Correctness covered ------------------- * Data-path watermark uses ``reader.get_watermark()`` (Java ``Read.java:594`` parity), not the per-record event time. Holder is ``(value, record_ts, source_wm)``. * Restriction has separate ``checkpoint_mark`` (resume) and ``finalization_checkpoint_mark`` (commit hook) channels; coder is a fixed 5-tuple. * Reader is closed on every exit path -- tracker-internal close on EOF / split / reader-method exception; DoFn ``finally`` defense-in-depth for yield / downstream raise via the SDF wrapper's private chain (isinstance guard + warning log if the chain ever moves upstream). * EOF advances the watermark estimator to ``MAX_TIMESTAMP`` so downstream event-time windows can close. * ``UnboundedSource.split(desired_num_splits=20, options)`` is honoured; returned sub-sources are validated as ``UnboundedSource`` (raises ``TypeError`` outside the split-refusal ``except``); on split-exception the provider falls back to a single restriction with WARNING. * ``default_output_coder`` reaches the output PCollection via ``coders.registry.register_coder`` + ``element_type``. * ``ReadFromUnboundedSource`` validates ``poll_interval_seconds > 0``. Out of scope (tracked under apache#19137) ----------------------------------- Listed exhaustively in the module docstring at ``sdks/python/apache_beam/io/unbounded_source.py``: * Record-id-based deduplication (Java's ``ValueWithRecordId``). * Backlog-byte reporting (``restriction_size`` is constant 1; ``current_progress`` is binary 0.0 / 1.0). * Dynamic split fractions / runner-initiated work stealing. * Source-specific checkpoint coders threaded through the SDF restriction coder (checkpoint marks are pickled today regardless of the source's ``get_checkpoint_mark_coder``). * Reader caching across bundles (Java uses a Guava cache). * ``EmptyUnboundedSource`` terminal-state marker (this PoC uses an ``is_done`` flag). * Runner-side ``IsBounded.UNBOUNDED`` dispatch in ``bundle_processor.IMPULSE_READ_TRANSFORM``. Tests: 42/42 ``unbounded_source_test.py``, 16/16 ``iobase_test.py``, yapf + isort clean.
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request brings streaming source capabilities to the Python SDK by implementing the UnboundedSource abstraction via a Splittable DoFn wrapper. This change allows developers to define unbounded sources that are compatible with the portable Fn API, significantly improving the parity between Python and Java streaming I/O implementations. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a minimal, self-contained UnboundedSource implementation for the Python SDK, allowing Java-like unbounded source abstractions to run on the portable Fn API path via a Splittable DoFn. The feedback identifies several key improvement opportunities: refactoring _UnboundedSourceRestrictionCoder to decode the source dynamically so that the provider and DoFn can be defined as standard module-level classes (avoiding potential serialization issues with standard pickle); correcting the instantiation of RestrictionProgress to use completed_work and remaining_work instead of fraction; registering the source's coder on the pipeline-specific coder_registry rather than the global registry to prevent side effects; and improving the robustness of the dynamic tracker unwrapping logic.
| class _UnboundedSourceRestrictionCoder(Coder): | ||
| """Encodes :class:`_UnboundedSourceRestriction` as a fixed 5-tuple. | ||
|
|
||
| Shape: pickled source + nullable resume checkpoint (encoded with the | ||
| source's own checkpoint coder if provided, else pickle) + watermark + | ||
| done flag + nullable finalization checkpoint (same coder as resume). | ||
| """ | ||
| def __init__(self, checkpoint_mark_coder: Optional[Coder] = None): | ||
| nullable_checkpoint = NullableCoder( | ||
| checkpoint_mark_coder or _MemoizingPickleCoder()) | ||
| self._tuple_coder = TupleCoder(( | ||
| _MemoizingPickleCoder(), # source | ||
| nullable_checkpoint, # checkpoint_mark (RESUME state, may be None) | ||
| TimestampCoder(), # watermark | ||
| BooleanCoder(), # is_done | ||
| nullable_checkpoint)) # finalization_checkpoint_mark (commit hook) | ||
|
|
||
| def encode(self, restriction: '_UnboundedSourceRestriction') -> bytes: | ||
| return self._tuple_coder.encode(( | ||
| restriction.source, | ||
| restriction.checkpoint_mark, | ||
| restriction.watermark, | ||
| restriction.is_done, | ||
| restriction.finalization_checkpoint_mark)) | ||
|
|
||
| def decode(self, encoded: bytes) -> '_UnboundedSourceRestriction': | ||
| (source, checkpoint_mark, watermark, is_done, | ||
| finalization_checkpoint_mark) = self._tuple_coder.decode(encoded) | ||
| return _UnboundedSourceRestriction( | ||
| source=source, | ||
| checkpoint_mark=checkpoint_mark, | ||
| watermark=watermark, | ||
| is_done=is_done, | ||
| finalization_checkpoint_mark=finalization_checkpoint_mark) | ||
|
|
||
| def is_deterministic(self) -> bool: | ||
| # The source and checkpoint are pickled, which is not guaranteed | ||
| # deterministic; matches the bounded SDF restriction coder in iobase.py. | ||
| # NOTE on forward-compat: the wire format is a fixed 5-tuple. Adding a | ||
| # 6th field in a future version would break decoding of in-flight blobs | ||
| # from older workers. If/when another field is needed, switch this to a | ||
| # length-prefixed or version-tagged encoding -- out of scope for W1. | ||
| return False | ||
|
|
There was a problem hiding this comment.
The _UnboundedSourceRestrictionCoder currently requires the checkpoint_mark_coder at initialization time. This forces the _UnboundedSourceRestrictionProvider and the _ReadFromUnboundedSourceDoFn to be defined dynamically inside expand() to close over the source-specific coder. Defining DoFns inside methods can cause serialization issues with standard pickle on some runners.
By making _UnboundedSourceRestrictionCoder dynamic (decoding the source first, then using its coder to decode the checkpoint marks), we can remove the initialization dependency and define both the provider and the DoFn as standard module-level classes.
class _UnboundedSourceRestrictionCoder(Coder):
"""Encodes :class:`_UnboundedSourceRestriction` as a fixed 5-tuple.
Shape: pickled source + nullable resume checkpoint (encoded with the
source's own checkpoint coder if provided, else pickle) + watermark +
done flag + nullable finalization checkpoint (same coder as resume).
"""
def encode(self, restriction: '_UnboundedSourceRestriction') -> bytes:
from apache_beam.coders.coders import BytesCoder
source_coder = _MemoizingPickleCoder()
source_bytes = source_coder.encode(restriction.source)
checkpoint_coder = NullableCoder(restriction.source.get_checkpoint_mark_coder())
checkpoint_bytes = checkpoint_coder.encode(restriction.checkpoint_mark)
finalize_bytes = checkpoint_coder.encode(restriction.finalization_checkpoint_mark)
return TupleCoder((
BytesCoder(),
BytesCoder(),
TimestampCoder(),
BooleanCoder(),
BytesCoder()
)).encode((
source_bytes,
checkpoint_bytes,
restriction.watermark,
restriction.is_done,
finalize_bytes
))
def decode(self, encoded: bytes) -> '_UnboundedSourceRestriction':
from apache_beam.coders.coders import BytesCoder
source_bytes, checkpoint_bytes, watermark, is_done, finalize_bytes = TupleCoder((
BytesCoder(),
BytesCoder(),
TimestampCoder(),
BooleanCoder(),
BytesCoder()
)).decode(encoded)
source = _MemoizingPickleCoder().decode(source_bytes)
checkpoint_coder = NullableCoder(source.get_checkpoint_mark_coder())
checkpoint_mark = checkpoint_coder.decode(checkpoint_bytes)
finalization_checkpoint_mark = checkpoint_coder.decode(finalize_bytes)
return _UnboundedSourceRestriction(
source=source,
checkpoint_mark=checkpoint_mark,
watermark=watermark,
is_done=is_done,
finalization_checkpoint_mark=finalization_checkpoint_mark
)
def is_deterministic(self) -> bool:
return False| def current_progress(self) -> 'iobase.RestrictionProgress': | ||
| # Backlog-based progress is out of scope; report a coarse done/not-done | ||
| # fraction so the runner has a (recommended) signal. | ||
| return iobase.RestrictionProgress( | ||
| fraction=1.0 if self._restriction.is_done else 0.0) |
There was a problem hiding this comment.
In Apache Beam, RestrictionProgress does not accept a fraction parameter. Its standard parameters are completed_work and remaining_work. Passing fraction directly will result in it being ignored or placed in **kwargs without being recognized by the runner.
| def current_progress(self) -> 'iobase.RestrictionProgress': | |
| # Backlog-based progress is out of scope; report a coarse done/not-done | |
| # fraction so the runner has a (recommended) signal. | |
| return iobase.RestrictionProgress( | |
| fraction=1.0 if self._restriction.is_done else 0.0) | |
| def current_progress(self) -> 'iobase.RestrictionProgress': | |
| # Backlog-based progress is out of scope; report a coarse done/not-done | |
| # signal so the runner has a (recommended) signal. | |
| if self._restriction.is_done: | |
| return iobase.RestrictionProgress(completed_work=1.0, remaining_work=0.0) | |
| return iobase.RestrictionProgress(completed_work=0.0, remaining_work=1.0) |
| if type_hint is not None: | ||
| try: | ||
| coders.registry.register_coder(type_hint, type(output_coder)) |
There was a problem hiding this comment.
Modifying the global coders.registry can cause side effects across different pipelines running in the same process (e.g., in multi-tenant service environments or unit tests). It is safer to register the coder on the pipeline-specific coder_registry.
| if type_hint is not None: | |
| try: | |
| coders.registry.register_coder(type_hint, type(output_coder)) | |
| if type_hint is not None: | |
| try: | |
| pbegin.pipeline.coder_registry.register_coder(type_hint, type(output_coder)) |
There was a problem hiding this comment.
Pipeline has no coder_registry attr
| finally: | ||
| current = tracker.current_restriction() | ||
| # Register finalization only when a real checkpoint was cut this | ||
| # bundle. Restriction identity (`current is not initial`) mirrors | ||
| # Java's reference-equality gate in Read.java. We read the explicit | ||
| # finalization channel, NOT ``checkpoint_mark`` (which is the | ||
| # RESUME state and may belong to the residual after a split). | ||
| finalize_mark = current.finalization_checkpoint_mark | ||
| if current is not initial and finalize_mark is not None: | ||
| bundle_finalizer.register(finalize_mark.finalize_checkpoint) | ||
| # Release the underlying reader on every exit path, including the | ||
| # exception path where a downstream yield raised between two | ||
| # try_claim calls (reader-method failures are already closed inside | ||
| # the tracker). ``RestrictionTrackerView`` does not expose the inner | ||
| # tracker, so traverse the (stable-but-private) wrapper chain. If | ||
| # the chain changes in a future Beam version we log a warning and | ||
| # let GC eventually close; never call ``close`` on an unrelated | ||
| # tracker subclass. | ||
| threadsafe = getattr(tracker, '_threadsafe_restriction_tracker', None) | ||
| inner_tracker = getattr(threadsafe, '_restriction_tracker', None) | ||
| if isinstance(inner_tracker, _UnboundedSourceRestrictionTracker): | ||
| inner_tracker._close_reader_if_open() | ||
| elif inner_tracker is not None or threadsafe is not None: | ||
| _LOGGER.warning( | ||
| 'UnboundedSource DoFn could not reach the inner tracker via ' | ||
| '_threadsafe_restriction_tracker._restriction_tracker; reader ' | ||
| 'close on exception path skipped, relying on GC. Beam SDF ' | ||
| 'wrapper internals may have changed -- file an issue.') |
There was a problem hiding this comment.
The logic to unwrap the inner tracker from tracker assumes a specific nesting structure (_threadsafe_restriction_tracker -> _restriction_tracker). If the tracker is not wrapped (e.g., in some testing or direct execution scenarios), or if the wrapper chain changes, this lookup will fail. A more robust approach is to check for the attributes dynamically and fallback gracefully.
finally:
current = tracker.current_restriction()
# Register finalization only when a real checkpoint was cut this
# bundle. Restriction identity (`current is not initial`) mirrors
# Java's reference-equality gate in Read.java. We read the explicit
# finalization channel, NOT ``checkpoint_mark`` (which is the
# RESUME state and may belong to the residual after a split).
finalize_mark = current.finalization_checkpoint_mark
if current is not initial and finalize_mark is not None:
bundle_finalizer.register(finalize_mark.finalize_checkpoint)
# Release the underlying reader on every exit path, including the
# exception path where a downstream yield raised between two
# try_claim calls (reader-method failures are already closed inside
# the tracker). ``RestrictionTrackerView`` does not expose the inner
# tracker, so traverse the (stable-but-private) wrapper chain. If
# the chain changes in a future Beam version we log a warning and
# let GC eventually close; never call ``close`` on an unrelated
# tracker subclass.
inner_tracker = tracker
if hasattr(inner_tracker, '_threadsafe_restriction_tracker'):
inner_tracker = getattr(inner_tracker, '_threadsafe_restriction_tracker')
if hasattr(inner_tracker, '_restriction_tracker'):
inner_tracker = getattr(inner_tracker, '_restriction_tracker')
if isinstance(inner_tracker, _UnboundedSourceRestrictionTracker):
inner_tracker._close_reader_if_open()
else:
_LOGGER.warning(
'UnboundedSource DoFn could not reach the inner tracker; reader '
'close on exception path skipped, relying on GC. Beam SDF '
'wrapper internals may have changed -- file an issue.')* Make the restriction coder stateless and source-dynamic: encode/decode reads the source's checkpoint_mark_coder from the restriction itself rather than from the coder's constructor. This removes the source- specific dependency that forced the provider and DoFn to be defined inside ``ReadFromUnboundedSource.expand``. * Move ``_UnboundedSourceRestrictionProvider`` and ``_ReadFromUnboundedSourceDoFn`` to module level, backed by a stateless ``_PROVIDER`` singleton. Closure-defined DoFns serialise only via cloudpickle; lifting both to module level lets stdlib pickle and any runner that does not use cloudpickle handle the DoFn too. (PipelineOptions forwarded to ``UnboundedSource.split`` becomes W2 work; today the provider passes ``None``.) * Register the source-declared output coder against the pipeline-specific ``pbegin.pipeline.coder_registry`` instead of the process-global ``coders.registry`` so the registration does not leak across pipelines running in the same process. * Use ``RestrictionProgress(completed=, remaining=)`` instead of ``fraction=`` so ``completed_work`` / ``remaining_work`` resolve directly. * Make the DoFn's ``finally`` tracker-unwrap chain hasattr-driven so a future ``RestrictionTrackerView`` refactor degrades gracefully instead of skipping reader close silently. * Apply yapf + isort across the four files (CI ``beam_PreCommit_PythonFormatter`` was failing on ``iobase_test.py``). Tests: 42/42 ``unbounded_source_test.py``, 16/16 ``iobase_test.py``. Tracking apache#19137.
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
beam_PreCommit_Python_Coverage CI on PR 38724 surfaced ``AttributeError: 'Pipeline' object has no attribute 'coder_registry'`` from the previous commit's attempt to register the source-declared output coder against a pipeline-scoped registry. Beam's Python ``Pipeline`` has no such attribute today; the global ``coders.registry`` is the only available knob, matching the pattern used by ``BoundedSource`` at iobase.py:938. Reverting to ``coders.registry.register_coder``. The cross-pipeline side effect that the pipeline-scoped attempt was trying to avoid (a registration persists for the process lifetime and may affect concurrent pipelines that use the same element type) is now documented as a known limitation tracked under apache#19137 W2. 42/42 unbounded_source_test, 16/16 iobase_test.
dcbcdd5 to
9a71653
Compare
The Python SDK has no public
UnboundedSourceAPI: pipelines that need to read a custom unbounded source (Pub/Sub-like queues, CDC feeds, etc.) cannot do so in Python without dropping to the legacy non-portableReadprimitive.This change adds a Splittable-DoFn wrapper that brings Java's
UnboundedSourcesemantics to the Python SDK:UnboundedSource,UnboundedReader,CheckpointMarkand theReadFromUnboundedSourcePTransform. Names are Pythonic; semantics match the Java contract._UnboundedSourceRestrictioncarries source / resume checkpoint / watermark / done flag / finalize checkpoint (fixed 5-tuple coder)._UnboundedSourceRestrictionTrackerdrives the reader, capturesreader.get_watermark()on the data path (JavaRead.java:594parity), closes the reader on EOF / split / reader-method exception, and advances the watermark estimator toMAX_TIMESTAMPon the terminal claim so downstream event-time windows can close.iobase.Read.expanddispatch — a function-local lazy-import branch routesRead(UnboundedSource(...))throughReadFromUnboundedSource, breaking theiobase↔unbounded_sourcecycle.Read.to_runner_api_parameterwidens to(BoundedSource, UnboundedSource), writingREAD.urn+IsBounded.UNBOUNDED. Decode rides the existingPICKLED_SOURCEURN onSourceBase. Runner-side dispatch onUNBOUNDEDinbundle_processor.IMPULSE_READ_TRANSFORMis deferred.ReadFromUnboundedSource.expandcallscoders.registry.register_coder+ setselement_typeso a source'sdefault_output_coderpropagates to the output PCollection._UnboundedSourceRestrictionProvider.splitinvokesUnboundedSource.split(20, options)with strictisinstancevalidation; falls back to a single restriction on split-refusal exceptions.Tests:
unbounded_source_test.py— ABC contracts, restriction coder round-trip, tracker state machine, finalize idempotency, source-watermark vs. record-timestamp regression, finalize / resume channel separation, tracker-internal close onreader.advance/reader.get_watermarkexceptions, DoFnfinallyclose on downstream yield exception (unit viagenerator.close()+ integration with raisingMap), cloudpickle round-trip, circular import in three subprocess orderings, e2e DirectRunner withFixedWindows+GroupByKey.iobase_test.py—Read(UnboundedSource)dispatch through the newexpandbranch;Read.to_runner_api/from_runner_apiround-trip asserting theIsBounded.UNBOUNDEDenum.Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.