From 831f30c215740c4dcc489664860aaf0899860e18 Mon Sep 17 00:00:00 2001 From: amabito Date: Sun, 15 Mar 2026 10:45:15 +0900 Subject: [PATCH 1/2] feat: add governance plugin (budget, circuit breaker, degradation) Adds src/google/adk_community/governance/ with VeronicaGovernancePlugin, a BasePlugin that enforces per-agent and org-level budget limits, isolates failing agents via circuit breaker, blocks disallowed tools, and degrades to cheaper models when budget runs low. 56 tests, isort + pyink formatted, no extra dependencies. --- contributing/samples/governance/README.md | 55 ++ contributing/samples/governance/main.py | 129 +++++ src/google/adk_community/__init__.py | 1 + .../adk_community/governance/__init__.py | 23 + .../adk_community/governance/_budget.py | 110 ++++ .../governance/_circuit_breaker.py | 136 +++++ .../adk_community/governance/_degradation.py | 91 +++ .../adk_community/governance/_policy.py | 40 ++ .../governance/veronica_governance_plugin.py | 456 +++++++++++++++ tests/unittests/governance/__init__.py | 13 + tests/unittests/governance/test_budget.py | 77 +++ .../governance/test_circuit_breaker.py | 75 +++ .../unittests/governance/test_degradation.py | 79 +++ tests/unittests/governance/test_policy.py | 58 ++ .../test_veronica_governance_plugin.py | 548 ++++++++++++++++++ 15 files changed, 1891 insertions(+) create mode 100644 contributing/samples/governance/README.md create mode 100644 contributing/samples/governance/main.py create mode 100644 src/google/adk_community/governance/__init__.py create mode 100644 src/google/adk_community/governance/_budget.py create mode 100644 src/google/adk_community/governance/_circuit_breaker.py create mode 100644 src/google/adk_community/governance/_degradation.py create mode 100644 src/google/adk_community/governance/_policy.py create mode 100644 src/google/adk_community/governance/veronica_governance_plugin.py create mode 100644 tests/unittests/governance/__init__.py create mode 100644 tests/unittests/governance/test_budget.py create mode 100644 tests/unittests/governance/test_circuit_breaker.py create mode 100644 tests/unittests/governance/test_degradation.py create mode 100644 tests/unittests/governance/test_policy.py create mode 100644 tests/unittests/governance/test_veronica_governance_plugin.py diff --git a/contributing/samples/governance/README.md b/contributing/samples/governance/README.md new file mode 100644 index 0000000..f896dbc --- /dev/null +++ b/contributing/samples/governance/README.md @@ -0,0 +1,55 @@ +# Governance Plugin Example + +Budget enforcement, circuit breaking, and model degradation for ADK agents. + +## Quickstart + +```python +from google.adk_community.governance import GovernanceConfig, VeronicaGovernancePlugin + +plugin = VeronicaGovernancePlugin(GovernanceConfig(max_cost_usd=1.0)) + +runner = Runner(agent=agent, session_service=session_service, plugins=[plugin]) +``` + +That's it. The plugin intercepts model and tool callbacks automatically. + +## Setup + +```bash +pip install google-adk-community +export GOOGLE_API_KEY="your-key" +``` + +## Run the full example + +```bash +python main.py +``` + +## What it does + +The example creates three agents (orchestrator, researcher, summarizer) and +registers a `VeronicaGovernancePlugin` on the Runner. The plugin: + +- Enforces a $0.50 org budget and $0.25 per-agent budget +- Blocks the `shell_exec` tool +- Degrades to `gemini-2.0-flash-lite` when budget hits 70% +- Disables `web_search` during degradation +- Trips the circuit breaker after 3 consecutive failures + +After the run completes, the plugin logs a summary: + +``` +[GOVERNANCE] Run complete in 2.3s. Model calls: 4, Tool calls: 0. +[GOVERNANCE] Budget: $0.0023 / $0.5000 (0.5% used). +[GOVERNANCE] Agent 'researcher': $0.0012 / $0.2500. +[GOVERNANCE] Agent 'summarizer': $0.0008 / $0.2500. +``` + +If degradation triggers, the summary includes: + +``` +[GOVERNANCE] Degradation events (1): +[GOVERNANCE] Agent 'researcher' at 72.0% -- degraded gemini-2.5-flash -> gemini-2.0-flash-lite. +``` diff --git a/contributing/samples/governance/main.py b/contributing/samples/governance/main.py new file mode 100644 index 0000000..8f34c12 --- /dev/null +++ b/contributing/samples/governance/main.py @@ -0,0 +1,129 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Example: multi-agent workflow with governance plugin. + +This example shows how to register VeronicaGovernancePlugin on an ADK +Runner to enforce per-agent budgets, block tools, and degrade to a +cheaper model when budget runs low. + +Usage: + export GOOGLE_API_KEY="your-key" + python main.py +""" + +import asyncio +import logging + +from google.adk import Runner +from google.adk.agents import Agent +from google.adk.sessions import InMemorySessionService + +from google.adk_community.governance import ( + GovernanceConfig, + VeronicaGovernancePlugin, +) + +logging.basicConfig(level=logging.INFO, format="%(message)s") + + +def main(): + # Configure governance limits + config = GovernanceConfig( + max_cost_usd=0.50, # org-level: 50 cents + agent_max_cost_usd=0.25, # per-agent: 25 cents + failure_threshold=3, # circuit breaker after 3 failures + recovery_timeout_s=30.0, + degradation_threshold=0.7, # degrade at 70% budget + fallback_model="gemini-2.0-flash-lite", + blocked_tools=["shell_exec"], + disable_tools_on_degrade=["web_search"], + ) + + plugin = VeronicaGovernancePlugin(config=config) + + # Define agents + researcher = Agent( + model="gemini-2.5-flash", + name="researcher", + instruction=( + "You are a research assistant. Answer questions using your" + " knowledge. Be concise." + ), + ) + + summarizer = Agent( + model="gemini-2.5-flash", + name="summarizer", + instruction=( + "You summarize text provided to you. Keep summaries to 2-3" + " sentences." + ), + ) + + # Orchestrator delegates to sub-agents + orchestrator = Agent( + model="gemini-2.5-flash", + name="orchestrator", + instruction=( + "You coordinate research tasks. Use the researcher agent to" + " find information, then the summarizer to condense it." + ), + sub_agents=[researcher, summarizer], + ) + + # Create runner with governance plugin + session_service = InMemorySessionService() + runner = Runner( + agent=orchestrator, + app_name="governance_demo", + session_service=session_service, + plugins=[plugin], + ) + + async def run(): + session = await session_service.create_session( + app_name="governance_demo", + user_id="demo_user", + ) + + from google.genai import types + + user_message = types.Content( + role="user", + parts=[types.Part(text="What is agent governance?")], + ) + + async for event in runner.run_async( + session_id=session.id, + user_id="demo_user", + new_message=user_message, + ): + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: + print(f"[{event.author}] {part.text[:200]}") + + # After run, the plugin logs a governance summary automatically. + # You can also inspect programmatically: + snap = plugin.budget.snapshot() + print(f"\nTotal spent: ${snap.org_spent_usd:.4f}") + for agent, spent in snap.agent_spent.items(): + print(f" {agent}: ${spent:.4f}") + + asyncio.run(run()) + + +if __name__ == "__main__": + main() diff --git a/src/google/adk_community/__init__.py b/src/google/adk_community/__init__.py index 9a1dc35..48d5bd6 100644 --- a/src/google/adk_community/__init__.py +++ b/src/google/adk_community/__init__.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from . import governance from . import memory from . import sessions from . import version diff --git a/src/google/adk_community/governance/__init__.py b/src/google/adk_community/governance/__init__.py new file mode 100644 index 0000000..61b922c --- /dev/null +++ b/src/google/adk_community/governance/__init__.py @@ -0,0 +1,23 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Community governance plugins for ADK.""" + +from .veronica_governance_plugin import GovernanceConfig +from .veronica_governance_plugin import VeronicaGovernancePlugin + +__all__ = [ + "GovernanceConfig", + "VeronicaGovernancePlugin", +] diff --git a/src/google/adk_community/governance/_budget.py b/src/google/adk_community/governance/_budget.py new file mode 100644 index 0000000..bd281df --- /dev/null +++ b/src/google/adk_community/governance/_budget.py @@ -0,0 +1,110 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Budget tracking for governance plugin.""" + +from __future__ import annotations + +from dataclasses import dataclass +from dataclasses import field +import threading + + +@dataclass +class BudgetSnapshot: + """Read-only snapshot of current budget state.""" + + org_spent_usd: float + org_limit_usd: float + agent_spent: dict[str, float] + agent_limit_usd: float + + @property + def org_utilization(self) -> float: + if self.org_limit_usd <= 0: + return 1.0 + return self.org_spent_usd / self.org_limit_usd + + +class BudgetTracker: + """Thread-safe budget tracker with per-agent and org-level limits.""" + + def __init__( + self, + *, + org_limit_usd: float, + agent_limit_usd: float, + cost_per_1k_input_tokens: float, + cost_per_1k_output_tokens: float, + ) -> None: + self._org_limit_usd = org_limit_usd + self._agent_limit_usd = agent_limit_usd + self._cost_per_1k_input = cost_per_1k_input_tokens + self._cost_per_1k_output = cost_per_1k_output_tokens + self._org_spent_usd: float = 0.0 + self._agent_spent: dict[str, float] = {} + self._lock = threading.Lock() + + def estimate_cost( + self, + input_tokens: int, + output_tokens: int, + ) -> float: + """Estimate cost from token counts (clamped to non-negative).""" + raw = ( + max(input_tokens, 0) / 1000.0 * self._cost_per_1k_input + + max(output_tokens, 0) / 1000.0 * self._cost_per_1k_output + ) + return max(raw, 0.0) + + def check(self, agent_name: str) -> tuple[bool, str]: + """Check if agent is within budget. Returns (allowed, reason).""" + with self._lock: + if self._org_spent_usd >= self._org_limit_usd: + return False, ( + f"Org budget exhausted: ${self._org_spent_usd:.4f}" + f" / ${self._org_limit_usd:.4f}" + ) + agent_spent = self._agent_spent.get(agent_name, 0.0) + if agent_spent >= self._agent_limit_usd: + return False, ( + f"Agent '{agent_name}' budget exhausted:" + f" ${agent_spent:.4f} / ${self._agent_limit_usd:.4f}" + ) + return True, "" + + def record(self, agent_name: str, cost_usd: float) -> None: + """Record cost for an agent.""" + with self._lock: + self._org_spent_usd += cost_usd + self._agent_spent[agent_name] = ( + self._agent_spent.get(agent_name, 0.0) + cost_usd + ) + + def utilization(self) -> float: + """Current org-level budget utilization (0.0 to 1.0+).""" + with self._lock: + if self._org_limit_usd <= 0: + return 1.0 + return self._org_spent_usd / self._org_limit_usd + + def snapshot(self) -> BudgetSnapshot: + """Return a read-only snapshot of current budget state.""" + with self._lock: + return BudgetSnapshot( + org_spent_usd=self._org_spent_usd, + org_limit_usd=self._org_limit_usd, + agent_spent=dict(self._agent_spent), + agent_limit_usd=self._agent_limit_usd, + ) diff --git a/src/google/adk_community/governance/_circuit_breaker.py b/src/google/adk_community/governance/_circuit_breaker.py new file mode 100644 index 0000000..c1ca6a8 --- /dev/null +++ b/src/google/adk_community/governance/_circuit_breaker.py @@ -0,0 +1,136 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Circuit breaker for governance plugin.""" + +from __future__ import annotations + +import enum +import threading +import time + + +class CircuitState(enum.Enum): + CLOSED = "closed" + OPEN = "open" + HALF_OPEN = "half_open" + + +class CircuitBreaker: + """Per-agent circuit breaker tracking consecutive failures.""" + + def __init__( + self, + *, + failure_threshold: int, + recovery_timeout_s: float, + ) -> None: + self._failure_threshold = failure_threshold + self._recovery_timeout_s = recovery_timeout_s + self._agents: dict[str, _AgentCircuit] = {} + self._lock = threading.Lock() + + def is_open(self, agent_name: str) -> bool: + """Check if agent circuit is open (should be isolated).""" + with self._lock: + circuit = self._agents.get(agent_name) + if circuit is None: + return False + return circuit.state() == CircuitState.OPEN + + def is_half_open(self, agent_name: str) -> bool: + """Check if agent circuit is half-open (allow one probe).""" + with self._lock: + circuit = self._agents.get(agent_name) + if circuit is None: + return False + return circuit.state() == CircuitState.HALF_OPEN + + def record_failure(self, agent_name: str) -> CircuitState: + """Record a failure for the agent. Returns new state.""" + with self._lock: + circuit = self._ensure(agent_name) + circuit.consecutive_failures += 1 + if circuit.consecutive_failures >= self._failure_threshold: + if circuit.opened_at is None: + circuit.opened_at = time.monotonic() + circuit.probe_in_flight = False + return circuit.state() + + def record_success(self, agent_name: str) -> None: + """Record a success, resetting failure count.""" + with self._lock: + circuit = self._ensure(agent_name) + circuit.consecutive_failures = 0 + circuit.opened_at = None + circuit.probe_in_flight = False + + def claim_probe(self, agent_name: str) -> bool: + """Atomically claim the HALF_OPEN probe slot. Returns True if claimed.""" + with self._lock: + circuit = self._agents.get(agent_name) + if circuit is None: + return False + if circuit.state() != CircuitState.HALF_OPEN: + return False + if circuit.probe_in_flight: + return False + circuit.probe_in_flight = True + return True + + def get_state(self, agent_name: str) -> CircuitState: + """Get current circuit state for an agent.""" + with self._lock: + circuit = self._agents.get(agent_name) + if circuit is None: + return CircuitState.CLOSED + return circuit.state() + + def summary(self) -> dict[str, str]: + """Return {agent_name: state_name} for all tracked agents.""" + with self._lock: + return { + name: circuit.state().value for name, circuit in self._agents.items() + } + + def _ensure(self, agent_name: str) -> _AgentCircuit: + if agent_name not in self._agents: + self._agents[agent_name] = _AgentCircuit( + failure_threshold=self._failure_threshold, + recovery_timeout_s=self._recovery_timeout_s, + ) + return self._agents[agent_name] + + +class _AgentCircuit: + """State for a single agent's circuit.""" + + def __init__( + self, *, failure_threshold: int, recovery_timeout_s: float + ) -> None: + self._failure_threshold = failure_threshold + self._recovery_timeout_s = recovery_timeout_s + self.consecutive_failures: int = 0 + self.opened_at: float | None = None + self.probe_in_flight: bool = False + + def state(self) -> CircuitState: + if self.consecutive_failures < self._failure_threshold: + return CircuitState.CLOSED + if self.opened_at is None: + return CircuitState.CLOSED + elapsed = time.monotonic() - self.opened_at + if elapsed >= self._recovery_timeout_s: + return CircuitState.HALF_OPEN + return CircuitState.OPEN diff --git a/src/google/adk_community/governance/_degradation.py b/src/google/adk_community/governance/_degradation.py new file mode 100644 index 0000000..e02bb36 --- /dev/null +++ b/src/google/adk_community/governance/_degradation.py @@ -0,0 +1,91 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Degradation logic for governance plugin.""" + +from __future__ import annotations + +from dataclasses import dataclass +from dataclasses import field +import threading + + +@dataclass +class DegradationEvent: + """Record of a single degradation action.""" + + agent_name: str + utilization_pct: float + original_model: str + fallback_model: str + + +class DegradationManager: + """Manages model degradation and tool disabling near budget limits.""" + + def __init__( + self, + *, + threshold: float, + fallback_model: str | None, + disable_tools_on_degrade: list[str] | None = None, + ) -> None: + self._threshold = threshold + self._fallback_model = fallback_model + self._disable_tools: frozenset[str] = frozenset( + disable_tools_on_degrade or [] + ) + self._events: list[DegradationEvent] = [] + self._lock = threading.Lock() + + @property + def is_configured(self) -> bool: + return self._fallback_model is not None + + def should_degrade(self, utilization: float) -> bool: + """Check if degradation should be triggered.""" + return self.is_configured and utilization >= self._threshold + + def should_disable_tool(self, tool_name: str, utilization: float) -> bool: + """Check if a tool should be disabled due to degradation.""" + if utilization < self._threshold: + return False + return tool_name in self._disable_tools + + def record_event( + self, + *, + agent_name: str, + utilization: float, + original_model: str, + ) -> DegradationEvent: + """Record a degradation event. Returns the event for logging.""" + event = DegradationEvent( + agent_name=agent_name, + utilization_pct=round(utilization * 100, 1), + original_model=original_model, + fallback_model=self._fallback_model or "", + ) + with self._lock: + self._events.append(event) + return event + + @property + def fallback_model(self) -> str | None: + return self._fallback_model + + @property + def events(self) -> list[DegradationEvent]: + with self._lock: + return list(self._events) diff --git a/src/google/adk_community/governance/_policy.py b/src/google/adk_community/governance/_policy.py new file mode 100644 index 0000000..5a1d6ff --- /dev/null +++ b/src/google/adk_community/governance/_policy.py @@ -0,0 +1,40 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tool policy enforcement for governance plugin.""" + +from __future__ import annotations + + +class ToolPolicy: + """Decides whether a tool call is allowed.""" + + def __init__( + self, + *, + blocked_tools: list[str] | None = None, + allowed_tools: list[str] | None = None, + ) -> None: + self._blocked: frozenset[str] = frozenset(blocked_tools or []) + self._allowed: frozenset[str] | None = ( + frozenset(allowed_tools) if allowed_tools is not None else None + ) + + def check(self, tool_name: str) -> tuple[bool, str]: + """Check if tool is allowed. Returns (allowed, reason).""" + if tool_name in self._blocked: + return False, f"Tool '{tool_name}' is blocked by governance policy" + if self._allowed is not None and tool_name not in self._allowed: + return False, f"Tool '{tool_name}' is not in the allowed list" + return True, "" diff --git a/src/google/adk_community/governance/veronica_governance_plugin.py b/src/google/adk_community/governance/veronica_governance_plugin.py new file mode 100644 index 0000000..06b18b0 --- /dev/null +++ b/src/google/adk_community/governance/veronica_governance_plugin.py @@ -0,0 +1,456 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""VERONICA governance plugin for ADK. + +Provides budget enforcement, circuit breaking, tool policy, and +degradation for multi-agent ADK workflows. Inspired by +veronica-core (https://github.com/amabito/veronica-core). +""" + +from __future__ import annotations + +import logging +import re +import threading +import time +from typing import Any +from typing import Optional + +from google.adk.agents.base_agent import BaseAgent +from google.adk.agents.callback_context import CallbackContext +from google.adk.agents.invocation_context import InvocationContext +from google.adk.models.llm_request import LlmRequest +from google.adk.models.llm_response import LlmResponse +from google.adk.plugins.base_plugin import BasePlugin +from google.adk.tools.base_tool import BaseTool +from google.genai import types +from pydantic import BaseModel +from pydantic import Field + +from ._budget import BudgetTracker +from ._circuit_breaker import CircuitBreaker +from ._circuit_breaker import CircuitState +from ._degradation import DegradationManager +from ._policy import ToolPolicy + +logger = logging.getLogger(__name__) + +_LOG_PREFIX = "[GOVERNANCE]" + + +class GovernanceConfig(BaseModel): + """Configuration for VeronicaGovernancePlugin.""" + + # Budget + max_cost_usd: float = Field( + default=10.0, + gt=0, + description="Org-level spending ceiling (USD). Must be positive.", + ) + agent_max_cost_usd: float = Field( + default=5.0, + gt=0, + description="Per-agent spending ceiling (USD). Must be positive.", + ) + cost_per_1k_input_tokens: float = Field( + default=0.00025, + ge=0, + description="Cost per 1,000 input tokens (USD).", + ) + cost_per_1k_output_tokens: float = Field( + default=0.0005, + ge=0, + description="Cost per 1,000 output tokens (USD).", + ) + + # Circuit Breaker + failure_threshold: int = Field( + default=5, + ge=1, + description="Consecutive failures before isolating an agent.", + ) + recovery_timeout_s: float = Field( + default=60.0, + ge=0, + description="Seconds before a tripped agent gets a probe request.", + ) + + # Tool Policy + blocked_tools: list[str] = Field( + default_factory=list, + description="Tool names to block unconditionally.", + ) + allowed_tools: Optional[list[str]] = Field( + default=None, + description="If set, only these tools are permitted.", + ) + + # Degradation + degradation_threshold: float = Field( + default=0.8, + ge=0, + le=1.0, + description="Budget utilization ratio that triggers degradation.", + ) + fallback_model: Optional[str] = Field( + default=None, + description="Model to switch to when degradation triggers.", + ) + disable_tools_on_degrade: list[str] = Field( + default_factory=list, + description="Tools to disable when degradation triggers.", + ) + + +class VeronicaGovernancePlugin(BasePlugin): + """Budget, circuit-breaker, and policy enforcement for ADK agents. + + Register this plugin on an ADK Runner (or App) to add runtime + containment to any agent workflow. The plugin intercepts model and + tool callbacks to enforce spending limits, block disallowed tools, + isolate failing agents, and degrade to cheaper models + when budget runs low. + + See: https://github.com/amabito/veronica-core + """ + + def __init__( + self, + config: Optional[GovernanceConfig] = None, + ) -> None: + super().__init__(name="veronica_governance") + self._config = config or GovernanceConfig() + + self._budget = BudgetTracker( + org_limit_usd=self._config.max_cost_usd, + agent_limit_usd=self._config.agent_max_cost_usd, + cost_per_1k_input_tokens=self._config.cost_per_1k_input_tokens, + cost_per_1k_output_tokens=self._config.cost_per_1k_output_tokens, + ) + self._circuit_breaker = CircuitBreaker( + failure_threshold=self._config.failure_threshold, + recovery_timeout_s=self._config.recovery_timeout_s, + ) + self._policy = ToolPolicy( + blocked_tools=self._config.blocked_tools, + allowed_tools=self._config.allowed_tools, + ) + self._degradation = DegradationManager( + threshold=self._config.degradation_threshold, + fallback_model=self._config.fallback_model, + disable_tools_on_degrade=self._config.disable_tools_on_degrade, + ) + + self._tool_start_times: dict[str, float] = {} + self._total_model_calls: int = 0 + self._total_tool_calls: int = 0 + self._run_start_time: float = time.monotonic() + self._stats_lock = threading.Lock() + self._degraded_agents: set[str] = set() + + # ------------------------------------------------------------------ + # Runner lifecycle + # ------------------------------------------------------------------ + + async def before_run_callback( + self, *, invocation_context: InvocationContext + ) -> Optional[types.Content]: + self._run_start_time = time.monotonic() + logger.info( + "%s Run started. Budget: $%.4f org / $%.4f per-agent.", + _LOG_PREFIX, + self._config.max_cost_usd, + self._config.agent_max_cost_usd, + ) + return None + + async def after_run_callback( + self, *, invocation_context: InvocationContext + ) -> None: + elapsed = time.monotonic() - self._run_start_time + snap = self._budget.snapshot() + cb_summary = self._circuit_breaker.summary() + deg_events = self._degradation.events + + logger.info( + "%s Run complete in %.1fs. Model calls: %d, Tool calls: %d.", + _LOG_PREFIX, + elapsed, + self._total_model_calls, + self._total_tool_calls, + ) + logger.info( + "%s Budget: $%.4f / $%.4f (%.1f%% used).", + _LOG_PREFIX, + snap.org_spent_usd, + snap.org_limit_usd, + snap.org_utilization * 100, + ) + if snap.agent_spent: + for agent, spent in snap.agent_spent.items(): + logger.info( + "%s Agent '%s': $%.4f / $%.4f.", + _LOG_PREFIX, + agent, + spent, + snap.agent_limit_usd, + ) + if cb_summary: + for agent, state in cb_summary.items(): + if state != "closed": + logger.warning( + "%s Circuit breaker '%s': %s.", + _LOG_PREFIX, + agent, + state, + ) + if deg_events: + logger.info("%s Degradation events (%d):", _LOG_PREFIX, len(deg_events)) + for ev in deg_events: + logger.info( + "%s Agent '%s' at %.1f%% -- degraded %s -> %s.", + _LOG_PREFIX, + ev.agent_name, + ev.utilization_pct, + ev.original_model, + ev.fallback_model, + ) + + # ------------------------------------------------------------------ + # Model callbacks + # ------------------------------------------------------------------ + + async def before_model_callback( + self, + *, + callback_context: CallbackContext, + llm_request: LlmRequest, + ) -> Optional[LlmResponse]: + agent_name = self._agent_name(callback_context) + + # Circuit breaker check -- block OPEN, allow one probe in HALF_OPEN. + cb_state = self._circuit_breaker.get_state(agent_name) + if cb_state == CircuitState.OPEN: + logger.warning( + "%s Agent '%s' circuit OPEN -- blocking model call.", + _LOG_PREFIX, + agent_name, + ) + return LlmResponse( + error_code="GOVERNANCE_CIRCUIT_OPEN", + error_message=( + f"Agent '{agent_name}' is isolated (circuit breaker open)." + ), + ) + if cb_state == CircuitState.HALF_OPEN: + if not self._circuit_breaker.claim_probe(agent_name): + return LlmResponse( + error_code="GOVERNANCE_CIRCUIT_OPEN", + error_message=f"Agent '{agent_name}' probe already in flight.", + ) + logger.info( + "%s Agent '%s' circuit HALF_OPEN -- allowing probe request.", + _LOG_PREFIX, + agent_name, + ) + + # Budget check + allowed, reason = self._budget.check(agent_name) + if not allowed: + logger.warning( + "%s Budget exceeded -- blocking model call. %s", + _LOG_PREFIX, + reason, + ) + return LlmResponse( + error_code="GOVERNANCE_BUDGET_EXCEEDED", + error_message=reason, + ) + + # Degradation: switch model if near limit (once per agent). + utilization = self._budget.utilization() + if self._degradation.should_degrade(utilization): + original = llm_request.model or "(default)" + fallback = self._degradation.fallback_model + if fallback: + llm_request.model = fallback + with self._stats_lock: + is_first = agent_name not in self._degraded_agents + if is_first: + self._degraded_agents.add(agent_name) + if is_first: + event = self._degradation.record_event( + agent_name=agent_name, + utilization=utilization, + original_model=original, + ) + logger.info( + "%s Budget at %.0f%% -- degraded to %s (agent '%s', was %s).", + _LOG_PREFIX, + event.utilization_pct, + fallback, + agent_name, + original, + ) + + with self._stats_lock: + self._total_model_calls += 1 + return None # proceed with (possibly modified) request + + async def after_model_callback( + self, + *, + callback_context: CallbackContext, + llm_response: LlmResponse, + ) -> Optional[LlmResponse]: + agent_name = self._agent_name(callback_context) + + # Circuit breaker: soft errors (error_code set) count as failures. + if llm_response.error_code: + self._circuit_breaker.record_failure(agent_name) + else: + self._circuit_breaker.record_success(agent_name) + + # Record cost from usage metadata + usage = llm_response.usage_metadata + if usage is not None: + input_tokens = getattr(usage, "prompt_token_count", 0) or 0 + output_tokens = getattr(usage, "candidates_token_count", 0) or 0 + cost = self._budget.estimate_cost(input_tokens, output_tokens) + self._budget.record(agent_name, cost) + + return None + + async def on_model_error_callback( + self, + *, + callback_context: CallbackContext, + llm_request: LlmRequest, + error: Exception, + ) -> Optional[LlmResponse]: + agent_name = self._agent_name(callback_context) + new_state = self._circuit_breaker.record_failure(agent_name) + + if new_state == CircuitState.OPEN: + logger.warning( + "%s Agent '%s' circuit tripped to OPEN" + " after %d consecutive failures.", + _LOG_PREFIX, + agent_name, + self._config.failure_threshold, + ) + + return None # let the error propagate + + # ------------------------------------------------------------------ + # Tool callbacks + # ------------------------------------------------------------------ + + async def before_tool_callback( + self, + *, + tool: BaseTool, + tool_args: dict[str, Any], + tool_context: CallbackContext, + ) -> Optional[dict]: + # Policy check + allowed, reason = self._policy.check(tool.name) + if not allowed: + logger.warning("%s Tool blocked: %s", _LOG_PREFIX, reason) + return {"error": reason} + + # Degradation: disable expensive tools near budget limit + utilization = self._budget.utilization() + if self._degradation.should_disable_tool(tool.name, utilization): + msg = f"Tool '{tool.name}' disabled -- budget at {utilization * 100:.0f}%" + logger.info("%s %s", _LOG_PREFIX, msg) + return {"error": msg} + + call_id = getattr(tool_context, "function_call_id", None) or "" + timing_key = f"{tool.name}:{call_id}" + with self._stats_lock: + self._total_tool_calls += 1 + self._tool_start_times[timing_key] = time.monotonic() + return None + + async def after_tool_callback( + self, + *, + tool: BaseTool, + tool_args: dict[str, Any], + tool_context: CallbackContext, + result: dict, + ) -> Optional[dict]: + call_id = getattr(tool_context, "function_call_id", None) or "" + timing_key = f"{tool.name}:{call_id}" + with self._stats_lock: + start = self._tool_start_times.pop(timing_key, None) + if start is not None: + elapsed_ms = (time.monotonic() - start) * 1000 + logger.debug( + "%s Tool '%s' completed in %.1fms.", + _LOG_PREFIX, + tool.name, + elapsed_ms, + ) + return None + + async def on_tool_error_callback( + self, + *, + tool: BaseTool, + tool_args: dict[str, Any], + tool_context: CallbackContext, + error: Exception, + ) -> Optional[dict]: + # Clean up timing entry that before_tool_callback recorded. + call_id = getattr(tool_context, "function_call_id", None) or "" + timing_key = f"{tool.name}:{call_id}" + with self._stats_lock: + self._tool_start_times.pop(timing_key, None) + + agent_name = self._agent_name(tool_context) + new_state = self._circuit_breaker.record_failure(agent_name) + + if new_state == CircuitState.OPEN: + logger.warning( + "%s Agent '%s' circuit tripped to OPEN after tool error in '%s'.", + _LOG_PREFIX, + agent_name, + tool.name, + ) + + return None # let the error propagate + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @staticmethod + def _agent_name(callback_context: CallbackContext) -> str: + """Extract agent name from callback context.""" + name = getattr(callback_context, "agent_name", None) or "unknown" + # Sanitize to prevent log injection via newlines or control chars. + return re.sub(r"[^\w\-.]", "_", name) + + @property + def budget(self) -> BudgetTracker: + """Access the budget tracker for inspection.""" + return self._budget + + @property + def circuit_breaker(self) -> CircuitBreaker: + """Access the circuit breaker for inspection.""" + return self._circuit_breaker diff --git a/tests/unittests/governance/__init__.py b/tests/unittests/governance/__init__.py new file mode 100644 index 0000000..0a2669d --- /dev/null +++ b/tests/unittests/governance/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/unittests/governance/test_budget.py b/tests/unittests/governance/test_budget.py new file mode 100644 index 0000000..23398c8 --- /dev/null +++ b/tests/unittests/governance/test_budget.py @@ -0,0 +1,77 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for governance budget tracker.""" + +import pytest + +from google.adk_community.governance._budget import BudgetTracker + + +@pytest.fixture() +def tracker(): + return BudgetTracker( + org_limit_usd=1.0, + agent_limit_usd=0.5, + cost_per_1k_input_tokens=0.001, + cost_per_1k_output_tokens=0.002, + ) + + +class TestBudgetTracker: + + def test_initial_state_allows(self, tracker): + allowed, _ = tracker.check("agent_a") + assert allowed is True + + def test_estimate_cost(self, tracker): + cost = tracker.estimate_cost(1000, 1000) + assert cost == pytest.approx(0.003) + + def test_record_and_check_org_limit(self, tracker): + tracker.record("agent_a", 0.6) + tracker.record("agent_b", 0.5) + allowed, reason = tracker.check("agent_a") + assert allowed is False + assert "Org budget" in reason + + def test_record_and_check_agent_limit(self, tracker): + tracker.record("agent_a", 0.5) + allowed, reason = tracker.check("agent_a") + assert allowed is False + assert "agent_a" in reason + + def test_utilization(self, tracker): + tracker.record("agent_a", 0.3) + assert tracker.utilization() == pytest.approx(0.3) + + def test_snapshot(self, tracker): + tracker.record("agent_a", 0.2) + tracker.record("agent_b", 0.1) + snap = tracker.snapshot() + assert snap.org_spent_usd == pytest.approx(0.3) + assert snap.org_limit_usd == 1.0 + assert snap.agent_spent["agent_a"] == pytest.approx(0.2) + assert snap.agent_spent["agent_b"] == pytest.approx(0.1) + assert snap.org_utilization == pytest.approx(0.3) + + def test_zero_limit_always_blocked(self): + t = BudgetTracker( + org_limit_usd=0.0, + agent_limit_usd=0.0, + cost_per_1k_input_tokens=0.001, + cost_per_1k_output_tokens=0.002, + ) + allowed, _ = t.check("agent_a") + assert allowed is False diff --git a/tests/unittests/governance/test_circuit_breaker.py b/tests/unittests/governance/test_circuit_breaker.py new file mode 100644 index 0000000..766c60e --- /dev/null +++ b/tests/unittests/governance/test_circuit_breaker.py @@ -0,0 +1,75 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for governance circuit breaker.""" + +import time +from unittest import mock + +import pytest + +from google.adk_community.governance._circuit_breaker import CircuitBreaker +from google.adk_community.governance._circuit_breaker import CircuitState + + +@pytest.fixture() +def cb(): + return CircuitBreaker(failure_threshold=3, recovery_timeout_s=10.0) + + +class TestCircuitBreaker: + + def test_initial_state_is_closed(self, cb): + assert cb.get_state("agent_a") == CircuitState.CLOSED + assert cb.is_open("agent_a") is False + + def test_failures_below_threshold_stay_closed(self, cb): + cb.record_failure("agent_a") + cb.record_failure("agent_a") + assert cb.get_state("agent_a") == CircuitState.CLOSED + + def test_failures_at_threshold_open(self, cb): + for _ in range(3): + cb.record_failure("agent_a") + assert cb.get_state("agent_a") == CircuitState.OPEN + assert cb.is_open("agent_a") is True + + def test_success_resets_failures(self, cb): + cb.record_failure("agent_a") + cb.record_failure("agent_a") + cb.record_success("agent_a") + assert cb.get_state("agent_a") == CircuitState.CLOSED + + def test_half_open_after_timeout(self, cb): + for _ in range(3): + cb.record_failure("agent_a") + assert cb.is_open("agent_a") is True + + with mock.patch("time.monotonic", return_value=time.monotonic() + 11): + assert cb.is_half_open("agent_a") is True + assert cb.is_open("agent_a") is False + + def test_agents_are_independent(self, cb): + for _ in range(3): + cb.record_failure("agent_a") + assert cb.is_open("agent_a") is True + assert cb.is_open("agent_b") is False + + def test_summary(self, cb): + for _ in range(3): + cb.record_failure("agent_a") + cb.record_failure("agent_b") + summary = cb.summary() + assert summary["agent_a"] == "open" + assert summary["agent_b"] == "closed" diff --git a/tests/unittests/governance/test_degradation.py b/tests/unittests/governance/test_degradation.py new file mode 100644 index 0000000..8f5f620 --- /dev/null +++ b/tests/unittests/governance/test_degradation.py @@ -0,0 +1,79 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for governance degradation manager.""" + +import pytest + +from google.adk_community.governance._degradation import DegradationManager + + +class TestDegradationManager: + + def test_not_configured_without_fallback(self): + dm = DegradationManager(threshold=0.8, fallback_model=None) + assert dm.is_configured is False + assert dm.should_degrade(0.9) is False + + def test_configured_with_fallback(self): + dm = DegradationManager( + threshold=0.8, fallback_model="gemini-2.0-flash-lite" + ) + assert dm.is_configured is True + + def test_should_degrade_above_threshold(self): + dm = DegradationManager( + threshold=0.8, fallback_model="gemini-2.0-flash-lite" + ) + assert dm.should_degrade(0.8) is True + assert dm.should_degrade(0.9) is True + + def test_should_not_degrade_below_threshold(self): + dm = DegradationManager( + threshold=0.8, fallback_model="gemini-2.0-flash-lite" + ) + assert dm.should_degrade(0.79) is False + + def test_should_disable_tool(self): + dm = DegradationManager( + threshold=0.5, + fallback_model="lite", + disable_tools_on_degrade=["expensive_search"], + ) + assert dm.should_disable_tool("expensive_search", 0.6) is True + assert dm.should_disable_tool("cheap_tool", 0.6) is False + assert dm.should_disable_tool("expensive_search", 0.4) is False + + def test_record_event(self): + dm = DegradationManager( + threshold=0.8, fallback_model="gemini-2.0-flash-lite" + ) + event = dm.record_event( + agent_name="agent_a", + utilization=0.85, + original_model="gemini-2.5-pro", + ) + assert event.agent_name == "agent_a" + assert event.utilization_pct == 85.0 + assert event.original_model == "gemini-2.5-pro" + assert event.fallback_model == "gemini-2.0-flash-lite" + assert len(dm.events) == 1 + + def test_events_are_independent_copies(self): + dm = DegradationManager(threshold=0.8, fallback_model="lite") + dm.record_event(agent_name="a", utilization=0.9, original_model="pro") + events1 = dm.events + events2 = dm.events + assert events1 is not events2 + assert events1 == events2 diff --git a/tests/unittests/governance/test_policy.py b/tests/unittests/governance/test_policy.py new file mode 100644 index 0000000..7992430 --- /dev/null +++ b/tests/unittests/governance/test_policy.py @@ -0,0 +1,58 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for governance tool policy.""" + +import pytest + +from google.adk_community.governance._policy import ToolPolicy + + +class TestToolPolicy: + + def test_no_restrictions_allows_all(self): + policy = ToolPolicy() + allowed, _ = policy.check("any_tool") + assert allowed is True + + def test_blocked_tool(self): + policy = ToolPolicy(blocked_tools=["dangerous_tool"]) + allowed, reason = policy.check("dangerous_tool") + assert allowed is False + assert "blocked" in reason + + def test_blocked_does_not_affect_others(self): + policy = ToolPolicy(blocked_tools=["dangerous_tool"]) + allowed, _ = policy.check("safe_tool") + assert allowed is True + + def test_allowlist_permits_listed(self): + policy = ToolPolicy(allowed_tools=["search", "calculator"]) + allowed, _ = policy.check("search") + assert allowed is True + + def test_allowlist_blocks_unlisted(self): + policy = ToolPolicy(allowed_tools=["search", "calculator"]) + allowed, reason = policy.check("delete_database") + assert allowed is False + assert "not in the allowed list" in reason + + def test_blocklist_takes_precedence_over_allowlist(self): + policy = ToolPolicy( + blocked_tools=["search"], + allowed_tools=["search", "calculator"], + ) + allowed, reason = policy.check("search") + assert allowed is False + assert "blocked" in reason diff --git a/tests/unittests/governance/test_veronica_governance_plugin.py b/tests/unittests/governance/test_veronica_governance_plugin.py new file mode 100644 index 0000000..4aaeae0 --- /dev/null +++ b/tests/unittests/governance/test_veronica_governance_plugin.py @@ -0,0 +1,548 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for VeronicaGovernancePlugin.""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest import mock + +import pytest + +from google.adk_community.governance import GovernanceConfig +from google.adk_community.governance import VeronicaGovernancePlugin + + +def _make_callback_context(agent_name: str = "test_agent"): + """Create a minimal mock CallbackContext.""" + ctx = mock.MagicMock() + ctx.agent_name = agent_name + ctx.function_call_id = "call_001" + return ctx + + +def _make_llm_request(model: str = "gemini-2.5-pro"): + """Create a minimal mock LlmRequest.""" + req = mock.MagicMock() + req.model = model + return req + + +def _make_llm_response( + input_tokens: int = 100, + output_tokens: int = 50, + error_code: str | None = None, +): + """Create a minimal mock LlmResponse.""" + resp = mock.MagicMock() + resp.error_code = error_code + usage = SimpleNamespace( + prompt_token_count=input_tokens, + candidates_token_count=output_tokens, + ) + resp.usage_metadata = usage + return resp + + +def _make_tool(name: str = "search"): + """Create a minimal mock BaseTool.""" + tool = mock.MagicMock() + tool.name = name + return tool + + +def _make_invocation_context(): + """Create a minimal mock InvocationContext.""" + return mock.MagicMock() + + +class TestBeforeModelCallback: + + @pytest.mark.asyncio + async def test_allows_within_budget(self): + plugin = VeronicaGovernancePlugin() + ctx = _make_callback_context() + req = _make_llm_request() + result = await plugin.before_model_callback( + callback_context=ctx, llm_request=req + ) + assert result is None + + @pytest.mark.asyncio + async def test_blocks_when_budget_exceeded(self): + config = GovernanceConfig(max_cost_usd=0.001, agent_max_cost_usd=0.001) + plugin = VeronicaGovernancePlugin(config=config) + plugin._budget.record("test_agent", 0.002) + + ctx = _make_callback_context() + req = _make_llm_request() + result = await plugin.before_model_callback( + callback_context=ctx, llm_request=req + ) + assert result is not None + assert result.error_code == "GOVERNANCE_BUDGET_EXCEEDED" + + @pytest.mark.asyncio + async def test_blocks_when_circuit_open(self): + config = GovernanceConfig(failure_threshold=2) + plugin = VeronicaGovernancePlugin(config=config) + plugin._circuit_breaker.record_failure("test_agent") + plugin._circuit_breaker.record_failure("test_agent") + + ctx = _make_callback_context() + req = _make_llm_request() + result = await plugin.before_model_callback( + callback_context=ctx, llm_request=req + ) + assert result is not None + assert result.error_code == "GOVERNANCE_CIRCUIT_OPEN" + + @pytest.mark.asyncio + async def test_degrades_model_near_budget_limit(self): + config = GovernanceConfig( + max_cost_usd=1.0, + degradation_threshold=0.8, + fallback_model="gemini-2.0-flash-lite", + ) + plugin = VeronicaGovernancePlugin(config=config) + plugin._budget.record("test_agent", 0.85) + + ctx = _make_callback_context() + req = _make_llm_request(model="gemini-2.5-pro") + result = await plugin.before_model_callback( + callback_context=ctx, llm_request=req + ) + # Returns None (proceeds), but request model was mutated + assert result is None + assert req.model == "gemini-2.0-flash-lite" + + @pytest.mark.asyncio + async def test_degradation_recorded_in_events(self): + config = GovernanceConfig( + max_cost_usd=1.0, + degradation_threshold=0.5, + fallback_model="gemini-2.0-flash-lite", + ) + plugin = VeronicaGovernancePlugin(config=config) + plugin._budget.record("test_agent", 0.6) + + ctx = _make_callback_context() + req = _make_llm_request(model="gemini-2.5-pro") + await plugin.before_model_callback(callback_context=ctx, llm_request=req) + events = plugin._degradation.events + assert len(events) == 1 + assert events[0].agent_name == "test_agent" + assert events[0].original_model == "gemini-2.5-pro" + assert events[0].fallback_model == "gemini-2.0-flash-lite" + + @pytest.mark.asyncio + async def test_no_degradation_when_not_configured(self): + config = GovernanceConfig( + max_cost_usd=1.0, + degradation_threshold=0.8, + fallback_model=None, + ) + plugin = VeronicaGovernancePlugin(config=config) + plugin._budget.record("test_agent", 0.9) + + ctx = _make_callback_context() + req = _make_llm_request(model="gemini-2.5-pro") + await plugin.before_model_callback(callback_context=ctx, llm_request=req) + assert req.model == "gemini-2.5-pro" + + +class TestAfterModelCallback: + + @pytest.mark.asyncio + async def test_records_cost(self): + plugin = VeronicaGovernancePlugin() + ctx = _make_callback_context() + resp = _make_llm_response(input_tokens=1000, output_tokens=500) + + await plugin.after_model_callback(callback_context=ctx, llm_response=resp) + snap = plugin._budget.snapshot() + assert snap.org_spent_usd > 0 + + @pytest.mark.asyncio + async def test_resets_circuit_breaker_on_success(self): + config = GovernanceConfig(failure_threshold=3) + plugin = VeronicaGovernancePlugin(config=config) + plugin._circuit_breaker.record_failure("test_agent") + plugin._circuit_breaker.record_failure("test_agent") + + ctx = _make_callback_context() + resp = _make_llm_response() + await plugin.after_model_callback(callback_context=ctx, llm_response=resp) + from google.adk_community.governance._circuit_breaker import CircuitState + + assert ( + plugin._circuit_breaker.get_state("test_agent") == CircuitState.CLOSED + ) + + @pytest.mark.asyncio + async def test_handles_missing_usage_metadata(self): + plugin = VeronicaGovernancePlugin() + ctx = _make_callback_context() + resp = mock.MagicMock() + resp.usage_metadata = None + result = await plugin.after_model_callback( + callback_context=ctx, llm_response=resp + ) + assert result is None + assert plugin._budget.snapshot().org_spent_usd == 0.0 + + +class TestModelErrorCallback: + + @pytest.mark.asyncio + async def test_records_failure(self): + config = GovernanceConfig(failure_threshold=2) + plugin = VeronicaGovernancePlugin(config=config) + ctx = _make_callback_context() + req = _make_llm_request() + + await plugin.on_model_error_callback( + callback_context=ctx, + llm_request=req, + error=RuntimeError("test"), + ) + from google.adk_community.governance._circuit_breaker import CircuitState + + assert ( + plugin._circuit_breaker.get_state("test_agent") == CircuitState.CLOSED + ) + + await plugin.on_model_error_callback( + callback_context=ctx, + llm_request=req, + error=RuntimeError("test"), + ) + assert plugin._circuit_breaker.get_state("test_agent") == CircuitState.OPEN + + +class TestToolCallbacks: + + @pytest.mark.asyncio + async def test_allows_normal_tool(self): + plugin = VeronicaGovernancePlugin() + tool = _make_tool("search") + ctx = _make_callback_context() + result = await plugin.before_tool_callback( + tool=tool, tool_args={}, tool_context=ctx + ) + assert result is None + + @pytest.mark.asyncio + async def test_blocks_disallowed_tool(self): + config = GovernanceConfig(blocked_tools=["delete_all"]) + plugin = VeronicaGovernancePlugin(config=config) + tool = _make_tool("delete_all") + ctx = _make_callback_context() + result = await plugin.before_tool_callback( + tool=tool, tool_args={}, tool_context=ctx + ) + assert result is not None + assert "blocked" in result["error"] + + @pytest.mark.asyncio + async def test_disables_tool_on_degrade(self): + config = GovernanceConfig( + max_cost_usd=1.0, + degradation_threshold=0.5, + disable_tools_on_degrade=["expensive_search"], + ) + plugin = VeronicaGovernancePlugin(config=config) + plugin._budget.record("test_agent", 0.6) + + tool = _make_tool("expensive_search") + ctx = _make_callback_context() + result = await plugin.before_tool_callback( + tool=tool, tool_args={}, tool_context=ctx + ) + assert result is not None + assert "disabled" in result["error"] + + @pytest.mark.asyncio + async def test_after_tool_callback_returns_none(self): + plugin = VeronicaGovernancePlugin() + tool = _make_tool("search") + ctx = _make_callback_context() + # Simulate before_tool to set start time + await plugin.before_tool_callback(tool=tool, tool_args={}, tool_context=ctx) + result = await plugin.after_tool_callback( + tool=tool, tool_args={}, tool_context=ctx, result={"ok": True} + ) + assert result is None + + @pytest.mark.asyncio + async def test_tool_error_trips_circuit_breaker(self): + config = GovernanceConfig(failure_threshold=1) + plugin = VeronicaGovernancePlugin(config=config) + tool = _make_tool("search") + ctx = _make_callback_context() + await plugin.on_tool_error_callback( + tool=tool, + tool_args={}, + tool_context=ctx, + error=RuntimeError("fail"), + ) + assert plugin._circuit_breaker.is_open("test_agent") + + @pytest.mark.asyncio + async def test_tool_error_cleans_up_timing_entry(self): + """on_tool_error_callback must remove the timing key set by before_tool_callback.""" + plugin = VeronicaGovernancePlugin() + tool = _make_tool("search") + ctx = _make_callback_context() + # Record a start time as before_tool_callback would. + await plugin.before_tool_callback(tool=tool, tool_args={}, tool_context=ctx) + assert len(plugin._tool_start_times) == 1 + # Error path must clean it up. + await plugin.on_tool_error_callback( + tool=tool, + tool_args={}, + tool_context=ctx, + error=RuntimeError("fail"), + ) + assert len(plugin._tool_start_times) == 0 + + @pytest.mark.asyncio + async def test_repeated_tool_errors_no_timing_leak(self): + """_tool_start_times must not grow unbounded across many tool errors.""" + plugin = VeronicaGovernancePlugin() + tool = _make_tool("search") + for i in range(50): + ctx = mock.MagicMock() + ctx.agent_name = "test_agent" + ctx.function_call_id = f"call_{i}" + await plugin.before_tool_callback( + tool=tool, tool_args={}, tool_context=ctx + ) + await plugin.on_tool_error_callback( + tool=tool, + tool_args={}, + tool_context=ctx, + error=RuntimeError("fail"), + ) + assert len(plugin._tool_start_times) == 0 + + +class TestRunLifecycle: + + @pytest.mark.asyncio + async def test_before_run_returns_none(self): + plugin = VeronicaGovernancePlugin() + ic = _make_invocation_context() + result = await plugin.before_run_callback(invocation_context=ic) + assert result is None + + @pytest.mark.asyncio + async def test_after_run_logs_summary(self, caplog): + import logging + + plugin = VeronicaGovernancePlugin() + ic = _make_invocation_context() + await plugin.before_run_callback(invocation_context=ic) + plugin._budget.record("agent_a", 0.05) + + with caplog.at_level(logging.INFO): + await plugin.after_run_callback(invocation_context=ic) + + assert "[GOVERNANCE]" in caplog.text + assert "Run complete" in caplog.text + assert "Budget:" in caplog.text + + @pytest.mark.asyncio + async def test_after_run_logs_degradation_events(self, caplog): + import logging + + config = GovernanceConfig( + max_cost_usd=1.0, + degradation_threshold=0.5, + fallback_model="gemini-2.0-flash-lite", + ) + plugin = VeronicaGovernancePlugin(config=config) + ic = _make_invocation_context() + await plugin.before_run_callback(invocation_context=ic) + + # Trigger degradation + plugin._budget.record("agent_a", 0.6) + ctx = _make_callback_context("agent_a") + req = _make_llm_request("gemini-2.5-pro") + await plugin.before_model_callback(callback_context=ctx, llm_request=req) + + with caplog.at_level(logging.INFO): + await plugin.after_run_callback(invocation_context=ic) + + assert "Degradation events" in caplog.text + assert "gemini-2.0-flash-lite" in caplog.text + + +class TestErrorCodeHandling: + + @pytest.mark.asyncio + async def test_soft_error_counts_as_failure(self): + """error_code in LlmResponse should trigger circuit breaker failure.""" + config = GovernanceConfig(failure_threshold=2) + plugin = VeronicaGovernancePlugin(config=config) + ctx = _make_callback_context() + resp = mock.MagicMock() + resp.error_code = "500" + resp.usage_metadata = None + + await plugin.after_model_callback(callback_context=ctx, llm_response=resp) + await plugin.after_model_callback(callback_context=ctx, llm_response=resp) + from google.adk_community.governance._circuit_breaker import CircuitState + + assert plugin._circuit_breaker.get_state("test_agent") == CircuitState.OPEN + + +class TestDegradationDedup: + + @pytest.mark.asyncio + async def test_degradation_fires_once_per_agent(self): + """Second call for same agent should not create another event.""" + config = GovernanceConfig( + max_cost_usd=1.0, + degradation_threshold=0.5, + fallback_model="gemini-2.0-flash-lite", + ) + plugin = VeronicaGovernancePlugin(config=config) + plugin._budget.record("test_agent", 0.6) + + ctx = _make_callback_context() + req1 = _make_llm_request("gemini-2.5-pro") + await plugin.before_model_callback(callback_context=ctx, llm_request=req1) + req2 = _make_llm_request("gemini-2.5-pro") + await plugin.before_model_callback(callback_context=ctx, llm_request=req2) + + assert len(plugin._degradation.events) == 1 + assert req2.model == "gemini-2.0-flash-lite" + + +class TestAgentNameSanitization: + + @pytest.mark.asyncio + async def test_newline_in_agent_name_sanitized(self): + """Agent names with control chars should be sanitized.""" + plugin = VeronicaGovernancePlugin() + ctx = _make_callback_context("evil\nagent") + req = _make_llm_request() + resp = _make_llm_response(input_tokens=100, output_tokens=50) + await plugin.before_model_callback(callback_context=ctx, llm_request=req) + await plugin.after_model_callback(callback_context=ctx, llm_response=resp) + snap = plugin._budget.snapshot() + assert len(snap.agent_spent) > 0 + for name in snap.agent_spent: + assert "\n" not in name + + @pytest.mark.asyncio + async def test_missing_agent_name_returns_unknown(self): + """CallbackContext without agent_name should return 'unknown'.""" + plugin = VeronicaGovernancePlugin() + ctx = mock.MagicMock() + ctx.agent_name = None + ctx.function_call_id = None + req = _make_llm_request() + result = await plugin.before_model_callback( + callback_context=ctx, llm_request=req + ) + assert result is None + + +class TestHalfOpenProbe: + + @pytest.mark.asyncio + async def test_half_open_allows_probe(self): + """HALF_OPEN state should allow one probe request.""" + config = GovernanceConfig(failure_threshold=1, recovery_timeout_s=0.0) + plugin = VeronicaGovernancePlugin(config=config) + # Trip the circuit + plugin._circuit_breaker.record_failure("test_agent") + # recovery_timeout_s=0 -> immediately HALF_OPEN + ctx = _make_callback_context() + req = _make_llm_request() + result = await plugin.before_model_callback( + callback_context=ctx, llm_request=req + ) + # Probe should be allowed (returns None) + assert result is None + + @pytest.mark.asyncio + async def test_half_open_blocks_second_probe(self): + """Second concurrent probe should be rejected.""" + config = GovernanceConfig(failure_threshold=1, recovery_timeout_s=0.0) + plugin = VeronicaGovernancePlugin(config=config) + plugin._circuit_breaker.record_failure("test_agent") + ctx = _make_callback_context() + # First probe succeeds + req1 = _make_llm_request() + result1 = await plugin.before_model_callback( + callback_context=ctx, llm_request=req1 + ) + assert result1 is None + # Second probe blocked (probe_in_flight still True) + req2 = _make_llm_request() + result2 = await plugin.before_model_callback( + callback_context=ctx, llm_request=req2 + ) + assert result2 is not None + assert "probe already in flight" in result2.error_message + + +class TestOpenedAtIdempotency: + + def test_opened_at_not_reset_on_repeated_failure(self): + """opened_at must not change once circuit is OPEN.""" + from google.adk_community.governance._circuit_breaker import CircuitBreaker + + cb = CircuitBreaker(failure_threshold=1, recovery_timeout_s=60.0) + cb.record_failure("agent_a") # trips to OPEN + opened_at_1 = cb._agents["agent_a"].opened_at + assert opened_at_1 is not None + + import time as _time + + _time.sleep(0.01) # ensure monotonic advances + cb.record_failure("agent_a") # second failure while OPEN + opened_at_2 = cb._agents["agent_a"].opened_at + assert opened_at_2 == opened_at_1 # must NOT reset + + +class TestToolErrorTimingCleanup: + + @pytest.mark.asyncio + async def test_tool_error_cleans_timing_entry(self): + """on_tool_error_callback must remove the timing entry.""" + plugin = VeronicaGovernancePlugin() + tool = _make_tool("search") + ctx = _make_callback_context() + await plugin.before_tool_callback(tool=tool, tool_args={}, tool_context=ctx) + assert len(plugin._tool_start_times) == 1 + await plugin.on_tool_error_callback( + tool=tool, + tool_args={}, + tool_context=ctx, + error=RuntimeError("fail"), + ) + assert len(plugin._tool_start_times) == 0 + + +class TestZeroBudgetConfig: + + def test_zero_budget_rejected_by_config(self): + """GovernanceConfig should reject zero budget.""" + with pytest.raises(Exception): + GovernanceConfig(max_cost_usd=0.0) From 0a1573a5137fc11957186f7af116c670c978e28e Mon Sep 17 00:00:00 2001 From: amabito Date: Sun, 15 Mar 2026 10:50:59 +0900 Subject: [PATCH 2/2] fix: address review feedback from gemini-code-assist - Move google.genai types import to top of main.py (PEP 8) - Remove unused import: dataclasses.field in _budget.py, _degradation.py - Remove unused import: BaseAgent in veronica_governance_plugin.py --- contributing/samples/governance/main.py | 20 ++++++++----------- .../adk_community/governance/_budget.py | 1 - .../adk_community/governance/_degradation.py | 1 - .../governance/veronica_governance_plugin.py | 1 - 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/contributing/samples/governance/main.py b/contributing/samples/governance/main.py index 8f34c12..6505cf7 100644 --- a/contributing/samples/governance/main.py +++ b/contributing/samples/governance/main.py @@ -29,11 +29,10 @@ from google.adk import Runner from google.adk.agents import Agent from google.adk.sessions import InMemorySessionService +from google.genai import types -from google.adk_community.governance import ( - GovernanceConfig, - VeronicaGovernancePlugin, -) +from google.adk_community.governance import GovernanceConfig +from google.adk_community.governance import VeronicaGovernancePlugin logging.basicConfig(level=logging.INFO, format="%(message)s") @@ -41,11 +40,11 @@ def main(): # Configure governance limits config = GovernanceConfig( - max_cost_usd=0.50, # org-level: 50 cents - agent_max_cost_usd=0.25, # per-agent: 25 cents - failure_threshold=3, # circuit breaker after 3 failures + max_cost_usd=0.50, # org-level: 50 cents + agent_max_cost_usd=0.25, # per-agent: 25 cents + failure_threshold=3, # circuit breaker after 3 failures recovery_timeout_s=30.0, - degradation_threshold=0.7, # degrade at 70% budget + degradation_threshold=0.7, # degrade at 70% budget fallback_model="gemini-2.0-flash-lite", blocked_tools=["shell_exec"], disable_tools_on_degrade=["web_search"], @@ -67,8 +66,7 @@ def main(): model="gemini-2.5-flash", name="summarizer", instruction=( - "You summarize text provided to you. Keep summaries to 2-3" - " sentences." + "You summarize text provided to you. Keep summaries to 2-3 sentences." ), ) @@ -98,8 +96,6 @@ async def run(): user_id="demo_user", ) - from google.genai import types - user_message = types.Content( role="user", parts=[types.Part(text="What is agent governance?")], diff --git a/src/google/adk_community/governance/_budget.py b/src/google/adk_community/governance/_budget.py index bd281df..7692656 100644 --- a/src/google/adk_community/governance/_budget.py +++ b/src/google/adk_community/governance/_budget.py @@ -17,7 +17,6 @@ from __future__ import annotations from dataclasses import dataclass -from dataclasses import field import threading diff --git a/src/google/adk_community/governance/_degradation.py b/src/google/adk_community/governance/_degradation.py index e02bb36..3c491bb 100644 --- a/src/google/adk_community/governance/_degradation.py +++ b/src/google/adk_community/governance/_degradation.py @@ -17,7 +17,6 @@ from __future__ import annotations from dataclasses import dataclass -from dataclasses import field import threading diff --git a/src/google/adk_community/governance/veronica_governance_plugin.py b/src/google/adk_community/governance/veronica_governance_plugin.py index 06b18b0..00dc403 100644 --- a/src/google/adk_community/governance/veronica_governance_plugin.py +++ b/src/google/adk_community/governance/veronica_governance_plugin.py @@ -28,7 +28,6 @@ from typing import Any from typing import Optional -from google.adk.agents.base_agent import BaseAgent from google.adk.agents.callback_context import CallbackContext from google.adk.agents.invocation_context import InvocationContext from google.adk.models.llm_request import LlmRequest