-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathckit_cloudtool.py
More file actions
536 lines (470 loc) · 21.3 KB
/
ckit_cloudtool.py
File metadata and controls
536 lines (470 loc) · 21.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
import asyncio
import copy
import json
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable, Awaitable, List, Set, Optional, Tuple

import gql
import gql.transport.exceptions
import websockets
import websockets.exceptions

from flexus_client_kit import ckit_client
from flexus_client_kit import ckit_shutdown
from flexus_client_kit import ckit_utils
from flexus_client_kit import ckit_passwords
from flexus_client_kit import gql_utils
logger = logging.getLogger("ctool")

# Kanban tool families; each family is a set holding a single tool name.
KANBAN_ADVANCED = {"flexus_kanban_advanced"}
KANBAN_TRIAGE = {"flexus_kanban_triage"}
KANBAN_PUBLIC = {"flexus_kanban_public"}
KANBAN_SAFE = {"flexus_kanban_safe"}
KANBAN_BOSS = {"flexus_kanban_boss"}
KANBAN_ALL = set().union(KANBAN_ADVANCED, KANBAN_TRIAGE, KANBAN_PUBLIC, KANBAN_SAFE, KANBAN_BOSS)  # unusable in bot

# Non-kanban cloudtool groups.
CLOUDTOOLS_SCARY_ENOUGH = {
    "flexus_my_setup",
    "flexus_colleagues",
    "flexus_hand_over_task",
    "flexus_mcp_setup",
    "flexus_eds_setup",
}
CLOUDTOOLS_VECDB = {"flexus_vector_search", "flexus_read_original"}
CLOUDTOOLS_PYTHON = {"python_execute"}
CLOUDTOOLS_WEB = {"web"}
CLOUDTOOLS_AGENTS = {"flexus_hand_over_task"}
CLOUDTOOLS_NOT_KANBAN = set().union(CLOUDTOOLS_VECDB, CLOUDTOOLS_PYTHON, CLOUDTOOLS_WEB, CLOUDTOOLS_AGENTS)
# "mcp_*" looks like a name pattern rather than a literal tool name -- matching is done elsewhere, confirm.
CLOUDTOOLS_MCP = {"mcp_*"}

# Ready-made combinations.
CLOUDTOOLS_QUITE_A_LOT = set().union(KANBAN_ADVANCED, CLOUDTOOLS_NOT_KANBAN, CLOUDTOOLS_MCP)
CLOUDTOOLS_ALL_KNOWN = set().union(KANBAN_ALL, CLOUDTOOLS_NOT_KANBAN)  # unusable in a bot
def gql_error_4xx_to_model_reraise_5xx(e: gql.transport.exceptions.TransportQueryError, label: str) -> str:
    """
    Return a client-side (4xx) GraphQL error message so it can be shown to the model; re-raise anything else.

    The status is taken from the leading digit of the server's error message (messages look like
    "403: ..."): a message starting with "4" is returned, any other message re-raises `e`.

    e     -- the TransportQueryError caught by the caller
    label -- prefix for the info log line
    Returns the error message (first error's "message", falling back to str(e)).
    Raises e when the message does not start with "4".
    """
    first_error = e.errors[0] if e.errors else None
    # Guard against error entries that are not dicts instead of crashing on .get()
    msg = (first_error.get("message", "") if isinstance(first_error, dict) else "") or str(e)
    logger.info("%s: %s", label, msg)
    if not msg.startswith("4"):
        # Explicit `raise e`: a bare `raise` fails with RuntimeError when no exception is being handled,
        # and would re-raise the wrong exception if the caller is handling a different one.
        raise e
    return msg
class NeedsConfirmation(Exception):
    """
    Raised by a tool handler when a human has to approve the call first.

    call_python_function_and_save_result catches it and sends a confirmation request instead of a result.

    confirm_explanation -- first visible line shown to the human
    confirm_command     -- next line, rendered in monospace
    confirm_setup_key   -- setup key the UI navigates to
    """
    def __init__(self,
        confirm_explanation: str,
        confirm_command: str,
        confirm_setup_key: str,
    ):
        super().__init__(f"Confirmation needed: {confirm_explanation}")
        self.confirm_explanation = confirm_explanation
        self.confirm_command = confirm_command
        self.confirm_setup_key = confirm_setup_key
class WaitForSubchats(Exception):
    """
    Exception carrying the subchats (list of strings) a tool call is waiting on.

    The message is "Waiting for subchats: <list repr>".
    """
    def __init__(self, subchats: List[str]):
        super().__init__(f"Waiting for subchats: {subchats}")
        self.subchats = subchats
class AlreadyFakedResult(Exception):
    """
    Raised by a tool handler when a scenario has already faked the result.

    call_python_function_and_save_result logs it and posts nothing.
    """
    def __init__(self):
        message = "Result already faked in scenario"
        super().__init__(message)
@dataclass
class ToolResult:
    """
    Local tools (in bots or cloudtool services) should either return str or ToolResult.

    Exactly one payload is used:
      content    -- text result, serialized as a JSON string
      multimodal -- list of dicts, each with string "m_type" and "m_content", serialized as a JSON list
    When multimodal is given, content must be empty.

    dollars -- cost of the call, posted together with the result.
    """
    content: str
    multimodal: Optional[List[dict]] = None
    dollars: float = 0.0

    def __post_init__(self):
        if self.content and self.multimodal is not None:
            raise ValueError("ToolResult: use either content (str) or multimodal (list), not both")
        for entry in self.multimodal or ():
            if not isinstance(entry, dict):
                raise ValueError(f"ToolResult multimodal list items must be dicts: {entry!r}")
            if "m_type" not in entry or "m_content" not in entry:
                raise ValueError(f"ToolResult multimodal items must have m_type and m_content: {entry!r}")
            if not (isinstance(entry["m_type"], str) and isinstance(entry["m_content"], str)):
                raise ValueError(f"ToolResult m_type and m_content must be strings: {entry!r}")

    def to_serialized(self) -> str:
        """JSON-encode the active payload: the multimodal list if set, otherwise the content string."""
        payload = self.content if self.multimodal is None else self.multimodal
        return json.dumps(payload)
@dataclass
class FCloudtoolCall:
    """
    One cloudtool call, as delivered by the cloudtool_wait_for_call subscription.

    NOTE: this dataclass's fields are passed to gql_utils.gql_fields() to build the subscription's
    selection set, and instances are built with gql_utils.dataclass_from_dict(). Presumably field
    names (and maybe types) must match the server schema -- do not rename casually.
    """
    caller_fuser_id: str # copy of thread owner fuser_id
    located_fgroup_id: str
    fcall_id: str  # id of this call: used in logs, when posting the result and when requesting confirmation
    fcall_ft_id: str  # thread the call belongs to (DeltaStreamer sends it as ftm_belongs_to_ft_id)
    fcall_ft_btest_name: str
    fcall_fexp_name: str
    fcall_ftm_alt: int  # alt of the thread messages (DeltaStreamer sends it as ftm_alt)
    fcall_called_ftm_num: int  # logged only in this file
    fcall_call_n: int  # logged only in this file
    fcall_name: str  # tool name; the subscription filters calls by tool name
    fcall_arguments: str  # JSON-encoded arguments, decoded with json.loads before calling the handler
    fcall_result_ftm_num: int  # message number for the result (DeltaStreamer sends it as ftm_num)
    fcall_created_ts: float  # presumably unix timestamp in seconds -- confirm
    fcall_untrusted_key: str  # passed to use_http_on_behalf and sent with the posted result
    connected_persona_id: Optional[str]  # passed to use_http_on_behalf
    ws_id: str
    confirmed_by_human: Optional[bool] = None  # NOTE(review): presumably set after a NeedsConfirmation round-trip -- confirm
@dataclass
class CloudTool:
    """
    Definition of one tool offered by a cloudtool service.

    parameters is a JSON Schema describing the tool arguments. With strict=True, every object
    schema that has "properties" must set additionalProperties: false and list all of its
    properties in "required" (checked recursively through properties, array items and anyOf).
    """
    strict: bool
    name: str
    description: str
    parameters: dict

    def openai_style_tool(self) -> dict:
        """
        Return this tool in OpenAI function-calling format.

        Each property schema in the returned parameters gets an "order" key: its position among its
        siblings (an existing "order" is kept). The result holds a deep copy, so self.parameters is
        never modified.

        Raises ValueError if strict=True and the schema breaks the strict-mode rules.
        """
        def add_order(node):
            # Walk the schema, stamping "order" on every property schema.
            if isinstance(node, list):
                for item in node:
                    add_order(item)
                return
            if not isinstance(node, dict):
                return
            for key, value in node.items():
                if key == "properties" and isinstance(value, dict):
                    # value maps property NAME -> schema. Never treat those names as schema keywords,
                    # otherwise a property literally named "properties" gets mistaken for a properties map.
                    for idx, sub in enumerate(value.values()):
                        if isinstance(sub, dict):
                            sub.setdefault("order", idx)
                            add_order(sub)
                elif key in ("$defs", "definitions", "patternProperties") and isinstance(value, dict):
                    # Also NAME -> schema maps: only descend into the schemas.
                    for sub in value.values():
                        add_order(sub)
                elif isinstance(value, (dict, list)):
                    add_order(value)

        if self.strict:
            def _type_contains(t, name):
                return t == name or (isinstance(t, list) and name in t)

            def check_strict(path: str, schema: dict) -> None:
                t = schema.get("type")
                if _type_contains(t, "object") and "properties" in schema:
                    if schema.get("additionalProperties") is not False:
                        raise ValueError(f"CloudTool {self.name!r} strict mode requires additionalProperties: false at {path}")
                    required = set(schema.get("required", []))
                    missing = [k for k in schema["properties"] if k not in required]
                    if missing:
                        raise ValueError(f"CloudTool {self.name!r} strict mode requires all properties in required at {path}, missing: {missing}")
                    for k, v in schema["properties"].items():
                        if isinstance(v, dict):
                            check_strict(f"{path}.{k}", v)
                if _type_contains(t, "array") and isinstance(schema.get("items"), dict):
                    check_strict(f"{path}[]", schema["items"])
                for i, variant in enumerate(schema.get("anyOf", [])):
                    if isinstance(variant, dict):
                        check_strict(f"{path}.anyOf[{i}]", variant)

            check_strict("parameters", self.parameters)

        # Deep copy: add_order mutates nested dicts; a shallow dict.copy() shared them with self.parameters,
        # which is later sent verbatim by the registration heartbeat.
        params = copy.deepcopy(self.parameters)
        add_order(params)
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": params,
            },
            "strict": self.strict,
        }
def sanitize_args(args_dict_from_model: Any) -> tuple[dict, Optional[str]]:
    """
    For use within a tool call: pull the "args" object out of what the model generated.

    Returns (args, None) on success or ({}, error_message) on failure.
    - A missing or None "args" gives ({}, None). Strict-mode tools declare args as object|null,
      so the model may legitimately send None.
    - A string "args" (the model escaped the object) is decoded with json.loads first.
    - Anything that still isn't a dict is an error.
    """
    if not isinstance(args_dict_from_model, dict):
        return {}, "args_dict_from_model should be a dict"
    raw = args_dict_from_model.get("args", {})
    if raw is None:
        return {}, None
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return {}, "Error: 'args' needs to have 'object', don't pass a string, I couldn't even parse it back to json :/"
    if isinstance(raw, dict):
        return raw, None
    return {}, "args must be a dict after normalization"
def try_best_to_find_argument(args: dict, args_dict_from_model: Any, param_name: str, default_value: Any) -> Any:
    """
    Look up param_name, tolerating models that put parameters in the wrong place.

    Checks args first (the normal place), then the top level of args_dict_from_model (a common
    mistake) if that is a dict. A None value counts as missing. Falls back to default_value.
    """
    places = [args]
    if isinstance(args_dict_from_model, dict):
        places.append(args_dict_from_model)
    for place in places:
        found = place.get(param_name)
        if found is not None:
            return found
    return default_value
async def call_python_function_and_save_result(
    call: FCloudtoolCall,
    the_python_function: Callable[[ckit_client.FlexusClient, FCloudtoolCall, Any], Awaitable[Tuple[str | ToolResult, str] | Tuple[None, None]]],
    service_name: str,
    fclient: ckit_client.FlexusClient,
) -> None:
    """
    Run one cloudtool call through the handler and post its result.

    The handler is awaited as the_python_function(fclient, call, args), where
    args = json.loads(call.fcall_arguments). It must return one of:
      1. (str | ToolResult, str) -- immediate answer plus ftm_provenance. A str is posted as-is,
         so it must already be serialized JSON; a ToolResult goes through to_serialized() and
         contributes its dollars.
      2. (None, None) -- delayed answer: nothing is posted here; the result is expected to be
         posted later via cloudtool_post_result.

    Exceptions from the handler:
      AlreadyFakedResult -- logged, nothing posted.
      NeedsConfirmation  -- a confirmation request is sent instead of a result. A server error
                            containing "confirmation already requested" is ignored; other
                            GraphQL errors propagate.
      anything else      -- logged and posted as an "Internal error: ..." result with provenance
                            {"system": service_name}, so the call still gets an answer.
    """
    result = None
    prov = None
    dollars = 0.0
    try:
        args = json.loads(call.fcall_arguments)
        result, prov = await the_python_function(fclient, call, args)
        is_immediate = isinstance(result, (str, ToolResult)) and isinstance(prov, str)
        is_delayed = result is None and prov is None
        if not (is_immediate or is_delayed):
            # Explicit check instead of `assert`: asserts disappear under `python -O`,
            # and a malformed answer would then be posted as-is.
            raise TypeError(
                "cloudtool handler must return (str | ToolResult, str) or (None, None), got (%s, %s)"
                % (type(result).__name__, type(prov).__name__)
            )
        content = result.content if isinstance(result, ToolResult) else result
        dollars = result.dollars if isinstance(result, ToolResult) else 0.0
        logger.info("/%s %s:%03d:%03d %+d result=%s", call.fcall_id, call.fcall_ft_id, call.fcall_ftm_alt, call.fcall_called_ftm_num, call.fcall_call_n, content[:30] if content is not None else "delayed")
    except AlreadyFakedResult:
        logger.info("/%s fake %s:%03d:%03d %+d", call.fcall_id, call.fcall_ft_id, call.fcall_ftm_alt, call.fcall_called_ftm_num, call.fcall_call_n)
        return
    except NeedsConfirmation as e:
        logger.info("%s needs human confirmation: %s", call.fcall_id, e.confirm_explanation)
        try:
            await cloudtool_confirmation_request(
                fclient,
                call,
                e.confirm_setup_key,
                e.confirm_command,
                e.confirm_explanation,
            )
        except gql.transport.exceptions.TransportQueryError as gql_err:
            if "confirmation already requested" in str(gql_err).lower():
                logger.info("Confirmation already requested for %s, ignoring", call.fcall_id)
            else:
                raise
        return
    except Exception as e:
        logger.error("error processing call %s %s:%03d:%03d %+d: %s %s", call.fcall_id, call.fcall_ft_id, call.fcall_ftm_alt, call.fcall_called_ftm_num, call.fcall_call_n, type(e).__name__, e, exc_info=e)
        result = json.dumps(f"Internal error: {type(e).__name__} {e}")
        prov = json.dumps({"system": service_name})
        dollars = 0.0

    if result is None:
        return  # delayed answer, posted later by the handler side
    serialized_result = result if isinstance(result, str) else result.to_serialized()
    http_client = await fclient.use_http_on_behalf(call.connected_persona_id, call.fcall_untrusted_key)
    async with http_client as http:
        await cloudtool_post_result(http, call, serialized_result, prov, dollars)
async def cloudtool_post_result(http, call: FCloudtoolCall, content: str, prov: str, dollars: float = 0.0, as_placeholder: bool = False):
    """
    Post the result of `call` with the cloudtool_post_result mutation.

    http           -- an open GraphQL session (what `async with` on use_http_on_behalf() yields)
    content        -- ftm_content, already serialized JSON
    prov           -- ftm_provenance, serialized JSON
    dollars        -- cost of the call
    as_placeholder -- forwarded to the mutation unchanged; its meaning is defined server-side
                      (presumably a provisional result that may be replaced later -- confirm)
    The call is identified by call.fcall_id and authorized with call.fcall_untrusted_key.
    """
    result_input = {
        "fcall_id": call.fcall_id,
        "fcall_untrusted_key": call.fcall_untrusted_key,
        "ftm_content": content,
        "ftm_provenance": prov,
        "dollars": dollars,
    }
    mutation = gql.gql("""mutation CloudtoolPost($input: CloudtoolResultInput!, $as_placeholder: Boolean!) {
        cloudtool_post_result(input: $input, as_placeholder: $as_placeholder)
    }""")
    await http.execute(
        mutation,
        variable_values={"input": result_input, "as_placeholder": as_placeholder},
    )
class DeltaStreamer:
    """
    Streams tool output deltas via /v1/delta/ws WebSocket.
    Frontend receives DELTA events and appends content in real-time.

    Usage: `s = await DeltaStreamer.connect(fclient, call)`, then `await s.send(text)` any number
    of times, then `await s.close()`. Each delta targets the result message of `call`:
    thread call.fcall_ft_id, alt call.fcall_ftm_alt, message number call.fcall_result_ftm_num.
    """

    def __init__(self, ws_url: str, call: FCloudtoolCall, role: str = "tool"):
        self._ws_url = ws_url
        self._call = call
        self._role = role  # sent as ftm_role of every delta
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._closed = False

    @classmethod
    async def connect(
        cls,
        fclient: ckit_client.FlexusClient,
        call: FCloudtoolCall,
        role: str = "tool",
    ) -> "DeltaStreamer":
        """Open the delta websocket under fclient.base_url_ws and return a ready streamer; connection errors propagate."""
        ws_url = fclient.base_url_ws.rstrip("/") + "/v1/delta/ws"
        streamer = cls(ws_url, call, role)
        streamer._ws = await websockets.connect(ws_url, ping_interval=20, ping_timeout=20)
        logger.debug("DeltaStreamer connected to %s for call %s", ws_url, call.fcall_id)
        return streamer

    async def send(self, text: str) -> None:
        """Send one text delta as JSON; a no-op for empty text or a closed/unconnected streamer."""
        if not text or self._closed or not self._ws:
            return
        payload = {
            "ftm_belongs_to_ft_id": self._call.fcall_ft_id,
            "ftm_alt": self._call.fcall_ftm_alt,
            "ftm_num": self._call.fcall_result_ftm_num,
            "ftm_prev_alt": self._call.fcall_ftm_alt,
            "delta": {"ftm_role": self._role, "ftm_content": text, "ftm_call_id": self._call.fcall_id},
        }
        await self._ws.send(json.dumps(payload))

    async def close(self) -> None:
        """Close the websocket. Idempotent; errors while closing are logged at debug level, not raised."""
        if self._closed:
            return
        self._closed = True  # set before awaiting so concurrent send()/close() calls see it
        if self._ws:
            try:
                await self._ws.close()
            except Exception as e:
                # Best effort: the stream is finished either way, but leave a trace instead of swallowing silently.
                logger.debug("DeltaStreamer close error for call %s: %s %s", self._call.fcall_id, type(e).__name__, e)
            self._ws = None
        logger.debug("DeltaStreamer closed for call %s", self._call.fcall_id)
async def cloudtool_confirmation_request(
    fclient: ckit_client.FlexusClient,
    call: FCloudtoolCall,
    confirm_setup_key: str,
    confirm_command: str,
    confirm_explanation: str,
):
    """
    Ask the backend to get a human confirmation for `call`.

    Runs the cloudtool_confirmation_request mutation over an HTTP session opened on behalf of
    call.connected_persona_id with call.fcall_untrusted_key. The three confirm_* strings are
    forwarded unchanged:
      confirm_explanation -- first visible line
      confirm_command     -- next line, shown in monospace
      confirm_setup_key   -- setup key the UI navigates to
    GraphQL errors propagate to the caller.
    """
    mutation_vars = {
        "fcall_id": call.fcall_id,
        "confirm_setup_key": confirm_setup_key,
        "confirm_command": confirm_command,
        "confirm_explanation": confirm_explanation,
    }
    http_client = await fclient.use_http_on_behalf(call.connected_persona_id, call.fcall_untrusted_key)
    async with http_client as http:
        mutation = gql.gql("""mutation CloudtoolConfirmationRequest(
            $fcall_id: String!,
            $confirm_setup_key: String!,
            $confirm_command: String!,
            $confirm_explanation: String!
        ) {
            cloudtool_confirmation_request(
                fcall_id: $fcall_id,
                confirm_setup_key: $confirm_setup_key,
                confirm_command: $confirm_command,
                confirm_explanation: $confirm_explanation
            )
        }""")
        await http.execute(mutation, variable_values=mutation_vars)
async def cloudtool_i_am_still_alive(
    fclient: ckit_client.FlexusClient,
    tool_list: List[CloudTool],
    fgroup_id: Optional[str],
    fuser_id: Optional[str],
    shared: bool,
) -> None:
    """
    Heartbeat: re-register every tool in tool_list via cloudtool_confirm_exists, then wait 120 s; repeat until shutdown.

    Strict-mode schemas are validated first (openai_style_tool() raises ValueError for a bad one),
    so a malformed tool fails at startup. Registration errors never end the loop: they are logged
    and retried after 60 s, so the service does not silently stop advertising its tools.
    """
    for t in tool_list:
        # validate strict-mode schemas up front; the returned dict is not needed
        _ = t.openai_style_tool()
    while not ckit_shutdown.shutdown_event.is_set():
        try:
            http_client = await fclient.use_http_on_behalf("", "")
            async with http_client as http:
                for t in tool_list:
                    await http.execute(
                        gql.gql("""mutation CloudtoolConfirm($name: String!, $desc: String!, $params: String!, $fgroup_id: String, $fuser_id: String, $shared: Boolean!, $strict: Boolean!) {
                            cloudtool_confirm_exists(tool_name: $name, ctool_description: $desc, ctool_parameters: $params, fgroup_id: $fgroup_id, fuser_id: $fuser_id, shared: $shared, ctool_strict: $strict)
                        }"""),
                        variable_values={
                            "name": t.name,
                            "desc": t.description,
                            "params": json.dumps(t.parameters),
                            "fgroup_id": fgroup_id,
                            "fuser_id": fuser_id,
                            "shared": shared,
                            "strict": t.strict,
                        },
                    )
        except (
            gql.transport.exceptions.TransportError,
            asyncio.exceptions.TimeoutError,
        ) as e:
            if "403:" in str(e):
                # It's gql.transport.exceptions.TransportQueryError with {'message': "403: Whoops your key didn't work (1).", ...}
                # Unfortunately, no separate exception class for 403
                logger.error("That looks bad, my key doesn't work: %s", e)
            else:
                logger.info("cloudtool_i_am_still_alive connection problem: %s", e)
            await ckit_shutdown.wait(60)
            continue
        except Exception as e:
            # e.g. HTTP client connection errors that gql does not wrap: keep the heartbeat alive.
            # (CancelledError is a BaseException and still stops the task.)
            logger.error("cloudtool_i_am_still_alive unexpected error, will retry: %s %s", type(e).__name__, e, exc_info=e)
            await ckit_shutdown.wait(60)
            continue
        if await ckit_shutdown.wait(120):
            break
async def run_cloudtool_service_real(
    service_name: str,
    endpoint: str,
    superuser: bool,
    tools: List[CloudTool],
    the_python_function: Callable[[ckit_client.FlexusClient, FCloudtoolCall, Any], Awaitable[Tuple[str | ToolResult, str] | Tuple[None, None]]], # should return tuple of tool result ftm_content (serialized json str or ToolResult), and ftm_provenance as serialized json; (None, None) for delayed results
    max_tasks: int,
    fgroup_id: Optional[str],
    fuser_id: Optional[str],
    shared: bool,
) -> None:
    """
    One connection lifetime of a cloudtool service.

    Starts the registration heartbeat (cloudtool_i_am_still_alive) and a load monitor. Then it
    subscribes to cloudtool_wait_for_call and runs every incoming call as its own task, at most
    max_tasks at a time. It returns (or raises) when the subscription ends.

    On the way out it cancels the helper tasks and waits for the in-flight calls to finish,
    including when opening the websocket fails.
    """
    fclient = ckit_client.FlexusClient(
        service_name,
        endpoint=endpoint,
        superuser=superuser,
    )
    workset: Set[asyncio.Task] = set()

    async def monitor_performance() -> None:
        # Samples the workset once a second. Once per minute it logs the share of seconds spent idle
        # (no calls) and full (max_tasks calls). The first, partial minute is skipped.
        idle_sec = 0
        full_sec = 0
        minute = int(time.time() // 60)
        badstat = True
        while not ckit_shutdown.shutdown_event.is_set():
            if await ckit_shutdown.wait(1):
                break
            if len(workset) == 0:
                idle_sec += 1
            if len(workset) == max_tasks:
                full_sec += 1
            now_minute = int(time.time() // 60)
            if now_minute != minute:
                if badstat:
                    badstat = False
                else:
                    logger.info("idle %0.1f%% full %0.1f%% now %d %s", (idle_sec * 100 / 60), (full_sec * 100 / 60), len(workset), service_name)
                idle_sec = 0
                full_sec = 0
                minute = now_minute

    def workset_done(task: asyncio.Task, call: FCloudtoolCall) -> None:
        workset.discard(task)
        ckit_utils.report_crash(task, logger)

    still_alive = asyncio.create_task(cloudtool_i_am_still_alive(fclient, tools, fgroup_id, fuser_id, shared))
    still_alive.add_done_callback(lambda t: ckit_utils.report_crash(t, logger))
    perfmon = asyncio.create_task(monitor_performance())
    perfmon.add_done_callback(lambda t: ckit_utils.report_crash(t, logger))

    ws_registered = False
    try:
        # use_ws()/give_ws_client() are inside the try so a failure there still runs the cleanup below.
        # Otherwise the two helper tasks would leak, one more heartbeat per reconnect attempt.
        ws_client = await fclient.use_ws()
        ckit_shutdown.give_ws_client(service_name, ws_client)
        ws_registered = True
        async with ws_client as ws:
            async for r in ws.subscribe(
                gql.gql(f"""subscription CloudtoolWait($names: [String!]!, $fgroup_id: String, $fuser_id: String) {{
                    cloudtool_wait_for_call(tool_names: $names, fgroup_id: $fgroup_id, fuser_id: $fuser_id) {{
                        {gql_utils.gql_fields(FCloudtoolCall)}
                    }}
                }}"""),
                variable_values={
                    "names": [t.name for t in tools],
                    "fgroup_id": fgroup_id,
                    "fuser_id": None if shared else fuser_id,
                },
            ):
                # Back-pressure: stop reading the subscription while max_tasks calls are running.
                while len(workset) >= max_tasks:
                    logger.warning("too many tasks %d, sleeping instead of reading subs", len(workset))
                    if await ckit_shutdown.wait(1):
                        break
                call = gql_utils.dataclass_from_dict(r["cloudtool_wait_for_call"], FCloudtoolCall)
                logger.info(" %s %s:%03d:%03d %+d %s(%s)", call.fcall_id, call.fcall_ft_id, call.fcall_ftm_alt, call.fcall_called_ftm_num, call.fcall_call_n, call.fcall_name, str(call.fcall_arguments)[:20])
                task = asyncio.create_task(call_python_function_and_save_result(call, the_python_function, service_name, fclient))
                task.add_done_callback(lambda t, c=call: workset_done(t, c))  # bind call now, not at callback time
                workset.add(task)
    finally:
        logger.info("run_cloudtool_service_real going down!")
        if ws_registered:
            ckit_shutdown.take_away_ws_client(service_name)
        perfmon.cancel()
        still_alive.cancel()
        await asyncio.gather(still_alive, perfmon, *workset, return_exceptions=True)
async def run_cloudtool_service(
    service_name: str,
    endpoint: str,
    superuser: bool,
    tools: List[CloudTool],
    # should return tuple of tool result ftm_content (serialized json str or ToolResult), and ftm_provenance as serialized json or tuple of None for delayed returns
    the_python_function: Callable[[ckit_client.FlexusClient, FCloudtoolCall, Any], Awaitable[Tuple[str | ToolResult, str] | Tuple[None, None]]],
    max_tasks: int = 64,
    fgroup_id: Optional[str] = None,
    fuser_id: Optional[str] = None,
    shared: bool = True,
) -> None:
    """
    Run a cloudtool service until shutdown, reconnecting whenever the connection drops.

    Each attempt is a run_cloudtool_service_real() call. A TransportError (disconnect) is retried
    after 5 s on a devbox or 60 s otherwise, unless shutdown has started. Any other exception is
    logged with traceback and retried after the same delay.
    """
    # NOTE(review): it_might_be_a_devbox is used as a value here; if it is a function it must be called -- confirm.
    while not ckit_shutdown.shutdown_event.is_set():
        try:
            await run_cloudtool_service_real(
                service_name,
                endpoint,
                superuser,
                tools,
                the_python_function,
                max_tasks,
                fgroup_id,
                fuser_id,
                shared,
            )
        except gql.transport.exceptions.TransportError:
            if ckit_shutdown.shutdown_event.is_set():
                break
            retry_sec = 5 if ckit_passwords.it_might_be_a_devbox else 60
            logger.info("got disconnected, will connect again in %ds", retry_sec)
            await ckit_shutdown.wait(retry_sec)
        except Exception as e:
            logger.error("caught exception %s: %s", type(e).__name__, e, exc_info=e)
            await ckit_shutdown.wait(5 if ckit_passwords.it_might_be_a_devbox else 60)