From f6e055b1a69fc14d88055e971b9793870d0900f5 Mon Sep 17 00:00:00 2001 From: Mansi Singh Date: Fri, 20 Feb 2026 22:56:12 -0800 Subject: [PATCH 1/5] Make GCS filesystem lookup lazy to match S3 behavior (fixes #37445) - Remove top-level gcsio import, replace with try/except for backward compat - Add _get_gcsio() lazy import in _gcsIO() method - Convert CHUNK_SIZE from class variable to lazy property - Add test verifying get_filesystem() works without GCP extras installed --- .../apache_beam/io/gcp/gcsfilesystem.py | 21 +++++++++++++++---- .../apache_beam/io/gcp/gcsfilesystem_test.py | 7 +++++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 3763e21abc9f..531b04c154d2 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -34,7 +34,11 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem -from apache_beam.io.gcp import gcsio + +try: + from apache_beam.io.gcp import gcsio +except ImportError: + gcsio = None # type: ignore[assignment] __all__ = ['GCSFileSystem'] @@ -43,13 +47,17 @@ class GCSFileSystem(FileSystem): """A GCS ``FileSystem`` implementation for accessing files on GCS. """ - CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations GCS_PREFIX = 'gs://' def __init__(self, pipeline_options): super().__init__(pipeline_options) self._pipeline_options = pipeline_options + @property + def CHUNK_SIZE(self): + """Chunk size in batch operations.""" + return self._gcsIO().MAX_BATCH_OPERATION_SIZE + @classmethod def scheme(cls): """URI scheme for the FileSystem @@ -139,7 +147,11 @@ def _list(self, dir_or_prefix): raise BeamIOError("List operation failed", {dir_or_prefix: e}) def _gcsIO(self): - return gcsio.GcsIO(pipeline_options=self._pipeline_options) + if gcsio is None: + from apache_beam.io.gcp import gcsio as _gcsio # pylint: disable=g-import-not-at-top + else: + _gcsio = gcsio + return _gcsio.GcsIO(pipeline_options=self._pipeline_options) def _path_open( self, @@ -370,7 +382,8 @@ def delete(self, paths): def report_lineage(self, path, lineage): try: - components = gcsio.parse_gcs_path(path, object_optional=True) + from apache_beam.io.gcp import gcsio as _gcsio # pylint: disable=g-import-not-at-top + components = _gcsio.parse_gcs_path(path, object_optional=True) except ValueError: # report lineage is fail-safe traceback.print_exc() diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 08fdd6302887..9bf9eeef9a44 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -48,6 +48,13 @@ def test_scheme(self): self.assertEqual(self.fs.scheme(), 'gs') self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs') + def test_get_filesystem_does_not_require_gcp_extra(self): + # Verifies that GCSFileSystem can be looked up without GCP deps installed. + # GCP dependency errors should only be raised at usage time, not lookup. + from apache_beam.io.filesystems import FileSystems + fs = FileSystems.get_filesystem('gs://test-bucket/path') + self.assertEqual(fs.scheme(), 'gs') + def test_join(self): self.assertEqual( 'gs://bucket/path/to/file', From 71815d437e487570b6d340d2f8c817f155d413f4 Mon Sep 17 00:00:00 2001 From: Mansi Singh Date: Sat, 21 Feb 2026 16:17:33 -0800 Subject: [PATCH 2/5] Address review feedback: simplify lazy loading approach - Keep gcsio at module level with try/except for backward compat with mocks - Use module-level gcsio directly in _gcsIO() and report_lineage (no re-import) - Fix test to use @mock.patch to simulate missing GCP extra --- sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 11 +++-------- sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py | 1 + 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 531b04c154d2..9133e8429890 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -147,11 +147,7 @@ def _list(self, dir_or_prefix): raise BeamIOError("List operation failed", {dir_or_prefix: e}) def _gcsIO(self): - if gcsio is None: - from apache_beam.io.gcp import gcsio as _gcsio # pylint: disable=g-import-not-at-top - else: - _gcsio = gcsio - return _gcsio.GcsIO(pipeline_options=self._pipeline_options) + return gcsio.GcsIO(pipeline_options=self._pipeline_options) def _path_open( self, @@ -382,9 +378,8 @@ def delete(self, paths): def report_lineage(self, path, lineage): try: - from apache_beam.io.gcp import gcsio as _gcsio # pylint: disable=g-import-not-at-top - components = _gcsio.parse_gcs_path(path, object_optional=True) - except ValueError: + components = gcsio.parse_gcs_path(path, object_optional=True) + except (ValueError, AttributeError): # report lineage is fail-safe traceback.print_exc() return diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 9bf9eeef9a44..578745e42bbe 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -48,6 +48,7 @@ def test_scheme(self): self.assertEqual(self.fs.scheme(), 'gs') self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs') + @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio', None) def test_get_filesystem_does_not_require_gcp_extra(self): # Verifies that GCSFileSystem can be looked up without GCP deps installed. # GCP dependency errors should only be raised at usage time, not lookup. From 20b39c6abeb3b5122395e3de8ea3d5d09cb5b626 Mon Sep 17 00:00:00 2001 From: Mansi Singh Date: Sat, 21 Feb 2026 16:27:16 -0800 Subject: [PATCH 3/5] Address review feedback on GroupBy snippets - Fix groupby_attr_expr.py: move if test block inside pipeline context, move beam.Map(print) to else branch - Fix groupby_two_exprs.py: same fixes, restore [START/END] markers --- .../snippets/transforms/aggregation/groupby_attr_expr.py | 9 +++++---- .../snippets/transforms/aggregation/groupby_two_exprs.py | 9 +++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupby_attr_expr.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupby_attr_expr.py index cef24534342e..f94c2d62c04a 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupby_attr_expr.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupby_attr_expr.py @@ -53,12 +53,13 @@ def groupby_attr_expr(test=None): grouped = ( p | beam.Create(GROCERY_LIST) - | beam.GroupBy('recipe', is_berry=lambda x: 'berry' in x.fruit) - | beam.Map(print)) + | beam.GroupBy('recipe', is_berry=lambda x: 'berry' in x.fruit)) # [END groupby_attr_expr] - if test: - test(grouped) + if test: + test(grouped) + else: + grouped | beam.Map(print) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupby_two_exprs.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupby_two_exprs.py index 6b890fee9f9c..c1cbda0e578e 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupby_two_exprs.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupby_two_exprs.py @@ -44,12 +44,13 @@ def groupby_two_exprs(test=None): p | beam.Create( ['strawberry', 'raspberry', 'blueberry', 'blackberry', 'banana']) - | beam.GroupBy(letter=lambda s: s[0], is_berry=lambda s: 'berry' in s) - | beam.Map(print)) + | beam.GroupBy(letter=lambda s: s[0], is_berry=lambda s: 'berry' in s)) # [END groupby_two_exprs] - if test: - test(grouped) + if test: + test(grouped) + else: + grouped | beam.Map(print) if __name__ == '__main__': From 0fc61a1370dcbd433b54a12827cac8b21c3832b5 Mon Sep 17 00:00:00 2001 From: Mansi Singh Date: Sun, 1 Mar 2026 19:31:29 -0800 Subject: [PATCH 4/5] Fix CHUNK_SIZE to use module-level gcsio.MAX_BATCH_OPERATION_SIZE instead of instance attribute --- sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 9133e8429890..29ef675d1d6b 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -56,7 +56,8 @@ def __init__(self, pipeline_options): @property def CHUNK_SIZE(self): """Chunk size in batch operations.""" - return self._gcsIO().MAX_BATCH_OPERATION_SIZE + from apache_beam.io.gcp import gcsio + return gcsio.MAX_BATCH_OPERATION_SIZE @classmethod def scheme(cls): From b8961ae4e942ea32d4c2995889b9f8eabecc6b8e Mon Sep 17 00:00:00 2001 From: Mansi Singh Date: Sun, 1 Mar 2026 19:34:47 -0800 Subject: [PATCH 5/5] Fix CHUNK_SIZE property and report_lineage to use lazy gcsio imports --- sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 29ef675d1d6b..5b1dba8ad2de 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -379,7 +379,8 @@ def delete(self, paths): def report_lineage(self, path, lineage): try: - components = gcsio.parse_gcs_path(path, object_optional=True) + from apache_beam.io.gcp import gcsio as _gcsio + components = _gcsio.parse_gcs_path(path, object_optional=True) except (ValueError, AttributeError): # report lineage is fail-safe traceback.print_exc()