Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions .github/workflows/stack-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,20 @@ jobs:
if: failure()
run: |
mkdir -p logs
docker compose logs > logs/docker-compose.log 2>&1
docker compose logs backend > logs/backend.log 2>&1
docker compose logs kafka > logs/kafka.log 2>&1
docker compose logs coordinator > logs/coordinator.log 2>&1 || true
docker compose logs k8s-worker > logs/k8s-worker.log 2>&1 || true
docker compose logs --timestamps > logs/docker-compose.log 2>&1
docker compose logs --timestamps backend > logs/backend.log 2>&1
docker compose logs --timestamps mongo > logs/mongo.log 2>&1 || true
docker compose logs --timestamps redis > logs/redis.log 2>&1 || true
docker compose logs --timestamps kafka > logs/kafka.log 2>&1 || true
docker compose logs --timestamps zookeeper > logs/zookeeper.log 2>&1 || true
docker compose logs --timestamps schema-registry > logs/schema-registry.log 2>&1 || true
docker compose logs --timestamps coordinator > logs/coordinator.log 2>&1 || true
docker compose logs --timestamps k8s-worker > logs/k8s-worker.log 2>&1 || true
docker compose logs --timestamps pod-monitor > logs/pod-monitor.log 2>&1 || true
docker compose logs --timestamps result-processor > logs/result-processor.log 2>&1 || true
docker compose logs --timestamps saga-orchestrator > logs/saga-orchestrator.log 2>&1 || true
docker compose logs --timestamps event-replay > logs/event-replay.log 2>&1 || true
docker compose logs --timestamps dlq-processor > logs/dlq-processor.log 2>&1 || true
kubectl get events --sort-by='.metadata.creationTimestamp' -A > logs/k8s-events.log 2>&1 || true

- name: Upload logs
Expand Down
35 changes: 17 additions & 18 deletions backend/app/api/routes/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,72 +12,71 @@
SessionSummary,
)
from app.schemas_pydantic.replay_models import ReplaySession
from app.services.replay_service import ReplayService
from app.services.event_replay import EventReplayService

router = APIRouter(prefix="/replay", tags=["Event Replay"], route_class=DishkaRoute, dependencies=[Depends(admin_user)])


@router.post("/sessions", response_model=ReplayResponse)
async def create_replay_session(
replay_request: ReplayRequest,
service: FromDishka[ReplayService],
service: FromDishka[EventReplayService],
) -> ReplayResponse:
result = await service.create_session_from_config(ReplayConfig(**replay_request.model_dump()))
return ReplayResponse(session_id=result.session_id, status=result.status, message=result.message)
return ReplayResponse(**result.model_dump())


@router.post("/sessions/{session_id}/start", response_model=ReplayResponse)
async def start_replay_session(
session_id: str,
service: FromDishka[ReplayService],
service: FromDishka[EventReplayService],
) -> ReplayResponse:
result = await service.start_session(session_id)
return ReplayResponse(session_id=result.session_id, status=result.status, message=result.message)
return ReplayResponse(**result.model_dump())


@router.post("/sessions/{session_id}/pause", response_model=ReplayResponse)
async def pause_replay_session(
session_id: str,
service: FromDishka[ReplayService],
service: FromDishka[EventReplayService],
) -> ReplayResponse:
result = await service.pause_session(session_id)
return ReplayResponse(session_id=result.session_id, status=result.status, message=result.message)
return ReplayResponse(**result.model_dump())


@router.post("/sessions/{session_id}/resume", response_model=ReplayResponse)
async def resume_replay_session(session_id: str, service: FromDishka[ReplayService]) -> ReplayResponse:
async def resume_replay_session(session_id: str, service: FromDishka[EventReplayService]) -> ReplayResponse:
result = await service.resume_session(session_id)
return ReplayResponse(session_id=result.session_id, status=result.status, message=result.message)
return ReplayResponse(**result.model_dump())


@router.post("/sessions/{session_id}/cancel", response_model=ReplayResponse)
async def cancel_replay_session(session_id: str, service: FromDishka[ReplayService]) -> ReplayResponse:
async def cancel_replay_session(session_id: str, service: FromDishka[EventReplayService]) -> ReplayResponse:
result = await service.cancel_session(session_id)
return ReplayResponse(session_id=result.session_id, status=result.status, message=result.message)
return ReplayResponse(**result.model_dump())


@router.get("/sessions", response_model=list[SessionSummary])
async def list_replay_sessions(
service: FromDishka[ReplayService],
service: FromDishka[EventReplayService],
status: ReplayStatus | None = Query(None),
limit: int = Query(100, ge=1, le=1000),
) -> list[SessionSummary]:
return [
SessionSummary.model_validate({**s.model_dump(), **s.model_dump()["config"]})
SessionSummary(**{**s.model_dump(), **s.config.model_dump()})
for s in service.list_sessions(status=status, limit=limit)
]


@router.get("/sessions/{session_id}", response_model=ReplaySession)
async def get_replay_session(session_id: str, service: FromDishka[ReplayService]) -> ReplaySession:
state = service.get_session(session_id)
return ReplaySession.model_validate(state)
async def get_replay_session(session_id: str, service: FromDishka[EventReplayService]) -> ReplaySession:
return ReplaySession.model_validate(service.get_session(session_id))


@router.post("/cleanup", response_model=CleanupResponse)
async def cleanup_old_sessions(
service: FromDishka[ReplayService],
service: FromDishka[EventReplayService],
older_than_hours: int = Query(24, ge=1),
) -> CleanupResponse:
result = await service.cleanup_old_sessions(older_than_hours)
return CleanupResponse(removed_sessions=result.removed_sessions, message=result.message)
return CleanupResponse(**result.model_dump())
18 changes: 4 additions & 14 deletions backend/app/core/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
from app.services.pod_monitor.event_mapper import PodEventMapper
from app.services.pod_monitor.monitor import PodMonitor
from app.services.rate_limit_service import RateLimitService
from app.services.replay_service import ReplayService
from app.services.result_processor.resource_cleaner import ResourceCleaner
from app.services.saga import SagaOrchestrator, create_saga_orchestrator
from app.services.saga.saga_service import SagaService
Expand Down Expand Up @@ -497,10 +496,10 @@ class AdminServicesProvider(Provider):
def get_admin_events_service(
self,
admin_events_repository: AdminEventsRepository,
replay_service: ReplayService,
event_replay_service: EventReplayService,
logger: logging.Logger,
) -> AdminEventsService:
return AdminEventsService(admin_events_repository, replay_service, logger)
return AdminEventsService(admin_events_repository, event_replay_service, logger)

@provide
def get_admin_settings_service(
Expand Down Expand Up @@ -633,15 +632,6 @@ def get_saved_script_service(
) -> SavedScriptService:
return SavedScriptService(saved_script_repository, logger)

@provide
def get_replay_service(
self,
replay_repository: ReplayRepository,
event_replay_service: EventReplayService,
logger: logging.Logger,
) -> ReplayService:
return ReplayService(replay_repository, event_replay_service, logger)

@provide
def get_admin_user_service(
self,
Expand Down Expand Up @@ -864,13 +854,13 @@ def get_event_replay_service(
replay_repository: ReplayRepository,
kafka_producer: UnifiedProducer,
event_store: EventStore,
settings: Settings,
replay_metrics: ReplayMetrics,
logger: logging.Logger,
) -> EventReplayService:
return EventReplayService(
repository=replay_repository,
producer=kafka_producer,
event_store=event_store,
settings=settings,
replay_metrics=replay_metrics,
logger=logger,
)
10 changes: 1 addition & 9 deletions backend/app/domain/replay/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any
from uuid import uuid4

from pydantic import BaseModel, ConfigDict, Field, PrivateAttr
from pydantic import BaseModel, ConfigDict, Field

from app.domain.enums.events import EventType
from app.domain.enums.kafka import KafkaTopic
Expand Down Expand Up @@ -127,14 +127,6 @@ class ReplayConfig(BaseModel):
retry_attempts: int = 3

enable_progress_tracking: bool = True
# Use PrivateAttr to avoid including callables in schema and serialization
_progress_callback: Any = PrivateAttr(default=None)

def set_progress_callback(self, cb: Any) -> None:
self._progress_callback = cb

def get_progress_callback(self) -> Any:
return self._progress_callback


class ReplaySessionState(BaseModel):
Expand Down
4 changes: 2 additions & 2 deletions backend/app/services/admin/admin_events_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
EventSummary,
)
from app.domain.replay import ReplayConfig, ReplayFilter
from app.services.replay_service import ReplayService
from app.services.event_replay import EventReplayService


def _export_row_to_dict(row: EventExportRow) -> dict[str, str]:
Expand Down Expand Up @@ -69,7 +69,7 @@ class ExportResult:

class AdminEventsService:
def __init__(
self, repository: AdminEventsRepository, replay_service: ReplayService, logger: logging.Logger
self, repository: AdminEventsRepository, replay_service: EventReplayService, logger: logging.Logger
) -> None:
self._repo = repository
self._replay_service = replay_service
Expand Down
2 changes: 0 additions & 2 deletions backend/app/services/event_replay/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from app.domain.enums.replay import ReplayStatus, ReplayTarget, ReplayType
from app.domain.replay import ReplayConfig, ReplayFilter
from app.schemas_pydantic.replay_models import ReplaySession
from app.services.event_replay.replay_service import EventReplayService

__all__ = [
Expand All @@ -10,5 +9,4 @@
"ReplayTarget",
"ReplayFilter",
"ReplayConfig",
"ReplaySession",
]
Loading
Loading