From bfc807d562fe916ec1a4062dea1e2defc7ec1d77 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Thu, 29 Jan 2026 22:12:13 +0100 Subject: [PATCH 1/7] event replay - replay service -> passing metric obj through DI --- backend/app/core/providers.py | 4 ++-- backend/app/services/event_replay/replay_service.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py index 88799e06..78c4cdde 100644 --- a/backend/app/core/providers.py +++ b/backend/app/core/providers.py @@ -864,13 +864,13 @@ def get_event_replay_service( replay_repository: ReplayRepository, kafka_producer: UnifiedProducer, event_store: EventStore, - settings: Settings, + replay_metrics: ReplayMetrics, logger: logging.Logger, ) -> EventReplayService: return EventReplayService( repository=replay_repository, producer=kafka_producer, event_store=event_store, - settings=settings, + replay_metrics=replay_metrics, logger=logger, ) diff --git a/backend/app/services/event_replay/replay_service.py b/backend/app/services/event_replay/replay_service.py index e5266c4e..e8f718c8 100644 --- a/backend/app/services/event_replay/replay_service.py +++ b/backend/app/services/event_replay/replay_service.py @@ -18,7 +18,6 @@ from app.domain.replay import ReplayConfig, ReplayError, ReplaySessionState from app.events.core import UnifiedProducer from app.events.event_store import EventStore -from app.settings import Settings class EventReplayService: @@ -27,7 +26,7 @@ def __init__( repository: ReplayRepository, producer: UnifiedProducer, event_store: EventStore, - settings: Settings, + replay_metrics: ReplayMetrics, logger: logging.Logger, ) -> None: self._sessions: dict[str, ReplaySessionState] = {} @@ -38,7 +37,7 @@ def __init__( self.logger = logger self._callbacks: dict[ReplayTarget, Callable[..., Any]] = {} self._file_locks: dict[str, asyncio.Lock] = {} - self._metrics = ReplayMetrics(settings) + self._metrics = replay_metrics self.logger.info("Event replay service initialized") async def create_replay_session(self, config: ReplayConfig) -> str: From c7a7559c69780d37b20d7350c303cda0144a7add Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Thu, 29 Jan 2026 22:38:50 +0100 Subject: [PATCH 2/7] removed dead code, consolidated 2 services into 1 --- backend/app/api/routes/replay.py | 35 ++--- backend/app/core/providers.py | 14 +- backend/app/domain/replay/models.py | 10 +- .../services/admin/admin_events_service.py | 4 +- backend/app/services/event_replay/__init__.py | 2 - .../services/event_replay/replay_service.py | 112 +++++++++----- backend/app/services/replay_service.py | 145 ------------------ backend/tests/e2e/core/test_container.py | 8 +- .../services/replay/test_replay_service.py | 20 +-- 9 files changed, 113 insertions(+), 237 deletions(-) delete mode 100644 backend/app/services/replay_service.py diff --git a/backend/app/api/routes/replay.py b/backend/app/api/routes/replay.py index fc83c4a1..15fb8608 100644 --- a/backend/app/api/routes/replay.py +++ b/backend/app/api/routes/replay.py @@ -12,7 +12,7 @@ SessionSummary, ) from app.schemas_pydantic.replay_models import ReplaySession -from app.services.replay_service import ReplayService +from app.services.event_replay import EventReplayService router = APIRouter(prefix="/replay", tags=["Event Replay"], route_class=DishkaRoute, dependencies=[Depends(admin_user)]) @@ -20,64 +20,63 @@ @router.post("/sessions", response_model=ReplayResponse) async def create_replay_session( replay_request: ReplayRequest, - service: FromDishka[ReplayService], + service: FromDishka[EventReplayService], ) -> ReplayResponse: result = await service.create_session_from_config(ReplayConfig(**replay_request.model_dump())) - return ReplayResponse(session_id=result.session_id, status=result.status, message=result.message) + return ReplayResponse(**result.model_dump()) @router.post("/sessions/{session_id}/start", response_model=ReplayResponse) async def start_replay_session( session_id: str, - service: FromDishka[ReplayService], + service: FromDishka[EventReplayService], ) -> ReplayResponse: result = await service.start_session(session_id) - return ReplayResponse(session_id=result.session_id, status=result.status, message=result.message) + return ReplayResponse(**result.model_dump()) @router.post("/sessions/{session_id}/pause", response_model=ReplayResponse) async def pause_replay_session( session_id: str, - service: FromDishka[ReplayService], + service: FromDishka[EventReplayService], ) -> ReplayResponse: result = await service.pause_session(session_id) - return ReplayResponse(session_id=result.session_id, status=result.status, message=result.message) + return ReplayResponse(**result.model_dump()) @router.post("/sessions/{session_id}/resume", response_model=ReplayResponse) -async def resume_replay_session(session_id: str, service: FromDishka[ReplayService]) -> ReplayResponse: +async def resume_replay_session(session_id: str, service: FromDishka[EventReplayService]) -> ReplayResponse: result = await service.resume_session(session_id) - return ReplayResponse(session_id=result.session_id, status=result.status, message=result.message) + return ReplayResponse(**result.model_dump()) @router.post("/sessions/{session_id}/cancel", response_model=ReplayResponse) -async def cancel_replay_session(session_id: str, service: FromDishka[ReplayService]) -> ReplayResponse: +async def cancel_replay_session(session_id: str, service: FromDishka[EventReplayService]) -> ReplayResponse: result = await service.cancel_session(session_id) - return ReplayResponse(session_id=result.session_id, status=result.status, message=result.message) + return ReplayResponse(**result.model_dump()) @router.get("/sessions", response_model=list[SessionSummary]) async def list_replay_sessions( - service: FromDishka[ReplayService], + service: FromDishka[EventReplayService], status: ReplayStatus | None = Query(None), limit: int = Query(100, ge=1, le=1000), ) -> list[SessionSummary]: return [ - SessionSummary.model_validate({**s.model_dump(), **s.model_dump()["config"]}) + SessionSummary(**{**s.model_dump(), **s.config.model_dump()}) for s in service.list_sessions(status=status, limit=limit) ] @router.get("/sessions/{session_id}", response_model=ReplaySession) -async def get_replay_session(session_id: str, service: FromDishka[ReplayService]) -> ReplaySession: - state = service.get_session(session_id) - return ReplaySession.model_validate(state) +async def get_replay_session(session_id: str, service: FromDishka[EventReplayService]) -> ReplaySession: + return ReplaySession.model_validate(service.get_session(session_id)) @router.post("/cleanup", response_model=CleanupResponse) async def cleanup_old_sessions( - service: FromDishka[ReplayService], + service: FromDishka[EventReplayService], older_than_hours: int = Query(24, ge=1), ) -> CleanupResponse: result = await service.cleanup_old_sessions(older_than_hours) - return CleanupResponse(removed_sessions=result.removed_sessions, message=result.message) + return CleanupResponse(**result.model_dump()) diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py index 78c4cdde..ae9e9811 100644 --- a/backend/app/core/providers.py +++ b/backend/app/core/providers.py @@ -70,7 +70,6 @@ from app.services.pod_monitor.event_mapper import PodEventMapper from app.services.pod_monitor.monitor import PodMonitor from app.services.rate_limit_service import RateLimitService -from app.services.replay_service import ReplayService from app.services.result_processor.resource_cleaner import ResourceCleaner from app.services.saga import SagaOrchestrator, create_saga_orchestrator from app.services.saga.saga_service import SagaService @@ -497,10 +496,10 @@ class AdminServicesProvider(Provider): def get_admin_events_service( self, admin_events_repository: AdminEventsRepository, - replay_service: ReplayService, + event_replay_service: EventReplayService, logger: logging.Logger, ) -> AdminEventsService: - return AdminEventsService(admin_events_repository, replay_service, logger) + return AdminEventsService(admin_events_repository, event_replay_service, logger) @provide def get_admin_settings_service( @@ -633,15 +632,6 @@ def get_saved_script_service( ) -> SavedScriptService: return SavedScriptService(saved_script_repository, logger) - @provide - def get_replay_service( - self, - replay_repository: ReplayRepository, - event_replay_service: EventReplayService, - logger: logging.Logger, - ) -> ReplayService: - return ReplayService(replay_repository, event_replay_service, logger) - @provide def get_admin_user_service( self, diff --git a/backend/app/domain/replay/models.py b/backend/app/domain/replay/models.py index c9da5753..2d1e3c9f 100644 --- a/backend/app/domain/replay/models.py +++ b/backend/app/domain/replay/models.py @@ -2,7 +2,7 @@ from typing import Any from uuid import uuid4 -from pydantic import BaseModel, ConfigDict, Field, PrivateAttr +from pydantic import BaseModel, ConfigDict, Field from app.domain.enums.events import EventType from app.domain.enums.kafka import KafkaTopic @@ -127,14 +127,6 @@ class ReplayConfig(BaseModel): retry_attempts: int = 3 enable_progress_tracking: bool = True - # Use PrivateAttr to avoid including callables in schema and serialization - _progress_callback: Any = PrivateAttr(default=None) - - def set_progress_callback(self, cb: Any) -> None: - self._progress_callback = cb - - def get_progress_callback(self) -> Any: - return self._progress_callback class ReplaySessionState(BaseModel): diff --git a/backend/app/services/admin/admin_events_service.py b/backend/app/services/admin/admin_events_service.py index ad20916f..f3380578 100644 --- a/backend/app/services/admin/admin_events_service.py +++ b/backend/app/services/admin/admin_events_service.py @@ -21,7 +21,7 @@ EventSummary, ) from app.domain.replay import ReplayConfig, ReplayFilter -from app.services.replay_service import ReplayService +from app.services.event_replay import EventReplayService def _export_row_to_dict(row: EventExportRow) -> dict[str, str]: @@ -69,7 +69,7 @@ class ExportResult: class AdminEventsService: def __init__( - self, repository: AdminEventsRepository, replay_service: ReplayService, logger: logging.Logger + self, repository: AdminEventsRepository, replay_service: EventReplayService, logger: logging.Logger ) -> None: self._repo = repository self._replay_service = replay_service diff --git a/backend/app/services/event_replay/__init__.py b/backend/app/services/event_replay/__init__.py index 82e67bc5..aab4eb0d 100644 --- a/backend/app/services/event_replay/__init__.py +++ b/backend/app/services/event_replay/__init__.py @@ -1,6 +1,5 @@ from app.domain.enums.replay import ReplayStatus, ReplayTarget, ReplayType from app.domain.replay import ReplayConfig, ReplayFilter -from app.schemas_pydantic.replay_models import ReplaySession from app.services.event_replay.replay_service import EventReplayService __all__ = [ @@ -10,5 +9,4 @@ "ReplayTarget", "ReplayFilter", "ReplayConfig", - "ReplaySession", ] diff --git a/backend/app/services/event_replay/replay_service.py b/backend/app/services/event_replay/replay_service.py index e8f718c8..538d57b5 100644 --- a/backend/app/services/event_replay/replay_service.py +++ b/backend/app/services/event_replay/replay_service.py @@ -1,5 +1,4 @@ import asyncio -import inspect import json import logging from collections.abc import AsyncIterator @@ -15,7 +14,15 @@ from app.domain.admin.replay_updates import ReplaySessionUpdate from app.domain.enums.replay import ReplayStatus, ReplayTarget from app.domain.events.typed import DomainEvent -from app.domain.replay import ReplayConfig, ReplayError, ReplaySessionState +from app.domain.replay import ( + CleanupResult, + ReplayConfig, + ReplayError, + ReplayOperationError, + ReplayOperationResult, + ReplaySessionNotFoundError, + ReplaySessionState, +) from app.events.core import UnifiedProducer from app.events.event_store import EventStore @@ -54,10 +61,10 @@ async def create_replay_session(self, config: ReplayConfig) -> str: async def start_replay(self, session_id: str) -> None: session = self._sessions.get(session_id) if not session: - raise ValueError(f"Session {session_id} not found") + raise ReplaySessionNotFoundError(session_id) if session.status != ReplayStatus.CREATED: - raise ValueError(f"Session {session_id} already started") + raise ReplayOperationError(session_id, "start", "Session already started") task = asyncio.create_task(self._run_replay(session)) self._active_tasks[session_id] = task @@ -81,14 +88,14 @@ async def _run_replay(self, session: ReplaySessionState) -> None: "replay.target": session.config.target, }, ): - await self._prepare_session(session) + total_count = await self._repository.count_events(session.config.filter) + session.total_events = min(total_count, session.config.max_events or total_count) async for batch in self._fetch_event_batches(session): if session.status != ReplayStatus.RUNNING: break await self._process_batch(session, batch) - await self._handle_progress_callback(session) await self._complete_session(session, start_time) @@ -98,25 +105,6 @@ async def _run_replay(self, session: ReplaySessionState) -> None: self._metrics.decrement_active_replays() self._active_tasks.pop(session.session_id, None) - async def _prepare_session(self, session: ReplaySessionState) -> None: - total_count = await self._repository.count_events(session.config.filter) - session.total_events = min(total_count, session.config.max_events or total_count) - - self.logger.info( - "Replay session will process events", - extra={"session_id": session.session_id, "total_events": session.total_events}, - ) - - async def _handle_progress_callback(self, session: ReplaySessionState) -> None: - cb = session.config.get_progress_callback() - if cb is not None: - try: - result = cb(session) - if inspect.isawaitable(result): - await result - except Exception as e: - self.logger.error(f"Progress callback error: {e}") - async def _complete_session(self, session: ReplaySessionState, start_time: float) -> None: session.status = ReplayStatus.COMPLETED session.completed_at = datetime.now(timezone.utc) @@ -294,7 +282,7 @@ def _write_to_file_sync(self, event: DomainEvent, file_path: str) -> None: async def pause_replay(self, session_id: str) -> None: session = self._sessions.get(session_id) if not session: - raise ValueError(f"Session {session_id} not found") + raise ReplaySessionNotFoundError(session_id) if session.status == ReplayStatus.RUNNING: session.status = ReplayStatus.PAUSED @@ -303,7 +291,7 @@ async def pause_replay(self, session_id: str) -> None: async def resume_replay(self, session_id: str) -> None: session = self._sessions.get(session_id) if not session: - raise ValueError(f"Session {session_id} not found") + raise ReplaySessionNotFoundError(session_id) if session.status == ReplayStatus.PAUSED: session.status = ReplayStatus.RUNNING @@ -312,7 +300,7 @@ async def resume_replay(self, session_id: str) -> None: async def cancel_replay(self, session_id: str) -> None: session = self._sessions.get(session_id) if not session: - raise ValueError(f"Session {session_id} not found") + raise ReplaySessionNotFoundError(session_id) session.status = ReplayStatus.CANCELLED @@ -322,8 +310,11 @@ async def cancel_replay(self, session_id: str) -> None: self.logger.info("Cancelled replay session", extra={"session_id": session_id}) - def get_session(self, session_id: str) -> ReplaySessionState | None: - return self._sessions.get(session_id) + def get_session(self, session_id: str) -> ReplaySessionState: + session = self._sessions.get(session_id) + if not session: + raise ReplaySessionNotFoundError(session_id) + return session def list_sessions(self, status: ReplayStatus | None = None, limit: int = 100) -> list[ReplaySessionState]: sessions = list(self._sessions.values()) @@ -339,9 +330,9 @@ def register_callback( ) -> None: self._callbacks[target] = callback - async def cleanup_old_sessions(self, older_than_hours: int = 24) -> int: + async def cleanup_old_sessions(self, older_than_hours: int = 24) -> CleanupResult: cutoff_time = datetime.now(timezone.utc) - timedelta(hours=older_than_hours) - removed = 0 + removed_memory = 0 completed_statuses = {ReplayStatus.COMPLETED, ReplayStatus.FAILED, ReplayStatus.CANCELLED} @@ -349,10 +340,61 @@ async def cleanup_old_sessions(self, older_than_hours: int = 24) -> int: session = self._sessions[session_id] if session.status in completed_statuses and session.created_at < cutoff_time: del self._sessions[session_id] - removed += 1 + removed_memory += 1 + + removed_db = await self._repository.delete_old_sessions(cutoff_time) + total_removed = max(removed_memory, removed_db) + + self.logger.info("Cleaned up old replay sessions", extra={"removed_count": total_removed}) + return CleanupResult(removed_sessions=total_removed, message=f"Removed {total_removed} old sessions") - self.logger.info("Cleaned up old replay sessions", extra={"removed_count": removed}) - return removed + async def create_session_from_config(self, config: ReplayConfig) -> ReplayOperationResult: + """Create a new replay session, persist it, and return an operation result.""" + try: + session_id = await self.create_replay_session(config) + session = self._sessions.get(session_id) + if session: + await self._repository.save_session(session) + return ReplayOperationResult( + session_id=session_id, + status=ReplayStatus.CREATED, + message="Replay session created successfully", + ) + except Exception as e: + self.logger.error(f"Failed to create replay session: {e}") + raise ReplayOperationError("", "create", str(e)) from e + + async def start_session(self, session_id: str) -> ReplayOperationResult: + """Start a replay session and persist the status change.""" + await self.start_replay(session_id) + await self._repository.update_session_status(session_id, ReplayStatus.RUNNING) + return ReplayOperationResult( + session_id=session_id, status=ReplayStatus.RUNNING, message="Replay session started" + ) + + async def pause_session(self, session_id: str) -> ReplayOperationResult: + """Pause a replay session and persist the status change.""" + await self.pause_replay(session_id) + await self._repository.update_session_status(session_id, ReplayStatus.PAUSED) + return ReplayOperationResult( + session_id=session_id, status=ReplayStatus.PAUSED, message="Replay session paused" + ) + + async def resume_session(self, session_id: str) -> ReplayOperationResult: + """Resume a paused replay session and persist the status change.""" + await self.resume_replay(session_id) + await self._repository.update_session_status(session_id, ReplayStatus.RUNNING) + return ReplayOperationResult( + session_id=session_id, status=ReplayStatus.RUNNING, message="Replay session resumed" + ) + + async def cancel_session(self, session_id: str) -> ReplayOperationResult: + """Cancel a replay session and persist the status change.""" + await self.cancel_replay(session_id) + await self._repository.update_session_status(session_id, ReplayStatus.CANCELLED) + return ReplayOperationResult( + session_id=session_id, status=ReplayStatus.CANCELLED, message="Replay session cancelled" + ) async def _update_session_in_db(self, session: ReplaySessionState) -> None: """Update session progress in the database.""" diff --git a/backend/app/services/replay_service.py b/backend/app/services/replay_service.py deleted file mode 100644 index 56ba6ac3..00000000 --- a/backend/app/services/replay_service.py +++ /dev/null @@ -1,145 +0,0 @@ -import logging -from datetime import datetime, timedelta, timezone - -from app.db.repositories.replay_repository import ReplayRepository -from app.domain.replay import ( - ReplayConfig, - ReplayOperationError, - ReplayOperationResult, - ReplaySessionNotFoundError, - ReplaySessionState, -) -from app.schemas_pydantic.replay import CleanupResponse -from app.services.event_replay import ( - EventReplayService, - ReplayStatus, -) - - -class ReplayService: - """Service for managing replay sessions and providing business logic""" - - def __init__( - self, repository: ReplayRepository, event_replay_service: EventReplayService, logger: logging.Logger - ) -> None: - self.repository = repository - self.event_replay_service = event_replay_service - self.logger = logger - - async def create_session_from_config(self, config: ReplayConfig) -> ReplayOperationResult: - """Create a new replay session from a domain config""" - try: - session_id = await self.event_replay_service.create_replay_session(config) - session = self.event_replay_service.get_session(session_id) - if session: - await self.repository.save_session(session) - return ReplayOperationResult( - session_id=session_id, - status=ReplayStatus.CREATED, - message="Replay session created successfully", - ) - except Exception as e: - self.logger.error(f"Failed to create replay session: {e}") - raise ReplayOperationError("", "create", str(e)) from e - - async def start_session(self, session_id: str) -> ReplayOperationResult: - """Start a replay session""" - self.logger.info(f"Starting replay session {session_id}") - try: - await self.event_replay_service.start_replay(session_id) - - await self.repository.update_session_status(session_id, ReplayStatus.RUNNING) - - return ReplayOperationResult( - session_id=session_id, status=ReplayStatus.RUNNING, message="Replay session started" - ) - - except ValueError: - raise ReplaySessionNotFoundError(session_id) - except Exception as e: - self.logger.error(f"Failed to start replay session: {e}") - raise ReplayOperationError(session_id, "start", str(e)) from e - - async def pause_session(self, session_id: str) -> ReplayOperationResult: - """Pause a replay session""" - try: - await self.event_replay_service.pause_replay(session_id) - - await self.repository.update_session_status(session_id, ReplayStatus.PAUSED) - - return ReplayOperationResult( - session_id=session_id, status=ReplayStatus.PAUSED, message="Replay session paused" - ) - - except ValueError: - raise ReplaySessionNotFoundError(session_id) - except Exception as e: - self.logger.error(f"Failed to pause replay session: {e}") - raise ReplayOperationError(session_id, "pause", str(e)) from e - - async def resume_session(self, session_id: str) -> ReplayOperationResult: - """Resume a paused replay session""" - try: - await self.event_replay_service.resume_replay(session_id) - - await self.repository.update_session_status(session_id, ReplayStatus.RUNNING) - - return ReplayOperationResult( - session_id=session_id, status=ReplayStatus.RUNNING, message="Replay session resumed" - ) - - except ValueError: - raise ReplaySessionNotFoundError(session_id) - except Exception as e: - self.logger.error(f"Failed to resume replay session: {e}") - raise ReplayOperationError(session_id, "resume", str(e)) from e - - async def cancel_session(self, session_id: str) -> ReplayOperationResult: - """Cancel a replay session""" - try: - await self.event_replay_service.cancel_replay(session_id) - - await self.repository.update_session_status(session_id, ReplayStatus.CANCELLED) - - return ReplayOperationResult( - session_id=session_id, status=ReplayStatus.CANCELLED, message="Replay session cancelled" - ) - - except ValueError: - raise ReplaySessionNotFoundError(session_id) - except Exception as e: - self.logger.error(f"Failed to cancel replay session: {e}") - raise ReplayOperationError(session_id, "cancel", str(e)) from e - - def list_sessions(self, status: ReplayStatus | None = None, limit: int = 100) -> list[ReplaySessionState]: - """List replay sessions with optional filtering (domain objects).""" - return self.event_replay_service.list_sessions(status=status, limit=limit) - - def get_session(self, session_id: str) -> ReplaySessionState: - """Get a specific replay session (domain).""" - try: - # Get from memory-based service for performance - session = self.event_replay_service.get_session(session_id) - if not session: - raise ReplaySessionNotFoundError(session_id) - return session - except ReplaySessionNotFoundError: - raise - except Exception as e: - self.logger.error(f"Failed to get replay session {session_id}: {e}") - raise ReplayOperationError(session_id, "get", str(e)) from e - - async def cleanup_old_sessions(self, older_than_hours: int = 24) -> CleanupResponse: - """Clean up old replay sessions""" - try: - removed_memory = await self.event_replay_service.cleanup_old_sessions(older_than_hours) - - # Clean up from database - cutoff_time = datetime.now(timezone.utc) - timedelta(hours=older_than_hours) - removed_db = await self.repository.delete_old_sessions(cutoff_time) - - total_removed = max(removed_memory, removed_db) - return CleanupResponse(removed_sessions=total_removed, message=f"Removed {total_removed} old sessions") - except Exception as e: - self.logger.error(f"Failed to cleanup old sessions: {e}") - raise ReplayOperationError("", "cleanup", str(e)) from e diff --git a/backend/tests/e2e/core/test_container.py b/backend/tests/e2e/core/test_container.py index 0cd7f95b..da711f02 100644 --- a/backend/tests/e2e/core/test_container.py +++ b/backend/tests/e2e/core/test_container.py @@ -9,7 +9,7 @@ from app.services.execution_service import ExecutionService from app.services.notification_service import NotificationService from app.services.rate_limit_service import RateLimitService -from app.services.replay_service import ReplayService +from app.services.event_replay import EventReplayService from app.services.saved_script_service import SavedScriptService from app.services.admin import AdminUserService from app.services.user_settings_service import UserSettingsService @@ -151,10 +151,10 @@ async def test_resolves_rate_limit_service( async def test_resolves_replay_service( self, scope: AsyncContainer ) -> None: - """Container resolves ReplayService.""" - service = await scope.get(ReplayService) + """Container resolves EventReplayService.""" + service = await scope.get(EventReplayService) - assert isinstance(service, ReplayService) + assert isinstance(service, EventReplayService) class TestServiceDependencies: diff --git a/backend/tests/e2e/services/replay/test_replay_service.py b/backend/tests/e2e/services/replay/test_replay_service.py index f84511a5..e7f847d6 100644 --- a/backend/tests/e2e/services/replay/test_replay_service.py +++ b/backend/tests/e2e/services/replay/test_replay_service.py @@ -2,7 +2,7 @@ from app.domain.enums.replay import ReplayStatus, ReplayTarget, ReplayType from app.domain.replay.exceptions import ReplaySessionNotFoundError from app.services.event_replay import ReplayConfig, ReplayFilter -from app.services.replay_service import ReplayService +from app.services.event_replay import EventReplayService from dishka import AsyncContainer pytestmark = [pytest.mark.e2e, pytest.mark.kafka] @@ -16,7 +16,7 @@ async def test_create_session_execution_type( self, scope: AsyncContainer ) -> None: """Create replay session for execution events.""" - svc: ReplayService = await scope.get(ReplayService) + svc: EventReplayService = await scope.get(EventReplayService) cfg = ReplayConfig( replay_type=ReplayType.EXECUTION, @@ -36,7 +36,7 @@ async def test_create_session_with_max_events( self, scope: AsyncContainer ) -> None: """Create session with event limit.""" - svc: ReplayService = await scope.get(ReplayService) + svc: EventReplayService = await scope.get(EventReplayService) cfg = ReplayConfig( replay_type=ReplayType.EXECUTION, @@ -54,7 +54,7 @@ async def test_create_session_with_filter( self, scope: AsyncContainer ) -> None: """Create session with event filter.""" - svc: ReplayService = await scope.get(ReplayService) + svc: EventReplayService = await scope.get(EventReplayService) replay_filter = ReplayFilter( aggregate_id="exec-1", @@ -76,7 +76,7 @@ class TestListSessions: @pytest.mark.asyncio async def test_list_sessions(self, scope: AsyncContainer) -> None: """List replay sessions.""" - svc: ReplayService = await scope.get(ReplayService) + svc: EventReplayService = await scope.get(EventReplayService) # Create a session first cfg = ReplayConfig( @@ -96,7 +96,7 @@ async def test_list_sessions(self, scope: AsyncContainer) -> None: @pytest.mark.asyncio async def test_list_sessions_with_limit(self, scope: AsyncContainer) -> None: """List sessions respects limit.""" - svc: ReplayService = await scope.get(ReplayService) + svc: EventReplayService = await scope.get(EventReplayService) sessions = svc.list_sessions(limit=5) @@ -110,7 +110,7 @@ class TestGetSession: @pytest.mark.asyncio async def test_get_session_by_id(self, scope: AsyncContainer) -> None: """Get session by ID.""" - svc: ReplayService = await scope.get(ReplayService) + svc: EventReplayService = await scope.get(EventReplayService) # Create a session cfg = ReplayConfig( @@ -130,7 +130,7 @@ async def test_get_session_by_id(self, scope: AsyncContainer) -> None: @pytest.mark.asyncio async def test_get_session_not_found(self, scope: AsyncContainer) -> None: """Get nonexistent session raises error.""" - svc: ReplayService = await scope.get(ReplayService) + svc: EventReplayService = await scope.get(EventReplayService) with pytest.raises(ReplaySessionNotFoundError): svc.get_session("nonexistent-session-id") @@ -142,7 +142,7 @@ class TestCancelSession: @pytest.mark.asyncio async def test_cancel_session(self, scope: AsyncContainer) -> None: """Cancel a replay session.""" - svc: ReplayService = await scope.get(ReplayService) + svc: EventReplayService = await scope.get(EventReplayService) # Create a session cfg = ReplayConfig( @@ -164,7 +164,7 @@ async def test_cancel_nonexistent_session( self, scope: AsyncContainer ) -> None: """Cancel nonexistent session raises error.""" - svc: ReplayService = await scope.get(ReplayService) + svc: EventReplayService = await scope.get(EventReplayService) with pytest.raises(ReplaySessionNotFoundError): await svc.cancel_session("nonexistent-session-id") From 63ac5aa6e40a510fe435fbfb605aa6ce54d2e8cf Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Thu, 29 Jan 2026 22:46:14 +0100 Subject: [PATCH 3/7] removed calls to get_running_loop and run_in_executor --- .../services/event_replay/replay_service.py | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/backend/app/services/event_replay/replay_service.py b/backend/app/services/event_replay/replay_service.py index 538d57b5..218e03dd 100644 --- a/backend/app/services/event_replay/replay_service.py +++ b/backend/app/services/event_replay/replay_service.py @@ -76,8 +76,6 @@ async def start_replay(self, session_id: str) -> None: self.logger.info("Started replay session", extra={"session_id": session_id}) async def _run_replay(self, session: ReplaySessionState) -> None: - start_time = asyncio.get_running_loop().time() - try: with trace_span( name="event_replay.session", @@ -97,7 +95,7 @@ async def _run_replay(self, session: ReplaySessionState) -> None: await self._process_batch(session, batch) - await self._complete_session(session, start_time) + await self._complete_session(session) except Exception as e: await self._handle_session_error(session, e) @@ -105,11 +103,11 @@ async def _run_replay(self, session: ReplaySessionState) -> None: self._metrics.decrement_active_replays() self._active_tasks.pop(session.session_id, None) - async def _complete_session(self, session: ReplaySessionState, start_time: float) -> None: + async def _complete_session(self, session: ReplaySessionState) -> None: session.status = ReplayStatus.COMPLETED session.completed_at = datetime.now(timezone.utc) - duration = asyncio.get_running_loop().time() - start_time + duration = (session.completed_at - session.started_at).total_seconds() if session.started_at else 0.0 self._metrics.record_replay_duration(duration, session.config.replay_type) await self._update_session_in_db(session) @@ -146,16 +144,6 @@ async def _apply_replay_delay(self, session: ReplaySessionState, event: DomainEv if delay > 0: await asyncio.sleep(delay) - def _update_replay_metrics(self, session: ReplaySessionState, event: DomainEvent, success: bool) -> None: - if success: - session.replayed_events += 1 - status = "success" - else: - session.failed_events += 1 - status = "failed" - - self._metrics.record_event_replayed(session.config.replay_type, event.event_type, status) - async def _handle_replay_error(self, session: ReplaySessionState, event: DomainEvent, error: Exception) -> None: self.logger.error("Failed to replay event", extra={"event_id": event.event_id, "error": str(error)}) session.failed_events += 1 @@ -235,7 +223,13 @@ async def _process_batch(self, session: ReplaySessionState, batch: list[DomainEv raise continue - self._update_replay_metrics(session, event, success) + if success: + session.replayed_events += 1 + else: + session.failed_events += 1 + self._metrics.record_event_replayed( + session.config.replay_type, event.event_type, "success" if success else "failed" + ) session.last_event_at = event.timestamp await self._update_session_in_db(session) @@ -271,13 +265,10 @@ async def _write_event_to_file(self, event: DomainEvent, file_path: str) -> None if file_path not in self._file_locks: self._file_locks[file_path] = asyncio.Lock() + line = json.dumps(event.model_dump(), default=str) + "\n" async with self._file_locks[file_path]: - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, self._write_to_file_sync, event, file_path) - - def _write_to_file_sync(self, event: DomainEvent, file_path: str) -> None: - with open(file_path, "a") as f: - f.write(json.dumps(event.model_dump(), default=str) + "\n") + with open(file_path, "a") as f: + f.write(line) async def pause_replay(self, session_id: str) -> None: session = self._sessions.get(session_id) From 986a4da43fa8e851a5621a5af2cd06c888959974 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Thu, 29 Jan 2026 22:59:59 +0100 Subject: [PATCH 4/7] further fixes of replay service --- .../services/event_replay/replay_service.py | 356 +++++++----------- backend/pyproject.toml | 1 + backend/uv.lock | 11 + 3 files changed, 143 insertions(+), 225 deletions(-) diff --git a/backend/app/services/event_replay/replay_service.py b/backend/app/services/event_replay/replay_service.py index 218e03dd..6bf53834 100644 --- a/backend/app/services/event_replay/replay_service.py +++ b/backend/app/services/event_replay/replay_service.py @@ -3,9 +3,9 @@ import logging from collections.abc import AsyncIterator from datetime import datetime, timedelta, timezone -from typing import Any, Callable from uuid import uuid4 +import aiofiles from opentelemetry.trace import SpanKind from app.core.metrics import ReplayMetrics @@ -42,27 +42,25 @@ def __init__( self._producer = producer self._event_store = event_store self.logger = logger - self._callbacks: dict[ReplayTarget, Callable[..., Any]] = {} self._file_locks: dict[str, asyncio.Lock] = {} self._metrics = replay_metrics - self.logger.info("Event replay service initialized") - async def create_replay_session(self, config: ReplayConfig) -> str: - state = ReplaySessionState(session_id=str(uuid4()), config=config) - self._sessions[state.session_id] = state - - self.logger.info( - "Created replay session", - extra={"session_id": state.session_id, "type": config.replay_type, "target": config.target}, - ) - - return state.session_id - - async def start_replay(self, session_id: str) -> None: - session = self._sessions.get(session_id) - if not session: - raise ReplaySessionNotFoundError(session_id) + async def create_session_from_config(self, config: ReplayConfig) -> ReplayOperationResult: + try: + state = ReplaySessionState(session_id=str(uuid4()), config=config) + self._sessions[state.session_id] = state + await self._repository.save_session(state) + return ReplayOperationResult( + session_id=state.session_id, + status=ReplayStatus.CREATED, + message="Replay session created successfully", + ) + except Exception as e: + self.logger.error(f"Failed to create replay session: {e}") + raise ReplayOperationError("", "create", str(e)) from e + async def start_session(self, session_id: str) -> ReplayOperationResult: + session = self.get_session(session_id) if session.status != ReplayStatus.CREATED: raise ReplayOperationError(session_id, "start", "Session already started") @@ -73,7 +71,71 @@ async def start_replay(self, session_id: str) -> None: session.started_at = datetime.now(timezone.utc) self._metrics.increment_active_replays() - self.logger.info("Started replay session", extra={"session_id": session_id}) + await self._repository.update_session_status(session_id, ReplayStatus.RUNNING) + return ReplayOperationResult( + session_id=session_id, status=ReplayStatus.RUNNING, message="Replay session started" + ) + + async def pause_session(self, session_id: str) -> ReplayOperationResult: + session = self.get_session(session_id) + if session.status == ReplayStatus.RUNNING: + session.status = ReplayStatus.PAUSED + await self._repository.update_session_status(session_id, ReplayStatus.PAUSED) + return ReplayOperationResult( + session_id=session_id, status=ReplayStatus.PAUSED, message="Replay session paused" + ) + + async def resume_session(self, session_id: str) -> ReplayOperationResult: + session = self.get_session(session_id) + if session.status == ReplayStatus.PAUSED: + session.status = ReplayStatus.RUNNING + await self._repository.update_session_status(session_id, ReplayStatus.RUNNING) + return ReplayOperationResult( + session_id=session_id, status=ReplayStatus.RUNNING, message="Replay session resumed" + ) + + async def cancel_session(self, session_id: str) -> ReplayOperationResult: + session = self.get_session(session_id) + session.status = ReplayStatus.CANCELLED + + task = self._active_tasks.get(session_id) + if task and not task.done(): + task.cancel() + + await self._repository.update_session_status(session_id, ReplayStatus.CANCELLED) + return ReplayOperationResult( + session_id=session_id, status=ReplayStatus.CANCELLED, message="Replay session cancelled" + ) + + def get_session(self, session_id: str) -> ReplaySessionState: + session = self._sessions.get(session_id) + if not session: + raise ReplaySessionNotFoundError(session_id) + return session + + def list_sessions(self, status: ReplayStatus | None = None, limit: int = 100) -> list[ReplaySessionState]: + sessions = list(self._sessions.values()) + if status: + sessions = [s for s in sessions if s.status == status] + sessions.sort(key=lambda s: s.created_at, reverse=True) + return sessions[:limit] + + async def cleanup_old_sessions(self, older_than_hours: int = 24) -> CleanupResult: + cutoff_time = datetime.now(timezone.utc) - timedelta(hours=older_than_hours) + removed_memory = 0 + + completed_statuses = {ReplayStatus.COMPLETED, ReplayStatus.FAILED, ReplayStatus.CANCELLED} + for session_id in list(self._sessions.keys()): + session = self._sessions[session_id] + if session.status in completed_statuses and session.created_at < cutoff_time: + del self._sessions[session_id] + removed_memory += 1 + + removed_db = await self._repository.delete_old_sessions(cutoff_time) + total_removed = max(removed_memory, removed_db) + + self.logger.info("Cleaned up old replay sessions", extra={"removed_count": total_removed}) + return CleanupResult(removed_sessions=total_removed, message=f"Removed {total_removed} old sessions") async def _run_replay(self, session: ReplaySessionState) -> None: try: @@ -92,91 +154,42 @@ async def _run_replay(self, session: ReplaySessionState) -> None: async for batch in self._fetch_event_batches(session): if session.status != ReplayStatus.RUNNING: break - await self._process_batch(session, batch) - await self._complete_session(session) + session.status = ReplayStatus.COMPLETED + session.completed_at = datetime.now(timezone.utc) + duration = (session.completed_at - session.started_at).total_seconds() if session.started_at else 0.0 + self._metrics.record_replay_duration(duration, session.config.replay_type) + await self._update_session_in_db(session) + self.logger.info( + "Replay session completed", + extra={ + "session_id": session.session_id, + "replayed_events": session.replayed_events, + "failed_events": session.failed_events, + "skipped_events": session.skipped_events, + "duration_seconds": round(duration, 2), + }, + ) except Exception as e: - await self._handle_session_error(session, e) + self.logger.error( + "Replay session failed", + extra={"session_id": session.session_id, "error": str(e)}, + exc_info=True, + ) + session.status = ReplayStatus.FAILED + session.completed_at = datetime.now(timezone.utc) + session.errors.append( + ReplayError(timestamp=datetime.now(timezone.utc), error=str(e), error_type=type(e).__name__) + ) + self._metrics.record_replay_error(type(e).__name__) + await self._update_session_in_db(session) finally: self._metrics.decrement_active_replays() self._active_tasks.pop(session.session_id, None) - async def _complete_session(self, session: ReplaySessionState) -> None: - session.status = ReplayStatus.COMPLETED - session.completed_at = datetime.now(timezone.utc) - - duration = (session.completed_at - session.started_at).total_seconds() if session.started_at else 0.0 - self._metrics.record_replay_duration(duration, session.config.replay_type) - - await self._update_session_in_db(session) - - self.logger.info( - "Replay session completed", - extra={ - "session_id": session.session_id, - "replayed_events": session.replayed_events, - "failed_events": session.failed_events, - "skipped_events": session.skipped_events, - "duration_seconds": round(duration, 2), - }, - ) - - async def _handle_session_error(self, session: ReplaySessionState, error: Exception) -> None: - self.logger.error( - "Replay session failed", - extra={"session_id": session.session_id, "error": str(error)}, - exc_info=True, - ) - session.status = ReplayStatus.FAILED - session.completed_at = datetime.now(timezone.utc) - session.errors.append( - ReplayError(timestamp=datetime.now(timezone.utc), error=str(error), error_type=type(error).__name__) - ) - self._metrics.record_replay_error(type(error).__name__) - await self._update_session_in_db(session) - - async def _apply_replay_delay(self, session: ReplaySessionState, event: DomainEvent) -> None: - if session.last_event_at and session.config.speed_multiplier < 100: - time_diff = (event.timestamp - session.last_event_at).total_seconds() - delay = time_diff / session.config.speed_multiplier - if delay > 0: - await asyncio.sleep(delay) - - async def _handle_replay_error(self, session: ReplaySessionState, event: DomainEvent, error: Exception) -> None: - self.logger.error("Failed to replay event", extra={"event_id": event.event_id, "error": str(error)}) - session.failed_events += 1 - err = ReplayError( - timestamp=datetime.now(timezone.utc), event_id=str(event.event_id), error=str(error) - ) - session.errors.append(err) - - async def _replay_to_kafka(self, session: ReplaySessionState, event: DomainEvent) -> bool: - config = session.config - if not config.preserve_timestamps: - event.timestamp = datetime.now(timezone.utc) - - # Send the event without modifying its metadata structure - await self._producer.produce(event_to_produce=event) - return True - - async def _replay_to_callback(self, event: DomainEvent, session: ReplaySessionState) -> bool: - callback = self._callbacks.get(ReplayTarget.CALLBACK) - if callback: - await callback(event, session) - return True - return False - - async def _replay_to_file(self, event: DomainEvent, file_path: str | None) -> bool: - if not file_path: - self.logger.error("No target file path specified") - return False - await self._write_event_to_file(event, file_path) - return True - async def _fetch_event_batches(self, session: ReplaySessionState) -> AsyncIterator[list[DomainEvent]]: - self.logger.info("Fetching events for session", extra={"session_id": session.session_id}) events_processed = 0 max_events = session.config.max_events @@ -213,12 +226,20 @@ async def _process_batch(self, session: ReplaySessionState, batch: list[DomainEv if session.status != ReplayStatus.RUNNING: break - # Apply delay before external I/O - await self._apply_replay_delay(session, event) + if session.last_event_at and session.config.speed_multiplier < 100: + time_diff = (event.timestamp - session.last_event_at).total_seconds() + delay = time_diff / session.config.speed_multiplier + if delay > 0: + await asyncio.sleep(delay) + try: success = await self._replay_event(session, event) except Exception as e: - await self._handle_replay_error(session, event, e) + self.logger.error("Failed to replay event", extra={"event_id": event.event_id, "error": str(e)}) + session.failed_events += 1 + session.errors.append( + ReplayError(timestamp=datetime.now(timezone.utc), event_id=str(event.event_id), error=str(e)) + ) if not session.config.skip_errors: raise continue @@ -235,16 +256,21 @@ async def _process_batch(self, session: ReplaySessionState, batch: list[DomainEv async def _replay_event(self, session: ReplaySessionState, event: DomainEvent) -> bool: config = session.config - attempts = config.retry_attempts if config.retry_failed else 1 + for attempt in range(attempts): try: if config.target == ReplayTarget.KAFKA: - return await self._replay_to_kafka(session, event) - elif config.target == ReplayTarget.CALLBACK: - return await self._replay_to_callback(event, session) + if not config.preserve_timestamps: + event.timestamp = datetime.now(timezone.utc) + await self._producer.produce(event_to_produce=event) + return True elif config.target == ReplayTarget.FILE: - return await self._replay_to_file(event, config.target_file_path) + if not config.target_file_path: + self.logger.error("No target file path specified") + return False + await self._write_event_to_file(event, config.target_file_path) + return True elif config.target == ReplayTarget.TEST: return True else: @@ -267,128 +293,10 @@ async def _write_event_to_file(self, event: DomainEvent, file_path: str) -> None line = json.dumps(event.model_dump(), default=str) + "\n" async with self._file_locks[file_path]: - with open(file_path, "a") as f: - f.write(line) - - async def pause_replay(self, session_id: str) -> None: - session = self._sessions.get(session_id) - if not session: - raise ReplaySessionNotFoundError(session_id) - - if session.status == ReplayStatus.RUNNING: - session.status = ReplayStatus.PAUSED - self.logger.info("Paused replay session", extra={"session_id": session_id}) - - async def resume_replay(self, session_id: str) -> None: - session = self._sessions.get(session_id) - if not session: - raise ReplaySessionNotFoundError(session_id) - - if session.status == ReplayStatus.PAUSED: - session.status = ReplayStatus.RUNNING - self.logger.info("Resumed replay session", extra={"session_id": session_id}) - - async def cancel_replay(self, session_id: str) -> None: - session = self._sessions.get(session_id) - if not session: - raise ReplaySessionNotFoundError(session_id) - - session.status = ReplayStatus.CANCELLED - - task = self._active_tasks.get(session_id) - if task and not task.done(): - task.cancel() - - self.logger.info("Cancelled replay session", extra={"session_id": session_id}) - - def get_session(self, session_id: str) -> ReplaySessionState: - session = self._sessions.get(session_id) - if not session: - raise ReplaySessionNotFoundError(session_id) - return session - - def list_sessions(self, status: ReplayStatus | None = None, limit: int = 100) -> list[ReplaySessionState]: - sessions = list(self._sessions.values()) - - if status: - sessions = [s for s in sessions if s.status == status] - - sessions.sort(key=lambda s: s.created_at, reverse=True) - return sessions[:limit] - - def register_callback( - self, target: ReplayTarget, callback: Callable[[DomainEvent, ReplaySessionState], Any] - ) -> None: - self._callbacks[target] = callback - - async def cleanup_old_sessions(self, older_than_hours: int = 24) -> CleanupResult: - cutoff_time = datetime.now(timezone.utc) - timedelta(hours=older_than_hours) - removed_memory = 0 - - completed_statuses = {ReplayStatus.COMPLETED, ReplayStatus.FAILED, ReplayStatus.CANCELLED} - - for session_id in list(self._sessions.keys()): - session = self._sessions[session_id] - if session.status in completed_statuses and session.created_at < cutoff_time: - del self._sessions[session_id] - removed_memory += 1 - - removed_db = await self._repository.delete_old_sessions(cutoff_time) - total_removed = max(removed_memory, removed_db) - - self.logger.info("Cleaned up old replay sessions", extra={"removed_count": total_removed}) - return CleanupResult(removed_sessions=total_removed, message=f"Removed {total_removed} old sessions") - - async def create_session_from_config(self, config: ReplayConfig) -> ReplayOperationResult: - """Create a new replay session, persist it, and return an operation result.""" - try: - session_id = await self.create_replay_session(config) - session = self._sessions.get(session_id) - if session: - await self._repository.save_session(session) - return ReplayOperationResult( - session_id=session_id, - status=ReplayStatus.CREATED, - message="Replay session created successfully", - ) - except Exception as e: - self.logger.error(f"Failed to create replay session: {e}") - raise ReplayOperationError("", "create", str(e)) from e - - async def start_session(self, session_id: str) -> ReplayOperationResult: - """Start a replay session and persist the status change.""" - await self.start_replay(session_id) - await self._repository.update_session_status(session_id, ReplayStatus.RUNNING) - return ReplayOperationResult( - session_id=session_id, status=ReplayStatus.RUNNING, message="Replay session started" - ) - - async def pause_session(self, session_id: str) -> ReplayOperationResult: - """Pause a replay session and persist the status change.""" - await self.pause_replay(session_id) - await self._repository.update_session_status(session_id, ReplayStatus.PAUSED) - return ReplayOperationResult( - session_id=session_id, status=ReplayStatus.PAUSED, message="Replay session paused" - ) - - async def resume_session(self, session_id: str) -> ReplayOperationResult: - """Resume a paused replay session and persist the status change.""" - await self.resume_replay(session_id) - await self._repository.update_session_status(session_id, ReplayStatus.RUNNING) - return ReplayOperationResult( - session_id=session_id, status=ReplayStatus.RUNNING, message="Replay session resumed" - ) - - async def cancel_session(self, session_id: str) -> ReplayOperationResult: - """Cancel a replay session and persist the status change.""" - await self.cancel_replay(session_id) - await self._repository.update_session_status(session_id, ReplayStatus.CANCELLED) - return ReplayOperationResult( - session_id=session_id, status=ReplayStatus.CANCELLED, message="Replay session cancelled" - ) + async with aiofiles.open(file_path, "a") as f: + await f.write(line) async def _update_session_in_db(self, session: ReplaySessionState) -> None: - """Update session progress in the database.""" try: session_update = ReplaySessionUpdate( status=session.status, @@ -397,8 +305,6 @@ async def _update_session_in_db(self, session: ReplaySessionState) -> None: skipped_events=session.skipped_events, completed_at=session.completed_at, ) - # Note: last_event_at is not in ReplaySessionUpdate - # If needed, add it to the domain model await self._repository.update_replay_session(session_id=session.session_id, updates=session_update) except Exception as e: self.logger.error(f"Failed to update session in database: {e}") diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 0573e657..5cdc255c 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -124,6 +124,7 @@ dependencies = [ "yarl==1.20.1", "zipp==3.20.2", "monggregate==0.22.1", + "aiofiles>=25.1.0", ] [build-system] diff --git a/backend/uv.lock b/backend/uv.lock index bd3555c6..048de166 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -6,6 +6,15 @@ resolution-markers = [ "python_full_version < '3.13'", ] +[[package]] +name = "aiofiles" +version = "25.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/41/c3/534eac40372d8ee36ef40df62ec129bee4fdb5ad9706e58a29be53b2c970/aiofiles-25.1.0.tar.gz", hash = "sha256:a8d728f0a29de45dc521f18f07297428d56992a742f0cd2701ba86e44d23d5b2", size = 46354, upload-time = "2025-10-09T20:51:04.358Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bc/8a/340a1555ae33d7354dbca4faa54948d76d89a27ceef032c8c3bc661d003e/aiofiles-25.1.0-py3-none-any.whl", hash = "sha256:abe311e527c862958650f9438e859c1fa7568a141b22abcd015e120e86a85695", size = 14668, upload-time = "2025-10-09T20:51:03.174Z" }, +] + [[package]] name = "aiohappyeyeballs" version = "2.6.1" @@ -1025,6 +1034,7 @@ name = "integr8scode" version = "0.1.0" source = { editable = "." } dependencies = [ + { name = "aiofiles" }, { name = "aiohappyeyeballs" }, { name = "aiohttp" }, { name = "aiokafka" }, @@ -1169,6 +1179,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "aiofiles", specifier = ">=25.1.0" }, { name = "aiohappyeyeballs", specifier = "==2.6.1" }, { name = "aiohttp", specifier = "==3.13.3" }, { name = "aiokafka", specifier = "==0.13.0" }, From 58391edba5cc1fa2f3b0a8d5c50ac9c43d2afef4 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Fri, 30 Jan 2026 00:12:13 +0100 Subject: [PATCH 5/7] fixes --- backend/app/services/event_replay/replay_service.py | 10 ++++++---- backend/pyproject.toml | 2 +- backend/uv.lock | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/backend/app/services/event_replay/replay_service.py b/backend/app/services/event_replay/replay_service.py index 6bf53834..3e0e2b8d 100644 --- a/backend/app/services/event_replay/replay_service.py +++ b/backend/app/services/event_replay/replay_service.py @@ -78,8 +78,9 @@ async def start_session(self, session_id: str) -> ReplayOperationResult: async def pause_session(self, session_id: str) -> ReplayOperationResult: session = self.get_session(session_id) - if session.status == ReplayStatus.RUNNING: - session.status = ReplayStatus.PAUSED + if session.status != ReplayStatus.RUNNING: + raise ReplayOperationError(session_id, "pause", "Session is not running") + session.status = ReplayStatus.PAUSED await self._repository.update_session_status(session_id, ReplayStatus.PAUSED) return ReplayOperationResult( session_id=session_id, status=ReplayStatus.PAUSED, message="Replay session paused" @@ -87,8 +88,9 @@ async def pause_session(self, session_id: str) -> ReplayOperationResult: async def resume_session(self, session_id: str) -> ReplayOperationResult: session = self.get_session(session_id) - if session.status == ReplayStatus.PAUSED: - session.status = ReplayStatus.RUNNING + if session.status != ReplayStatus.PAUSED: + raise ReplayOperationError(session_id, "resume", "Session is not paused") + session.status = ReplayStatus.RUNNING await self._repository.update_session_status(session_id, ReplayStatus.RUNNING) return ReplayOperationResult( session_id=session_id, status=ReplayStatus.RUNNING, message="Replay session resumed" diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 5cdc255c..1c970094 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -124,7 +124,7 @@ dependencies = [ "yarl==1.20.1", "zipp==3.20.2", "monggregate==0.22.1", - "aiofiles>=25.1.0", + "aiofiles==25.1.0", ] [build-system] diff --git a/backend/uv.lock b/backend/uv.lock index 048de166..2c500df1 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -1179,7 +1179,7 @@ dev = [ [package.metadata] requires-dist = [ - { name = "aiofiles", specifier = ">=25.1.0" }, + { name = "aiofiles", specifier = "==25.1.0" }, { name = "aiohappyeyeballs", specifier = "==2.6.1" }, { name = "aiohttp", specifier = "==3.13.3" }, { name = "aiokafka", specifier = "==0.13.0" }, From 4db348b4c5eb324587297bffb12ec71a3f217663 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Fri, 30 Jan 2026 00:45:04 +0100 Subject: [PATCH 6/7] =?UTF-8?q?Before:=20Finalization=20logic=20(set=20com?= =?UTF-8?q?pleted=5Fat,=20record=20duration,=20persist=20DB,=20decrement?= =?UTF-8?q?=20metrics,=20pop=20dicts,=20log)=20was=20duplicated=20across?= =?UTF-8?q?=203=20code=20paths=20=E2=80=94=20try=20success,=20except=20Can?= =?UTF-8?q?celledError,=20and=20except=20Exception.=20Each=20path=20had=20?= =?UTF-8?q?its=20=20=20=20own=20DB=20persist,=20timestamp,=20and=20log=20c?= =?UTF-8?q?all.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After: Each block does only its job: - try: Run the replay loop. Set COMPLETED on success. return on non-RUNNING (no break + post-loop guard). - except CancelledError: One line — set CANCELLED. - except Exception: Set FAILED, append error, record error metric, log the error. - finally: ALL shared finalization — completed_at, duration metric (only for COMPLETED), DB persist, decrement active replays, pop from dicts, single log line with final status. _replay_event — match/case Replaced the if/elif/elif/else chain with match/case on config.target. Single return True after the match block instead of duplicated return True in each branch. _process_batch Changed break to return on non-RUNNING status for consistency with _run_replay. --- .../services/event_replay/replay_service.py | 103 +++++++++++------- 1 file changed, 64 insertions(+), 39 deletions(-) diff --git a/backend/app/services/event_replay/replay_service.py b/backend/app/services/event_replay/replay_service.py index 3e0e2b8d..6741ff3e 100644 --- a/backend/app/services/event_replay/replay_service.py +++ b/backend/app/services/event_replay/replay_service.py @@ -38,6 +38,7 @@ def __init__( ) -> None: self._sessions: dict[str, ReplaySessionState] = {} self._active_tasks: dict[str, asyncio.Task[None]] = {} + self._resume_events: dict[str, asyncio.Event] = {} self._repository = repository self._producer = producer self._event_store = event_store @@ -64,6 +65,10 @@ async def start_session(self, session_id: str) -> ReplayOperationResult: if session.status != ReplayStatus.CREATED: raise ReplayOperationError(session_id, "start", "Session already started") + resume_event = asyncio.Event() + resume_event.set() + self._resume_events[session_id] = resume_event + task = asyncio.create_task(self._run_replay(session)) self._active_tasks[session_id] = task @@ -81,6 +86,9 @@ async def pause_session(self, session_id: str) -> ReplayOperationResult: if session.status != ReplayStatus.RUNNING: raise ReplayOperationError(session_id, "pause", "Session is not running") session.status = ReplayStatus.PAUSED + resume_event = self._resume_events.get(session_id) + if resume_event: + resume_event.clear() await self._repository.update_session_status(session_id, ReplayStatus.PAUSED) return ReplayOperationResult( session_id=session_id, status=ReplayStatus.PAUSED, message="Replay session paused" @@ -91,6 +99,9 @@ async def resume_session(self, session_id: str) -> ReplayOperationResult: if session.status != ReplayStatus.PAUSED: raise ReplayOperationError(session_id, "resume", "Session is not paused") session.status = ReplayStatus.RUNNING + resume_event = self._resume_events.get(session_id) + if resume_event: + resume_event.set() await self._repository.update_session_status(session_id, ReplayStatus.RUNNING) return ReplayOperationResult( session_id=session_id, status=ReplayStatus.RUNNING, message="Replay session resumed" @@ -100,6 +111,10 @@ async def cancel_session(self, session_id: str) -> ReplayOperationResult: session = self.get_session(session_id) session.status = ReplayStatus.CANCELLED + resume_event = self._resume_events.get(session_id) + if resume_event: + resume_event.set() + task = self._active_tasks.get(session_id) if task and not task.done(): task.cancel() @@ -154,42 +169,52 @@ async def _run_replay(self, session: ReplaySessionState) -> None: session.total_events = min(total_count, session.config.max_events or total_count) async for batch in self._fetch_event_batches(session): + await self._await_if_paused(session) if session.status != ReplayStatus.RUNNING: - break + return await self._process_batch(session, batch) session.status = ReplayStatus.COMPLETED - session.completed_at = datetime.now(timezone.utc) - duration = (session.completed_at - session.started_at).total_seconds() if session.started_at else 0.0 - self._metrics.record_replay_duration(duration, session.config.replay_type) - await self._update_session_in_db(session) - self.logger.info( - "Replay session completed", - extra={ - "session_id": session.session_id, - "replayed_events": session.replayed_events, - "failed_events": session.failed_events, - "skipped_events": session.skipped_events, - "duration_seconds": round(duration, 2), - }, - ) + except asyncio.CancelledError: + session.status = ReplayStatus.CANCELLED except Exception as e: - self.logger.error( - "Replay session failed", - extra={"session_id": session.session_id, "error": str(e)}, - exc_info=True, - ) + if session.status == ReplayStatus.CANCELLED: + return session.status = ReplayStatus.FAILED - session.completed_at = datetime.now(timezone.utc) session.errors.append( ReplayError(timestamp=datetime.now(timezone.utc), error=str(e), error_type=type(e).__name__) ) self._metrics.record_replay_error(type(e).__name__) - await self._update_session_in_db(session) + self.logger.error( + "Replay session failed", + extra={"session_id": session.session_id, "error": str(e)}, + exc_info=True, + ) finally: + session.completed_at = datetime.now(timezone.utc) + if session.status == ReplayStatus.COMPLETED and session.started_at: + duration = (session.completed_at - session.started_at).total_seconds() + self._metrics.record_replay_duration(duration, session.config.replay_type) self._metrics.decrement_active_replays() + await self._update_session_in_db(session) self._active_tasks.pop(session.session_id, None) + self._resume_events.pop(session.session_id, None) + self.logger.info( + "Replay session finished", + extra={ + "session_id": session.session_id, + "status": session.status.value if hasattr(session.status, "value") else session.status, + "replayed_events": session.replayed_events, + "failed_events": session.failed_events, + }, + ) + + async def _await_if_paused(self, session: ReplaySessionState) -> None: + if session.status == ReplayStatus.PAUSED: + resume_event = self._resume_events.get(session.session_id) + if resume_event: + await resume_event.wait() async def _fetch_event_batches(self, session: ReplaySessionState) -> AsyncIterator[list[DomainEvent]]: events_processed = 0 @@ -225,8 +250,9 @@ async def _process_batch(self, session: ReplaySessionState, batch: list[DomainEv }, ): for event in batch: + await self._await_if_paused(session) if session.status != ReplayStatus.RUNNING: - break + return if session.last_event_at and session.config.speed_multiplier < 100: time_diff = (event.timestamp - session.last_event_at).total_seconds() @@ -262,22 +288,22 @@ async def _replay_event(self, session: ReplaySessionState, event: DomainEvent) - for attempt in range(attempts): try: - if config.target == ReplayTarget.KAFKA: - if not config.preserve_timestamps: - event.timestamp = datetime.now(timezone.utc) - await self._producer.produce(event_to_produce=event) - return True - elif config.target == ReplayTarget.FILE: - if not config.target_file_path: - self.logger.error("No target file path specified") + match config.target: + case ReplayTarget.KAFKA: + if not config.preserve_timestamps: + event.timestamp = datetime.now(timezone.utc) + await self._producer.produce(event_to_produce=event) + case ReplayTarget.FILE: + if not config.target_file_path: + self.logger.error("No target file path specified") + return False + await self._write_event_to_file(event, config.target_file_path) + case ReplayTarget.TEST: + pass + case _: + self.logger.error("Unknown replay target", extra={"target": config.target}) return False - await self._write_event_to_file(event, config.target_file_path) - return True - elif config.target == ReplayTarget.TEST: - return True - else: - self.logger.error("Unknown replay target", extra={"target": config.target}) - return False + return True except Exception as e: self.logger.error( "Failed to replay event", @@ -285,7 +311,6 @@ async def _replay_event(self, session: ReplaySessionState, event: DomainEvent) - ) if attempt < attempts - 1: await asyncio.sleep(min(2**attempt, 10)) - continue return False From c78ae9abcdfd0ed543273291caf102cc84a2ab3e Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Fri, 30 Jan 2026 01:13:16 +0100 Subject: [PATCH 7/7] test fix + more logs in e2e tests --- .github/workflows/stack-tests.yml | 19 ++++++++++++++----- backend/tests/e2e/test_replay_routes.py | 16 +++++++++++++++- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/.github/workflows/stack-tests.yml b/.github/workflows/stack-tests.yml index 2cdea838..390fba13 100644 --- a/.github/workflows/stack-tests.yml +++ b/.github/workflows/stack-tests.yml @@ -247,11 +247,20 @@ jobs: if: failure() run: | mkdir -p logs - docker compose logs > logs/docker-compose.log 2>&1 - docker compose logs backend > logs/backend.log 2>&1 - docker compose logs kafka > logs/kafka.log 2>&1 - docker compose logs coordinator > logs/coordinator.log 2>&1 || true - docker compose logs k8s-worker > logs/k8s-worker.log 2>&1 || true + docker compose logs --timestamps > logs/docker-compose.log 2>&1 + docker compose logs --timestamps backend > logs/backend.log 2>&1 + docker compose logs --timestamps mongo > logs/mongo.log 2>&1 || true + docker compose logs --timestamps redis > logs/redis.log 2>&1 || true + docker compose logs --timestamps kafka > logs/kafka.log 2>&1 || true + docker compose logs --timestamps zookeeper > logs/zookeeper.log 2>&1 || true + docker compose logs --timestamps schema-registry > logs/schema-registry.log 2>&1 || true + docker compose logs --timestamps coordinator > logs/coordinator.log 2>&1 || true + docker compose logs --timestamps k8s-worker > logs/k8s-worker.log 2>&1 || true + docker compose logs --timestamps pod-monitor > logs/pod-monitor.log 2>&1 || true + docker compose logs --timestamps result-processor > logs/result-processor.log 2>&1 || true + docker compose logs --timestamps saga-orchestrator > logs/saga-orchestrator.log 2>&1 || true + docker compose logs --timestamps event-replay > logs/event-replay.log 2>&1 || true + docker compose logs --timestamps dlq-processor > logs/dlq-processor.log 2>&1 || true kubectl get events --sort-by='.metadata.creationTimestamp' -A > logs/k8s-events.log 2>&1 || true - name: Upload logs diff --git a/backend/tests/e2e/test_replay_routes.py b/backend/tests/e2e/test_replay_routes.py index 91d86940..a6bbb2ff 100644 --- a/backend/tests/e2e/test_replay_routes.py +++ b/backend/tests/e2e/test_replay_routes.py @@ -209,7 +209,7 @@ class TestResumeReplaySession: async def test_resume_replay_session( self, test_admin: AsyncClient ) -> None: - """Resume a replay session.""" + """Resume a paused replay session.""" request = ReplayRequest( replay_type=ReplayType.TIME_RANGE, target=ReplayTarget.KAFKA, @@ -221,12 +221,26 @@ async def test_resume_replay_session( assert create_response.status_code == 200 session = ReplayResponse.model_validate(create_response.json()) + # Start session first + start_response = await test_admin.post( + f"/api/v1/replay/sessions/{session.session_id}/start" + ) + assert start_response.status_code == 200 + + # Pause session + pause_response = await test_admin.post( + f"/api/v1/replay/sessions/{session.session_id}/pause" + ) + assert pause_response.status_code == 200 + + # Resume session response = await test_admin.post( f"/api/v1/replay/sessions/{session.session_id}/resume" ) assert response.status_code == 200 result = ReplayResponse.model_validate(response.json()) assert result.session_id == session.session_id + assert result.status == ReplayStatus.RUNNING @pytest.mark.asyncio async def test_resume_nonexistent_session(