From b13e6d3de9bbff4e7dadd729241f7dcd06958976 Mon Sep 17 00:00:00 2001 From: Kevin Zhao Date: Mon, 9 Mar 2026 02:13:37 +0000 Subject: [PATCH 1/3] fix: remove predictable GCS bucket names to prevent bucket squatting The fix for CVE-2026-2473 in v1.133.0 patched metadata/_models.py but missed two other locations in utils/gcs_utils.py that construct GCS bucket names from predictable inputs (project ID + region): - stage_local_data_in_gcs(): "{project}-vertex-staging-{location}" - generate_gcs_directory_for_pipeline_artifacts(): "{project}-vertex-pipelines-{location}" An attacker who knows a victim's project ID and region can pre-register these bucket names, causing the SDK to silently upload model artifacts, training data, and pipeline outputs to attacker-controlled storage. Apply the same fix pattern: require explicit bucket configuration via aiplatform.init(staging_bucket=...) instead of auto-generating predictable names. --- google/cloud/aiplatform/utils/gcs_utils.py | 39 +++++++++------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/google/cloud/aiplatform/utils/gcs_utils.py b/google/cloud/aiplatform/utils/gcs_utils.py index 69f57f311f..602fb992c7 100644 --- a/google/cloud/aiplatform/utils/gcs_utils.py +++ b/google/cloud/aiplatform/utils/gcs_utils.py @@ -194,26 +194,12 @@ def stage_local_data_in_gcs( staging_gcs_dir = staging_gcs_dir or initializer.global_config.staging_bucket if not staging_gcs_dir: - project = project or initializer.global_config.project - location = location or initializer.global_config.location - credentials = credentials or initializer.global_config.credentials - # Creating the bucket if it does not exist. - # Currently we only do this when staging_gcs_dir is not specified. - # The buckets that we create are regional. - # This prevents errors when some service required regional bucket. - # E.g. "FailedPrecondition: 400 The Cloud Storage bucket of `gs://...` is in location `us`. 
It must be in the same regional location as the service location `us-central1`." - # We are making the bucket name region-specific since the bucket is regional. - staging_bucket_name = project + "-vertex-staging-" + location - client = storage.Client(project=project, credentials=credentials) - staging_bucket = storage.Bucket(client=client, name=staging_bucket_name) - if not staging_bucket.exists(): - _logger.info(f'Creating staging GCS bucket "{staging_bucket_name}"') - staging_bucket = client.create_bucket( - bucket_or_name=staging_bucket, - project=project, - location=location, - ) - staging_gcs_dir = "gs://" + staging_bucket_name + raise RuntimeError( + "staging_gcs_dir should be passed to stage_local_data_in_gcs or " + "should be set using aiplatform.init(staging_bucket='gs://my-bucket'). " + "This is required to prevent the use of predictable bucket names " + "which could be exploited via bucket squatting attacks." + ) timestamp = datetime.datetime.now().isoformat(sep="-", timespec="milliseconds") staging_gcs_subdir = ( @@ -248,11 +234,16 @@ def generate_gcs_directory_for_pipeline_artifacts( Returns: Google Cloud Storage URI of the staged data. """ - project = project or initializer.global_config.project - location = location or initializer.global_config.location + pipeline_root = initializer.global_config.staging_bucket + if not pipeline_root: + raise RuntimeError( + "pipeline_root should be passed to PipelineJob or " + "should be set using aiplatform.init(staging_bucket='gs://my-bucket'). " + "This is required to prevent the use of predictable bucket names " + "which could be exploited via bucket squatting attacks." 
+ ) - pipelines_bucket_name = project + "-vertex-pipelines-" + location - output_artifacts_gcs_dir = "gs://" + pipelines_bucket_name + "/output_artifacts/" + output_artifacts_gcs_dir = pipeline_root.rstrip("/") + "/output_artifacts/" return output_artifacts_gcs_dir From 42d3996e1b727feefc862cc5fa856249354d12e7 Mon Sep 17 00:00:00 2001 From: Kevin Zhao Date: Mon, 9 Mar 2026 02:19:25 +0000 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20updat?= =?UTF-8?q?e=20tests,=20add=20validation,=20fix=20docstrings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Update test_generate_gcs_directory_for_pipeline_artifacts to test both success (with staging_bucket set) and failure (RuntimeError) - Update test_create_gcs_bucket_for_pipeline_artifacts to pass explicit output_artifacts_gcs_dir instead of relying on auto-generation - Add validate_gcs_path() call in generate_gcs_directory_for_pipeline_artifacts - Add Raises section to docstrings for new RuntimeError conditions - Mark deprecated parameters in generate_gcs_directory_for_pipeline_artifacts --- google/cloud/aiplatform/utils/gcs_utils.py | 17 ++++++++--- tests/unit/aiplatform/test_utils.py | 35 +++++++++++++++------- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/google/cloud/aiplatform/utils/gcs_utils.py b/google/cloud/aiplatform/utils/gcs_utils.py index 602fb992c7..b225d4371e 100644 --- a/google/cloud/aiplatform/utils/gcs_utils.py +++ b/google/cloud/aiplatform/utils/gcs_utils.py @@ -185,6 +185,8 @@ def stage_local_data_in_gcs( Raises: RuntimeError: When source_path does not exist. + RuntimeError: When staging_gcs_dir is not provided and staging_bucket + is not configured via aiplatform.init(). GoogleCloudError: When the upload process fails. 
""" data_path_obj = pathlib.Path(data_path) @@ -225,14 +227,20 @@ def generate_gcs_directory_for_pipeline_artifacts( project: Optional[str] = None, location: Optional[str] = None, ): - """Gets or creates the GCS directory for Vertex Pipelines artifacts. + """Gets the GCS directory for Vertex Pipelines artifacts. + + Requires staging_bucket to be configured via aiplatform.init(). + The project and location parameters are deprecated and ignored. Args: - project: Optional. Google Cloud Project that contains the staging bucket. - location: Optional. Google Cloud location to use for the staging bucket. + project: Deprecated. No longer used. + location: Deprecated. No longer used. Returns: - Google Cloud Storage URI of the staged data. + Google Cloud Storage URI for pipeline artifacts. + + Raises: + RuntimeError: When staging_bucket is not configured via aiplatform.init(). """ pipeline_root = initializer.global_config.staging_bucket if not pipeline_root: @@ -242,6 +250,7 @@ def generate_gcs_directory_for_pipeline_artifacts( "This is required to prevent the use of predictable bucket names " "which could be exploited via bucket squatting attacks." 
) + validate_gcs_path(pipeline_root) output_artifacts_gcs_dir = pipeline_root.rstrip("/") + "/output_artifacts/" return output_artifacts_gcs_dir diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py index 3bf19eac1c..8ffa9e45c9 100644 --- a/tests/unit/aiplatform/test_utils.py +++ b/tests/unit/aiplatform/test_utils.py @@ -577,11 +577,27 @@ def test_stage_local_data_in_gcs( == f"{staging_gcs_dir}/vertex_ai_auto_staging/{timestamp}/test.json" ) - def test_generate_gcs_directory_for_pipeline_artifacts(self): - output = gcs_utils.generate_gcs_directory_for_pipeline_artifacts( - "project", "us-central1" - ) - assert output == "gs://project-vertex-pipelines-us-central1/output_artifacts/" + def test_generate_gcs_directory_for_pipeline_artifacts_with_staging_bucket(self): + with patch.object( + gcs_utils.initializer.global_config, + "staging_bucket", + "gs://my-staging-bucket", + ): + output = gcs_utils.generate_gcs_directory_for_pipeline_artifacts( + "project", "us-central1" + ) + assert output == "gs://my-staging-bucket/output_artifacts/" + + def test_generate_gcs_directory_for_pipeline_artifacts_raises_without_staging_bucket( + self, + ): + with patch.object( + gcs_utils.initializer.global_config, "staging_bucket", None + ): + with pytest.raises(RuntimeError, match="pipeline_root should be passed"): + gcs_utils.generate_gcs_directory_for_pipeline_artifacts( + "project", "us-central1" + ) @patch.object(storage.Bucket, "exists", return_value=False) @patch.object(storage, "Client") @@ -593,15 +609,14 @@ def test_create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist( ): output = ( gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist( - project="test-project", location="us-central1" + output_artifacts_gcs_dir="gs://my-bucket/output_artifacts/", + project="test-project", + location="us-central1", ) ) assert mock_storage_client.called assert mock_bucket_not_exist.called - assert mock_get_project_number.called - assert 
(
-            output == "gs://test-project-vertex-pipelines-us-central1/output_artifacts/"
-        )
+        assert output == "gs://my-bucket/output_artifacts/"
 
     def test_download_from_gcs_dir(
         self, mock_storage_client_list_blobs, mock_storage_blob_download_to_filename

From 88b5c19033532e2810916cf45e39ea35243a5386 Mon Sep 17 00:00:00 2001
From: Kevin Zhao
Date: Tue, 10 Mar 2026 23:24:52 +0000
Subject: [PATCH 3/3] fix: add validate_gcs_path to stage_local_data_in_gcs for consistency

Add the same gs:// prefix validation that
generate_gcs_directory_for_pipeline_artifacts already has, ensuring both
code paths validate user-provided GCS paths.
---
 google/cloud/aiplatform/utils/gcs_utils.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/google/cloud/aiplatform/utils/gcs_utils.py b/google/cloud/aiplatform/utils/gcs_utils.py
index b225d4371e..02d5b9f6ed 100644
--- a/google/cloud/aiplatform/utils/gcs_utils.py
+++ b/google/cloud/aiplatform/utils/gcs_utils.py
@@ -185,8 +185,9 @@ def stage_local_data_in_gcs(
 
     Raises:
         RuntimeError: When source_path does not exist.
         RuntimeError: When staging_gcs_dir is not provided and staging_bucket
             is not configured via aiplatform.init().
+        ValueError: When staging_gcs_dir does not have a 'gs://' prefix.
         GoogleCloudError: When the upload process fails.
     """
     data_path_obj = pathlib.Path(data_path)
@@ -202,6 +203,7 @@ def stage_local_data_in_gcs(
             "This is required to prevent the use of predictable bucket names "
             "which could be exploited via bucket squatting attacks."
         )
+    validate_gcs_path(staging_gcs_dir)
 
     timestamp = datetime.datetime.now().isoformat(sep="-", timespec="milliseconds")
     staging_gcs_subdir = (