-
Notifications
You must be signed in to change notification settings - Fork 104
Expand file tree
/
Copy pathworkflow_agents_concurrent.py
More file actions
120 lines (99 loc) · 4.2 KB
/
workflow_agents_concurrent.py
File metadata and controls
120 lines (99 loc) · 4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Concurrent multi-expert analysis using ConcurrentBuilder orchestration.
Demonstrates: ConcurrentBuilder(participants=[...]) to run 3 specialist agents
in parallel on the same user prompt, then collect the default aggregated output
(combined message list).
Each participant receives the original prompt independently and runs concurrently.
The default aggregator merges all agent conversations into a single message list.
Contrast with workflow_agents_sequential.py, where agents run one after another
and each sees the full conversation so far.
Reference:
https://learn.microsoft.com/en-us/agent-framework/workflows/orchestrations/concurrent?pivots=programming-language-python
Run:
uv run examples/workflow_agents_concurrent.py
uv run examples/workflow_agents_concurrent.py --devui (opens DevUI at http://localhost:8105)
"""
import asyncio
import logging
import os
import sys
from agent_framework import Agent, Message
from agent_framework.openai import OpenAIChatClient
from agent_framework.orchestrations import ConcurrentBuilder
from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
from dotenv import load_dotenv
from rich.logging import RichHandler
log_handler = RichHandler(show_path=False, rich_tracebacks=True, show_level=False)
logging.basicConfig(level=logging.WARNING, handlers=[log_handler], force=True, format="%(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
load_dotenv(override=True)
API_HOST = os.getenv("API_HOST", "github")
# Configure the chat client based on the API host
async_credential = None
if API_HOST == "azure":
async_credential = DefaultAzureCredential()
token_provider = get_bearer_token_provider(async_credential, "https://cognitiveservices.azure.com/.default")
client = OpenAIChatClient(
base_url=f"{os.environ['AZURE_OPENAI_ENDPOINT']}/openai/v1/",
api_key=token_provider,
model_id=os.environ["AZURE_OPENAI_CHAT_DEPLOYMENT"],
)
elif API_HOST == "github":
client = OpenAIChatClient(
base_url="https://models.github.ai/inference",
api_key=os.environ["GITHUB_TOKEN"],
model_id=os.getenv("GITHUB_MODEL", "openai/gpt-4.1-mini"),
)
else:
client = OpenAIChatClient(
api_key=os.environ["OPENAI_API_KEY"], model_id=os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
)
# Three specialist agents — each brings a different perspective to the same prompt
researcher = Agent(
client=client,
name="Researcher",
instructions=(
"You are an expert market and product researcher. "
"Given a prompt, provide concise, factual insights, opportunities, and risks. "
"Keep your analysis to one paragraph."
),
)
marketer = Agent(
client=client,
name="Marketer",
instructions=(
"You are a creative marketing strategist. "
"Craft a compelling value proposition and target messaging aligned to the prompt. "
"Keep your response to one paragraph."
),
)
legal = Agent(
client=client,
name="Legal",
instructions=(
"You are a cautious legal and compliance reviewer. "
"Highlight constraints, disclaimers, and policy concerns based on the prompt. "
"Keep your response to one paragraph."
),
)
# Build the concurrent workflow — all three agents run in parallel
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
async def main():
prompt = "We are launching a new budget-friendly electric bike for urban commuters."
logger.info("Prompt: %s", prompt)
result = await workflow.run(prompt)
outputs = result.get_outputs()
for conversation in outputs:
logger.info("===== Aggregated Conversation =====")
messages: list[Message] = conversation
for index, message in enumerate(messages, start=1):
author = message.author_name or ("assistant" if message.role == "assistant" else "user")
logger.info("%02d [%s]\n%s", index, author, message.text)
if async_credential:
await async_credential.close()
if __name__ == "__main__":
if "--devui" in sys.argv:
from agent_framework.devui import serve
serve(entities=[workflow], port=8105, auto_open=True)
else:
asyncio.run(main())