Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions backend/app/api/routes/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@
from sse_starlette.sse import EventSourceResponse

from app.domain.sse import SSEHealthDomain
from app.schemas_pydantic.notification import NotificationResponse
from app.schemas_pydantic.sse import (
ShutdownStatusResponse,
SSEExecutionEventData,
SSEHealthResponse,
SSENotificationEventData,
)
from app.services.auth_service import AuthService
from app.services.sse.sse_service import SSEService

router = APIRouter(prefix="/events", tags=["sse"], route_class=DishkaRoute)


@router.get("/notifications/stream", responses={200: {"model": SSENotificationEventData}})
@router.get("/notifications/stream", responses={200: {"model": NotificationResponse}})
async def notification_stream(
request: Request,
sse_service: FromDishka[SSEService],
Expand All @@ -25,7 +25,10 @@ async def notification_stream(
"""Stream notifications for authenticated user."""
current_user = await auth_service.get_current_user(request)

return EventSourceResponse(sse_service.create_notification_stream(user_id=current_user.user_id))
return EventSourceResponse(
sse_service.create_notification_stream(user_id=current_user.user_id),
ping=30,
)


@router.get("/executions/{execution_id}", responses={200: {"model": SSEExecutionEventData}})
Expand Down
3 changes: 1 addition & 2 deletions backend/app/core/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from app.core.metrics.base import BaseMetrics, MetricsConfig
from app.core.metrics.base import BaseMetrics
from app.core.metrics.connections import ConnectionMetrics
from app.core.metrics.coordinator import CoordinatorMetrics
from app.core.metrics.database import DatabaseMetrics
Expand All @@ -14,7 +14,6 @@

__all__ = [
"BaseMetrics",
"MetricsConfig",
"ConnectionMetrics",
"CoordinatorMetrics",
"DatabaseMetrics",
Expand Down
63 changes: 7 additions & 56 deletions backend/app/core/metrics/base.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,27 @@
from dataclasses import dataclass

from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.metrics import Meter, NoOpMeterProvider
from opentelemetry.sdk.metrics import MeterProvider as SdkMeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry import metrics

from app.settings import Settings


@dataclass
class MetricsConfig:
service_name: str = "integr8scode-backend"
service_version: str = "1.0.0"
otlp_endpoint: str | None = None
export_interval_millis: int = 10000
console_export_interval_millis: int = 60000


class BaseMetrics:
def __init__(self, settings: Settings, meter_name: str | None = None):
"""Initialize base metrics with its own meter.
"""Initialize base metrics with a meter from the global MeterProvider.

The global MeterProvider is configured once by ``setup_metrics``.
If it hasn't been called (e.g. in tests), the default no-op provider is used.

Args:
settings: Application settings.
settings: Application settings (kept for DI compatibility).
meter_name: Optional name for the meter. Defaults to class name.
"""
config = MetricsConfig(
service_name=settings.TRACING_SERVICE_NAME or "integr8scode-backend",
service_version="1.0.0",
otlp_endpoint=settings.OTEL_EXPORTER_OTLP_ENDPOINT,
)

meter_name = meter_name or self.__class__.__name__
self._meter = self._create_meter(settings, config, meter_name)
self._meter = metrics.get_meter(meter_name)
self._create_instruments()

def _create_meter(self, settings: Settings, config: MetricsConfig, meter_name: str) -> Meter:
"""Create a new meter instance for this collector.

Args:
settings: Application settings
config: Metrics configuration
meter_name: Name for this meter

Returns:
A new meter instance
"""
# If tracing/metrics disabled or no OTLP endpoint configured, use NoOp meter
if not config.otlp_endpoint:
return NoOpMeterProvider().get_meter(meter_name)

resource = Resource.create(
{"service.name": config.service_name, "service.version": config.service_version, "meter.name": meter_name}
)

reader = PeriodicExportingMetricReader(
exporter=OTLPMetricExporter(endpoint=config.otlp_endpoint),
export_interval_millis=config.export_interval_millis,
)

# Each collector gets its own MeterProvider
meter_provider = SdkMeterProvider(resource=resource, metric_readers=[reader])

# Return a meter from this provider
return meter_provider.get_meter(meter_name)

def _create_instruments(self) -> None:
"""Create metric instruments. Override in subclasses."""
pass

def close(self) -> None:
"""Close the metrics collector and clean up resources."""
# Subclasses can override if they need cleanup
pass
30 changes: 14 additions & 16 deletions backend/app/core/middlewares/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import time

import psutil
from fastapi import FastAPI
from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.metrics import CallbackOptions, Observation
Expand Down Expand Up @@ -118,13 +117,22 @@ def _get_path_template(path: str) -> str:
return path


def setup_metrics(app: FastAPI, settings: Settings, logger: logging.Logger) -> None:
"""Set up OpenTelemetry metrics with OTLP exporter."""
if not settings.OTEL_EXPORTER_OTLP_ENDPOINT:
logger.warning("OTEL_EXPORTER_OTLP_ENDPOINT not configured, skipping metrics setup")
def setup_metrics(settings: Settings, logger: logging.Logger) -> None:
"""Set up the global OpenTelemetry MeterProvider with OTLP exporter.

This is the single initialization point for metrics export. ``BaseMetrics``
subclasses and ``MetricsMiddleware`` obtain meters via the global API
(``opentelemetry.metrics.get_meter``), so this must run before them.
When skipped (tests / missing endpoint), the default no-op provider is used.
"""
if settings.TESTING or not settings.OTEL_EXPORTER_OTLP_ENDPOINT:
logger.info(
"Metrics OTLP export disabled (testing=%s, endpoint=%s)",
settings.TESTING,
settings.OTEL_EXPORTER_OTLP_ENDPOINT,
)
return

# Configure OpenTelemetry resource
resource = Resource.create(
{
SERVICE_NAME: settings.SERVICE_NAME,
Expand All @@ -133,31 +141,21 @@ def setup_metrics(app: FastAPI, settings: Settings, logger: logging.Logger) -> N
}
)

# Configure OTLP exporter (sends to OpenTelemetry Collector or compatible backend)
# Default endpoint is localhost:4317 for gRPC
otlp_exporter = OTLPMetricExporter(endpoint=settings.OTEL_EXPORTER_OTLP_ENDPOINT, insecure=True)

# Create metric reader with 60 second export interval
metric_reader = PeriodicExportingMetricReader(
exporter=otlp_exporter,
export_interval_millis=60000,
)

# Set up the meter provider
meter_provider = MeterProvider(
resource=resource,
metric_readers=[metric_reader],
)

# Set the global meter provider
metrics.set_meter_provider(meter_provider)

# Create system metrics
create_system_metrics()

# Add the metrics middleware (disabled for now to avoid DNS issues)
# app.add_middleware(MetricsMiddleware)

logger.info("OpenTelemetry metrics configured with OTLP exporter")


Expand Down
3 changes: 1 addition & 2 deletions backend/app/domain/enums/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
NotificationStatus,
)
from app.domain.enums.saga import SagaState
from app.domain.enums.sse import SSEControlEvent, SSENotificationEvent
from app.domain.enums.sse import SSEControlEvent
from app.domain.enums.user import UserRole

__all__ = [
Expand All @@ -30,7 +30,6 @@
"SagaState",
# SSE
"SSEControlEvent",
"SSENotificationEvent",
# User
"UserRole",
]
9 changes: 0 additions & 9 deletions backend/app/domain/enums/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,3 @@ class SSEControlEvent(StringEnum):
SHUTDOWN = "shutdown"
STATUS = "status"
ERROR = "error"


class SSENotificationEvent(StringEnum):
"""Event types for notification SSE streams."""

CONNECTED = "connected"
SUBSCRIBED = "subscribed"
HEARTBEAT = "heartbeat"
NOTIFICATION = "notification"
2 changes: 1 addition & 1 deletion backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def create_app(settings: Settings | None = None) -> FastAPI:
container = create_app_container(settings)
setup_dishka(container, app)

setup_metrics(app, settings, logger)
setup_metrics(settings, logger)
app.add_middleware(MetricsMiddleware)
app.add_middleware(RateLimitMiddleware, settings=settings)
app.add_middleware(CSRFMiddleware, container=container)
Expand Down
27 changes: 1 addition & 26 deletions backend/app/schemas_pydantic/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from app.domain.enums.events import EventType
from app.domain.enums.execution import ExecutionStatus
from app.domain.enums.notification import NotificationSeverity, NotificationStatus
from app.domain.enums.sse import SSEControlEvent, SSEHealthStatus, SSENotificationEvent
from app.domain.enums.sse import SSEControlEvent, SSEHealthStatus
from app.schemas_pydantic.execution import ExecutionResult, ResourceUsage

# Type variable for generic Redis message parsing
Expand Down Expand Up @@ -63,31 +63,6 @@ class RedisSSEMessage(BaseModel):
data: dict[str, Any] = Field(description="Full event data from BaseEvent.model_dump()")


class SSENotificationEventData(BaseModel):
"""Typed model for SSE notification stream event payload.

This represents the JSON data sent inside each SSE message for notification streams.
"""

# Always present - identifies the event type
event_type: SSENotificationEvent = Field(description="SSE notification event type")

# Present in control events (connected, heartbeat)
user_id: str | None = Field(default=None, description="User ID for the notification stream")
timestamp: datetime | None = Field(default=None, description="Event timestamp")
message: str | None = Field(default=None, description="Human-readable message")

# Present only in notification events
notification_id: str | None = Field(default=None, description="Unique notification ID")
severity: NotificationSeverity | None = Field(default=None, description="Notification severity level")
status: NotificationStatus | None = Field(default=None, description="Notification delivery status")
tags: list[str] | None = Field(default=None, description="Notification tags")
subject: str | None = Field(default=None, description="Notification subject/title")
body: str | None = Field(default=None, description="Notification body content")
action_url: str | None = Field(default=None, description="Optional action URL")
created_at: datetime | None = Field(default=None, description="Creation timestamp")


class RedisNotificationMessage(BaseModel):
"""Message structure published to Redis for notification SSE delivery."""

Expand Down
81 changes: 22 additions & 59 deletions backend/app/services/sse/sse_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
from app.core.metrics import ConnectionMetrics
from app.db.repositories.sse_repository import SSERepository
from app.domain.enums.events import EventType
from app.domain.enums.sse import SSEControlEvent, SSEHealthStatus, SSENotificationEvent
from app.domain.enums.notification import NotificationChannel
from app.domain.enums.sse import SSEControlEvent, SSEHealthStatus
from app.domain.sse import SSEHealthDomain
from app.schemas_pydantic.execution import ExecutionResult
from app.schemas_pydantic.notification import NotificationResponse
from app.schemas_pydantic.sse import (
RedisNotificationMessage,
RedisSSEMessage,
SSEExecutionEventData,
SSENotificationEventData,
)
from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge
from app.services.sse.redis_bus import SSERedisBus
Expand Down Expand Up @@ -197,66 +198,32 @@ async def _build_sse_event_from_redis(self, execution_id: str, msg: RedisSSEMess

async def create_notification_stream(self, user_id: str) -> AsyncGenerator[dict[str, Any], None]:
subscription = None

try:
# Start opening subscription concurrently, then yield handshake
sub_task = asyncio.create_task(self.sse_bus.open_notification_subscription(user_id))
yield self._format_notification_event(
SSENotificationEventData(
event_type=SSENotificationEvent.CONNECTED,
user_id=user_id,
timestamp=datetime.now(timezone.utc),
message="Connected to notification stream",
)
)
subscription = await self.sse_bus.open_notification_subscription(user_id)
self.logger.info("Notification subscription opened", extra={"user_id": user_id})

# Complete Redis subscription after handshake
subscription = await sub_task

# Signal that subscription is ready - safe to publish notifications now
yield self._format_notification_event(
SSENotificationEventData(
event_type=SSENotificationEvent.SUBSCRIBED,
user_id=user_id,
timestamp=datetime.now(timezone.utc),
message="Redis subscription established",
)
)

last_heartbeat = datetime.now(timezone.utc)
while not self.shutdown_manager.is_shutting_down():
# Heartbeat
now = datetime.now(timezone.utc)
if (now - last_heartbeat).total_seconds() >= self.heartbeat_interval:
yield self._format_notification_event(
SSENotificationEventData(
event_type=SSENotificationEvent.HEARTBEAT,
user_id=user_id,
timestamp=now,
message="Notification stream active",
)
)
last_heartbeat = now

# Forward notification messages as SSE data
redis_msg = await subscription.get(RedisNotificationMessage)
if redis_msg:
yield self._format_notification_event(
SSENotificationEventData(
event_type=SSENotificationEvent.NOTIFICATION,
notification_id=redis_msg.notification_id,
severity=redis_msg.severity,
status=redis_msg.status,
tags=redis_msg.tags,
subject=redis_msg.subject,
body=redis_msg.body,
action_url=redis_msg.action_url,
created_at=redis_msg.created_at,
)
)
if not redis_msg:
continue

notification = NotificationResponse(
notification_id=redis_msg.notification_id,
channel=NotificationChannel.IN_APP,
status=redis_msg.status,
subject=redis_msg.subject,
body=redis_msg.body,
action_url=redis_msg.action_url,
created_at=redis_msg.created_at,
read_at=None,
severity=redis_msg.severity,
tags=redis_msg.tags,
)
yield {"event": "notification", "data": notification.model_dump_json()}
finally:
if subscription is not None:
await asyncio.shield(subscription.close())
self.logger.info("Notification stream closed", extra={"user_id": user_id})

async def get_health_status(self) -> SSEHealthDomain:
router_stats = self.router.get_stats()
Expand All @@ -274,7 +241,3 @@ async def get_health_status(self) -> SSEHealthDomain:
def _format_sse_event(self, event: SSEExecutionEventData) -> dict[str, Any]:
"""Format typed SSE event for sse-starlette."""
return {"data": event.model_dump_json(exclude_none=True)}

def _format_notification_event(self, event: SSENotificationEventData) -> dict[str, Any]:
"""Format typed notification SSE event for sse-starlette."""
return {"data": event.model_dump_json(exclude_none=True)}
1 change: 1 addition & 0 deletions backend/config.test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Differences from config.toml: lower timeouts, faster bcrypt, relaxed rate limits
# Secrets (SECRET_KEY, MONGODB_URL) live in secrets.toml — use secrets.test.toml in CI.

TESTING = true
PROJECT_NAME = "integr8scode"
DATABASE_NAME = "integr8scode_db"
ALGORITHM = "HS256"
Expand Down
Loading
Loading