From 9f210b4584806771fe045399992e041147c73270 Mon Sep 17 00:00:00 2001 From: PRADDZY <64481960+PRADDZY@users.noreply.github.com> Date: Fri, 5 Jun 2026 21:57:33 +0530 Subject: [PATCH 1/5] Preserve partitioning on temp file loads --- .../apache_beam/io/gcp/bigquery_file_loads.py | 62 ++++++++++++++-- .../io/gcp/bigquery_file_loads_test.py | 71 +++++++++++++++++++ 2 files changed, 126 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 4e45d0324ee2..b7f846d79e74 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -82,6 +82,31 @@ _SLEEP_DURATION_BETWEEN_POLLS = 10 +def _has_partitioning_load_parameters(additional_parameters): + return ('timePartitioning' in additional_parameters or + 'rangePartitioning' in additional_parameters) + + +def _add_destination_partitioning_load_parameters( + additional_parameters, destination_table): + if not isinstance(destination_table, bigquery_tools.bigquery.Table): + return additional_parameters + + additional_parameters = dict(additional_parameters) + + if ('timePartitioning' not in additional_parameters and + getattr(destination_table, 'timePartitioning', None) is not None): + additional_parameters['timePartitioning'] = ( + destination_table.timePartitioning) + + if ('rangePartitioning' not in additional_parameters and + getattr(destination_table, 'rangePartitioning', None) is not None): + additional_parameters['rangePartitioning'] = ( + destination_table.rangePartitioning) + + return additional_parameters + + def _generate_job_name(job_name, job_type, step_name): return bigquery_tools.generate_bq_job_name( job_name=job_name, @@ -716,6 +741,7 @@ def process( additional_parameters = self.additional_bq_parameters.get() else: additional_parameters = self.additional_bq_parameters + additional_parameters = dict(additional_parameters or {}) table_reference = bigquery_tools.parse_table_reference(destination) if table_reference.projectId is None: @@ -735,19 +761,36 @@ def process( create_disposition = self.create_disposition if self.temporary_tables: + destination_table = None + hashed_dest = bigquery_tools.get_hashable_destination(table_reference) + should_lookup_destination_table = ( + schema is None or + not _has_partitioning_load_parameters(additional_parameters)) + if should_lookup_destination_table: + try: + destination_table = self.bq_wrapper.get_table( + project_id=table_reference.projectId, + dataset_id=table_reference.datasetId, + table_id=table_reference.tableId) + except Exception as e: + if schema is None: + _LOGGER.warning( + "Input schema is absent and could not fetch the final " + "destination table's schema [%s]. Creating temp table [%s] " + "will likely fail: %s", + hashed_dest, + job_name, + e) + destination_table = None + # we need to create temp tables, so we need a schema. # if there is no input schema, fetch the destination table's schema if schema is None: - hashed_dest = bigquery_tools.get_hashable_destination(table_reference) if hashed_dest in self.schema_cache: schema = self.schema_cache[hashed_dest] - else: + elif destination_table is not None: try: - schema = bigquery_tools.table_schema_to_dict( - bigquery_tools.BigQueryWrapper().get_table( - project_id=table_reference.projectId, - dataset_id=table_reference.datasetId, - table_id=table_reference.tableId).schema) + schema = bigquery_tools.table_schema_to_dict(destination_table.schema) self.schema_cache[hashed_dest] = schema except Exception as e: _LOGGER.warning( @@ -758,6 +801,11 @@ def process( job_name, e) + if (destination_table is not None and + not _has_partitioning_load_parameters(additional_parameters)): + additional_parameters = _add_destination_partitioning_load_parameters( + additional_parameters, destination_table) + # If we are using temporary tables, then we must always create the # temporary tables, so we replace the create_disposition. create_disposition = 'CREATE_IF_NEEDED' diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 191719e6a208..f57b10c06cf6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -703,6 +703,77 @@ def test_one_load_job_failed_after_waiting(self, sleep_mock): sleep_mock.assert_called_once() + def test_temporary_table_load_inherits_destination_time_partitioning(self): + destination = 'project1:dataset1.table1' + partition = (destination, (0, ['gs://bucket/file1'])) + job_reference = bigquery_api.JobReference(projectId='project1', + jobId='job_name1') + destination_table = bigquery_api.Table( + timePartitioning=bigquery_api.TimePartitioning(type='DAY')) + + dofn = bqfl.TriggerLoadJobs( + schema=_ELEMENTS_SCHEMA, + test_client=mock.Mock(), + temporary_tables=True) + dofn.start_bundle() + dofn.bq_wrapper.get_table = mock.Mock(return_value=destination_table) + dofn.bq_wrapper.perform_load_job = mock.Mock(return_value=job_reference) + + list(dofn.process(partition, 'test_job', pane_info=mock.Mock(index=0))) + + load_call = dofn.bq_wrapper.perform_load_job.call_args.kwargs + self.assertEqual( + load_call['additional_load_parameters']['timePartitioning'], + destination_table.timePartitioning) + dofn.bq_wrapper.get_table.assert_called_once() + + def test_temporary_table_load_inherits_destination_range_partitioning(self): + destination = 'project1:dataset1.table1' + partition = (destination, (0, ['gs://bucket/file1'])) + job_reference = bigquery_api.JobReference(projectId='project1', + jobId='job_name1') + destination_table = bigquery_api.Table( + rangePartitioning=bigquery_api.RangePartitioning()) + + dofn = bqfl.TriggerLoadJobs( + schema=_ELEMENTS_SCHEMA, + test_client=mock.Mock(), + temporary_tables=True) + dofn.start_bundle() + dofn.bq_wrapper.get_table = mock.Mock(return_value=destination_table) + dofn.bq_wrapper.perform_load_job = mock.Mock(return_value=job_reference) + + list(dofn.process(partition, 'test_job', pane_info=mock.Mock(index=0))) + + load_call = dofn.bq_wrapper.perform_load_job.call_args.kwargs + self.assertEqual( + load_call['additional_load_parameters']['rangePartitioning'], + destination_table.rangePartitioning) + dofn.bq_wrapper.get_table.assert_called_once() + + def test_temporary_table_load_keeps_explicit_partitioning_parameters(self): + destination = 'project1:dataset1.table1' + partition = (destination, (0, ['gs://bucket/file1'])) + explicit_partitioning = {'timePartitioning': {'type': 'DAY'}} + job_reference = bigquery_api.JobReference(projectId='project1', + jobId='job_name1') + + dofn = bqfl.TriggerLoadJobs( + schema=_ELEMENTS_SCHEMA, + test_client=mock.Mock(), + temporary_tables=True, + additional_bq_parameters=explicit_partitioning) + dofn.start_bundle() + dofn.bq_wrapper.get_table = mock.Mock() + dofn.bq_wrapper.perform_load_job = mock.Mock(return_value=job_reference) + + list(dofn.process(partition, 'test_job', pane_info=mock.Mock(index=0))) + + load_call = dofn.bq_wrapper.perform_load_job.call_args.kwargs + self.assertEqual(load_call['additional_load_parameters'], + explicit_partitioning) + dofn.bq_wrapper.get_table.assert_not_called() + def test_multiple_partition_files(self): destination = 'project1:dataset1.table1' From d944d084228bf9c82eae3bb0e54ed926257344c7 Mon Sep 17 00:00:00 2001 From: PRADDZY <64481960+PRADDZY@users.noreply.github.com> Date: Sun, 7 Jun 2026 18:19:59 +0530 Subject: [PATCH 2/5] Apply yapf formatting for temp file loads --- .../apache_beam/io/gcp/bigquery_file_loads.py | 8 ++++--- .../io/gcp/bigquery_file_loads_test.py | 24 ++++++++----------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index b7f846d79e74..60b9495504fc 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -83,8 +83,9 @@ def _has_partitioning_load_parameters(additional_parameters): - return ('timePartitioning' in additional_parameters or - 'rangePartitioning' in additional_parameters) + return ( + 'timePartitioning' in additional_parameters or + 'rangePartitioning' in additional_parameters) def _add_destination_partitioning_load_parameters( @@ -790,7 +791,8 @@ def process( schema = self.schema_cache[hashed_dest] elif destination_table is not None: try: - schema = bigquery_tools.table_schema_to_dict(destination_table.schema) + schema = bigquery_tools.table_schema_to_dict( + destination_table.schema) self.schema_cache[hashed_dest] = schema except Exception as e: _LOGGER.warning( diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index f57b10c06cf6..dab19b2a1869 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -706,15 +706,13 @@ def test_one_load_job_failed_after_waiting(self, sleep_mock): def test_temporary_table_load_inherits_destination_time_partitioning(self): destination = 'project1:dataset1.table1' partition = (destination, (0, ['gs://bucket/file1'])) - job_reference = bigquery_api.JobReference(projectId='project1', - jobId='job_name1') + job_reference = bigquery_api.JobReference( + projectId='project1', jobId='job_name1') destination_table = bigquery_api.Table( timePartitioning=bigquery_api.TimePartitioning(type='DAY')) dofn = bqfl.TriggerLoadJobs( - schema=_ELEMENTS_SCHEMA, - test_client=mock.Mock(), - temporary_tables=True) + schema=_ELEMENTS_SCHEMA, test_client=mock.Mock(), temporary_tables=True) dofn.start_bundle() dofn.bq_wrapper.get_table = mock.Mock(return_value=destination_table) dofn.bq_wrapper.perform_load_job = mock.Mock(return_value=job_reference) @@ -730,15 +728,13 @@ def test_temporary_table_load_inherits_destination_time_partitioning(self): def test_temporary_table_load_inherits_destination_range_partitioning(self): destination = 'project1:dataset1.table1' partition = (destination, (0, ['gs://bucket/file1'])) - job_reference = bigquery_api.JobReference(projectId='project1', - jobId='job_name1') + job_reference = bigquery_api.JobReference( + projectId='project1', jobId='job_name1') destination_table = bigquery_api.Table( rangePartitioning=bigquery_api.RangePartitioning()) dofn = bqfl.TriggerLoadJobs( - schema=_ELEMENTS_SCHEMA, - test_client=mock.Mock(), - temporary_tables=True) + schema=_ELEMENTS_SCHEMA, test_client=mock.Mock(), temporary_tables=True) dofn.start_bundle() dofn.bq_wrapper.get_table = mock.Mock(return_value=destination_table) dofn.bq_wrapper.perform_load_job = mock.Mock(return_value=job_reference) @@ -755,8 +751,8 @@ def test_temporary_table_load_keeps_explicit_partitioning_parameters(self): destination = 'project1:dataset1.table1' partition = (destination, (0, ['gs://bucket/file1'])) explicit_partitioning = {'timePartitioning': {'type': 'DAY'}} - job_reference = bigquery_api.JobReference(projectId='project1', - jobId='job_name1') + job_reference = bigquery_api.JobReference( + projectId='project1', jobId='job_name1') dofn = bqfl.TriggerLoadJobs( schema=_ELEMENTS_SCHEMA, @@ -770,8 +766,8 @@ def test_temporary_table_load_keeps_explicit_partitioning_parameters(self): list(dofn.process(partition, 'test_job', pane_info=mock.Mock(index=0))) load_call = dofn.bq_wrapper.perform_load_job.call_args.kwargs - self.assertEqual(load_call['additional_load_parameters'], - explicit_partitioning) + self.assertEqual( + load_call['additional_load_parameters'], explicit_partitioning) dofn.bq_wrapper.get_table.assert_not_called() def test_multiple_partition_files(self): From 33c4a9c70db6077e718e7edc278a6fc6e013585f Mon Sep 17 00:00:00 2001 From: PRADDZY <64481960+PRADDZY@users.noreply.github.com> Date: Mon, 8 Jun 2026 11:38:33 +0530 Subject: [PATCH 3/5] Cache temp load destination metadata --- .../apache_beam/io/gcp/bigquery_file_loads.py | 25 +++++--- .../io/gcp/bigquery_file_loads_test.py | 62 ++++++++++++++++++- 2 files changed, 75 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 60b9495504fc..6444660d6e19 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -90,7 +90,7 @@ def _has_partitioning_load_parameters(additional_parameters): def _add_destination_partitioning_load_parameters( additional_parameters, destination_table): - if not isinstance(destination_table, bigquery_tools.bigquery.Table): + if destination_table is None: return additional_parameters additional_parameters = dict(additional_parameters) @@ -714,6 +714,7 @@ def start_bundle(self): self.bq_io_metadata = create_bigquery_io_metadata(self._step_name) self.pending_jobs = [] self.schema_cache = {} + self.destination_table_cache = {} def process( self, @@ -764,17 +765,21 @@ def process( if self.temporary_tables: destination_table = None hashed_dest = bigquery_tools.get_hashable_destination(table_reference) - should_lookup_destination_table = ( - schema is None or - not _has_partitioning_load_parameters(additional_parameters)) - if should_lookup_destination_table: + need_schema = schema is None and hashed_dest not in self.schema_cache + need_partitioning = not _has_partitioning_load_parameters( + additional_parameters) + if need_schema or need_partitioning: try: - destination_table = self.bq_wrapper.get_table( - project_id=table_reference.projectId, - dataset_id=table_reference.datasetId, - table_id=table_reference.tableId) + if hashed_dest in self.destination_table_cache: + destination_table = self.destination_table_cache[hashed_dest] + else: + destination_table = self.bq_wrapper.get_table( + project_id=table_reference.projectId, + dataset_id=table_reference.datasetId, + table_id=table_reference.tableId) + self.destination_table_cache[hashed_dest] = destination_table except Exception as e: - if schema is None: + if need_schema: _LOGGER.warning( "Input schema is absent and could not fetch the final " "destination table's schema [%s]. Creating temp table [%s] " diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index dab19b2a1869..9a91fd84cbba 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -723,7 +723,8 @@ def test_temporary_table_load_inherits_destination_time_partitioning(self): self.assertEqual( load_call['additional_load_parameters']['timePartitioning'], destination_table.timePartitioning) - dofn.bq_wrapper.get_table.assert_called_once() + dofn.bq_wrapper.get_table.assert_called_once_with( + project_id='project1', dataset_id='dataset1', table_id='table1') def test_temporary_table_load_inherits_destination_range_partitioning(self): destination = 'project1:dataset1.table1' @@ -745,7 +746,8 @@ def test_temporary_table_load_inherits_destination_range_partitioning(self): self.assertEqual( load_call['additional_load_parameters']['rangePartitioning'], destination_table.rangePartitioning) - dofn.bq_wrapper.get_table.assert_called_once() + dofn.bq_wrapper.get_table.assert_called_once_with( + project_id='project1', dataset_id='dataset1', table_id='table1') def test_temporary_table_load_keeps_explicit_partitioning_parameters(self): destination = 'project1:dataset1.table1' @@ -770,6 +772,62 @@ def test_temporary_table_load_keeps_explicit_partitioning_parameters(self): load_call['additional_load_parameters'], explicit_partitioning) dofn.bq_wrapper.get_table.assert_not_called() + def test_temporary_table_load_uses_cached_schema_with_explicit_partitioning( + self): + destination = 'project1:dataset1.table1' + partition = (destination, (0, ['gs://bucket/file1'])) + explicit_partitioning = {'timePartitioning': {'type': 'DAY'}} + job_reference = bigquery_api.JobReference( + projectId='project1', jobId='job_name1') + table_reference = bigquery_tools.parse_table_reference(destination) + hashed_dest = bigquery_tools.get_hashable_destination(table_reference) + + dofn = bqfl.TriggerLoadJobs( + schema=None, + test_client=mock.Mock(), + temporary_tables=True, + additional_bq_parameters=explicit_partitioning) + dofn.start_bundle() + dofn.schema_cache[hashed_dest] = _ELEMENTS_SCHEMA + dofn.bq_wrapper.get_table = mock.Mock() + dofn.bq_wrapper.perform_load_job = mock.Mock(return_value=job_reference) + + list(dofn.process(partition, 'test_job', pane_info=mock.Mock(index=0))) + + load_call = dofn.bq_wrapper.perform_load_job.call_args.kwargs + self.assertEqual(load_call['schema'], _ELEMENTS_SCHEMA) + self.assertEqual( + load_call['additional_load_parameters'], explicit_partitioning) + dofn.bq_wrapper.get_table.assert_not_called() + + def test_temporary_table_load_caches_destination_table_per_bundle(self): + destination = 'project1:dataset1.table1' + first_partition = (destination, (0, ['gs://bucket/file1'])) + second_partition = (destination, (1, ['gs://bucket/file2'])) + job_reference = bigquery_api.JobReference( + projectId='project1', jobId='job_name1') + destination_table = bigquery_api.Table( + timePartitioning=bigquery_api.TimePartitioning(type='DAY')) + + dofn = bqfl.TriggerLoadJobs( + schema=_ELEMENTS_SCHEMA, test_client=mock.Mock(), temporary_tables=True) + dofn.start_bundle() + dofn.bq_wrapper.get_table = mock.Mock(return_value=destination_table) + dofn.bq_wrapper.perform_load_job = mock.Mock(return_value=job_reference) + + list( + dofn.process(first_partition, 'test_job', pane_info=mock.Mock(index=0))) + list( + dofn.process( + second_partition, 'test_job', pane_info=mock.Mock(index=1))) + + dofn.bq_wrapper.get_table.assert_called_once_with( + project_id='project1', dataset_id='dataset1', table_id='table1') + load_call = dofn.bq_wrapper.perform_load_job.call_args.kwargs + self.assertEqual( + load_call['additional_load_parameters']['timePartitioning'], + destination_table.timePartitioning) + def test_multiple_partition_files(self): destination = 'project1:dataset1.table1' From 5fd3f693c166ec4a7ad6719791bd94cd1f4b687a Mon Sep 17 00:00:00 2001 From: PRADDZY <64481960+PRADDZY@users.noreply.github.com> Date: Mon, 8 Jun 2026 18:31:48 +0530 Subject: [PATCH 4/5] Filter invalid temp load partition metadata --- .../apache_beam/io/gcp/bigquery_file_loads.py | 16 ++++++------ .../io/gcp/bigquery_file_loads_test.py | 26 +++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 6444660d6e19..1baa2fd91c22 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -94,16 +94,16 @@ def _add_destination_partitioning_load_parameters( return additional_parameters additional_parameters = dict(additional_parameters) + time_partitioning = getattr(destination_table, 'timePartitioning', None) + range_partitioning = getattr(destination_table, 'rangePartitioning', None) if ('timePartitioning' not in additional_parameters and - getattr(destination_table, 'timePartitioning', None) is not None): - additional_parameters['timePartitioning'] = ( - destination_table.timePartitioning) - - if ('rangePartitioning' not in additional_parameters and - getattr(destination_table, 'rangePartitioning', None) is not None): - additional_parameters['rangePartitioning'] = ( - destination_table.rangePartitioning) + isinstance(time_partitioning, bigquery_tools.bigquery.TimePartitioning)): + additional_parameters['timePartitioning'] = time_partitioning + + if ('rangePartitioning' not in additional_parameters and isinstance( + range_partitioning, bigquery_tools.bigquery.RangePartitioning)): + additional_parameters['rangePartitioning'] = range_partitioning return additional_parameters diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 9a91fd84cbba..fda4a3e9d520 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -828,6 +828,32 @@ def test_temporary_table_load_caches_destination_table_per_bundle(self): load_call['additional_load_parameters']['timePartitioning'], destination_table.timePartitioning) + def test_temporary_table_load_ignores_invalid_mock_partitioning_metadata( + self): + destination = 'project1:dataset1.table1' + partition = (destination, (0, ['gs://bucket/file1'])) + job_reference = bigquery_api.JobReference( + projectId='project1', jobId='job_name1') + destination_table = mock.Mock() + destination_table.timePartitioning = mock.Mock() + destination_table.rangePartitioning = mock.Mock() + + dofn = bqfl.TriggerLoadJobs( + schema=_ELEMENTS_SCHEMA, test_client=mock.Mock(), temporary_tables=True) + dofn.start_bundle() + dofn.bq_wrapper.get_table = mock.Mock(return_value=destination_table) + dofn.bq_wrapper.perform_load_job = mock.Mock(return_value=job_reference) + + list(dofn.process(partition, 'test_job', pane_info=mock.Mock(index=0))) + + load_call = dofn.bq_wrapper.perform_load_job.call_args.kwargs + self.assertNotIn( + 'timePartitioning', load_call['additional_load_parameters']) + self.assertNotIn( + 'rangePartitioning', load_call['additional_load_parameters']) + dofn.bq_wrapper.get_table.assert_called_once_with( + project_id='project1', dataset_id='dataset1', table_id='table1') + def test_multiple_partition_files(self): destination = 'project1:dataset1.table1' From 02ee64b8ac43f0e1020ee590eb0d221e65406a25 Mon Sep 17 00:00:00 2001 From: PRADDZY <64481960+PRADDZY@users.noreply.github.com> Date: Mon, 8 Jun 2026 22:04:53 +0530 Subject: [PATCH 5/5] Retrigger cancelled CI workflow