Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions backend/app/core/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from app.services.admin import AdminEventsService, AdminSettingsService, AdminUserService
from app.services.auth_service import AuthService
from app.services.coordinator.coordinator import ExecutionCoordinator
from app.services.event_bus import EventBusManager
from app.services.event_bus import EventBusEvent, EventBusManager
from app.services.event_replay.replay_service import EventReplayService
from app.services.event_service import EventService
from app.services.execution_service import ExecutionService
Expand Down Expand Up @@ -473,8 +473,20 @@ async def get_user_settings_service(
event_bus_manager: EventBusManager,
logger: logging.Logger,
) -> UserSettingsService:
service = UserSettingsService(repository, kafka_event_service, logger)
await service.initialize(event_bus_manager)
service = UserSettingsService(repository, kafka_event_service, logger, event_bus_manager)

# Subscribe to settings update events for cross-instance cache invalidation.
# EventBus filters out self-published messages, so this handler only
# runs for events from OTHER instances.
bus = await event_bus_manager.get_event_bus()

async def _handle_settings_update(evt: EventBusEvent) -> None:
uid = evt.payload.get("user_id")
if uid:
await service.invalidate_cache(str(uid))

await bus.subscribe("user.settings.updated*", _handle_settings_update)

return service


Expand Down Expand Up @@ -511,7 +523,7 @@ def get_notification_service(
notification_metrics: NotificationMetrics,
event_metrics: EventMetrics,
) -> NotificationService:
service = NotificationService(
return NotificationService(
notification_repository=notification_repository,
event_service=kafka_event_service,
event_bus_manager=event_bus_manager,
Expand All @@ -522,8 +534,6 @@ def get_notification_service(
notification_metrics=notification_metrics,
event_metrics=event_metrics,
)
service.initialize()
return service

@provide
def get_grafana_alert_processor(
Expand Down
34 changes: 11 additions & 23 deletions backend/app/services/notification_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@
class ServiceState(StringEnum):
"""Service lifecycle states."""

IDLE = auto()
INITIALIZING = auto()
RUNNING = auto()
STOPPING = auto()
STOPPED = auto()
Expand Down Expand Up @@ -136,7 +134,7 @@ def __init__(
self.logger = logger

# State
self._state = ServiceState.IDLE
self._state = ServiceState.RUNNING
self._throttle_cache = ThrottleCache()

# Tasks
Expand All @@ -146,6 +144,16 @@ def __init__(
self._dispatcher: EventDispatcher | None = None
self._consumer_task: asyncio.Task[None] | None = None

# Channel handlers mapping
self._channel_handlers: dict[NotificationChannel, ChannelHandler] = {
NotificationChannel.IN_APP: self._send_in_app,
NotificationChannel.WEBHOOK: self._send_webhook,
NotificationChannel.SLACK: self._send_slack,
}

# Start background processors
self._start_background_tasks()

self.logger.info(
"NotificationService initialized",
extra={
Expand All @@ -155,30 +163,10 @@ def __init__(
},
)

# Channel handlers mapping
self._channel_handlers: dict[NotificationChannel, ChannelHandler] = {
NotificationChannel.IN_APP: self._send_in_app,
NotificationChannel.WEBHOOK: self._send_webhook,
NotificationChannel.SLACK: self._send_slack,
}

@property
def state(self) -> ServiceState:
return self._state

def initialize(self) -> None:
if self._state != ServiceState.IDLE:
self.logger.warning(f"Cannot initialize in state: {self._state}")
return

self._state = ServiceState.INITIALIZING

# Start processors
self._state = ServiceState.RUNNING
self._start_background_tasks()

self.logger.info("Notification service initialized (without Kafka consumer)")

async def shutdown(self) -> None:
"""Shutdown notification service."""
if self._state == ServiceState.STOPPED:
Expand Down
32 changes: 9 additions & 23 deletions backend/app/services/user_settings_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
DomainUserSettingsChangedEvent,
DomainUserSettingsUpdate,
)
from app.services.event_bus import EventBusEvent, EventBusManager
from app.services.event_bus import EventBusManager
from app.services.kafka_event_service import KafkaEventService

_settings_adapter = TypeAdapter(DomainUserSettings)
Expand All @@ -25,19 +25,22 @@

class UserSettingsService:
def __init__(
self, repository: UserSettingsRepository, event_service: KafkaEventService, logger: logging.Logger
self,
repository: UserSettingsRepository,
event_service: KafkaEventService,
logger: logging.Logger,
event_bus_manager: EventBusManager,
) -> None:
self.repository = repository
self.event_service = event_service
self.logger = logger
self._event_bus_manager = event_bus_manager
self._cache_ttl = timedelta(minutes=5)
self._max_cache_size = 1000
self._cache: TTLCache[str, DomainUserSettings] = TTLCache(
maxsize=self._max_cache_size,
ttl=self._cache_ttl.total_seconds(),
)
self._event_bus_manager: EventBusManager | None = None
self._subscription_id: str | None = None

self.logger.info(
"UserSettingsService initialized",
Expand All @@ -53,22 +56,6 @@ async def get_user_settings(self, user_id: str) -> DomainUserSettings:

return await self.get_user_settings_fresh(user_id)
Comment on lines 55 to 63
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Redis cache operations lack error handling.

If Redis becomes unavailable or times out, the await self._redis.get(cache_key) call will raise an exception, causing the entire request to fail. For a cache layer, Redis failures should be non-fatal—falling back to fresh data retrieval from the database.

🛡️ Proposed fix with graceful degradation
     async def get_user_settings(self, user_id: str) -> DomainUserSettings:
         """Get settings with Redis cache; rebuild and cache on miss."""
         cache_key = self._cache_key(user_id)
-        cached = await self._redis.get(cache_key)
-        if cached:
-            self.logger.debug(f"Settings cache hit for user {user_id}")
-            return DomainUserSettings.model_validate_json(cached)
+        try:
+            cached = await self._redis.get(cache_key)
+            if cached:
+                self.logger.debug(f"Settings cache hit for user {user_id}")
+                return DomainUserSettings.model_validate_json(cached)
+        except Exception as e:
+            self.logger.warning(f"Redis cache read failed for user {user_id}: {e}")
 
         return await self.get_user_settings_fresh(user_id)
🤖 Prompt for AI Agents
In `@backend/app/services/user_settings_service.py` around lines 55-63, wrap the
Redis read in get_user_settings with a try/except to catch connection errors and
timeouts raised by self._redis.get(cache_key) (and optionally
DomainUserSettings.model_validate_json parsing errors); on exception, log a
non-fatal warning via self.logger.warning that includes the exception and cache_key,
then fall back to calling and returning get_user_settings_fresh(user_id). Ensure
you still log the cache hit when the cached read succeeds, and only use the cached
data when model_validate_json succeeds; do not let Redis exceptions propagate and
crash the request.


async def initialize(self, event_bus_manager: EventBusManager) -> None:
"""Subscribe to settings update events for cross-instance cache invalidation.

Note: EventBus filters out self-published messages, so this handler only
runs for events from OTHER instances.
"""
self._event_bus_manager = event_bus_manager
bus = await event_bus_manager.get_event_bus()

async def _handle(evt: EventBusEvent) -> None:
uid = evt.payload.get("user_id")
if uid:
await self.invalidate_cache(str(uid))

self._subscription_id = await bus.subscribe("user.settings.updated*", _handle)

async def get_user_settings_fresh(self, user_id: str) -> DomainUserSettings:
"""Bypass cache and rebuild settings from snapshot + events."""
snapshot = await self.repository.get_snapshot(user_id)
Expand Down Expand Up @@ -108,9 +95,8 @@ async def update_user_settings(
changes_json = _update_adapter.dump_python(updates, exclude_none=True, mode="json")
await self._publish_settings_event(user_id, changes_json, reason)

if self._event_bus_manager is not None:
bus = await self._event_bus_manager.get_event_bus()
await bus.publish("user.settings.updated", {"user_id": user_id})
bus = await self._event_bus_manager.get_event_bus()
await bus.publish("user.settings.updated", {"user_id": user_id})

self._add_to_cache(user_id, new_settings)
if (await self.repository.count_events_since_snapshot(user_id)) >= 10:
Expand Down
Loading