From a145021a1214da38deee0cf5c21ae1a94cd86b31 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Thu, 29 Jan 2026 13:08:48 +0100 Subject: [PATCH 1/5] fix: k8s limits enforce stuff for pods, no need for separate overhead of resource manager -> removed --- backend/app/core/providers.py | 125 ++++-- backend/app/services/coordinator/__init__.py | 3 - .../app/services/coordinator/coordinator.py | 361 ++++-------------- .../services/coordinator/resource_manager.py | 324 ---------------- .../coordinator/test_execution_coordinator.py | 73 ---- .../coordinator/test_resource_manager.py | 61 --- backend/workers/run_coordinator.py | 15 +- docs/architecture/execution-queue.md | 76 ++-- docs/architecture/kafka-topic-architecture.md | 4 +- docs/architecture/lifecycle.md | 2 +- docs/architecture/services-overview.md | 2 +- docs/components/sse/execution-sse-flow.md | 2 +- docs/components/workers/coordinator.md | 29 +- docs/components/workers/index.md | 2 +- docs/operations/metrics-reference.md | 4 +- 15 files changed, 207 insertions(+), 876 deletions(-) delete mode 100644 backend/app/services/coordinator/resource_manager.py delete mode 100644 backend/tests/unit/services/coordinator/test_resource_manager.py diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py index fa1c2bd9..b9c9506a 100644 --- a/backend/app/core/providers.py +++ b/backend/app/core/providers.py @@ -55,6 +55,7 @@ from app.services.admin import AdminEventsService, AdminSettingsService, AdminUserService from app.services.auth_service import AuthService from app.services.coordinator.coordinator import ExecutionCoordinator +from app.services.coordinator.queue_manager import QueueManager from app.services.event_bus import EventBusManager from app.services.event_replay.replay_service import EventReplayService from app.services.event_service import EventService @@ -590,46 +591,9 @@ async def _provide_saga_orchestrator( yield orchestrator -async def _provide_execution_coordinator( - kafka_producer: UnifiedProducer, - schema_registry: SchemaRegistryManager, - settings: Settings, - event_store: EventStore, - execution_repository: ExecutionRepository, - idempotency_manager: IdempotencyManager, - logger: logging.Logger, - coordinator_metrics: CoordinatorMetrics, - event_metrics: EventMetrics, -) -> AsyncIterator[ExecutionCoordinator]: - """Shared factory for ExecutionCoordinator with lifecycle management.""" - dispatcher = IdempotentEventDispatcher( - logger=logger, - idempotency_manager=idempotency_manager, - key_strategy=KeyStrategy.EVENT_BASED, - ttl_seconds=7200, - ) - async with ExecutionCoordinator( - producer=kafka_producer, - schema_registry_manager=schema_registry, - settings=settings, - event_store=event_store, - execution_repository=execution_repository, - dispatcher=dispatcher, - logger=logger, - coordinator_metrics=coordinator_metrics, - event_metrics=event_metrics, - ) as coordinator: - yield coordinator - - class BusinessServicesProvider(Provider): scope = Scope.REQUEST - def __init__(self) -> None: - super().__init__() - # Register shared factory functions on instance (avoids warning about missing self) - self.provide(_provide_execution_coordinator) - @provide def get_saga_service( self, @@ -702,9 +666,90 @@ def get_admin_user_service( class CoordinatorProvider(Provider): scope = Scope.APP - def __init__(self) -> None: - super().__init__() - self.provide(_provide_execution_coordinator) + @provide + def get_coordinator_dispatcher( + self, logger: logging.Logger, idempotency_manager: IdempotencyManager + ) -> 
EventDispatcher: + """Create idempotent EventDispatcher for coordinator.""" + return IdempotentEventDispatcher( + logger=logger, + idempotency_manager=idempotency_manager, + key_strategy=KeyStrategy.EVENT_BASED, + ttl_seconds=7200, + ) + + @provide + def get_queue_manager( + self, logger: logging.Logger, coordinator_metrics: CoordinatorMetrics + ) -> QueueManager: + return QueueManager( + logger=logger, + coordinator_metrics=coordinator_metrics, + max_queue_size=10000, + max_executions_per_user=100, + stale_timeout_seconds=3600, + ) + + @provide + def get_execution_coordinator( + self, + producer: UnifiedProducer, + dispatcher: EventDispatcher, + queue_manager: QueueManager, + execution_repository: ExecutionRepository, + logger: logging.Logger, + coordinator_metrics: CoordinatorMetrics, + ) -> ExecutionCoordinator: + """Create ExecutionCoordinator - registers handlers on dispatcher in constructor.""" + return ExecutionCoordinator( + producer=producer, + dispatcher=dispatcher, + queue_manager=queue_manager, + execution_repository=execution_repository, + logger=logger, + coordinator_metrics=coordinator_metrics, + ) + + @provide + async def get_coordinator_consumer( + self, + coordinator: ExecutionCoordinator, # Ensures coordinator created first (handlers registered) + dispatcher: EventDispatcher, + schema_registry: SchemaRegistryManager, + settings: Settings, + logger: logging.Logger, + event_metrics: EventMetrics, + ) -> AsyncIterator[UnifiedConsumer]: + """Create and start consumer for coordinator.""" + consumer_config = ConsumerConfig( + bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, + group_id=GroupId.EXECUTION_COORDINATOR, + enable_auto_commit=False, + session_timeout_ms=settings.KAFKA_SESSION_TIMEOUT_MS, + heartbeat_interval_ms=settings.KAFKA_HEARTBEAT_INTERVAL_MS, + max_poll_interval_ms=settings.KAFKA_MAX_POLL_INTERVAL_MS, + request_timeout_ms=settings.KAFKA_REQUEST_TIMEOUT_MS, + ) + + consumer = UnifiedConsumer( + consumer_config, + event_dispatcher=dispatcher, + schema_registry=schema_registry, + settings=settings, + logger=logger, + event_metrics=event_metrics, + ) + + await coordinator.queue_manager.start() + await consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.EXECUTION_COORDINATOR])) + logger.info("Coordinator consumer started") + + try: + yield consumer + finally: + await consumer.stop() + await coordinator.queue_manager.stop() + logger.info("Coordinator consumer stopped") class K8sWorkerProvider(Provider): diff --git a/backend/app/services/coordinator/__init__.py b/backend/app/services/coordinator/__init__.py index b3890c9d..7c9f6112 100644 --- a/backend/app/services/coordinator/__init__.py +++ b/backend/app/services/coordinator/__init__.py @@ -1,11 +1,8 @@ from app.services.coordinator.coordinator import ExecutionCoordinator from app.services.coordinator.queue_manager import QueueManager, QueuePriority -from app.services.coordinator.resource_manager import ResourceAllocation, ResourceManager __all__ = [ "ExecutionCoordinator", "QueueManager", "QueuePriority", - "ResourceManager", - "ResourceAllocation", ] diff --git a/backend/app/services/coordinator/coordinator.py b/backend/app/services/coordinator/coordinator.py index f938cac8..d948e549 100644 --- a/backend/app/services/coordinator/coordinator.py +++ b/backend/app/services/coordinator/coordinator.py @@ -1,18 +1,14 @@ -import asyncio import logging import time -from collections.abc import Coroutine -from typing import Any, TypeAlias from uuid import uuid4 -from app.core.lifecycle import LifecycleEnabled -from 
app.core.metrics import CoordinatorMetrics, EventMetrics +from app.core.metrics import CoordinatorMetrics from app.db.repositories.execution_repository import ExecutionRepository from app.domain.enums.events import EventType -from app.domain.enums.kafka import CONSUMER_GROUP_SUBSCRIPTIONS, GroupId from app.domain.enums.storage import ExecutionErrorType from app.domain.events.typed import ( CreatePodCommandEvent, + DomainEvent, EventMetadata, ExecutionAcceptedEvent, ExecutionCancelledEvent, @@ -20,20 +16,11 @@ ExecutionFailedEvent, ExecutionRequestedEvent, ) -from app.events.core import ConsumerConfig, EventDispatcher, UnifiedConsumer, UnifiedProducer -from app.events.event_store import EventStore -from app.events.schema.schema_registry import ( - SchemaRegistryManager, -) +from app.events.core import EventDispatcher, UnifiedProducer from app.services.coordinator.queue_manager import QueueManager, QueuePriority -from app.services.coordinator.resource_manager import ResourceAllocation, ResourceManager -from app.settings import Settings - -EventHandler: TypeAlias = Coroutine[Any, Any, None] -ExecutionMap: TypeAlias = dict[str, ResourceAllocation] -class ExecutionCoordinator(LifecycleEnabled): +class ExecutionCoordinator: """ Coordinates execution scheduling across the system. @@ -41,168 +28,50 @@ class ExecutionCoordinator(LifecycleEnabled): 1. Consumes ExecutionRequested events 2. Manages execution queue with priority 3. Enforces rate limits - 4. Allocates resources - 5. Publishes ExecutionStarted events for workers + 4. Publishes CreatePodCommand events for workers """ def __init__( self, producer: UnifiedProducer, - schema_registry_manager: SchemaRegistryManager, - settings: Settings, - event_store: EventStore, - execution_repository: ExecutionRepository, dispatcher: EventDispatcher, + queue_manager: QueueManager, + execution_repository: ExecutionRepository, logger: logging.Logger, coordinator_metrics: CoordinatorMetrics, - event_metrics: EventMetrics, - consumer_group: str = GroupId.EXECUTION_COORDINATOR, - max_concurrent_scheduling: int = 10, - scheduling_interval_seconds: float = 0.5, - ): - super().__init__() + ) -> None: self.logger = logger self.metrics = coordinator_metrics - self._event_metrics = event_metrics - self._settings = settings - - # Kafka configuration - self.kafka_servers = self._settings.KAFKA_BOOTSTRAP_SERVERS - self.consumer_group = consumer_group - - # Components - self.queue_manager = QueueManager( - logger=self.logger, - coordinator_metrics=coordinator_metrics, - max_queue_size=10000, - max_executions_per_user=100, - stale_timeout_seconds=3600, - ) - - self.resource_manager = ResourceManager( - logger=self.logger, - coordinator_metrics=coordinator_metrics, - total_cpu_cores=32.0, - total_memory_mb=65536, - total_gpu_count=0, - ) - - # Kafka components - self.consumer: UnifiedConsumer | None = None - self.producer: UnifiedProducer = producer - - # Persistence via repositories + self.producer = producer + self.queue_manager = queue_manager self.execution_repository = execution_repository - self._event_store = event_store - # Scheduling - self.max_concurrent_scheduling = max_concurrent_scheduling - self.scheduling_interval = scheduling_interval_seconds - self._scheduling_semaphore = asyncio.Semaphore(max_concurrent_scheduling) - - # State tracking - self._scheduling_task: asyncio.Task[None] | None = None self._active_executions: set[str] = set() - self._execution_resources: ExecutionMap = {} - self._schema_registry_manager = schema_registry_manager - self.dispatcher 
= dispatcher - - async def _on_start(self) -> None: - """Start the coordinator service.""" - self.logger.info("Starting ExecutionCoordinator service...") - - await self.queue_manager.start() - - consumer_config = ConsumerConfig( - bootstrap_servers=self.kafka_servers, - group_id=self.consumer_group, - enable_auto_commit=False, - session_timeout_ms=self._settings.KAFKA_SESSION_TIMEOUT_MS, - heartbeat_interval_ms=self._settings.KAFKA_HEARTBEAT_INTERVAL_MS, - max_poll_interval_ms=self._settings.KAFKA_MAX_POLL_INTERVAL_MS, - request_timeout_ms=self._settings.KAFKA_REQUEST_TIMEOUT_MS, - max_poll_records=100, # Process max 100 messages at a time for flow control - fetch_max_wait_ms=500, # Wait max 500ms for data (reduces latency) - fetch_min_bytes=1, # Return immediately if any data available - ) - - self.consumer = UnifiedConsumer( - consumer_config, - event_dispatcher=self.dispatcher, - schema_registry=self._schema_registry_manager, - settings=self._settings, - logger=self.logger, - event_metrics=self._event_metrics, - ) - - # Register handlers with EventDispatcher BEFORE wrapping with idempotency - @self.dispatcher.register(EventType.EXECUTION_REQUESTED) - async def handle_requested(event: ExecutionRequestedEvent) -> None: - await self._route_execution_event(event) - - @self.dispatcher.register(EventType.EXECUTION_COMPLETED) - async def handle_completed(event: ExecutionCompletedEvent) -> None: - await self._route_execution_result(event) - - @self.dispatcher.register(EventType.EXECUTION_FAILED) - async def handle_failed(event: ExecutionFailedEvent) -> None: - await self._route_execution_result(event) - - @self.dispatcher.register(EventType.EXECUTION_CANCELLED) - async def handle_cancelled(event: ExecutionCancelledEvent) -> None: - await self._route_execution_event(event) - - self.logger.info("COORDINATOR: Event handlers registered") - await self.consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.EXECUTION_COORDINATOR])) + self._register_handlers(dispatcher) - # Start scheduling task - self._scheduling_task = asyncio.create_task(self._scheduling_loop()) + def _register_handlers(self, dispatcher: EventDispatcher) -> None: + """Register event handlers on the dispatcher.""" + dispatcher.register_handler(EventType.EXECUTION_REQUESTED, self._handle_requested_wrapper) + dispatcher.register_handler(EventType.EXECUTION_COMPLETED, self._handle_completed_wrapper) + dispatcher.register_handler(EventType.EXECUTION_FAILED, self._handle_failed_wrapper) + dispatcher.register_handler(EventType.EXECUTION_CANCELLED, self._handle_cancelled_wrapper) - self.logger.info("ExecutionCoordinator service started successfully") + async def _handle_requested_wrapper(self, event: DomainEvent) -> None: + assert isinstance(event, ExecutionRequestedEvent) + await self._handle_execution_requested(event) - async def _on_stop(self) -> None: - """Stop the coordinator service.""" - self.logger.info("Stopping ExecutionCoordinator service...") + async def _handle_completed_wrapper(self, event: DomainEvent) -> None: + assert isinstance(event, ExecutionCompletedEvent) + await self._handle_execution_completed(event) - # Stop scheduling task - if self._scheduling_task: - self._scheduling_task.cancel() - try: - await self._scheduling_task - except asyncio.CancelledError: - pass + async def _handle_failed_wrapper(self, event: DomainEvent) -> None: + assert isinstance(event, ExecutionFailedEvent) + await self._handle_execution_failed(event) - if self.consumer: - await self.consumer.stop() - - await self.queue_manager.stop() - - 
self.logger.info(f"ExecutionCoordinator service stopped. Active executions: {len(self._active_executions)}") - - async def _route_execution_event(self, event: ExecutionRequestedEvent | ExecutionCancelledEvent) -> None: - """Route execution events to appropriate handlers based on event type""" - self.logger.info( - f"COORDINATOR: Routing execution event - type: {event.event_type}, " - f"id: {event.event_id}, " - f"actual class: {type(event).__name__}" - ) - - if event.event_type == EventType.EXECUTION_REQUESTED: - await self._handle_execution_requested(event) - elif event.event_type == EventType.EXECUTION_CANCELLED: - await self._handle_execution_cancelled(event) - else: - self.logger.debug(f"Ignoring execution event type: {event.event_type}") - - async def _route_execution_result(self, event: ExecutionCompletedEvent | ExecutionFailedEvent) -> None: - """Route execution result events to appropriate handlers based on event type""" - if event.event_type == EventType.EXECUTION_COMPLETED: - await self._handle_execution_completed(event) - elif event.event_type == EventType.EXECUTION_FAILED: - await self._handle_execution_failed(event) - else: - self.logger.debug(f"Ignoring execution result event type: {event.event_type}") + async def _handle_cancelled_wrapper(self, event: DomainEvent) -> None: + assert isinstance(event, ExecutionCancelledEvent) + await self._handle_execution_cancelled(event) async def _handle_execution_requested(self, event: ExecutionRequestedEvent) -> None: """Handle execution requested event - add to queue for processing""" @@ -234,9 +103,8 @@ async def _handle_execution_requested(self, event: ExecutionRequestedEvent) -> N self.logger.info(f"Execution {event.execution_id} added to queue at position {position}") - # Schedule immediately if at front of queue (position 0) if position == 0: - await self._schedule_execution(event) + await self._try_schedule_next() except Exception as e: self.logger.error(f"Failed to handle execution request {event.execution_id}: {e}", exc_info=True) @@ -247,138 +115,75 @@ async def _handle_execution_cancelled(self, event: ExecutionCancelledEvent) -> N execution_id = event.execution_id removed = await self.queue_manager.remove_execution(execution_id) - - if execution_id in self._execution_resources: - await self.resource_manager.release_allocation(execution_id) - del self._execution_resources[execution_id] - self._active_executions.discard(execution_id) self.metrics.update_coordinator_active_executions(len(self._active_executions)) if removed: self.logger.info(f"Execution {execution_id} cancelled and removed from queue") + await self._try_schedule_next() + async def _handle_execution_completed(self, event: ExecutionCompletedEvent) -> None: """Handle execution completed event""" execution_id = event.execution_id - if execution_id in self._execution_resources: - await self.resource_manager.release_allocation(execution_id) - del self._execution_resources[execution_id] - - # Remove from active set self._active_executions.discard(execution_id) self.metrics.update_coordinator_active_executions(len(self._active_executions)) - self.logger.info(f"Execution {execution_id} completed, resources released") + self.logger.info(f"Execution {execution_id} completed") + await self._try_schedule_next() async def _handle_execution_failed(self, event: ExecutionFailedEvent) -> None: """Handle execution failed event""" execution_id = event.execution_id - # Release resources - if execution_id in self._execution_resources: - await 
self.resource_manager.release_allocation(execution_id) - del self._execution_resources[execution_id] - - # Remove from active set self._active_executions.discard(execution_id) self.metrics.update_coordinator_active_executions(len(self._active_executions)) - async def _scheduling_loop(self) -> None: - """Main scheduling loop""" - while self.is_running: - try: - # Get next execution from queue - execution = await self.queue_manager.get_next_execution() - - if execution: - # Schedule execution - asyncio.create_task(self._schedule_execution(execution)) - else: - # No executions in queue, wait - await asyncio.sleep(self.scheduling_interval) + await self._try_schedule_next() - except Exception as e: - self.logger.error(f"Error in scheduling loop: {e}", exc_info=True) - await asyncio.sleep(5) # Wait before retrying + async def _try_schedule_next(self) -> None: + """Pop the next queued execution and schedule it.""" + execution = await self.queue_manager.get_next_execution() + if execution: + await self._schedule_execution(execution) async def _schedule_execution(self, event: ExecutionRequestedEvent) -> None: - """Schedule a single execution""" - async with self._scheduling_semaphore: - start_time = time.time() - execution_id = event.execution_id - - # Atomic check-and-claim: no await between check and add prevents TOCTOU race - # when both eager scheduling (position=0) and _scheduling_loop try to schedule - if execution_id in self._active_executions: - self.logger.debug(f"Execution {execution_id} already claimed, skipping") - return - self._active_executions.add(execution_id) - - try: - # Request resource allocation - allocation = await self.resource_manager.request_allocation( - execution_id, - event.language, - requested_cpu=None, # Use defaults for now - requested_memory_mb=None, - requested_gpu=0, - ) - - if not allocation: - # No resources available, release claim and requeue - self._active_executions.discard(execution_id) - await self.queue_manager.requeue_execution(event, increment_retry=False) - self.logger.info(f"No resources available for {execution_id}, requeued") - return - - # Track allocation (already in _active_executions from claim above) - self._execution_resources[execution_id] = allocation - self.metrics.update_coordinator_active_executions(len(self._active_executions)) - - # Publish execution started event for workers - self.logger.info(f"About to publish ExecutionStartedEvent for {event.execution_id}") - try: - await self._publish_execution_started(event) - self.logger.info(f"Successfully published ExecutionStartedEvent for {event.execution_id}") - except Exception as publish_error: - self.logger.error( - f"Failed to publish ExecutionStartedEvent for {event.execution_id}: {publish_error}", - exc_info=True, - ) - raise - - # Track metrics - queue_time = start_time - event.timestamp.timestamp() - priority = getattr(event, "priority", QueuePriority.NORMAL.value) - self.metrics.record_coordinator_queue_time(queue_time, QueuePriority(priority).name) - - scheduling_duration = time.time() - start_time - self.metrics.record_coordinator_scheduling_duration(scheduling_duration) - self.metrics.record_coordinator_execution_scheduled("scheduled") - - self.logger.info( - f"Scheduled execution {event.execution_id}. 
" - f"Queue time: {queue_time:.2f}s, " - f"Resources: {allocation.cpu_cores} CPU, " - f"{allocation.memory_mb}MB RAM" - ) - - except Exception as e: - self.logger.error(f"Failed to schedule execution {event.execution_id}: {e}", exc_info=True) - - # Release any allocated resources - if event.execution_id in self._execution_resources: - await self.resource_manager.release_allocation(event.execution_id) - del self._execution_resources[event.execution_id] - - self._active_executions.discard(event.execution_id) - self.metrics.update_coordinator_active_executions(len(self._active_executions)) - self.metrics.record_coordinator_execution_scheduled("error") - - # Publish failure event - await self._publish_scheduling_failed(event, str(e)) + """Schedule a single execution.""" + start_time = time.time() + execution_id = event.execution_id + + if execution_id in self._active_executions: + self.logger.debug(f"Execution {execution_id} already claimed, skipping") + return + self._active_executions.add(execution_id) + self.metrics.update_coordinator_active_executions(len(self._active_executions)) + + try: + await self._publish_execution_started(event) + + # Track metrics + queue_time = start_time - event.timestamp.timestamp() + priority = getattr(event, "priority", QueuePriority.NORMAL.value) + self.metrics.record_coordinator_queue_time(queue_time, QueuePriority(priority).name) + + scheduling_duration = time.time() - start_time + self.metrics.record_coordinator_scheduling_duration(scheduling_duration) + self.metrics.record_coordinator_execution_scheduled("scheduled") + + self.logger.info( + f"Scheduled execution {event.execution_id}. " + f"Queue time: {queue_time:.2f}s" + ) + + except Exception as e: + self.logger.error(f"Failed to schedule execution {event.execution_id}: {e}", exc_info=True) + + self._active_executions.discard(event.execution_id) + self.metrics.update_coordinator_active_executions(len(self._active_executions)) + self.metrics.record_coordinator_execution_scheduled("error") + + await self._publish_scheduling_failed(event, str(e)) async def _build_command_metadata(self, request: ExecutionRequestedEvent) -> EventMetadata: """Build metadata for CreatePodCommandEvent with guaranteed user_id.""" @@ -430,7 +235,6 @@ async def _publish_execution_accepted(self, request: ExecutionRequestedEvent, po ) await self.producer.produce(event_to_produce=event) - self.logger.info(f"ExecutionAcceptedEvent published for {request.execution_id}") async def _publish_queue_full(self, request: ExecutionRequestedEvent, error: str) -> None: """Publish queue full event""" @@ -451,16 +255,11 @@ async def _publish_queue_full(self, request: ExecutionRequestedEvent, error: str async def _publish_scheduling_failed(self, request: ExecutionRequestedEvent, error: str) -> None: """Publish scheduling failed event""" - # Get resource stats for context - resource_stats = await self.resource_manager.get_resource_stats() - event = ExecutionFailedEvent( execution_id=request.execution_id, error_type=ExecutionErrorType.SYSTEM_ERROR, exit_code=-1, - stderr=f"Failed to schedule execution: {error}. 
" - f"Available resources: CPU={resource_stats.available.cpu_cores}, " - f"Memory={resource_stats.available.memory_mb}MB", + stderr=f"Failed to schedule execution: {error}", resource_usage=None, metadata=request.metadata, error_message=error, @@ -468,11 +267,3 @@ async def _publish_scheduling_failed(self, request: ExecutionRequestedEvent, err await self.producer.produce(event_to_produce=event, key=request.execution_id) - async def get_status(self) -> dict[str, Any]: - """Get coordinator status""" - return { - "running": self.is_running, - "active_executions": len(self._active_executions), - "queue_stats": await self.queue_manager.get_queue_stats(), - "resource_stats": await self.resource_manager.get_resource_stats(), - } diff --git a/backend/app/services/coordinator/resource_manager.py b/backend/app/services/coordinator/resource_manager.py deleted file mode 100644 index bd0c2fbf..00000000 --- a/backend/app/services/coordinator/resource_manager.py +++ /dev/null @@ -1,324 +0,0 @@ -import asyncio -import logging -from dataclasses import dataclass - -from app.core.metrics import CoordinatorMetrics - - -@dataclass -class ResourceAllocation: - """Resource allocation for an execution""" - - cpu_cores: float - memory_mb: int - gpu_count: int = 0 - - @property - def cpu_millicores(self) -> int: - """Get CPU in millicores for Kubernetes""" - return int(self.cpu_cores * 1000) - - @property - def memory_bytes(self) -> int: - """Get memory in bytes""" - return self.memory_mb * 1024 * 1024 - - -@dataclass -class ResourcePool: - """Available resource pool""" - - total_cpu_cores: float - total_memory_mb: int - total_gpu_count: int - - available_cpu_cores: float - available_memory_mb: int - available_gpu_count: int - - # Resource limits per execution - max_cpu_per_execution: float = 4.0 - max_memory_per_execution_mb: int = 8192 - max_gpu_per_execution: int = 1 - - # Minimum resources to keep available - min_available_cpu_cores: float = 2.0 - min_available_memory_mb: int = 4096 - - -@dataclass -class ResourceGroup: - """Resource group with usage information""" - - cpu_cores: float - memory_mb: int - gpu_count: int - - -@dataclass -class ResourceStats: - """Resource statistics""" - - total: ResourceGroup - available: ResourceGroup - allocated: ResourceGroup - utilization: dict[str, float] - allocation_count: int - limits: dict[str, int | float] - - -@dataclass -class ResourceAllocationInfo: - """Information about a resource allocation""" - - execution_id: str - cpu_cores: float - memory_mb: int - gpu_count: int - cpu_percentage: float - memory_percentage: float - - -class ResourceManager: - """Manages resource allocation for executions""" - - def __init__( - self, - logger: logging.Logger, - coordinator_metrics: CoordinatorMetrics, - total_cpu_cores: float = 32.0, - total_memory_mb: int = 65536, # 64GB - total_gpu_count: int = 0, - overcommit_factor: float = 1.2, # Allow 20% overcommit - ): - self.logger = logger - self.metrics = coordinator_metrics - self.pool = ResourcePool( - total_cpu_cores=total_cpu_cores * overcommit_factor, - total_memory_mb=int(total_memory_mb * overcommit_factor), - total_gpu_count=total_gpu_count, - available_cpu_cores=total_cpu_cores * overcommit_factor, - available_memory_mb=int(total_memory_mb * overcommit_factor), - available_gpu_count=total_gpu_count, - ) - - # Adjust minimum reserve thresholds proportionally for small pools. - # Keep at most 10% of total as reserve (but not higher than defaults). - # This avoids refusing small, reasonable allocations on modest clusters. 
- self.pool.min_available_cpu_cores = min( - self.pool.min_available_cpu_cores, - max(0.1 * self.pool.total_cpu_cores, 0.0), - ) - self.pool.min_available_memory_mb = min( - self.pool.min_available_memory_mb, - max(int(0.1 * self.pool.total_memory_mb), 0), - ) - - # Track allocations - self._allocations: dict[str, ResourceAllocation] = {} - self._allocation_lock = asyncio.Lock() - - # Default allocations by language - self.default_allocations = { - "python": ResourceAllocation(cpu_cores=0.5, memory_mb=512), - "javascript": ResourceAllocation(cpu_cores=0.5, memory_mb=512), - "go": ResourceAllocation(cpu_cores=0.25, memory_mb=256), - "rust": ResourceAllocation(cpu_cores=0.5, memory_mb=512), - "java": ResourceAllocation(cpu_cores=1.0, memory_mb=1024), - "cpp": ResourceAllocation(cpu_cores=0.5, memory_mb=512), - "r": ResourceAllocation(cpu_cores=1.0, memory_mb=2048), - } - - # Update initial metrics - self._update_metrics() - - async def request_allocation( - self, - execution_id: str, - language: str, - requested_cpu: float | None = None, - requested_memory_mb: int | None = None, - requested_gpu: int = 0, - ) -> ResourceAllocation | None: - """ - Request resource allocation for execution - - Returns: - ResourceAllocation if successful, None if resources unavailable - """ - async with self._allocation_lock: - # Check if already allocated - if execution_id in self._allocations: - self.logger.warning(f"Execution {execution_id} already has allocation") - return self._allocations[execution_id] - - # Determine requested resources - if requested_cpu is None or requested_memory_mb is None: - # Use defaults based on language - default = self.default_allocations.get(language, ResourceAllocation(cpu_cores=0.5, memory_mb=512)) - requested_cpu = requested_cpu or default.cpu_cores - requested_memory_mb = requested_memory_mb or default.memory_mb - - # Apply limits - requested_cpu = min(requested_cpu, self.pool.max_cpu_per_execution) - requested_memory_mb = min(requested_memory_mb, self.pool.max_memory_per_execution_mb) - requested_gpu = min(requested_gpu, self.pool.max_gpu_per_execution) - - # Check availability (considering minimum reserves) - cpu_after = self.pool.available_cpu_cores - requested_cpu - memory_after = self.pool.available_memory_mb - requested_memory_mb - gpu_after = self.pool.available_gpu_count - requested_gpu - - if ( - cpu_after < self.pool.min_available_cpu_cores - or memory_after < self.pool.min_available_memory_mb - or gpu_after < 0 - ): - self.logger.warning( - f"Insufficient resources for execution {execution_id}. " - f"Requested: {requested_cpu} CPU, {requested_memory_mb}MB RAM, " - f"{requested_gpu} GPU. 
Available: {self.pool.available_cpu_cores} CPU, " - f"{self.pool.available_memory_mb}MB RAM, {self.pool.available_gpu_count} GPU" - ) - return None - - # Create allocation - allocation = ResourceAllocation( - cpu_cores=requested_cpu, memory_mb=requested_memory_mb, gpu_count=requested_gpu - ) - - # Update pool - self.pool.available_cpu_cores = cpu_after - self.pool.available_memory_mb = memory_after - self.pool.available_gpu_count = gpu_after - - # Track allocation - self._allocations[execution_id] = allocation - - # Update metrics - self._update_metrics() - - self.logger.info( - f"Allocated resources for execution {execution_id}: " - f"{allocation.cpu_cores} CPU, {allocation.memory_mb}MB RAM, " - f"{allocation.gpu_count} GPU" - ) - - return allocation - - async def release_allocation(self, execution_id: str) -> bool: - """Release resource allocation""" - async with self._allocation_lock: - if execution_id not in self._allocations: - self.logger.warning(f"No allocation found for execution {execution_id}") - return False - - allocation = self._allocations[execution_id] - - # Return resources to pool - self.pool.available_cpu_cores += allocation.cpu_cores - self.pool.available_memory_mb += allocation.memory_mb - self.pool.available_gpu_count += allocation.gpu_count - - # Remove allocation - del self._allocations[execution_id] - - # Update metrics - self._update_metrics() - - self.logger.info( - f"Released resources for execution {execution_id}: " - f"{allocation.cpu_cores} CPU, {allocation.memory_mb}MB RAM, " - f"{allocation.gpu_count} GPU" - ) - - return True - - async def get_allocation(self, execution_id: str) -> ResourceAllocation | None: - """Get current allocation for execution""" - async with self._allocation_lock: - return self._allocations.get(execution_id) - - async def can_allocate(self, cpu_cores: float, memory_mb: int, gpu_count: int = 0) -> bool: - """Check if resources can be allocated""" - async with self._allocation_lock: - cpu_after = self.pool.available_cpu_cores - cpu_cores - memory_after = self.pool.available_memory_mb - memory_mb - gpu_after = self.pool.available_gpu_count - gpu_count - - return ( - cpu_after >= self.pool.min_available_cpu_cores - and memory_after >= self.pool.min_available_memory_mb - and gpu_after >= 0 - ) - - async def get_resource_stats(self) -> ResourceStats: - """Get resource statistics""" - async with self._allocation_lock: - allocated_cpu = self.pool.total_cpu_cores - self.pool.available_cpu_cores - allocated_memory = self.pool.total_memory_mb - self.pool.available_memory_mb - allocated_gpu = self.pool.total_gpu_count - self.pool.available_gpu_count - - gpu_percent = (allocated_gpu / self.pool.total_gpu_count * 100) if self.pool.total_gpu_count > 0 else 0 - - return ResourceStats( - total=ResourceGroup( - cpu_cores=self.pool.total_cpu_cores, - memory_mb=self.pool.total_memory_mb, - gpu_count=self.pool.total_gpu_count, - ), - available=ResourceGroup( - cpu_cores=self.pool.available_cpu_cores, - memory_mb=self.pool.available_memory_mb, - gpu_count=self.pool.available_gpu_count, - ), - allocated=ResourceGroup(cpu_cores=allocated_cpu, memory_mb=allocated_memory, gpu_count=allocated_gpu), - utilization={ - "cpu_percent": (allocated_cpu / self.pool.total_cpu_cores * 100), - "memory_percent": (allocated_memory / self.pool.total_memory_mb * 100), - "gpu_percent": gpu_percent, - }, - allocation_count=len(self._allocations), - limits={ - "max_cpu_per_execution": self.pool.max_cpu_per_execution, - "max_memory_per_execution_mb": 
self.pool.max_memory_per_execution_mb, - "max_gpu_per_execution": self.pool.max_gpu_per_execution, - }, - ) - - async def get_allocations_by_resource_usage(self) -> list[ResourceAllocationInfo]: - """Get allocations sorted by resource usage""" - async with self._allocation_lock: - allocations = [] - for exec_id, allocation in self._allocations.items(): - allocations.append( - ResourceAllocationInfo( - execution_id=str(exec_id), - cpu_cores=allocation.cpu_cores, - memory_mb=allocation.memory_mb, - gpu_count=allocation.gpu_count, - cpu_percentage=(allocation.cpu_cores / self.pool.total_cpu_cores * 100), - memory_percentage=(allocation.memory_mb / self.pool.total_memory_mb * 100), - ) - ) - - # Sort by total resource usage - allocations.sort(key=lambda x: x.cpu_percentage + x.memory_percentage, reverse=True) - - return allocations - - def _update_metrics(self) -> None: - """Update metrics""" - cpu_usage = self.pool.total_cpu_cores - self.pool.available_cpu_cores - cpu_percent = cpu_usage / self.pool.total_cpu_cores * 100 - self.metrics.update_resource_usage("cpu", cpu_percent) - - memory_usage = self.pool.total_memory_mb - self.pool.available_memory_mb - memory_percent = memory_usage / self.pool.total_memory_mb * 100 - self.metrics.update_resource_usage("memory", memory_percent) - - gpu_usage = self.pool.total_gpu_count - self.pool.available_gpu_count - gpu_percent = gpu_usage / max(1, self.pool.total_gpu_count) * 100 - self.metrics.update_resource_usage("gpu", gpu_percent) - - self.metrics.update_coordinator_active_executions(len(self._allocations)) diff --git a/backend/tests/e2e/services/coordinator/test_execution_coordinator.py b/backend/tests/e2e/services/coordinator/test_execution_coordinator.py index 5406c7b4..4ffa4736 100644 --- a/backend/tests/e2e/services/coordinator/test_execution_coordinator.py +++ b/backend/tests/e2e/services/coordinator/test_execution_coordinator.py @@ -53,44 +53,6 @@ async def test_handle_requested_unique_executions( assert "e-unique-2" in coord._active_executions # noqa: SLF001 -class TestGetStatus: - """Tests for get_status method.""" - - @pytest.mark.asyncio - async def test_get_status_returns_dict(self, scope: AsyncContainer) -> None: - """Get status returns dictionary with coordinator info.""" - coord: ExecutionCoordinator = await scope.get(ExecutionCoordinator) - - status = await coord.get_status() - - assert isinstance(status, dict) - assert "running" in status - assert "active_executions" in status - assert "queue_stats" in status - assert "resource_stats" in status - - @pytest.mark.asyncio - async def test_get_status_tracks_active_executions( - self, scope: AsyncContainer - ) -> None: - """Status tracks number of active executions.""" - coord: ExecutionCoordinator = await scope.get(ExecutionCoordinator) - - initial_status = await coord.get_status() - initial_active = initial_status.get("active_executions", 0) - - # Add execution - ev = make_execution_requested_event(execution_id="e-status-track-1") - await coord._handle_execution_requested(ev) # noqa: SLF001 - - new_status = await coord.get_status() - new_active = new_status.get("active_executions", 0) - - assert new_active == initial_active + 1, ( - f"Expected exactly one more active execution: {initial_active} -> {new_active}" - ) - - class TestQueueManager: """Tests for queue manager integration.""" @@ -104,44 +66,9 @@ async def test_queue_manager_initialized(self, scope: AsyncContainer) -> None: assert hasattr(coord.queue_manager, "get_next_execution") -class TestResourceManager: - """Tests for 
resource manager integration.""" - - @pytest.mark.asyncio - async def test_resource_manager_initialized( - self, scope: AsyncContainer - ) -> None: - """Resource manager is properly initialized.""" - coord: ExecutionCoordinator = await scope.get(ExecutionCoordinator) - - assert coord.resource_manager is not None - assert hasattr(coord.resource_manager, "request_allocation") - assert hasattr(coord.resource_manager, "release_allocation") - - @pytest.mark.asyncio - async def test_resource_manager_has_pool( - self, scope: AsyncContainer - ) -> None: - """Resource manager has resource pool configured.""" - coord: ExecutionCoordinator = await scope.get(ExecutionCoordinator) - - # Check resource manager has pool with capacity - assert coord.resource_manager.pool is not None - assert coord.resource_manager.pool.total_cpu_cores > 0 - assert coord.resource_manager.pool.total_memory_mb > 0 - - class TestCoordinatorLifecycle: """Tests for coordinator lifecycle.""" - @pytest.mark.asyncio - async def test_coordinator_has_consumer(self, scope: AsyncContainer) -> None: - """Coordinator has Kafka consumer configured.""" - coord: ExecutionCoordinator = await scope.get(ExecutionCoordinator) - - # Consumer is set up during start, may be None before - assert hasattr(coord, "consumer") - @pytest.mark.asyncio async def test_coordinator_has_producer(self, scope: AsyncContainer) -> None: """Coordinator has Kafka producer configured.""" diff --git a/backend/tests/unit/services/coordinator/test_resource_manager.py b/backend/tests/unit/services/coordinator/test_resource_manager.py deleted file mode 100644 index 3624dae6..00000000 --- a/backend/tests/unit/services/coordinator/test_resource_manager.py +++ /dev/null @@ -1,61 +0,0 @@ -import logging - -import pytest -from app.core.metrics import CoordinatorMetrics -from app.services.coordinator.resource_manager import ResourceManager - -_test_logger = logging.getLogger("test.services.coordinator.resource_manager") - - -@pytest.mark.asyncio -async def test_request_allocation_defaults_and_limits(coordinator_metrics: CoordinatorMetrics) -> None: - rm = ResourceManager(total_cpu_cores=8.0, total_memory_mb=16384, total_gpu_count=0, logger=_test_logger, coordinator_metrics=coordinator_metrics) - - # Default for python - alloc = await rm.request_allocation("e1", "python") - assert alloc is not None - - assert alloc.cpu_cores > 0 - assert alloc.memory_mb > 0 - - # Respect per-exec max cap - alloc2 = await rm.request_allocation("e2", "python", requested_cpu=100.0, requested_memory_mb=999999) - assert alloc2 is not None - assert alloc2.cpu_cores <= rm.pool.max_cpu_per_execution - assert alloc2.memory_mb <= rm.pool.max_memory_per_execution_mb - - -@pytest.mark.asyncio -async def test_release_and_can_allocate(coordinator_metrics: CoordinatorMetrics) -> None: - rm = ResourceManager(total_cpu_cores=4.0, total_memory_mb=8192, total_gpu_count=0, logger=_test_logger, coordinator_metrics=coordinator_metrics) - - a = await rm.request_allocation("e1", "python", requested_cpu=1.0, requested_memory_mb=512) - assert a is not None - - ok = await rm.release_allocation("e1") - assert ok is True - - # After release, can allocate near limits while preserving headroom. - # Use a tiny epsilon to avoid edge rounding issues in >= comparisons. 
- epsilon_cpu = 1e-6 - epsilon_mem = 1 - can = await rm.can_allocate(cpu_cores=rm.pool.total_cpu_cores - rm.pool.min_available_cpu_cores - epsilon_cpu, - memory_mb=rm.pool.total_memory_mb - rm.pool.min_available_memory_mb - epsilon_mem, - gpu_count=0) - assert can is True - - -@pytest.mark.asyncio -async def test_resource_stats(coordinator_metrics: CoordinatorMetrics) -> None: - rm = ResourceManager(total_cpu_cores=2.0, total_memory_mb=4096, total_gpu_count=0, logger=_test_logger, coordinator_metrics=coordinator_metrics) - # Make sure the allocation succeeds - alloc = await rm.request_allocation("e1", "python", requested_cpu=0.5, requested_memory_mb=256) - assert alloc is not None, "Allocation should have succeeded" - - stats = await rm.get_resource_stats() - - assert stats.total.cpu_cores > 0 - assert stats.available.cpu_cores >= 0 - assert stats.allocated.cpu_cores > 0 # Should be > 0 since we allocated - assert stats.utilization["cpu_percent"] >= 0 - assert stats.allocation_count >= 1 # Should be at least 1 (may have system allocations) diff --git a/backend/workers/run_coordinator.py b/backend/workers/run_coordinator.py index 12004bf1..4ddd8984 100644 --- a/backend/workers/run_coordinator.py +++ b/backend/workers/run_coordinator.py @@ -8,8 +8,8 @@ from app.core.tracing import init_tracing from app.db.docs import ALL_DOCUMENTS from app.domain.enums.kafka import GroupId +from app.events.core import UnifiedConsumer from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas -from app.services.coordinator.coordinator import ExecutionCoordinator from app.settings import Settings from beanie import init_beanie @@ -27,10 +27,10 @@ async def run_coordinator(settings: Settings) -> None: schema_registry = await container.get(SchemaRegistryManager) await initialize_event_schemas(schema_registry) - # Services are already started by the DI container providers - coordinator = await container.get(ExecutionCoordinator) + # Get consumer (triggers coordinator + dispatcher + queue_manager + scheduling via DI) + await container.get(UnifiedConsumer) - # Shutdown event - signal handlers just set this + # Shutdown event shutdown_event = asyncio.Event() loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): @@ -39,13 +39,8 @@ async def run_coordinator(settings: Settings) -> None: logger.info("ExecutionCoordinator started and running") try: - # Wait for shutdown signal or service to stop - while coordinator.is_running and not shutdown_event.is_set(): - await asyncio.sleep(60) - status = await coordinator.get_status() - logger.info(f"Coordinator status: {status}") + await shutdown_event.wait() finally: - # Container cleanup stops everything logger.info("Initiating graceful shutdown...") await container.close() diff --git a/docs/architecture/execution-queue.md b/docs/architecture/execution-queue.md index 876ff6df..77576745 100644 --- a/docs/architecture/execution-queue.md +++ b/docs/architecture/execution-queue.md @@ -1,8 +1,8 @@ # Execution Queue -The ExecutionCoordinator manages a priority queue for script executions, allocating CPU and memory resources before -spawning pods. It consumes `ExecutionRequested` events, validates resource availability, and emits commands to the -Kubernetes worker via the saga system. Per-user limits and stale timeout handling prevent queue abuse. +The ExecutionCoordinator manages a priority queue for script executions. 
It consumes `ExecutionRequested` events, queues +them by priority, and emits `CreatePodCommand` events to the Kubernetes worker. Per-user limits and stale timeout +handling prevent queue abuse. Actual resource enforcement happens at the Kubernetes level via pod manifests. ## Architecture @@ -16,16 +16,12 @@ flowchart TB subgraph Coordinator COORD --> QUEUE[QueueManager] - COORD --> RESOURCES[ResourceManager] QUEUE --> HEAP[(Priority Heap)] - RESOURCES --> POOL[(Resource Pool)] end - subgraph Scheduling Loop - LOOP[Get Next Execution] --> CHECK{Resources Available?} - CHECK -->|Yes| ALLOCATE[Allocate Resources] - CHECK -->|No| REQUEUE[Requeue Execution] - ALLOCATE --> PUBLISH[Publish CreatePodCommand] + subgraph Scheduling + DEDUP{Already Active?} -->|No| PUBLISH[Publish CreatePodCommand] + DEDUP -->|Yes| SKIP[Skip] end ``` @@ -60,63 +56,39 @@ prevents abandoned requests from consuming queue space indefinitely: --8<-- "backend/app/services/coordinator/queue_manager.py:243:267" ``` -## Resource Allocation +## Reactive Scheduling -The ResourceManager tracks a pool of CPU, memory, and GPU resources. Each execution requests an allocation based on -language defaults or explicit requirements: +The coordinator does not use a background polling loop. Scheduling is event-driven: when an execution is added at +position 0 in the queue, or when an active execution completes, fails, or is cancelled, the coordinator immediately +tries to schedule the next queued execution. A dedup guard (`_active_executions` set) prevents double-publishing +`CreatePodCommand` for the same execution. -```python ---8<-- "backend/app/services/coordinator/resource_manager.py:121:130" -``` - -The pool maintains minimum reserve thresholds to ensure the system remains responsive even under heavy load. Allocations -that would exceed the safe threshold are rejected, and the execution is requeued for later processing. - -```python ---8<-- "backend/app/services/coordinator/resource_manager.py:135:148" -``` - -## Scheduling Loop - -The coordinator runs a background scheduling loop that continuously pulls executions from the queue and attempts to -schedule them: - -```python ---8<-- "backend/app/services/coordinator/coordinator.py:307:323" -``` - -A semaphore limits concurrent scheduling operations to prevent overwhelming the system during bursts of incoming -requests. +Resource limits (CPU, memory) are enforced by Kubernetes via pod manifest `resources.requests` and `resources.limits`, +not by the coordinator. ## Event Flow The coordinator handles several event types: -1. **ExecutionRequested** - Adds execution to queue, publishes `ExecutionAccepted` -2. **ExecutionCancelled** - Removes from queue, releases resources if allocated -3. **ExecutionCompleted** - Releases allocated resources -4. **ExecutionFailed** - Releases allocated resources +1. **ExecutionRequested** - Adds execution to queue, publishes `ExecutionAccepted`, triggers scheduling if at front +2. **ExecutionCancelled** - Removes from queue, triggers scheduling of next item +3. **ExecutionCompleted** - Removes from active set, triggers scheduling of next item +4. **ExecutionFailed** - Removes from active set, triggers scheduling of next item When scheduling succeeds, the coordinator publishes a `CreatePodCommand` to the saga topic, triggering pod creation by the Kubernetes worker. 
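Since enforcement now lives entirely in the pod spec, the shape of that spec is worth sketching. The snippet below uses the official Kubernetes Python client; the image name and the numeric values are assumptions for illustration, since the real manifest is built by the k8s worker rather than the coordinator.

```python
# Illustrative sketch: the per-pod requests/limits that replace the old
# in-process ResourceManager accounting. Values and image are assumptions.
from kubernetes import client

resources = client.V1ResourceRequirements(
    requests={"cpu": "500m", "memory": "512Mi"},  # scheduler reserves this much
    limits={"cpu": "1", "memory": "1Gi"},         # kubelet enforces this ceiling
)
container = client.V1Container(
    name="executor",
    image="python:3.12-slim",  # hypothetical runtime image
    resources=resources,
)
```

A pod that exceeds its memory limit is OOM-killed and surfaces as a failed execution, while CPU overuse is throttled rather than killed; this is the enforcement the deleted ResourceManager only approximated in process memory.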
## Configuration -| Parameter | Default | Description | -|-------------------------------|---------|--------------------------------------| -| `max_queue_size` | 10000 | Maximum executions in queue | -| `max_executions_per_user` | 100 | Per-user queue limit | -| `stale_timeout_seconds` | 3600 | When to discard old executions | -| `max_concurrent_scheduling` | 10 | Parallel scheduling operations | -| `scheduling_interval_seconds` | 0.5 | Polling interval when queue is empty | -| `total_cpu_cores` | 32.0 | Total CPU pool | -| `total_memory_mb` | 65536 | Total memory pool (64GB) | -| `overcommit_factor` | 1.2 | Allow 20% resource overcommit | +| Parameter | Default | Description | +|---------------------------|---------|--------------------------------| +| `max_queue_size` | 10000 | Maximum executions in queue | +| `max_executions_per_user` | 100 | Per-user queue limit | +| `stale_timeout_seconds` | 3600 | When to discard old executions | ## Key Files | File | Purpose | |--------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------| -| [`services/coordinator/coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/coordinator.py) | Main coordinator service | -| [`services/coordinator/queue_manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/queue_manager.py) | Priority queue implementation | -| [`services/coordinator/resource_manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/resource_manager.py) | Resource pool and allocation | +| [`services/coordinator/coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/coordinator.py) | Main coordinator service | +| [`services/coordinator/queue_manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/queue_manager.py) | Priority queue implementation | diff --git a/docs/architecture/kafka-topic-architecture.md b/docs/architecture/kafka-topic-architecture.md index ac948c3c..8b17855b 100644 --- a/docs/architecture/kafka-topic-architecture.md +++ b/docs/architecture/kafka-topic-architecture.md @@ -24,7 +24,7 @@ When a user submits code, the API creates an `ExecutionRequestedEvent` and publi The coordinator subscribes to `execution_events` and begins validation: - Has the user exceeded their rate limit? -- Are sufficient resources available? +- Is the queue full? - Should this execution be prioritized or queued? Some requests get rejected immediately. Others sit in a priority queue waiting for resources. Still others get cancelled before starting. @@ -66,7 +66,7 @@ If the Kubernetes worker crashes, `execution_tasks` accumulates messages but the In a single-topic architecture, a slow worker would cause backpressure affecting *all* consumers. SSE might delay updates. Projections might fall behind. The entire system degrades because one component can't keep up. -The coordinator acts as a *shock absorber* between user requests and pod creation. It can implement queuing, prioritization, and resource management without affecting upstream producers or downstream workers. During cluster capacity issues, the coordinator holds executions in its internal queue while still acknowledging receipt. +The coordinator acts as a *shock absorber* between user requests and pod creation. 
It implements queuing and prioritization without affecting upstream producers or downstream workers. During high load, the coordinator holds executions in its internal queue while still acknowledging receipt. ## Extensibility diff --git a/docs/architecture/lifecycle.md b/docs/architecture/lifecycle.md index 4b8e0081..c24d52c3 100644 --- a/docs/architecture/lifecycle.md +++ b/docs/architecture/lifecycle.md @@ -8,7 +8,7 @@ The pattern that actually fits Python and asyncio is the language's own RAII: as ## What changed -Services with long-running background work now implement the async context manager protocol. Coordinator, KubernetesWorker, PodMonitor, SSE Kafka→Redis bridge, EventStoreConsumer, ResultProcessor, DLQManager, EventBus, NotificationService, and the Kafka producer all expose `__aenter__`/`__aexit__` that call `start`/`stop`. +Services with long-running background work now implement the async context manager protocol. KubernetesWorker, PodMonitor, SSE Kafka→Redis bridge, EventStoreConsumer, ResultProcessor, DLQManager, EventBus, NotificationService, and the Kafka producer all expose `__aenter__`/`__aexit__` that call `start`/`stop`. The Coordinator is stateless and has its consumer lifecycle managed entirely by the DI provider. DI providers return unstarted instances for these services. The FastAPI lifespan acquires them and uses an `AsyncExitStack` to start/stop them in a single place. That removed scattered start/stop logic from providers and made shutdown order explicit. diff --git a/docs/architecture/services-overview.md b/docs/architecture/services-overview.md index 4636504c..f42d08cc 100644 --- a/docs/architecture/services-overview.md +++ b/docs/architecture/services-overview.md @@ -60,7 +60,7 @@ The Result Processor persists terminal execution outcomes, updates metrics, and The Pod Monitor observes K8s pod state and translates to domain events. It watches CoreV1 Pod events and publishes EXECUTION_EVENTS for running, container started, logs tail, etc., adding useful metadata and best-effort failure analysis. -The Coordinator owns the admission/queuing policy, sets priorities, and gates starts based on capacity. It interacts with ExecutionService (API) and Saga Orchestrator (events), ensuring queue depth metrics reflect only user requests and avoiding negative values via single ownership of the counter. +The Coordinator owns the admission/queuing policy and sets priorities. It interacts with ExecutionService (API) and Saga Orchestrator (events), ensuring queue depth metrics reflect only user requests and avoiding negative values via single ownership of the counter. Resource limits are enforced by Kubernetes pod manifests, not by the coordinator. The Event Replay worker re-emits stored events to debug or rebuild projections, taking DB/event store and filters as inputs and outputting replayed events on regular topics with provenance markers. diff --git a/docs/components/sse/execution-sse-flow.md b/docs/components/sse/execution-sse-flow.md index a81f057d..0d3cb758 100644 --- a/docs/components/sse/execution-sse-flow.md +++ b/docs/components/sse/execution-sse-flow.md @@ -2,7 +2,7 @@ The system uses an event-driven pipeline to run code and stream progress to the browser with Server-Sent Events. The Editor subscribes to a per-execution SSE stream and renders updates as the execution advances. When the result is ready, the stream delivers a `result_stored` event that already carries the final payload, so the browser can render immediately. 
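To make the browser-side contract concrete, here is a minimal sketch of a client consuming that per-execution stream. The endpoint path is a hypothetical placeholder; only the event names (`pod_running`, `result_stored`) come from this page.

```python
# Minimal SSE consumer sketch. The URL path is a hypothetical
# placeholder, not the project's actual route.
import json

import httpx

def wait_for_result(base_url: str, execution_id: str) -> dict:
    url = f"{base_url}/executions/{execution_id}/events"  # assumed path
    with httpx.stream("GET", url, timeout=None) as resp:
        event_type = None
        for line in resp.iter_lines():
            if line.startswith("event:"):
                event_type = line.split(":", 1)[1].strip()
            elif line.startswith("data:") and event_type == "result_stored":
                # The final payload rides on the event; no follow-up fetch.
                return json.loads(line.split(":", 1)[1])
    raise RuntimeError("stream closed before result_stored")
```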
-The flow starts when the API receives a request to execute a script. It writes an execution record in MongoDB and publishes an `execution_requested` event to Kafka. The coordinator and saga orchestrator pick it up, allocate resources, and send a command to the Kubernetes worker. The worker creates the pod and emits `pod_created`/`pod_running`. The pod monitor watches Kubernetes and emits pod and terminal execution events. The result processor listens for terminal events, writes the final state into the executions collection, and publishes a `result_stored` event. +The flow starts when the API receives a request to execute a script. It writes an execution record in MongoDB and publishes an `execution_requested` event to Kafka. The coordinator and saga orchestrator pick it up, queue it by priority, and send a command to the Kubernetes worker. The worker creates the pod and emits `pod_created`/`pod_running`. The pod monitor watches Kubernetes and emits pod and terminal execution events. The result processor listens for terminal events, writes the final state into the executions collection, and publishes a `result_stored` event. The SSE router maintains a small pool of Kafka consumers and routes only the events that belong to a given execution into Redis, which backs the browser's SSE connection. Progress events like `pod_running` arrive quickly and render status changes without polling. The stream closes when `result_stored` is delivered — emitted only after the result processor has written the final output to MongoDB, with the event containing the final result payload. diff --git a/docs/components/workers/coordinator.md b/docs/components/workers/coordinator.md index 1020c3b6..ba4e5b42 100644 --- a/docs/components/workers/coordinator.md +++ b/docs/components/workers/coordinator.md @@ -1,13 +1,13 @@ # Coordinator -The coordinator owns admission and queuing policy for executions. It decides which executions can proceed based on -available resources and enforces per-user limits to prevent any single user from monopolizing the system. +The coordinator owns admission and queuing policy for executions. It enforces per-user limits to prevent any single user +from monopolizing the system and manages the priority queue for scheduling. Resource limits (CPU, memory) are enforced +by Kubernetes via pod manifests, not by the coordinator. ```mermaid graph LR Kafka[(Kafka)] --> Coord[Coordinator] Coord --> Queue[Priority Queue] - Coord --> Resources[Resource Pool] Coord --> Accepted[Accepted Events] Accepted --> Kafka ``` @@ -18,13 +18,10 @@ When an `ExecutionRequested` event arrives, the coordinator checks: 1. Is the queue full? (max 10,000 pending) 2. Has this user exceeded their limit? (max 100 concurrent) -3. Are there enough CPU and memory resources? -If all checks pass, the coordinator allocates resources and publishes `ExecutionAccepted`. Otherwise, the request -is either queued for later or rejected. - -The coordinator runs a background scheduling loop that continuously pulls from the priority queue and attempts to -schedule pending executions as resources become available. +If checks pass, the execution is queued and `ExecutionAccepted` is published. Scheduling is reactive: when an execution +lands at the front of the queue, or when an active execution completes/fails/is cancelled, the coordinator immediately +pops the next item and publishes a `CreatePodCommand`. A dedup guard prevents double-publishing for the same execution. 
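Condensed from `coordinator.py` in this patch, the whole reactive path fits in a few lines. This is a simplification: in the real code the dedup check and publishing live in `_schedule_execution`, with metrics, logging, and failure events handled around it.

```python
# Condensed sketch of reactive scheduling (simplified from the real
# coordinator: metrics, error handling, and failure publishing omitted).
async def _try_schedule_next(self) -> None:
    event = await self.queue_manager.get_next_execution()
    if event is None:
        return  # nothing queued; the next terminal event retriggers this
    if event.execution_id in self._active_executions:
        return  # dedup guard: an earlier trigger already claimed it
    self._active_executions.add(event.execution_id)
    await self._publish_execution_started(event)  # emits the CreatePodCommand
```

Every terminal event handler (completed, failed, cancelled) ends by calling this method, which is what replaces the old background polling loop.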
## Priority queue @@ -36,15 +33,10 @@ Executions are processed in priority order. Lower numeric values are processed f When resources are unavailable, executions are requeued with reduced priority to prevent starvation. -## Resource management - -The coordinator tracks a pool of CPU and memory resources: +## Configuration | Parameter | Default | Description | |---------------------------|---------|----------------------------| -| `total_cpu_cores` | 32 | Total CPU pool | -| `total_memory_mb` | 65,536 | Total memory pool (64GB) | -| `overcommit_factor` | 1.2 | Allow 20% overcommit | | `max_queue_size` | 10,000 | Maximum pending executions | | `max_executions_per_user` | 100 | Per-user limit | | `stale_timeout_seconds` | 3,600 | Stale execution timeout | @@ -58,10 +50,9 @@ The coordinator tracks a pool of CPU and memory resources: | File | Purpose | |--------------------------------------------------------------------------------------------------------------------------------|-------------------------------| -| [`run_coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/workers/run_coordinator.py) | Entry point | -| [`coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/coordinator.py) | Main coordinator service | -| [`queue_manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/queue_manager.py) | Priority queue implementation | -| [`resource_manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/resource_manager.py) | Resource pool and allocation | +| [`run_coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/workers/run_coordinator.py) | Entry point | +| [`coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/coordinator.py) | Main coordinator service | +| [`queue_manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/queue_manager.py) | Priority queue implementation | ## Deployment diff --git a/docs/components/workers/index.md b/docs/components/workers/index.md index fe14d663..6c863c4d 100644 --- a/docs/components/workers/index.md +++ b/docs/components/workers/index.md @@ -27,7 +27,7 @@ graph LR | Worker | What it does | Entry point | |-------------------------------------------|-----------------------------------------------------------|----------------------------| -| [Coordinator](coordinator.md) | Admits executions, manages the queue, allocates resources | `run_coordinator.py` | +| [Coordinator](coordinator.md) | Admits executions, manages the priority queue | `run_coordinator.py` | | [Saga Orchestrator](saga_orchestrator.md) | Drives the execution state machine, issues pod commands | `run_saga_orchestrator.py` | | [K8s Worker](k8s_worker.md) | Creates ConfigMaps and Pods with security hardening | `run_k8s_worker.py` | | [Pod Monitor](pod_monitor.md) | Watches pods, translates K8s events to domain events | `run_pod_monitor.py` | diff --git a/docs/operations/metrics-reference.md b/docs/operations/metrics-reference.md index ab0f58fe..19f12236 100644 --- a/docs/operations/metrics-reference.md +++ b/docs/operations/metrics-reference.md @@ -33,7 +33,7 @@ Track script execution performance and resource usage. ### Coordinator Metrics -Track scheduling and resource allocation. +Track scheduling and queue management. 
| Metric | Type | Labels | Description | |------------------------------------------|---------------|---------------------|---------------------------| @@ -43,8 +43,6 @@ Track scheduling and resource allocation. | `coordinator.queue.wait_time` | Histogram | priority, queue | Queue wait by priority | | `coordinator.executions.scheduled.total` | Counter | status | Scheduled executions | | `coordinator.rate_limited.total` | Counter | limit_type, user_id | Rate limited requests | -| `coordinator.resource.allocations.total` | Counter | resource_type | Resource allocations | -| `coordinator.resource.utilization` | UpDownCounter | resource_type | Current utilization | | `coordinator.scheduling.decisions.total` | Counter | decision, reason | Scheduling decisions | ### Rate Limit Metrics From abb0461b70c7b64aeecffad94ce8ef4d74db11a3 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Thu, 29 Jan 2026 13:28:38 +0100 Subject: [PATCH 2/5] fix: added missing provider --- backend/app/core/container.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/app/core/container.py b/backend/app/core/container.py index 1a82b14d..f8473e36 100644 --- a/backend/app/core/container.py +++ b/backend/app/core/container.py @@ -53,6 +53,7 @@ def create_app_container(settings: Settings) -> AsyncContainer: AdminServicesProvider(), EventReplayProvider(), BusinessServicesProvider(), + CoordinatorProvider(), KubernetesProvider(), ResourceCleanerProvider(), FastapiProvider(), From fa8b932b07c857433cd83594a67d381f57cf741e Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Thu, 29 Jan 2026 20:27:47 +0100 Subject: [PATCH 3/5] updated coordinator --- backend/app/core/metrics/coordinator.py | 154 ++-------- backend/app/core/providers.py | 17 -- backend/app/domain/enums/__init__.py | 3 +- backend/app/domain/enums/execution.py | 10 + backend/app/domain/events/typed.py | 7 +- backend/app/services/coordinator/__init__.py | 6 +- .../app/services/coordinator/coordinator.py | 232 +++++++++++---- .../app/services/coordinator/queue_manager.py | 271 ------------------ backend/app/services/execution_service.py | 4 +- backend/tests/conftest.py | 3 +- .../coordinator/test_execution_coordinator.py | 16 +- .../tests/e2e/test_k8s_worker_create_pod.py | 3 +- ...est_connections_and_coordinator_metrics.py | 20 +- .../unit/core/metrics/test_metrics_classes.py | 2 +- .../coordinator/test_coordinator_queue.py | 95 ++++++ .../coordinator/test_queue_manager.py | 41 --- .../tests/unit/services/test_pod_builder.py | 11 +- docs/architecture/execution-queue.md | 21 +- docs/architecture/services-overview.md | 2 +- docs/components/workers/coordinator.md | 9 +- docs/operations/metrics-reference.md | 16 +- 21 files changed, 344 insertions(+), 599 deletions(-) delete mode 100644 backend/app/services/coordinator/queue_manager.py create mode 100644 backend/tests/unit/services/coordinator/test_coordinator_queue.py delete mode 100644 backend/tests/unit/services/coordinator/test_queue_manager.py diff --git a/backend/app/core/metrics/coordinator.py b/backend/app/core/metrics/coordinator.py index 9e06ca6a..ce108f39 100644 --- a/backend/app/core/metrics/coordinator.py +++ b/backend/app/core/metrics/coordinator.py @@ -5,15 +5,10 @@ class CoordinatorMetrics(BaseMetrics): """Metrics for coordinator and scheduling operations.""" def _create_instruments(self) -> None: - # Coordinator processing metrics - self.coordinator_processing_time = self._meter.create_histogram( - name="coordinator.processing.time", - description="Time spent processing execution events in seconds", - 
unit="s", - ) - self.coordinator_scheduling_duration = self._meter.create_histogram( - name="coordinator.scheduling.duration", description="Time spent scheduling executions in seconds", unit="s" + name="coordinator.scheduling.duration", + description="Time spent scheduling executions in seconds", + unit="s", ) self.coordinator_active_executions = self._meter.create_up_down_counter( @@ -22,159 +17,44 @@ def _create_instruments(self) -> None: unit="1", ) - # Queue management metrics self.coordinator_queue_time = self._meter.create_histogram( name="coordinator.queue.wait_time", description="Time spent waiting in coordinator queue by priority", unit="s", ) - self.coordinator_queue_operations = self._meter.create_counter( - name="coordinator.queue.operations.total", description="Total queue operations (add/remove)", unit="1" - ) - - # Execution-only request queue depth (authoritative, maintained by coordinator) self.execution_request_queue_depth = self._meter.create_up_down_counter( name="execution.queue.depth", - description="Depth of user execution requests queued (excludes replays and non-request events)", + description="Depth of user execution requests queued", unit="1", ) - # Scheduling metrics self.coordinator_executions_scheduled = self._meter.create_counter( - name="coordinator.executions.scheduled.total", description="Total number of executions scheduled", unit="1" - ) - - # Rate limiting metrics - self.coordinator_rate_limited = self._meter.create_counter( - name="coordinator.rate_limited.total", description="Total number of rate-limited requests", unit="1" - ) - - self.coordinator_rate_limit_wait_time = self._meter.create_histogram( - name="coordinator.rate_limit.wait_time", description="Time clients wait due to rate limiting", unit="s" - ) - - # Resource management metrics - self.coordinator_resource_allocations = self._meter.create_counter( - name="coordinator.resource.allocations.total", description="Total number of resource allocations", unit="1" - ) - - self.coordinator_resource_utilization = self._meter.create_up_down_counter( - name="coordinator.resource.utilization", description="Current resource utilization", unit="1" - ) - - # Scheduling decision metrics - self.coordinator_scheduling_decisions = self._meter.create_counter( - name="coordinator.scheduling.decisions.total", description="Total scheduling decisions made", unit="1" + name="coordinator.executions.scheduled.total", + description="Total number of executions scheduled", + unit="1", ) - def record_coordinator_processing_time(self, duration_seconds: float) -> None: - self.coordinator_processing_time.record(duration_seconds) - - def record_scheduling_duration(self, duration_seconds: float) -> None: + def record_coordinator_scheduling_duration(self, duration_seconds: float) -> None: self.coordinator_scheduling_duration.record(duration_seconds) - def update_active_executions_gauge(self, count: int) -> None: - """Update the count of active executions (absolute value).""" - # Reset to 0 then set to new value (for gauge-like behavior) - # This is a workaround since we're using up_down_counter - current_val = getattr(self, "_active_executions_current", 0) - delta = count - current_val + def update_coordinator_active_executions(self, count: int) -> None: + current = getattr(self, "_active_current", 0) + delta = count - current if delta != 0: self.coordinator_active_executions.add(delta) - self._active_executions_current = count + self._active_current = count def record_coordinator_queue_time(self, wait_seconds: float, 
priority: str) -> None: self.coordinator_queue_time.record(wait_seconds, attributes={"priority": priority}) - def record_coordinator_execution_scheduled(self, status: str) -> None: - self.coordinator_executions_scheduled.add(1, attributes={"status": status}) - - def record_coordinator_scheduling_duration(self, duration_seconds: float) -> None: - self.coordinator_scheduling_duration.record(duration_seconds) - - def update_coordinator_active_executions(self, count: int) -> None: - self.update_active_executions_gauge(count) - - def record_queue_wait_time_by_priority(self, wait_seconds: float, priority: str, queue_name: str) -> None: - self.coordinator_queue_time.record(wait_seconds, attributes={"priority": priority, "queue": queue_name}) - - # Removed legacy coordinator.queue.size; use execution.queue.depth instead - def update_execution_request_queue_size(self, size: int) -> None: """Update the execution-only request queue depth (absolute value).""" - key = "_exec_request_queue_size" - current_val = getattr(self, key, 0) - delta = size - current_val + current = getattr(self, "_queue_depth_current", 0) + delta = size - current if delta != 0: self.execution_request_queue_depth.add(delta) - setattr(self, key, size) - - def record_rate_limited(self, limit_type: str, user_id: str) -> None: - self.coordinator_rate_limited.add(1, attributes={"limit_type": limit_type, "user_id": user_id}) - - def update_rate_limit_wait_time(self, limit_type: str, user_id: str, wait_seconds: float) -> None: - self.coordinator_rate_limit_wait_time.record( - wait_seconds, attributes={"limit_type": limit_type, "user_id": user_id} - ) - - def record_resource_allocation(self, resource_type: str, amount: float, execution_id: str) -> None: - self.coordinator_resource_allocations.add( - 1, attributes={"resource_type": resource_type, "execution_id": execution_id} - ) - - # Update gauge for current allocation - key = f"_resource_{resource_type}" - current_val = getattr(self, key, 0.0) - new_val = current_val + amount - setattr(self, key, new_val) - - def record_resource_release(self, resource_type: str, amount: float, execution_id: str) -> None: - self.coordinator_resource_allocations.add( - -1, attributes={"resource_type": resource_type, "execution_id": execution_id} - ) - - # Update gauge for current allocation - key = f"_resource_{resource_type}" - current_val = getattr(self, key, 0.0) - new_val = max(0.0, current_val - amount) - setattr(self, key, new_val) + self._queue_depth_current = size - def update_resource_usage(self, resource_type: str, usage_percent: float) -> None: - # Record as a gauge-like metric - key = f"_resource_usage_{resource_type}" - current_val = getattr(self, key, 0.0) - delta = usage_percent - current_val - if delta != 0: - self.coordinator_resource_utilization.add(delta, attributes={"resource_type": resource_type}) - setattr(self, key, usage_percent) - - def record_scheduling_decision(self, decision: str, reason: str) -> None: - self.coordinator_scheduling_decisions.add(1, attributes={"decision": decision, "reason": reason}) - - def record_queue_reordering(self, queue_name: str, items_moved: int) -> None: - self.coordinator_queue_operations.add(1, attributes={"operation": "reorder", "queue": queue_name}) - - # Record the number of items moved as a histogram - self.coordinator_queue_time.record( - float(items_moved), attributes={"priority": "reordered", "queue": queue_name} - ) - - def record_priority_change(self, execution_id: str, old_priority: str, new_priority: str) -> None: - 
self.coordinator_scheduling_decisions.add( - 1, attributes={"decision": "priority_change", "reason": f"{old_priority}_to_{new_priority}"} - ) - - def update_rate_limiter_tokens(self, limit_type: str, tokens: int) -> None: - # Track tokens as gauge-like metric - key = f"_rate_limiter_{limit_type}" - current_val = getattr(self, key, 0) - delta = tokens - current_val - if delta != 0: - self.coordinator_resource_utilization.add(delta, attributes={"resource_type": f"rate_limit_{limit_type}"}) - setattr(self, key, tokens) - - def record_rate_limit_reset(self, limit_type: str, user_id: str) -> None: - self.coordinator_scheduling_decisions.add( - 1, attributes={"decision": "rate_limit_reset", "reason": f"{limit_type}_for_{user_id}"} - ) + def record_coordinator_execution_scheduled(self, status: str) -> None: + self.coordinator_executions_scheduled.add(1, attributes={"status": status}) diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py index b9c9506a..88799e06 100644 --- a/backend/app/core/providers.py +++ b/backend/app/core/providers.py @@ -55,7 +55,6 @@ from app.services.admin import AdminEventsService, AdminSettingsService, AdminUserService from app.services.auth_service import AuthService from app.services.coordinator.coordinator import ExecutionCoordinator -from app.services.coordinator.queue_manager import QueueManager from app.services.event_bus import EventBusManager from app.services.event_replay.replay_service import EventReplayService from app.services.event_service import EventService @@ -678,24 +677,11 @@ def get_coordinator_dispatcher( ttl_seconds=7200, ) - @provide - def get_queue_manager( - self, logger: logging.Logger, coordinator_metrics: CoordinatorMetrics - ) -> QueueManager: - return QueueManager( - logger=logger, - coordinator_metrics=coordinator_metrics, - max_queue_size=10000, - max_executions_per_user=100, - stale_timeout_seconds=3600, - ) - @provide def get_execution_coordinator( self, producer: UnifiedProducer, dispatcher: EventDispatcher, - queue_manager: QueueManager, execution_repository: ExecutionRepository, logger: logging.Logger, coordinator_metrics: CoordinatorMetrics, @@ -704,7 +690,6 @@ def get_execution_coordinator( return ExecutionCoordinator( producer=producer, dispatcher=dispatcher, - queue_manager=queue_manager, execution_repository=execution_repository, logger=logger, coordinator_metrics=coordinator_metrics, @@ -740,7 +725,6 @@ async def get_coordinator_consumer( event_metrics=event_metrics, ) - await coordinator.queue_manager.start() await consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.EXECUTION_COORDINATOR])) logger.info("Coordinator consumer started") @@ -748,7 +732,6 @@ async def get_coordinator_consumer( yield consumer finally: await consumer.stop() - await coordinator.queue_manager.stop() logger.info("Coordinator consumer stopped") diff --git a/backend/app/domain/enums/__init__.py b/backend/app/domain/enums/__init__.py index f37aac67..31458a8e 100644 --- a/backend/app/domain/enums/__init__.py +++ b/backend/app/domain/enums/__init__.py @@ -1,5 +1,5 @@ from app.domain.enums.common import ErrorType, SortOrder, Theme -from app.domain.enums.execution import ExecutionStatus +from app.domain.enums.execution import ExecutionStatus, QueuePriority from app.domain.enums.health import AlertSeverity, AlertStatus, ComponentStatus from app.domain.enums.notification import ( NotificationChannel, @@ -17,6 +17,7 @@ "Theme", # Execution "ExecutionStatus", + "QueuePriority", # Health "AlertSeverity", "AlertStatus", diff --git 
a/backend/app/domain/enums/execution.py b/backend/app/domain/enums/execution.py index abb4809d..86cca026 100644 --- a/backend/app/domain/enums/execution.py +++ b/backend/app/domain/enums/execution.py @@ -1,6 +1,16 @@ +from enum import IntEnum + from app.core.utils import StringEnum +class QueuePriority(IntEnum): + CRITICAL = 0 + HIGH = 1 + NORMAL = 5 + LOW = 8 + BACKGROUND = 10 + + class ExecutionStatus(StringEnum): """Status of an execution.""" diff --git a/backend/app/domain/events/typed.py b/backend/app/domain/events/typed.py index fca2a11d..c92d3b80 100644 --- a/backend/app/domain/events/typed.py +++ b/backend/app/domain/events/typed.py @@ -8,6 +8,7 @@ from app.domain.enums.auth import LoginMethod from app.domain.enums.common import Environment from app.domain.enums.events import EventType +from app.domain.enums.execution import QueuePriority from app.domain.enums.notification import NotificationChannel, NotificationSeverity from app.domain.enums.storage import ExecutionErrorType, StorageType @@ -75,7 +76,7 @@ class ExecutionRequestedEvent(BaseEvent): memory_limit: str cpu_request: str memory_request: str - priority: int = 5 + priority: QueuePriority = QueuePriority.NORMAL class ExecutionAcceptedEvent(BaseEvent): @@ -83,7 +84,7 @@ class ExecutionAcceptedEvent(BaseEvent): execution_id: str queue_position: int estimated_wait_seconds: float | None = None - priority: int = 5 + priority: QueuePriority = QueuePriority.NORMAL class ExecutionQueuedEvent(BaseEvent): @@ -428,7 +429,7 @@ class CreatePodCommandEvent(BaseEvent): memory_limit: str cpu_request: str memory_request: str - priority: int = 5 + priority: QueuePriority = QueuePriority.NORMAL class DeletePodCommandEvent(BaseEvent): diff --git a/backend/app/services/coordinator/__init__.py b/backend/app/services/coordinator/__init__.py index 7c9f6112..03ccbd8d 100644 --- a/backend/app/services/coordinator/__init__.py +++ b/backend/app/services/coordinator/__init__.py @@ -1,8 +1,8 @@ -from app.services.coordinator.coordinator import ExecutionCoordinator -from app.services.coordinator.queue_manager import QueueManager, QueuePriority +from app.domain.enums.execution import QueuePriority +from app.services.coordinator.coordinator import ExecutionCoordinator, QueueRejectError __all__ = [ "ExecutionCoordinator", - "QueueManager", "QueuePriority", + "QueueRejectError", ] diff --git a/backend/app/services/coordinator/coordinator.py b/backend/app/services/coordinator/coordinator.py index d948e549..fed8a977 100644 --- a/backend/app/services/coordinator/coordinator.py +++ b/backend/app/services/coordinator/coordinator.py @@ -1,10 +1,15 @@ +import asyncio +import heapq import logging import time +from collections import defaultdict +from dataclasses import dataclass, field from uuid import uuid4 from app.core.metrics import CoordinatorMetrics from app.db.repositories.execution_repository import ExecutionRepository from app.domain.enums.events import EventType +from app.domain.enums.execution import QueuePriority from app.domain.enums.storage import ExecutionErrorType from app.domain.events.typed import ( CreatePodCommandEvent, @@ -17,7 +22,29 @@ ExecutionRequestedEvent, ) from app.events.core import EventDispatcher, UnifiedProducer -from app.services.coordinator.queue_manager import QueueManager, QueuePriority + + +class QueueRejectError(Exception): + """Raised when an execution cannot be added to the queue.""" + + +@dataclass(order=True) +class _QueuedExecution: + priority: QueuePriority + timestamp: float = field(compare=False) + event: 
ExecutionRequestedEvent = field(compare=False) + + @property + def execution_id(self) -> str: + return self.event.execution_id + + @property + def user_id(self) -> str: + return self.event.metadata.user_id or "anonymous" + + @property + def age_seconds(self) -> float: + return time.time() - self.timestamp class ExecutionCoordinator: @@ -27,7 +54,7 @@ class ExecutionCoordinator: This service: 1. Consumes ExecutionRequested events 2. Manages execution queue with priority - 3. Enforces rate limits + 3. Enforces per-user rate limits 4. Publishes CreatePodCommand events for workers """ @@ -35,23 +62,35 @@ def __init__( self, producer: UnifiedProducer, dispatcher: EventDispatcher, - queue_manager: QueueManager, execution_repository: ExecutionRepository, logger: logging.Logger, coordinator_metrics: CoordinatorMetrics, + max_queue_size: int = 10000, + max_executions_per_user: int = 100, + stale_timeout_seconds: int = 3600, ) -> None: self.logger = logger self.metrics = coordinator_metrics self.producer = producer - self.queue_manager = queue_manager self.execution_repository = execution_repository + # Queue configuration + self._max_queue_size = max_queue_size + self._max_per_user = max_executions_per_user + self._stale_timeout = stale_timeout_seconds + + # Queue state + self._queue: list[_QueuedExecution] = [] + self._queue_lock = asyncio.Lock() + self._user_counts: dict[str, int] = defaultdict(int) + self._execution_users: dict[str, str] = {} + + # Scheduling state self._active_executions: set[str] = set() self._register_handlers(dispatcher) def _register_handlers(self, dispatcher: EventDispatcher) -> None: - """Register event handlers on the dispatcher.""" dispatcher.register_handler(EventType.EXECUTION_REQUESTED, self._handle_requested_wrapper) dispatcher.register_handler(EventType.EXECUTION_COMPLETED, self._handle_completed_wrapper) dispatcher.register_handler(EventType.EXECUTION_FAILED, self._handle_failed_wrapper) @@ -74,47 +113,37 @@ async def _handle_cancelled_wrapper(self, event: DomainEvent) -> None: await self._handle_execution_cancelled(event) async def _handle_execution_requested(self, event: ExecutionRequestedEvent) -> None: - """Handle execution requested event - add to queue for processing""" + """Handle execution requested event - add to queue for processing.""" self.logger.info(f"HANDLER CALLED: _handle_execution_requested for event {event.event_id}") start_time = time.time() try: - # Add to queue with priority - success, position, error = await self.queue_manager.add_execution( - event, - priority=QueuePriority(event.priority), - ) - - if not success: - # Publish queue full event - await self._publish_queue_full(event, error or "Queue is full") - self.metrics.record_coordinator_execution_scheduled("queue_full") - return - - # Publish ExecutionAcceptedEvent - if position is None: - position = 0 - await self._publish_execution_accepted(event, position, event.priority) + position = await self._add_to_queue(event) + except QueueRejectError as e: + await self._publish_queue_full(event, str(e)) + self.metrics.record_coordinator_execution_scheduled("queue_full") + return + except Exception as e: + self.logger.error(f"Failed to handle execution request {event.execution_id}: {e}", exc_info=True) + self.metrics.record_coordinator_execution_scheduled("error") + return - # Track metrics - duration = time.time() - start_time - self.metrics.record_coordinator_scheduling_duration(duration) - self.metrics.record_coordinator_execution_scheduled("queued") + await 
self._publish_execution_accepted(event, position) - self.logger.info(f"Execution {event.execution_id} added to queue at position {position}") + duration = time.time() - start_time + self.metrics.record_coordinator_scheduling_duration(duration) + self.metrics.record_coordinator_execution_scheduled("queued") - if position == 0: - await self._try_schedule_next() + self.logger.info(f"Execution {event.execution_id} added to queue at position {position}") - except Exception as e: - self.logger.error(f"Failed to handle execution request {event.execution_id}: {e}", exc_info=True) - self.metrics.record_coordinator_execution_scheduled("error") + if position == 0: + await self._try_schedule_next() async def _handle_execution_cancelled(self, event: ExecutionCancelledEvent) -> None: - """Handle execution cancelled event""" + """Handle execution cancelled event.""" execution_id = event.execution_id - removed = await self.queue_manager.remove_execution(execution_id) + removed = await self._remove_from_queue(execution_id) self._active_executions.discard(execution_id) self.metrics.update_coordinator_active_executions(len(self._active_executions)) @@ -124,7 +153,7 @@ async def _handle_execution_cancelled(self, event: ExecutionCancelledEvent) -> N await self._try_schedule_next() async def _handle_execution_completed(self, event: ExecutionCompletedEvent) -> None: - """Handle execution completed event""" + """Handle execution completed event.""" execution_id = event.execution_id self._active_executions.discard(execution_id) @@ -134,7 +163,7 @@ async def _handle_execution_completed(self, event: ExecutionCompletedEvent) -> N await self._try_schedule_next() async def _handle_execution_failed(self, event: ExecutionFailedEvent) -> None: - """Handle execution failed event""" + """Handle execution failed event.""" execution_id = event.execution_id self._active_executions.discard(execution_id) @@ -144,7 +173,7 @@ async def _handle_execution_failed(self, event: ExecutionFailedEvent) -> None: async def _try_schedule_next(self) -> None: """Pop the next queued execution and schedule it.""" - execution = await self.queue_manager.get_next_execution() + execution = await self._pop_next() if execution: await self._schedule_execution(execution) @@ -160,12 +189,10 @@ async def _schedule_execution(self, event: ExecutionRequestedEvent) -> None: self.metrics.update_coordinator_active_executions(len(self._active_executions)) try: - await self._publish_execution_started(event) + await self._publish_create_pod_command(event) - # Track metrics queue_time = start_time - event.timestamp.timestamp() - priority = getattr(event, "priority", QueuePriority.NORMAL.value) - self.metrics.record_coordinator_queue_time(queue_time, QueuePriority(priority).name) + self.metrics.record_coordinator_queue_time(queue_time, event.priority.name) scheduling_duration = time.time() - start_time self.metrics.record_coordinator_scheduling_duration(scheduling_duration) @@ -185,9 +212,108 @@ async def _schedule_execution(self, event: ExecutionRequestedEvent) -> None: await self._publish_scheduling_failed(event, str(e)) + async def _add_to_queue(self, event: ExecutionRequestedEvent) -> int: + """Add execution to queue. Returns queue position. 
Raises QueueRejectError on failure.""" + async with self._queue_lock: + if len(self._queue) >= self._max_queue_size: + self._sweep_stale() + if len(self._queue) >= self._max_queue_size: + raise QueueRejectError("Queue is full") + + user_id = event.metadata.user_id or "anonymous" + + if self._user_counts[user_id] >= self._max_per_user: + raise QueueRejectError(f"User execution limit exceeded ({self._max_per_user})") + + queued = _QueuedExecution(priority=event.priority, timestamp=time.time(), event=event) + + heapq.heappush(self._queue, queued) + self._track_user(event.execution_id, user_id) + position = self._find_position(event.execution_id) or 0 + + self.metrics.update_execution_request_queue_size(len(self._queue)) + + self.logger.info( + f"Added execution {event.execution_id} to queue. " + f"Priority: {event.priority.name}, Position: {position}, " + f"Queue size: {len(self._queue)}" + ) + + return position + + async def _pop_next(self) -> ExecutionRequestedEvent | None: + """Pop the highest-priority non-stale execution from the queue.""" + async with self._queue_lock: + while self._queue: + queued = heapq.heappop(self._queue) + + if queued.age_seconds > self._stale_timeout: + self._untrack_user(queued.execution_id) + continue + + self._untrack_user(queued.execution_id) + self.metrics.record_coordinator_queue_time(queued.age_seconds, queued.priority.name) + self.metrics.update_execution_request_queue_size(len(self._queue)) + + self.logger.info( + f"Retrieved execution {queued.execution_id} from queue. " + f"Wait time: {queued.age_seconds:.2f}s, Queue size: {len(self._queue)}" + ) + + return queued.event + + return None + + async def _remove_from_queue(self, execution_id: str) -> bool: + """Remove a specific execution from the queue (e.g. on cancellation).""" + async with self._queue_lock: + initial_size = len(self._queue) + self._queue = [q for q in self._queue if q.execution_id != execution_id] + + if len(self._queue) < initial_size: + heapq.heapify(self._queue) + self._untrack_user(execution_id) + self.metrics.update_execution_request_queue_size(len(self._queue)) + self.logger.info(f"Removed execution {execution_id} from queue") + return True + + return False + + def _find_position(self, execution_id: str) -> int | None: + for position, queued in enumerate(self._queue): + if queued.execution_id == execution_id: + return position + return None + + def _track_user(self, execution_id: str, user_id: str) -> None: + self._user_counts[user_id] += 1 + self._execution_users[execution_id] = user_id + + def _untrack_user(self, execution_id: str) -> None: + if execution_id in self._execution_users: + user_id = self._execution_users.pop(execution_id) + self._user_counts[user_id] -= 1 + if self._user_counts[user_id] <= 0: + del self._user_counts[user_id] + + def _sweep_stale(self) -> None: + """Remove stale executions from queue. 
Must be called under _queue_lock.""" + active: list[_QueuedExecution] = [] + removed = 0 + for queued in self._queue: + if queued.age_seconds > self._stale_timeout: + self._untrack_user(queued.execution_id) + removed += 1 + else: + active.append(queued) + if removed: + self._queue = active + heapq.heapify(self._queue) + self.metrics.update_execution_request_queue_size(len(self._queue)) + self.logger.info(f"Swept {removed} stale executions from queue") + async def _build_command_metadata(self, request: ExecutionRequestedEvent) -> EventMetadata: """Build metadata for CreatePodCommandEvent with guaranteed user_id.""" - # Prefer execution record user_id to avoid missing attribution exec_rec = await self.execution_repository.get_execution(request.execution_id) user_id: str = exec_rec.user_id if exec_rec and exec_rec.user_id else "system" @@ -198,8 +324,8 @@ async def _build_command_metadata(self, request: ExecutionRequestedEvent) -> Eve correlation_id=request.metadata.correlation_id, ) - async def _publish_execution_started(self, request: ExecutionRequestedEvent) -> None: - """Send CreatePodCommandEvent to k8s-worker via SAGA_COMMANDS topic""" + async def _publish_create_pod_command(self, request: ExecutionRequestedEvent) -> None: + """Send CreatePodCommandEvent to k8s-worker via SAGA_COMMANDS topic.""" metadata = await self._build_command_metadata(request) create_pod_cmd = CreatePodCommandEvent( @@ -222,30 +348,27 @@ async def _publish_execution_started(self, request: ExecutionRequestedEvent) -> await self.producer.produce(event_to_produce=create_pod_cmd, key=request.execution_id) - async def _publish_execution_accepted(self, request: ExecutionRequestedEvent, position: int, priority: int) -> None: - """Publish execution accepted event to notify that request was valid and queued""" + async def _publish_execution_accepted(self, request: ExecutionRequestedEvent, position: int) -> None: + """Publish execution accepted event to notify that request was valid and queued.""" self.logger.info(f"Publishing ExecutionAcceptedEvent for execution {request.execution_id}") event = ExecutionAcceptedEvent( execution_id=request.execution_id, queue_position=position, - estimated_wait_seconds=None, # Could calculate based on queue analysis - priority=priority, + estimated_wait_seconds=None, + priority=request.priority, metadata=request.metadata, ) await self.producer.produce(event_to_produce=event) async def _publish_queue_full(self, request: ExecutionRequestedEvent, error: str) -> None: - """Publish queue full event""" - # Get queue stats for context - queue_stats = await self.queue_manager.get_queue_stats() - + """Publish queue full event.""" event = ExecutionFailedEvent( execution_id=request.execution_id, error_type=ExecutionErrorType.RESOURCE_LIMIT, exit_code=-1, - stderr=f"Queue full: {error}. Queue size: {queue_stats.get('total_size', 'unknown')}", + stderr=f"Queue full: {error}. 
Queue size: {len(self._queue)}", resource_usage=None, metadata=request.metadata, error_message=error, @@ -254,7 +377,7 @@ async def _publish_queue_full(self, request: ExecutionRequestedEvent, error: str await self.producer.produce(event_to_produce=event, key=request.execution_id) async def _publish_scheduling_failed(self, request: ExecutionRequestedEvent, error: str) -> None: - """Publish scheduling failed event""" + """Publish scheduling failed event.""" event = ExecutionFailedEvent( execution_id=request.execution_id, error_type=ExecutionErrorType.SYSTEM_ERROR, @@ -266,4 +389,3 @@ async def _publish_scheduling_failed(self, request: ExecutionRequestedEvent, err ) await self.producer.produce(event_to_produce=event, key=request.execution_id) - diff --git a/backend/app/services/coordinator/queue_manager.py b/backend/app/services/coordinator/queue_manager.py deleted file mode 100644 index 8dab2643..00000000 --- a/backend/app/services/coordinator/queue_manager.py +++ /dev/null @@ -1,271 +0,0 @@ -import asyncio -import heapq -import logging -import time -from collections import defaultdict -from dataclasses import dataclass, field -from enum import IntEnum -from typing import Any - -from app.core.metrics import CoordinatorMetrics -from app.domain.events.typed import ExecutionRequestedEvent - - -class QueuePriority(IntEnum): - CRITICAL = 0 - HIGH = 1 - NORMAL = 5 - LOW = 8 - BACKGROUND = 10 - - -@dataclass(order=True) -class QueuedExecution: - priority: int - timestamp: float = field(compare=False) - event: ExecutionRequestedEvent = field(compare=False) - retry_count: int = field(default=0, compare=False) - - @property - def execution_id(self) -> str: - return self.event.execution_id - - @property - def user_id(self) -> str: - return self.event.metadata.user_id or "anonymous" - - @property - def age_seconds(self) -> float: - return time.time() - self.timestamp - - -class QueueManager: - def __init__( - self, - logger: logging.Logger, - coordinator_metrics: CoordinatorMetrics, - max_queue_size: int = 10000, - max_executions_per_user: int = 100, - stale_timeout_seconds: int = 3600, - ) -> None: - self.logger = logger - self.metrics = coordinator_metrics - self.max_queue_size = max_queue_size - self.max_executions_per_user = max_executions_per_user - self.stale_timeout_seconds = stale_timeout_seconds - - self._queue: list[QueuedExecution] = [] - self._queue_lock = asyncio.Lock() - self._user_execution_count: dict[str, int] = defaultdict(int) - self._execution_users: dict[str, str] = {} - self._cleanup_task: asyncio.Task[None] | None = None - self._running = False - - async def start(self) -> None: - if self._running: - return - - self._running = True - self._cleanup_task = asyncio.create_task(self._cleanup_stale_executions()) - self.logger.info("Queue manager started") - - async def stop(self) -> None: - if not self._running: - return - - self._running = False - - if self._cleanup_task: - self._cleanup_task.cancel() - try: - await self._cleanup_task - except asyncio.CancelledError: - pass - - self.logger.info(f"Queue manager stopped. 
Final queue size: {len(self._queue)}") - - async def add_execution( - self, event: ExecutionRequestedEvent, priority: QueuePriority | None = None - ) -> tuple[bool, int | None, str | None]: - async with self._queue_lock: - if len(self._queue) >= self.max_queue_size: - return False, None, "Queue is full" - - user_id = event.metadata.user_id or "anonymous" - - if self._user_execution_count[user_id] >= self.max_executions_per_user: - return False, None, f"User execution limit exceeded ({self.max_executions_per_user})" - - if priority is None: - priority = QueuePriority(event.priority) - - queued = QueuedExecution(priority=priority.value, timestamp=time.time(), event=event) - - heapq.heappush(self._queue, queued) - self._track_execution(event.execution_id, user_id) - position = self._get_queue_position(event.execution_id) - - # Update single authoritative metric for execution request queue depth - self.metrics.update_execution_request_queue_size(len(self._queue)) - - self.logger.info( - f"Added execution {event.execution_id} to queue. " - f"Priority: {priority.name}, Position: {position}, " - f"Queue size: {len(self._queue)}" - ) - - return True, position, None - - async def get_next_execution(self) -> ExecutionRequestedEvent | None: - async with self._queue_lock: - while self._queue: - queued = heapq.heappop(self._queue) - - if self._is_stale(queued): - self._untrack_execution(queued.execution_id) - self._record_removal("stale") - continue - - self._untrack_execution(queued.execution_id) - self._record_wait_time(queued) - # Update metric after removal from the queue - self.metrics.update_execution_request_queue_size(len(self._queue)) - - self.logger.info( - f"Retrieved execution {queued.execution_id} from queue. " - f"Wait time: {queued.age_seconds:.2f}s, Queue size: {len(self._queue)}" - ) - - return queued.event - - return None - - async def remove_execution(self, execution_id: str) -> bool: - async with self._queue_lock: - initial_size = len(self._queue) - self._queue = [q for q in self._queue if q.execution_id != execution_id] - - if len(self._queue) < initial_size: - heapq.heapify(self._queue) - self._untrack_execution(execution_id) - # Update metric after explicit removal - self.metrics.update_execution_request_queue_size(len(self._queue)) - self.logger.info(f"Removed execution {execution_id} from queue") - return True - - return False - - async def get_queue_position(self, execution_id: str) -> int | None: - async with self._queue_lock: - return self._get_queue_position(execution_id) - - async def get_queue_stats(self) -> dict[str, Any]: - async with self._queue_lock: - priority_counts: dict[str, int] = defaultdict(int) - user_counts: dict[str, int] = defaultdict(int) - - for queued in self._queue: - priority_name = QueuePriority(queued.priority).name - priority_counts[priority_name] += 1 - user_counts[queued.user_id] += 1 - - top_users = dict(sorted(user_counts.items(), key=lambda x: x[1], reverse=True)[:10]) - - return { - "total_size": len(self._queue), - "priority_distribution": dict(priority_counts), - "top_users": top_users, - "max_queue_size": self.max_queue_size, - "utilization_percent": (len(self._queue) / self.max_queue_size) * 100, - } - - async def requeue_execution( - self, event: ExecutionRequestedEvent, increment_retry: bool = True - ) -> tuple[bool, int | None, str | None]: - def _next_lower(p: QueuePriority) -> QueuePriority: - order = [ - QueuePriority.CRITICAL, - QueuePriority.HIGH, - QueuePriority.NORMAL, - QueuePriority.LOW, - QueuePriority.BACKGROUND, - ] - try: - 
idx = order.index(p) - except ValueError: - # Fallback: treat unknown numeric as NORMAL - idx = order.index(QueuePriority.NORMAL) - return order[min(idx + 1, len(order) - 1)] - - if increment_retry: - original_priority = QueuePriority(event.priority) - new_priority = _next_lower(original_priority) - else: - new_priority = QueuePriority(event.priority) - - return await self.add_execution(event, priority=new_priority) - - def _get_queue_position(self, execution_id: str) -> int | None: - for position, queued in enumerate(self._queue): - if queued.execution_id == execution_id: - return position - return None - - def _is_stale(self, queued: QueuedExecution) -> bool: - return queued.age_seconds > self.stale_timeout_seconds - - def _track_execution(self, execution_id: str, user_id: str) -> None: - self._user_execution_count[user_id] += 1 - self._execution_users[execution_id] = user_id - - def _untrack_execution(self, execution_id: str) -> None: - if execution_id in self._execution_users: - user_id = self._execution_users.pop(execution_id) - self._user_execution_count[user_id] -= 1 - if self._user_execution_count[user_id] <= 0: - del self._user_execution_count[user_id] - - def _record_removal(self, reason: str) -> None: - # No-op: we keep a single queue depth metric and avoid operation counters - return - - def _record_wait_time(self, queued: QueuedExecution) -> None: - self.metrics.record_queue_wait_time_by_priority( - queued.age_seconds, QueuePriority(queued.priority).name, "default" - ) - - def _update_add_metrics(self, priority: QueuePriority) -> None: - # Deprecated in favor of single execution queue depth metric - self.metrics.update_execution_request_queue_size(len(self._queue)) - - def _update_queue_size(self) -> None: - self.metrics.update_execution_request_queue_size(len(self._queue)) - - async def _cleanup_stale_executions(self) -> None: - while self._running: - try: - await asyncio.sleep(300) - - async with self._queue_lock: - stale_executions = [] - active_executions = [] - - for queued in self._queue: - if self._is_stale(queued): - stale_executions.append(queued) - else: - active_executions.append(queued) - - if stale_executions: - self._queue = active_executions - heapq.heapify(self._queue) - - for queued in stale_executions: - self._untrack_execution(queued.execution_id) - - # Update metric after stale cleanup - self.metrics.update_execution_request_queue_size(len(self._queue)) - self.logger.info(f"Cleaned {len(stale_executions)} stale executions from queue") - - except Exception as e: - self.logger.error(f"Error in queue cleanup: {e}") diff --git a/backend/app/services/execution_service.py b/backend/app/services/execution_service.py index 8798e9c3..34b90f30 100644 --- a/backend/app/services/execution_service.py +++ b/backend/app/services/execution_service.py @@ -8,7 +8,7 @@ from app.core.metrics import ExecutionMetrics from app.db.repositories.execution_repository import ExecutionRepository from app.domain.enums.events import EventType -from app.domain.enums.execution import ExecutionStatus +from app.domain.enums.execution import ExecutionStatus, QueuePriority from app.domain.events.typed import ( DomainEvent, EventMetadata, @@ -131,7 +131,7 @@ async def execute_script( user_agent: str | None, lang: str = "python", lang_version: str = "3.11", - priority: int = 5, + priority: QueuePriority = QueuePriority.NORMAL, timeout_override: int | None = None, ) -> DomainExecution: """ diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index bb96bf9d..ce149796 100644 --- 
a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -8,6 +8,7 @@ import pytest_asyncio import redis.asyncio as redis from app.core.database_context import Database +from app.domain.enums.execution import QueuePriority from app.domain.events.typed import EventMetadata, ExecutionRequestedEvent from app.main import create_app from app.settings import Settings @@ -196,7 +197,7 @@ def make_execution_requested_event( memory_limit: str = "128Mi", cpu_request: str = "50m", memory_request: str = "64Mi", - priority: int = 5, + priority: QueuePriority = QueuePriority.NORMAL, service_name: str = "tests", service_version: str = "1.0.0", user_id: str | None = None, diff --git a/backend/tests/e2e/services/coordinator/test_execution_coordinator.py b/backend/tests/e2e/services/coordinator/test_execution_coordinator.py index 4ffa4736..59ed202a 100644 --- a/backend/tests/e2e/services/coordinator/test_execution_coordinator.py +++ b/backend/tests/e2e/services/coordinator/test_execution_coordinator.py @@ -1,4 +1,5 @@ import pytest +from app.domain.enums.execution import QueuePriority from app.services.coordinator.coordinator import ExecutionCoordinator from dishka import AsyncContainer from tests.conftest import make_execution_requested_event @@ -29,7 +30,7 @@ async def test_handle_requested_with_priority( coord: ExecutionCoordinator = await scope.get(ExecutionCoordinator) ev = make_execution_requested_event( execution_id="e-priority-1", - priority=10, # High priority + priority=QueuePriority.BACKGROUND, ) await coord._handle_execution_requested(ev) # noqa: SLF001 @@ -53,19 +54,6 @@ async def test_handle_requested_unique_executions( assert "e-unique-2" in coord._active_executions # noqa: SLF001 -class TestQueueManager: - """Tests for queue manager integration.""" - - @pytest.mark.asyncio - async def test_queue_manager_initialized(self, scope: AsyncContainer) -> None: - """Queue manager is properly initialized.""" - coord: ExecutionCoordinator = await scope.get(ExecutionCoordinator) - - assert coord.queue_manager is not None - assert hasattr(coord.queue_manager, "add_execution") - assert hasattr(coord.queue_manager, "get_next_execution") - - class TestCoordinatorLifecycle: """Tests for coordinator lifecycle.""" diff --git a/backend/tests/e2e/test_k8s_worker_create_pod.py b/backend/tests/e2e/test_k8s_worker_create_pod.py index 7ef46d4b..74b7b105 100644 --- a/backend/tests/e2e/test_k8s_worker_create_pod.py +++ b/backend/tests/e2e/test_k8s_worker_create_pod.py @@ -3,6 +3,7 @@ import pytest from app.core.metrics import EventMetrics +from app.domain.enums.execution import QueuePriority from app.domain.events.typed import CreatePodCommandEvent, EventMetadata from app.events.core import EventDispatcher, UnifiedProducer from app.services.k8s_worker import KubernetesWorker @@ -50,7 +51,7 @@ async def test_worker_creates_configmap_and_pod( memory_limit="128Mi", cpu_request="50m", memory_request="64Mi", - priority=5, + priority=QueuePriority.NORMAL, metadata=EventMetadata(service_name="tests", service_version="1", user_id="u1"), ) diff --git a/backend/tests/unit/core/metrics/test_connections_and_coordinator_metrics.py b/backend/tests/unit/core/metrics/test_connections_and_coordinator_metrics.py index 202b7233..d49c5ddd 100644 --- a/backend/tests/unit/core/metrics/test_connections_and_coordinator_metrics.py +++ b/backend/tests/unit/core/metrics/test_connections_and_coordinator_metrics.py @@ -23,24 +23,10 @@ def test_connection_metrics_methods(test_settings: Settings) -> None: def 
test_coordinator_metrics_methods(test_settings: Settings) -> None: """Test CoordinatorMetrics methods with no-op metrics.""" m = CoordinatorMetrics(test_settings) - m.record_coordinator_processing_time(0.1) - m.record_scheduling_duration(0.2) - m.update_active_executions_gauge(2) - m.update_active_executions_gauge(1) - m.record_coordinator_queue_time(0.3, "high") - m.record_coordinator_execution_scheduled("ok") m.record_coordinator_scheduling_duration(0.4) m.update_coordinator_active_executions(3) - m.record_queue_wait_time_by_priority(0.5, "low", "q") + m.update_coordinator_active_executions(1) + m.record_coordinator_queue_time(0.3, "high") + m.record_coordinator_execution_scheduled("ok") m.update_execution_request_queue_size(10) m.update_execution_request_queue_size(6) - m.record_rate_limited("per_user", "u1") - m.update_rate_limit_wait_time("per_user", "u1", 1.0) - m.record_resource_allocation("cpu", 1.0, "e1") - m.record_resource_release("cpu", 0.5, "e1") - m.update_resource_usage("cpu", 75.0) - m.record_scheduling_decision("assign", "enough_resources") - m.record_queue_reordering("q", 2) - m.record_priority_change("e1", "low", "high") - m.update_rate_limiter_tokens("per_user", 5) - m.record_rate_limit_reset("per_user", "u1") diff --git a/backend/tests/unit/core/metrics/test_metrics_classes.py b/backend/tests/unit/core/metrics/test_metrics_classes.py index 589deb2d..ce9e7f8f 100644 --- a/backend/tests/unit/core/metrics/test_metrics_classes.py +++ b/backend/tests/unit/core/metrics/test_metrics_classes.py @@ -61,7 +61,7 @@ def test_event_metrics_smoke(test_settings: Settings) -> None: def test_other_metrics_classes_smoke(test_settings: Settings) -> None: """Test other metrics classes smoke test with no-op metrics.""" - CoordinatorMetrics(test_settings).record_coordinator_processing_time(0.01) + CoordinatorMetrics(test_settings).record_coordinator_scheduling_duration(0.01) DatabaseMetrics(test_settings).record_mongodb_operation("read", "ok") DLQMetrics(test_settings).record_dlq_message_received("topic", "type") ExecutionMetrics(test_settings).record_script_execution(ExecutionStatus.QUEUED, "python") diff --git a/backend/tests/unit/services/coordinator/test_coordinator_queue.py b/backend/tests/unit/services/coordinator/test_coordinator_queue.py new file mode 100644 index 00000000..ecb9ac6f --- /dev/null +++ b/backend/tests/unit/services/coordinator/test_coordinator_queue.py @@ -0,0 +1,95 @@ +import logging +from unittest.mock import AsyncMock, MagicMock + +import pytest +from app.core.metrics import CoordinatorMetrics +from app.domain.enums.execution import QueuePriority +from app.domain.events.typed import ExecutionRequestedEvent +from app.services.coordinator.coordinator import ExecutionCoordinator, QueueRejectError + +from tests.conftest import make_execution_requested_event + +_test_logger = logging.getLogger("test.services.coordinator") + +pytestmark = pytest.mark.unit + + +def _make_coordinator( + coordinator_metrics: CoordinatorMetrics, + *, + max_queue_size: int = 100, + max_executions_per_user: int = 100, +) -> ExecutionCoordinator: + return ExecutionCoordinator( + producer=AsyncMock(), + dispatcher=MagicMock(), + execution_repository=AsyncMock(), + logger=_test_logger, + coordinator_metrics=coordinator_metrics, + max_queue_size=max_queue_size, + max_executions_per_user=max_executions_per_user, + ) + + +def _ev(execution_id: str, priority: QueuePriority = QueuePriority.NORMAL) -> ExecutionRequestedEvent: + return make_execution_requested_event(execution_id=execution_id, 
priority=priority) + + +@pytest.mark.asyncio +async def test_add_to_queue_returns_position(coordinator_metrics: CoordinatorMetrics) -> None: + coord = _make_coordinator(coordinator_metrics) + position = await coord._add_to_queue(_ev("a")) # noqa: SLF001 + assert position == 0 + + +@pytest.mark.asyncio +async def test_add_to_queue_rejects_when_full(coordinator_metrics: CoordinatorMetrics) -> None: + coord = _make_coordinator(coordinator_metrics, max_queue_size=1) + await coord._add_to_queue(_ev("a")) # noqa: SLF001 + with pytest.raises(QueueRejectError, match="Queue is full"): + await coord._add_to_queue(_ev("b")) # noqa: SLF001 + + +@pytest.mark.asyncio +async def test_add_to_queue_rejects_user_limit(coordinator_metrics: CoordinatorMetrics) -> None: + coord = _make_coordinator(coordinator_metrics, max_executions_per_user=1) + await coord._add_to_queue(_ev("a", priority=QueuePriority.NORMAL)) # noqa: SLF001 + with pytest.raises(QueueRejectError, match="User execution limit exceeded"): + await coord._add_to_queue(_ev("b", priority=QueuePriority.NORMAL)) # noqa: SLF001 + + +@pytest.mark.asyncio +async def test_pop_next_returns_highest_priority(coordinator_metrics: CoordinatorMetrics) -> None: + coord = _make_coordinator(coordinator_metrics) + await coord._add_to_queue(_ev("low", priority=QueuePriority.LOW)) # noqa: SLF001 + await coord._add_to_queue(_ev("critical", priority=QueuePriority.CRITICAL)) # noqa: SLF001 + await coord._add_to_queue(_ev("normal", priority=QueuePriority.NORMAL)) # noqa: SLF001 + + first = await coord._pop_next() # noqa: SLF001 + assert first is not None + assert first.execution_id == "critical" + + second = await coord._pop_next() # noqa: SLF001 + assert second is not None + assert second.execution_id == "normal" + + +@pytest.mark.asyncio +async def test_pop_next_returns_none_when_empty(coordinator_metrics: CoordinatorMetrics) -> None: + coord = _make_coordinator(coordinator_metrics) + result = await coord._pop_next() # noqa: SLF001 + assert result is None + + +@pytest.mark.asyncio +async def test_remove_from_queue(coordinator_metrics: CoordinatorMetrics) -> None: + coord = _make_coordinator(coordinator_metrics) + await coord._add_to_queue(_ev("a")) # noqa: SLF001 + await coord._add_to_queue(_ev("b")) # noqa: SLF001 + + removed = await coord._remove_from_queue("a") # noqa: SLF001 + assert removed is True + + result = await coord._pop_next() # noqa: SLF001 + assert result is not None + assert result.execution_id == "b" diff --git a/backend/tests/unit/services/coordinator/test_queue_manager.py b/backend/tests/unit/services/coordinator/test_queue_manager.py deleted file mode 100644 index 671b19a7..00000000 --- a/backend/tests/unit/services/coordinator/test_queue_manager.py +++ /dev/null @@ -1,41 +0,0 @@ -import logging - -import pytest -from app.core.metrics import CoordinatorMetrics -from app.domain.events.typed import ExecutionRequestedEvent -from app.services.coordinator.queue_manager import QueueManager, QueuePriority - -from tests.conftest import make_execution_requested_event - -_test_logger = logging.getLogger("test.services.coordinator.queue_manager") - -pytestmark = pytest.mark.unit - - -def ev(execution_id: str, priority: int = QueuePriority.NORMAL.value) -> ExecutionRequestedEvent: - return make_execution_requested_event(execution_id=execution_id, priority=priority) - - -@pytest.mark.asyncio -async def test_requeue_execution_increments_priority(coordinator_metrics: CoordinatorMetrics) -> None: - qm = QueueManager(max_queue_size=10, logger=_test_logger, 
coordinator_metrics=coordinator_metrics) - await qm.start() - # Use NORMAL priority which can be incremented to LOW - e = ev("x", priority=QueuePriority.NORMAL.value) - await qm.add_execution(e) - await qm.requeue_execution(e, increment_retry=True) - nxt = await qm.get_next_execution() - assert nxt is not None - await qm.stop() - - -@pytest.mark.asyncio -async def test_queue_stats_empty_and_after_add(coordinator_metrics: CoordinatorMetrics) -> None: - qm = QueueManager(max_queue_size=5, logger=_test_logger, coordinator_metrics=coordinator_metrics) - await qm.start() - stats0 = await qm.get_queue_stats() - assert stats0["total_size"] == 0 - await qm.add_execution(ev("a")) - st = await qm.get_queue_stats() - assert st["total_size"] == 1 - await qm.stop() diff --git a/backend/tests/unit/services/test_pod_builder.py b/backend/tests/unit/services/test_pod_builder.py index 5720ed62..b7b0ee9b 100644 --- a/backend/tests/unit/services/test_pod_builder.py +++ b/backend/tests/unit/services/test_pod_builder.py @@ -2,6 +2,7 @@ from uuid import uuid4 import pytest +from app.domain.enums.execution import QueuePriority from app.domain.events.typed import CreatePodCommandEvent, EventMetadata from app.services.k8s_worker import PodBuilder from kubernetes_asyncio import client as k8s_client @@ -43,7 +44,7 @@ def create_pod_command(self) -> CreatePodCommandEvent: memory_request="256Mi", cpu_limit="1000m", memory_limit="1Gi", - priority=5, + priority=QueuePriority.NORMAL, metadata=EventMetadata( user_id=str(uuid4()), correlation_id=str(uuid4()), @@ -158,7 +159,7 @@ def test_container_resources_defaults( memory_request="", cpu_limit="", memory_limit="", - priority=5, + priority=QueuePriority.NORMAL, metadata=EventMetadata( service_name="svc", service_version="1", @@ -291,7 +292,7 @@ def test_pod_timeout_default( memory_request="128Mi", cpu_limit="500m", memory_limit="512Mi", - priority=5, + priority=QueuePriority.NORMAL, metadata=EventMetadata(user_id=str(uuid4()), service_name="t", service_version="1") ) @@ -348,7 +349,7 @@ def test_pod_labels_truncation( memory_limit="128Mi", cpu_request="50m", memory_request="64Mi", - priority=5, + priority=QueuePriority.NORMAL, metadata=EventMetadata( service_name="svc", service_version="1", @@ -405,7 +406,7 @@ def test_different_languages( memory_request="128Mi", cpu_limit="200m", memory_limit="256Mi", - priority=5, + priority=QueuePriority.NORMAL, metadata=EventMetadata(user_id=str(uuid4()), service_name="t", service_version="1") ) diff --git a/docs/architecture/execution-queue.md b/docs/architecture/execution-queue.md index 77576745..dfa3f4af 100644 --- a/docs/architecture/execution-queue.md +++ b/docs/architecture/execution-queue.md @@ -15,8 +15,7 @@ flowchart TB end subgraph Coordinator - COORD --> QUEUE[QueueManager] - QUEUE --> HEAP[(Priority Heap)] + COORD --> HEAP[(Priority Heap)] end subgraph Scheduling @@ -30,18 +29,17 @@ flowchart TB Executions enter the queue with one of five priority levels. Lower numeric values are processed first: ```python ---8<-- "backend/app/services/coordinator/queue_manager.py:14:19" +--8<-- "backend/app/services/coordinator/coordinator.py:32:37" ``` -The queue uses Python's `heapq` module, which efficiently maintains the priority ordering. When resources are -unavailable, executions are requeued with reduced priority to prevent starvation of lower-priority work. +The queue uses Python's `heapq` module, which efficiently maintains the priority ordering. 
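A self-contained toy makes the ordering guarantee concrete: `heapq` always pops the smallest item, so lower numeric priorities come out first. The enum below mirrors `QueuePriority`; the toy uses a `(priority, id)` tuple rather than the patch's `_QueuedExecution` dataclass.

```python
# Toy demonstration of heap-based priority ordering (lowest value wins).
import heapq
from enum import IntEnum


class Priority(IntEnum):  # mirrors QueuePriority from the snippet above
    CRITICAL = 0
    HIGH = 1
    NORMAL = 5
    LOW = 8
    BACKGROUND = 10


heap: list[tuple[Priority, str]] = []
for prio, exec_id in [(Priority.LOW, "e1"), (Priority.CRITICAL, "e2"), (Priority.NORMAL, "e3")]:
    heapq.heappush(heap, (prio, exec_id))

while heap:
    prio, exec_id = heapq.heappop(heap)
    print(exec_id, prio.name)  # -> e2 CRITICAL, e3 NORMAL, e1 LOW
```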
## Per-User Limits The queue enforces per-user execution limits to prevent a single user from monopolizing resources: ```python ---8<-- "backend/app/services/coordinator/queue_manager.py:42:54" +--8<-- "backend/app/services/coordinator/coordinator.py:70:79" ``` When a user exceeds their limit, new execution requests are rejected with an error message indicating the limit has been @@ -49,12 +47,8 @@ reached. ## Stale Timeout -Executions that sit in the queue too long (default 1 hour) are automatically removed by a background cleanup task. This -prevents abandoned requests from consuming queue space indefinitely: - -```python ---8<-- "backend/app/services/coordinator/queue_manager.py:243:267" -``` +Executions that sit in the queue too long (default 1 hour) are lazily swept when the queue is full. This prevents +abandoned requests from consuming queue space indefinitely. ## Reactive Scheduling @@ -90,5 +84,4 @@ the Kubernetes worker. | File | Purpose | |--------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------| -| [`services/coordinator/coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/coordinator.py) | Main coordinator service | -| [`services/coordinator/queue_manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/queue_manager.py) | Priority queue implementation | +| [`services/coordinator/coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/coordinator.py) | Coordinator service with integrated priority queue | diff --git a/docs/architecture/services-overview.md b/docs/architecture/services-overview.md index f42d08cc..2a26bedb 100644 --- a/docs/architecture/services-overview.md +++ b/docs/architecture/services-overview.md @@ -12,7 +12,7 @@ EXECUTION_EVENTS carries lifecycle updates like queued, started, running, and ca ## Execution pipeline services -The coordinator/ module contains QueueManager which maintains an in-memory view of pending executions with priorities, aging, and backpressure. It doesn't own metrics for queue depth (that's centralized in coordinator metrics) and doesn't publish commands directly, instead emitting events for the Saga Orchestrator to process. This provides fairness, limits, and stale-job cleanup in one place while preventing double publications. +The coordinator/ module maintains an in-memory priority queue of pending executions with per-user limits, stale-job cleanup, and backpressure. Queue state is private to the ExecutionCoordinator class. The coordinator emits events for the Saga Orchestrator to process, providing fairness, limits, and cleanup in one place while preventing double publications. The saga/ module has ExecutionSaga which encodes the multi-step execution flow from receiving a request through creating a pod command, observing pod outcomes, and committing the result. The Saga Orchestrator subscribes to EXECUTION events, reconstructs sagas, and issues SAGA_COMMANDS to the worker with goals of idempotency across restarts, clean compensation on failure, and avoiding duplicate side-effects. diff --git a/docs/components/workers/coordinator.md b/docs/components/workers/coordinator.md index ba4e5b42..f6015cfd 100644 --- a/docs/components/workers/coordinator.md +++ b/docs/components/workers/coordinator.md @@ -28,11 +28,9 @@ pops the next item and publishes a `CreatePodCommand`. 
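The per-user limit and lazy stale sweep described above can be sketched standalone. The names here (`add`, `MAX_PER_USER`, and a plain `RuntimeError` in place of the coordinator's `QueueRejectError`) are illustrative assumptions, not the actual API:

```python
import heapq
import time
from collections import defaultdict

MAX_QUEUE_SIZE = 3
MAX_PER_USER = 2
STALE_TIMEOUT = 3600.0  # seconds; matches the documented 1-hour default

# Heap entries: (priority, enqueued_at, user_id, execution_id)
queue: list[tuple[int, float, str, str]] = []
user_counts: dict[str, int] = defaultdict(int)

def add(priority: int, user_id: str, execution_id: str) -> None:
    now = time.time()
    if len(queue) >= MAX_QUEUE_SIZE:
        # Lazy sweep: stale entries are evicted only when the queue is
        # full, so no background cleanup task is needed.
        for _, enqueued_at, uid, _ in queue:
            if now - enqueued_at > STALE_TIMEOUT:
                user_counts[uid] -= 1
        queue[:] = [e for e in queue if now - e[1] <= STALE_TIMEOUT]
        heapq.heapify(queue)
    if len(queue) >= MAX_QUEUE_SIZE:
        raise RuntimeError("Queue is full")
    if user_counts[user_id] >= MAX_PER_USER:
        raise RuntimeError(f"User execution limit exceeded ({MAX_PER_USER})")
    heapq.heappush(queue, (priority, now, user_id, execution_id))
    user_counts[user_id] += 1
```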
A dedup guard prevents double scheduling. Executions are processed in priority order. Lower numeric values are processed first: ```python ---8<-- "backend/app/services/coordinator/queue_manager.py:14:19" +--8<-- "backend/app/services/coordinator/coordinator.py:32:37" ``` -When resources are unavailable, executions are requeued with reduced priority to prevent starvation. - ## Configuration | Parameter | Default | Description | @@ -50,9 +48,8 @@ When resources are unavailable, executions are requeued with reduced priority to | File | Purpose | |--------------------------------------------------------------------------------------------------------------------------------|-------------------------------| -| [`run_coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/workers/run_coordinator.py) | Entry point | -| [`coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/coordinator.py) | Main coordinator service | -| [`queue_manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/queue_manager.py) | Priority queue implementation | +| [`run_coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/workers/run_coordinator.py) | Entry point | +| [`coordinator.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/coordinator/coordinator.py) | Coordinator service with integrated priority queue | ## Deployment diff --git a/docs/operations/metrics-reference.md b/docs/operations/metrics-reference.md index 19f12236..0089eb02 100644 --- a/docs/operations/metrics-reference.md +++ b/docs/operations/metrics-reference.md @@ -35,15 +35,13 @@ Track script execution performance and resource usage. Track scheduling and queue management. 
-| Metric | Type | Labels | Description | -|------------------------------------------|---------------|---------------------|---------------------------| -| `coordinator.processing.time` | Histogram | - | Event processing time | -| `coordinator.scheduling.duration` | Histogram | - | Scheduling time | -| `coordinator.executions.active` | UpDownCounter | - | Active managed executions | -| `coordinator.queue.wait_time` | Histogram | priority, queue | Queue wait by priority | -| `coordinator.executions.scheduled.total` | Counter | status | Scheduled executions | -| `coordinator.rate_limited.total` | Counter | limit_type, user_id | Rate limited requests | -| `coordinator.scheduling.decisions.total` | Counter | decision, reason | Scheduling decisions | +| Metric | Type | Labels | Description | +|------------------------------------------|---------------|----------|---------------------------| +| `coordinator.scheduling.duration` | Histogram | - | Scheduling time | +| `coordinator.executions.active` | UpDownCounter | - | Active managed executions | +| `coordinator.queue.wait_time` | Histogram | priority | Queue wait by priority | +| `execution.queue.depth` | UpDownCounter | - | Queue depth | +| `coordinator.executions.scheduled.total` | Counter | status | Scheduled executions | ### Rate Limit Metrics From 17c2069dcfca433ea2df3c97af834a98378bf4a7 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Thu, 29 Jan 2026 21:30:20 +0100 Subject: [PATCH 4/5] removed weird internal dataclass --- backend/app/domain/enums/execution.py | 16 ++--- .../app/services/coordinator/coordinator.py | 67 +++++++------------ .../events/test_schema_registry_manager.py | 3 +- 3 files changed, 36 insertions(+), 50 deletions(-) diff --git a/backend/app/domain/enums/execution.py b/backend/app/domain/enums/execution.py index 86cca026..22b8764c 100644 --- a/backend/app/domain/enums/execution.py +++ b/backend/app/domain/enums/execution.py @@ -1,14 +1,14 @@ -from enum import IntEnum - from app.core.utils import StringEnum -class QueuePriority(IntEnum): - CRITICAL = 0 - HIGH = 1 - NORMAL = 5 - LOW = 8 - BACKGROUND = 10 +class QueuePriority(StringEnum): + """Execution priority, ordered highest to lowest.""" + + CRITICAL = "critical" + HIGH = "high" + NORMAL = "normal" + LOW = "low" + BACKGROUND = "background" class ExecutionStatus(StringEnum): diff --git a/backend/app/services/coordinator/coordinator.py b/backend/app/services/coordinator/coordinator.py index fed8a977..81c56288 100644 --- a/backend/app/services/coordinator/coordinator.py +++ b/backend/app/services/coordinator/coordinator.py @@ -3,7 +3,6 @@ import logging import time from collections import defaultdict -from dataclasses import dataclass, field from uuid import uuid4 from app.core.metrics import CoordinatorMetrics @@ -28,23 +27,7 @@ class QueueRejectError(Exception): """Raised when an execution cannot be added to the queue.""" -@dataclass(order=True) -class _QueuedExecution: - priority: QueuePriority - timestamp: float = field(compare=False) - event: ExecutionRequestedEvent = field(compare=False) - - @property - def execution_id(self) -> str: - return self.event.execution_id - - @property - def user_id(self) -> str: - return self.event.metadata.user_id or "anonymous" - - @property - def age_seconds(self) -> float: - return time.time() - self.timestamp +_PRIORITY_SORT = {p: i for i, p in enumerate(QueuePriority)} class ExecutionCoordinator: @@ -79,8 +62,9 @@ def __init__( self._max_per_user = max_executions_per_user self._stale_timeout = stale_timeout_seconds - # Queue 
state - self._queue: list[_QueuedExecution] = [] + # Queue state – heap of (priority_index, sequence, event) tuples + self._queue: list[tuple[int, int, ExecutionRequestedEvent]] = [] + self._enqueue_counter = 0 self._queue_lock = asyncio.Lock() self._user_counts: dict[str, int] = defaultdict(int) self._execution_users: dict[str, str] = {} @@ -192,7 +176,7 @@ async def _schedule_execution(self, event: ExecutionRequestedEvent) -> None: await self._publish_create_pod_command(event) queue_time = start_time - event.timestamp.timestamp() - self.metrics.record_coordinator_queue_time(queue_time, event.priority.name) + self.metrics.record_coordinator_queue_time(queue_time, event.priority) scheduling_duration = time.time() - start_time self.metrics.record_coordinator_scheduling_duration(scheduling_duration) @@ -225,9 +209,8 @@ async def _add_to_queue(self, event: ExecutionRequestedEvent) -> int: if self._user_counts[user_id] >= self._max_per_user: raise QueueRejectError(f"User execution limit exceeded ({self._max_per_user})") - queued = _QueuedExecution(priority=event.priority, timestamp=time.time(), event=event) - - heapq.heappush(self._queue, queued) + self._enqueue_counter += 1 + heapq.heappush(self._queue, (_PRIORITY_SORT[event.priority], self._enqueue_counter, event)) self._track_user(event.execution_id, user_id) position = self._find_position(event.execution_id) or 0 @@ -235,7 +218,7 @@ async def _add_to_queue(self, event: ExecutionRequestedEvent) -> int: self.logger.info( f"Added execution {event.execution_id} to queue. " - f"Priority: {event.priority.name}, Position: {position}, " + f"Priority: {event.priority}, Position: {position}, " f"Queue size: {len(self._queue)}" ) @@ -245,22 +228,23 @@ async def _pop_next(self) -> ExecutionRequestedEvent | None: """Pop the highest-priority non-stale execution from the queue.""" async with self._queue_lock: while self._queue: - queued = heapq.heappop(self._queue) + _, _, event = heapq.heappop(self._queue) + age = time.time() - event.timestamp.timestamp() - if queued.age_seconds > self._stale_timeout: - self._untrack_user(queued.execution_id) + if age > self._stale_timeout: + self._untrack_user(event.execution_id) continue - self._untrack_user(queued.execution_id) - self.metrics.record_coordinator_queue_time(queued.age_seconds, queued.priority.name) + self._untrack_user(event.execution_id) + self.metrics.record_coordinator_queue_time(age, event.priority) self.metrics.update_execution_request_queue_size(len(self._queue)) self.logger.info( - f"Retrieved execution {queued.execution_id} from queue. " - f"Wait time: {queued.age_seconds:.2f}s, Queue size: {len(self._queue)}" + f"Retrieved execution {event.execution_id} from queue. " + f"Wait time: {age:.2f}s, Queue size: {len(self._queue)}" ) - return queued.event + return event return None @@ -268,7 +252,7 @@ async def _remove_from_queue(self, execution_id: str) -> bool: """Remove a specific execution from the queue (e.g. 
on cancellation).""" async with self._queue_lock: initial_size = len(self._queue) - self._queue = [q for q in self._queue if q.execution_id != execution_id] + self._queue = [(p, s, e) for p, s, e in self._queue if e.execution_id != execution_id] if len(self._queue) < initial_size: heapq.heapify(self._queue) @@ -280,8 +264,8 @@ async def _remove_from_queue(self, execution_id: str) -> bool: return False def _find_position(self, execution_id: str) -> int | None: - for position, queued in enumerate(self._queue): - if queued.execution_id == execution_id: + for position, (_, _, event) in enumerate(self._queue): + if event.execution_id == execution_id: return position return None @@ -298,14 +282,15 @@ def _untrack_user(self, execution_id: str) -> None: def _sweep_stale(self) -> None: """Remove stale executions from queue. Must be called under _queue_lock.""" - active: list[_QueuedExecution] = [] + active: list[tuple[int, int, ExecutionRequestedEvent]] = [] removed = 0 - for queued in self._queue: - if queued.age_seconds > self._stale_timeout: - self._untrack_user(queued.execution_id) + now = time.time() + for entry in self._queue: + if now - entry[2].timestamp.timestamp() > self._stale_timeout: + self._untrack_user(entry[2].execution_id) removed += 1 else: - active.append(queued) + active.append(entry) if removed: self._queue = active heapq.heapify(self._queue) diff --git a/backend/tests/unit/events/test_schema_registry_manager.py b/backend/tests/unit/events/test_schema_registry_manager.py index 6819237a..118c4c7c 100644 --- a/backend/tests/unit/events/test_schema_registry_manager.py +++ b/backend/tests/unit/events/test_schema_registry_manager.py @@ -1,6 +1,7 @@ import logging import pytest +from app.domain.enums.execution import QueuePriority from app.domain.events.typed import ExecutionRequestedEvent from app.events.schema.schema_registry import SchemaRegistryManager from app.settings import Settings @@ -24,7 +25,7 @@ def test_deserialize_json_execution_requested(test_settings: Settings) -> None: "memory_limit": "128Mi", "cpu_request": "50m", "memory_request": "64Mi", - "priority": 5, + "priority": QueuePriority.NORMAL, "metadata": {"service_name": "t", "service_version": "1.0"}, } ev = m.deserialize_json(data) From 2fe458d9ad3ed3ad5807b94fd4d0cef437784666 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Thu, 29 Jan 2026 21:50:22 +0100 Subject: [PATCH 5/5] fixes --- .../app/services/coordinator/coordinator.py | 33 +++++++++---------- docs/operations/metrics-reference.md | 1 - 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/backend/app/services/coordinator/coordinator.py b/backend/app/services/coordinator/coordinator.py index 81c56288..ce767163 100644 --- a/backend/app/services/coordinator/coordinator.py +++ b/backend/app/services/coordinator/coordinator.py @@ -62,8 +62,8 @@ def __init__( self._max_per_user = max_executions_per_user self._stale_timeout = stale_timeout_seconds - # Queue state – heap of (priority_index, sequence, event) tuples - self._queue: list[tuple[int, int, ExecutionRequestedEvent]] = [] + # Queue state – heap of (priority_index, sequence, enqueued_at, event) tuples + self._queue: list[tuple[int, int, float, ExecutionRequestedEvent]] = [] self._enqueue_counter = 0 self._queue_lock = asyncio.Lock() self._user_counts: dict[str, int] = defaultdict(int) @@ -129,6 +129,7 @@ async def _handle_execution_cancelled(self, event: ExecutionCancelledEvent) -> N removed = await self._remove_from_queue(execution_id) self._active_executions.discard(execution_id) + 
self._untrack_user(execution_id) self.metrics.update_coordinator_active_executions(len(self._active_executions)) if removed: @@ -141,6 +142,7 @@ async def _handle_execution_completed(self, event: ExecutionCompletedEvent) -> N execution_id = event.execution_id self._active_executions.discard(execution_id) + self._untrack_user(execution_id) self.metrics.update_coordinator_active_executions(len(self._active_executions)) self.logger.info(f"Execution {execution_id} completed") @@ -151,6 +153,7 @@ async def _handle_execution_failed(self, event: ExecutionFailedEvent) -> None: execution_id = event.execution_id self._active_executions.discard(execution_id) + self._untrack_user(execution_id) self.metrics.update_coordinator_active_executions(len(self._active_executions)) await self._try_schedule_next() @@ -175,22 +178,17 @@ async def _schedule_execution(self, event: ExecutionRequestedEvent) -> None: try: await self._publish_create_pod_command(event) - queue_time = start_time - event.timestamp.timestamp() - self.metrics.record_coordinator_queue_time(queue_time, event.priority) - scheduling_duration = time.time() - start_time self.metrics.record_coordinator_scheduling_duration(scheduling_duration) self.metrics.record_coordinator_execution_scheduled("scheduled") - self.logger.info( - f"Scheduled execution {event.execution_id}. " - f"Queue time: {queue_time:.2f}s" - ) + self.logger.info(f"Scheduled execution {event.execution_id}") except Exception as e: self.logger.error(f"Failed to schedule execution {event.execution_id}: {e}", exc_info=True) self._active_executions.discard(event.execution_id) + self._untrack_user(event.execution_id) self.metrics.update_coordinator_active_executions(len(self._active_executions)) self.metrics.record_coordinator_execution_scheduled("error") @@ -210,7 +208,7 @@ async def _add_to_queue(self, event: ExecutionRequestedEvent) -> int: raise QueueRejectError(f"User execution limit exceeded ({self._max_per_user})") self._enqueue_counter += 1 - heapq.heappush(self._queue, (_PRIORITY_SORT[event.priority], self._enqueue_counter, event)) + heapq.heappush(self._queue, (_PRIORITY_SORT[event.priority], self._enqueue_counter, time.time(), event)) self._track_user(event.execution_id, user_id) position = self._find_position(event.execution_id) or 0 @@ -228,14 +226,13 @@ async def _pop_next(self) -> ExecutionRequestedEvent | None: """Pop the highest-priority non-stale execution from the queue.""" async with self._queue_lock: while self._queue: - _, _, event = heapq.heappop(self._queue) - age = time.time() - event.timestamp.timestamp() + _, _, enqueued_at, event = heapq.heappop(self._queue) + age = time.time() - enqueued_at if age > self._stale_timeout: self._untrack_user(event.execution_id) continue - self._untrack_user(event.execution_id) self.metrics.record_coordinator_queue_time(age, event.priority) self.metrics.update_execution_request_queue_size(len(self._queue)) @@ -252,7 +249,7 @@ async def _remove_from_queue(self, execution_id: str) -> bool: """Remove a specific execution from the queue (e.g. 
on cancellation).""" async with self._queue_lock: initial_size = len(self._queue) - self._queue = [(p, s, e) for p, s, e in self._queue if e.execution_id != execution_id] + self._queue = [(p, s, t, e) for p, s, t, e in self._queue if e.execution_id != execution_id] if len(self._queue) < initial_size: heapq.heapify(self._queue) @@ -264,7 +261,7 @@ async def _remove_from_queue(self, execution_id: str) -> bool: return False def _find_position(self, execution_id: str) -> int | None: - for position, (_, _, event) in enumerate(self._queue): + for position, (_, _, _, event) in enumerate(self._queue): if event.execution_id == execution_id: return position return None @@ -282,12 +279,12 @@ def _untrack_user(self, execution_id: str) -> None: def _sweep_stale(self) -> None: """Remove stale executions from queue. Must be called under _queue_lock.""" - active: list[tuple[int, int, ExecutionRequestedEvent]] = [] + active: list[tuple[int, int, float, ExecutionRequestedEvent]] = [] removed = 0 now = time.time() for entry in self._queue: - if now - entry[2].timestamp.timestamp() > self._stale_timeout: - self._untrack_user(entry[2].execution_id) + if now - entry[2] > self._stale_timeout: + self._untrack_user(entry[3].execution_id) removed += 1 else: active.append(entry) diff --git a/docs/operations/metrics-reference.md b/docs/operations/metrics-reference.md index 0089eb02..fd66870b 100644 --- a/docs/operations/metrics-reference.md +++ b/docs/operations/metrics-reference.md @@ -40,7 +40,6 @@ Track scheduling and queue management. | `coordinator.scheduling.duration` | Histogram | - | Scheduling time | | `coordinator.executions.active` | UpDownCounter | - | Active managed executions | | `coordinator.queue.wait_time` | Histogram | priority | Queue wait by priority | -| `execution.queue.depth` | UpDownCounter | - | Queue depth | | `coordinator.executions.scheduled.total` | Counter | status | Scheduled executions | ### Rate Limit Metrics
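As a closing illustration of the enum change in this series: once `QueuePriority` is a string enum, heap order comes from declaration order via the `_PRIORITY_SORT` index map rather than from numeric values. A runnable sketch, using a plain `str`/`Enum` subclass as a stand-in for `app.core.utils.StringEnum` (whose definition is not part of this patch):

```python
import heapq
from enum import Enum

class QueuePriority(str, Enum):  # stand-in for app.core.utils.StringEnum
    CRITICAL = "critical"
    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"
    BACKGROUND = "background"

# Enum iteration follows declaration order, so CRITICAL maps to 0 and
# BACKGROUND to 4 - a sortable index, as in the patched coordinator.
_PRIORITY_SORT = {p: i for i, p in enumerate(QueuePriority)}

# Heap entries mirror the final tuple shape:
# (priority_index, sequence, enqueued_at, execution_id)
heap: list[tuple[int, int, float, str]] = []
for seq, (prio, eid) in enumerate(
    [(QueuePriority.LOW, "low-1"), (QueuePriority.CRITICAL, "crit-1")]
):
    heapq.heappush(heap, (_PRIORITY_SORT[prio], seq, 0.0, eid))

assert heapq.heappop(heap)[3] == "crit-1"  # critical drains first
```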