1 change: 1 addition & 0 deletions backend/app/core/container.py
@@ -53,6 +53,7 @@ def create_app_container(settings: Settings) -> AsyncContainer:
AdminServicesProvider(),
EventReplayProvider(),
BusinessServicesProvider(),
CoordinatorProvider(),
KubernetesProvider(),
ResourceCleanerProvider(),
FastapiProvider(),
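The change above registers the new `CoordinatorProvider()` alongside the existing providers in the dishka container. As a rough, self-contained sketch of that composition pattern (the `DemoProvider`/`DemoService` names are hypothetical; only the dishka calls are real):

```python
# Sketch of composing dishka providers into an app container.
# DemoProvider / DemoService are illustrative stand-ins, not names from this PR.
import asyncio

from dishka import Provider, Scope, make_async_container, provide


class DemoService:
    def greet(self) -> str:
        return "hello from the container"


class DemoProvider(Provider):
    scope = Scope.APP

    @provide
    def get_demo_service(self) -> DemoService:
        return DemoService()


async def main() -> None:
    # Mirrors create_app_container: providers are listed once and composed here.
    container = make_async_container(DemoProvider())
    service = await container.get(DemoService)
    print(service.greet())
    await container.close()


asyncio.run(main())
```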
154 changes: 17 additions & 137 deletions backend/app/core/metrics/coordinator.py
@@ -5,15 +5,10 @@ class CoordinatorMetrics(BaseMetrics):
"""Metrics for coordinator and scheduling operations."""

def _create_instruments(self) -> None:
# Coordinator processing metrics
self.coordinator_processing_time = self._meter.create_histogram(
name="coordinator.processing.time",
description="Time spent processing execution events in seconds",
unit="s",
)

self.coordinator_scheduling_duration = self._meter.create_histogram(
name="coordinator.scheduling.duration", description="Time spent scheduling executions in seconds", unit="s"
name="coordinator.scheduling.duration",
description="Time spent scheduling executions in seconds",
unit="s",
)

self.coordinator_active_executions = self._meter.create_up_down_counter(
@@ -22,159 +17,44 @@ def _create_instruments(self) -> None:
unit="1",
)

# Queue management metrics
self.coordinator_queue_time = self._meter.create_histogram(
name="coordinator.queue.wait_time",
description="Time spent waiting in coordinator queue by priority",
unit="s",
)

self.coordinator_queue_operations = self._meter.create_counter(
name="coordinator.queue.operations.total", description="Total queue operations (add/remove)", unit="1"
)

# Execution-only request queue depth (authoritative, maintained by coordinator)
self.execution_request_queue_depth = self._meter.create_up_down_counter(
name="execution.queue.depth",
description="Depth of user execution requests queued (excludes replays and non-request events)",
description="Depth of user execution requests queued",
unit="1",
)

# Scheduling metrics
self.coordinator_executions_scheduled = self._meter.create_counter(
name="coordinator.executions.scheduled.total", description="Total number of executions scheduled", unit="1"
)

# Rate limiting metrics
self.coordinator_rate_limited = self._meter.create_counter(
name="coordinator.rate_limited.total", description="Total number of rate-limited requests", unit="1"
)

self.coordinator_rate_limit_wait_time = self._meter.create_histogram(
name="coordinator.rate_limit.wait_time", description="Time clients wait due to rate limiting", unit="s"
)

# Resource management metrics
self.coordinator_resource_allocations = self._meter.create_counter(
name="coordinator.resource.allocations.total", description="Total number of resource allocations", unit="1"
)

self.coordinator_resource_utilization = self._meter.create_up_down_counter(
name="coordinator.resource.utilization", description="Current resource utilization", unit="1"
)

# Scheduling decision metrics
self.coordinator_scheduling_decisions = self._meter.create_counter(
name="coordinator.scheduling.decisions.total", description="Total scheduling decisions made", unit="1"
name="coordinator.executions.scheduled.total",
description="Total number of executions scheduled",
unit="1",
)

def record_coordinator_processing_time(self, duration_seconds: float) -> None:
self.coordinator_processing_time.record(duration_seconds)

def record_scheduling_duration(self, duration_seconds: float) -> None:
def record_coordinator_scheduling_duration(self, duration_seconds: float) -> None:
self.coordinator_scheduling_duration.record(duration_seconds)

def update_active_executions_gauge(self, count: int) -> None:
"""Update the count of active executions (absolute value)."""
# Reset to 0 then set to new value (for gauge-like behavior)
# This is a workaround since we're using up_down_counter
current_val = getattr(self, "_active_executions_current", 0)
delta = count - current_val
def update_coordinator_active_executions(self, count: int) -> None:
current = getattr(self, "_active_current", 0)
delta = count - current
if delta != 0:
self.coordinator_active_executions.add(delta)
self._active_executions_current = count
self._active_current = count

def record_coordinator_queue_time(self, wait_seconds: float, priority: str) -> None:
self.coordinator_queue_time.record(wait_seconds, attributes={"priority": priority})

def record_coordinator_execution_scheduled(self, status: str) -> None:
self.coordinator_executions_scheduled.add(1, attributes={"status": status})

def record_coordinator_scheduling_duration(self, duration_seconds: float) -> None:
self.coordinator_scheduling_duration.record(duration_seconds)

def update_coordinator_active_executions(self, count: int) -> None:
self.update_active_executions_gauge(count)

def record_queue_wait_time_by_priority(self, wait_seconds: float, priority: str, queue_name: str) -> None:
self.coordinator_queue_time.record(wait_seconds, attributes={"priority": priority, "queue": queue_name})

# Removed legacy coordinator.queue.size; use execution.queue.depth instead

def update_execution_request_queue_size(self, size: int) -> None:
"""Update the execution-only request queue depth (absolute value)."""
key = "_exec_request_queue_size"
current_val = getattr(self, key, 0)
delta = size - current_val
current = getattr(self, "_queue_depth_current", 0)
delta = size - current
if delta != 0:
self.execution_request_queue_depth.add(delta)
setattr(self, key, size)

def record_rate_limited(self, limit_type: str, user_id: str) -> None:
self.coordinator_rate_limited.add(1, attributes={"limit_type": limit_type, "user_id": user_id})

def update_rate_limit_wait_time(self, limit_type: str, user_id: str, wait_seconds: float) -> None:
self.coordinator_rate_limit_wait_time.record(
wait_seconds, attributes={"limit_type": limit_type, "user_id": user_id}
)

def record_resource_allocation(self, resource_type: str, amount: float, execution_id: str) -> None:
self.coordinator_resource_allocations.add(
1, attributes={"resource_type": resource_type, "execution_id": execution_id}
)

# Update gauge for current allocation
key = f"_resource_{resource_type}"
current_val = getattr(self, key, 0.0)
new_val = current_val + amount
setattr(self, key, new_val)

def record_resource_release(self, resource_type: str, amount: float, execution_id: str) -> None:
self.coordinator_resource_allocations.add(
-1, attributes={"resource_type": resource_type, "execution_id": execution_id}
)

# Update gauge for current allocation
key = f"_resource_{resource_type}"
current_val = getattr(self, key, 0.0)
new_val = max(0.0, current_val - amount)
setattr(self, key, new_val)
self._queue_depth_current = size

def update_resource_usage(self, resource_type: str, usage_percent: float) -> None:
# Record as a gauge-like metric
key = f"_resource_usage_{resource_type}"
current_val = getattr(self, key, 0.0)
delta = usage_percent - current_val
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": resource_type})
setattr(self, key, usage_percent)

def record_scheduling_decision(self, decision: str, reason: str) -> None:
self.coordinator_scheduling_decisions.add(1, attributes={"decision": decision, "reason": reason})

def record_queue_reordering(self, queue_name: str, items_moved: int) -> None:
self.coordinator_queue_operations.add(1, attributes={"operation": "reorder", "queue": queue_name})

# Record the number of items moved as a histogram
self.coordinator_queue_time.record(
float(items_moved), attributes={"priority": "reordered", "queue": queue_name}
)

def record_priority_change(self, execution_id: str, old_priority: str, new_priority: str) -> None:
self.coordinator_scheduling_decisions.add(
1, attributes={"decision": "priority_change", "reason": f"{old_priority}_to_{new_priority}"}
)

def update_rate_limiter_tokens(self, limit_type: str, tokens: int) -> None:
# Track tokens as gauge-like metric
key = f"_rate_limiter_{limit_type}"
current_val = getattr(self, key, 0)
delta = tokens - current_val
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": f"rate_limit_{limit_type}"})
setattr(self, key, tokens)

def record_rate_limit_reset(self, limit_type: str, user_id: str) -> None:
self.coordinator_scheduling_decisions.add(
1, attributes={"decision": "rate_limit_reset", "reason": f"{limit_type}_for_{user_id}"}
)
def record_coordinator_execution_scheduled(self, status: str) -> None:
self.coordinator_executions_scheduled.add(1, attributes={"status": status})
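The surviving `update_*` helpers emulate gauge semantics by remembering the last reported value and adding only the delta to an up-down counter. A standalone sketch of that technique against the OpenTelemetry metrics API, using an illustrative instrument name rather than the ones registered by `CoordinatorMetrics`:

```python
# Sketch: gauge-style "set absolute value" semantics on top of an up-down counter.
# Meter and instrument names are illustrative only.
from opentelemetry import metrics

meter = metrics.get_meter("coordinator.sketch")
active_executions = meter.create_up_down_counter(
    name="coordinator.active_executions",
    description="Number of currently active executions",
    unit="1",
)

_last_reported = 0


def set_active_executions(count: int) -> None:
    """Report an absolute count by adding only the change since the last report."""
    global _last_reported
    delta = count - _last_reported
    if delta != 0:
        active_executions.add(delta)
    _last_reported = count


set_active_executions(3)  # counter goes up by 3
set_active_executions(1)  # counter goes down by 2
```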
108 changes: 68 additions & 40 deletions backend/app/core/providers.py
@@ -590,46 +590,9 @@ async def _provide_saga_orchestrator(
yield orchestrator


async def _provide_execution_coordinator(
kafka_producer: UnifiedProducer,
schema_registry: SchemaRegistryManager,
settings: Settings,
event_store: EventStore,
execution_repository: ExecutionRepository,
idempotency_manager: IdempotencyManager,
logger: logging.Logger,
coordinator_metrics: CoordinatorMetrics,
event_metrics: EventMetrics,
) -> AsyncIterator[ExecutionCoordinator]:
"""Shared factory for ExecutionCoordinator with lifecycle management."""
dispatcher = IdempotentEventDispatcher(
logger=logger,
idempotency_manager=idempotency_manager,
key_strategy=KeyStrategy.EVENT_BASED,
ttl_seconds=7200,
)
async with ExecutionCoordinator(
producer=kafka_producer,
schema_registry_manager=schema_registry,
settings=settings,
event_store=event_store,
execution_repository=execution_repository,
dispatcher=dispatcher,
logger=logger,
coordinator_metrics=coordinator_metrics,
event_metrics=event_metrics,
) as coordinator:
yield coordinator


class BusinessServicesProvider(Provider):
scope = Scope.REQUEST

def __init__(self) -> None:
super().__init__()
# Register shared factory functions on instance (avoids warning about missing self)
self.provide(_provide_execution_coordinator)

@provide
def get_saga_service(
self,
@@ -702,9 +665,74 @@ def get_admin_user_service(
class CoordinatorProvider(Provider):
scope = Scope.APP

def __init__(self) -> None:
super().__init__()
self.provide(_provide_execution_coordinator)
@provide
def get_coordinator_dispatcher(
self, logger: logging.Logger, idempotency_manager: IdempotencyManager
) -> EventDispatcher:
"""Create idempotent EventDispatcher for coordinator."""
return IdempotentEventDispatcher(
logger=logger,
idempotency_manager=idempotency_manager,
key_strategy=KeyStrategy.EVENT_BASED,
ttl_seconds=7200,
)

@provide
def get_execution_coordinator(
self,
producer: UnifiedProducer,
dispatcher: EventDispatcher,
execution_repository: ExecutionRepository,
logger: logging.Logger,
coordinator_metrics: CoordinatorMetrics,
) -> ExecutionCoordinator:
"""Create ExecutionCoordinator - registers handlers on dispatcher in constructor."""
return ExecutionCoordinator(
producer=producer,
dispatcher=dispatcher,
execution_repository=execution_repository,
logger=logger,
coordinator_metrics=coordinator_metrics,
)

@provide
async def get_coordinator_consumer(
self,
coordinator: ExecutionCoordinator, # Ensures coordinator created first (handlers registered)
dispatcher: EventDispatcher,
schema_registry: SchemaRegistryManager,
settings: Settings,
logger: logging.Logger,
event_metrics: EventMetrics,
) -> AsyncIterator[UnifiedConsumer]:
"""Create and start consumer for coordinator."""
consumer_config = ConsumerConfig(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
group_id=GroupId.EXECUTION_COORDINATOR,
enable_auto_commit=False,
session_timeout_ms=settings.KAFKA_SESSION_TIMEOUT_MS,
heartbeat_interval_ms=settings.KAFKA_HEARTBEAT_INTERVAL_MS,
max_poll_interval_ms=settings.KAFKA_MAX_POLL_INTERVAL_MS,
request_timeout_ms=settings.KAFKA_REQUEST_TIMEOUT_MS,
)

consumer = UnifiedConsumer(
consumer_config,
event_dispatcher=dispatcher,
schema_registry=schema_registry,
settings=settings,
logger=logger,
event_metrics=event_metrics,
)

await consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.EXECUTION_COORDINATOR]))
logger.info("Coordinator consumer started")

try:
yield consumer
finally:
await consumer.stop()
logger.info("Coordinator consumer stopped")


class K8sWorkerProvider(Provider):
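`get_coordinator_consumer` relies on dishka's generator-style providers for lifecycle: everything before `yield` runs when the dependency is first resolved, and the `finally` block runs when the APP-scoped container shuts down. A self-contained sketch of that pattern, with a hypothetical `DemoConsumer` standing in for `UnifiedConsumer`:

```python
# Sketch: APP-scoped provider whose resource is started on first use and
# stopped when the container closes. DemoConsumer is a stand-in, not the
# project's UnifiedConsumer.
import asyncio
from collections.abc import AsyncIterator

from dishka import Provider, Scope, make_async_container, provide


class DemoConsumer:
    async def start(self) -> None:
        print("consumer started")

    async def stop(self) -> None:
        print("consumer stopped")


class DemoConsumerProvider(Provider):
    scope = Scope.APP

    @provide
    async def get_consumer(self) -> AsyncIterator[DemoConsumer]:
        consumer = DemoConsumer()
        await consumer.start()
        try:
            yield consumer          # available for the lifetime of the APP scope
        finally:
            await consumer.stop()   # runs when the container is closed


async def main() -> None:
    container = make_async_container(DemoConsumerProvider())
    await container.get(DemoConsumer)   # triggers start()
    await container.close()             # triggers stop()


asyncio.run(main())
```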
3 changes: 2 additions & 1 deletion backend/app/domain/enums/__init__.py
@@ -1,5 +1,5 @@
from app.domain.enums.common import ErrorType, SortOrder, Theme
from app.domain.enums.execution import ExecutionStatus
from app.domain.enums.execution import ExecutionStatus, QueuePriority
from app.domain.enums.health import AlertSeverity, AlertStatus, ComponentStatus
from app.domain.enums.notification import (
NotificationChannel,
Expand All @@ -17,6 +17,7 @@
"Theme",
# Execution
"ExecutionStatus",
"QueuePriority",
# Health
"AlertSeverity",
"AlertStatus",
10 changes: 10 additions & 0 deletions backend/app/domain/enums/execution.py
@@ -1,6 +1,16 @@
from app.core.utils import StringEnum


class QueuePriority(StringEnum):
"""Execution priority, ordered highest to lowest."""

CRITICAL = "critical"
HIGH = "high"
NORMAL = "normal"
LOW = "low"
BACKGROUND = "background"


class ExecutionStatus(StringEnum):
"""Status of an execution."""

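If a scheduler needs to sort by the new enum, one option is an explicit weight table derived from the declaration order. This is a hypothetical illustration, not part of the PR; the enum is re-declared as `str` plus `Enum` on the assumption that `StringEnum` behaves similarly:

```python
# Sketch: deriving a sort key from QueuePriority. The weight table is hypothetical;
# the enum is re-declared here so the example is self-contained.
from enum import Enum


class QueuePriority(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"
    BACKGROUND = "background"


# Lower weight = scheduled earlier, matching the "highest to lowest" docstring order.
PRIORITY_WEIGHT = {priority: index for index, priority in enumerate(QueuePriority)}

pending = [QueuePriority.LOW, QueuePriority.CRITICAL, QueuePriority.NORMAL]
pending.sort(key=PRIORITY_WEIGHT.__getitem__)
print(pending)  # CRITICAL first, then NORMAL, then LOW
```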
7 changes: 4 additions & 3 deletions backend/app/domain/events/typed.py
@@ -8,6 +8,7 @@
from app.domain.enums.auth import LoginMethod
from app.domain.enums.common import Environment
from app.domain.enums.events import EventType
from app.domain.enums.execution import QueuePriority
from app.domain.enums.notification import NotificationChannel, NotificationSeverity
from app.domain.enums.storage import ExecutionErrorType, StorageType

@@ -75,15 +76,15 @@ class ExecutionRequestedEvent(BaseEvent):
memory_limit: str
cpu_request: str
memory_request: str
priority: int = 5
priority: QueuePriority = QueuePriority.NORMAL


class ExecutionAcceptedEvent(BaseEvent):
event_type: Literal[EventType.EXECUTION_ACCEPTED] = EventType.EXECUTION_ACCEPTED
execution_id: str
queue_position: int
estimated_wait_seconds: float | None = None
priority: int = 5
priority: QueuePriority = QueuePriority.NORMAL


class ExecutionQueuedEvent(BaseEvent):
@@ -428,7 +429,7 @@ class CreatePodCommandEvent(BaseEvent):
memory_limit: str
cpu_request: str
memory_request: str
priority: int = 5
priority: QueuePriority = QueuePriority.NORMAL


class DeletePodCommandEvent(BaseEvent):
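Switching `priority` from `int = 5` to `QueuePriority = QueuePriority.NORMAL` keeps raw string payloads working, since pydantic coerces strings into str-based enum members. A minimal sketch with a hypothetical `DemoEvent` standing in for `BaseEvent`:

```python
# Sketch: pydantic validates the raw string "high" into the enum member.
# DemoEvent is a stand-in for BaseEvent; only the priority field mirrors the PR.
from enum import Enum

from pydantic import BaseModel


class QueuePriority(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"
    BACKGROUND = "background"


class DemoEvent(BaseModel):
    execution_id: str
    priority: QueuePriority = QueuePriority.NORMAL


print(DemoEvent(execution_id="exec-1").priority)                   # QueuePriority.NORMAL (default)
print(DemoEvent(execution_id="exec-2", priority="high").priority)  # QueuePriority.HIGH (coerced)
```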