Skip to content
88 changes: 14 additions & 74 deletions backend/app/api/routes/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@
from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.security import OAuth2PasswordRequestForm
from pymongo.errors import DuplicateKeyError

from app.core.security import SecurityService
from app.core.utils import get_client_ip
from app.db.repositories import UserRepository
from app.domain.exceptions import ConflictError
from app.domain.enums import UserRole
from app.domain.user import DomainUserCreate
from app.schemas_pydantic.common import ErrorResponse
from app.schemas_pydantic.user import (
LoginResponse,
MessageResponse,
TokenValidationResponse,
UserCreate,
UserResponse,
)
Expand Down Expand Up @@ -124,7 +122,7 @@ async def login(
return LoginResponse(
message="Login successful",
username=user.username,
role="admin" if user.is_superuser else "user",
role=user.role,
csrf_token=csrf_token,
)

Expand All @@ -134,7 +132,7 @@ async def login(
response_model=UserResponse,
responses={
400: {"model": ErrorResponse, "description": "Username already registered"},
409: {"model": ErrorResponse, "description": "Email already registered"},
409: {"model": ErrorResponse, "description": "User already exists"},
},
)
async def register(
Expand Down Expand Up @@ -167,26 +165,16 @@ async def register(
)
raise HTTPException(status_code=400, detail="Username already registered")

try:
hashed_password = security_service.get_password_hash(user.password)
create_data = DomainUserCreate(
username=user.username,
email=str(user.email),
hashed_password=hashed_password,
role=user.role,
is_active=True,
is_superuser=False,
)
created_user = await user_repo.create_user(create_data)
except DuplicateKeyError as e:
logger.warning(
"Registration failed - duplicate email",
extra={
"username": user.username,
"client_ip": get_client_ip(request),
},
)
raise ConflictError("Email already registered") from e
hashed_password = security_service.get_password_hash(user.password)
create_data = DomainUserCreate(
username=user.username,
email=user.email,
hashed_password=hashed_password,
role=UserRole.USER,
is_active=True,
is_superuser=False,
)
created_user = await user_repo.create_user(create_data)

logger.info(
"Registration successful",
Expand All @@ -197,15 +185,7 @@ async def register(
},
)

return UserResponse(
user_id=created_user.user_id,
username=created_user.username,
email=created_user.email,
role=created_user.role,
is_superuser=created_user.is_superuser,
created_at=created_user.created_at,
updated_at=created_user.updated_at,
)
return UserResponse.model_validate(created_user, from_attributes=True)


@router.get("/me", response_model=UserResponse)
Expand Down Expand Up @@ -234,46 +214,6 @@ async def get_current_user_profile(
return UserResponse.model_validate(current_user, from_attributes=True)


@router.get(
"/verify-token",
response_model=TokenValidationResponse,
responses={401: {"model": ErrorResponse, "description": "Missing or invalid access token"}},
)
async def verify_token(
request: Request,
auth_service: FromDishka[AuthService],
logger: FromDishka[logging.Logger],
) -> TokenValidationResponse:
"""Verify the current access token."""
current_user = await auth_service.get_current_user(request)
logger.info(
"Token verification attempt",
extra={
"username": current_user.username,
"client_ip": get_client_ip(request),
"endpoint": "/verify-token",
"user_agent": request.headers.get("user-agent"),
},
)

logger.info(
"Token verification successful",
extra={
"username": current_user.username,
"client_ip": get_client_ip(request),
"user_agent": request.headers.get("user-agent"),
},
)
csrf_token = request.cookies.get("csrf_token", "")

return TokenValidationResponse(
valid=True,
username=current_user.username,
role="admin" if current_user.is_superuser else "user",
csrf_token=csrf_token,
)


@router.post("/logout", response_model=MessageResponse)
async def logout(
request: Request,
Expand Down
21 changes: 3 additions & 18 deletions backend/app/api/routes/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter, Depends, HTTPException, Query

from app.api.dependencies import current_user
from app.api.dependencies import admin_user
from app.db.repositories import DLQRepository
from app.dlq import RetryPolicy
from app.dlq.manager import DLQManager
Expand All @@ -16,25 +16,17 @@
DLQMessageDetail,
DLQMessageResponse,
DLQMessagesResponse,
DLQStats,
DLQTopicSummaryResponse,
ManualRetryRequest,
RetryPolicyRequest,
)
from app.schemas_pydantic.user import MessageResponse

router = APIRouter(
prefix="/dlq", tags=["Dead Letter Queue"], route_class=DishkaRoute, dependencies=[Depends(current_user)]
prefix="/dlq", tags=["Dead Letter Queue"], route_class=DishkaRoute, dependencies=[Depends(admin_user)]
)


@router.get("/stats", response_model=DLQStats)
async def get_dlq_statistics(repository: FromDishka[DLQRepository]) -> DLQStats:
"""Get summary statistics for the dead letter queue."""
stats = await repository.get_dlq_stats()
return DLQStats.model_validate(stats, from_attributes=True)


@router.get("/messages", response_model=DLQMessagesResponse)
async def get_dlq_messages(
repository: FromDishka[DLQRepository],
Expand Down Expand Up @@ -80,14 +72,7 @@ async def retry_dlq_messages(
@router.post("/retry-policy", response_model=MessageResponse)
async def set_retry_policy(policy_request: RetryPolicyRequest, dlq_manager: FromDishka[DLQManager]) -> MessageResponse:
"""Configure a retry policy for a specific Kafka topic."""
policy = RetryPolicy(
topic=policy_request.topic,
strategy=policy_request.strategy,
max_retries=policy_request.max_retries,
base_delay_seconds=policy_request.base_delay_seconds,
max_delay_seconds=policy_request.max_delay_seconds,
retry_multiplier=policy_request.retry_multiplier,
)
policy = RetryPolicy(**policy_request.model_dump())

dlq_manager.set_retry_policy(policy_request.topic, policy)

Expand Down
80 changes: 7 additions & 73 deletions backend/app/api/routes/events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from datetime import datetime, timedelta, timezone
from typing import Annotated, Any
from typing import Annotated

from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute
Expand All @@ -15,7 +15,6 @@
from app.schemas_pydantic.common import ErrorResponse
from app.schemas_pydantic.events import (
DeleteEventResponse,
EventAggregationRequest,
EventFilterRequest,
EventListResponse,
EventStatistics,
Expand Down Expand Up @@ -63,13 +62,7 @@ async def get_execution_events(
if result is None:
raise HTTPException(status_code=403, detail="Access denied")

return EventListResponse(
events=result.events,
total=result.total,
limit=limit,
skip=skip,
has_more=result.has_more,
)
return EventListResponse.model_validate(result, from_attributes=True)


@router.get("/user", response_model=EventListResponse)
Expand All @@ -94,13 +87,7 @@ async def get_user_events(
sort_order=sort_order,
)

return EventListResponse(
events=result.events,
total=result.total,
limit=limit,
skip=skip,
has_more=result.has_more,
)
return EventListResponse.model_validate(result, from_attributes=True)


@router.post(
Expand All @@ -114,16 +101,7 @@ async def query_events(
event_service: FromDishka[EventService],
) -> EventListResponse:
"""Query events with advanced filters."""
event_filter = EventFilter(
event_types=filter_request.event_types,
aggregate_id=filter_request.aggregate_id,
correlation_id=filter_request.correlation_id,
user_id=filter_request.user_id,
service_name=filter_request.service_name,
start_time=filter_request.start_time,
end_time=filter_request.end_time,
search_text=filter_request.search_text,
)
event_filter = EventFilter.model_validate(filter_request, from_attributes=True)

result = await event_service.query_events_advanced(
user_id=current_user.user_id,
Expand All @@ -136,13 +114,7 @@ async def query_events(
if result is None:
raise HTTPException(status_code=403, detail="Cannot query other users' events")

return EventListResponse(
events=result.events,
total=result.total,
limit=result.limit,
skip=result.skip,
has_more=result.has_more,
)
return EventListResponse.model_validate(result, from_attributes=True)


@router.get("/correlation/{correlation_id}", response_model=EventListResponse)
Expand All @@ -164,13 +136,7 @@ async def get_events_by_correlation(
skip=skip,
)

return EventListResponse(
events=result.events,
total=result.total,
limit=limit,
skip=skip,
has_more=result.has_more,
)
return EventListResponse.model_validate(result, from_attributes=True)


@router.get("/current-request", response_model=EventListResponse)
Expand All @@ -194,13 +160,7 @@ async def get_current_request_events(
skip=skip,
)

return EventListResponse(
events=result.events,
total=result.total,
limit=limit,
skip=skip,
has_more=result.has_more,
)
return EventListResponse.model_validate(result, from_attributes=True)


@router.get("/statistics", response_model=EventStatistics)
Expand Down Expand Up @@ -276,32 +236,6 @@ async def publish_custom_event(
return PublishEventResponse(event_id=event_id, status="published", timestamp=datetime.now(timezone.utc))


@router.post("/aggregate", response_model=list[dict[str, Any]])
async def aggregate_events(
current_user: Annotated[User, Depends(current_user)],
aggregation: EventAggregationRequest,
event_service: FromDishka[EventService],
) -> list[dict[str, Any]]:
"""Run a custom aggregation pipeline on the event store."""
result = await event_service.aggregate_events(
user_id=current_user.user_id,
user_role=current_user.role,
pipeline=aggregation.pipeline,
limit=aggregation.limit,
)

return result.results


@router.get("/types/list", response_model=list[str])
async def list_event_types(
current_user: Annotated[User, Depends(current_user)], event_service: FromDishka[EventService]
) -> list[str]:
"""List all distinct event types in the store."""
event_types = await event_service.list_event_types(user_id=current_user.user_id, user_role=current_user.role)
return event_types


@router.delete(
"/{event_id}",
response_model=DeleteEventResponse,
Expand Down
2 changes: 1 addition & 1 deletion backend/app/core/metrics/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def record_execution_assigned(self) -> None:
def record_execution_queued(self) -> None:
self.executions_queued.add(1)

def record_execution_scheduled(self, status: str) -> None:
def record_execution_scheduled(self) -> None:
self.executions_assigned.add(1)

def update_cpu_available(self, cores: float) -> None:
Expand Down
1 change: 0 additions & 1 deletion backend/app/core/middlewares/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ def __init__(self, app: ASGIApp):
self.cache_policies: dict[str, str] = {
"/api/v1/k8s-limits": "public, max-age=300", # 5 minutes
"/api/v1/example-scripts": "public, max-age=600", # 10 minutes
"/api/v1/auth/verify-token": "private, no-cache", # 30 seconds
"/api/v1/notifications": "private, no-cache", # Always revalidate
"/api/v1/notifications/unread-count": "private, no-cache", # Always revalidate
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
ReplaySessionDocument,
)
from app.domain.admin import ExecutionResultSummary, ReplaySessionData, ReplaySessionStatusDetail, ReplaySessionUpdate
from app.domain.enums import EventType, ReplayStatus
from app.domain.enums import EventType, ExecutionStatus, ReplayStatus
from app.domain.events import (
DomainEvent,
DomainEventAdapter,
Expand Down Expand Up @@ -176,7 +176,7 @@ async def get_event_stats(self, hours: int = 24) -> EventStatistics:
Pipeline()
.match({
ExecutionDocument.created_at: {"$gte": start_time},
ExecutionDocument.status: "completed",
ExecutionDocument.status: ExecutionStatus.COMPLETED,
ExecutionDocument.resource_usage.execution_time_wall_seconds: {"$exists": True}, # type: ignore[union-attr]
})
.group(by=None, query={"avg_duration": S.avg(exec_time_field)})
Expand Down
Loading
Loading