Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions tests/integration/distributed/utils/networking_test.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import uuid
from textwrap import dedent

from google.cloud.aiplatform_v1.types import env_var
from parameterized import param, parameterized

from gigl.common.constants import DEFAULT_GIGL_RELEASE_SRC_IMAGE_CPU
from gigl.common.services.vertex_ai import VertexAiJobConfig, VertexAIService
from gigl.common.utils.proto_utils import ProtoUtils
from gigl.env.constants import GIGL_RESOURCE_CONFIG_URI_ENV_KEY
from gigl.env.pipelines_config import get_resource_config
from gigl.src.common.utils.file_loader import FileLoader
from tests.test_assets.test_case import TestCase


Expand All @@ -24,8 +28,31 @@ def setUp(self):
service_account=self._service_account,
staging_bucket=self._staging_bucket,
)

# get_graph_store_info() (run on the launched workers) calls
# get_resource_config() to build the readiness URI, so the workers need a
# resource config they can read. The test runner's resource config URI may
# be a local path that does not exist on the worker image, so we upload the
# in-memory resource config to the regional bucket (which the workers can
# read from GCS) and pass that URI via GIGL_RESOURCE_CONFIG_URI.
self._file_loader = FileLoader()
self._remote_resource_config_uri = (
self._resource_config.temp_assets_regional_bucket_path
/ "gigl"
/ "integration_tests"
/ "networking"
/ f"resource_config_{uuid.uuid4()}.yaml"
)
ProtoUtils().write_proto_to_yaml(
proto=self._resource_config.resource_config,
uri=self._remote_resource_config_uri,
)
super().setUp()

def tearDown(self):
self._file_loader.delete_files([self._remote_resource_config_uri])
super().tearDown()

@parameterized.expand(
[
param(
Expand Down Expand Up @@ -63,12 +90,22 @@ def test_get_graph_store_info(self, _, storage_nodes, compute_nodes):
"""
),
]
# launch_graph_store_job propagates the compute pool's environment_variables
# to both the compute and storage container specs, so the uploaded resource
# config URI is visible to every worker.
resource_config_env_vars = [
env_var.EnvVar(
name=GIGL_RESOURCE_CONFIG_URI_ENV_KEY,
value=self._remote_resource_config_uri.uri,
)
]
compute_cluster_config = VertexAiJobConfig(
job_name=job_name,
container_uri=DEFAULT_GIGL_RELEASE_SRC_IMAGE_CPU,
replica_count=compute_nodes,
command=command,
machine_type="n2-standard-8",
environment_variables=resource_config_env_vars,
)
storage_cluster_config = VertexAiJobConfig(
job_name=job_name,
Expand Down