-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathagent_server.py
More file actions
174 lines (137 loc) · 5.31 KB
/
agent_server.py
File metadata and controls
174 lines (137 loc) · 5.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""FastAPI server for py-code-mode agent.
Exposes the AutoGen agent as an HTTP API for Azure Container Apps.
Connects to an external session server for code execution.
"""
import os
from autogen_agentchat.agents import AssistantAgent
from autogen_core.tools import FunctionTool
from fastapi import FastAPI
from pydantic import BaseModel
from py_code_mode import RedisStorage, Session
from py_code_mode.execution import ContainerConfig, ContainerExecutor
class TaskRequest(BaseModel):
    """Request body for POST /task."""
    task: str  # natural-language task for the agent to carry out
class TaskResponse(BaseModel):
    """Response body for POST /task."""
    result: str  # final agent message on success; "" when an error occurred
    error: str | None = None  # error text when the run failed, otherwise None
def get_model_client():
    """Build an Azure OpenAI chat-completion client authenticated via managed identity.

    Configuration is read from the environment:
      - AZURE_OPENAI_ENDPOINT (required)
      - AZURE_OPENAI_DEPLOYMENT (default: "gpt-4o")
      - AZURE_OPENAI_API_VERSION (default: "2024-08-01-preview")

    Returns:
        An ``AzureOpenAIChatCompletionClient`` for the configured deployment.
    """
    from autogen_core.models import ModelFamily, ModelInfo
    from autogen_ext.models.openai import AzureOpenAIChatCompletionClient
    from azure.identity import DefaultAzureCredential, get_bearer_token_provider

    # Use managed identity for Azure OpenAI auth — no API key in the environment.
    token_provider = get_bearer_token_provider(
        DefaultAzureCredential(),
        "https://cognitiveservices.azure.com/.default",
    )
    deployment = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o")
    # Model info for models not yet in autogen's registry. For deployments not
    # listed here, .get() yields None and autogen falls back to its own registry.
    model_info_map = {
        "gpt-41": ModelInfo(
            vision=True,
            function_calling=True,
            json_output=True,
            family=ModelFamily.GPT_4O,  # GPT-4.1 is similar to GPT-4o
            context_window=128000,
        ),
    }
    return AzureOpenAIChatCompletionClient(
        azure_deployment=deployment,
        azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
        azure_ad_token_provider=token_provider,
        model=deployment,
        # Env-overridable so an API-version bump needs no code change;
        # the default preserves the previous hard-coded value.
        api_version=os.environ.get("AZURE_OPENAI_API_VERSION", "2024-08-01-preview"),
        model_info=model_info_map.get(deployment),
    )
app = FastAPI(title="py-code-mode Agent")


@app.get("/health")
async def health():
    """Report that the server process is up (liveness probe)."""
    # Static payload: the only thing checked here is that the process responds.
    status_payload = {"status": "healthy"}
    return status_payload
@app.post("/task", response_model=TaskResponse)
async def run_task(request: TaskRequest):
    """Run a single agent task and return the final assistant message.

    A fresh ``Session`` (shared Redis storage + remote session-server
    executor) is created per request, so tasks are isolated from each other.
    Agent failures are reported via ``TaskResponse.error`` instead of an
    HTTP 500.

    Args:
        request: The task to run, as natural language.

    Returns:
        TaskResponse with the agent's final message, or an error string.
    """
    # Storage must point at the same Redis the session server uses so the
    # tools/skills/artifacts registered there are visible to this session.
    storage = RedisStorage(
        url=os.environ["REDIS_URL"],
        prefix=os.environ.get("REDIS_PREFIX", "pycodemode"),
    )
    # Executor in remote mode: code runs on the session server, not locally.
    config = ContainerConfig(
        remote_url=os.environ.get("SESSION_URL", "http://session-server:8080"),
        auth_token=os.environ.get("SESSION_AUTH_TOKEN"),
    )
    executor = ContainerExecutor(config)
    async with Session(storage=storage, executor=executor) as session:
        async def _run_code(code: str) -> str:
            """Execute Python code via the session server.

            Args:
                code: Python code to execute. Has access to tools, skills,
                    artifacts, and deps namespaces.

            Returns:
                String result of execution, or error message if failed.
            """
            result = await session.run(code)
            if result.error:
                return f"Error: {result.error}"
            # Combine stdout and the expression value (when present); fall
            # back to "OK" so the model always sees a non-empty tool result.
            parts = []
            if result.stdout:
                parts.append(result.stdout)
            if result.value is not None:
                parts.append(str(result.value))
            return "\n".join(parts) if parts else "OK"

        run_code_tool = FunctionTool(
            func=_run_code,
            name="run_code",
            description="Execute Python code with tools, skills, artifacts, deps.",
        )
        system_prompt = """You are a helpful assistant that writes Python code to accomplish tasks.
You have access to these namespaces in run_code:
- tools.* - CLI tools (curl, jq, etc.)
- skills.* - Reusable Python functions
- artifacts.* - Persistent data storage
- deps.* - Python package management
WORKFLOW:
1. Discover tools: tools.list() or tools.search("keyword")
2. Use tools: tools.curl.get(url="...") or tools.jq.query(filter=".")
3. Save results: artifacts.save("name", data)
Example code:
```python
import json
response = tools.curl.get(url="https://api.github.com/repos/python/cpython")
data = json.loads(response)
print(f"Stars: {data['stargazers_count']}")
```
Always use the run_code tool to execute Python code."""
        agent = AssistantAgent(
            name="assistant",
            model_client=get_model_client(),
            tools=[run_code_tool],
            system_message=system_prompt,
        )
        try:
            result = await agent.run(task=request.task)
            # The final message's content is not guaranteed to be a plain str
            # (AutoGen messages may carry structured/multi-part content);
            # coerce so TaskResponse validation cannot fail on a successful run.
            return TaskResponse(result=str(result.messages[-1].content))
        except Exception as e:
            # Boundary handler: surface the failure to the HTTP client
            # rather than as an unhandled 500.
            return TaskResponse(result="", error=str(e))
@app.get("/info")
async def info():
    """Get available tools and skills from session server."""
    # Same Redis instance the session server writes to, so its
    # registrations are visible here.
    redis_store = RedisStorage(
        url=os.environ["REDIS_URL"],
        prefix=os.environ.get("REDIS_PREFIX", "pycodemode"),
    )
    # Remote-mode executor: discovery queries run against the session server.
    remote_config = ContainerConfig(
        remote_url=os.environ.get("SESSION_URL", "http://session-server:8080"),
        auth_token=os.environ.get("SESSION_AUTH_TOKEN"),
    )
    async with Session(
        storage=redis_store, executor=ContainerExecutor(remote_config)
    ) as session:
        return {
            "tools": await session.list_tools(),
            "skills": await session.list_skills(),
        }