-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathworkflow_converge.py
More file actions
184 lines (148 loc) · 5.75 KB
/
workflow_converge.py
File metadata and controls
184 lines (148 loc) · 5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""Writer → Reviewer workflow with branch-and-converge routing.
Demonstrates: Agent executors, conditional edges, and a converging graph shape:
Reviewer routes either directly to Publisher (approved) or through Editor,
and both paths converge before the final Summarizer output.
Run:
uv run examples/workflow_converge.py
uv run examples/workflow_converge.py --devui (opens DevUI at http://localhost:8093)
"""
import asyncio
import os
import sys
from typing import Any
from agent_framework import Agent, AgentExecutorResponse, Message, WorkflowBuilder
from agent_framework.openai import OpenAIChatClient
from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
from dotenv import load_dotenv
from pydantic import BaseModel
# Configure the chat client from the API_HOST environment variable:
#   "azure"  -> Azure OpenAI through AAD token auth
#   "github" -> GitHub Models endpoint (default)
#   other    -> OpenAI directly via OPENAI_API_KEY
load_dotenv(override=True)
API_HOST = os.getenv("API_HOST", "github")

# Only created on the Azure path; main() closes it at shutdown. Initializing it
# to None here avoids a NameError for the "github"/OpenAI configurations.
async_credential = None

if API_HOST == "azure":
    async_credential = DefaultAzureCredential()
    token_provider = get_bearer_token_provider(async_credential, "https://cognitiveservices.azure.com/.default")
    client = OpenAIChatClient(
        base_url=f"{os.environ['AZURE_OPENAI_ENDPOINT']}/openai/v1/",
        api_key=token_provider,
        model_id=os.environ["AZURE_OPENAI_CHAT_DEPLOYMENT"],
    )
elif API_HOST == "github":
    client = OpenAIChatClient(
        base_url="https://models.github.ai/inference",
        api_key=os.environ["GITHUB_TOKEN"],
        model_id=os.getenv("GITHUB_MODEL", "openai/gpt-4.1-mini"),
    )
else:
    client = OpenAIChatClient(
        api_key=os.environ["OPENAI_API_KEY"], model_id=os.environ.get("OPENAI_MODEL", "gpt-5-mini")
    )
class ReviewResult(BaseModel):
    """Structured verdict the Reviewer agent must emit.

    Serves as the reviewer's `response_format` and is parsed by the
    routing predicates (`is_approved` / `needs_editing`), which branch
    on `score` against an 80-point threshold.
    """

    score: int  # Overall quality score (0-100); >= 80 routes straight to Publisher
    feedback: str  # Concise, actionable feedback
    clarity: int  # Clarity score (0-100)
    completeness: int  # Completeness score (0-100)
    accuracy: int  # Accuracy score (0-100)
    structure: int  # Structure score (0-100)
def parse_review_result(message: Any) -> ReviewResult | None:
    """Extract the reviewer's structured verdict from an executor response.

    Anything other than an AgentExecutorResponse yields None, which the
    routing predicates treat as "no decision".
    """
    if isinstance(message, AgentExecutorResponse):
        return ReviewResult.model_validate_json(message.agent_response.text)
    return None
def is_approved(message: Any) -> bool:
    """Edge condition: reviewer scored the content at or above 80."""
    review = parse_review_result(message)
    if review is None:
        return False
    return review.score >= 80
def needs_editing(message: Any) -> bool:
    """Edge condition: reviewer scored the content below 80, so send it to the Editor."""
    review = parse_review_result(message)
    if review is None:
        return False
    return review.score < 80
# --- Agent definitions -------------------------------------------------------

# Writer: drafts the initial content; serves as the workflow's start executor.
writer = Agent(
    client=client,
    name="Writer",
    instructions=(
        "You are an excellent content writer. "
        "Create clear, engaging content based on the user's request. "
        "Focus on clarity, accuracy, and proper structure."
    ),
)

# Reviewer: scores the draft. Its structured JSON output (ReviewResult) is what
# the conditional edges parse to choose the Publisher or Editor branch.
reviewer = Agent(
    client=client,
    name="Reviewer",
    instructions=(
        "You are an expert content reviewer. "
        "Evaluate the writer's content based on:\n"
        "1. Clarity - Is it easy to understand?\n"
        "2. Completeness - Does it fully address the topic?\n"
        "3. Accuracy - Is the information correct?\n"
        "4. Structure - Is it well-organized?\n\n"
        "Return a JSON object with:\n"
        "- score: overall quality (0-100)\n"
        "- feedback: concise, actionable feedback\n"
        "- clarity, completeness, accuracy, structure: individual scores (0-100)"
    ),
    # Force replies to conform to the ReviewResult schema so routing can parse them.
    default_options={"response_format": ReviewResult},
)

# Editor: only reached on the low-score branch; revises per reviewer feedback.
editor = Agent(
    client=client,
    name="Editor",
    instructions=(
        "You are a skilled editor. "
        "You will receive content along with review feedback. "
        "Improve the content by addressing all the issues mentioned in the feedback. "
        "Maintain the original intent while enhancing clarity, completeness, accuracy, and structure."
    ),
)

# Publisher: convergence point — receives content from either branch.
publisher = Agent(
    client=client,
    name="Publisher",
    instructions=(
        "You are a publishing agent. "
        "You receive either approved content or edited content. "
        "Format it for publication with proper headings and structure."
    ),
)

# Summarizer: final step; reports on the published content and the path taken.
summarizer = Agent(
    client=client,
    name="Summarizer",
    instructions=(
        "You are a summarizer agent. "
        "Create a final publication report that includes:\n"
        "1. A brief summary of the published content\n"
        "2. The workflow path taken (direct approval or edited)\n"
        "3. Key highlights and takeaways\n"
        "Keep it concise and professional."
    ),
)
# Graph shape — branch at the reviewer, converge at the publisher:
#
#   writer -> reviewer --(score >= 80)--> publisher -> summarizer
#                     \--(score < 80)--> editor -----^
workflow = (
    WorkflowBuilder(
        name="ContentConverge",
        start_executor=writer,
        description="Branch from reviewer, then converge before final summary output.",
    )
    .add_edge(writer, reviewer)
    .add_edge(reviewer, publisher, condition=is_approved)  # approved: skip editing
    .add_edge(reviewer, editor, condition=needs_editing)  # below threshold: revise first
    .add_edge(editor, publisher)  # edited content converges back onto the publish path
    .add_edge(publisher, summarizer)
    .build()
)
async def main():
    """Run the workflow once on a sample prompt and print every output.

    Non-AgentExecutorResponse outputs are printed raw; agent responses are
    printed with their executor id. Closes the Azure credential afterwards
    when one was created.
    """
    prompt = 'Write a one-paragraph LinkedIn post: "The AI workflow mistake almost every team makes."'
    print(f"Prompt: {prompt}\n")

    events = await workflow.run(prompt)
    outputs = events.get_outputs()
    for output in outputs:
        if not isinstance(output, AgentExecutorResponse):
            print(output)
            continue
        final_message = Message(role="assistant", text=output.agent_response.text)
        print(f"[{output.executor_id}]\n{final_message.text}\n")

    # Fix: async_credential is only bound on the "azure" configuration path, so
    # referencing it unconditionally raised NameError for the "github"/OpenAI
    # hosts. Guard on API_HOST instead.
    if API_HOST == "azure":
        await async_credential.close()
if __name__ == "__main__":
    # With --devui, serve the workflow in the DevUI instead of running it once.
    if "--devui" not in sys.argv:
        asyncio.run(main())
    else:
        from agent_framework.devui import serve

        serve(entities=[workflow], port=8093, auto_open=True)