Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support",
"https://github.com/apache/beam/pull/34830": "testing",
"trigger-2026-04-04": "portable_runner expand_sdf opt-in"
"trigger-2026-04-04": "portable_runner expand_sdf opt-in",
"https://github.com/apache/beam/pull/38892": "UnboundedSource portable VR test"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"https://github.com/apache/beam/pull/34830": "testing",
"https://github.com/apache/beam/issues/35429": "testing",
"trigger-2026-04-04": "portable_runner expand_sdf opt-in"
"trigger-2026-04-04": "portable_runner expand_sdf opt-in",
"https://github.com/apache/beam/pull/38892": "UnboundedSource portable VR test"
}
14 changes: 13 additions & 1 deletion sdks/python/apache_beam/io/iobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,10 @@ def __init__(self, source: SourceBase) -> None:
"""Initializes a Read transform.

Args:
source: Data source to read from.
source: the data source to read from. May be a ``BoundedSource``, an
``UnboundedSource``, or a ``PTransform`` (which is applied directly).
For any other source ``Read`` is treated as a primitive and relayed to
the runner implementation.
"""
super().__init__()
self.source = source
Expand All @@ -944,6 +947,11 @@ def expand(self, pbegin):
| 'EmitSource' >>
core.Map(lambda _: self.source).with_output_types(BoundedSource)
| SDFBoundedSourceReader(display_data))
# Local import to avoid a circular dependency.
from apache_beam.io.unbounded_source import ReadFromUnboundedSource
from apache_beam.io.unbounded_source import UnboundedSource
if isinstance(self.source, UnboundedSource):
return pbegin | ReadFromUnboundedSource(self.source)
elif isinstance(self.source, ptransform.PTransform):
# The Read transform can also admit a full PTransform as an input
# rather than an anctual source. If the input is a PTransform, then
Expand Down Expand Up @@ -993,6 +1001,10 @@ def to_runner_api_parameter(
is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
if self.source.is_bounded() else
beam_runner_api_pb2.IsBounded.UNBOUNDED))
# Local import to avoid a circular dependency.
from apache_beam.io.unbounded_source import UnboundedSource
if isinstance(self.source, UnboundedSource):
return super().to_runner_api_parameter(context)
elif isinstance(self.source, ptransform.PTransform):
return self.source.to_runner_api_parameter(context)
raise NotImplementedError(
Expand Down
49 changes: 45 additions & 4 deletions sdks/python/apache_beam/io/iobase_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@

import unittest

import mock

import apache_beam as beam
from apache_beam.io.concat_source import ConcatSource
from apache_beam.io.concat_source_test import RangeSource
import mock
from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.io.concat_source import ConcatSource
from apache_beam.io.concat_source_test import RangeSource
from apache_beam.io.iobase import SourceBundle
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

Expand Down Expand Up @@ -220,5 +221,45 @@ def test_sdf_wrap_range_source(self):
self._run_sdf_wrapper_pipeline(RangeSource(0, 4), [0, 1, 2, 3])


class UseSdfUnboundedSourcesTests(unittest.TestCase):
"""Covers the UnboundedSource branch in
``iobase.Read.expand()``. Uses ``UnboundedCountingSource`` from
``unbounded_source_test`` as a finite fake source (no network).
"""
def test_read_end_to_end_unbounded(self):
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
with beam.Pipeline() as p:
out = p | beam.io.Read(UnboundedCountingSource(5))
assert_that(out, equal_to([0, 1, 2, 3, 4]))

def test_read_unbounded_pcollection_is_unbounded(self):
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
p = beam.Pipeline()
out = p | beam.io.Read(UnboundedCountingSource(3))
self.assertFalse(out.is_bounded)

def test_read_unbounded_serializes_as_expanded_composite(self):
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
p = beam.Pipeline()
p | 'ReadIt' >> beam.io.Read(UnboundedCountingSource(3))

proto = p.to_runner_api(use_fake_coders=True)
transforms = proto.components.transforms.values()
deprecated_reads = [
transform.unique_name for transform in transforms
if transform.spec.urn == common_urns.deprecated_primitives.READ.urn
]
read_transforms = [
transform for transform in proto.components.transforms.values()
if transform.unique_name == 'ReadIt'
]

self.assertEqual([], deprecated_reads)
self.assertEqual(1, len(read_transforms))
self.assertEqual(
python_urns.GENERIC_COMPOSITE_TRANSFORM, read_transforms[0].spec.urn)
self.assertTrue(read_transforms[0].subtransforms)


if __name__ == '__main__':
unittest.main()
Loading
Loading