Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 18 additions & 29 deletions backend/app/core/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@
from app.core.providers import (
AdminServicesProvider,
AuthProvider,
BoundaryClientProvider,
BusinessServicesProvider,
CoordinatorProvider,
CoreServicesProvider,
DatabaseProvider,
EventProvider,
EventReplayProvider,
K8sWorkerProvider,
KafkaServicesProvider,
KubernetesProvider,
LoggingProvider,
MessagingProvider,
MetricsProvider,
PodMonitorProvider,
RedisProvider,
RedisServicesProvider,
RepositoryProvider,
ResultProcessorProvider,
SagaOrchestratorProvider,
SettingsProvider,
SSEProvider,
Expand All @@ -38,7 +38,8 @@ def create_app_container(settings: Settings) -> AsyncContainer:
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
Expand Down Expand Up @@ -67,29 +68,14 @@ def create_result_processor_container(settings: Settings) -> AsyncContainer:
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
EventProvider(),
MessagingProvider(),
context={Settings: settings},
)


def create_coordinator_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the ExecutionCoordinator worker."""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
CoordinatorProvider(),
ResultProcessorProvider(),
context={Settings: settings},
)

Expand All @@ -100,13 +86,13 @@ def create_k8s_worker_container(settings: Settings) -> AsyncContainer:
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
KubernetesProvider(),
K8sWorkerProvider(),
context={Settings: settings},
)
Expand All @@ -118,14 +104,14 @@ def create_pod_monitor_container(settings: Settings) -> AsyncContainer:
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
KafkaServicesProvider(),
KubernetesProvider(),
PodMonitorProvider(),
context={Settings: settings},
)
Expand All @@ -137,7 +123,8 @@ def create_saga_orchestrator_container(settings: Settings) -> AsyncContainer:
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
Expand All @@ -154,7 +141,8 @@ def create_event_replay_container(settings: Settings) -> AsyncContainer:
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
Expand All @@ -171,7 +159,8 @@ def create_dlq_processor_container(settings: Settings) -> AsyncContainer:
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
BoundaryClientProvider(),
RedisServicesProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
Expand Down
109 changes: 5 additions & 104 deletions backend/app/core/dishka_lifespan.py
Original file line number Diff line number Diff line change
@@ -1,110 +1,11 @@
import asyncio
import logging
from contextlib import AsyncExitStack, asynccontextmanager
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

import redis.asyncio as redis
from beanie import init_beanie
from dishka import AsyncContainer
from fastapi import FastAPI

from app.core.database_context import Database
from app.core.metrics import RateLimitMetrics
from app.core.startup import initialize_rate_limits
from app.core.tracing import init_tracing
from app.db.docs import ALL_DOCUMENTS
from app.events.event_store_consumer import EventStoreConsumer
from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas
from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge
from app.settings import Settings


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""
Application lifespan with dishka dependency injection.

This is much cleaner than the old lifespan.py:
- No dependency_overrides
- No manual service management
- Dishka handles all lifecycle automatically
"""
# Get settings and logger from DI container (uses test settings in tests)
container: AsyncContainer = app.state.dishka_container
settings = await container.get(Settings)
logger = await container.get(logging.Logger)

logger.info(
"Starting application with dishka DI",
extra={
"project_name": settings.PROJECT_NAME,
"environment": "test" if settings.TESTING else "production",
},
)

# Metrics setup moved to app creation to allow middleware registration
logger.info("Lifespan start: tracing and services initialization")

# Initialize tracing only when enabled (avoid exporter retries in tests)
if settings.ENABLE_TRACING and not settings.TESTING:
instrumentation_report = init_tracing(
service_name=settings.TRACING_SERVICE_NAME,
settings=settings,
logger=logger,
service_version=settings.TRACING_SERVICE_VERSION,
sampling_rate=settings.TRACING_SAMPLING_RATE,
enable_console_exporter=settings.TESTING,
adaptive_sampling=settings.TRACING_ADAPTIVE_SAMPLING,
)

if instrumentation_report.has_failures():
logger.warning(
"Some instrumentation libraries failed to initialize",
extra={"instrumentation_summary": instrumentation_report.get_summary()},
)
else:
logger.info(
"Distributed tracing initialized successfully",
extra={"instrumentation_summary": instrumentation_report.get_summary()},
)
else:
logger.info(
"Distributed tracing disabled",
extra={"testing": settings.TESTING, "enable_tracing": settings.ENABLE_TRACING},
)

# Phase 1: Resolve all DI dependencies in parallel
(
schema_registry,
database,
redis_client,
rate_limit_metrics,
sse_bridge,
event_store_consumer,
) = await asyncio.gather(
container.get(SchemaRegistryManager),
container.get(Database),
container.get(redis.Redis),
container.get(RateLimitMetrics),
container.get(SSEKafkaRedisBridge),
container.get(EventStoreConsumer),
)

# Phase 2: Initialize infrastructure in parallel (independent subsystems)
await asyncio.gather(
initialize_event_schemas(schema_registry),
init_beanie(database=database, document_models=ALL_DOCUMENTS),
initialize_rate_limits(redis_client, settings, logger, rate_limit_metrics),
)
logger.info("Infrastructure initialized (schemas, beanie, rate limits)")

# Phase 3: Start Kafka consumers in parallel
async with AsyncExitStack() as stack:
stack.push_async_callback(sse_bridge.aclose)
stack.push_async_callback(event_store_consumer.aclose)
await asyncio.gather(
sse_bridge.__aenter__(),
event_store_consumer.__aenter__(),
)
logger.info("SSE bridge and EventStoreConsumer started")
yield
"""Minimal lifespan - container.close() triggers all provider cleanup."""
yield
await app.state.dishka_container.close()
Comment thread
HardMax71 marked this conversation as resolved.
Outdated
47 changes: 0 additions & 47 deletions backend/app/core/k8s_clients.py

This file was deleted.

62 changes: 0 additions & 62 deletions backend/app/core/lifecycle.py

This file was deleted.

Loading
Loading