# workflow_conditional_state_isolated.py
"""Writer → Reviewer workflow with structured outputs, state, and state isolation.
Demonstrates: structured reviewer decisions, conditional edges, iterative
review loops, explicit state management with WorkflowContext.set_state/get_state,
and per-request state isolation via a workflow factory helper.
Routing:
- decision == APPROVED → publisher (terminal @executor)
- decision == REVISION_NEEDED → editor → reviewer (iterative loop)
State isolation best practice:
Build a fresh workflow (and fresh agents) per task/request by calling
create_workflow(...), so agent thread state and workflow state do not leak
across independent runs.
Run:
    uv run examples/workflow_conditional_state_isolated.py
    (pass --devui to serve the workflow in DevUI at http://localhost:8097)
"""
import asyncio
import os
import sys
from typing import Any, Literal
from agent_framework import Agent, AgentExecutorResponse, WorkflowBuilder, WorkflowContext, executor
from agent_framework.openai import OpenAIChatClient
from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
from dotenv import load_dotenv
from pydantic import BaseModel
from typing_extensions import Never
load_dotenv(override=True)
# Which backend to target: "azure", "github" (default), or anything else for plain OpenAI.
API_HOST = os.getenv("API_HOST", "github")
# Configure the chat client based on the API host
async_credential = None
if API_HOST == "azure":
    # Azure: authenticate with Entra ID. The credential is async and is
    # closed in main(); it must be created before the token provider below.
    async_credential = DefaultAzureCredential()
    token_provider = get_bearer_token_provider(async_credential, "https://cognitiveservices.azure.com/.default")
    client = OpenAIChatClient(
        base_url=f"{os.environ['AZURE_OPENAI_ENDPOINT']}/openai/v1/",
        # Bearer-token provider is passed in place of a static API key.
        api_key=token_provider,
        model_id=os.environ["AZURE_OPENAI_CHAT_DEPLOYMENT"],
    )
elif API_HOST == "github":
    # GitHub Models inference endpoint, keyed by a GitHub token.
    client = OpenAIChatClient(
        base_url="https://models.github.ai/inference",
        api_key=os.environ["GITHUB_TOKEN"],
        model_id=os.getenv("GITHUB_MODEL", "openai/gpt-4.1-mini"),
    )
else:
    # Fallback: call the OpenAI API directly.
    client = OpenAIChatClient(
        api_key=os.environ["OPENAI_API_KEY"], model_id=os.environ.get("OPENAI_MODEL", "gpt-5-mini")
    )
class ReviewDecision(BaseModel):
    """Structured reviewer decision used for conditional routing."""

    # Routing signal consumed by the is_approved / needs_revision edge conditions.
    decision: Literal["APPROVED", "REVISION_NEEDED"]
    # Reviewer rationale; fed to the editor when a revision is requested.
    feedback: str
def parse_review_decision(message: Any) -> ReviewDecision | None:
    """Extract a ReviewDecision from an AgentExecutorResponse, or None for any other message."""
    if isinstance(message, AgentExecutorResponse):
        raw_text = message.agent_response.text
        return ReviewDecision.model_validate_json(raw_text)
    return None
# Condition functions — receive the message from the previous executor.
def is_approved(message: Any) -> bool:
    """True when the reviewer emitted a structured APPROVED decision."""
    decision = parse_review_decision(message)
    if decision is None:
        return False
    return decision.decision == "APPROVED"
def needs_revision(message: Any) -> bool:
    """True when the reviewer emitted a structured REVISION_NEEDED decision."""
    decision = parse_review_decision(message)
    if decision is None:
        return False
    return decision.decision == "REVISION_NEEDED"
def create_workflow(model_client: OpenAIChatClient):
    """Create a fresh workflow instance with isolated agent/workflow state.

    Building new agents and executors on every call means agent thread state
    and workflow state cannot leak across independent runs.

    Args:
        model_client: Chat client shared by the writer, reviewer, and editor agents.

    Returns:
        A built workflow: writer -> store_post_text -> reviewer, with
        APPROVED routing to the terminal publisher and REVISION_NEEDED
        routing through the editor back into the review loop.
    """
    writer = Agent(
        client=model_client,
        name="Writer",
        instructions=(
            "You are a concise content writer. "
            "Write a clear, engaging short article (2-3 paragraphs) based on the user's topic."
        ),
    )
    reviewer = Agent(
        client=model_client,
        name="Reviewer",
        instructions=(
            "You are a strict content reviewer. Evaluate the writer's draft.\n"
            "Check that the post is engaging and a good fit for the target platform.\n"
            "Make sure that it does not sound overly LLM-generated.\n"
            "Accessibility/style constraints: do not use em dashes (—) and do not use fancy Unicode text.\n"
            "Return a structured decision using this schema: decision and feedback.\n"
            "Set decision=APPROVED if the draft is clear, accurate, and well-structured.\n"
            "Set decision=REVISION_NEEDED if it requires improvement.\n"
            "In feedback, explain your reasoning briefly and provide actionable edits when needed."
        ),
        # Force structured output so the conditional edges can parse the decision.
        default_options={"response_format": ReviewDecision},
    )
    editor = Agent(
        client=model_client,
        name="Editor",
        instructions=(
            "You are a skilled editor. "
            "You receive a writer's draft followed by the reviewer's feedback. "
            "Rewrite the draft to address all issues raised in the feedback. "
            # BUG FIX: the original implicit concatenation had no separating
            # space, yielding "...improved post.Ensure the length...".
            "Output only the improved post. "
            "Ensure the length of the final post is appropriate for the target platform."
        ),
    )

    @executor(id="store_post_text")
    async def store_post_text(response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
        """Persist latest post text in workflow state and pass message downstream."""
        ctx.set_state("post_text", response.agent_response.text.strip())
        await ctx.send_message(response)

    @executor(id="publisher")
    async def publisher(_response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
        """Publish the latest approved post text from workflow state."""
        content = str(ctx.get_state("post_text", "")).strip()
        await ctx.yield_output(f"✅ Published:\n\n{content}")

    # max_iterations bounds the reviewer/editor loop so a never-approving
    # reviewer cannot spin forever.
    return (
        WorkflowBuilder(start_executor=writer, max_iterations=8)
        .add_edge(writer, store_post_text)
        .add_edge(store_post_text, reviewer)
        .add_edge(reviewer, publisher, condition=is_approved)
        .add_edge(reviewer, editor, condition=needs_revision)
        .add_edge(editor, store_post_text)
        .build()
    )
async def main():
    """Run one isolated workflow over a sample prompt and print its outputs."""
    prompt = "Write a LinkedIn post predicting the 5 jobs AI agents will replace by December 2026."
    print(f"Prompt: {prompt}\n")
    # Build a fresh workflow per request for state isolation.
    workflow = create_workflow(client)
    run_result = await workflow.run(prompt)
    for item in run_result.get_outputs():
        print(item)
    # Release the Azure credential if one was created at import time.
    if async_credential is not None:
        await async_credential.close()
if __name__ == "__main__":
    if "--devui" not in sys.argv:
        # Default mode: run the workflow once from the CLI.
        asyncio.run(main())
    else:
        # DevUI mode: serve a fresh workflow instance in the browser UI.
        from agent_framework.devui import serve

        serve(entities=[create_workflow(client)], port=8097, auto_open=True)