
Conversation

@HardMax71 (Owner) commented on Jan 29, 2026


Summary by cubic

Consolidated replay logic by removing ReplayService and moving session operations into EventReplayService. Injected ReplayMetrics via DI to use the shared metrics instance and decouple setup.

  • Refactors
    • Providers, routes, admin services, and tests now use EventReplayService directly; removed ReplayService.
    • EventReplayService adds create/start/pause/resume/cancel methods, returns ReplayOperationResult, get_session raises ReplaySessionNotFoundError, and cleanup returns CleanupResult (memory + DB).
    • Removed progress callback; dropped callback target; file writes now use async I/O.

Written for commit c78ae9a. Summary will update on new commits.

Summary by CodeRabbit

  • New Features

    • Replay sessions now support explicit lifecycle operations (create, start, pause, resume, cancel) with standardized list/get responses.
    • Cleanup returns a richer structured summary.
  • Bug Fixes

    • Improved error signaling for missing sessions and invalid state transitions.
  • Refactor

    • Event replay internals reorganized; legacy orchestration layer consolidated and metrics integrated.
  • Tests

    • End-to-end tests updated to exercise the new replay flow (including resume).
  • Chores

    • Added async file I/O dependency and enhanced CI failure log collection.


@coderabbitai bot commented Jan 29, 2026

📝 Walkthrough

Walkthrough

EventReplayService replaces ReplayService across providers, API routes, admin service, and tests; EventReplayService now accepts ReplayMetrics (not Settings) and exposes explicit session lifecycle APIs and new result/error types; legacy ReplayService and ReplayConfig progress-callback were removed.

Changes

  • Provider wiring (backend/app/core/providers.py): Replaced ReplayService with EventReplayService in provider signatures; get_admin_events_service now accepts event_replay_service: EventReplayService; removed get_replay_service; get_event_replay_service uses replay_metrics instead of settings. (A provider sketch follows this list.)
  • Event replay implementation (backend/app/services/event_replay/replay_service.py): Major refactor: constructor now takes replay_metrics: ReplayMetrics; adds explicit session lifecycle methods (create/start/pause/resume/cancel), new result/error types, in-memory + repo persistence, a cleanup API, pause/resume semantics, and aiofiles-based I/O; removes the Settings dependency and progress callback.
  • API routes (backend/app/api/routes/replay.py): Routes now depend on FromDishka[EventReplayService]; handlers build responses using result.model_dump()/CleanupResponse(**...); list/get serialization uses config.model_dump() and direct model validation.
  • Removed orchestration (backend/app/services/replay_service.py): Deleted the legacy ReplayService module and all its orchestration methods (create/start/pause/resume/cancel/list/get/cleanup).
  • Domain model cleanup (backend/app/domain/replay/models.py): Removed the PrivateAttr _progress_callback and its accessor/mutator from ReplayConfig.
  • Admin events service (backend/app/services/admin/admin_events_service.py): Constructor and imports updated to use EventReplayService instead of ReplayService.
  • Package exports (backend/app/services/event_replay/__init__.py): Removed the ReplaySession import and export from __all__.
  • Tests (backend/tests/e2e/core/test_container.py, backend/tests/e2e/services/replay/test_replay_service.py, backend/tests/e2e/test_replay_routes.py): Test imports, container resolution, type hints, and assertions switched from ReplayService to EventReplayService; the resume route test now resumes a paused session.
  • Dependencies (backend/pyproject.toml): Added aiofiles==25.1.0 for the async file I/O used by the new event replay implementation.
  • CI logs (.github/workflows/stack-tests.yml): Expanded backend-e2e failure log collection to include more services, with timestamped docker-compose logs.
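
For the provider wiring item, a minimal sketch of what the new Dishka provider might look like; the ReplayRepository dependency and the import paths are assumptions, only the replay_metrics injection comes from the summary above:

from dishka import Provider, Scope, provide

# Assumed import paths; the real module layout may differ.
from app.domain.replay import ReplayRepository
from app.infrastructure.metrics import ReplayMetrics
from app.services.event_replay.replay_service import EventReplayService


class ReplayProvider(Provider):
    @provide(scope=Scope.APP)
    def get_event_replay_service(
        self,
        repository: ReplayRepository,   # resolved by the container
        replay_metrics: ReplayMetrics,  # shared metrics instance instead of Settings
    ) -> EventReplayService:
        return EventReplayService(repository=repository, replay_metrics=replay_metrics)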

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant API as "Replay API"
    participant Service as "EventReplayService"
    participant Repo as "ReplayRepository"
    participant Kafka as "KafkaProducer"
    participant Metrics as "ReplayMetrics"

    Client->>API: POST /replay (config)
    API->>Service: create_session_from_config(config)
    Service->>Repo: persist session (CREATED)
    Service->>Metrics: record session_created
    Service-->>API: ReplayOperationResult (session_id, CREATED)
    API-->>Client: 201 ReplayResponse

    Client->>API: POST /replay/{id}/start
    API->>Service: start_session(session_id)
    Service->>Repo: update session -> RUNNING
    Service->>Kafka: stream replay events
    Service->>Metrics: record replay_started
    Service-->>API: ReplayOperationResult (session_id, RUNNING)
    API-->>Client: 200 ReplayResponse

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I hopped through code with tiny feet,

Metrics tucked in, settings met defeat.
Sessions spring, then pause, then run,
Old service gone, new lifecycle begun.
🥕 A nibble, a hop — replay work well done!

🚥 Pre-merge checks: 1 passed, 2 failed

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 30.23%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check (❓ Inconclusive): The PR title 'fix: event replay service' is vague and does not clearly describe the main changes. While the PR does involve fixes to event replay functionality, the title uses the generic term 'fix' and lacks specifics about the consolidation, the removal of ReplayService, or the API changes. Resolution: consider a more descriptive title that captures the primary change, such as 'refactor: consolidate replay logic into EventReplayService' or 'fix: consolidate session operations into EventReplayService'.

✅ Passed checks (1 passed)

  • Description Check (✅ Passed): Check skipped; CodeRabbit's high-level summary is enabled.

@cubic-dev-ai bot left a comment

No issues found across 2 files.

@coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 333-349: The cleanup_old_sessions implementation uses
total_removed = max(removed_memory, removed_db) which under-reports when there
are in-memory-only sessions; update cleanup_old_sessions to compute the true
removed count by combining both sources (e.g., sum removed_memory and removed_db
or dedupe by collecting deleted IDs from self._sessions and from await
self._repository.delete_old_sessions) and then set total_removed accordingly,
update the info log extra and the returned
CleanupResult(removed_sessions=total_removed, ...) so removed_memory, removed_db
and total_removed reflect the real numbers (use symbols: cleanup_old_sessions,
self._sessions, removed_memory, removed_db,
self._repository.delete_old_sessions, CleanupResult).
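
A sketch of the combined count, assuming delete_old_sessions returns a count and that in-memory and DB deletions are disjoint (if IDs can overlap, dedupe on collected IDs instead); the CleanupResult field names follow the prompt above:

async def cleanup_old_sessions(self, older_than: datetime) -> CleanupResult:
    # Drop finished in-memory sessions older than the cutoff.
    stale_ids = [
        sid for sid, s in self._sessions.items()
        if s.completed_at and s.completed_at < older_than
    ]
    for sid in stale_ids:
        self._sessions.pop(sid, None)
    removed_memory = len(stale_ids)

    removed_db = await self._repository.delete_old_sessions(older_than)

    # Sum both sources instead of max(): max() under-reports whenever
    # in-memory-only sessions are removed alongside DB rows.
    total_removed = removed_memory + removed_db
    self.logger.info(
        "Cleaned up old replay sessions",
        extra={"removed_memory": removed_memory, "removed_db": removed_db, "total_removed": total_removed},
    )
    return CleanupResult(removed_sessions=total_removed, removed_memory=removed_memory, removed_db=removed_db)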
- Around line 351-366: The create_session_from_config flow can leave an
in-memory session in self._sessions if _repository.save_session raises; update
create_session_from_config to ensure cleanup: declare session_id = None before
the try, call create_replay_session as you do, and in the except/finally block
remove the orphaned session from self._sessions (e.g., pop(session_id, None))
when persistence fails, then log and raise ReplayOperationError as before;
reference create_session_from_config, create_replay_session, self._sessions,
_repository.save_session, and ReplayOperationError when making the change.
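
One possible shape for the suggested cleanup; the create_replay_session signature and the ReplayOperationResult fields are assumptions:

async def create_session_from_config(self, config: ReplayConfig) -> ReplayOperationResult:
    session_id: str | None = None
    try:
        session_id = await self.create_replay_session(config)  # registers in self._sessions
        await self._repository.save_session(self._sessions[session_id])
        return ReplayOperationResult(session_id=session_id, status=ReplayStatus.CREATED)
    except Exception as e:
        # Persistence failed: pop the orphaned in-memory entry so memory
        # and DB state cannot diverge, then surface the failure as before.
        if session_id is not None:
            self._sessions.pop(session_id, None)
        self.logger.error("Failed to create replay session", extra={"error": str(e)})
        raise ReplayOperationError(f"Failed to create replay session: {e}") from e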
- Around line 367-397: The
start_session/pause_session/resume_session/cancel_session methods call
start_replay/pause_replay/resume_replay/cancel_replay and then call
self._repository.update_session_status but do not check its return value;
update_session_status can return False when the DB record is missing, causing
in-memory vs persisted divergence. Modify each wrapper to verify the boolean
result from update_session_status and if it is False, raise an exception or
return a ReplayOperationResult indicating failure (e.g., status unchanged and an
error message) instead of returning success; use the existing
ReplayOperationResult type and include the session_id, the intended status on
success, and a clear error message on failure so callers observe the persisted
update failure immediately.
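
Applied to pause_session as an example (start/resume/cancel would follow the same pattern); the error field on ReplayOperationResult is an assumption:

async def pause_session(self, session_id: str) -> ReplayOperationResult:
    await self.pause_replay(session_id)  # in-memory transition
    persisted = await self._repository.update_session_status(session_id, ReplayStatus.PAUSED)
    if not persisted:
        # DB record missing: report the failure instead of a silent success
        # so callers observe the persisted-update failure immediately.
        return ReplayOperationResult(
            session_id=session_id,
            status=ReplayStatus.RUNNING,  # persisted status is unchanged
            error="pause not persisted: session not found in database",
        )
    return ReplayOperationResult(session_id=session_id, status=ReplayStatus.PAUSED)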

@cubic-dev-ai bot left a comment

1 issue found across 1 file (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/event_replay/replay_service.py">

<violation number="1" location="backend/app/services/event_replay/replay_service.py:270">
P2: Synchronous file writes inside this async method will block the event loop and can degrade replay throughput. Use a non-blocking approach (executor or async file I/O) for file writes.</violation>
</file>


@coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
backend/app/services/event_replay/replay_service.py (2)

218-234: Metrics not recorded for exception-handled failures.

When an exception is caught at lines 220-224 and skip_errors is True, execution continues without reaching record_event_replayed, so exception-based failures won't appear in replay metrics.

📊 Proposed fix to record metrics for exception failures
             except Exception as e:
                 await self._handle_replay_error(session, event, e)
+                self._metrics.record_event_replayed(
+                    session.config.replay_type, event.event_type, "failed"
+                )
                 if not session.config.skip_errors:
                     raise
                 continue

264-271: Replace synchronous file I/O with async operations.

The open() and f.write() calls block the event loop, stalling event processing. Use setdefault() for cleaner lock initialization and add aiofiles to handle file I/O asynchronously:

🔧 Proposed fix using aiofiles
+import aiofiles
+
 async def _write_event_to_file(self, event: DomainEvent, file_path: str) -> None:
-    if file_path not in self._file_locks:
-        self._file_locks[file_path] = asyncio.Lock()
+    lock = self._file_locks.setdefault(file_path, asyncio.Lock())

     line = json.dumps(event.model_dump(), default=str) + "\n"
-    async with self._file_locks[file_path]:
-        with open(file_path, "a") as f:
-            f.write(line)
+    async with lock:
+        async with aiofiles.open(file_path, "a") as f:
+            await f.write(line)

Also add aiofiles to backend/pyproject.toml dependencies.

🧹 Nitpick comments (2)
backend/app/services/event_replay/replay_service.py (2)

355-355: Use structured logging for consistency.

Other log statements in this file use extra={} for structured logging. This line uses f-string formatting.

📝 Suggested fix
-            self.logger.error(f"Failed to create replay session: {e}")
+            self.logger.error("Failed to create replay session", extra={"error": str(e)})

390-403: Consider warning-level log or metrics for DB update failures.

Swallowing DB update exceptions prevents replay interruption, but silent failures can lead to DB/memory state divergence. A warning-level log or metric increment would improve observability without breaking the replay.

📊 Suggested improvement
         except Exception as e:
-            self.logger.error(f"Failed to update session in database: {e}")
+            self.logger.warning(
+                "Failed to update session in database, continuing replay",
+                extra={"session_id": session.session_id, "error": str(e)},
+            )

@cubic-dev-ai bot left a comment

3 issues found across 3 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/event_replay/replay_service.py">

<violation number="1" location="backend/app/services/event_replay/replay_service.py:63">
P2: resume_session persists RUNNING even when the session isn’t PAUSED, which can desync in-memory state from the database and misreport session status. Guard invalid states before updating.</violation>

<violation number="2" location="backend/app/services/event_replay/replay_service.py:63">
P2: pause_session persists PAUSED even when the session isn’t RUNNING, which can desync in-memory state from the database and incorrectly report a paused session. Guard invalid states and only update when RUNNING.</violation>
</file>
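
A guard for both violations might look like the following; get_session raising ReplaySessionNotFoundError comes from the PR summary, while reusing ReplayOperationError for the invalid-transition case is an assumption:

async def resume_session(self, session_id: str) -> ReplayOperationResult:
    session = self.get_session(session_id)  # raises ReplaySessionNotFoundError
    if session.status is not ReplayStatus.PAUSED:
        # Refuse before persisting so the DB never records RUNNING
        # for a session that was never paused.
        raise ReplayOperationError(f"Cannot resume session in state {session.status}")
    ...

async def pause_session(self, session_id: str) -> ReplayOperationResult:
    session = self.get_session(session_id)
    if session.status is not ReplayStatus.RUNNING:
        # Only RUNNING sessions can be paused; anything else would
        # persist PAUSED while the in-memory state disagrees.
        raise ReplayOperationError(f"Cannot pause session in state {session.status}")
    ...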

<file name="backend/pyproject.toml">

<violation number="1" location="backend/pyproject.toml:127">
P2: Runtime dependencies are consistently pinned to exact versions; using ">=" here makes builds non-deterministic and can introduce unexpected upgrades. Pin aiofiles to a specific version to match the rest of the dependency policy.</violation>
</file>


@coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 99-110: The cancel_session flow can race with _run_replay which
may still mark the session COMPLETED in memory; update _run_replay to re-check
the session's persisted/in-memory status (via get_session or repository lookup)
right before you set COMPLETED and bail out if status == ReplayStatus.CANCELLED,
and also ensure you explicitly catch asyncio.CancelledError (or allow it to
propagate) instead of relying on except Exception so cancellation is handled
consistently; update references in _run_replay where it sets session.status =
ReplayStatus.COMPLETED and in the final exception handling to reflect this check
and avoid overwriting a CANCELLED status persisted by cancel_session or
repository.update_session_status.
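
A sketch of the re-check and explicit cancellation handling; _replay_loop is a hypothetical stand-in for the existing batch loop:

async def _run_replay(self, session: ReplaySessionState) -> None:
    try:
        await self._replay_loop(session)  # stand-in for the existing batch loop
        # Re-check right before writing COMPLETED: cancel_session may have
        # persisted CANCELLED while the final batch was still in flight.
        if session.status is ReplayStatus.CANCELLED:
            return
        session.status = ReplayStatus.COMPLETED
    except asyncio.CancelledError:
        # CancelledError derives from BaseException, so a broad
        # `except Exception` would not catch it; handle it explicitly.
        session.status = ReplayStatus.CANCELLED
        raise
    finally:
        await self._update_session_in_db(session)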
- Around line 79-97: The pause logic currently sets session.status = PAUSED in
pause_session, but _run_replay and _process_batch treat any status != RUNNING as
a loop-break and later mark the session COMPLETED; change the replay loop in
_run_replay (and apply the same pattern in _process_batch) to explicitly handle
ReplayStatus.PAUSED by awaiting (e.g., loop/wait until status becomes RUNNING)
instead of breaking so the function does not fall through to the completion
path; ensure you check session.status (from get_session or the session object)
in the loop and only break/mark COMPLETED when status == CANCELLED or when
there's a true terminal condition, and keep repository updates
(update_session_status) consistent when entering/exiting PAUSED so
resume_session can succeed.
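
A polling sketch of the pause-aware loop; an asyncio.Event per session would avoid the sleep interval, but this shows the intended control flow:

# Inside the _run_replay batch loop (same pattern applies in _process_batch):
while session.status is ReplayStatus.PAUSED:
    # Park instead of breaking; resume_session flips the status back
    # to RUNNING, cancel_session moves it to CANCELLED.
    await asyncio.sleep(0.5)
if session.status is ReplayStatus.CANCELLED:
    return  # only a true terminal state exits; never fall through to COMPLETED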
🧹 Nitpick comments (2)
backend/app/services/event_replay/replay_service.py (2)

257-257: Consider batching session updates for better performance.

_update_session_in_db is called after every event, which could create significant database load during high-throughput replays. Consider updating periodically (e.g., every N events or every few seconds) instead of per-event.


301-312: DB update failures are silently logged.

Exceptions in _update_session_in_db are caught and logged but not propagated. This prevents replay interruption but means in-memory state may diverge from persisted state without the caller knowing. Consider tracking update failures in session state or metrics.

🔧 Suggested enhancement: track update failures
     async def _update_session_in_db(self, session: ReplaySessionState) -> None:
         try:
             session_update = ReplaySessionUpdate(
                 status=session.status,
                 replayed_events=session.replayed_events,
                 failed_events=session.failed_events,
                 skipped_events=session.skipped_events,
                 completed_at=session.completed_at,
             )
             await self._repository.update_replay_session(session_id=session.session_id, updates=session_update)
         except Exception as e:
             self.logger.error(f"Failed to update session in database: {e}")
+            self._metrics.record_db_update_failure()

…t DB, decrement metrics, pop dicts, log) was duplicated across 3 code paths: the try success path, except CancelledError, and except Exception. Each path had its own DB persist, timestamp, and log call.

After: each block does only its job:

  • try: run the replay loop; set COMPLETED on success; return on non-RUNNING status (no break plus post-loop guard).
  • except CancelledError: one line, set CANCELLED.
  • except Exception: set FAILED, append the error, record the error metric, log it.
  • finally: ALL shared finalization: completed_at, duration metric (only for COMPLETED), DB persist, decrement active replays, pop from the tracking dicts, and a single log line with the final status.

_replay_event: match/case

Replaced the if/elif/elif/else chain with match/case on config.target, with a single return True after the match block instead of a duplicated return True in each branch. (See the sketch below.)
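
A sketch of the described dispatch; the ReplayTarget members and handler names are illustrative, only the match on config.target and the single exit point come from the comment above:

async def _replay_event(self, session: ReplaySessionState, event: DomainEvent) -> bool:
    match session.config.target:
        case ReplayTarget.KAFKA:
            await self._producer.send(event)
        case ReplayTarget.FILE:
            await self._write_event_to_file(event, session.config.file_path)
        case _:
            raise ReplayOperationError(f"Unknown replay target: {session.config.target}")
    # Single exit point replaces the duplicated `return True` per branch.
    return True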

  _process_batch

  Changed break to return on non-RUNNING status for consistency with _run_replay.
@coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/event_replay/replay_service.py (1)

326-336: Silent error handling masks DB persistence failures.

_update_session_in_db catches exceptions and logs them but never re-raises or returns a failure indicator. Callers (like line 283 in _process_batch) have no way to detect or react to persistence failures, which could lead to unnoticed data loss.

Additionally, line 336 uses an f-string while other error logs use extra={} for structured logging.

🛡️ Suggested improvement
-async def _update_session_in_db(self, session: ReplaySessionState) -> None:
+async def _update_session_in_db(self, session: ReplaySessionState) -> bool:
+    """Update session in DB. Returns True on success, False on failure."""
     try:
         session_update = ReplaySessionUpdate(
             status=session.status,
             replayed_events=session.replayed_events,
             failed_events=session.failed_events,
             skipped_events=session.skipped_events,
             completed_at=session.completed_at,
         )
         await self._repository.update_replay_session(session_id=session.session_id, updates=session_update)
+        return True
     except Exception as e:
-        self.logger.error(f"Failed to update session in database: {e}")
+        self.logger.error(
+            "Failed to update session in database",
+            extra={"session_id": session.session_id, "error": str(e)},
+        )
+        return False
🧹 Nitpick comments (1)
backend/app/services/event_replay/replay_service.py (1)

279-283: Consider batching DB updates for performance.

Line 283 calls _update_session_in_db after every single event. For large replays, this creates significant DB write pressure. Consider updating the DB periodically (e.g., every N events or every few seconds) instead of after each event.

♻️ Suggested approach
+# In _process_batch, track events since last persist
+events_since_persist = 0
+PERSIST_INTERVAL = 10  # or make configurable

 for event in batch:
     # ... existing event processing ...
     
     session.last_event_at = event.timestamp
-    await self._update_session_in_db(session)
+    events_since_persist += 1
+    if events_since_persist >= PERSIST_INTERVAL:
+        await self._update_session_in_db(session)
+        events_since_persist = 0
+
+# Persist any remaining state after batch
+if events_since_persist > 0:
+    await self._update_session_in_db(session)


HardMax71 merged commit 5105ada into main on Jan 30, 2026 (21 checks passed).
HardMax71 deleted the fix/event-replay branch on January 30, 2026 at 00:24.