Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 113 additions & 153 deletions backend/app/api/routes/admin/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,204 +31,164 @@

@router.post("/browse")
async def browse_events(request: EventBrowseRequest, service: FromDishka[AdminEventsService]) -> EventBrowseResponse:
try:
event_filter = EventFilter(**request.filters.model_dump())

result = await service.browse_events(
event_filter=event_filter,
skip=request.skip,
limit=request.limit,
sort_by=request.sort_by,
sort_order=request.sort_order,
)
event_filter = EventFilter(**request.filters.model_dump())

return EventBrowseResponse(
events=result.events,
total=result.total,
skip=result.skip,
limit=result.limit,
)
result = await service.browse_events(
event_filter=event_filter,
skip=request.skip,
limit=request.limit,
sort_by=request.sort_by,
sort_order=request.sort_order,
)

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
return EventBrowseResponse(
events=result.events,
total=result.total,
skip=result.skip,
limit=result.limit,
)


@router.get("/stats")
async def get_event_stats(
service: FromDishka[AdminEventsService],
hours: int = Query(default=24, le=168),
hours: Annotated[int, Query(le=168)] = 24,
) -> EventStatsResponse:
try:
stats = await service.get_event_stats(hours=hours)
return EventStatsResponse.model_validate(stats)

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
stats = await service.get_event_stats(hours=hours)
return EventStatsResponse.model_validate(stats)
Comment thread
HardMax71 marked this conversation as resolved.


@router.get("/export/csv")
async def export_events_csv(
service: FromDishka[AdminEventsService],
event_types: list[EventType] | None = Query(None, description="Event types (repeat param for multiple)"),
start_time: datetime | None = Query(None, description="Start time"),
end_time: datetime | None = Query(None, description="End time"),
limit: int = Query(default=10000, le=50000),
event_types: Annotated[list[EventType] | None, Query(description="Event types (repeat param for multiple)")] = None,
start_time: Annotated[datetime | None, Query(description="Start time")] = None,
end_time: Annotated[datetime | None, Query(description="End time")] = None,
limit: Annotated[int, Query(le=50000)] = 10000,
Comment thread
HardMax71 marked this conversation as resolved.
Outdated
) -> StreamingResponse:
try:
export_filter = EventFilter(
event_types=event_types,
start_time=start_time,
end_time=end_time,
)
result = await service.export_events_csv_content(event_filter=export_filter, limit=limit)
return StreamingResponse(
iter([result.content]),
media_type=result.media_type,
headers={"Content-Disposition": f"attachment; filename={result.file_name}"},
)

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
export_filter = EventFilter(
event_types=event_types,
start_time=start_time,
end_time=end_time,
)
result = await service.export_events_csv_content(event_filter=export_filter, limit=limit)
return StreamingResponse(
iter([result.content]),
media_type=result.media_type,
headers={"Content-Disposition": f"attachment; filename={result.file_name}"},
)


@router.get("/export/json")
async def export_events_json(
service: FromDishka[AdminEventsService],
event_types: list[EventType] | None = Query(None, description="Event types (repeat param for multiple)"),
aggregate_id: str | None = Query(None, description="Aggregate ID filter"),
correlation_id: str | None = Query(None, description="Correlation ID filter"),
user_id: str | None = Query(None, description="User ID filter"),
service_name: str | None = Query(None, description="Service name filter"),
start_time: datetime | None = Query(None, description="Start time"),
end_time: datetime | None = Query(None, description="End time"),
limit: int = Query(default=10000, le=50000),
event_types: Annotated[list[EventType] | None, Query(description="Event types (repeat param for multiple)")] = None,
aggregate_id: Annotated[str | None, Query(description="Aggregate ID filter")] = None,
correlation_id: Annotated[str | None, Query(description="Correlation ID filter")] = None,
user_id: Annotated[str | None, Query(description="User ID filter")] = None,
service_name: Annotated[str | None, Query(description="Service name filter")] = None,
start_time: Annotated[datetime | None, Query(description="Start time")] = None,
end_time: Annotated[datetime | None, Query(description="End time")] = None,
limit: Annotated[int, Query(le=50000)] = 10000,
) -> StreamingResponse:
"""Export events as JSON with comprehensive filtering."""
try:
export_filter = EventFilter(
event_types=event_types,
aggregate_id=aggregate_id,
correlation_id=correlation_id,
user_id=user_id,
service_name=service_name,
start_time=start_time,
end_time=end_time,
)
result = await service.export_events_json_content(event_filter=export_filter, limit=limit)
return StreamingResponse(
iter([result.content]),
media_type=result.media_type,
headers={"Content-Disposition": f"attachment; filename={result.file_name}"},
)

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
export_filter = EventFilter(
event_types=event_types,
aggregate_id=aggregate_id,
correlation_id=correlation_id,
user_id=user_id,
service_name=service_name,
start_time=start_time,
end_time=end_time,
)
result = await service.export_events_json_content(event_filter=export_filter, limit=limit)
return StreamingResponse(
iter([result.content]),
media_type=result.media_type,
headers={"Content-Disposition": f"attachment; filename={result.file_name}"},
)


@router.get("/{event_id}")
async def get_event_detail(event_id: str, service: FromDishka[AdminEventsService]) -> EventDetailResponse:
try:
result = await service.get_event_detail(event_id)
result = await service.get_event_detail(event_id)

if not result:
raise HTTPException(status_code=404, detail="Event not found")
if not result:
raise HTTPException(status_code=404, detail="Event not found")

return EventDetailResponse(
event=result.event,
related_events=result.related_events,
timeline=result.timeline,
)

except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
return EventDetailResponse(
event=result.event,
related_events=result.related_events,
timeline=result.timeline,
)


@router.post("/replay")
async def replay_events(
request: EventReplayRequest, background_tasks: BackgroundTasks, service: FromDishka[AdminEventsService]
) -> EventReplayResponse:
replay_correlation_id = f"replay_{CorrelationContext.get_correlation_id()}"
replay_filter = ReplayFilter(
event_ids=request.event_ids,
correlation_id=request.correlation_id,
aggregate_id=request.aggregate_id,
start_time=request.start_time,
end_time=request.end_time,
)
try:
replay_correlation_id = f"replay_{CorrelationContext.get_correlation_id()}"
replay_filter = ReplayFilter(
event_ids=request.event_ids,
correlation_id=request.correlation_id,
aggregate_id=request.aggregate_id,
start_time=request.start_time,
end_time=request.end_time,
)
try:
result = await service.prepare_or_schedule_replay(
replay_filter=replay_filter,
dry_run=request.dry_run,
replay_correlation_id=replay_correlation_id,
target_service=request.target_service,
)
except ValueError as e:
msg = str(e)
if "No events found" in msg:
raise HTTPException(status_code=404, detail=msg)
if "Too many events" in msg:
raise HTTPException(status_code=400, detail=msg)
raise

if not result.dry_run and result.session_id:
background_tasks.add_task(service.start_replay_session, result.session_id)

return EventReplayResponse(
dry_run=result.dry_run,
total_events=result.total_events,
replay_correlation_id=result.replay_correlation_id,
session_id=result.session_id,
status=result.status,
events_preview=result.events_preview,
result = await service.prepare_or_schedule_replay(
replay_filter=replay_filter,
dry_run=request.dry_run,
replay_correlation_id=replay_correlation_id,
target_service=request.target_service,
)

except HTTPException:
except ValueError as e:
msg = str(e)
if "No events found" in msg:
raise HTTPException(status_code=404, detail=msg)
if "Too many events" in msg:
raise HTTPException(status_code=400, detail=msg)
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

if not result.dry_run and result.session_id:
background_tasks.add_task(service.start_replay_session, result.session_id)

return EventReplayResponse(
dry_run=result.dry_run,
total_events=result.total_events,
replay_correlation_id=result.replay_correlation_id,
session_id=result.session_id,
status=result.status,
events_preview=result.events_preview,
)


@router.get("/replay/{session_id}/status")
async def get_replay_status(session_id: str, service: FromDishka[AdminEventsService]) -> EventReplayStatusResponse:
try:
status = await service.get_replay_status(session_id)

if not status:
raise HTTPException(status_code=404, detail="Replay session not found")

session = status.session
estimated_completion = status.estimated_completion
execution_results = status.execution_results
return EventReplayStatusResponse(
**{
**session.model_dump(),
"status": session.status,
"estimated_completion": estimated_completion,
"execution_results": execution_results,
}
)
status = await service.get_replay_status(session_id)

except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if not status:
raise HTTPException(status_code=404, detail="Replay session not found")

session = status.session
estimated_completion = status.estimated_completion
execution_results = status.execution_results
return EventReplayStatusResponse(
**{
**session.model_dump(),
"status": session.status,
"estimated_completion": estimated_completion,
"execution_results": execution_results,
}
)


@router.delete("/{event_id}")
async def delete_event(
event_id: str, admin: Annotated[UserResponse, Depends(admin_user)], service: FromDishka[AdminEventsService]
) -> EventDeleteResponse:
try:
deleted = await service.delete_event(event_id=event_id, deleted_by=admin.email)
if not deleted:
raise HTTPException(status_code=500, detail="Failed to delete event")

return EventDeleteResponse(message="Event deleted and archived", event_id=event_id)
deleted = await service.delete_event(event_id=event_id, deleted_by=admin.email)
if not deleted:
raise HTTPException(status_code=500, detail="Failed to delete event")

except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
return EventDeleteResponse(message="Event deleted and archived", event_id=event_id)
6 changes: 3 additions & 3 deletions backend/app/api/routes/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
async def list_users(
admin: Annotated[UserResponse, Depends(admin_user)],
admin_user_service: FromDishka[AdminUserService],
limit: int = Query(default=100, le=1000),
offset: int = Query(default=0, ge=0),
limit: Annotated[int, Query(le=1000)] = 100,
offset: Annotated[int, Query(ge=0)] = 0,
search: str | None = None,
role: UserRole | None = None,
) -> UserListResponse:
Expand Down Expand Up @@ -142,7 +142,7 @@ async def delete_user(
admin: Annotated[UserResponse, Depends(admin_user)],
user_id: str,
admin_user_service: FromDishka[AdminUserService],
cascade: bool = Query(default=True, description="Cascade delete user's data"),
cascade: Annotated[bool, Query(description="Cascade delete user's data")] = True,
) -> DeleteUserResponse:
# Prevent self-deletion; delegate to service
if admin.user_id == user_id:
Expand Down
12 changes: 7 additions & 5 deletions backend/app/api/routes/dlq.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Annotated

from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter, Depends, HTTPException, Query
Expand Down Expand Up @@ -34,11 +36,11 @@ async def get_dlq_statistics(repository: FromDishka[DLQRepository]) -> DLQStats:
@router.get("/messages", response_model=DLQMessagesResponse)
async def get_dlq_messages(
repository: FromDishka[DLQRepository],
status: DLQMessageStatus | None = Query(None),
status: Annotated[DLQMessageStatus | None, Query()] = None,
topic: str | None = None,
event_type: EventType | None = Query(None),
limit: int = Query(50, ge=1, le=1000),
offset: int = Query(0, ge=0),
event_type: Annotated[EventType | None, Query()] = None,
limit: Annotated[int, Query(ge=1, le=1000)] = 50,
offset: Annotated[int, Query(ge=0)] = 0,
) -> DLQMessagesResponse:
result = await repository.get_messages(
status=status, topic=topic, event_type=event_type, limit=limit, offset=offset
Expand Down Expand Up @@ -86,7 +88,7 @@ async def set_retry_policy(policy_request: RetryPolicyRequest, dlq_manager: From
async def discard_dlq_message(
event_id: str,
dlq_manager: FromDishka[DLQManager],
reason: str = Query(..., description="Reason for discarding"),
reason: Annotated[str, Query(description="Reason for discarding")],
) -> MessageResponse:
success = await dlq_manager.discard_message_manually(event_id, f"manual: {reason}")
if not success:
Expand Down
Loading
Loading