Skip to content

Commit 989aeed

Browse files
author
yinsu.zs
committed
review
Change-Id: Ia767a1d14c78bd1aa162e7a59363fb585a4b3f0c
1 parent 8869fee commit 989aeed

4 files changed

Lines changed: 152 additions & 91 deletions

File tree

src/code/agent/exceptions/exceptions.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,45 @@ class StateTransitionError(CustomError):
1010
def __init__(self, from_state, to_state):
1111
msg = f"Illegal state transition: {from_state} -> {to_state}"
1212
super().__init__(message=msg, code=409)
13+
14+
15+
# ========== 任务管理相关异常 ==========
16+
17+
class TaskException(CustomError):
    """Base class for task-management errors.

    In addition to the ``message`` and numeric ``code`` handed to
    ``CustomError``, every instance carries a machine-readable
    ``error_code`` string.
    """

    def __init__(self, message: str, error_code: str = "task_error", code: int = 500):
        """Store ``error_code`` and forward ``message`` and ``code`` to the parent.

        Args:
            message: Human-readable description of the failure.
            error_code: Short machine-readable identifier for the failure.
            code: Numeric code passed through to ``CustomError``.
        """
        # Set before delegating, matching the original statement order.
        self.error_code = error_code
        super().__init__(message=message, code=code)

    def to_dict(self):
        """Return the error as a dict with ``type``, ``error_code`` and ``error_message``."""
        payload = {"type": "error"}
        payload["error_code"] = self.error_code
        # NOTE(review): self.message is presumably set by CustomError.__init__ - confirm.
        payload["error_message"] = self.message
        return payload
29+
30+
31+
class ConfigurationException(TaskException):
    """Configuration error.

    Always uses ``error_code="configuration_error"`` and ``code=500``.
    """

    def __init__(self, message: str):
        """Build the exception from a human-readable ``message``."""
        super().__init__(
            message=message,
            code=500,
            error_code="configuration_error",
        )
35+
36+
37+
class InvalidRequestException(TaskException):
    """Invalid request.

    Always uses ``code=400``; ``error_code`` may be overridden by the caller.
    """

    def __init__(self, message: str, error_code: str = "invalid_request_error"):
        """Build the exception.

        Args:
            message: Human-readable description of what is wrong with the request.
            error_code: Machine-readable identifier; defaults to
                ``"invalid_request_error"``.
        """
        super().__init__(
            message=message,
            code=400,
            error_code=error_code,
        )
41+
42+
43+
class GPUException(TaskException):
    """GPU-related error.

    May carry the raw response dict supplied by the caller so that
    ``to_dict`` can return it unchanged instead of the generic payload.
    """

    def __init__(self, message: str, status_code: int = 500,
                 error_code: str = "gpu_error", original_response: dict = None):
        """Build the exception.

        Args:
            message: Human-readable description of the failure.
            status_code: Numeric code forwarded to the parent as ``code``.
            error_code: Machine-readable identifier; defaults to ``"gpu_error"``.
            original_response: Optional response body to pass through
                from ``to_dict``.
        """
        super().__init__(message=message, code=status_code, error_code=error_code)
        self.original_response = original_response

    def to_dict(self):
        """Return the stored response when it looks like an error payload.

        The stored response is returned as-is only when it is a non-empty
        dict containing a ``"type"`` or ``"error"`` key; otherwise the
        parent's generic payload is returned.
        """
        upstream = self.original_response
        # Truthiness is checked before isinstance, as in the original.
        usable = bool(upstream) and isinstance(upstream, dict)
        if usable and ("type" in upstream or "error" in upstream):
            return upstream
        return super().to_dict()

src/code/agent/services/gateway/handlers/prompt_handler.py

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
Prompt Handler
33
处理 /prompt 请求逻辑
44
"""
5+
import traceback
56
from flask import request, jsonify
67

78
from utils.logger import log
9+
from exceptions.exceptions import TaskException
810

911

1012
class PromptHandler:
@@ -42,26 +44,35 @@ def handle_post_request(self):
4244
}
4345
}), 400
4446

45-
# 转发给GPU
46-
task_id, result = self.task_manager.forward_to_gpu_async(
47-
request_body=request_data,
48-
client_id=client_id
49-
)
50-
51-
# 处理结果
52-
if task_id:
47+
try:
48+
# 转发给GPU
49+
task_id, result = self.task_manager.forward_to_gpu_async(
50+
request_body=request_data,
51+
client_id=client_id
52+
)
53+
5354
# 成功:返回ComfyUI格式
5455
return jsonify({
5556
"prompt_id": task_id,
5657
"number": 1,
5758
"node_errors": {}
5859
})
59-
else:
60-
# 失败:result 是 (status_code, error_type, error_message)
61-
status_code, error_type, error_message = result
60+
61+
except TaskException as e:
62+
log("ERROR", f"[PromptHandler] Task error: {e.message}")
63+
# 返回ComfyUI格式的错误
64+
return jsonify({
65+
"error": {
66+
"type": e.error_code,
67+
"message": e.message
68+
}
69+
}), e.code
70+
71+
except Exception as e:
72+
log("ERROR", f"[PromptHandler] Unexpected error: {str(e)}\n{traceback.format_exc()}")
6273
return jsonify({
6374
"error": {
64-
"type": error_type,
65-
"message": error_message
75+
"type": "internal_error",
76+
"message": str(e)
6677
}
67-
}), status_code
78+
}), 500

src/code/agent/services/gateway/handlers/serverless_handler.py

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
Serverless Handler
33
处理 /serverless/run 请求逻辑
44
"""
5+
import traceback
56
from flask import request, jsonify
67

78
from utils.logger import log
9+
from exceptions.exceptions import TaskException
810

911
class ServerlessHandler:
1012
"""处理 /serverless/run 请求"""
@@ -35,36 +37,36 @@ def handle_post_request(self):
3537
invocation_type = request.headers.get('X-Fc-Invocation-Type', '').lower()
3638
is_async = (invocation_type == 'async')
3739

38-
if is_async:
39-
# 异步模式:立即返回任务ID
40-
log("INFO", f"[ServerlessHandler] Processing async request")
41-
task_id, result = self.task_manager.forward_to_gpu_async(
42-
request_body=body,
43-
client_id=client_id
44-
)
45-
46-
# 处理结果
47-
if task_id:
40+
try:
41+
if is_async:
42+
# 异步模式:立即返回任务ID
43+
log("INFO", f"[ServerlessHandler] Processing async request")
44+
task_id, result = self.task_manager.forward_to_gpu_async(
45+
request_body=body,
46+
client_id=client_id
47+
)
48+
4849
# 成功:返回Serverless格式
4950
return jsonify({
5051
"task_id": task_id,
5152
"status": "pending"
5253
}), 202
5354
else:
54-
# 失败:result 是 (status_code, error_type, error_message)
55-
status_code, error_type, error_message = result
56-
return jsonify({
57-
"type": "error",
58-
"error_code": error_type,
59-
"error_message": error_message
60-
}), status_code
61-
else:
62-
# 同步模式:等待GPU处理完成
63-
log("INFO", f"[ServerlessHandler] Processing sync request")
64-
65-
# 调用同步转发,直接传递完整的请求体
66-
response_data, status_code = self.task_manager.forward_to_gpu_sync(
67-
request_body=body
68-
)
55+
# 同步模式:等待GPU处理完成
56+
log("INFO", f"[ServerlessHandler] Processing sync request")
57+
response_data, status_code = self.task_manager.forward_to_gpu_sync(
58+
request_body=body
59+
)
60+
return jsonify(response_data), status_code
61+
62+
except TaskException as e:
63+
log("ERROR", f"[ServerlessHandler] Task error: {e.message}")
64+
return jsonify(e.to_dict()), e.code
6965

70-
return jsonify(response_data), status_code
66+
except Exception as e:
67+
log("ERROR", f"[ServerlessHandler] Unexpected error: {str(e)}\n{traceback.format_exc()}")
68+
return jsonify({
69+
"type": "error",
70+
"error_code": "internal_error",
71+
"error_message": str(e)
72+
}), 500

src/code/agent/services/gateway/task/task_manager.py

Lines changed: 57 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
import requests
1414
from flask import request
1515
from utils.logger import log
16+
from exceptions.exceptions import (
17+
ConfigurationException,
18+
InvalidRequestException,
19+
GPUException
20+
)
1621

1722
from .task import TaskStatus, Task
1823
from .utils.task_manager_util import TaskStatusBroadcaster
@@ -388,7 +393,7 @@ def _cleanup_old_completed_tasks_if_needed(self) -> None:
388393

389394
def forward_to_gpu_async(self,
390395
request_body: dict,
391-
client_id: str) -> Tuple[Optional[str], Union[object, Tuple[int, str, str]]]:
396+
client_id: str) -> Tuple[str, object]:
392397
"""
393398
GPU异步转发逻辑
394399
@@ -397,44 +402,48 @@ def forward_to_gpu_async(self,
397402
client_id: 客户端ID(用于WebSocket广播)
398403
399404
Returns:
400-
tuple: (task_id, response_or_error)
401-
- task_id: 成功时返回任务ID,失败时返回None
402-
- response_or_error: 成功时返回response对象,失败时返回(status_code, error_type, error_message)
405+
tuple: (task_id, response) - 成功时返回任务ID和响应对象
406+
407+
Raises:
408+
ConfigurationException: GPU URL未配置
409+
InvalidRequestException: 缺少任务ID或队列已满
410+
GPUException: GPU转发失败或异步调用失败
403411
"""
404412
request_id = request.headers.get('x-fc-request-id', 'unknown')
405413

406414
# 检查GPU URL配置
407415
if not self._gpu_function_url:
408416
log("ERROR", f"[TaskManager][RequestId={request_id}] GPU_FUNCTION_URL not configured")
409-
return None, (500, "configuration_error", "GPU_FUNCTION_URL not configured for CPU mode")
417+
raise ConfigurationException("GPU_FUNCTION_URL not configured for CPU mode")
410418

411-
# 提取任务ID(如果找不到会抛出异常)
419+
# 提取任务ID
412420
try:
413421
task_id = self._extract_task_id_from_request()
414422
except ValueError as e:
415423
log("ERROR", f"[TaskManager][RequestId={request_id}] {str(e)}")
416-
return None, (500, "missing_task_id", str(e))
424+
raise InvalidRequestException(str(e), error_code="missing_task_id")
417425

418-
# 将任务添加到管理器(用于跟踪和状态管理)
419-
# 保存完整的 request_body 作为 prompt_body
426+
# 将任务添加到管理器
420427
try:
421428
self.submit_task(
422429
prompt_body=request_body,
423430
client_id=client_id,
424431
task_id=task_id
425432
)
433+
except InvalidRequestException:
434+
raise
426435
except Exception as e:
427436
error_msg = f"Failed to add task to queue: {e}"
428437
log("ERROR", f"[TaskManager][TaskId={task_id}][RequestId={request_id}] {error_msg}\n{traceback.format_exc()}")
429-
return None, (500, "queue_error", error_msg)
438+
raise GPUException(error_msg, error_code="task_submit_error")
430439

431440
# 构造GPU URL和headers
432441
gpu_url = f"{self._gpu_function_url.rstrip('/')}/api/serverless/run"
433442

434443
forward_headers = {
435-
'x-fc-async-task-id': task_id, # 优先使用这个作为 task_id
436-
'x-fc-trace-id': task_id, # GPU的request-id与task-id一致
437-
'x-fc-invocation-type': 'Async' # 异步调用
444+
'x-fc-async-task-id': task_id,
445+
'x-fc-trace-id': task_id,
446+
'x-fc-invocation-type': 'Async'
438447
}
439448

440449
# 复制其他 headers
@@ -444,7 +453,6 @@ def forward_to_gpu_async(self,
444453
if k_lower not in excluded_headers_lower:
445454
forward_headers[k] = v
446455

447-
# 直接转发完整的请求体(保持原有结构,包括可能的 extra_data)
448456
# 转发请求到GPU
449457
try:
450458
resp = requests.post(
@@ -469,7 +477,8 @@ def forward_to_gpu_async(self,
469477
except Exception as status_error:
470478
log("WARNING", f"[TaskManager][TaskId={task_id}] Failed to update task status to FAILED: {status_error}")
471479

472-
return None, (500, "gpu_forward_error", error_msg)
480+
raise GPUException(error_msg, error_code="gpu_forward_error")
481+
473482
# 检查 GPU 响应状态
474483
if resp.status_code != 202:
475484
# GPU 拒绝了请求,更新为 FAILED
@@ -481,47 +490,44 @@ def forward_to_gpu_async(self,
481490
}, TaskStatus.FAILED)
482491
except Exception as e:
483492
log("WARNING", f"[TaskId={task_id}] Failed to update task status to FAILED: {e}\n{traceback.format_exc()}")
493+
494+
raise GPUException(
495+
f"Failed to invoke GPU function asynchronously: HTTP {resp.status_code}",
496+
status_code=resp.status_code,
497+
error_code="async_invocation_error"
498+
)
484499

485-
# 返回response和task_id
486-
if resp.status_code == 202:
487-
return task_id, resp
488-
else:
489-
return None, (500, "async_invocation_error", f"Failed to invoke GPU function asynchronously: HTTP {resp.status_code}")
500+
# 返回task_id和response
501+
return task_id, resp
490502

491-
def forward_to_gpu_sync(self,
492-
request_body: dict) -> Tuple[dict, int]:
503+
def forward_to_gpu_sync(self, request_body: dict) -> Tuple[dict, int]:
493504
"""
494505
GPU同步转发逻辑(等待GPU处理完成并返回结果)
495506
496507
Args:
497508
request_body: 完整的请求体
498509
499510
Returns:
500-
tuple: (response_dict, status_code)
501-
- response_dict: 响应数据
502-
- status_code: HTTP状态码
511+
tuple: (response_dict, status_code) - GPU返回的原始响应
512+
513+
Raises:
514+
ConfigurationException: GPU URL未配置
515+
InvalidRequestException: 缺少任务ID
516+
GPUException: GPU转发失败或返回非2xx状态码
503517
"""
504518
request_id = request.headers.get('x-fc-request-id', 'unknown')
505519

506520
# 检查GPU URL配置
507521
if not self._gpu_function_url:
508522
log("ERROR", f"[TaskManager][RequestId={request_id}] GPU_FUNCTION_URL not configured")
509-
return {
510-
"type": "error",
511-
"error_code": "configuration_error",
512-
"error_message": "GPU_FUNCTION_URL not configured for CPU mode"
513-
}, 500
523+
raise ConfigurationException("GPU_FUNCTION_URL not configured for CPU mode")
514524

515-
# 提取任务ID(如果找不到会抛出异常)
525+
# 提取任务ID
516526
try:
517527
task_id = self._extract_task_id_from_request()
518528
except ValueError as e:
519529
log("ERROR", f"[TaskManager][RequestId={request_id}] {str(e)}")
520-
return {
521-
"type": "error",
522-
"error_code": "missing_task_id",
523-
"error_message": str(e)
524-
}, 500
530+
raise InvalidRequestException(str(e), error_code="missing_task_id")
525531

526532
# 构造GPU URL和headers
527533
gpu_url = f"{self._gpu_function_url.rstrip('/')}/api/serverless/run"
@@ -546,34 +552,34 @@ def forward_to_gpu_sync(self,
546552
json=request_body,
547553
headers=forward_headers,
548554
params=request.args,
549-
timeout=600 # 同步调用使用更长的超时时间(10分钟)
555+
timeout=600
550556
)
551557

552558
if resp.status_code == 200:
553559
log("INFO", f"[TaskManager][TaskId={task_id}][RequestId={request_id}] Sync request completed successfully")
554560
return resp.json(), 200
555561
else:
556562
error_msg = f"GPU returned HTTP {resp.status_code}"
563+
log("ERROR", f"[TaskManager][TaskId={task_id}][RequestId={request_id}] Sync request failed: {error_msg}")
564+
565+
# 尝试解析GPU的错误响应
557566
try:
558567
error_response = resp.json()
559-
log("ERROR", f"[TaskManager][TaskId={task_id}][RequestId={request_id}] Sync request failed: {error_msg}, response: {error_response}")
560-
return error_response, resp.status_code
561-
except Exception:
562-
log("ERROR", f"[TaskManager][TaskId={task_id}][RequestId={request_id}] Sync request failed: {error_msg}")
563-
return {
564-
"type": "error",
565-
"error_code": "gpu_error",
566-
"error_message": error_msg
567-
}, resp.status_code
568+
raise GPUException(
569+
error_msg,
570+
status_code=resp.status_code,
571+
original_response=error_response
572+
)
573+
except (ValueError, requests.JSONDecodeError):
574+
raise GPUException(error_msg, status_code=resp.status_code)
575+
576+
except GPUException:
577+
raise
568578
except Exception as e:
569579
# 处理转发GPU失败的情况
570580
error_msg = f"Failed to send sync request to GPU: {str(e)}"
571581
log("ERROR", f"[TaskManager][TaskId={task_id}][RequestId={request_id}] {error_msg}\nStacktrace:\n{traceback.format_exc()}")
572-
return {
573-
"type": "error",
574-
"error_code": "gpu_forward_error",
575-
"error_message": error_msg
576-
}, 500
582+
raise GPUException(error_msg, error_code="gpu_forward_error")
577583

578584
@staticmethod
579585
def _extract_task_id_from_request() -> str:

0 commit comments

Comments
 (0)