Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 16 additions & 50 deletions backend/app/core/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@
from app.core.providers import (
AdminServicesProvider,
AuthProvider,
BoundaryClientProvider,
BusinessServicesProvider,
CoordinatorProvider,
CoreServicesProvider,
DatabaseProvider,
EventProvider,
EventReplayProvider,
K8sWorkerProvider,
KafkaServicesProvider,
KubernetesProvider,
LoggingProvider,
MessagingProvider,
MetricsProvider,
PodMonitorProvider,
RedisProvider,
RedisServicesProvider,
RepositoryProvider,
ResultProcessorProvider,
SagaOrchestratorProvider,
SettingsProvider,
SSEProvider,
Expand All @@ -37,8 +36,8 @@ def create_app_container(settings: Settings) -> AsyncContainer:
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
Expand Down Expand Up @@ -66,30 +65,14 @@ def create_result_processor_container(settings: Settings) -> AsyncContainer:
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
EventProvider(),
MessagingProvider(),
context={Settings: settings},
)


def create_coordinator_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the ExecutionCoordinator worker."""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
CoordinatorProvider(),
ResultProcessorProvider(),
context={Settings: settings},
)

Expand All @@ -99,14 +82,12 @@ def create_k8s_worker_container(settings: Settings) -> AsyncContainer:
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
KubernetesProvider(),
K8sWorkerProvider(),
context={Settings: settings},
)
Expand All @@ -117,15 +98,14 @@ def create_pod_monitor_container(settings: Settings) -> AsyncContainer:
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
KafkaServicesProvider(),
KubernetesProvider(),
PodMonitorProvider(),
context={Settings: settings},
)
Expand All @@ -136,8 +116,8 @@ def create_saga_orchestrator_container(settings: Settings) -> AsyncContainer:
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
Expand All @@ -153,8 +133,8 @@ def create_event_replay_container(settings: Settings) -> AsyncContainer:
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
Expand All @@ -165,17 +145,3 @@ def create_event_replay_container(settings: Settings) -> AsyncContainer:
)


def create_dlq_processor_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the DLQ processor worker."""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
context={Settings: settings},
)
14 changes: 0 additions & 14 deletions backend/app/core/database_context.py

This file was deleted.

133 changes: 45 additions & 88 deletions backend/app/core/dishka_lifespan.py
Original file line number Diff line number Diff line change
@@ -1,110 +1,67 @@
import asyncio
import logging
from contextlib import AsyncExitStack, asynccontextmanager
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

import redis.asyncio as redis
from beanie import init_beanie
from dishka import AsyncContainer
from fastapi import FastAPI
from pymongo.asynchronous.mongo_client import AsyncMongoClient

from app.core.database_context import Database
from app.core.metrics import RateLimitMetrics
from app.core.startup import initialize_rate_limits
from app.core.tracing import init_tracing
from app.db.docs import ALL_DOCUMENTS
from app.events.event_store_consumer import EventStoreConsumer
from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas
from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge
from app.services.notification_service import NotificationService
from app.settings import Settings


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""
Application lifespan with dishka dependency injection.

This is much cleaner than the old lifespan.py:
- No dependency_overrides
- No manual service management
- Dishka handles all lifecycle automatically
"""
# Get settings and logger from DI container (uses test settings in tests)
container: AsyncContainer = app.state.dishka_container
container = app.state.dishka_container
settings = await container.get(Settings)
logger = await container.get(logging.Logger)

logger.info(
"Starting application with dishka DI",
extra={
"project_name": settings.PROJECT_NAME,
"environment": "test" if settings.TESTING else "production",
},
client: AsyncMongoClient[dict[str, object]] = AsyncMongoClient(
settings.MONGODB_URL, tz_aware=True, serverSelectionTimeoutMS=5000
)
await init_beanie(database=client[settings.DATABASE_NAME], document_models=ALL_DOCUMENTS)

# Metrics setup moved to app creation to allow middleware registration
logger.info("Lifespan start: tracing and services initialization")
# Start notification service background tasks
notification_service = await container.get(NotificationService)

# Initialize tracing only when enabled (avoid exporter retries in tests)
if settings.ENABLE_TRACING and not settings.TESTING:
instrumentation_report = init_tracing(
service_name=settings.TRACING_SERVICE_NAME,
settings=settings,
logger=logger,
service_version=settings.TRACING_SERVICE_VERSION,
sampling_rate=settings.TRACING_SAMPLING_RATE,
enable_console_exporter=settings.TESTING,
adaptive_sampling=settings.TRACING_ADAPTIVE_SAMPLING,
)
async def pending_notification_task() -> None:
"""Process pending notifications every 5 seconds."""
while True:
try:
await asyncio.sleep(5)
await notification_service.process_pending_batch()
except asyncio.CancelledError:
break
except Exception:
logger.exception("Error processing pending notifications")

if instrumentation_report.has_failures():
logger.warning(
"Some instrumentation libraries failed to initialize",
extra={"instrumentation_summary": instrumentation_report.get_summary()},
)
else:
logger.info(
"Distributed tracing initialized successfully",
extra={"instrumentation_summary": instrumentation_report.get_summary()},
)
else:
logger.info(
"Distributed tracing disabled",
extra={"testing": settings.TESTING, "enable_tracing": settings.ENABLE_TRACING},
)
async def cleanup_notification_task() -> None:
"""Cleanup old notifications every 24 hours."""
while True:
try:
await asyncio.sleep(86400) # 24 hours
await notification_service.cleanup_old()
except asyncio.CancelledError:
break
except Exception:
logger.exception("Error cleaning up notifications")

# Phase 1: Resolve all DI dependencies in parallel
(
schema_registry,
database,
redis_client,
rate_limit_metrics,
sse_bridge,
event_store_consumer,
) = await asyncio.gather(
container.get(SchemaRegistryManager),
container.get(Database),
container.get(redis.Redis),
container.get(RateLimitMetrics),
container.get(SSEKafkaRedisBridge),
container.get(EventStoreConsumer),
)
pending_task = asyncio.create_task(pending_notification_task())
cleanup_task = asyncio.create_task(cleanup_notification_task())
logger.info("NotificationService background tasks started")

# Phase 2: Initialize infrastructure in parallel (independent subsystems)
await asyncio.gather(
initialize_event_schemas(schema_registry),
init_beanie(database=database, document_models=ALL_DOCUMENTS),
initialize_rate_limits(redis_client, settings, logger, rate_limit_metrics),
)
logger.info("Infrastructure initialized (schemas, beanie, rate limits)")
yield

# Shutdown background tasks
pending_task.cancel()
cleanup_task.cancel()
try:
await asyncio.gather(pending_task, cleanup_task)
except asyncio.CancelledError:
pass
logger.info("NotificationService background tasks stopped")

# Phase 3: Start Kafka consumers in parallel
async with AsyncExitStack() as stack:
stack.push_async_callback(sse_bridge.aclose)
stack.push_async_callback(event_store_consumer.aclose)
await asyncio.gather(
sse_bridge.__aenter__(),
event_store_consumer.__aenter__(),
)
logger.info("SSE bridge and EventStoreConsumer started")
yield
await client.close()
await container.close()
47 changes: 0 additions & 47 deletions backend/app/core/k8s_clients.py

This file was deleted.

Loading
Loading