-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutions.py
More file actions
82 lines (73 loc) · 2.43 KB
/
executions.py
File metadata and controls
82 lines (73 loc) · 2.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from typing import Annotated
from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter, Depends, Query
from app.api.dependencies import admin_user
from app.domain.enums import ExecutionStatus, QueuePriority
from app.domain.user import User
from app.schemas_pydantic.admin_executions import (
AdminExecutionListResponse,
AdminExecutionResponse,
PriorityUpdateRequest,
QueueStatusResponse,
)
from app.schemas_pydantic.common import ErrorResponse
from app.services.admin import AdminExecutionService
# Router for the admin execution endpoints; every route below is mounted
# under /admin/executions.
router = APIRouter(
    prefix="/admin/executions",
    tags=["admin-executions"],
    # Router-level dependency: admin_user is attached to every route
    # registered on this router.
    dependencies=[Depends(admin_user)],
    # NOTE(review): DishkaRoute is presumably what allows FromDishka[...]
    # parameters in the handlers to be injected - confirm against dishka docs.
    route_class=DishkaRoute,
)
# GET /admin/executions/ - return one page of executions plus paging metadata.
#
# The optional status / priority / user_id query parameters and the
# limit / skip window are forwarded unchanged to
# AdminExecutionService.list_executions, which yields the page rows and the
# overall total. Documented as comments (not a docstring) so the generated
# OpenAPI description of the route is left untouched.
@router.get(
    "/",
    response_model=AdminExecutionListResponse,
    responses={403: {"model": ErrorResponse}},
)
async def list_executions(
    _: Annotated[User, Depends(admin_user)],
    service: FromDishka[AdminExecutionService],
    status: ExecutionStatus | None = Query(None),
    priority: QueuePriority | None = Query(None),
    user_id: str | None = Query(None),
    limit: int = Query(50, ge=1, le=200),
    skip: int = Query(0, ge=0),
) -> AdminExecutionListResponse:
    page_rows, total = await service.list_executions(
        status=status,
        priority=priority,
        user_id=user_id,
        limit=limit,
        skip=skip,
    )
    items = [AdminExecutionResponse.model_validate(row) for row in page_rows]
    # Offset just past the last row of this page; anything beyond it that is
    # still below `total` means another page exists.
    page_end = skip + len(page_rows)
    return AdminExecutionListResponse(
        executions=items,
        total=total,
        limit=limit,
        skip=skip,
        has_more=page_end < total,
    )
# PUT /admin/executions/{execution_id}/priority - change the priority of one
# execution.
#
# The new priority is read from the request body (PriorityUpdateRequest) and
# handed to AdminExecutionService.update_priority together with the path's
# execution_id; whatever the service returns is validated into the response
# schema. Documented as comments (not a docstring) so the generated OpenAPI
# description of the route is left untouched.
@router.put(
    "/{execution_id}/priority",
    response_model=AdminExecutionResponse,
    responses={403: {"model": ErrorResponse}, 404: {"model": ErrorResponse}},
)
async def update_priority(
    execution_id: str,
    body: PriorityUpdateRequest,
    _: Annotated[User, Depends(admin_user)],
    service: FromDishka[AdminExecutionService],
) -> AdminExecutionResponse:
    new_priority = body.priority
    return AdminExecutionResponse.model_validate(
        await service.update_priority(execution_id, new_priority)
    )
# GET /admin/executions/queue - report the current queue status.
#
# The mapping returned by AdminExecutionService.get_queue_status is unpacked
# as keyword arguments into QueueStatusResponse. Documented as comments (not a
# docstring) so the generated OpenAPI description of the route is left
# untouched.
@router.get(
    "/queue",
    response_model=QueueStatusResponse,
    responses={403: {"model": ErrorResponse}},
)
async def get_queue_status(
    _: Annotated[User, Depends(admin_user)],
    service: FromDishka[AdminExecutionService],
) -> QueueStatusResponse:
    queue_snapshot = await service.get_queue_status()
    return QueueStatusResponse(**queue_snapshot)