Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/app/core/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def create_app_container(settings: Settings) -> AsyncContainer:
AdminServicesProvider(),
EventReplayProvider(),
BusinessServicesProvider(),
CoordinatorProvider(),
KubernetesProvider(),
ResourceCleanerProvider(),
FastapiProvider(),
Expand Down
125 changes: 85 additions & 40 deletions backend/app/core/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from app.services.admin import AdminEventsService, AdminSettingsService, AdminUserService
from app.services.auth_service import AuthService
from app.services.coordinator.coordinator import ExecutionCoordinator
from app.services.coordinator.queue_manager import QueueManager
from app.services.event_bus import EventBusManager
from app.services.event_replay.replay_service import EventReplayService
from app.services.event_service import EventService
Expand Down Expand Up @@ -590,46 +591,9 @@ async def _provide_saga_orchestrator(
yield orchestrator


async def _provide_execution_coordinator(
kafka_producer: UnifiedProducer,
schema_registry: SchemaRegistryManager,
settings: Settings,
event_store: EventStore,
execution_repository: ExecutionRepository,
idempotency_manager: IdempotencyManager,
logger: logging.Logger,
coordinator_metrics: CoordinatorMetrics,
event_metrics: EventMetrics,
) -> AsyncIterator[ExecutionCoordinator]:
"""Shared factory for ExecutionCoordinator with lifecycle management."""
dispatcher = IdempotentEventDispatcher(
logger=logger,
idempotency_manager=idempotency_manager,
key_strategy=KeyStrategy.EVENT_BASED,
ttl_seconds=7200,
)
async with ExecutionCoordinator(
producer=kafka_producer,
schema_registry_manager=schema_registry,
settings=settings,
event_store=event_store,
execution_repository=execution_repository,
dispatcher=dispatcher,
logger=logger,
coordinator_metrics=coordinator_metrics,
event_metrics=event_metrics,
) as coordinator:
yield coordinator


class BusinessServicesProvider(Provider):
scope = Scope.REQUEST

def __init__(self) -> None:
super().__init__()
# Register shared factory functions on instance (avoids warning about missing self)
self.provide(_provide_execution_coordinator)

@provide
def get_saga_service(
self,
Expand Down Expand Up @@ -702,9 +666,90 @@ def get_admin_user_service(
class CoordinatorProvider(Provider):
scope = Scope.APP

def __init__(self) -> None:
super().__init__()
self.provide(_provide_execution_coordinator)
@provide
def get_coordinator_dispatcher(
self, logger: logging.Logger, idempotency_manager: IdempotencyManager
) -> EventDispatcher:
"""Create idempotent EventDispatcher for coordinator."""
return IdempotentEventDispatcher(
logger=logger,
idempotency_manager=idempotency_manager,
key_strategy=KeyStrategy.EVENT_BASED,
ttl_seconds=7200,
)

@provide
def get_queue_manager(
self, logger: logging.Logger, coordinator_metrics: CoordinatorMetrics
) -> QueueManager:
return QueueManager(
logger=logger,
coordinator_metrics=coordinator_metrics,
max_queue_size=10000,
max_executions_per_user=100,
stale_timeout_seconds=3600,
)

@provide
def get_execution_coordinator(
self,
producer: UnifiedProducer,
dispatcher: EventDispatcher,
queue_manager: QueueManager,
execution_repository: ExecutionRepository,
logger: logging.Logger,
coordinator_metrics: CoordinatorMetrics,
) -> ExecutionCoordinator:
"""Create ExecutionCoordinator - registers handlers on dispatcher in constructor."""
return ExecutionCoordinator(
producer=producer,
dispatcher=dispatcher,
queue_manager=queue_manager,
execution_repository=execution_repository,
logger=logger,
coordinator_metrics=coordinator_metrics,
)

@provide
async def get_coordinator_consumer(
self,
coordinator: ExecutionCoordinator, # Ensures coordinator created first (handlers registered)
dispatcher: EventDispatcher,
schema_registry: SchemaRegistryManager,
settings: Settings,
logger: logging.Logger,
event_metrics: EventMetrics,
) -> AsyncIterator[UnifiedConsumer]:
"""Create and start consumer for coordinator."""
consumer_config = ConsumerConfig(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
group_id=GroupId.EXECUTION_COORDINATOR,
enable_auto_commit=False,
session_timeout_ms=settings.KAFKA_SESSION_TIMEOUT_MS,
heartbeat_interval_ms=settings.KAFKA_HEARTBEAT_INTERVAL_MS,
max_poll_interval_ms=settings.KAFKA_MAX_POLL_INTERVAL_MS,
request_timeout_ms=settings.KAFKA_REQUEST_TIMEOUT_MS,
)

consumer = UnifiedConsumer(
consumer_config,
event_dispatcher=dispatcher,
schema_registry=schema_registry,
settings=settings,
logger=logger,
event_metrics=event_metrics,
)

await coordinator.queue_manager.start()
await consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.EXECUTION_COORDINATOR]))
Comment thread
HardMax71 marked this conversation as resolved.
Outdated
Comment thread
HardMax71 marked this conversation as resolved.
Outdated
logger.info("Coordinator consumer started")

try:
yield consumer
finally:
await consumer.stop()
await coordinator.queue_manager.stop()
logger.info("Coordinator consumer stopped")
Comment thread
HardMax71 marked this conversation as resolved.
Outdated


class K8sWorkerProvider(Provider):
Expand Down
3 changes: 0 additions & 3 deletions backend/app/services/coordinator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from app.services.coordinator.coordinator import ExecutionCoordinator
from app.services.coordinator.queue_manager import QueueManager, QueuePriority
from app.services.coordinator.resource_manager import ResourceAllocation, ResourceManager

__all__ = [
"ExecutionCoordinator",
"QueueManager",
"QueuePriority",
"ResourceManager",
"ResourceAllocation",
]
Loading
Loading