-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathrun_test.py
More file actions
220 lines (162 loc) · 6.82 KB
/
run_test.py
File metadata and controls
220 lines (162 loc) · 6.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import json
from collections.abc import AsyncIterator
import pytest
from pydantic import BaseModel
import workflowai
from tests.integration.conftest import CityToCapitalTaskInput, CityToCapitalTaskOutput, IntTestClient
from workflowai.core.domain.run import Run
async def test_run_task(test_client: IntTestClient) -> None:
@workflowai.agent(schema_id=1)
async def city_to_capital(task_input: CityToCapitalTaskInput) -> CityToCapitalTaskOutput: ...
test_client.mock_response()
task_input = CityToCapitalTaskInput(city="Hello")
output = await city_to_capital(task_input)
assert output.capital == "Tokyo"
test_client.check_request()
async def test_run_task_run(test_client: IntTestClient) -> None:
@workflowai.agent(schema_id=1)
async def city_to_capital(task_input: CityToCapitalTaskInput) -> Run[CityToCapitalTaskOutput]: ...
test_client.mock_response()
task_input = CityToCapitalTaskInput(city="Hello")
with_run = await city_to_capital(task_input)
assert with_run.id == "123"
assert with_run.output.capital == "Tokyo"
test_client.check_request()
async def test_run_task_run_version(test_client: IntTestClient) -> None:
@workflowai.agent(schema_id=1, version="staging")
async def city_to_capital(task_input: CityToCapitalTaskInput) -> Run[CityToCapitalTaskOutput]: ...
test_client.mock_response()
task_input = CityToCapitalTaskInput(city="Hello")
with_run = await city_to_capital(task_input)
assert with_run.id == "123"
assert with_run.output.capital == "Tokyo"
test_client.check_request(version="staging")
async def test_run_task_run_with_model(test_client: IntTestClient) -> None:
@workflowai.agent(schema_id=1, version="staging")
async def city_to_capital(task_input: CityToCapitalTaskInput) -> Run[CityToCapitalTaskOutput]: ...
test_client.mock_response()
task_input = CityToCapitalTaskInput(city="Hello")
with_run = await city_to_capital(task_input, model="gpt-4o-latest")
assert with_run.id == "123"
assert with_run.output.capital == "Tokyo"
test_client.check_request(version={"model": "gpt-4o-latest"})
async def test_stream_task_run(test_client: IntTestClient) -> None:
@workflowai.agent(schema_id=1)
def city_to_capital(task_input: CityToCapitalTaskInput) -> AsyncIterator[CityToCapitalTaskOutput]: ...
test_client.mock_stream()
task_input = CityToCapitalTaskInput(city="Hello")
chunks = [chunk async for chunk in city_to_capital(task_input)]
assert [chunk.capital for chunk in chunks] == [
"",
"Tok",
"Tokyo",
"Tokyo",
]
async def test_stream_task_run_custom_id(test_client: IntTestClient) -> None:
@workflowai.agent(schema_id=1, id="custom-id")
def city_to_capital(task_input: CityToCapitalTaskInput) -> AsyncIterator[CityToCapitalTaskOutput]: ...
test_client.mock_stream(task_id="custom-id")
task_input = CityToCapitalTaskInput(city="Hello")
chunks = [chunk async for chunk in city_to_capital(task_input)]
assert [chunk.capital for chunk in chunks] == [
"",
"Tok",
"Tokyo",
"Tokyo",
]
async def test_auto_register(test_client: IntTestClient):
@workflowai.agent()
async def city_to_capital(task_input: CityToCapitalTaskInput) -> CityToCapitalTaskOutput: ...
test_client.mock_register()
test_client.mock_response()
res = await city_to_capital(CityToCapitalTaskInput(city="Hello"))
assert res.capital == "Tokyo"
test_client.mock_response(capital="Paris")
# Run it a second time
res = await city_to_capital(CityToCapitalTaskInput(city="Hello"), use_cache="never")
assert res.capital == "Paris"
req = test_client.httpx_mock.get_requests()
assert len(req) == 3
assert req[0].url == test_client.REGISTER_URL
req_body = json.loads(req[0].read())
assert req_body == {
"id": "city-to-capital",
"input_schema": {
"properties": {
"city": {
"type": "string",
},
},
"required": [
"city",
],
"type": "object",
},
"output_schema": {
"properties": {
"capital": {
"type": "string",
},
},
"required": [
"capital",
],
"type": "object",
},
}
async def test_run_with_tool(test_client: IntTestClient):
"""Test a round trip with a tool call request and a reply."""
class _SayHelloToolInput(BaseModel):
name: str
class _SayHelloToolOutput(BaseModel):
message: str
def say_hello(tool_input: _SayHelloToolInput) -> _SayHelloToolOutput:
return _SayHelloToolOutput(message=f"Hello {tool_input.name}")
# Sanity check to make sure that CityToCapitalTaskOutput.capital is a field required by pydantic
with pytest.raises(AttributeError):
_ = CityToCapitalTaskOutput.model_construct(None).capital
@workflowai.agent(id="city-to-capital", tools=[say_hello])
async def city_to_capital(task_input: CityToCapitalTaskInput) -> CityToCapitalTaskOutput:
"""Say hello to the user"""
...
test_client.mock_register()
# First response will respond a tool call request
test_client.mock_response(
json={
"id": "1234",
"task_output": {},
"tool_call_requests": [
{
"id": "say_hello_1",
"name": "say_hello",
"input": {"tool_input": {"name": "john"}},
},
],
},
)
# Second response will respond the final output
test_client.mock_response(url="https://run.workflowai.dev/v1/_/agents/city-to-capital/runs/1234/reply")
task_input = CityToCapitalTaskInput(city="Hello")
output = await city_to_capital(task_input)
assert output.capital == "Tokyo"
assert len(test_client.httpx_mock.get_requests()) == 3
run_req = test_client.httpx_mock.get_request(
url="https://run.workflowai.dev/v1/_/agents/city-to-capital/schemas/1/run",
)
assert run_req is not None
run_req_body = json.loads(run_req.content)
assert run_req_body["task_input"] == {"city": "Hello"}
assert set(run_req_body["version"].keys()) == {"enabled_tools", "instructions", "model"}
assert len(run_req_body["version"]["enabled_tools"]) == 1
assert run_req_body["version"]["enabled_tools"][0]["name"] == "say_hello"
reply_req = test_client.httpx_mock.get_request(
url="https://run.workflowai.dev/v1/_/agents/city-to-capital/runs/1234/reply",
)
assert reply_req is not None
reply_req_body = json.loads(reply_req.content)
assert reply_req_body["tool_results"] == [
{
"id": "say_hello_1",
"output": {"message": "Hello john"},
},
]