-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathworkflow_magenticone.py
More file actions
219 lines (188 loc) · 7.64 KB
/
workflow_magenticone.py
File metadata and controls
219 lines (188 loc) · 7.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
"""MagenticOne orchestration example with OpenAIChatClient setup used in this repo.
This sample demonstrates a Magentic manager coordinating three specialists to
produce a travel plan, with streaming output and orchestration ledger events.
Run:
uv run examples/workflow_magenticone.py
uv run examples/workflow_magenticone.py --devui
"""
import asyncio
import json
import os
import sys
from typing import cast
from agent_framework import Agent, AgentResponseUpdate, Message, WorkflowEvent
from agent_framework.openai import OpenAIChatClient
from agent_framework.orchestrations import MagenticBuilder, MagenticProgressLedger
from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
from dotenv import load_dotenv
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
# Configure OpenAI client based on environment
load_dotenv(override=True)
API_HOST = os.getenv("API_HOST", "github")
async_credential = None
if API_HOST == "azure":
async_credential = DefaultAzureCredential()
token_provider = get_bearer_token_provider(async_credential, "https://cognitiveservices.azure.com/.default")
client = OpenAIChatClient(
base_url=f"{os.environ['AZURE_OPENAI_ENDPOINT']}/openai/v1/",
api_key=token_provider,
model_id=os.environ["AZURE_OPENAI_CHAT_DEPLOYMENT"],
)
elif API_HOST == "github":
client = OpenAIChatClient(
base_url="https://models.github.ai/inference",
api_key=os.environ["GITHUB_TOKEN"],
model_id=os.getenv("GITHUB_MODEL", "openai/gpt-4.1-mini"),
)
else:
client = OpenAIChatClient(
api_key=os.environ["OPENAI_API_KEY"], model_id=os.environ.get("OPENAI_MODEL", "gpt-4.1-mini")
)
console = Console()
local_agent = Agent(
client=client,
instructions=(
"You suggest authentic and interesting local activities or places to visit, "
"using any context provided by the user or other agents."
),
name="local_agent",
description="Specialist in local activities and places.",
)
language_agent = Agent(
client=client,
instructions=(
"You review travel plans and provide practical tips for language and communication "
"challenges at the destination. If coverage is already good, acknowledge that with rationale."
),
name="language_agent",
description="Specialist in language and communication advice.",
)
travel_summary_agent = Agent(
client=client,
instructions=(
"You synthesize suggestions and advice from other agents into a complete travel plan. "
"Make reasonable assumptions when details are missing. "
"Do not ask the user follow-up questions. "
"Do not ask for confirmations or permissions. "
"YOUR FINAL RESPONSE MUST BE THE COMPLETE PLAN."
),
name="travel_summary_agent",
description="Specialist in travel-plan synthesis.",
)
manager_agent = Agent(
client=client,
name="manager_agent",
description="Magentic manager that coordinates specialists.",
instructions=(
"You coordinate specialists to solve complex tasks efficiently. "
"The user is not available for follow-up questions. "
"If information is missing, choose sensible assumptions and continue. "
"Ensure the workflow ends with a complete final plan."
),
)
magentic_workflow = MagenticBuilder(
participants=[local_agent, language_agent, travel_summary_agent],
manager_agent=manager_agent,
max_round_count=10,
max_stall_count=1,
max_reset_count=1,
).build()
def handle_stream_event(event: WorkflowEvent, last_message_id: str | None) -> str | None:
"""Render a workflow stream event and return the updated message id."""
# Streaming token from an agent (may not fire for all orchestrators)
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
message_id = event.data.message_id
if message_id != last_message_id:
if last_message_id is not None:
console.print()
console.print(f"🤖 {event.executor_id}:", end=" ")
last_message_id = message_id
console.print(event.data, end="")
return last_message_id
# A participant finished — show its output
if event.type == "executor_completed" and isinstance(event.data, list) and event.data:
# The data is a list of AgentResponseUpdate tokens — concatenate them
parts = [msg.text for msg in event.data if isinstance(msg, AgentResponseUpdate) and msg.text]
if parts:
full_text = "".join(parts)
console.print(
Panel(
Markdown(full_text),
title=f"🤖 {event.executor_id}",
border_style="cyan",
padding=(1, 2),
)
)
return last_message_id
# Orchestrator events (plan, progress ledger)
if event.type == "magentic_orchestrator":
console.print()
emoji = "✅" if event.data.event_type.name == "PROGRESS_LEDGER_UPDATED" else "🧭"
if isinstance(event.data.content, MagenticProgressLedger):
rendered_content = json.dumps(event.data.content.to_dict(), indent=2)
console.print(
Panel(
rendered_content,
title=f"{emoji} Orchestrator: {event.data.event_type.name}",
border_style="bold yellow",
padding=(1, 2),
)
)
elif hasattr(event.data.content, "text"):
console.print(
Panel(
Markdown(event.data.content.text),
title=f"{emoji} Orchestrator: {event.data.event_type.name}",
border_style="bold green",
padding=(1, 2),
)
)
else:
console.print(
Panel(
Markdown(str(event.data.content)),
title=f"{emoji} Orchestrator: {event.data.event_type.name}",
border_style="bold green",
padding=(1, 2),
)
)
return last_message_id
def print_final_result(output_event: WorkflowEvent | None) -> None:
"""Print the final plan from the workflow output event."""
if output_event is None:
return
output_messages = cast(list[Message], output_event.data)
console.print(
Panel(
Markdown(output_messages[-1].text),
title="🌍 Final Travel Plan",
border_style="bold green",
padding=(1, 2),
)
)
async def main() -> None:
"""Run the Magentic workflow with streaming output."""
task = (
"Plan a half-day trip in Costa Rica for a family with two children ages 6 and 9, "
"staying in San José, with a mid-range budget. "
"Provide a complete itinerary with timing, transport assumptions, estimated costs, "
"food recommendation, and practical language tips."
)
console.print(f"[bold]Task:[/bold] {task}\n")
last_message_id: str | None = None
output_event: WorkflowEvent | None = None
async for event in magentic_workflow.run(task, stream=True):
last_message_id = handle_stream_event(event, last_message_id)
if event.type == "output" and not isinstance(event.data, AgentResponseUpdate):
output_event = event
print_final_result(output_event)
if async_credential:
await async_credential.close()
if __name__ == "__main__":
if "--devui" in sys.argv:
from agent_framework.devui import serve
serve(entities=[magentic_workflow], auto_open=True)
else:
asyncio.run(main())