Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions sdks/python/apache_beam/io/iobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,11 @@ def __init__(self, source: SourceBase) -> None:
"""Initializes a Read transform.

Args:
source: Data source to read from.
source: Data source to read from. A ``BoundedSource`` is wrapped in the
bounded SDF reader; an ``UnboundedSource`` is dispatched through
:class:`apache_beam.io.unbounded_source.ReadFromUnboundedSource` with
the default poll interval (users wanting a custom poll cadence must
instantiate ``ReadFromUnboundedSource`` directly).
"""
super().__init__()
self.source = source
Expand All @@ -945,6 +949,16 @@ def expand(self, pbegin):
| 'EmitSource' >>
core.Map(lambda _: self.source).with_output_types(BoundedSource)
| SDFBoundedSourceReader(display_data))
# Lazy import to break the iobase <-> unbounded_source cycle: the
# unbounded_source module imports iobase (UnboundedSource extends
# SourceBase). Pattern matches the _PubSubSource lazy import below.
from apache_beam.io.unbounded_source import ReadFromUnboundedSource
from apache_beam.io.unbounded_source import UnboundedSource
if isinstance(self.source, UnboundedSource):
# Delegate to the dedicated SDF PTransform; identical to the user
# writing `p | ReadFromUnboundedSource(self.source)` directly. Custom
# poll_interval_seconds requires using ReadFromUnboundedSource directly.
return pbegin | ReadFromUnboundedSource(self.source)
elif isinstance(self.source, ptransform.PTransform):
# The Read transform can also admit a full PTransform as an input
# rather than an anctual source. If the input is a PTransform, then
Expand Down Expand Up @@ -986,7 +1000,15 @@ def to_runner_api_parameter(
timestamp_attribute=self.source.timestamp_attribute,
with_attributes=self.source.with_attributes,
id_attribute=self.source.id_label))
if isinstance(self.source, BoundedSource):
# Lazy import to avoid the iobase <-> unbounded_source cycle.
from apache_beam.io.unbounded_source import UnboundedSource
if isinstance(self.source, (BoundedSource, UnboundedSource)):
# READ.urn covers both source flavours; the IsBounded enum distinguishes
# them. NB: today the bundle_processor.py IMPULSE_READ_TRANSFORM handler
# only consumes BOUNDED - the UNBOUNDED branch round-trips correctly
# through the protobuf graph but execution still flows through this
# composite's expanded sub-transforms (Impulse | Map | SDF-ParDo), not
# through the URN-handler. Runner-side UNBOUNDED dispatch is W2 work.
return (
common_urns.deprecated_primitives.READ.urn,
beam_runner_api_pb2.ReadPayload(
Expand Down
84 changes: 80 additions & 4 deletions sdks/python/apache_beam/io/iobase_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@

import unittest

import mock

import apache_beam as beam
from apache_beam.io.concat_source import ConcatSource
from apache_beam.io.concat_source_test import RangeSource
import mock
from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.io.concat_source import ConcatSource
from apache_beam.io.concat_source_test import RangeSource
from apache_beam.io.iobase import SourceBundle
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.testing.util import assert_that
Expand Down Expand Up @@ -220,5 +219,82 @@ def test_sdf_wrap_range_source(self):
self._run_sdf_wrapper_pipeline(RangeSource(0, 4), [0, 1, 2, 3])


class UseSdfUnboundedSourcesTests(unittest.TestCase):
"""Mirrors UseSdfBoundedSourcesTests for the new UnboundedSource branch in
iobase.Read.expand(). Uses CountingSource from unbounded_source_test as the
fake finite UnboundedSource (avoids dragging the network in).
"""
def test_read_dispatches_to_read_from_unbounded_source(self):
from apache_beam.io.unbounded_source_test import CountingSource
with mock.patch(
'apache_beam.io.unbounded_source.ReadFromUnboundedSource.expand'
) as mock_expand:
mock_expand.side_effect = (
lambda pbegin: pbegin | beam.Impulse() | beam.Map(lambda _: 'fake'))
with beam.Pipeline() as p:
out = p | beam.io.Read(CountingSource(3))
assert_that(out, equal_to(['fake']))
mock_expand.assert_called_once()

def test_read_end_to_end_unbounded(self):
from apache_beam.io.unbounded_source_test import CountingSource
with beam.Pipeline() as p:
out = p | beam.io.Read(CountingSource(5))
assert_that(out, equal_to([0, 1, 2, 3, 4]))

def test_read_unbounded_pcollection_is_unbounded(self):
from apache_beam.io.unbounded_source_test import CountingSource
with beam.Pipeline() as p:
out = p | beam.io.Read(CountingSource(3))
self.assertFalse(out.is_bounded)

def test_to_runner_api_emits_unbounded_read_payload(self):
"""``Read.to_runner_api_parameter`` must serialize an UnboundedSource as
``READ.urn`` with ``IsBounded.UNBOUNDED``. The runner-side handler is W2
and ignores this enum today, but the wire format must round-trip
consistently for pipeline persistence / cross-runner submission.
"""
from apache_beam.io.unbounded_source_test import CountingSource
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.pipeline_context import PipelineContext

read = beam.io.Read(CountingSource(5))
urn, payload = read.to_runner_api_parameter(PipelineContext())

self.assertEqual(urn, common_urns.deprecated_primitives.READ.urn)
self.assertIsInstance(payload, beam_runner_api_pb2.ReadPayload)
self.assertEqual(
payload.is_bounded, beam_runner_api_pb2.IsBounded.UNBOUNDED)
# The source field must be populated -- a non-empty FunctionSpec proto.
self.assertTrue(payload.source.urn)

def test_read_unbounded_round_trips_through_runner_api(self):
"""Encode then decode via the runner-API protobuf. The restored
transform must be a ``Read`` wrapping an equivalent UnboundedSource.
"""
from apache_beam.io.unbounded_source import UnboundedSource
from apache_beam.io.unbounded_source_test import CountingSource
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.pipeline_context import PipelineContext

original = beam.io.Read(CountingSource(7))
context = PipelineContext()
urn, payload = original.to_runner_api_parameter(context)

transform_proto = beam_runner_api_pb2.PTransform()
transform_proto.spec.urn = urn
restored = iobase.Read.from_runner_api_parameter(
transform_proto, payload, context)

self.assertIsInstance(restored, iobase.Read)
self.assertIsInstance(restored.source, UnboundedSource)
self.assertIsInstance(restored.source, CountingSource)
self.assertFalse(restored.source.is_bounded())
# Verify the source's internal state survived pickle round-trip.
self.assertEqual(restored.source._count, 7)


if __name__ == '__main__':
unittest.main()
Loading
Loading