From 7713bafdb11c5712246d113a99442379e2724574 Mon Sep 17 00:00:00 2001 From: jrmccluskey Date: Mon, 8 Jun 2026 11:46:20 -0400 Subject: [PATCH 1/3] Fix parsing of dataflow api endpoint URLs to remove trailing slash --- ...ommit_Python_ValidatesRunner_Dataflow.json | 2 +- .../runners/dataflow/internal/apiclient.py | 3 +- .../dataflow/internal/apiclient_test.py | 35 +++++++++++++++++++ 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Dataflow.json index c537844dc84a..e0266d62f2e0 100644 --- a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 3 + "modification": 4 } diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index fdf2e0ea03e3..64ca566aa97e 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -519,11 +519,12 @@ def __init__(self, options, root_staging_location=None): client_options = None transport = None if self.google_cloud_options.dataflow_endpoint: - endpoint = self.google_cloud_options.dataflow_endpoint + endpoint = self.google_cloud_options.dataflow_endpoint.strip() if 'localhost' in endpoint or 'sandbox' in endpoint: transport = 'rest' else: endpoint = re.sub('^https?://', '', endpoint) + endpoint = endpoint.rstrip('/') client_options = client_options_lib.ClientOptions(api_endpoint=endpoint) self._jobs_client = dataflow.JobsV1Beta3Client( diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index b34f06b64df4..63ad1b818486 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1298,6 +1298,41 @@ def test_template_file_generation_with_upload_graph(self): self.assertFalse(template_obj.get('steps')) self.assertTrue(template_obj['stepsLocation']) + def test_dataflow_endpoint_clean(self): + endpoints_and_expectations = [ + # (input_endpoint, expected_endpoint, expected_transport) + ('https://dataflow.googleapis.com/', 'dataflow.googleapis.com', None), + ('https://dataflow.googleapis.com ', 'dataflow.googleapis.com', None), + ('dataflow.googleapis.com/', 'dataflow.googleapis.com', None), + ('http://localhost:8080/', 'http://localhost:8080', 'rest'), + ('localhost:8080/', 'localhost:8080', 'rest'), + ] + + for input_ep, expected_ep, expected_transport in endpoints_and_expectations: + pipeline_options = PipelineOptions([ + '--project', + 'test-project', + '--temp_location', + 'gs://test-location/temp', + '--dataflow_endpoint', + input_ep, + '--no_auth', + ]) + with mock.patch('apache_beam.runners.dataflow.internal.apiclient.dataflow' + '.JobsV1Beta3Client') as mock_jobs: + with mock.patch( + 'apache_beam.runners.dataflow.internal.apiclient.dataflow' + '.MessagesV1Beta3Client'): + with mock.patch( + 'apache_beam.runners.dataflow.internal.apiclient.dataflow' + '.MetricsV1Beta3Client'): + apiclient.DataflowApplicationClient(pipeline_options) + called_kwargs = mock_jobs.call_args[1] + client_opts = called_kwargs.get('client_options') + self.assertIsNotNone(client_opts) + self.assertEqual(client_opts.api_endpoint, expected_ep) + self.assertEqual(called_kwargs.get('transport'), expected_transport) + def test_stage_resources(self): pipeline_options = PipelineOptions([ '--temp_location', From bfe8c269ce551c5754d1fc0b6c4a745e650acb87 Mon Sep 17 00:00:00 2001 From: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com> Date: Mon, 8 Jun 2026 11:53:20 -0400 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../apache_beam/runners/dataflow/internal/apiclient_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 63ad1b818486..12c51d305145 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1327,7 +1327,8 @@ def test_dataflow_endpoint_clean(self): 'apache_beam.runners.dataflow.internal.apiclient.dataflow' '.MetricsV1Beta3Client'): apiclient.DataflowApplicationClient(pipeline_options) - called_kwargs = mock_jobs.call_args[1] + mock_jobs.assert_called_once() + called_kwargs = mock_jobs.call_args.kwargs client_opts = called_kwargs.get('client_options') self.assertIsNotNone(client_opts) self.assertEqual(client_opts.api_endpoint, expected_ep) From ef34afe8fb6c53ffcc81de27396a4a31aff48762 Mon Sep 17 00:00:00 2001 From: jrmccluskey Date: Mon, 8 Jun 2026 11:55:10 -0400 Subject: [PATCH 3/3] gemini existance suggestion --- .../runners/dataflow/internal/apiclient.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 64ca566aa97e..4755aea0bc2c 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -520,12 +520,13 @@ def __init__(self, options, root_staging_location=None): transport = None if self.google_cloud_options.dataflow_endpoint: endpoint = self.google_cloud_options.dataflow_endpoint.strip() - if 'localhost' in endpoint or 'sandbox' in endpoint: - transport = 'rest' - else: - endpoint = re.sub('^https?://', '', endpoint) - endpoint = endpoint.rstrip('/') - client_options = client_options_lib.ClientOptions(api_endpoint=endpoint) + if endpoint: + if 'localhost' in endpoint or 'sandbox' in endpoint: + transport = 'rest' + else: + endpoint = re.sub('^https?://', '', endpoint) + endpoint = endpoint.rstrip('/') + client_options = client_options_lib.ClientOptions(api_endpoint=endpoint) self._jobs_client = dataflow.JobsV1Beta3Client( credentials=gapic_credentials,