Commit b64a3dd

feat: Add GraphAgent for directed-graph workflow orchestration
Add GraphAgent for building directed-graph workflows with conditional routing, cyclic execution, state management with reducers, typed events, streaming, callbacks, rewind, resumability, telemetry with OpenTelemetry tracing, evaluation metrics, and CLI graph visualization for GraphAgent topologies. Includes samples and design documentation.
1 parent 223d9a7 commit b64a3dd

58 files changed

Lines changed: 16332 additions & 0 deletions

Lines changed: 355 additions & 0 deletions
@@ -0,0 +1,355 @@
1+
# GraphAgent Design Document
2+
3+
**Author**: ADK Team
4+
**Date**: 2026-01-25
5+
**Status**: Experimental
6+
7+
---
8+
9+
## Motivation
10+
11+
ADK provides three workflow agents (SequentialAgent, ParallelAgent, LoopAgent) that execute in **fixed patterns**. While these can be composed, they cannot make **runtime decisions** based on state.
12+
13+
GraphAgent fills this gap by enabling **conditional routing**: choosing among execution paths based on runtime state.
14+
15+
---
16+
17+
## Use Case: Why Existing Agents Are Insufficient
18+
19+
### Problem: Data Validation Pipeline with Retry
20+
21+
**Requirement**:
22+
1. Validate input data
23+
2. If valid → process data
24+
3. If invalid → retry validation (max 3 times)
25+
4. After max retries → route to error handler
26+
27+
### Attempt 1: SequentialAgent ❌
28+
29+
```python
sequential = SequentialAgent(sub_agents=[
    validator,
    processor,  # ❌ Always runs, even if validation failed
])
```
35+
36+
**Problem**: Cannot skip processor if validation fails.
37+
38+
### Attempt 2: Composition ❌
39+
40+
```python
# Try to handle errors with conditional logic
sequential = SequentialAgent(sub_agents=[
    validator,
    # ❌ No way to conditionally route here
    processor,
])
```
48+
49+
**Problem**: Sequential/Parallel/Loop agents don't support conditional edges.
50+
51+
### Solution: GraphAgent ✅
52+
53+
```python
graph = GraphAgent(name="pipeline")

# Add nodes (convenience API)
graph.add_node("validate", agent=validator)
graph.add_node("process", agent=processor)
graph.add_node("retry", agent=retry_handler)
graph.add_node("error", agent=error_handler)

# Conditional edges based on runtime state
graph.add_edge(
    "validate",
    "process",
    condition=lambda state: state.data.get("valid", False) is True
)
graph.add_edge(
    "validate",
    "retry",
    condition=lambda state: (
        not state.data.get("valid", False)
        and state.data.get("retry_count", 0) < 3
    )
)
graph.add_edge(
    "validate",
    "error",
    condition=lambda state: (
        not state.data.get("valid", False)
        and state.data.get("retry_count", 0) >= 3
    )
)
graph.add_edge("retry", "validate")  # Loop back
```
86+
87+
**This cannot be achieved with SequentialAgent, ParallelAgent, or LoopAgent.**
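
Because routing depends entirely on the edge predicates, it is worth checking that they partition the states you expect. The following is a minimal sketch, not part of GraphAgent: it copies the three conditions above into plain functions and uses `types.SimpleNamespace` as an assumed stand-in for the state object (anything with a `data` dict). The helper name `matching_edges` is hypothetical.

```python
from itertools import product
from types import SimpleNamespace


# The three edge predicates from the pipeline above, as plain functions.
def to_process(state):
    return state.data.get("valid", False) is True


def to_retry(state):
    return (not state.data.get("valid", False)
            and state.data.get("retry_count", 0) < 3)


def to_error(state):
    return (not state.data.get("valid", False)
            and state.data.get("retry_count", 0) >= 3)


def matching_edges(valid, retry_count):
    """Return the name of every edge whose predicate holds for this state."""
    # SimpleNamespace is only a stand-in for the real state object.
    state = SimpleNamespace(data={"valid": valid, "retry_count": retry_count})
    predicates = {"process": to_process, "retry": to_retry, "error": to_error}
    return [name for name, predicate in predicates.items() if predicate(state)]


# Enumerate a small grid of boolean states and record which edges fire.
coverage = {
    (valid, count): matching_edges(valid, count)
    for valid, count in product([True, False], range(5))
}
```

For boolean `valid` values, exactly one edge matches every state in the grid. Note that the first predicate uses `is True`, so a truthy non-boolean value such as the string `"yes"` matches none of the three edges; a validator feeding this graph would need to write a real `bool`.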
88+
89+
---
90+
91+
## Key Capabilities
92+
93+
### 1. Conditional Routing
94+
95+
Execute different paths based on runtime state:
96+
97+
```python
graph.add_edge(
    "node_a",
    "node_b",
    condition=lambda state: state.data["score"] > 0.8
)
graph.add_edge(
    "node_a",
    "node_c",
    condition=lambda state: state.data["score"] <= 0.8
)
```
109+
110+
### 2. Cyclic Execution (Loops)
111+
112+
Loop back to previous nodes:
113+
114+
```python
graph.add_edge("validate", "process")
graph.add_edge("process", "validate")  # Loop back for next iteration
```
118+
119+
Protected by `max_iterations` to prevent infinite loops.
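
To illustrate why a cap is needed, here is a minimal sketch of an iteration guard on the two-node cycle above. It is not GraphAgent's implementation: the function `run_cycle` is hypothetical, it counts one iteration per node execution, and it reports hitting the cap with a flag. How GraphAgent actually counts iterations and signals the limit is not specified in this document.

```python
def run_cycle(edges, start, max_iterations):
    """Follow unconditional edges from `start`, stopping at the iteration cap."""
    path = []
    current = start
    iteration = 0
    while current is not None and iteration < max_iterations:
        path.append(current)
        iteration += 1
        current = edges.get(current)  # None means no outgoing edge
    # If a next node is still pending, the cap (not the graph) ended the run.
    hit_cap = current is not None
    return path, hit_cap


# The cycle from the snippet above: validate -> process -> validate -> ...
edges = {"validate": "process", "process": "validate"}
path, hit_cap = run_cycle(edges, "validate", max_iterations=5)
```

Without the cap this loop would never terminate, since neither node has an exit edge.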
120+
121+
### 3. State Management
122+
123+
Track state across node executions:
124+
125+
```python
class GraphState:
    data: Dict[str, Any]  # Domain data (node outputs, user values)

class GraphAgentState(BaseAgentState):
    current_node: str  # Execution tracking
    iteration: int
    path: List[str]
    node_invocations: Dict[str, List[str]]
    # ... more execution tracking fields
```
136+
137+
Domain data flows through `state_delta` events; execution tracking through `BaseAgentState`.
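
This document lists reducers as part of state management but does not show one. As a rough illustration of the idea only, the sketch below merges a delta dictionary into the domain data key by key, with a per-key reducer deciding how old and new values combine. The names `apply_delta`, `overwrite`, and `append`, and the reducer signature, are assumptions for this example and may not match GraphAgent's API.

```python
from typing import Any, Callable, Dict

Reducer = Callable[[Any, Any], Any]


def overwrite(old: Any, new: Any) -> Any:
    """Default behavior: the newest value replaces the old one."""
    return new


def append(old: Any, new: Any) -> Any:
    """Accumulate values in a list instead of replacing them."""
    return (old or []) + [new]


def apply_delta(data: Dict[str, Any], delta: Dict[str, Any],
                reducers: Dict[str, Reducer]) -> Dict[str, Any]:
    """Return a new dict with `delta` merged into `data`, key by key."""
    merged = dict(data)
    for key, value in delta.items():
        reducer = reducers.get(key, overwrite)
        merged[key] = reducer(merged.get(key), value)
    return merged


# "errors" accumulates across nodes; every other key is overwritten.
reducers = {"errors": append}
data: Dict[str, Any] = {}
data = apply_delta(data, {"valid": False, "errors": "missing id"}, reducers)
data = apply_delta(data, {"valid": True, "errors": "late fix"}, reducers)
```

After both deltas, `valid` holds only the latest value while `errors` keeps both entries in order.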
138+
139+
### 4. Checkpointing (Optional)
140+
141+
Save state after each node for resume capability:
142+
143+
```python
graph = GraphAgent(
    name="workflow",
    checkpoint_service=checkpoint_service  # Optional parameter
)
# State automatically saved to session after each node
```
150+
151+
Resume from checkpoint:
152+
153+
```python
graph, checkpoint = await GraphAgent.resume_from_checkpoint(
    session_service, app_name, user_id, session_id
)
await graph.continue_from_checkpoint(checkpoint, session_service)
```
159+
160+
---
161+
162+
## Architecture
163+
164+
### Core Components
165+
166+
**GraphNode**: Wrapper around BaseAgent
167+
- `name`: Node identifier
168+
- `agent`: BaseAgent to execute
169+
- `edges`: Conditional edges to other nodes
170+
- `output_mapper`: Transform agent output to state
171+
172+
**GraphEdge**: Conditional transition
173+
- `to_node`: Target node name
174+
- `condition`: Predicate on GraphState (optional)
175+
176+
**GraphState**: Domain data container
177+
- `data`: Accumulated results from nodes
178+
179+
**GraphAgentState(BaseAgentState)**: Execution tracking
180+
- `current_node`, `iteration`, `path`, `node_invocations`, etc.
181+
182+
### Execution Flow
183+
184+
1. Start at `start_node`
185+
2. Execute current node's agent
186+
3. Update state with node output (via output_mapper)
187+
4. Evaluate edge conditions to find next node
188+
5. If multiple conditions match → take first matching edge
189+
6. If no conditions match → stop (must be at end_node)
190+
7. Repeat until end_node reached or max_iterations exceeded
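
The steps above can be sketched in plain Python. This is a simplified model under stated assumptions, not the GraphAgent source: `Node`, `Edge`, and `execute` are hypothetical names, each node is a plain callable standing in for an agent, each output is stored under the node's name rather than passed through an `output_mapper`, and exceeding the cap raises `RuntimeError` (the real failure mode is not described here).

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

State = Dict[str, Any]


@dataclass
class Edge:
    to_node: str
    condition: Optional[Callable[[State], bool]] = None  # None means "always"


@dataclass
class Node:
    name: str
    run: Callable[[State], Any]  # stand-in for executing a BaseAgent
    edges: List[Edge] = field(default_factory=list)


def execute(nodes: Dict[str, Node], start_node: str, end_nodes: Set[str],
            max_iterations: int = 10) -> Tuple[State, List[str]]:
    data: State = {}
    path: List[str] = []
    current = start_node                          # step 1
    for _ in range(max_iterations):               # step 7: bounded repetition
        node = nodes[current]
        path.append(current)
        data[current] = node.run(data)            # steps 2-3: run, store output
        if current in end_nodes:
            return data, path
        next_name = next(                         # steps 4-5: first match wins
            (edge.to_node for edge in node.edges
             if edge.condition is None or edge.condition(data)),
            None,
        )
        if next_name is None:                     # step 6: nothing matched
            return data, path
        current = next_name
    raise RuntimeError("max_iterations exceeded")  # assumed failure mode


def validate(data: State) -> bool:
    data["attempts"] = data.get("attempts", 0) + 1
    return data["attempts"] >= 2  # toy rule: valid on the second attempt


nodes = {
    "validate": Node("validate", validate, [
        Edge("process", lambda d: d["validate"]),
        Edge("retry"),  # unconditional fallback, deliberately listed second
    ]),
    "retry": Node("retry", lambda d: "retrying", [Edge("validate")]),
    "process": Node("process", lambda d: "done"),
}
data, path = execute(nodes, "validate", {"process"})
```

Because the first matching edge wins, edge order is significant: placing the unconditional `retry` edge before the `process` edge would make `process` unreachable in this model.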
191+
192+
### Integration with ADK
193+
194+
GraphAgent is a proper **BaseAgent**:
195+
- ✅ Extends `BaseAgent`
196+
- ✅ Implements `_run_async_impl(ctx: InvocationContext)`
197+
- ✅ Yields `Event` objects
198+
- ✅ Uses `EventActions.state_delta` for state persistence
199+
- ✅ Works with any `SessionService` (InMemory, SQLite, VertexAI)
200+
201+
GraphAgent uses **event-driven state**:
202+
- State updates via `EventActions.state_delta`
203+
- No manual state tracking
204+
- Automatically persisted by SessionService
205+
- Works with all ADK services
206+
207+
---
208+
209+
## Comparison to Alternatives
210+
211+
### vs SequentialAgent
212+
213+
| Feature | SequentialAgent | GraphAgent |
|---------|----------------|------------|
| Execution | Fixed sequence | Conditional routing |
| Loops | No | Yes (with max_iterations) |
| Branching | No | Yes (conditional edges) |
| State management | Basic | Rich (GraphState) |
| Use case | Simple pipelines | Complex workflows |
220+
221+
### vs ParallelAgent
222+
223+
| Feature | ParallelAgent | GraphAgent |
|---------|--------------|------------|
| Execution | All sub-agents in parallel | Conditional, sequential (parallel node execution is not yet supported) |
| Dependencies | None | Explicit (via edges) |
| Conditional | No | Yes |
| Use case | Independent tasks | Dependent workflow |
229+
230+
### vs LangGraph
231+
232+
GraphAgent is **simpler** and **ADK-native**:
233+
- No new concepts (uses BaseAgent, Events, SessionService)
234+
- No custom state management (uses ADK primitives)
235+
- No custom checkpointing (uses EventActions.state_delta)
236+
- Direct integration with ADK ecosystem
237+
238+
---
239+
240+
## Implementation Details
241+
242+
### Size and Complexity
243+
244+
- **Core implementation**: ~1,231 lines
245+
- **Tests**: 82 tests, 2,665 lines
246+
- **Marked**: `@experimental` (API may change)
247+
248+
### Why This Size?
249+
250+
GraphAgent includes:
251+
- Graph structure management (nodes, edges)
252+
- Conditional routing logic
253+
- State management (GraphState, reducers)
254+
- Checkpointing integration
255+
- Resume/continue from checkpoint
256+
- Checkpoint management (list, delete, export, import)
257+
- Error handling and validation
258+
259+
**Note**: Future refactoring may extract checkpointing utilities for reuse across all agents.
260+
261+
---
262+
263+
## Implemented Features
264+
265+
### Interrupt & Observability Framework ✅
266+
267+
**Callback-based Observability**:
268+
- `before_node_callback` / `after_node_callback` for custom observability
269+
- `NodeCallbackContext` with full access to state, iteration, and invocation context
270+
- Developers control event format and content (no hardcoded strings)
271+
- Extensible metadata via `state_delta`
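
As an illustration of the callback pattern described above, the sketch below wraps a single node execution with optional before and after hooks that decide for themselves what to emit. Everything here is hypothetical: `CallbackContext` is a simplified stand-in for `NodeCallbackContext`, callbacks return plain strings instead of ADK events, and the real callback signatures may differ.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class CallbackContext:  # hypothetical stand-in for NodeCallbackContext
    node_name: str
    iteration: int
    data: Dict[str, Any]


Callback = Callable[[CallbackContext], Optional[str]]


def run_node(name: str, fn: Callable[[Dict[str, Any]], Any],
             data: Dict[str, Any], iteration: int,
             before: Optional[Callback] = None,
             after: Optional[Callback] = None) -> List[str]:
    """Run one node and collect whatever messages the callbacks emit."""
    events: List[str] = []
    ctx = CallbackContext(name, iteration, data)
    if before is not None:
        message = before(ctx)
        if message is not None:
            events.append(message)
    data[name] = fn(data)  # the node itself; output stored under its name
    if after is not None:
        message = after(ctx)
        if message is not None:
            events.append(message)
    return events


data: Dict[str, Any] = {}
events = run_node(
    "validate",
    lambda d: True,
    data,
    iteration=1,
    before=lambda ctx: f"start {ctx.node_name} (iteration {ctx.iteration})",
    after=lambda ctx: f"end {ctx.node_name}: {ctx.data[ctx.node_name]}",
)
```

The runner contains no message strings of its own; the format and content of each event come entirely from the supplied callbacks, which is the property the bullet list describes.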
272+
273+
**LLM-based Interrupt Reasoning**:
274+
- `InterruptReasoner` agent analyzes interrupt messages and decides actions
275+
- Context-aware decisions based on current node, state, and execution path
276+
- Available actions: continue, rerun, pause, defer, skip
277+
- Extensible via `custom_actions` dictionary
278+
279+
**Flexible Interrupt Timing**:
280+
- `InterruptMode.BEFORE` - Validate before node execution
281+
- `InterruptMode.AFTER` - Correct after node execution (default)
282+
- `InterruptMode.BOTH` - Both before and after
283+
- Per-node configuration via `InterruptConfig.nodes`
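
The sketch below shows one way per-node interrupt timing could resolve to concrete check points, with `AFTER` as the default as stated above. It is an assumption-laden model: `Mode` is a hypothetical mirror of `InterruptMode`, and the per-node mapping is a plain dict, which may not match the actual shape of `InterruptConfig.nodes`.

```python
from enum import Enum
from typing import Dict, List


class Mode(Enum):  # hypothetical mirror of InterruptMode
    BEFORE = "before"
    AFTER = "after"
    BOTH = "both"


def interrupt_points(node: str, per_node: Dict[str, Mode],
                     default: Mode = Mode.AFTER) -> List[str]:
    """Return the points at which interrupts are checked for `node`."""
    mode = per_node.get(node, default)
    points: List[str] = []
    if mode in (Mode.BEFORE, Mode.BOTH):
        points.append("before")
    if mode in (Mode.AFTER, Mode.BOTH):
        points.append("after")
    return points


# Assumed shape: node name -> mode; unlisted nodes fall back to the default.
per_node = {"validate": Mode.BEFORE, "process": Mode.BOTH}
validate_points = interrupt_points("validate", per_node)
process_points = interrupt_points("process", per_node)
error_points = interrupt_points("error", per_node)
```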
284+
285+
**Immediate Cancellation**:
286+
- ESC-like immediate interrupt (cancels during node execution, not just between nodes)
287+
- State preservation on cancellation (partial results, execution path, resume flag)
288+
- Clean session cleanup with restart capability
289+
- Three cancellation paths: between nodes, during execution, task cancellation
290+
291+
### Current Limitations
292+
293+
1. **No parallel node execution**: Nodes execute sequentially
294+
- **Future**: Add parallel node groups
295+
296+
2. **Simple condition evaluation**: First matching edge wins
297+
- **Future**: Add priority/weight to edges
298+
299+
3. **Checkpoint management coupled to GraphAgent**
300+
- **Future**: Extract CheckpointUtils for all agents
301+
302+
### Future Enhancements
303+
304+
- Extract checkpointing to reusable utilities
305+
- Add parallel node execution with dependency management
306+
- Enhance conditional routing (edge priorities, weights)
307+
- D3 visualization improvements (show interrupt points, callback hooks)
308+
309+
---
310+
311+
## Usage Guidance
312+
313+
### When to Use GraphAgent
314+
315+
**Good fit**:
316+
- ✅ Workflows with conditional routing
317+
- ✅ Multi-step pipelines with error recovery
318+
- ✅ Iterative refinement (loops)
319+
- ✅ State-dependent execution paths
320+
321+
**Not recommended**:
322+
- ❌ Simple sequential workflows (use SequentialAgent)
323+
- ❌ Independent parallel tasks (use ParallelAgent)
324+
- ❌ Simple loops (use LoopAgent)
325+
326+
### Getting Started
327+
328+
See `contributing/samples/graph_agent_basic/agent.py` for a complete example.
329+
330+
---
331+
332+
## API Stability
333+
334+
**Status**: `@experimental`
335+
336+
**What this means**:
337+
- API may change in future releases
338+
- Use in production at your own risk
339+
- Provide feedback to help stabilize API
340+
- Migration guide will be provided when API changes
341+
342+
**When will it graduate?**
343+
- After 2-3 release cycles
344+
- After real-world usage validation
345+
- After addressing feedback
346+
- When API is proven stable
347+
348+
---
349+
350+
## References
351+
352+
- Source: `src/google/adk/agents/graph_agent.py`
353+
- Tests: `tests/unittests/agents/test_graph_agent.py`
354+
- Sample: `contributing/samples/graph_agent_basic/agent.py`
355+
- Similar: LangGraph, Apache Airflow DAGs
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
1+
# GraphAgent Basic Example — Conditional Routing
2+
3+
This example demonstrates a data validation pipeline using **conditional routing** based on runtime
state. The validator checks input quality and branches to either a processor (success path) or an
error handler (failure path), showing how GraphAgent enables state-dependent decision logic that
sequential or parallel agent composition alone cannot achieve.
7+
8+
## When to Use This Pattern
9+
10+
- Any workflow requiring "if X then A, else B" branching on agent output
11+
- Input validation before expensive downstream processing
12+
- Quality-gate patterns where the next step depends on a score or classification
13+
14+
## How to Run
15+
16+
```bash
adk run contributing/samples/graph_agent_basic
```
19+
20+
## Graph Structure
21+
22+
```
validate ──(valid=True)──▶ process
         ──(valid=False)─▶ error
```
26+
27+
## Key Code Walkthrough
28+
29+
- **`GraphNode(name="validate", agent=validator_agent)`**: wraps an `LlmAgent` as a graph node
- **`add_edge("validate", "process", condition=lambda s: s.data["valid"] == True)`**: conditional
  edge that only fires when the validation flag is set
- **Two end nodes** (`process` and `error`): GraphAgent can have multiple terminal nodes
- **State propagation**: each node's output is written to `state.data[node_name]` and read by
  downstream condition functions
- **No cycles**: this is a simple directed acyclic graph; for loops see `graph_agent_dynamic_queue`
36+
37+
## Further Reading
38+
39+
- [Advanced Graph Patterns](../../docs/advanced_graph_patterns.md)
40+
- [GraphAgent Architecture](../../docs/graph_agent_design.md)

contributing/samples/graph_agent_basic/__init__.py

Whitespace-only changes.
