Changes from all commits (21 commits)

3 changes: 0 additions & 3 deletions .github/workflows/stack-tests.yml
@@ -139,7 +139,6 @@ jobs:
       - name: Build all images
         run: |
           docker build -t integr8scode-backend:latest --build-context base=docker-image://integr8scode-base:latest -f ./backend/Dockerfile ./backend
-          docker build -t integr8scode-coordinator:latest -f backend/workers/Dockerfile.coordinator --build-context base=docker-image://integr8scode-base:latest ./backend
           docker build -t integr8scode-k8s-worker:latest -f backend/workers/Dockerfile.k8s_worker --build-context base=docker-image://integr8scode-base:latest ./backend
           docker build -t integr8scode-pod-monitor:latest -f backend/workers/Dockerfile.pod_monitor --build-context base=docker-image://integr8scode-base:latest ./backend
           docker build -t integr8scode-result-processor:latest -f backend/workers/Dockerfile.result_processor --build-context base=docker-image://integr8scode-base:latest ./backend
@@ -169,7 +168,6 @@ jobs:
         run: |
           docker save \
             integr8scode-backend:latest \
-            integr8scode-coordinator:latest \
             integr8scode-k8s-worker:latest \
             integr8scode-pod-monitor:latest \
             integr8scode-result-processor:latest \
@@ -319,7 +317,6 @@ jobs:
           docker compose logs > logs/docker-compose.log 2>&1
           docker compose logs backend > logs/backend.log 2>&1
           docker compose logs kafka > logs/kafka.log 2>&1
-          docker compose logs coordinator > logs/coordinator.log 2>&1 || true
           docker compose logs k8s-worker > logs/k8s-worker.log 2>&1 || true
           kubectl get events --sort-by='.metadata.creationTimestamp' -A > logs/k8s-events.log 2>&1 || true

66 changes: 16 additions & 50 deletions backend/app/core/container.py
@@ -4,21 +4,20 @@
 from app.core.providers import (
     AdminServicesProvider,
     AuthProvider,
+    BoundaryClientProvider,
     BusinessServicesProvider,
-    CoordinatorProvider,
     CoreServicesProvider,
-    DatabaseProvider,
     EventProvider,
     EventReplayProvider,
     K8sWorkerProvider,
     KafkaServicesProvider,
     KubernetesProvider,
     LoggingProvider,
     MessagingProvider,
     MetricsProvider,
     PodMonitorProvider,
-    RedisProvider,
+    RedisServicesProvider,
     RepositoryProvider,
     ResultProcessorProvider,
     SagaOrchestratorProvider,
     SettingsProvider,
     SSEProvider,
@@ -37,8 +36,8 @@ def create_app_container(settings: Settings) -> AsyncContainer:
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
-        DatabaseProvider(),
-        RedisProvider(),
+        BoundaryClientProvider(),
+        RedisServicesProvider(),
         CoreServicesProvider(),
         MetricsProvider(),
         RepositoryProvider(),
@@ -66,30 +65,14 @@ def create_result_processor_container(settings: Settings) -> AsyncContainer:
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
-        DatabaseProvider(),
-        RedisProvider(),
+        BoundaryClientProvider(),
+        RedisServicesProvider(),
         CoreServicesProvider(),
         MetricsProvider(),
         RepositoryProvider(),
         EventProvider(),
         MessagingProvider(),
         context={Settings: settings},
     )


-def create_coordinator_container(settings: Settings) -> AsyncContainer:
-    """Create DI container for the ExecutionCoordinator worker."""
-    return make_async_container(
-        SettingsProvider(),
-        LoggingProvider(),
-        DatabaseProvider(),
-        RedisProvider(),
-        CoreServicesProvider(),
-        MetricsProvider(),
-        RepositoryProvider(),
-        MessagingProvider(),
-        EventProvider(),
-        CoordinatorProvider(),
-        ResultProcessorProvider(),
-        context={Settings: settings},
-    )
@@ -99,14 +82,12 @@ def create_k8s_worker_container(settings: Settings) -> AsyncContainer:
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
-        DatabaseProvider(),
-        RedisProvider(),
+        BoundaryClientProvider(),
+        RedisServicesProvider(),
         CoreServicesProvider(),
         MetricsProvider(),
         RepositoryProvider(),
         MessagingProvider(),
         EventProvider(),
         KubernetesProvider(),
         K8sWorkerProvider(),
         context={Settings: settings},
     )
@@ -117,15 +98,14 @@ def create_pod_monitor_container(settings: Settings) -> AsyncContainer:
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
-        DatabaseProvider(),
-        RedisProvider(),
+        BoundaryClientProvider(),
+        RedisServicesProvider(),
         CoreServicesProvider(),
         MetricsProvider(),
         RepositoryProvider(),
         MessagingProvider(),
         EventProvider(),
         KafkaServicesProvider(),
         KubernetesProvider(),
         PodMonitorProvider(),
         context={Settings: settings},
     )
@@ -136,8 +116,8 @@ def create_saga_orchestrator_container(settings: Settings) -> AsyncContainer:
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
-        DatabaseProvider(),
-        RedisProvider(),
+        BoundaryClientProvider(),
+        RedisServicesProvider(),
         CoreServicesProvider(),
         MetricsProvider(),
         RepositoryProvider(),
@@ -153,8 +133,8 @@ def create_event_replay_container(settings: Settings) -> AsyncContainer:
     return make_async_container(
         SettingsProvider(),
         LoggingProvider(),
-        DatabaseProvider(),
-        RedisProvider(),
+        BoundaryClientProvider(),
+        RedisServicesProvider(),
         CoreServicesProvider(),
         MetricsProvider(),
         RepositoryProvider(),
@@ -165,17 +145,3 @@ def create_event_replay_container(settings: Settings) -> AsyncContainer:
     )


-def create_dlq_processor_container(settings: Settings) -> AsyncContainer:
-    """Create DI container for the DLQ processor worker."""
-    return make_async_container(
-        SettingsProvider(),
-        LoggingProvider(),
-        DatabaseProvider(),
-        RedisProvider(),
-        CoreServicesProvider(),
-        MetricsProvider(),
-        RepositoryProvider(),
-        MessagingProvider(),
-        EventProvider(),
-        context={Settings: settings},
-    )
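
For context: each factory above composes dishka providers, and every worker entrypoint builds only the providers it needs. The diff does not show BoundaryClientProvider itself, so the sketch below is an assumption about its shape: APP-scoped factories for the boundary clients (Mongo, Redis) that DatabaseProvider and RedisProvider used to supply separately. The REDIS_URL setting and the provided types are guesses.

# Hypothetical sketch of the provider pattern; not the actual implementation.
from dishka import Provider, Scope, provide
from pymongo.asynchronous.mongo_client import AsyncMongoClient
import redis.asyncio as redis

from app.settings import Settings


class BoundaryClientProvider(Provider):
    @provide(scope=Scope.APP)
    async def mongo_client(self, settings: Settings) -> AsyncMongoClient:
        # One client per container lifetime (APP scope).
        return AsyncMongoClient(settings.MONGODB_URL, tz_aware=True)

    @provide(scope=Scope.APP)
    async def redis_client(self, settings: Settings) -> redis.Redis:
        # REDIS_URL is an assumed setting name.
        return redis.Redis.from_url(settings.REDIS_URL)

make_async_container() merges the listed providers into one dependency graph, so any container that lists BoundaryClientProvider() can resolve AsyncMongoClient or redis.Redis anywhere downstream.
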
14 changes: 0 additions & 14 deletions backend/app/core/database_context.py

This file was deleted.
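
With the Database wrapper gone, data access can go straight through Beanie documents once init_beanie() has run (see the new lifespan below). A minimal sketch; ExecutionDoc is hypothetical, and the project's real models are whatever ALL_DOCUMENTS contains.

from beanie import Document


class ExecutionDoc(Document):  # hypothetical document, for illustration only
    execution_id: str
    status: str


async def count_running() -> int:
    # Queries route through the client registered by init_beanie() at startup.
    return await ExecutionDoc.find(ExecutionDoc.status == "running").count()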

133 changes: 45 additions & 88 deletions backend/app/core/dishka_lifespan.py
@@ -1,110 +1,67 @@
 import asyncio
 import logging
-from contextlib import AsyncExitStack, asynccontextmanager
-from typing import AsyncGenerator
+from collections.abc import AsyncGenerator
+from contextlib import asynccontextmanager
 
-import redis.asyncio as redis
 from beanie import init_beanie
-from dishka import AsyncContainer
 from fastapi import FastAPI
+from pymongo.asynchronous.mongo_client import AsyncMongoClient
 
-from app.core.database_context import Database
-from app.core.metrics import RateLimitMetrics
-from app.core.startup import initialize_rate_limits
-from app.core.tracing import init_tracing
 from app.db.docs import ALL_DOCUMENTS
-from app.events.event_store_consumer import EventStoreConsumer
-from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas
-from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge
+from app.services.notification_service import NotificationService
 from app.settings import Settings
 
 
 @asynccontextmanager
 async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
-    """
-    Application lifespan with dishka dependency injection.
-
-    This is much cleaner than the old lifespan.py:
-    - No dependency_overrides
-    - No manual service management
-    - Dishka handles all lifecycle automatically
-    """
-    # Get settings and logger from DI container (uses test settings in tests)
-    container: AsyncContainer = app.state.dishka_container
+    container = app.state.dishka_container
     settings = await container.get(Settings)
     logger = await container.get(logging.Logger)
 
-    logger.info(
-        "Starting application with dishka DI",
-        extra={
-            "project_name": settings.PROJECT_NAME,
-            "environment": "test" if settings.TESTING else "production",
-        },
+    client: AsyncMongoClient[dict[str, object]] = AsyncMongoClient(
+        settings.MONGODB_URL, tz_aware=True, serverSelectionTimeoutMS=5000
     )
+    await init_beanie(database=client[settings.DATABASE_NAME], document_models=ALL_DOCUMENTS)
 
-    # Metrics setup moved to app creation to allow middleware registration
-    logger.info("Lifespan start: tracing and services initialization")
+    # Start notification service background tasks
+    notification_service = await container.get(NotificationService)
 
-    # Initialize tracing only when enabled (avoid exporter retries in tests)
-    if settings.ENABLE_TRACING and not settings.TESTING:
-        instrumentation_report = init_tracing(
-            service_name=settings.TRACING_SERVICE_NAME,
-            settings=settings,
-            logger=logger,
-            service_version=settings.TRACING_SERVICE_VERSION,
-            sampling_rate=settings.TRACING_SAMPLING_RATE,
-            enable_console_exporter=settings.TESTING,
-            adaptive_sampling=settings.TRACING_ADAPTIVE_SAMPLING,
-        )
+    async def pending_notification_task() -> None:
+        """Process pending notifications every 5 seconds."""
+        while True:
+            try:
+                await asyncio.sleep(5)
+                await notification_service.process_pending_batch()
+            except asyncio.CancelledError:
+                break
+            except Exception:
+                logger.exception("Error processing pending notifications")
 
-        if instrumentation_report.has_failures():
-            logger.warning(
-                "Some instrumentation libraries failed to initialize",
-                extra={"instrumentation_summary": instrumentation_report.get_summary()},
-            )
-        else:
-            logger.info(
-                "Distributed tracing initialized successfully",
-                extra={"instrumentation_summary": instrumentation_report.get_summary()},
-            )
-    else:
-        logger.info(
-            "Distributed tracing disabled",
-            extra={"testing": settings.TESTING, "enable_tracing": settings.ENABLE_TRACING},
-        )
+    async def cleanup_notification_task() -> None:
+        """Cleanup old notifications every 24 hours."""
+        while True:
+            try:
+                await asyncio.sleep(86400)  # 24 hours
+                await notification_service.cleanup_old()
+            except asyncio.CancelledError:
+                break
+            except Exception:
+                logger.exception("Error cleaning up notifications")
 
-    # Phase 1: Resolve all DI dependencies in parallel
-    (
-        schema_registry,
-        database,
-        redis_client,
-        rate_limit_metrics,
-        sse_bridge,
-        event_store_consumer,
-    ) = await asyncio.gather(
-        container.get(SchemaRegistryManager),
-        container.get(Database),
-        container.get(redis.Redis),
-        container.get(RateLimitMetrics),
-        container.get(SSEKafkaRedisBridge),
-        container.get(EventStoreConsumer),
-    )
+    pending_task = asyncio.create_task(pending_notification_task())
+    cleanup_task = asyncio.create_task(cleanup_notification_task())
+    logger.info("NotificationService background tasks started")
 
-    # Phase 2: Initialize infrastructure in parallel (independent subsystems)
-    await asyncio.gather(
-        initialize_event_schemas(schema_registry),
-        init_beanie(database=database, document_models=ALL_DOCUMENTS),
-        initialize_rate_limits(redis_client, settings, logger, rate_limit_metrics),
-    )
-    logger.info("Infrastructure initialized (schemas, beanie, rate limits)")
+    yield
 
+    # Shutdown background tasks
+    pending_task.cancel()
+    cleanup_task.cancel()
+    try:
+        await asyncio.gather(pending_task, cleanup_task)
+    except asyncio.CancelledError:
+        pass
+    logger.info("NotificationService background tasks stopped")
 
-    # Phase 3: Start Kafka consumers in parallel
-    async with AsyncExitStack() as stack:
-        stack.push_async_callback(sse_bridge.aclose)
-        stack.push_async_callback(event_store_consumer.aclose)
-        await asyncio.gather(
-            sse_bridge.__aenter__(),
-            event_store_consumer.__aenter__(),
-        )
-        logger.info("SSE bridge and EventStoreConsumer started")
-        yield
+    await client.close()
     await container.close()
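
The new lifespan assumes dishka's FastAPI integration has already attached the container to app.state.dishka_container. A minimal wiring sketch, assuming Settings can be constructed directly in the app factory:

from dishka.integrations.fastapi import setup_dishka
from fastapi import FastAPI

from app.core.container import create_app_container
from app.core.dishka_lifespan import lifespan
from app.settings import Settings

settings = Settings()  # assumed construction; the real factory may differ
app = FastAPI(lifespan=lifespan)
# setup_dishka stores the container on app.state.dishka_container,
# which lifespan() reads above.
setup_dishka(create_app_container(settings), app)
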
47 changes: 0 additions & 47 deletions backend/app/core/k8s_clients.py

This file was deleted.
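
With the standalone k8s_clients module deleted, the Kubernetes API clients presumably live behind a provider instead (KubernetesProvider is still listed in container.py). Its body is not shown in this diff; the sketch below assumes the kubernetes_asyncio client library.

from dishka import Provider, Scope, provide
from kubernetes_asyncio import client, config


class KubernetesProvider(Provider):
    @provide(scope=Scope.APP)
    async def core_v1(self) -> client.CoreV1Api:
        # In-cluster deployments would use config.load_incluster_config() instead.
        await config.load_kube_config()
        return client.CoreV1Api()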
