Skip to content

Commit b42592f

Browse files
committed
feat: Add parallel node groups with join strategies
Add ParallelNodeGroup for concurrent node execution within GraphAgent with WAIT_ALL, WAIT_ANY, and WAIT_N join strategies and configurable error policies (FAIL_FAST, CONTINUE, COLLECT). Includes parallel samples and test coverage.
1 parent 7302880 commit b42592f

20 files changed

Lines changed: 4123 additions & 14 deletions

File tree

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# GraphAgent Multi-Agent Research Workflow
2+
3+
Demonstrates a **multi-agent coordination pattern** using GraphAgent:
4+
parallel research branches, sequential coordination, and a quality-review loop.
5+
6+
## Graph Structure
7+
8+
```
9+
coordinator
10+
11+
[researcher_a ║ researcher_b] ← ParallelNodeGroup (WAIT_ALL)
12+
13+
merger
14+
15+
critic ──REVISE──→ merger
16+
17+
APPROVED
18+
19+
END
20+
```
21+
22+
## What Each Agent Does
23+
24+
| Agent | Role | Output key |
25+
|-------|------|-----------|
26+
| coordinator | Splits topic into two subtopics | `subtopics` |
27+
| researcher_a | Investigates subtopic A concurrently | `research_a` |
28+
| researcher_b | Investigates subtopic B concurrently | `research_b` |
29+
| merger | Synthesises findings into one report | `merged_report` |
30+
| critic | Peer-reviews; routes to merger (REVISE) or ends (APPROVED) | `review` |
31+
32+
## When to Use
33+
34+
- Tasks that decompose into independent parallel workstreams
35+
- Workflows needing a quality-review loop after synthesis
36+
- Any pattern mixing sequential coordination, parallelism, and conditional loops
37+
38+
## Comparison with Other Workflow Agents
39+
40+
| Capability | SequentialAgent | ParallelAgent | **GraphAgent** |
41+
|------------|----------------|---------------|----------------|
42+
| Run researcher_a and researcher_b concurrently ||||
43+
| Pre-coordination step before parallel work ||||
44+
| Post-merge step after parallel work ||||
45+
| Conditional quality loop (critic → merger) ||||
46+
| Inspect state to decide routing ||||
47+
48+
**ParallelAgent** can fan out but cannot add a coordinator before or a
49+
critic-loop after — it has no concept of entry/exit coordination nodes.
50+
**SequentialAgent** executes in a fixed order and cannot parallelise the
51+
two researchers.
52+
53+
## Key Code
54+
55+
```python
56+
# Register parallel group: researcher_a and researcher_b run concurrently
57+
graph.add_parallel_group(
58+
"researchers",
59+
ParallelNodeGroup(
60+
nodes=["researcher_a", "researcher_b"],
61+
join_strategy=JoinStrategy.WAIT_ALL,
62+
),
63+
)
64+
65+
# Edges fan-out from coordinator to both researchers
66+
graph.add_edge("coordinator", "researcher_a")
67+
graph.add_edge("coordinator", "researcher_b")
68+
69+
# Both researchers converge at merger
70+
graph.add_edge("researcher_a", "merger")
71+
graph.add_edge("researcher_b", "merger")
72+
73+
# Conditional quality loop
74+
graph.add_edge("critic", "merger", condition=lambda s: s.data.get("review","").startswith("REVISE"))
75+
graph.set_end("critic")
76+
```
77+
78+
## State Isolation
79+
80+
During parallel execution each branch (`researcher_a`, `researcher_b`) receives
81+
an **isolated copy** of the shared state. Both write to independent output keys
82+
(`research_a`, `research_b`), so there are no race conditions. After both
83+
complete, states are merged automatically before `merger` runs.
84+
85+
## How to Run
86+
87+
```bash
88+
cd /path/to/adk-python
89+
source venv/bin/activate
90+
export GOOGLE_API_KEY=<your-key>
91+
python -m contributing.samples.graph_agent_multi_agent.agent
92+
```
93+
94+
## Related Examples
95+
96+
- `contributing/samples/graph_examples/09_parallel_wait_all` — parallel basics
97+
- `contributing/samples/graph_examples/14_parallel_rewind` — parallel + rewind
98+
- `contributing/samples/graph_agent_advanced` — full research workflow with interrupts
99+
- `contributing/docs/advanced_graph_patterns.md` — pattern guide

contributing/samples/graph_agent_multi_agent/__init__.py

Whitespace-only changes.
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
"""GraphAgent multi-agent research workflow example.
2+
3+
Demonstrates a coordinator → parallel researchers → merger → critic loop:
4+
5+
coordinator
6+
7+
[researcher_a || researcher_b] (ParallelNodeGroup, WAIT_ALL)
8+
9+
merger
10+
11+
critic ──REVISE──→ merger
12+
13+
APPROVED
14+
15+
END
16+
17+
Why GraphAgent (not ParallelAgent/SequentialAgent)?
18+
- SequentialAgent: cannot run researcher_a and researcher_b concurrently.
19+
- ParallelAgent: parallelises but cannot add coordinator before or critic+loop after.
20+
- GraphAgent: combines sequential coordination, true parallel research, AND a
21+
conditional quality-review loop in one declarative graph.
22+
23+
Run (requires GOOGLE_API_KEY env var):
24+
python -m contributing.samples.graph_agent_multi_agent.agent
25+
"""
26+
27+
import asyncio
28+
import os
29+
30+
from google.adk.agents.graph import GraphAgent
31+
from google.adk.agents.graph import GraphState
32+
from google.adk.agents.graph import JoinStrategy
33+
from google.adk.agents.graph import ParallelNodeGroup
34+
from google.adk.agents.graph import StateReducer
35+
from google.adk.agents.llm_agent import LlmAgent
36+
from google.adk.runners import Runner
37+
from google.adk.sessions.in_memory_session_service import InMemorySessionService
38+
from google.genai import types
39+
from pydantic import BaseModel
40+
41+
_MODEL = os.getenv("LLM_MODEL_NAME", "gemini-2.5-flash")
42+
43+
# ---------------------------------------------------------------------------
44+
# Output Schemas
45+
# ---------------------------------------------------------------------------
46+
47+
48+
class ReviewResult(BaseModel):
49+
"""Structured review output from critic agent."""
50+
51+
decision: str # "approve" or "revise"
52+
feedback: str # Review comments
53+
54+
55+
# ---------------------------------------------------------------------------
56+
# Agents
57+
# ---------------------------------------------------------------------------
58+
59+
coordinator = LlmAgent(
60+
name="coordinator",
61+
model=_MODEL,
62+
instruction=(
63+
"You are a research coordinator. Given a research topic, split it into"
64+
" exactly two independent subtopics for parallel investigation. Output"
65+
" each subtopic on its own line prefixed with 'SUBTOPIC A:' and"
66+
" 'SUBTOPIC B:'."
67+
),
68+
output_key="subtopics",
69+
)
70+
71+
researcher_a = LlmAgent(
72+
name="researcher_a",
73+
model=_MODEL,
74+
instruction=(
75+
"You are a researcher specialising in the first subtopic. "
76+
"Write a concise research summary (3-5 sentences) with key findings."
77+
),
78+
output_key="research_a",
79+
)
80+
81+
researcher_b = LlmAgent(
82+
name="researcher_b",
83+
model=_MODEL,
84+
instruction=(
85+
"You are a researcher specialising in the second subtopic. "
86+
"Write a concise research summary (3-5 sentences) with key findings."
87+
),
88+
output_key="research_b",
89+
)
90+
91+
merger = LlmAgent(
92+
name="merger",
93+
model=_MODEL,
94+
instruction=(
95+
"You are a synthesis expert. Merge the two research summaries into a "
96+
"single coherent report. Highlight complementary insights."
97+
),
98+
output_key="merged_report",
99+
)
100+
101+
critic = LlmAgent(
102+
name="critic",
103+
model=_MODEL,
104+
instruction=(
105+
"You are a peer reviewer. Evaluate the merged report for clarity, "
106+
"completeness, and accuracy. "
107+
'Return {"decision": "approve", "feedback": "..."} if good, '
108+
'or {"decision": "revise", "feedback": "explanation..."} if needs work.'
109+
),
110+
output_schema=ReviewResult, # Structured output
111+
# output_key auto-defaults to "critic" (agent name)
112+
)
113+
114+
115+
# ---------------------------------------------------------------------------
116+
# Routing predicates
117+
# ---------------------------------------------------------------------------
118+
119+
120+
def _needs_revision(state: GraphState) -> bool:
121+
"""Check if critic requested revision using structured output."""
122+
review = state.get_parsed("critic", ReviewResult)
123+
return review.decision.lower() == "revise" if review else False
124+
125+
126+
def _is_approved(state: GraphState) -> bool:
127+
"""Check if critic approved using structured output."""
128+
review = state.get_parsed("critic", ReviewResult)
129+
return review.decision.lower() == "approve" if review else False
130+
131+
132+
# ---------------------------------------------------------------------------
133+
# Graph
134+
# ---------------------------------------------------------------------------
135+
136+
137+
def build_multi_agent_graph() -> GraphAgent:
138+
graph = GraphAgent(
139+
name="research_graph",
140+
description=(
141+
"Multi-agent research with parallel execution and quality loop"
142+
),
143+
max_iterations=20,
144+
)
145+
146+
graph.add_node("coordinator", agent=coordinator)
147+
148+
graph.add_node(
149+
"researcher_a",
150+
agent=researcher_a,
151+
# Both researchers see the same coordinator output
152+
input_mapper=lambda s: s.data.get("subtopics", ""),
153+
reducer=StateReducer.OVERWRITE,
154+
)
155+
graph.add_node(
156+
"researcher_b",
157+
agent=researcher_b,
158+
input_mapper=lambda s: s.data.get("subtopics", ""),
159+
reducer=StateReducer.OVERWRITE,
160+
)
161+
162+
graph.add_node(
163+
"merger",
164+
agent=merger,
165+
input_mapper=lambda s: (
166+
f"Research A:\n{s.data.get('research_a', '')}\n\n"
167+
f"Research B:\n{s.data.get('research_b', '')}"
168+
),
169+
reducer=StateReducer.OVERWRITE,
170+
)
171+
graph.add_node("critic", agent=critic)
172+
173+
# Register parallel group so branches execute concurrently
174+
graph.add_parallel_group(
175+
"researchers",
176+
ParallelNodeGroup(
177+
nodes=["researcher_a", "researcher_b"],
178+
join_strategy=JoinStrategy.WAIT_ALL,
179+
),
180+
)
181+
182+
graph.set_start("coordinator")
183+
graph.add_edge("coordinator", "researcher_a")
184+
graph.add_edge("coordinator", "researcher_b")
185+
graph.add_edge("researcher_a", "merger")
186+
graph.add_edge("researcher_b", "merger")
187+
graph.add_edge("merger", "critic")
188+
189+
# Quality loop: revise if not approved
190+
graph.add_edge("critic", "merger", condition=_needs_revision)
191+
192+
graph.set_end("critic")
193+
194+
return graph
195+
196+
197+
# ---------------------------------------------------------------------------
198+
# Main
199+
# ---------------------------------------------------------------------------
200+
201+
202+
async def main() -> None:
203+
session_service = InMemorySessionService()
204+
graph = build_multi_agent_graph()
205+
206+
session = await session_service.create_session(
207+
app_name="research_graph", user_id="user1"
208+
)
209+
210+
topic = "The impact of large language models on software engineering"
211+
print(f"Research topic: {topic}\n")
212+
213+
# Use Runner instead of manual invocation context
214+
runner = Runner(
215+
app_name="research_graph",
216+
agent=graph,
217+
session_service=session_service,
218+
auto_create_session=False, # Session already created above
219+
)
220+
221+
revision_count = 0
222+
async for event in runner.run_async(
223+
user_id="user1",
224+
session_id=session.id,
225+
new_message=types.Content(parts=[types.Part(text=topic)]),
226+
):
227+
if not event.content or not event.content.parts:
228+
continue
229+
author = event.author
230+
text = event.content.parts[0].text or ""
231+
if author == "coordinator":
232+
print("Coordinator assigned subtopics.")
233+
elif author in ("researcher_a", "researcher_b"):
234+
print(f" [{author}] research complete ({len(text)} chars)")
235+
elif author == "merger":
236+
revision_count += 1
237+
print(f"Merger produced report (revision {revision_count}).")
238+
elif author == "critic":
239+
# Parse critic output from the event text (JSON string)
240+
try:
241+
review = ReviewResult.model_validate_json(text.strip())
242+
decision = review.decision.upper()
243+
except Exception:
244+
decision = "UNKNOWN (parse error)"
245+
print(f"Critic verdict: {decision}")
246+
247+
# Re-fetch fresh session state (create_session returns a deepcopy)
248+
fresh_session = await session_service.get_session(
249+
app_name="research_graph", user_id="user1", session_id=session.id
250+
)
251+
final_data = (fresh_session or session).state.get("graph_data", {})
252+
final_state = GraphState(data=final_data)
253+
254+
print("\nFinal merged report:")
255+
print(final_state.get_str("merged_report", "(none)")[:500])
256+
print("\nFinal review:")
257+
review = final_state.get_parsed("critic", ReviewResult)
258+
print(f"Decision: {review.decision if review else 'none'}")
259+
print(f"Feedback: {review.feedback[:200] if review else 'none'}")
260+
261+
262+
if __name__ == "__main__":
263+
asyncio.run(main())
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""GraphAgent example."""

0 commit comments

Comments
 (0)