forked from google/adk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresilient_agent.py
More file actions
105 lines (88 loc) · 3.13 KB
/
resilient_agent.py
File metadata and controls
105 lines (88 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# Sample: Using LlmResiliencePlugin for robust model calls
#
# Run with:
# PYTHONPATH=$(pwd)/src python samples/resilient_agent.py
#
# This demonstrates:
# - Configuring LlmResiliencePlugin for retries and fallbacks
# - Running a minimal in-memory agent with a mocked model
from __future__ import annotations
import asyncio
from typing import ClassVar
from google.adk.agents.llm_agent import LlmAgent
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.models.base_llm import BaseLlm
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.adk.models.registry import LLMRegistry
from google.adk.plugins.llm_resilience_plugin import LlmResiliencePlugin
from google.adk.runners import Runner
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.genai import types
class DemoFailThenSucceedModel(BaseLlm):
    """Mock model that times out on its first call, then answers normally.

    Used to demonstrate LlmResiliencePlugin: attempt #1 raises TimeoutError,
    every attempt after that yields a single successful response.
    """

    model: str = "demo-fail-succeed"

    # ClassVar on purpose: the counter is shared across instances, so the
    # failure/success sequence survives the framework re-creating the model.
    attempts: ClassVar[int] = 0

    @classmethod
    def supported_models(cls) -> list[str]:
        """Model names this class serves when resolved via the registry."""
        return ["demo-fail-succeed"]

    async def generate_content_async(
        self, llm_request: LlmRequest, stream: bool = False
    ):
        """Yield one model response, simulating a transient failure first."""
        klass = DemoFailThenSucceedModel
        klass.attempts += 1
        if klass.attempts < 2:
            # First attempt: pretend the backend timed out so the
            # resilience plugin gets a retryable error to handle.
            raise TimeoutError("Simulated transient failure")
        reply_part = types.Part.from_text(text="Recovered on retry!")
        yield LlmResponse(
            content=types.Content(role="model", parts=[reply_part]),
            partial=False,
        )
# Register the demo model so the string "demo-fail-succeed" resolves to it
# when LlmAgent is constructed in main() below.
LLMRegistry.register(DemoFailThenSucceedModel)
async def main():
    """Run one turn against the flaky demo model and print the events.

    Wires up fully in-memory services (no disk, no network), attaches the
    resilience plugin, sends a single user message, and reports what the
    model produced after the plugin's retry recovers the simulated failure.
    """
    session_service = InMemorySessionService()
    runner = Runner(
        app_name="resilience_demo",
        agent=LlmAgent(name="resilient_agent", model="demo-fail-succeed"),
        artifact_service=InMemoryArtifactService(),
        session_service=session_service,
        memory_service=InMemoryMemoryService(),
        plugins=[
            LlmResiliencePlugin(
                max_retries=2,
                backoff_initial=0.1,
                backoff_multiplier=2.0,
                jitter=0.1,
                fallback_models=["mock"],  # Illustrative only; never reached here
            )
        ],
    )

    # One session, one user turn.
    session = await session_service.create_session(
        app_name="resilience_demo", user_id="demo"
    )
    prompt = types.Content(
        role="user", parts=[types.Part.from_text(text="hello")]
    )
    collected = [
        ev
        async for ev in runner.run_async(
            user_id=session.user_id,
            session_id=session.id,
            new_message=prompt,
        )
    ]

    print("Collected", len(collected), "events")
    for event in collected:
        parts = event.content.parts if event.content else None
        if parts and parts[0].text:
            print("MODEL:", parts[0].text.strip())
# Script entry point: drive the async demo to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())