Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions backend/app/core/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from app.services.admin import AdminEventsService, AdminSettingsService, AdminUserService
from app.services.auth_service import AuthService
from app.services.coordinator.coordinator import ExecutionCoordinator
from app.services.event_bus import EventBusManager
from app.services.event_bus import EventBus, EventBusEvent
from app.services.event_replay.replay_service import EventReplayService
from app.services.event_service import EventService
from app.services.execution_service import ExecutionService
Expand Down Expand Up @@ -230,14 +230,11 @@ async def get_event_store_consumer(
yield consumer

@provide
async def get_event_bus_manager(
async def get_event_bus(
self, settings: Settings, logger: logging.Logger, connection_metrics: ConnectionMetrics
) -> AsyncIterator[EventBusManager]:
manager = EventBusManager(settings, logger, connection_metrics)
try:
yield manager
finally:
await manager.close()
) -> AsyncIterator[EventBus]:
async with EventBus(settings, logger, connection_metrics) as bus:
yield bus


class KubernetesProvider(Provider):
Expand Down Expand Up @@ -470,11 +467,21 @@ async def get_user_settings_service(
self,
repository: UserSettingsRepository,
kafka_event_service: KafkaEventService,
event_bus_manager: EventBusManager,
event_bus: EventBus,
logger: logging.Logger,
) -> UserSettingsService:
service = UserSettingsService(repository, kafka_event_service, logger)
await service.initialize(event_bus_manager)
service = UserSettingsService(repository, kafka_event_service, logger, event_bus)

# Subscribe to settings update events for cross-instance cache invalidation.
# EventBus filters out self-published messages, so this handler only
# runs for events from OTHER instances.
async def _handle_settings_update(evt: EventBusEvent) -> None:
uid = evt.payload.get("user_id")
if uid:
await service.invalidate_cache(str(uid))

await event_bus.subscribe("user.settings.updated*", _handle_settings_update)

return service


Expand Down Expand Up @@ -503,27 +510,25 @@ def get_notification_service(
self,
notification_repository: NotificationRepository,
kafka_event_service: KafkaEventService,
event_bus_manager: EventBusManager,
event_bus: EventBus,
schema_registry: SchemaRegistryManager,
sse_redis_bus: SSERedisBus,
settings: Settings,
logger: logging.Logger,
notification_metrics: NotificationMetrics,
event_metrics: EventMetrics,
) -> NotificationService:
service = NotificationService(
return NotificationService(
notification_repository=notification_repository,
event_service=kafka_event_service,
event_bus_manager=event_bus_manager,
event_bus=event_bus,
schema_registry_manager=schema_registry,
sse_bus=sse_redis_bus,
settings=settings,
logger=logger,
notification_metrics=notification_metrics,
event_metrics=event_metrics,
)
service.initialize()
return service

@provide
def get_grafana_alert_processor(
Expand Down
30 changes: 0 additions & 30 deletions backend/app/services/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.errors import KafkaError
from fastapi import Request
from pydantic import BaseModel, ConfigDict

from app.core.lifecycle import LifecycleEnabled
Expand Down Expand Up @@ -316,32 +315,3 @@ async def get_statistics(self) -> dict[str, Any]:
}


class EventBusManager:
"""Manages EventBus lifecycle as a singleton."""

def __init__(self, settings: Settings, logger: logging.Logger, connection_metrics: ConnectionMetrics) -> None:
self.settings = settings
self.logger = logger
self._connection_metrics = connection_metrics
self._event_bus: Optional[EventBus] = None
self._lock = asyncio.Lock()

async def get_event_bus(self) -> EventBus:
"""Get or create the event bus instance."""
async with self._lock:
if self._event_bus is None:
self._event_bus = EventBus(self.settings, self.logger, self._connection_metrics)
await self._event_bus.__aenter__()
return self._event_bus

async def close(self) -> None:
"""Stop and clean up the event bus."""
async with self._lock:
if self._event_bus:
await self._event_bus.aclose()
self._event_bus = None


async def get_event_bus(request: Request) -> EventBus:
manager: EventBusManager = request.app.state.event_bus_manager
return await manager.get_event_bus()
2 changes: 0 additions & 2 deletions backend/app/services/grafana_alert_processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""Grafana alert processing service."""

import logging
from typing import Any

Expand Down
49 changes: 17 additions & 32 deletions backend/app/services/notification_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from app.events.schema.schema_registry import SchemaRegistryManager
from app.infrastructure.kafka.mappings import get_topic_for_event
from app.schemas_pydantic.sse import RedisNotificationMessage
from app.services.event_bus import EventBusManager
from app.services.event_bus import EventBus
from app.services.kafka_event_service import KafkaEventService
from app.services.sse.redis_bus import SSERedisBus
from app.settings import Settings
Expand All @@ -59,8 +59,6 @@
class ServiceState(StringEnum):
"""Service lifecycle states."""

IDLE = auto()
INITIALIZING = auto()
RUNNING = auto()
STOPPING = auto()
STOPPED = auto()
Expand Down Expand Up @@ -117,7 +115,7 @@ def __init__(
self,
notification_repository: NotificationRepository,
event_service: KafkaEventService,
event_bus_manager: EventBusManager,
event_bus: EventBus,
schema_registry_manager: SchemaRegistryManager,
sse_bus: SSERedisBus,
settings: Settings,
Expand All @@ -127,7 +125,7 @@ def __init__(
) -> None:
self.repository = notification_repository
self.event_service = event_service
self.event_bus_manager = event_bus_manager
self.event_bus = event_bus
self.metrics = notification_metrics
self._event_metrics = event_metrics
self.settings = settings
Expand All @@ -136,7 +134,7 @@ def __init__(
self.logger = logger

# State
self._state = ServiceState.IDLE
self._state = ServiceState.RUNNING
self._throttle_cache = ThrottleCache()

# Tasks
Expand All @@ -146,6 +144,16 @@ def __init__(
self._dispatcher: EventDispatcher | None = None
self._consumer_task: asyncio.Task[None] | None = None

# Channel handlers mapping
self._channel_handlers: dict[NotificationChannel, ChannelHandler] = {
NotificationChannel.IN_APP: self._send_in_app,
NotificationChannel.WEBHOOK: self._send_webhook,
NotificationChannel.SLACK: self._send_slack,
}

# Start background processors
self._start_background_tasks()

self.logger.info(
"NotificationService initialized",
extra={
Expand All @@ -155,30 +163,10 @@ def __init__(
},
)

# Channel handlers mapping
self._channel_handlers: dict[NotificationChannel, ChannelHandler] = {
NotificationChannel.IN_APP: self._send_in_app,
NotificationChannel.WEBHOOK: self._send_webhook,
NotificationChannel.SLACK: self._send_slack,
}

@property
def state(self) -> ServiceState:
return self._state

def initialize(self) -> None:
if self._state != ServiceState.IDLE:
self.logger.warning(f"Cannot initialize in state: {self._state}")
return

self._state = ServiceState.INITIALIZING

# Start processors
self._state = ServiceState.RUNNING
self._start_background_tasks()

self.logger.info("Notification service initialized (without Kafka consumer)")

async def shutdown(self) -> None:
"""Shutdown notification service."""
if self._state == ServiceState.STOPPED:
Expand Down Expand Up @@ -324,8 +312,7 @@ async def create_notification(
notification = await self.repository.create_notification(create_data)

# Publish event
event_bus = await self.event_bus_manager.get_event_bus()
await event_bus.publish(
await self.event_bus.publish(
"notifications.created",
{
"notification_id": str(notification.notification_id),
Expand Down Expand Up @@ -694,9 +681,8 @@ async def mark_as_read(self, user_id: str, notification_id: str) -> bool:
"""Mark notification as read."""
success = await self.repository.mark_as_read(notification_id, user_id)

event_bus = await self.event_bus_manager.get_event_bus()
if success:
await event_bus.publish(
await self.event_bus.publish(
"notifications.read",
{"notification_id": str(notification_id), "user_id": user_id, "read_at": datetime.now(UTC).isoformat()},
)
Expand Down Expand Up @@ -778,9 +764,8 @@ async def mark_all_as_read(self, user_id: str) -> int:
"""Mark all notifications as read for a user."""
count = await self.repository.mark_all_as_read(user_id)

event_bus = await self.event_bus_manager.get_event_bus()
if count > 0:
await event_bus.publish(
await self.event_bus.publish(
"notifications.all_read", {"user_id": user_id, "count": count, "read_at": datetime.now(UTC).isoformat()}
)

Expand Down
31 changes: 8 additions & 23 deletions backend/app/services/user_settings_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
DomainUserSettingsChangedEvent,
DomainUserSettingsUpdate,
)
from app.services.event_bus import EventBusEvent, EventBusManager
from app.services.event_bus import EventBus
from app.services.kafka_event_service import KafkaEventService

_settings_adapter = TypeAdapter(DomainUserSettings)
Expand All @@ -25,19 +25,22 @@

class UserSettingsService:
def __init__(
self, repository: UserSettingsRepository, event_service: KafkaEventService, logger: logging.Logger
self,
repository: UserSettingsRepository,
event_service: KafkaEventService,
logger: logging.Logger,
event_bus: EventBus,
) -> None:
self.repository = repository
self.event_service = event_service
self.logger = logger
self._event_bus = event_bus
self._cache_ttl = timedelta(minutes=5)
self._max_cache_size = 1000
self._cache: TTLCache[str, DomainUserSettings] = TTLCache(
maxsize=self._max_cache_size,
ttl=self._cache_ttl.total_seconds(),
)
self._event_bus_manager: EventBusManager | None = None
self._subscription_id: str | None = None

self.logger.info(
"UserSettingsService initialized",
Expand All @@ -53,22 +56,6 @@ async def get_user_settings(self, user_id: str) -> DomainUserSettings:

return await self.get_user_settings_fresh(user_id)
Comment on lines 55 to 63
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Redis cache operations lack error handling.

If Redis becomes unavailable or times out, the await self._redis.get(cache_key) call will raise an exception, causing the entire request to fail. For a cache layer, Redis failures should be non-fatal—falling back to fresh data retrieval from the database.

🛡️ Proposed fix with graceful degradation
     async def get_user_settings(self, user_id: str) -> DomainUserSettings:
         """Get settings with Redis cache; rebuild and cache on miss."""
         cache_key = self._cache_key(user_id)
-        cached = await self._redis.get(cache_key)
-        if cached:
-            self.logger.debug(f"Settings cache hit for user {user_id}")
-            return DomainUserSettings.model_validate_json(cached)
+        try:
+            cached = await self._redis.get(cache_key)
+            if cached:
+                self.logger.debug(f"Settings cache hit for user {user_id}")
+                return DomainUserSettings.model_validate_json(cached)
+        except Exception as e:
+            self.logger.warning(f"Redis cache read failed for user {user_id}: {e}")
 
         return await self.get_user_settings_fresh(user_id)
🤖 Prompt for AI Agents
In `@backend/app/services/user_settings_service.py` around lines 55 - 63, Wrap the
Redis read in get_user_settings with a try/except to catch connection/timeouts
from self._redis.get(cache_key) (and optionally
DomainUserSettings.model_validate_json parsing errors); on exception log a
non-fatal warning via self.logger.warning including the exception and cache_key,
then fall back to calling and returning get_user_settings_fresh(user_id). Ensure
you still log the cache hit when the cached read succeeds, and only use the cached
data when model_validate_json succeeds; do not let Redis exceptions propagate and
crash the request.


async def initialize(self, event_bus_manager: EventBusManager) -> None:
"""Subscribe to settings update events for cross-instance cache invalidation.

Note: EventBus filters out self-published messages, so this handler only
runs for events from OTHER instances.
"""
self._event_bus_manager = event_bus_manager
bus = await event_bus_manager.get_event_bus()

async def _handle(evt: EventBusEvent) -> None:
uid = evt.payload.get("user_id")
if uid:
await self.invalidate_cache(str(uid))

self._subscription_id = await bus.subscribe("user.settings.updated*", _handle)

async def get_user_settings_fresh(self, user_id: str) -> DomainUserSettings:
"""Bypass cache and rebuild settings from snapshot + events."""
snapshot = await self.repository.get_snapshot(user_id)
Expand Down Expand Up @@ -108,9 +95,7 @@ async def update_user_settings(
changes_json = _update_adapter.dump_python(updates, exclude_none=True, mode="json")
await self._publish_settings_event(user_id, changes_json, reason)

if self._event_bus_manager is not None:
bus = await self._event_bus_manager.get_event_bus()
await bus.publish("user.settings.updated", {"user_id": user_id})
await self._event_bus.publish("user.settings.updated", {"user_id": user_id})

self._add_to_cache(user_id, new_settings)
if (await self.repository.count_events_since_snapshot(user_id)) >= 10:
Expand Down
5 changes: 2 additions & 3 deletions backend/tests/integration/services/events/test_event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
from aiokafka import AIOKafkaProducer
from app.domain.enums.kafka import KafkaTopic
from app.services.event_bus import EventBusEvent, EventBusManager
from app.services.event_bus import EventBus, EventBusEvent
from app.settings import Settings
from dishka import AsyncContainer

Expand All @@ -15,8 +15,7 @@
@pytest.mark.asyncio
async def test_event_bus_publish_subscribe(scope: AsyncContainer, test_settings: Settings) -> None:
"""Test EventBus receives events from other instances (cross-instance communication)."""
manager: EventBusManager = await scope.get(EventBusManager)
bus = await manager.get_event_bus()
bus: EventBus = await scope.get(EventBus)

# Future resolves when handler receives the event - no polling needed
received_future: asyncio.Future[EventBusEvent] = asyncio.get_running_loop().create_future()
Expand Down
Loading