Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/app/core/k8s_clients.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# mypy: disable-error-code="slop-any-check"
# Rationale: kubernetes client 31.0.0 has no type annotations (all Any)
import logging
from dataclasses import dataclass

Expand Down
118 changes: 31 additions & 87 deletions backend/app/core/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,20 @@
import logging
import re
from datetime import datetime, timezone
from typing import Any, Dict
from typing import Any

from opentelemetry import trace

correlation_id_context: contextvars.ContextVar[str | None] = contextvars.ContextVar("correlation_id", default=None)

request_metadata_context: contextvars.ContextVar[Dict[str, Any] | None] = contextvars.ContextVar(
request_metadata_context: contextvars.ContextVar[dict[str, Any] | None] = contextvars.ContextVar(
"request_metadata", default=None
)


class CorrelationFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
correlation_id = correlation_id_context.get()
if correlation_id:
record.correlation_id = correlation_id

metadata = request_metadata_context.get()
if metadata:
record.request_method = metadata.get("method")
record.request_path = metadata.get("path")
# Client IP is now safely extracted without DNS lookup
if metadata.get("client"):
record.client_host = metadata["client"].get("host")

return True


class JSONFormatter(logging.Formatter):
"""JSON formatter that reads context directly from typed sources."""

def _sanitize_sensitive_data(self, data: str) -> str:
"""Remove or mask sensitive information from log data."""
# Mask API keys, tokens, and similar sensitive data
Expand Down Expand Up @@ -59,89 +44,48 @@ def _sanitize_sensitive_data(self, data: str) -> str:
return data

def format(self, record: logging.LogRecord) -> str:
# Sanitize the message
message = self._sanitize_sensitive_data(record.getMessage())

log_data = {
log_data: dict[str, Any] = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": message,
"message": self._sanitize_sensitive_data(record.getMessage()),
}

if hasattr(record, "correlation_id"):
log_data["correlation_id"] = record.correlation_id

if hasattr(record, "request_method"):
log_data["request_method"] = record.request_method

if hasattr(record, "request_path"):
log_data["request_path"] = record.request_path

if hasattr(record, "client_host"):
log_data["client_host"] = record.client_host

# OpenTelemetry trace context (hexadecimal ids)
if hasattr(record, "trace_id"):
log_data["trace_id"] = record.trace_id
if hasattr(record, "span_id"):
log_data["span_id"] = record.span_id

if record.exc_info:
exc_text = self.formatException(record.exc_info)
log_data["exc_info"] = self._sanitize_sensitive_data(exc_text)

if hasattr(record, "stack_info") and record.stack_info:
stack_text = self.formatStack(record.stack_info)
log_data["stack_info"] = self._sanitize_sensitive_data(stack_text)
# Correlation context - read directly from typed ContextVar
(v := correlation_id_context.get()) and log_data.update(correlation_id=v)

# Request metadata - read directly from typed ContextVar
metadata = request_metadata_context.get() or {}
(v := metadata.get("method")) and log_data.update(request_method=v)
(v := metadata.get("path")) and log_data.update(request_path=v)
(v := (metadata.get("client") or {}).get("host")) and log_data.update(client_host=v)

# OpenTelemetry trace context - read directly from typed trace API
span = trace.get_current_span()
if span.is_recording():
span_context = span.get_span_context()
if span_context.is_valid:
log_data["trace_id"] = format(span_context.trace_id, "032x")
log_data["span_id"] = format(span_context.span_id, "016x")

record.exc_info and log_data.update(
exc_info=self._sanitize_sensitive_data(self.formatException(record.exc_info))
)
record.stack_info and log_data.update(
stack_info=self._sanitize_sensitive_data(self.formatStack(record.stack_info))
)

return json.dumps(log_data, ensure_ascii=False)


LOG_LEVELS: dict[str, int] = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}


def setup_logger(log_level: str) -> logging.Logger:
"""Create and configure the application logger. Called by DI with Settings.LOG_LEVEL."""
new_logger = logging.getLogger("integr8scode")
new_logger.handlers.clear()

console_handler = logging.StreamHandler()
formatter = JSONFormatter()

console_handler.setFormatter(formatter)

correlation_filter = CorrelationFilter()
console_handler.addFilter(correlation_filter)

class TracingFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
# Inline minimal helpers to avoid circular import on tracing.utils
span = trace.get_current_span()
trace_id = None
span_id = None
if span and span.is_recording():
span_context = span.get_span_context()
if span_context.is_valid:
trace_id = format(span_context.trace_id, "032x")
span_id = format(span_context.span_id, "016x")
if trace_id:
record.trace_id = trace_id
if span_id:
record.span_id = span_id
return True

console_handler.addFilter(TracingFilter())

console_handler.setFormatter(JSONFormatter())
new_logger.addHandler(console_handler)

level = LOG_LEVELS.get(log_level.upper(), logging.DEBUG)
new_logger.setLevel(level)
new_logger.setLevel(logging.getLevelNamesMapping().get(log_level.upper(), logging.DEBUG))

return new_logger
94 changes: 63 additions & 31 deletions backend/app/core/metrics/coordinator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from app.core.metrics.base import BaseMetrics
from app.domain.enums import ResourceType


class CoordinatorMetrics(BaseMetrics):
Expand Down Expand Up @@ -68,6 +69,18 @@ def _create_instruments(self) -> None:
name="coordinator.scheduling.decisions.total", description="Total scheduling decisions made", unit="1"
)

# Internal state tracking for gauge-like counters
self._active_executions_current: int = 0
self._exec_request_queue_size: int = 0
self._resource_cpu: float = 0.0
self._resource_memory: float = 0.0
self._resource_gpu: float = 0.0
self._resource_usage_cpu: float = 0.0
self._resource_usage_memory: float = 0.0
self._resource_usage_gpu: float = 0.0
self._rate_limiter_user: int = 0
self._rate_limiter_global: int = 0

def record_coordinator_processing_time(self, duration_seconds: float) -> None:
self.coordinator_processing_time.record(duration_seconds)

Expand All @@ -78,8 +91,7 @@ def update_active_executions_gauge(self, count: int) -> None:
"""Update the count of active executions (absolute value)."""
# Reset to 0 then set to new value (for gauge-like behavior)
# This is a workaround since we're using up_down_counter
current_val = getattr(self, "_active_executions_current", 0)
delta = count - current_val
delta = count - self._active_executions_current
if delta != 0:
self.coordinator_active_executions.add(delta)
self._active_executions_current = count
Expand All @@ -103,12 +115,10 @@ def record_queue_wait_time_by_priority(self, wait_seconds: float, priority: str,

def update_execution_request_queue_size(self, size: int) -> None:
"""Update the execution-only request queue depth (absolute value)."""
key = "_exec_request_queue_size"
current_val = getattr(self, key, 0)
delta = size - current_val
delta = size - self._exec_request_queue_size
if delta != 0:
self.execution_request_queue_depth.add(delta)
setattr(self, key, size)
self._exec_request_queue_size = size

def record_rate_limited(self, limit_type: str, user_id: str) -> None:
self.coordinator_rate_limited.add(1, attributes={"limit_type": limit_type, "user_id": user_id})
Expand All @@ -118,36 +128,55 @@ def update_rate_limit_wait_time(self, limit_type: str, user_id: str, wait_second
wait_seconds, attributes={"limit_type": limit_type, "user_id": user_id}
)

def record_resource_allocation(self, resource_type: str, amount: float, execution_id: str) -> None:
def record_resource_allocation(self, resource_type: ResourceType, amount: float, execution_id: str) -> None:
self.coordinator_resource_allocations.add(
1, attributes={"resource_type": resource_type, "execution_id": execution_id}
)

# Update gauge for current allocation
key = f"_resource_{resource_type}"
current_val = getattr(self, key, 0.0)
new_val = current_val + amount
setattr(self, key, new_val)

def record_resource_release(self, resource_type: str, amount: float, execution_id: str) -> None:
match resource_type:
case ResourceType.CPU:
self._resource_cpu += amount
case ResourceType.MEMORY:
self._resource_memory += amount
case ResourceType.GPU:
self._resource_gpu += amount

def record_resource_release(self, resource_type: ResourceType, amount: float, execution_id: str) -> None:
self.coordinator_resource_allocations.add(
-1, attributes={"resource_type": resource_type, "execution_id": execution_id}
)

# Update gauge for current allocation
key = f"_resource_{resource_type}"
current_val = getattr(self, key, 0.0)
new_val = max(0.0, current_val - amount)
setattr(self, key, new_val)

def update_resource_usage(self, resource_type: str, usage_percent: float) -> None:
match resource_type:
case ResourceType.CPU:
self._resource_cpu = max(0.0, self._resource_cpu - amount)
case ResourceType.MEMORY:
self._resource_memory = max(0.0, self._resource_memory - amount)
case ResourceType.GPU:
self._resource_gpu = max(0.0, self._resource_gpu - amount)

def update_resource_usage(self, resource_type: ResourceType, usage_percent: float) -> None:
# Record as a gauge-like metric
key = f"_resource_usage_{resource_type}"
current_val = getattr(self, key, 0.0)
delta = usage_percent - current_val
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": resource_type})
setattr(self, key, usage_percent)
match resource_type:
case ResourceType.CPU:
delta = usage_percent - self._resource_usage_cpu
delta != 0 and self.coordinator_resource_utilization.add(
delta, attributes={"resource_type": resource_type}
)
self._resource_usage_cpu = usage_percent
case ResourceType.MEMORY:
delta = usage_percent - self._resource_usage_memory
delta != 0 and self.coordinator_resource_utilization.add(
delta, attributes={"resource_type": resource_type}
)
self._resource_usage_memory = usage_percent
case ResourceType.GPU:
delta = usage_percent - self._resource_usage_gpu
delta != 0 and self.coordinator_resource_utilization.add(
delta, attributes={"resource_type": resource_type}
)
self._resource_usage_gpu = usage_percent

def record_scheduling_decision(self, decision: str, reason: str) -> None:
self.coordinator_scheduling_decisions.add(1, attributes={"decision": decision, "reason": reason})
Expand All @@ -167,12 +196,15 @@ def record_priority_change(self, execution_id: str, old_priority: str, new_prior

def update_rate_limiter_tokens(self, limit_type: str, tokens: int) -> None:
# Track tokens as gauge-like metric
key = f"_rate_limiter_{limit_type}"
current_val = getattr(self, key, 0)
delta = tokens - current_val
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": f"rate_limit_{limit_type}"})
setattr(self, key, tokens)
attrs = {"resource_type": f"rate_limit_{limit_type}"}
if limit_type == "user":
delta = tokens - self._rate_limiter_user
delta != 0 and self.coordinator_resource_utilization.add(delta, attributes=attrs)
self._rate_limiter_user = tokens
elif limit_type == "global":
delta = tokens - self._rate_limiter_global
delta != 0 and self.coordinator_resource_utilization.add(delta, attributes=attrs)
self._rate_limiter_global = tokens
Comment thread
HardMax71 marked this conversation as resolved.

def record_rate_limit_reset(self, limit_type: str, user_id: str) -> None:
self.coordinator_scheduling_decisions.add(
Expand Down
4 changes: 3 additions & 1 deletion backend/app/domain/enums/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from app.domain.enums.common import ErrorType, SortOrder, Theme
from app.domain.enums.common import Environment, ErrorType, ResourceType, SortOrder, Theme
from app.domain.enums.execution import ExecutionStatus
from app.domain.enums.health import AlertSeverity, AlertStatus, ComponentStatus
from app.domain.enums.notification import (
Expand All @@ -12,7 +12,9 @@

__all__ = [
# Common
"Environment",
"ErrorType",
"ResourceType",
"SortOrder",
"Theme",
# Execution
Expand Down
8 changes: 8 additions & 0 deletions backend/app/domain/enums/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,11 @@ class Environment(StringEnum):
STAGING = "staging"
PRODUCTION = "production"
TEST = "test"


class ResourceType(StringEnum):
"""Types of compute resources for metrics and allocation."""

CPU = "cpu"
MEMORY = "memory"
GPU = "gpu"
2 changes: 2 additions & 0 deletions backend/app/events/core/consumer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# mypy: disable-error-code="slop-any-check"
# Rationale: aiokafka message headers are untyped (Any)
import asyncio
import logging
from collections.abc import Awaitable, Callable
Expand Down
4 changes: 2 additions & 2 deletions backend/app/services/admin/admin_user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ async def create_user(self, *, admin_username: str, user_data: UserCreate) -> Us
username=user_data.username,
email=user_data.email,
hashed_password=hashed_password,
role=getattr(user_data, "role", UserRole.USER),
is_active=getattr(user_data, "is_active", True),
role=user_data.role,
is_active=user_data.is_active,
is_superuser=False,
)
created = await self._users.create_user(create_data)
Expand Down
5 changes: 2 additions & 3 deletions backend/app/services/coordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async def _on_stop(self) -> None:
await self.queue_manager.stop()

# Close idempotency manager
if hasattr(self, "idempotency_manager") and self.idempotency_manager:
if self.idempotency_manager:
await self.idempotency_manager.close()

self.logger.info(f"ExecutionCoordinator service stopped. Active executions: {len(self._active_executions)}")
Expand Down Expand Up @@ -360,8 +360,7 @@ async def _schedule_execution(self, event: ExecutionRequestedEvent) -> None:

# Track metrics
queue_time = start_time - event.timestamp.timestamp()
priority = getattr(event, "priority", QueuePriority.NORMAL.value)
self.metrics.record_coordinator_queue_time(queue_time, QueuePriority(priority).name)
self.metrics.record_coordinator_queue_time(queue_time, QueuePriority(event.priority).name)

scheduling_duration = time.time() - start_time
self.metrics.record_coordinator_scheduling_duration(scheduling_duration)
Expand Down
7 changes: 4 additions & 3 deletions backend/app/services/coordinator/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Dict, List

from app.core.metrics.context import get_coordinator_metrics
from app.domain.enums import ResourceType


@dataclass
Expand Down Expand Up @@ -311,14 +312,14 @@ def _update_metrics(self) -> None:
"""Update metrics"""
cpu_usage = self.pool.total_cpu_cores - self.pool.available_cpu_cores
cpu_percent = cpu_usage / self.pool.total_cpu_cores * 100
self.metrics.update_resource_usage("cpu", cpu_percent)
self.metrics.update_resource_usage(ResourceType.CPU, cpu_percent)

memory_usage = self.pool.total_memory_mb - self.pool.available_memory_mb
memory_percent = memory_usage / self.pool.total_memory_mb * 100
self.metrics.update_resource_usage("memory", memory_percent)
self.metrics.update_resource_usage(ResourceType.MEMORY, memory_percent)

gpu_usage = self.pool.total_gpu_count - self.pool.available_gpu_count
gpu_percent = gpu_usage / max(1, self.pool.total_gpu_count) * 100
self.metrics.update_resource_usage("gpu", gpu_percent)
self.metrics.update_resource_usage(ResourceType.GPU, gpu_percent)

self.metrics.update_coordinator_active_executions(len(self._allocations))
Loading
Loading