-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathworkflow_conditional_structured.py
More file actions
158 lines (124 loc) · 5.12 KB
/
workflow_conditional_structured.py
File metadata and controls
158 lines (124 loc) · 5.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""Writer → Reviewer workflow using structured outputs with conditional edges.
Demonstrates: response_format for typed reviewer decisions, conditional edges,
and a terminal @executor publisher node.
This is a direct contrast with workflow_conditional.py:
- Same branching shape via add_edge(..., condition=...)
- Different decision mechanism (typed JSON instead of sentinel string matching)
Routing:
- decision == APPROVED → publisher (terminal @executor)
- decision == REVISION_NEEDED → editor (terminal Agent)
Run:
uv run examples/workflow_conditional_structured.py
uv run examples/workflow_conditional_structured.py --devui (opens DevUI at http://localhost:8096)
"""
import asyncio
import os
import sys
from typing import Any, Literal
from agent_framework import Agent, AgentExecutorResponse, WorkflowBuilder, WorkflowContext, executor
from agent_framework.openai import OpenAIChatClient
from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
from dotenv import load_dotenv
from pydantic import BaseModel
from typing_extensions import Never
# Pull in .env settings before any configuration below is read.
load_dotenv(override=True)
API_HOST = os.environ.get("API_HOST", "github")

# Select the chat client for the configured host. The async credential is
# only created for Azure and stays None for every other host, which is what
# main() checks before closing it.
async_credential = None
if API_HOST == "github":
    client = OpenAIChatClient(
        base_url="https://models.github.ai/inference",
        api_key=os.environ["GITHUB_TOKEN"],
        model_id=os.environ.get("GITHUB_MODEL", "openai/gpt-4.1-mini"),
    )
elif API_HOST == "azure":
    async_credential = DefaultAzureCredential()
    token_provider = get_bearer_token_provider(
        async_credential,
        "https://cognitiveservices.azure.com/.default",
    )
    client = OpenAIChatClient(
        base_url=f"{os.environ['AZURE_OPENAI_ENDPOINT']}/openai/v1/",
        api_key=token_provider,
        model_id=os.environ["AZURE_OPENAI_CHAT_DEPLOYMENT"],
    )
else:
    # Any other host value falls back to the plain OpenAI configuration.
    client = OpenAIChatClient(
        api_key=os.environ["OPENAI_API_KEY"],
        model_id=os.getenv("OPENAI_MODEL", "gpt-5-mini"),
    )
class ReviewDecision(BaseModel):
    """Structured reviewer decision used for conditional routing."""

    # Routing key: the edge conditions in this file compare it against
    # "APPROVED" and "REVISION_NEEDED" to pick the next node.
    decision: Literal["APPROVED", "REVISION_NEEDED"]
    # Reviewer feedback text; required.
    feedback: str
    # Publishable post text; optional and None by default. The publisher
    # substitutes a placeholder when it is missing or blank.
    post_text: str | None = None
# Parse helper so condition functions remain small and explicit.
def parse_review_decision(message: Any) -> ReviewDecision | None:
    """Parse structured reviewer output from AgentExecutorResponse.

    Args:
        message: Value handed to an edge condition or executor. Only
            AgentExecutorResponse instances are parsed.

    Returns:
        The validated ReviewDecision, or None when ``message`` is not an
        AgentExecutorResponse or its response text is not valid
        ReviewDecision JSON.
    """
    # Function-scope import: the module's top-level pydantic import only
    # brings in BaseModel.
    from pydantic import ValidationError

    if not isinstance(message, AgentExecutorResponse):
        return None
    try:
        return ReviewDecision.model_validate_json(message.agent_response.text)
    except ValidationError:
        # Malformed or off-schema reviewer output must not raise inside an
        # edge condition; report "no decision" so callers can treat it as
        # unparseable, matching the declared Optional return type.
        return None
def is_approved(message: Any) -> bool:
    """Edge condition: True only when the reviewer's decision is APPROVED."""
    review = parse_review_decision(message)
    if review is None:
        # Nothing parseable to route on.
        return False
    return review.decision == "APPROVED"
def needs_revision(message: Any) -> bool:
    """Edge condition: True only when the reviewer's decision is REVISION_NEEDED."""
    review = parse_review_decision(message)
    if review is None:
        # Nothing parseable to route on.
        return False
    return review.decision == "REVISION_NEEDED"
# Drafting agent; used as the workflow's start executor.
writer = Agent(
    name="Writer",
    instructions=(
        "You are a concise content writer. "
        "Write a clear, engaging short article (2-3 paragraphs) based on the user's topic."
    ),
    client=client,
)
# Reviewing agent; its reply is requested in the ReviewDecision shape so the
# outgoing edges can route on the typed `decision` field.
reviewer = Agent(
    name="Reviewer",
    instructions=(
        "You are a strict content reviewer. Evaluate the writer's draft. "
        "If the draft is ready, set decision=APPROVED and include the publishable post in post_text. "
        "If it needs changes, set decision=REVISION_NEEDED and provide actionable feedback."
    ),
    default_options={"response_format": ReviewDecision},
    client=client,
)
# Editing agent; reached only through the REVISION_NEEDED edge.
editor = Agent(
    name="Editor",
    instructions=(
        "You are a skilled editor. "
        "You receive a writer's draft followed by the reviewer's feedback. "
        "Rewrite the draft to address all issues raised in the feedback. "
        "Output only the improved post."
    ),
    client=client,
)
@executor(id="publisher")
async def publisher(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Yield the approved post as the workflow's output.

    The reviewer's structured reply is parsed; its ``post_text`` (trimmed) is
    published, with a placeholder when it is missing or blank. If the reply
    cannot be parsed, a fixed "unable to parse" message is yielded instead.
    """
    review = parse_review_decision(response)
    if review is not None:
        body = review.post_text.strip() if review.post_text else ""
        if not body:
            body = "(Reviewer approved but did not provide post_text.)"
        await ctx.yield_output(f"✅ Published:\n\n{body}")
        return

    # No usable decision: still emit an output so the run is not silent.
    await ctx.yield_output("✅ Published:\n\n(Unable to parse structured reviewer output.)")
def _build_workflow():
    """Wire writer -> reviewer, then branch to publisher or editor on the decision."""
    builder = WorkflowBuilder(start_executor=writer)
    # Every draft goes to the reviewer unconditionally.
    builder = builder.add_edge(writer, reviewer)
    # The reviewer's typed decision selects the next node.
    builder = builder.add_edge(reviewer, publisher, condition=is_approved)
    builder = builder.add_edge(reviewer, editor, condition=needs_revision)
    return builder.build()


workflow = _build_workflow()
async def main() -> None:
    """Run the workflow once on a fixed prompt and print every output.

    The Azure credential, when one was created at import time, is closed in a
    ``finally`` block so it is released even if the workflow run or the output
    printing raises.
    """
    prompt = "Write a LinkedIn post predicting the 5 jobs AI agents will replace by December 2026."
    print(f"Prompt: {prompt}\n")
    try:
        events = await workflow.run(prompt)
        for output in events.get_outputs():
            print("Output:")
            print(output)
    finally:
        # async_credential is None unless API_HOST == "azure".
        if async_credential:
            await async_credential.close()
if __name__ == "__main__":
    # Default: run the workflow once in the terminal. With --devui, serve it
    # in DevUI on port 8096 instead.
    if "--devui" not in sys.argv:
        asyncio.run(main())
    else:
        from agent_framework.devui import serve

        serve(entities=[workflow], port=8096, auto_open=True)