Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/app/core/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
PodMonitorProvider,
RedisProvider,
RepositoryProvider,
ResultProcessorProvider,
SagaOrchestratorProvider,
SettingsProvider,
SSEProvider,
Expand Down Expand Up @@ -73,6 +74,7 @@ def create_result_processor_container(settings: Settings) -> AsyncContainer:
RepositoryProvider(),
EventProvider(),
MessagingProvider(),
ResultProcessorProvider(),
context={Settings: settings},
)

Expand Down
52 changes: 35 additions & 17 deletions backend/app/core/dishka_lifespan.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import logging
from contextlib import AsyncExitStack, asynccontextmanager
from contextlib import asynccontextmanager
from typing import AsyncGenerator

import redis.asyncio as redis
Expand All @@ -13,9 +13,10 @@
from app.core.startup import initialize_rate_limits
from app.core.tracing import init_tracing
from app.db.docs import ALL_DOCUMENTS
from app.events.core import UnifiedConsumer
from app.events.event_store_consumer import EventStoreConsumer
from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas
from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge
from app.services.notification_service import NotificationService
from app.settings import Settings


Expand All @@ -24,10 +25,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""
Application lifespan with dishka dependency injection.

This is much cleaner than the old lifespan.py:
- No dependency_overrides
- No manual service management
- Dishka handles all lifecycle automatically
Services are already initialized by their DI providers (which handle __aenter__/__aexit__).
Lifespan just starts the run() methods as background tasks.
"""
# Get settings and logger from DI container (uses test settings in tests)
container: AsyncContainer = app.state.dishka_container
Expand Down Expand Up @@ -79,15 +78,17 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
database,
redis_client,
rate_limit_metrics,
sse_bridge,
sse_consumers,
event_store_consumer,
notification_service,
) = await asyncio.gather(
container.get(SchemaRegistryManager),
container.get(Database),
container.get(redis.Redis),
container.get(RateLimitMetrics),
container.get(SSEKafkaRedisBridge),
container.get(list[UnifiedConsumer]),
container.get(EventStoreConsumer),
container.get(NotificationService),
)

# Phase 2: Initialize infrastructure in parallel (independent subsystems)
Expand All @@ -98,13 +99,30 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
)
logger.info("Infrastructure initialized (schemas, beanie, rate limits)")

# Phase 3: Start Kafka consumers in parallel
async with AsyncExitStack() as stack:
stack.push_async_callback(sse_bridge.aclose)
stack.push_async_callback(event_store_consumer.aclose)
await asyncio.gather(
sse_bridge.__aenter__(),
event_store_consumer.__aenter__(),
)
logger.info("SSE bridge and EventStoreConsumer started")
# Phase 3: Start run() methods as background tasks
# Note: Services are already initialized by their DI providers (which handle __aenter__/__aexit__)

async def run_sse_consumers() -> None:
"""Run SSE consumers using TaskGroup."""
async with asyncio.TaskGroup() as tg:
for consumer in sse_consumers:
tg.create_task(consumer.run())

tasks = [
asyncio.create_task(run_sse_consumers(), name="sse_consumers"),
asyncio.create_task(event_store_consumer.run(), name="event_store_consumer"),
asyncio.create_task(notification_service.run(), name="notification_service"),
]
logger.info(f"Background services started ({len(sse_consumers)} SSE consumers)")
Comment thread
HardMax71 marked this conversation as resolved.
Outdated

try:
yield
finally:
# Cancel all background tasks on shutdown
logger.info("Shutting down background services...")
for task in tasks:
task.cancel()

# Wait for tasks to finish cancellation
await asyncio.gather(*tasks, return_exceptions=True)
Comment thread
HardMax71 marked this conversation as resolved.
Outdated
logger.info("Background services stopped")
62 changes: 0 additions & 62 deletions backend/app/core/lifecycle.py

This file was deleted.

Loading
Loading