Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions backend/app/core/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from app.services.admin import AdminEventsService, AdminSettingsService, AdminUserService
from app.services.auth_service import AuthService
from app.services.coordinator.coordinator import ExecutionCoordinator
from app.services.event_bus import EventBusManager
from app.services.event_bus import EventBus, EventBusEvent
from app.services.event_replay.replay_service import EventReplayService
from app.services.event_service import EventService
from app.services.execution_service import ExecutionService
Expand All @@ -73,7 +73,7 @@
from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge, create_sse_kafka_redis_bridge
from app.services.sse.redis_bus import SSERedisBus
from app.services.sse.sse_service import SSEService
from app.services.sse.sse_shutdown_manager import SSEShutdownManager, create_sse_shutdown_manager
from app.services.sse.sse_shutdown_manager import SSEShutdownManager
from app.services.user_settings_service import UserSettingsService
from app.settings import Settings

Expand Down Expand Up @@ -230,14 +230,11 @@ async def get_event_store_consumer(
yield consumer

@provide
async def get_event_bus_manager(
async def get_event_bus(
self, settings: Settings, logger: logging.Logger, connection_metrics: ConnectionMetrics
) -> AsyncIterator[EventBusManager]:
manager = EventBusManager(settings, logger, connection_metrics)
try:
yield manager
finally:
await manager.close()
) -> AsyncIterator[EventBus]:
async with EventBus(settings, logger, connection_metrics) as bus:
yield bus


class KubernetesProvider(Provider):
Expand Down Expand Up @@ -398,9 +395,16 @@ async def get_sse_kafka_redis_bridge(

@provide(scope=Scope.REQUEST)
def get_sse_shutdown_manager(
self, logger: logging.Logger, connection_metrics: ConnectionMetrics
self,
router: SSEKafkaRedisBridge,
logger: logging.Logger,
connection_metrics: ConnectionMetrics,
) -> SSEShutdownManager:
return create_sse_shutdown_manager(logger=logger, connection_metrics=connection_metrics)
return SSEShutdownManager(
router=router,
logger=logger,
connection_metrics=connection_metrics,
)
Comment on lines 421 to +430
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Find SSEConnectionRegistry class definition
fd -t f "\.py$" | xargs grep -l "class SSEConnectionRegistry"

Repository: HardMax71/Integr8sCode

Length of output: 188


🏁 Script executed:

# Search for get_connection_count usage
rg "get_connection_count" --type py -A 3 -B 3

Repository: HardMax71/Integr8sCode

Length of output: 6270


🏁 Script executed:

# Search for SSEConnectionRegistry usage
rg "SSEConnectionRegistry" --type py -A 2 -B 2

Repository: HardMax71/Integr8sCode

Length of output: 6034


🏁 Script executed:

# Check how SSEService is provided
rg "@provide.*SSEService" --type py -A 5 -B 2

Repository: HardMax71/Integr8sCode

Length of output: 48


🏁 Script executed:

# Check for Scope definitions and their meanings
rg "class Scope" --type py -A 10

Repository: HardMax71/Integr8sCode

Length of output: 48


🏁 Script executed:

# Look at sse_connection_registry.py to understand persistence expectations
cat -n backend/app/services/sse/sse_connection_registry.py | head -50

Repository: HardMax71/Integr8sCode

Length of output: 2296


🏁 Script executed:

# Search for SSEService provider definition more broadly
rg "get_sse_service|SSEService" backend/app/core/providers.py -A 5 -B 5

Repository: HardMax71/Integr8sCode

Length of output: 1278


🏁 Script executed:

# Look at the full providers.py around the SSEService definition
rg "def get_" backend/app/core/providers.py | grep -i sse

Repository: HardMax71/Integr8sCode

Length of output: 355


🏁 Script executed:

# Check SSEService class to understand if it needs app-wide state
cat -n backend/app/services/sse/sse_service.py | head -80

Repository: HardMax71/Integr8sCode

Length of output: 3831


🏁 Script executed:

# Check if there's any caching or special handling for REQUEST scope singletons
rg "Scope\." backend/app/core/providers.py -B 2 -A 1 | head -40

Repository: HardMax71/Integr8sCode

Length of output: 706


🏁 Script executed:

# Look for health status endpoint that uses get_connection_count
rg "get_health_status" --type py -A 10 -B 5

Repository: HardMax71/Integr8sCode

Length of output: 3730


🏁 Script executed:

# Search for any test that shows REQUEST vs APP scope behavior
rg "Scope\.(REQUEST|APP)" backend/tests --type py | head -20

Repository: HardMax71/Integr8sCode

Length of output: 48


SSEConnectionRegistry scope must be Scope.APP, not Scope.REQUEST.

With Scope.REQUEST, a new registry instance is created per HTTP request, resulting in an empty _active_connections dictionary on every health status check. This causes get_connection_count() to always return 0. The registry is designed to track active connections globally across the application—registrations from one request's SSE stream must be visible when the health endpoint is called from a different request. The internal asyncio.Lock() and the persistent state expectations confirm the registry is intended as application-wide infrastructure.

🤖 Prompt for AI Agents
In `@backend/app/core/providers.py` around lines 463 - 472, The provider
get_sse_connection_registry currently creates an SSEConnectionRegistry per
request (provide(scope=Scope.REQUEST)), which resets _active_connections and
causes get_connection_count() to always return 0; change the provider to use
application scope (provide(scope=Scope.APP)) so SSEConnectionRegistry is a
singleton for the app, preserving the asyncio.Lock() and shared
_active_connections across requests and allowing registrations from SSE streams
to be visible to the health endpoint.


@provide(scope=Scope.REQUEST)
def get_sse_service(
Expand All @@ -413,7 +417,6 @@ def get_sse_service(
logger: logging.Logger,
connection_metrics: ConnectionMetrics,
) -> SSEService:
shutdown_manager.set_router(router)
return SSEService(
repository=sse_repository,
router=router,
Expand Down Expand Up @@ -470,11 +473,21 @@ async def get_user_settings_service(
self,
repository: UserSettingsRepository,
kafka_event_service: KafkaEventService,
event_bus_manager: EventBusManager,
event_bus: EventBus,
logger: logging.Logger,
) -> UserSettingsService:
service = UserSettingsService(repository, kafka_event_service, logger)
await service.initialize(event_bus_manager)
service = UserSettingsService(repository, kafka_event_service, logger, event_bus)

# Subscribe to settings update events for cross-instance cache invalidation.
# EventBus filters out self-published messages, so this handler only
# runs for events from OTHER instances.
async def _handle_settings_update(evt: EventBusEvent) -> None:
uid = evt.payload.get("user_id")
if uid:
await service.invalidate_cache(str(uid))

await event_bus.subscribe("user.settings.updated*", _handle_settings_update)

return service


Expand Down Expand Up @@ -503,27 +516,25 @@ def get_notification_service(
self,
notification_repository: NotificationRepository,
kafka_event_service: KafkaEventService,
event_bus_manager: EventBusManager,
event_bus: EventBus,
schema_registry: SchemaRegistryManager,
sse_redis_bus: SSERedisBus,
settings: Settings,
logger: logging.Logger,
notification_metrics: NotificationMetrics,
event_metrics: EventMetrics,
) -> NotificationService:
service = NotificationService(
return NotificationService(
notification_repository=notification_repository,
event_service=kafka_event_service,
event_bus_manager=event_bus_manager,
event_bus=event_bus,
schema_registry_manager=schema_registry,
sse_bus=sse_redis_bus,
settings=settings,
logger=logger,
notification_metrics=notification_metrics,
event_metrics=event_metrics,
)
service.initialize()
return service

@provide
def get_grafana_alert_processor(
Expand Down
30 changes: 0 additions & 30 deletions backend/app/services/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.errors import KafkaError
from fastapi import Request
from pydantic import BaseModel, ConfigDict

from app.core.lifecycle import LifecycleEnabled
Expand Down Expand Up @@ -316,32 +315,3 @@ async def get_statistics(self) -> dict[str, Any]:
}


class EventBusManager:
    """Lazily create and own at most one ``EventBus`` instance.

    The bus is constructed and entered on the first ``get_event_bus()`` call
    and reused by later calls until ``close()`` drops it. Both operations are
    serialized by an ``asyncio.Lock``.
    """

    def __init__(self, settings: Settings, logger: logging.Logger, connection_metrics: ConnectionMetrics) -> None:
        """Store the collaborators needed to build the bus; nothing is started here.

        Args:
            settings: Passed through unchanged to the ``EventBus`` constructor.
            logger: Passed through unchanged to the ``EventBus`` constructor.
            connection_metrics: Passed through unchanged to the ``EventBus`` constructor.
        """
        self.settings = settings
        self.logger = logger
        self._connection_metrics = connection_metrics
        self._event_bus: Optional[EventBus] = None
        # Guards creation and teardown so concurrent callers share one bus.
        self._lock = asyncio.Lock()

    async def get_event_bus(self) -> EventBus:
        """Get or create the event bus instance.

        Returns:
            The cached bus, created and entered on first use.

        Raises:
            Whatever ``EventBus.__aenter__`` raises. In that case nothing is
            cached, so the next call attempts a fresh start.
        """
        async with self._lock:
            if self._event_bus is None:
                bus = EventBus(self.settings, self.logger, self._connection_metrics)
                # Cache the bus only after it has been entered successfully;
                # assigning first would leave a bus that never started cached
                # and hand it to every later caller.
                await bus.__aenter__()
                self._event_bus = bus
            return self._event_bus

    async def close(self) -> None:
        """Stop and clean up the event bus, if one was created."""
        async with self._lock:
            if self._event_bus:
                await self._event_bus.aclose()
                # Drop the reference so a later get_event_bus() builds a new bus.
                self._event_bus = None

async def get_event_bus(request: Request) -> EventBus:
    """Return the event bus for the application serving ``request``.

    Looks up the ``EventBusManager`` stored at
    ``request.app.state.event_bus_manager`` and awaits its
    ``get_event_bus()`` method.
    """
    return await request.app.state.event_bus_manager.get_event_bus()
2 changes: 0 additions & 2 deletions backend/app/services/grafana_alert_processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""Grafana alert processing service."""

import logging
from typing import Any

Expand Down
49 changes: 17 additions & 32 deletions backend/app/services/notification_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from app.events.schema.schema_registry import SchemaRegistryManager
from app.infrastructure.kafka.mappings import get_topic_for_event
from app.schemas_pydantic.sse import RedisNotificationMessage
from app.services.event_bus import EventBusManager
from app.services.event_bus import EventBus
from app.services.kafka_event_service import KafkaEventService
from app.services.sse.redis_bus import SSERedisBus
from app.settings import Settings
Expand All @@ -59,8 +59,6 @@
class ServiceState(StringEnum):
"""Service lifecycle states."""

IDLE = auto()
INITIALIZING = auto()
RUNNING = auto()
STOPPING = auto()
STOPPED = auto()
Expand Down Expand Up @@ -117,7 +115,7 @@ def __init__(
self,
notification_repository: NotificationRepository,
event_service: KafkaEventService,
event_bus_manager: EventBusManager,
event_bus: EventBus,
schema_registry_manager: SchemaRegistryManager,
sse_bus: SSERedisBus,
settings: Settings,
Expand All @@ -127,7 +125,7 @@ def __init__(
) -> None:
self.repository = notification_repository
self.event_service = event_service
self.event_bus_manager = event_bus_manager
self.event_bus = event_bus
self.metrics = notification_metrics
self._event_metrics = event_metrics
self.settings = settings
Expand All @@ -136,7 +134,7 @@ def __init__(
self.logger = logger

# State
self._state = ServiceState.IDLE
self._state = ServiceState.RUNNING
self._throttle_cache = ThrottleCache()

# Tasks
Expand All @@ -146,6 +144,16 @@ def __init__(
self._dispatcher: EventDispatcher | None = None
self._consumer_task: asyncio.Task[None] | None = None

# Channel handlers mapping
self._channel_handlers: dict[NotificationChannel, ChannelHandler] = {
NotificationChannel.IN_APP: self._send_in_app,
NotificationChannel.WEBHOOK: self._send_webhook,
NotificationChannel.SLACK: self._send_slack,
}

# Start background processors
self._start_background_tasks()

self.logger.info(
"NotificationService initialized",
extra={
Expand All @@ -155,30 +163,10 @@ def __init__(
},
)

# Channel handlers mapping
self._channel_handlers: dict[NotificationChannel, ChannelHandler] = {
NotificationChannel.IN_APP: self._send_in_app,
NotificationChannel.WEBHOOK: self._send_webhook,
NotificationChannel.SLACK: self._send_slack,
}

@property
def state(self) -> ServiceState:
    """Return the service's current lifecycle state (the value of ``_state``)."""
    return self._state

def initialize(self) -> None:
    """Move the service from IDLE to RUNNING and start its background tasks.

    If the service is in any state other than IDLE, a warning is logged and
    nothing else happens.
    """
    current = self._state
    if current == ServiceState.IDLE:
        # Pass through INITIALIZING before settling on RUNNING.
        self._state = ServiceState.INITIALIZING
        self._state = ServiceState.RUNNING

        # Start processors
        self._start_background_tasks()

        self.logger.info("Notification service initialized (without Kafka consumer)")
    else:
        self.logger.warning(f"Cannot initialize in state: {current}")

async def shutdown(self) -> None:
"""Shutdown notification service."""
if self._state == ServiceState.STOPPED:
Expand Down Expand Up @@ -324,8 +312,7 @@ async def create_notification(
notification = await self.repository.create_notification(create_data)

# Publish event
event_bus = await self.event_bus_manager.get_event_bus()
await event_bus.publish(
await self.event_bus.publish(
"notifications.created",
{
"notification_id": str(notification.notification_id),
Expand Down Expand Up @@ -694,9 +681,8 @@ async def mark_as_read(self, user_id: str, notification_id: str) -> bool:
"""Mark notification as read."""
success = await self.repository.mark_as_read(notification_id, user_id)

event_bus = await self.event_bus_manager.get_event_bus()
if success:
await event_bus.publish(
await self.event_bus.publish(
"notifications.read",
{"notification_id": str(notification_id), "user_id": user_id, "read_at": datetime.now(UTC).isoformat()},
)
Expand Down Expand Up @@ -778,9 +764,8 @@ async def mark_all_as_read(self, user_id: str) -> int:
"""Mark all notifications as read for a user."""
count = await self.repository.mark_all_as_read(user_id)

event_bus = await self.event_bus_manager.get_event_bus()
if count > 0:
await event_bus.publish(
await self.event_bus.publish(
"notifications.all_read", {"user_id": user_id, "count": count, "read_at": datetime.now(UTC).isoformat()}
)

Expand Down
36 changes: 18 additions & 18 deletions backend/app/services/sse/sse_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ class SSEService:
}

def __init__(
self,
repository: SSERepository,
router: SSEKafkaRedisBridge,
sse_bus: SSERedisBus,
shutdown_manager: SSEShutdownManager,
settings: Settings,
logger: logging.Logger,
connection_metrics: ConnectionMetrics,
self,
repository: SSERepository,
router: SSEKafkaRedisBridge,
sse_bus: SSERedisBus,
shutdown_manager: SSEShutdownManager,
settings: Settings,
logger: logging.Logger,
connection_metrics: ConnectionMetrics,
) -> None:
self.repository = repository
self.router = router
Expand All @@ -48,7 +48,7 @@ def __init__(
self.settings = settings
self.logger = logger
self.metrics = connection_metrics
self.heartbeat_interval = getattr(settings, "SSE_HEARTBEAT_INTERVAL", 30)
self.heartbeat_interval = settings.SSE_HEARTBEAT_INTERVAL

async def create_execution_stream(self, execution_id: str, user_id: str) -> AsyncGenerator[Dict[str, Any], None]:
connection_id = f"sse_{execution_id}_{datetime.now(timezone.utc).timestamp()}"
Expand Down Expand Up @@ -106,10 +106,10 @@ async def create_execution_stream(self, execution_id: str, user_id: str) -> Asyn
self.metrics.record_sse_message_sent("executions", "status")

async for event_data in self._stream_events_redis(
execution_id,
subscription,
shutdown_event,
include_heartbeat=False,
execution_id,
subscription,
shutdown_event,
include_heartbeat=False,
):
yield event_data

Expand All @@ -120,11 +120,11 @@ async def create_execution_stream(self, execution_id: str, user_id: str) -> Asyn
self.logger.info("SSE connection closed", extra={"execution_id": execution_id})

async def _stream_events_redis(
self,
execution_id: str,
subscription: Any,
shutdown_event: asyncio.Event,
include_heartbeat: bool = True,
self,
execution_id: str,
subscription: Any,
shutdown_event: asyncio.Event,
include_heartbeat: bool = True,
) -> AsyncGenerator[Dict[str, Any], None]:
last_heartbeat = datetime.now(timezone.utc)
while True:
Expand Down
Loading
Loading