Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies = [
# go/keep-sorted start
"PyYAML>=6.0.2, <7.0.0", # For APIHubToolset.
"aiosqlite>=0.21.0", # For SQLite database
"agentic_sandbox @ git+https://github.com/kubernetes-sigs/agent-sandbox.git@main#subdirectory=clients/python/agentic-sandbox-client", # For Agent Sandboxed Code Execution
Comment thread
SHRUTI6991 marked this conversation as resolved.
Outdated
"anyio>=4.9.0, <5.0.0", # For MCP Session Manager
"authlib>=1.5.1, <2.0.0", # For RestAPI Tool
"click>=8.1.8, <9.0.0", # For CLI tools
Expand Down
56 changes: 49 additions & 7 deletions src/google/adk/code_executors/gke_code_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import logging
import uuid

from agentic_sandbox import SandboxClient

import kubernetes as k8s
from kubernetes.watch import Watch

Expand Down Expand Up @@ -70,6 +72,8 @@ class GkeCodeExecutor(BaseCodeExecutor):
namespace: str = "default"
image: str = "python:3.11-slim"
timeout_seconds: int = 300
executor_type: str = "job" # "job" or "sandbox"
sandbox_gateway_name: str | None = None
cpu_requested: str = "200m"
mem_requested: str = "256Mi"
# The maximum CPU the container can use, in "millicores". 1000m is 1 full CPU core.
Expand All @@ -84,6 +88,8 @@ class GkeCodeExecutor(BaseCodeExecutor):

def __init__(
self,
executor_type: str = "job",
Comment thread
SHRUTI6991 marked this conversation as resolved.
sandbox_gateway_name: str | None = None,
kubeconfig_path: str | None = None,
kubeconfig_context: str | None = None,
**data,
Expand All @@ -96,6 +102,8 @@ def __init__(
3. Automatically via the default local kubeconfig file (~/.kube/config).
"""
super().__init__(**data)
self.executor_type = executor_type
self.sandbox_gateway_name = sandbox_gateway_name
self.kubeconfig_path = kubeconfig_path
self.kubeconfig_context = kubeconfig_context

Expand Down Expand Up @@ -136,11 +144,29 @@ def __init__(
self._batch_v1 = client.BatchV1Api()
self._core_v1 = client.CoreV1Api()

def execute_code(
self,
invocation_context: InvocationContext,
code_execution_input: CodeExecutionInput,
) -> CodeExecutionResult:
def _execute_in_sandbox(self, code: str) -> CodeExecutionResult:
"""Executes code using Agent Sandbox Client."""
try:
with SandboxClient(
template_name="python-sandbox-template",
Comment thread
SHRUTI6991 marked this conversation as resolved.
Outdated
gateway_name=self.sandbox_gateway_name,
namespace=self.namespace
) as sandbox:
# Execute the code as a python script
logging.debug("Executing code in sandbox:\n```\n%s\n```", code)
Comment thread
SHRUTI6991 marked this conversation as resolved.
Outdated
sandbox.write("script.py", code)
result = sandbox.run("python3 script.py")

return CodeExecutionResult(
stdout=result.stdout,
stderr=result.stderr if result.stderr else None
Comment thread
SHRUTI6991 marked this conversation as resolved.
Outdated
)
except Exception as e:
return CodeExecutionResult(
stderr=f"Sandbox execution failed: {str(e)}",
Comment thread
SHRUTI6991 marked this conversation as resolved.
Outdated
)

def _execute_as_job(self, code: str, invocation_context: InvocationContext) -> CodeExecutionResult:
"""Orchestrates the secure execution of a code snippet on GKE."""
job_name = f"adk-exec-{uuid.uuid4().hex[:10]}"
configmap_name = f"code-src-{job_name}"
Expand All @@ -150,7 +176,7 @@ def execute_code(
# 1. Create a ConfigMap to mount LLM-generated code into the Pod.
# 2. Create a Job that runs the code from the ConfigMap.
# 3. Set the Job as the ConfigMap's owner for automatic cleanup.
self._create_code_configmap(configmap_name, code_execution_input.code)
self._create_code_configmap(configmap_name, code)
job_manifest = self._create_job_manifest(
job_name, configmap_name, invocation_context
)
Expand All @@ -162,7 +188,7 @@ def execute_code(
logger.info(
f"Submitted Job '{job_name}' to namespace '{self.namespace}'."
)
logger.debug("Executing code:\n```\n%s\n```", code_execution_input.code)
logger.debug("Executing code:\n```\n%s\n```", code)
return self._watch_job_completion(job_name)

except ApiException as e:
Expand All @@ -186,6 +212,22 @@ def execute_code(
stderr=f"An unexpected executor error occurred: {e}"
)

def execute_code(
self,
invocation_context: InvocationContext,
code_execution_input: CodeExecutionInput,
) -> CodeExecutionResult:

"""Overrides the base method to route execution based on executor_type."""

code = code_execution_input.code

if self.executor_type == "sandbox":
return self._execute_in_sandbox(code)
else:
# Fallback to existing GKE Job logic
return self._execute_as_job(code, invocation_context)
Comment thread
SHRUTI6991 marked this conversation as resolved.
Outdated

def _create_job_manifest(
self,
job_name: str,
Expand Down
77 changes: 77 additions & 0 deletions tests/unittests/code_executors/test_gke_code_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def test_init_defaults(self):
assert executor.timeout_seconds == 300
assert executor.cpu_requested == "200m"
assert executor.mem_limit == "512Mi"
assert executor.executor_type == "job"

def test_init_with_overrides(self):
"""Tests that class attributes can be overridden at instantiation."""
Expand All @@ -79,11 +80,13 @@ def test_init_with_overrides(self):
image="custom-python:latest",
timeout_seconds=60,
cpu_limit="1000m",
executor_type="sandbox",
)
assert executor.namespace == "test-ns"
assert executor.image == "custom-python:latest"
assert executor.timeout_seconds == 60
assert executor.cpu_limit == "1000m"
assert executor.executor_type == "sandbox"

@patch("google.adk.code_executors.gke_code_executor.Watch")
def test_execute_code_success(
Expand Down Expand Up @@ -225,3 +228,77 @@ def test_create_job_manifest_structure(self, mock_invocation_context):
assert sec_context.allow_privilege_escalation is False
assert sec_context.read_only_root_filesystem is True
assert sec_context.capabilities.drop == ["ALL"]

Comment thread
SHRUTI6991 marked this conversation as resolved.
@patch("google.adk.code_executors.gke_code_executor.SandboxClient")
def test_execute_code_forks_to_sandbox(
self,
mock_sandbox_client,
mock_invocation_context,
mock_k8s_clients,
):
"""Tests that execute_code uses SandboxClient when executor_type='sandbox'."""
# Setup Sandbox mock
mock_sandbox_instance = (
mock_sandbox_client.return_value.__enter__.return_value
)
mock_run_result = MagicMock()
mock_run_result.stdout = "sandbox stdout"
mock_run_result.stderr = None
mock_sandbox_instance.run.return_value = mock_run_result

# Instantiate with sandbox type
executor = GkeCodeExecutor(executor_type="sandbox")
code_input = CodeExecutionInput(code='print("sandbox")')

# Execute
result = executor.execute_code(mock_invocation_context, code_input)

# Assertions
assert result.stdout == "sandbox stdout"

# Verify SandboxClient was used
mock_sandbox_client.assert_called_once()
mock_sandbox_instance.run.assert_called_once()

# Verify Job path was NOT taken
mock_k8s_clients["batch_v1"].create_namespaced_job.assert_not_called()

@patch("google.adk.code_executors.gke_code_executor.SandboxClient")
@patch("google.adk.code_executors.gke_code_executor.Watch")
def test_execute_code_forks_to_job(
self,
mock_watch,
mock_sandbox_client,
mock_invocation_context,
mock_k8s_clients,
):
"""Tests that execute_code uses K8s Job when executor_type='job'."""
# Setup K8s Job mocks (success path)
mock_job = MagicMock()
mock_job.status.succeeded = True
mock_watch.return_value.stream.return_value = [{"object": mock_job}]

mock_pod = MagicMock()
mock_pod.metadata.name = "pod-1"
mock_k8s_clients["core_v1"].list_namespaced_pod.return_value.items = [
mock_pod
]
mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = (
"job stdout"
)

# Instantiate with job type
executor = GkeCodeExecutor(executor_type="job")
code_input = CodeExecutionInput(code='print("job")')

# Execute
result = executor.execute_code(mock_invocation_context, code_input)

# Assertions
assert result.stdout == "job stdout"

# Verify Job path WAS taken
mock_k8s_clients["batch_v1"].create_namespaced_job.assert_called_once()

# Verify SandboxClient was NOT used
mock_sandbox_client.assert_not_called()