diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 389bd80c..64d6afc6 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -6,10 +6,18 @@ repos:
# Local hooks using uv run to match CI exactly
- repo: local
hooks:
- # Ruff - matches CI: cd backend && uv run ruff check . --config pyproject.toml
+ # Ruff format - auto-fix formatting
+ - id: ruff-format-backend
+ name: ruff format (backend)
+ entry: bash -c 'cd backend && uv run ruff format --config pyproject.toml .'
+ language: system
+ files: ^backend/.*\.py$
+ pass_filenames: false
+
+ # Ruff check - auto-fix safe lint issues (import sorting, etc.)
- id: ruff-backend
name: ruff check (backend)
- entry: bash -c 'cd backend && uv run ruff check . --config pyproject.toml'
+ entry: bash -c 'cd backend && uv run ruff check --fix --config pyproject.toml .'
language: system
files: ^backend/.*\.py$
pass_filenames: false
@@ -46,10 +54,10 @@ repos:
files: ^frontend/src/.*\.css$
pass_filenames: false
- # Prettier - matches CI: cd frontend && npx prettier --check
+ # Prettier - auto-fix formatting
- id: prettier-frontend
name: prettier (frontend)
- entry: bash -c 'cd frontend && npx prettier --check "src/**/*.{ts,svelte,json}"'
+ entry: bash -c 'cd frontend && npx prettier --write "src/**/*.{ts,svelte,json}"'
language: system
files: ^frontend/src/.*\.(ts|svelte|json)$
pass_filenames: false
diff --git a/backend/app/api/routes/admin/executions.py b/backend/app/api/routes/admin/executions.py
index 02bf8526..8e11ce01 100644
--- a/backend/app/api/routes/admin/executions.py
+++ b/backend/app/api/routes/admin/executions.py
@@ -39,7 +39,11 @@ async def list_executions(
skip: int = Query(0, ge=0),
) -> AdminExecutionListResponse:
executions, total = await service.list_executions(
- status=status, priority=priority, user_id=user_id, limit=limit, skip=skip,
+ status=status,
+ priority=priority,
+ user_id=user_id,
+ limit=limit,
+ skip=skip,
)
return AdminExecutionListResponse(
executions=[AdminExecutionResponse.model_validate(e) for e in executions],
diff --git a/backend/app/api/routes/admin/users.py b/backend/app/api/routes/admin/users.py
index 1209d776..1aa22a30 100644
--- a/backend/app/api/routes/admin/users.py
+++ b/backend/app/api/routes/admin/users.py
@@ -144,9 +144,7 @@ async def delete_user(
if admin.user_id == user_id:
raise HTTPException(status_code=400, detail="Cannot delete your own account")
- result = await admin_user_service.delete_user(
- admin_user_id=admin.user_id, user_id=user_id, cascade=cascade
- )
+ result = await admin_user_service.delete_user(admin_user_id=admin.user_id, user_id=user_id, cascade=cascade)
return DeleteUserResponse.model_validate(result)
diff --git a/backend/app/api/routes/execution.py b/backend/app/api/routes/execution.py
index af8736fa..38ad192d 100644
--- a/backend/app/api/routes/execution.py
+++ b/backend/app/api/routes/execution.py
@@ -32,9 +32,9 @@
@inject
async def get_execution_with_access(
- execution_id: Annotated[str, Path()],
- current_user: Annotated[User, Depends(current_user)],
- execution_service: FromDishka[ExecutionService],
+ execution_id: Annotated[str, Path()],
+ current_user: Annotated[User, Depends(current_user)],
+ execution_service: FromDishka[ExecutionService],
) -> ExecutionInDB:
domain_exec = await execution_service.get_execution_result(execution_id)
@@ -50,22 +50,24 @@ async def get_execution_with_access(
responses={500: {"model": ErrorResponse, "description": "Script execution failed"}},
)
async def create_execution(
- request: Request,
- current_user: Annotated[User, Depends(current_user)],
- execution: ExecutionRequest,
- execution_service: FromDishka[ExecutionService],
- idempotency_key: Annotated[str | None, Header(alias="Idempotency-Key")] = None,
+ request: Request,
+ current_user: Annotated[User, Depends(current_user)],
+ execution: ExecutionRequest,
+ execution_service: FromDishka[ExecutionService],
+ idempotency_key: Annotated[str | None, Header(alias="Idempotency-Key")] = None,
) -> ExecutionResponse:
"""Submit a script for execution in an isolated Kubernetes pod."""
- trace.get_current_span().set_attributes({
- "http.method": "POST",
- "http.route": "/api/v1/execute",
- "execution.language": execution.lang,
- "execution.language_version": execution.lang_version,
- "execution.script_length": len(execution.script),
- "user.id": current_user.user_id,
- "client.address": get_client_ip(request),
- })
+ trace.get_current_span().set_attributes(
+ {
+ "http.method": "POST",
+ "http.route": "/api/v1/execute",
+ "execution.language": execution.lang,
+ "execution.language_version": execution.lang_version,
+ "execution.script_length": len(execution.script),
+ "user.id": current_user.user_id,
+ "client.address": get_client_ip(request),
+ }
+ )
exec_result = await execution_service.execute_script_idempotent(
script=execution.script,
@@ -83,7 +85,7 @@ async def create_execution(
responses={403: {"model": ErrorResponse, "description": "Not the owner of this execution"}},
)
async def get_result(
- execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
+ execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
) -> ExecutionResult:
"""Retrieve the result of a specific execution."""
return ExecutionResult.model_validate(execution)
@@ -98,10 +100,10 @@ async def get_result(
},
)
async def cancel_execution(
- execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
- current_user: Annotated[User, Depends(current_user)],
- cancel_request: CancelExecutionRequest,
- execution_service: FromDishka[ExecutionService],
+ execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
+ current_user: Annotated[User, Depends(current_user)],
+ cancel_request: CancelExecutionRequest,
+ execution_service: FromDishka[ExecutionService],
) -> CancelResponse:
"""Cancel a running or queued execution."""
result = await execution_service.cancel_execution(
@@ -122,9 +124,9 @@ async def cancel_execution(
},
)
async def retry_execution(
- original_execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
- current_user: Annotated[User, Depends(current_user)],
- execution_service: FromDishka[ExecutionService],
+ original_execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
+ current_user: Annotated[User, Depends(current_user)],
+ execution_service: FromDishka[ExecutionService],
) -> ExecutionResponse:
"""Retry a failed or completed execution."""
@@ -146,10 +148,10 @@ async def retry_execution(
responses={403: {"model": ErrorResponse, "description": "Not the owner of this execution"}},
)
async def get_execution_events(
- execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
- event_service: FromDishka[EventService],
- event_types: Annotated[list[EventType] | None, Query(description="Event types to filter")] = None,
- limit: Annotated[int, Query(ge=1, le=1000)] = 100,
+ execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
+ event_service: FromDishka[EventService],
+ event_types: Annotated[list[EventType] | None, Query(description="Event types to filter")] = None,
+ limit: Annotated[int, Query(ge=1, le=1000)] = 100,
) -> list[DomainEvent]:
"""Get all events for an execution."""
events = await event_service.get_events_by_execution_id(
@@ -160,14 +162,14 @@ async def get_execution_events(
@router.get("/user/executions", response_model=ExecutionListResponse)
async def get_user_executions(
- current_user: Annotated[User, Depends(current_user)],
- execution_service: FromDishka[ExecutionService],
- status: Annotated[ExecutionStatus | None, Query(description="Filter by execution status")] = None,
- lang: Annotated[str | None, Query(description="Filter by programming language")] = None,
- start_time: Annotated[datetime | None, Query(description="Filter executions created after this time")] = None,
- end_time: Annotated[datetime | None, Query(description="Filter executions created before this time")] = None,
- limit: Annotated[int, Query(ge=1, le=200)] = 50,
- skip: Annotated[int, Query(ge=0)] = 0,
+ current_user: Annotated[User, Depends(current_user)],
+ execution_service: FromDishka[ExecutionService],
+ status: Annotated[ExecutionStatus | None, Query(description="Filter by execution status")] = None,
+ lang: Annotated[str | None, Query(description="Filter by programming language")] = None,
+ start_time: Annotated[datetime | None, Query(description="Filter executions created after this time")] = None,
+ end_time: Annotated[datetime | None, Query(description="Filter executions created before this time")] = None,
+ limit: Annotated[int, Query(ge=1, le=200)] = 50,
+ skip: Annotated[int, Query(ge=0)] = 0,
) -> ExecutionListResponse:
"""Get executions for the current user."""
@@ -194,7 +196,7 @@ async def get_user_executions(
@router.get("/example-scripts", response_model=ExampleScripts)
async def get_example_scripts(
- execution_service: FromDishka[ExecutionService],
+ execution_service: FromDishka[ExecutionService],
) -> ExampleScripts:
"""Get example scripts for the code editor."""
scripts = await execution_service.get_example_scripts()
@@ -203,7 +205,7 @@ async def get_example_scripts(
@router.get("/k8s-limits", response_model=ResourceLimits)
async def get_k8s_resource_limits(
- execution_service: FromDishka[ExecutionService],
+ execution_service: FromDishka[ExecutionService],
) -> ResourceLimits:
"""Get Kubernetes resource limits for script execution."""
limits = await execution_service.get_k8s_resource_limits()
@@ -212,9 +214,9 @@ async def get_k8s_resource_limits(
@router.delete("/executions/{execution_id}", response_model=DeleteResponse)
async def delete_execution(
- execution_id: str,
- admin: Annotated[User, Depends(admin_user)],
- execution_service: FromDishka[ExecutionService],
+ execution_id: str,
+ admin: Annotated[User, Depends(admin_user)],
+ execution_service: FromDishka[ExecutionService],
) -> DeleteResponse:
"""Delete an execution and its associated data (admin only)."""
await execution_service.delete_execution(execution_id, admin.user_id)
diff --git a/backend/app/core/dishka_lifespan.py b/backend/app/core/dishka_lifespan.py
index 730aaee6..921b304f 100644
--- a/backend/app/core/dishka_lifespan.py
+++ b/backend/app/core/dishka_lifespan.py
@@ -50,7 +50,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
await container.get(Tracer)
FastAPIInstrumentor().instrument_app(
- app, tracer_provider=trace.get_tracer_provider(), excluded_urls="health,metrics,docs,openapi.json",
+ app,
+ tracer_provider=trace.get_tracer_provider(),
+ excluded_urls="health,metrics,docs,openapi.json",
)
logger.info("FastAPI OpenTelemetry instrumentation applied")
diff --git a/backend/app/core/exceptions/handlers.py b/backend/app/core/exceptions/handlers.py
index 7453817e..61842649 100644
--- a/backend/app/core/exceptions/handlers.py
+++ b/backend/app/core/exceptions/handlers.py
@@ -46,4 +46,6 @@ def _map_to_status_code(exc: DomainError) -> int:
if isinstance(exc, InfrastructureError):
return 500
return 500
+
+
# --8<-- [end:configure_exception_handlers]
diff --git a/backend/app/core/logging.py b/backend/app/core/logging.py
index 5dd70bb3..048c40fe 100644
--- a/backend/app/core/logging.py
+++ b/backend/app/core/logging.py
@@ -102,6 +102,8 @@ def setup_logger(log_level: str) -> structlog.stdlib.BoundLogger:
logger: structlog.stdlib.BoundLogger = structlog.get_logger("integr8scode")
return logger
+
+
# --8<-- [end:setup_logger]
@@ -122,13 +124,15 @@ def setup_log_exporter(settings: Settings, logger: structlog.stdlib.BoundLogger)
if not settings.OTEL_EXPORTER_OTLP_ENDPOINT:
return
- resource = Resource.create({
- SERVICE_NAME: settings.SERVICE_NAME,
- SERVICE_VERSION: settings.SERVICE_VERSION,
- "service.namespace": "integr8scode",
- "deployment.environment": settings.ENVIRONMENT,
- "service.instance.id": settings.HOSTNAME,
- })
+ resource = Resource.create(
+ {
+ SERVICE_NAME: settings.SERVICE_NAME,
+ SERVICE_VERSION: settings.SERVICE_VERSION,
+ "service.namespace": "integr8scode",
+ "deployment.environment": settings.ENVIRONMENT,
+ "service.instance.id": settings.HOSTNAME,
+ }
+ )
endpoint = settings.OTEL_EXPORTER_OTLP_ENDPOINT
log_exporter = OTLPLogExporter(
diff --git a/backend/app/core/metrics/base.py b/backend/app/core/metrics/base.py
index dd57512a..c5477707 100644
--- a/backend/app/core/metrics/base.py
+++ b/backend/app/core/metrics/base.py
@@ -18,6 +18,7 @@ def __init__(self, settings: Settings, meter_name: str | None = None):
meter_name = meter_name or self.__class__.__name__
self._meter = metrics.get_meter(meter_name)
self._create_instruments()
+
# --8<-- [end:init]
def _create_instruments(self) -> None:
diff --git a/backend/app/core/metrics/dlq.py b/backend/app/core/metrics/dlq.py
index d4262223..ab35a054 100644
--- a/backend/app/core/metrics/dlq.py
+++ b/backend/app/core/metrics/dlq.py
@@ -65,4 +65,3 @@ def record_dlq_processing_error(self, original_topic: str, event_type: str, erro
self.dlq_processing_errors.add(
1, attributes={"original_topic": original_topic, "event_type": event_type, "error_type": error_type}
)
-
diff --git a/backend/app/core/middlewares/metrics.py b/backend/app/core/middlewares/metrics.py
index 69056af4..20a75f6e 100644
--- a/backend/app/core/middlewares/metrics.py
+++ b/backend/app/core/middlewares/metrics.py
@@ -118,6 +118,7 @@ def _get_path_template(path: str) -> str:
path = re.sub(r"/[0-9a-f]{24}", "/{id}", path)
return path
+
# --8<-- [end:path_template]
@@ -208,4 +209,6 @@ def get_process_metrics(_: CallbackOptions) -> list[Observation]:
meter.create_observable_gauge(
name="process_metrics", callbacks=[get_process_metrics], description="Process-level metrics", unit="mixed"
)
+
+
# --8<-- [end:system_metrics]
diff --git a/backend/app/core/middlewares/rate_limit.py b/backend/app/core/middlewares/rate_limit.py
index d1349b17..c35ac21a 100644
--- a/backend/app/core/middlewares/rate_limit.py
+++ b/backend/app/core/middlewares/rate_limit.py
@@ -110,6 +110,7 @@ def _extract_user_id(request: Request) -> str:
craft arbitrary bucket keys to bypass IP-based limits.
"""
return f"ip:{get_client_ip(request)}"
+
# --8<-- [end:extract_user_id]
async def _check_rate_limit(self, user_id: str, endpoint: str) -> RateLimitStatus:
diff --git a/backend/app/core/middlewares/request_size_limit.py b/backend/app/core/middlewares/request_size_limit.py
index a44c3ec0..0cd222ba 100644
--- a/backend/app/core/middlewares/request_size_limit.py
+++ b/backend/app/core/middlewares/request_size_limit.py
@@ -14,6 +14,7 @@ class RequestSizeLimitMiddleware:
def __init__(self, app: ASGIApp, max_size_mb: int = 10) -> None:
self.app = app
self.max_size_bytes = max_size_mb * 1024 * 1024
+
# --8<-- [end:RequestSizeLimitMiddleware]
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py
index b4219689..1509d920 100644
--- a/backend/app/core/providers.py
+++ b/backend/app/core/providers.py
@@ -84,7 +84,10 @@ class BrokerProvider(Provider):
@provide
def get_broker(
- self, settings: Settings, logger: structlog.stdlib.BoundLogger, _tracer: Tracer,
+ self,
+ settings: Settings,
+ logger: structlog.stdlib.BoundLogger,
+ _tracer: Tracer,
) -> KafkaBroker:
broker = KafkaBroker(
settings.KAFKA_BOOTSTRAP_SERVERS,
@@ -112,7 +115,6 @@ def get_logger(self, settings: Settings) -> structlog.stdlib.BoundLogger:
return setup_logger(settings.LOG_LEVEL)
-
class RedisProvider(Provider):
scope = Scope.APP
@@ -142,10 +144,10 @@ async def get_redis_client(
@provide
def get_rate_limit_service(
- self,
- redis_client: redis.Redis,
- settings: Settings,
- rate_limit_metrics: RateLimitMetrics,
+ self,
+ redis_client: redis.Redis,
+ settings: Settings,
+ rate_limit_metrics: RateLimitMetrics,
) -> RateLimitService:
return RateLimitService(redis_client, settings, rate_limit_metrics)
@@ -159,7 +161,9 @@ def get_security_service(self, settings: Settings, security_metrics: SecurityMet
@provide
def get_tracer(
- self, settings: Settings, logger: structlog.stdlib.BoundLogger,
+ self,
+ settings: Settings,
+ logger: structlog.stdlib.BoundLogger,
) -> Tracer:
return Tracer(settings, logger)
@@ -169,17 +173,17 @@ class MessagingProvider(Provider):
@provide
def get_kafka_event_transport(
- self,
- broker: KafkaBroker,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ broker: KafkaBroker,
+ logger: structlog.stdlib.BoundLogger,
) -> KafkaEventTransport:
return KafkaEventTransport(broker, logger)
@provide
def get_unified_producer(
- self,
- event_repository: EventRepository,
- transport: KafkaEventTransport,
+ self,
+ event_repository: EventRepository,
+ transport: KafkaEventTransport,
) -> UnifiedProducer:
return UnifiedProducer(event_repository, transport)
@@ -189,10 +193,10 @@ def get_idempotency_repository(self, redis_client: redis.Redis) -> RedisIdempote
@provide
def get_idempotency_manager(
- self,
- repo: RedisIdempotencyRepository,
- logger: structlog.stdlib.BoundLogger,
- idempotency_metrics: IdempotencyMetrics,
+ self,
+ repo: RedisIdempotencyRepository,
+ logger: structlog.stdlib.BoundLogger,
+ idempotency_metrics: IdempotencyMetrics,
) -> IdempotencyManager:
return IdempotencyManager(IdempotencyConfig(), repo, logger, idempotency_metrics)
@@ -204,12 +208,12 @@ class DLQProvider(Provider):
@provide
def get_dlq_manager(
- self,
- broker: KafkaBroker,
- settings: Settings,
- logger: structlog.stdlib.BoundLogger,
- dlq_metrics: DLQMetrics,
- repository: DLQRepository,
+ self,
+ broker: KafkaBroker,
+ settings: Settings,
+ logger: structlog.stdlib.BoundLogger,
+ dlq_metrics: DLQMetrics,
+ repository: DLQRepository,
) -> DLQManager:
return DLQManager(
settings=settings,
@@ -361,10 +365,10 @@ def get_sse_redis_bus(
@provide
def get_sse_service(
- self,
- bus: SSERedisBus,
- execution_repository: ExecutionRepository,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ bus: SSERedisBus,
+ execution_repository: ExecutionRepository,
+ logger: structlog.stdlib.BoundLogger,
) -> SSEService:
return SSEService(bus=bus, execution_repository=execution_repository, logger=logger)
@@ -374,15 +378,15 @@ class AuthProvider(Provider):
@provide
def get_auth_service(
- self,
- user_repository: UserRepository,
- security_service: SecurityService,
- security_metrics: SecurityMetrics,
- logger: structlog.stdlib.BoundLogger,
- lockout_service: LoginLockoutService,
- runtime_settings: RuntimeSettingsLoader,
- producer: UnifiedProducer,
- settings: Settings,
+ self,
+ user_repository: UserRepository,
+ security_service: SecurityService,
+ security_metrics: SecurityMetrics,
+ logger: structlog.stdlib.BoundLogger,
+ lockout_service: LoginLockoutService,
+ runtime_settings: RuntimeSettingsLoader,
+ producer: UnifiedProducer,
+ settings: Settings,
) -> AuthService:
return AuthService(
user_repo=user_repository,
@@ -407,11 +411,11 @@ def get_event_service(self, event_repository: EventRepository) -> EventService:
@provide
def get_kafka_event_service(
- self,
- kafka_producer: UnifiedProducer,
- settings: Settings,
- logger: structlog.stdlib.BoundLogger,
- event_metrics: EventMetrics,
+ self,
+ kafka_producer: UnifiedProducer,
+ settings: Settings,
+ logger: structlog.stdlib.BoundLogger,
+ event_metrics: EventMetrics,
) -> KafkaEventService:
return KafkaEventService(
kafka_producer=kafka_producer,
@@ -426,11 +430,11 @@ class UserServicesProvider(Provider):
@provide
def get_user_settings_service(
- self,
- repository: UserSettingsRepository,
- kafka_event_service: KafkaEventService,
- settings: Settings,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ repository: UserSettingsRepository,
+ kafka_event_service: KafkaEventService,
+ settings: Settings,
+ logger: structlog.stdlib.BoundLogger,
) -> UserSettingsService:
return UserSettingsService(repository, kafka_event_service, settings, logger)
@@ -440,10 +444,10 @@ class RuntimeSettingsProvider(Provider):
@provide
def get_runtime_settings_loader(
- self,
- admin_settings_repository: AdminSettingsRepository,
- settings: Settings,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ admin_settings_repository: AdminSettingsRepository,
+ settings: Settings,
+ logger: structlog.stdlib.BoundLogger,
) -> RuntimeSettingsLoader:
return RuntimeSettingsLoader(admin_settings_repository, settings, logger)
@@ -453,40 +457,40 @@ class AdminServicesProvider(Provider):
@provide
def get_login_lockout_service(
- self,
- redis_client: redis.Redis,
- runtime_settings: RuntimeSettingsLoader,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ redis_client: redis.Redis,
+ runtime_settings: RuntimeSettingsLoader,
+ logger: structlog.stdlib.BoundLogger,
) -> LoginLockoutService:
return LoginLockoutService(redis_client, runtime_settings, logger)
@provide(scope=Scope.REQUEST)
def get_admin_events_service(
- self,
- admin_events_repository: AdminEventsRepository,
- event_replay_service: EventReplayService,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ admin_events_repository: AdminEventsRepository,
+ event_replay_service: EventReplayService,
+ logger: structlog.stdlib.BoundLogger,
) -> AdminEventsService:
return AdminEventsService(admin_events_repository, event_replay_service, logger)
@provide
def get_admin_settings_service(
- self,
- admin_settings_repository: AdminSettingsRepository,
- runtime_settings: RuntimeSettingsLoader,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ admin_settings_repository: AdminSettingsRepository,
+ runtime_settings: RuntimeSettingsLoader,
+ logger: structlog.stdlib.BoundLogger,
) -> AdminSettingsService:
return AdminSettingsService(admin_settings_repository, runtime_settings, logger)
@provide
def get_notification_service(
- self,
- notification_repository: NotificationRepository,
- kafka_event_service: KafkaEventService,
- sse_redis_bus: SSERedisBus,
- settings: Settings,
- logger: structlog.stdlib.BoundLogger,
- notification_metrics: NotificationMetrics,
+ self,
+ notification_repository: NotificationRepository,
+ kafka_event_service: KafkaEventService,
+ sse_redis_bus: SSERedisBus,
+ settings: Settings,
+ logger: structlog.stdlib.BoundLogger,
+ notification_metrics: NotificationMetrics,
) -> NotificationService:
return NotificationService(
notification_repository=notification_repository,
@@ -499,10 +503,10 @@ def get_notification_service(
@provide
def get_notification_scheduler(
- self,
- notification_repository: NotificationRepository,
- notification_service: NotificationService,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ notification_repository: NotificationRepository,
+ notification_service: NotificationService,
+ logger: structlog.stdlib.BoundLogger,
) -> NotificationScheduler:
return NotificationScheduler(
notification_repository=notification_repository,
@@ -542,11 +546,11 @@ class BusinessServicesProvider(Provider):
@provide
def get_saga_service(
- self,
- saga_repository: SagaRepository,
- execution_repository: ExecutionRepository,
- saga_orchestrator: SagaOrchestrator,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ saga_repository: SagaRepository,
+ execution_repository: ExecutionRepository,
+ saga_orchestrator: SagaOrchestrator,
+ logger: structlog.stdlib.BoundLogger,
) -> SagaService:
return SagaService(
saga_repo=saga_repository,
@@ -557,14 +561,14 @@ def get_saga_service(
@provide
def get_execution_service(
- self,
- execution_repository: ExecutionRepository,
- kafka_producer: UnifiedProducer,
- settings: Settings,
- logger: structlog.stdlib.BoundLogger,
- execution_metrics: ExecutionMetrics,
- idempotency_manager: IdempotencyManager,
- runtime_settings: RuntimeSettingsLoader,
+ self,
+ execution_repository: ExecutionRepository,
+ kafka_producer: UnifiedProducer,
+ settings: Settings,
+ logger: structlog.stdlib.BoundLogger,
+ execution_metrics: ExecutionMetrics,
+ idempotency_manager: IdempotencyManager,
+ runtime_settings: RuntimeSettingsLoader,
) -> ExecutionService:
return ExecutionService(
execution_repo=execution_repository,
@@ -578,20 +582,20 @@ def get_execution_service(
@provide
def get_saved_script_service(
- self, saved_script_repository: SavedScriptRepository, logger: structlog.stdlib.BoundLogger
+ self, saved_script_repository: SavedScriptRepository, logger: structlog.stdlib.BoundLogger
) -> SavedScriptService:
return SavedScriptService(saved_script_repository, logger)
@provide
def get_admin_user_service(
- self,
- user_repository: UserRepository,
- event_service: EventService,
- execution_service: ExecutionService,
- rate_limit_service: RateLimitService,
- security_service: SecurityService,
- security_metrics: SecurityMetrics,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ user_repository: UserRepository,
+ event_service: EventService,
+ execution_service: ExecutionService,
+ rate_limit_service: RateLimitService,
+ security_service: SecurityService,
+ security_metrics: SecurityMetrics,
+ logger: structlog.stdlib.BoundLogger,
) -> AdminUserService:
return AdminUserService(
user_repository=user_repository,
@@ -605,11 +609,11 @@ def get_admin_user_service(
@provide
def get_admin_execution_service(
- self,
- execution_repository: ExecutionRepository,
- queue_service: ExecutionQueueService,
- runtime_settings: RuntimeSettingsLoader,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ execution_repository: ExecutionRepository,
+ queue_service: ExecutionQueueService,
+ runtime_settings: RuntimeSettingsLoader,
+ logger: structlog.stdlib.BoundLogger,
) -> AdminExecutionService:
return AdminExecutionService(
execution_repo=execution_repository,
@@ -726,12 +730,12 @@ class EventReplayProvider(Provider):
@provide
def get_event_replay_service(
- self,
- replay_repository: ReplayRepository,
- kafka_producer: UnifiedProducer,
- replay_metrics: ReplayMetrics,
- logger: structlog.stdlib.BoundLogger,
- sse_bus: SSERedisBus,
+ self,
+ replay_repository: ReplayRepository,
+ kafka_producer: UnifiedProducer,
+ replay_metrics: ReplayMetrics,
+ logger: structlog.stdlib.BoundLogger,
+ sse_bus: SSERedisBus,
) -> EventReplayService:
return EventReplayService(
repository=replay_repository,
diff --git a/backend/app/core/security.py b/backend/app/core/security.py
index 6a70321d..488d9c33 100644
--- a/backend/app/core/security.py
+++ b/backend/app/core/security.py
@@ -21,15 +21,14 @@ def __init__(self, settings: Settings, security_metrics: SecurityMetrics) -> Non
self.settings = settings
self._security_metrics = security_metrics
# --8<-- [start:password_hashing]
- self._password_hash = PasswordHash((
- BcryptHasher(rounds=self.settings.BCRYPT_ROUNDS),
- ))
+ self._password_hash = PasswordHash((BcryptHasher(rounds=self.settings.BCRYPT_ROUNDS),))
def verify_password(self, plain_password: str, hashed_password: str) -> bool:
return self._password_hash.verify(plain_password, hashed_password)
def get_password_hash(self, password: str) -> str:
return self._password_hash.hash(password)
+
# --8<-- [end:password_hashing]
# --8<-- [start:create_access_token]
@@ -39,6 +38,7 @@ def create_access_token(self, data: dict[str, Any], expires_delta: timedelta) ->
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, self.settings.SECRET_KEY, algorithm=self.settings.ALGORITHM)
return encoded_jwt
+
# --8<-- [end:create_access_token]
def decode_token(self, token: str, *, allow_expired: bool = False) -> str:
@@ -51,7 +51,10 @@ def decode_token(self, token: str, *, allow_expired: bool = False) -> str:
try:
options = {"verify_exp": not allow_expired}
payload = jwt.decode(
- token, self.settings.SECRET_KEY, algorithms=[self.settings.ALGORITHM], options=options,
+ token,
+ self.settings.SECRET_KEY,
+ algorithms=[self.settings.ALGORITHM],
+ options=options,
)
username: str | None = payload.get("sub")
if username is None:
@@ -94,10 +97,12 @@ def validate_csrf_token(self, header_token: str, cookie_token: str) -> bool:
return hmac.compare_digest(header_token, cookie_token)
# Paths exempt from CSRF validation (auth handles its own security)
- CSRF_EXEMPT_PATHS: frozenset[str] = frozenset({
- "/api/v1/auth/login",
- "/api/v1/auth/register",
- })
+ CSRF_EXEMPT_PATHS: frozenset[str] = frozenset(
+ {
+ "/api/v1/auth/login",
+ "/api/v1/auth/register",
+ }
+ )
def validate_csrf_from_request(self, request: Request) -> str:
"""Validate CSRF token from HTTP request using double-submit cookie pattern.
@@ -144,4 +149,5 @@ def validate_csrf_from_request(self, request: Request) -> str:
raise CSRFValidationError("CSRF token signature invalid")
return header_token
+
# --8<-- [end:csrf_validation]
diff --git a/backend/app/core/tracing/tracer.py b/backend/app/core/tracing/tracer.py
index 6889ff7b..0c7a85a1 100644
--- a/backend/app/core/tracing/tracer.py
+++ b/backend/app/core/tracing/tracer.py
@@ -21,13 +21,15 @@ def __init__(self, settings: Settings, logger: structlog.stdlib.BoundLogger) ->
name = settings.TRACING_SERVICE_NAME
rate = settings.TRACING_SAMPLING_RATE
- resource = Resource.create({
- SERVICE_NAME: name,
- SERVICE_VERSION: settings.TRACING_SERVICE_VERSION,
- "deployment.environment": settings.ENVIRONMENT,
- "service.namespace": "integr8scode",
- "service.instance.id": settings.HOSTNAME,
- })
+ resource = Resource.create(
+ {
+ SERVICE_NAME: name,
+ SERVICE_VERSION: settings.TRACING_SERVICE_VERSION,
+ "deployment.environment": settings.ENVIRONMENT,
+ "service.namespace": "integr8scode",
+ "service.instance.id": settings.HOSTNAME,
+ }
+ )
sampler: Sampler
if rate <= 0:
@@ -41,10 +43,12 @@ def __init__(self, settings: Settings, logger: structlog.stdlib.BoundLogger) ->
if settings.OTLP_TRACES_ENDPOINT:
provider.add_span_processor(
- BatchSpanProcessor(OTLPSpanExporter(
- endpoint=settings.OTLP_TRACES_ENDPOINT,
- insecure=settings.OTLP_TRACES_ENDPOINT.startswith("http://"),
- ))
+ BatchSpanProcessor(
+ OTLPSpanExporter(
+ endpoint=settings.OTLP_TRACES_ENDPOINT,
+ insecure=settings.OTLP_TRACES_ENDPOINT.startswith("http://"),
+ )
+ )
)
trace.set_tracer_provider(provider)
diff --git a/backend/app/db/docs/event.py b/backend/app/db/docs/event.py
index 514db9dc..65e7269a 100644
--- a/backend/app/db/docs/event.py
+++ b/backend/app/db/docs/event.py
@@ -71,6 +71,8 @@ class Settings:
default_language="english",
),
]
+
+
# --8<-- [end:EventDocument]
diff --git a/backend/app/db/repositories/admin/admin_events_repository.py b/backend/app/db/repositories/admin/admin_events_repository.py
index 05729bce..341c4bcc 100644
--- a/backend/app/db/repositories/admin/admin_events_repository.py
+++ b/backend/app/db/repositories/admin/admin_events_repository.py
@@ -180,23 +180,22 @@ async def get_event_stats(self, hours: int = 24) -> EventStatistics:
error_rate = (error_count / total * 100) if total > 0 else 0
events_by_type = [
- EventTypeCount(event_type=EventType(t["_id"]), count=t["count"])
- for t in facet.get("by_type", [])
+ EventTypeCount(event_type=EventType(t["_id"]), count=t["count"]) for t in facet.get("by_type", [])
]
events_by_hour = [HourlyEventCount(**doc) for doc in facet.get("by_hour", [])]
- top_users = [
- UserEventCount(**doc) for doc in facet.get("by_user", []) if doc["user_id"]
- ]
+ top_users = [UserEventCount(**doc) for doc in facet.get("by_user", []) if doc["user_id"]]
# Separate collection — must be a separate query
exec_time_field = S.field(ExecutionDocument.resource_usage.execution_time_wall_seconds) # type: ignore[union-attr]
exec_pipeline = (
Pipeline()
- .match({
- ExecutionDocument.created_at: {"$gte": start_time},
- ExecutionDocument.status: ExecutionStatus.COMPLETED,
- ExecutionDocument.resource_usage.execution_time_wall_seconds: {"$exists": True}, # type: ignore[union-attr]
- })
+ .match(
+ {
+ ExecutionDocument.created_at: {"$gte": start_time},
+ ExecutionDocument.status: ExecutionStatus.COMPLETED,
+ ExecutionDocument.resource_usage.execution_time_wall_seconds: {"$exists": True}, # type: ignore[union-attr]
+ }
+ )
.group(by=None, query={"avg_duration": S.avg(exec_time_field)})
)
exec_result = await ExecutionDocument.aggregate(exec_pipeline.export()).to_list()
@@ -251,15 +250,18 @@ async def get_replay_session_sse_status(self, session_id: str) -> DomainReplaySS
started_at=doc.started_at,
completed_at=doc.completed_at,
errors=[
- e if isinstance(e, DomainReplayError)
- else DomainReplayError(**e) if isinstance(e, dict)
+ e
+ if isinstance(e, DomainReplayError)
+ else DomainReplayError(**e)
+ if isinstance(e, dict)
else DomainReplayError(**dataclasses.asdict(e))
for e in doc.errors
],
)
async def get_execution_results_for_filter(
- self, replay_filter: ReplayFilter,
+ self,
+ replay_filter: ReplayFilter,
) -> list[ExecutionResultSummary]:
mongo_query = replay_filter.to_mongo_query()
if not mongo_query:
@@ -268,9 +270,7 @@ async def get_execution_results_for_filter(
exec_ids = list({e.execution_id for e in matched_events if e.execution_id})[:10]
if not exec_ids:
return []
- exec_docs = await ExecutionDocument.find(
- In(ExecutionDocument.execution_id, exec_ids)
- ).to_list()
+ exec_docs = await ExecutionDocument.find(In(ExecutionDocument.execution_id, exec_ids)).to_list()
return [
ExecutionResultSummary(
execution_id=d.execution_id,
diff --git a/backend/app/db/repositories/dlq_repository.py b/backend/app/db/repositories/dlq_repository.py
index 4fd23f05..10bbab2b 100644
--- a/backend/app/db/repositories/dlq_repository.py
+++ b/backend/app/db/repositories/dlq_repository.py
@@ -32,12 +32,12 @@ def _to_domain(doc: DLQMessageDocument) -> DLQMessage:
return DLQMessage(**data)
async def get_messages(
- self,
- status: DLQMessageStatus | None = None,
- topic: str | None = None,
- event_type: EventType | None = None,
- limit: int = 50,
- offset: int = 0,
+ self,
+ status: DLQMessageStatus | None = None,
+ topic: str | None = None,
+ event_type: EventType | None = None,
+ limit: int = 50,
+ offset: int = 0,
) -> DLQMessageListResult:
conditions: list[Any] = [
DLQMessageDocument.status == status if status else None,
diff --git a/backend/app/db/repositories/event_repository.py b/backend/app/db/repositories/event_repository.py
index 9a510e77..f1194f8b 100644
--- a/backend/app/db/repositories/event_repository.py
+++ b/backend/app/db/repositories/event_repository.py
@@ -44,11 +44,13 @@ async def store_event(self, event: DomainEvent) -> str:
data = event.model_dump(exclude_none=True)
data.setdefault("stored_at", datetime.now(timezone.utc))
doc = EventDocument(**data)
- trace.get_current_span().set_attributes({
- "event.type": event.event_type,
- "event.id": event.event_id,
- "execution.id": event.aggregate_id or "",
- })
+ trace.get_current_span().set_attributes(
+ {
+ "event.type": event.event_type,
+ "event.id": event.event_id,
+ "execution.id": event.aggregate_id or "",
+ }
+ )
try:
await doc.insert()
except DuplicateKeyError:
@@ -73,7 +75,7 @@ async def get_event(self, event_id: str) -> DomainEvent | None:
return DomainEventAdapter.validate_python(doc)
async def get_events_by_aggregate(
- self, aggregate_id: str, event_types: list[EventType] | None = None, limit: int = 100
+ self, aggregate_id: str, event_types: list[EventType] | None = None, limit: int = 100
) -> list[DomainEvent]:
conditions: list[BaseFindOperator] = [Eq(EventDocument.aggregate_id, aggregate_id)]
if event_types:
@@ -84,7 +86,7 @@ async def get_events_by_aggregate(
return [DomainEventAdapter.validate_python(d) for d in docs]
async def get_events_by_execution_id(
- self, execution_id: str, event_types: list[EventType] | None = None, limit: int = 100
+ self, execution_id: str, event_types: list[EventType] | None = None, limit: int = 100
) -> list[DomainEvent]:
conditions: list[BaseFindOperator] = [Eq(EventDocument.execution_id, execution_id)]
if event_types:
@@ -95,12 +97,12 @@ async def get_events_by_execution_id(
return [DomainEventAdapter.validate_python(d) for d in docs]
async def get_execution_events(
- self,
- execution_id: str,
- limit: int = 100,
- skip: int = 0,
- exclude_system_events: bool = False,
- event_types: list[EventType] | None = None,
+ self,
+ execution_id: str,
+ limit: int = 100,
+ skip: int = 0,
+ exclude_system_events: bool = False,
+ event_types: list[EventType] | None = None,
) -> EventListResult:
conditions: list[Any] = [
Or(
@@ -116,7 +118,9 @@ async def get_execution_events(
docs = (
await EventDocument.find(*conditions)
.sort([("timestamp", SortDirection.ASCENDING)])
- .skip(skip).limit(limit).to_list()
+ .skip(skip)
+ .limit(limit)
+ .to_list()
)
events = [DomainEventAdapter.validate_python(d) for d in docs]
total_count = await EventDocument.find(*conditions).count()
@@ -130,10 +134,10 @@ async def get_execution_events(
)
async def get_event_statistics(
- self,
- start_time: datetime | None = None,
- end_time: datetime | None = None,
- match: dict[str, object] | None = None,
+ self,
+ start_time: datetime | None = None,
+ end_time: datetime | None = None,
+ match: dict[str, object] | None = None,
) -> EventStatistics:
pipeline: list[Mapping[str, object]] = []
if match:
@@ -199,26 +203,24 @@ async def get_event_statistics(
async for doc in EventDocument.aggregate(pipeline):
doc["events_by_type"] = [
- EventTypeCount(event_type=EventType(k), count=v)
- for k, v in doc.get("events_by_type", {}).items()
+ EventTypeCount(event_type=EventType(k), count=v) for k, v in doc.get("events_by_type", {}).items()
]
doc["events_by_service"] = [
- ServiceEventCount(service_name=k, count=v)
- for k, v in doc.get("events_by_service", {}).items()
+ ServiceEventCount(service_name=k, count=v) for k, v in doc.get("events_by_service", {}).items()
]
return EventStatistics(**doc)
return EventStatistics(total_events=0, events_by_type=[], events_by_service=[], events_by_hour=[])
async def get_user_events_paginated(
- self,
- user_id: str,
- event_types: list[EventType] | None = None,
- start_time: datetime | None = None,
- end_time: datetime | None = None,
- limit: int = 100,
- skip: int = 0,
- sort_order: str = "desc",
+ self,
+ user_id: str,
+ event_types: list[EventType] | None = None,
+ start_time: datetime | None = None,
+ end_time: datetime | None = None,
+ limit: int = 100,
+ skip: int = 0,
+ sort_order: str = "desc",
) -> EventListResult:
conditions = [
EventDocument.metadata.user_id == user_id,
@@ -231,7 +233,9 @@ async def get_user_events_paginated(
docs = (
await EventDocument.find(*conditions)
.sort([("timestamp", sort_direction)])
- .skip(skip).limit(limit).to_list()
+ .skip(skip)
+ .limit(limit)
+ .to_list()
)
events = [DomainEventAdapter.validate_python(d) for d in docs]
total_count = await EventDocument.find(*conditions).count()
@@ -248,17 +252,19 @@ async def count_events(self, *conditions: Any) -> int:
return await EventDocument.find(*conditions).count()
async def query_events(
- self,
- query: dict[str, Any],
- sort_field: str = "timestamp",
- skip: int = 0,
- limit: int = 100,
+ self,
+ query: dict[str, Any],
+ sort_field: str = "timestamp",
+ skip: int = 0,
+ limit: int = 100,
) -> EventListResult:
"""Query events with filter, sort, and pagination. Always sorts descending (newest first)."""
docs = (
await EventDocument.find(query)
.sort([(sort_field, SortDirection.DESCENDING)])
- .skip(skip).limit(limit).to_list()
+ .skip(skip)
+ .limit(limit)
+ .to_list()
)
events = [DomainEventAdapter.validate_python(d) for d in docs]
total_count = await EventDocument.find(query).count()
@@ -268,7 +274,7 @@ async def query_events(
)
async def delete_event_with_archival(
- self, event_id: str, deleted_by: str, deletion_reason: str = "Admin deletion via API"
+ self, event_id: str, deleted_by: str, deletion_reason: str = "Admin deletion via API"
) -> ArchivedEvent | None:
doc = await EventDocument.find_one(EventDocument.event_id == event_id)
if not doc:
diff --git a/backend/app/db/repositories/execution_repository.py b/backend/app/db/repositories/execution_repository.py
index ece33df0..68da731d 100644
--- a/backend/app/db/repositories/execution_repository.py
+++ b/backend/app/db/repositories/execution_repository.py
@@ -57,19 +57,22 @@ async def write_terminal_result(self, result: ExecutionResultDomain) -> bool:
ExecutionDocument.execution_id == result.execution_id,
In(ExecutionDocument.status, list(EXECUTION_ACTIVE)),
).update(
- {"$set": {
- "status": result.status,
- "exit_code": result.exit_code,
- "stdout": result.stdout,
- "stderr": result.stderr,
- "resource_usage": dataclasses.asdict(result.resource_usage) if result.resource_usage else None,
- "error_type": result.error_type,
- "updated_at": datetime.now(timezone.utc),
- }}
+ {
+ "$set": {
+ "status": result.status,
+ "exit_code": result.exit_code,
+ "stdout": result.stdout,
+ "stderr": result.stderr,
+ "resource_usage": dataclasses.asdict(result.resource_usage) if result.resource_usage else None,
+ "error_type": result.error_type,
+ "updated_at": datetime.now(timezone.utc),
+ }
+ }
)
if not update_result or getattr(update_result, "modified_count", 0) == 0:
self.logger.warning(
- "Execution not found or already in terminal state", execution_id=result.execution_id,
+ "Execution not found or already in terminal state",
+ execution_id=result.execution_id,
)
return False
return True
@@ -112,33 +115,47 @@ async def aggregate_stats(self, query: dict[str, Any]) -> dict[str, Any]:
if query:
pipeline.append({"$match": query})
- pipeline.append({
- "$facet": {
- "by_status": [{"$group": {"_id": "$status", "count": {"$sum": 1}}}],
- "by_language": [
- {"$group": {
- "_id": {"$concat": ["$lang", "-", "$lang_version"]},
- "count": {"$sum": 1},
- }},
- ],
- "totals": [{"$group": {
- "_id": None,
- "total": {"$sum": 1},
- "successful": {"$sum": {"$cond": [{"$eq": ["$status", ExecutionStatus.COMPLETED]}, 1, 0]}},
- }}],
- "avg_duration": [
- {"$match": {
- "status": ExecutionStatus.COMPLETED,
- "created_at": {"$ne": None},
- "updated_at": {"$ne": None},
- }},
- {"$group": {
- "_id": None,
- "avg_ms": {"$avg": {"$subtract": ["$updated_at", "$created_at"]}},
- }},
- ],
- },
- })
+ pipeline.append(
+ {
+ "$facet": {
+ "by_status": [{"$group": {"_id": "$status", "count": {"$sum": 1}}}],
+ "by_language": [
+ {
+ "$group": {
+ "_id": {"$concat": ["$lang", "-", "$lang_version"]},
+ "count": {"$sum": 1},
+ }
+ },
+ ],
+ "totals": [
+ {
+ "$group": {
+ "_id": None,
+ "total": {"$sum": 1},
+ "successful": {
+ "$sum": {"$cond": [{"$eq": ["$status", ExecutionStatus.COMPLETED]}, 1, 0]}
+ },
+ }
+ }
+ ],
+ "avg_duration": [
+ {
+ "$match": {
+ "status": ExecutionStatus.COMPLETED,
+ "created_at": {"$ne": None},
+ "updated_at": {"$ne": None},
+ }
+ },
+ {
+ "$group": {
+ "_id": None,
+ "avg_ms": {"$avg": {"$subtract": ["$updated_at", "$created_at"]}},
+ }
+ },
+ ],
+ },
+ }
+ )
collection = ExecutionDocument.get_pymongo_collection()
cursor = await collection.aggregate(pipeline)
diff --git a/backend/app/db/repositories/notification_repository.py b/backend/app/db/repositories/notification_repository.py
index 4a33a067..783140f6 100644
--- a/backend/app/db/repositories/notification_repository.py
+++ b/backend/app/db/repositories/notification_repository.py
@@ -149,9 +149,7 @@ async def try_claim_pending(self, notification_id: str) -> bool:
return bool(result and getattr(result, "modified_count", 0) > 0)
# Subscriptions
- async def get_subscription(
- self, user_id: str, channel: NotificationChannel
- ) -> DomainNotificationSubscription:
+ async def get_subscription(self, user_id: str, channel: NotificationChannel) -> DomainNotificationSubscription:
"""Get subscription for user/channel, returning default enabled subscription if none exists."""
doc = await NotificationSubscriptionDocument.find_one(
NotificationSubscriptionDocument.user_id == user_id,
@@ -189,8 +187,7 @@ async def get_all_subscriptions(self, user_id: str) -> list[DomainNotificationSu
NotificationSubscriptionDocument.user_id == user_id,
).to_list()
existing: dict[NotificationChannel, DomainNotificationSubscription] = {
- doc.channel: DomainNotificationSubscription(**doc.model_dump(include=_sub_fields))
- for doc in docs
+ doc.channel: DomainNotificationSubscription(**doc.model_dump(include=_sub_fields)) for doc in docs
}
return [
existing.get(channel, DomainNotificationSubscription(user_id=user_id, channel=channel, enabled=True))
diff --git a/backend/app/db/repositories/saga_repository.py b/backend/app/db/repositories/saga_repository.py
index c812381d..add76cb3 100644
--- a/backend/app/db/repositories/saga_repository.py
+++ b/backend/app/db/repositories/saga_repository.py
@@ -75,7 +75,10 @@ async def save_saga(self, saga_id: str, **updates: Any) -> Saga:
return self._to_domain(doc)
async def atomic_cancel_saga(
- self, saga_id: str, error_message: str, completed_at: datetime,
+ self,
+ saga_id: str,
+ error_message: str,
+ completed_at: datetime,
) -> Saga:
"""Atomically cancel a saga using findOneAndUpdate.
@@ -86,13 +89,15 @@ async def atomic_cancel_saga(
SagaDocument.saga_id == saga_id,
In(SagaDocument.state, list(SAGA_ACTIVE)),
).update(
- Set({ # type: ignore[no-untyped-call]
- SagaDocument.state: SagaState.CANCELLED,
- SagaDocument.error_message: error_message,
- SagaDocument.completed_at: completed_at,
- SagaDocument.updated_at: datetime.now(timezone.utc),
- "revision_id": uuid4(),
- }),
+            Set(  # type: ignore[no-untyped-call]
+                {
+ SagaDocument.state: SagaState.CANCELLED,
+ SagaDocument.error_message: error_message,
+ SagaDocument.completed_at: completed_at,
+ SagaDocument.updated_at: datetime.now(timezone.utc),
+ "revision_id": uuid4(),
+ }
+ ),
response_type=UpdateResponse.NEW_DOCUMENT,
)
if not doc:
@@ -138,7 +143,7 @@ async def get_saga(self, saga_id: str) -> Saga | None:
return self._to_domain(doc) if doc else None
async def get_sagas_by_execution(
- self, execution_id: str, state: SagaState | None = None, limit: int = 100, skip: int = 0
+ self, execution_id: str, state: SagaState | None = None, limit: int = 100, skip: int = 0
) -> SagaListResult:
conditions: list[BaseFindOperator] = [Eq(SagaDocument.execution_id, execution_id)]
if state:
@@ -172,10 +177,10 @@ async def get_user_execution_ids(self, user_id: str) -> list[str]:
return result
async def find_timed_out_sagas(
- self,
- cutoff_time: datetime,
- states: list[SagaState] | None = None,
- limit: int = 100,
+ self,
+ cutoff_time: datetime,
+ states: list[SagaState] | None = None,
+ limit: int = 100,
) -> list[Saga]:
states = states or [SagaState.RUNNING, SagaState.COMPENSATING]
docs = (
@@ -207,9 +212,7 @@ async def get_saga_statistics(self, saga_filter: SagaFilter | None = None) -> di
]
duration_pipeline = (
Pipeline()
- .project(
- duration={"$subtract": [S.field(SagaDocument.completed_at), S.field(SagaDocument.created_at)]}
- )
+ .project(duration={"$subtract": [S.field(SagaDocument.completed_at), S.field(SagaDocument.created_at)]})
.group(by=None, query={"avg_duration": S.avg("$duration")})
)
avg_duration = 0.0
diff --git a/backend/app/dlq/manager.py b/backend/app/dlq/manager.py
index b7a85075..8721bf73 100644
--- a/backend/app/dlq/manager.py
+++ b/backend/app/dlq/manager.py
@@ -51,13 +51,18 @@ def __init__(
self.repository = repository
self._retry_overrides: dict[str, RetryPolicy] = {}
- self._filters: list[Callable[[DLQMessage], bool]] = filters if filters is not None else [
- f for f in [
- None if settings.ENVIRONMENT == "test" else self._filter_test_events,
- self._filter_old_messages,
- ] if f is not None
- ]
-
+ self._filters: list[Callable[[DLQMessage], bool]] = (
+ filters
+ if filters is not None
+ else [
+ f
+ for f in [
+ None if settings.ENVIRONMENT == "test" else self._filter_test_events,
+ self._filter_old_messages,
+ ]
+ if f is not None
+ ]
+ )
def _filter_test_events(self, message: DLQMessage) -> bool:
return not message.event.event_id.startswith("test-")
diff --git a/backend/app/dlq/models.py b/backend/app/dlq/models.py
index f8928f17..d359939b 100644
--- a/backend/app/dlq/models.py
+++ b/backend/app/dlq/models.py
@@ -127,16 +127,25 @@ def get_next_retry_time(self, message: DLQMessage) -> datetime:
AGGRESSIVE_RETRY = RetryPolicy(
strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
- max_retries=5, base_delay_seconds=30, max_delay_seconds=300, retry_multiplier=2.0,
+ max_retries=5,
+ base_delay_seconds=30,
+ max_delay_seconds=300,
+ retry_multiplier=2.0,
)
CAUTIOUS_RETRY = RetryPolicy(
strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
- max_retries=3, base_delay_seconds=60, max_delay_seconds=600, retry_multiplier=3.0,
+ max_retries=3,
+ base_delay_seconds=60,
+ max_delay_seconds=600,
+ retry_multiplier=3.0,
)
IMMEDIATE_RETRY = RetryPolicy(strategy=RetryStrategy.IMMEDIATE, max_retries=3)
DEFAULT_RETRY = RetryPolicy(
strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
- max_retries=4, base_delay_seconds=60, max_delay_seconds=1800, retry_multiplier=2.5,
+ max_retries=4,
+ base_delay_seconds=60,
+ max_delay_seconds=1800,
+ retry_multiplier=2.5,
)
diff --git a/backend/app/domain/events/typed.py b/backend/app/domain/events/typed.py
index 531cf125..787c66b1 100644
--- a/backend/app/domain/events/typed.py
+++ b/backend/app/domain/events/typed.py
@@ -241,6 +241,8 @@ class UserSettingsUpdatedEvent(BaseEvent):
user_id: str
changed_fields: list[str] = Field(default_factory=list)
reason: str | None = None
+
+
# --8<-- [end:UserSettingsUpdatedEvent]
diff --git a/backend/app/domain/idempotency/models.py b/backend/app/domain/idempotency/models.py
index 364cd8e8..832184cf 100644
--- a/backend/app/domain/idempotency/models.py
+++ b/backend/app/domain/idempotency/models.py
@@ -12,6 +12,8 @@ class IdempotencyStatus(StringEnum):
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
+
+
# --8<-- [end:IdempotencyStatus]
diff --git a/backend/app/domain/notification/exceptions.py b/backend/app/domain/notification/exceptions.py
index 8848cf8f..1aff9a0a 100644
--- a/backend/app/domain/notification/exceptions.py
+++ b/backend/app/domain/notification/exceptions.py
@@ -17,6 +17,8 @@ def __init__(self, user_id: str, limit: int, window_hours: int) -> None:
self.limit = limit
self.window_hours = window_hours
super().__init__(f"Rate limit exceeded for user '{user_id}': max {limit} per {window_hours}h")
+
+
# --8<-- [end:NotificationThrottledError]
diff --git a/backend/app/domain/rate_limit/rate_limit_models.py b/backend/app/domain/rate_limit/rate_limit_models.py
index c496143f..fcb7f781 100644
--- a/backend/app/domain/rate_limit/rate_limit_models.py
+++ b/backend/app/domain/rate_limit/rate_limit_models.py
@@ -22,6 +22,8 @@ class EndpointGroup(StringEnum):
AUTH = "auth"
PUBLIC = "public"
API = "api"
+
+
# --8<-- [end:EndpointGroup]
@@ -32,6 +34,8 @@ class EndpointUsageStats:
algorithm: RateLimitAlgorithm
remaining: int
+
+
# --8<-- [end:EndpointUsageStats]
@@ -67,6 +71,8 @@ class UserRateLimit:
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
notes: str | None = None
+
+
# --8<-- [end:UserRateLimit]
diff --git a/backend/app/domain/replay/models.py b/backend/app/domain/replay/models.py
index b085fb39..876e4718 100644
--- a/backend/app/domain/replay/models.py
+++ b/backend/app/domain/replay/models.py
@@ -41,6 +41,8 @@ class ReplayError:
error: str
error_type: str | None = None
event_id: str | None = None
+
+
# --8<-- [end:ReplayError]
@@ -112,6 +114,8 @@ def to_mongo_query(self) -> dict[str, Any]:
query["metadata.service_name"] = self.service_name
return query
+
+
# --8<-- [end:ReplayFilter]
diff --git a/backend/app/domain/saga/exceptions.py b/backend/app/domain/saga/exceptions.py
index 7ed2d63b..ddb41d42 100644
--- a/backend/app/domain/saga/exceptions.py
+++ b/backend/app/domain/saga/exceptions.py
@@ -17,6 +17,8 @@ def __init__(self, saga_id: str, user_id: str) -> None:
self.saga_id = saga_id
self.user_id = user_id
super().__init__(f"Access denied to saga '{saga_id}' for user '{user_id}'")
+
+
# --8<-- [end:SagaAccessDeniedError]
diff --git a/backend/app/events/handlers.py b/backend/app/events/handlers.py
index b2af375b..bfc8c9f3 100644
--- a/backend/app/events/handlers.py
+++ b/backend/app/events/handlers.py
@@ -32,31 +32,38 @@
async def _track_consumed(
- metrics: EventMetrics, event: DomainEvent, consumer_group: str, coro: Awaitable[None],
+ metrics: EventMetrics,
+ event: DomainEvent,
+ consumer_group: str,
+ coro: Awaitable[None],
) -> None:
"""Await *coro* and record domain-level failure metric on error."""
try:
await coro
except Exception as e:
metrics.record_events_processing_failed(
- topic=event.event_type, event_type=event.event_type,
- consumer_group=consumer_group, error_type=type(e).__name__,
+ topic=event.event_type,
+ event_type=event.event_type,
+ consumer_group=consumer_group,
+ error_type=type(e).__name__,
)
raise
# --8<-- [start:with_idempotency]
async def with_idempotency(
- event: DomainEvent,
- handler: Callable[..., Awaitable[None]],
- idem: IdempotencyManager,
- key_strategy: KeyStrategy,
- ttl_seconds: int,
- logger: structlog.stdlib.BoundLogger,
+ event: DomainEvent,
+ handler: Callable[..., Awaitable[None]],
+ idem: IdempotencyManager,
+ key_strategy: KeyStrategy,
+ ttl_seconds: int,
+ logger: structlog.stdlib.BoundLogger,
) -> None:
"""Run *handler* inside an idempotency guard (check -> execute -> mark)."""
result = await idem.check_and_reserve(
- event=event, key_strategy=key_strategy, ttl_seconds=ttl_seconds,
+ event=event,
+ key_strategy=key_strategy,
+ ttl_seconds=ttl_seconds,
)
if result.is_duplicate:
logger.info(f"Duplicate event: {event.event_type} ({event.event_id})")
@@ -66,9 +73,13 @@ async def with_idempotency(
await idem.mark_completed(event=event, key_strategy=key_strategy)
except Exception as e:
await idem.mark_failed(
- event=event, error=str(e), key_strategy=key_strategy,
+ event=event,
+ error=str(e),
+ key_strategy=key_strategy,
)
raise
+
+
# --8<-- [end:with_idempotency]
@@ -79,14 +90,18 @@ def register_k8s_worker_subscriber(broker: KafkaBroker) -> None:
ack_policy=AckPolicy.ACK,
)
async def on_create_pod(
- body: CreatePodCommandEvent,
- worker: FromDishka[KubernetesWorker],
- idem: FromDishka[IdempotencyManager],
- logger: FromDishka[structlog.stdlib.BoundLogger],
- event_metrics: FromDishka[EventMetrics],
+ body: CreatePodCommandEvent,
+ worker: FromDishka[KubernetesWorker],
+ idem: FromDishka[IdempotencyManager],
+ logger: FromDishka[structlog.stdlib.BoundLogger],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, "k8s-worker",
- with_idempotency(body, worker.handle_create_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger))
+ await _track_consumed(
+ event_metrics,
+ body,
+ "k8s-worker",
+ with_idempotency(body, worker.handle_create_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger),
+ )
@broker.subscriber(
EventType.DELETE_POD_COMMAND,
@@ -94,14 +109,18 @@ async def on_create_pod(
ack_policy=AckPolicy.ACK,
)
async def on_delete_pod(
- body: DeletePodCommandEvent,
- worker: FromDishka[KubernetesWorker],
- idem: FromDishka[IdempotencyManager],
- logger: FromDishka[structlog.stdlib.BoundLogger],
- event_metrics: FromDishka[EventMetrics],
+ body: DeletePodCommandEvent,
+ worker: FromDishka[KubernetesWorker],
+ idem: FromDishka[IdempotencyManager],
+ logger: FromDishka[structlog.stdlib.BoundLogger],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, "k8s-worker",
- with_idempotency(body, worker.handle_delete_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger))
+ await _track_consumed(
+ event_metrics,
+ body,
+ "k8s-worker",
+ with_idempotency(body, worker.handle_delete_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger),
+ )
def register_result_processor_subscriber(broker: KafkaBroker) -> None:
@@ -113,14 +132,18 @@ def register_result_processor_subscriber(broker: KafkaBroker) -> None:
auto_offset_reset="earliest",
)
async def on_execution_completed(
- body: ExecutionCompletedEvent,
- processor: FromDishka[ResultProcessor],
- idem: FromDishka[IdempotencyManager],
- logger: FromDishka[structlog.stdlib.BoundLogger],
- event_metrics: FromDishka[EventMetrics],
+ body: ExecutionCompletedEvent,
+ processor: FromDishka[ResultProcessor],
+ idem: FromDishka[IdempotencyManager],
+ logger: FromDishka[structlog.stdlib.BoundLogger],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, "result-processor",
- with_idempotency(body, processor.handle_execution_completed, idem, KeyStrategy.CONTENT_HASH, 7200, logger))
+ await _track_consumed(
+ event_metrics,
+ body,
+ "result-processor",
+ with_idempotency(body, processor.handle_execution_completed, idem, KeyStrategy.CONTENT_HASH, 7200, logger),
+ )
@broker.subscriber(
EventType.EXECUTION_FAILED,
@@ -130,14 +153,18 @@ async def on_execution_completed(
auto_offset_reset="earliest",
)
async def on_execution_failed(
- body: ExecutionFailedEvent,
- processor: FromDishka[ResultProcessor],
- idem: FromDishka[IdempotencyManager],
- logger: FromDishka[structlog.stdlib.BoundLogger],
- event_metrics: FromDishka[EventMetrics],
+ body: ExecutionFailedEvent,
+ processor: FromDishka[ResultProcessor],
+ idem: FromDishka[IdempotencyManager],
+ logger: FromDishka[structlog.stdlib.BoundLogger],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, "result-processor",
- with_idempotency(body, processor.handle_execution_failed, idem, KeyStrategy.CONTENT_HASH, 7200, logger))
+ await _track_consumed(
+ event_metrics,
+ body,
+ "result-processor",
+ with_idempotency(body, processor.handle_execution_failed, idem, KeyStrategy.CONTENT_HASH, 7200, logger),
+ )
@broker.subscriber(
EventType.EXECUTION_TIMEOUT,
@@ -147,14 +174,18 @@ async def on_execution_failed(
auto_offset_reset="earliest",
)
async def on_execution_timeout(
- body: ExecutionTimeoutEvent,
- processor: FromDishka[ResultProcessor],
- idem: FromDishka[IdempotencyManager],
- logger: FromDishka[structlog.stdlib.BoundLogger],
- event_metrics: FromDishka[EventMetrics],
+ body: ExecutionTimeoutEvent,
+ processor: FromDishka[ResultProcessor],
+ idem: FromDishka[IdempotencyManager],
+ logger: FromDishka[structlog.stdlib.BoundLogger],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, "result-processor",
- with_idempotency(body, processor.handle_execution_timeout, idem, KeyStrategy.CONTENT_HASH, 7200, logger))
+ await _track_consumed(
+ event_metrics,
+ body,
+ "result-processor",
+ with_idempotency(body, processor.handle_execution_timeout, idem, KeyStrategy.CONTENT_HASH, 7200, logger),
+ )
def register_saga_subscriber(broker: KafkaBroker) -> None:
@@ -164,14 +195,19 @@ def register_saga_subscriber(broker: KafkaBroker) -> None:
ack_policy=AckPolicy.ACK,
)
async def on_execution_requested(
- body: ExecutionRequestedEvent,
- orchestrator: FromDishka[SagaOrchestrator],
- idem: FromDishka[IdempotencyManager],
- logger: FromDishka[structlog.stdlib.BoundLogger],
- event_metrics: FromDishka[EventMetrics],
+ body: ExecutionRequestedEvent,
+ orchestrator: FromDishka[SagaOrchestrator],
+ idem: FromDishka[IdempotencyManager],
+ logger: FromDishka[structlog.stdlib.BoundLogger],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
coro = with_idempotency(
- body, orchestrator.handle_execution_requested, idem, KeyStrategy.EVENT_BASED, 3600, logger,
+ body,
+ orchestrator.handle_execution_requested,
+ idem,
+ KeyStrategy.EVENT_BASED,
+ 3600,
+ logger,
)
await _track_consumed(event_metrics, body, "saga-orchestrator", coro)
@@ -181,12 +217,11 @@ async def on_execution_requested(
ack_policy=AckPolicy.ACK,
)
async def on_execution_completed(
- body: ExecutionCompletedEvent,
- orchestrator: FromDishka[SagaOrchestrator],
- event_metrics: FromDishka[EventMetrics],
+ body: ExecutionCompletedEvent,
+ orchestrator: FromDishka[SagaOrchestrator],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, "saga-orchestrator",
- orchestrator.handle_execution_completed(body))
+ await _track_consumed(event_metrics, body, "saga-orchestrator", orchestrator.handle_execution_completed(body))
@broker.subscriber(
EventType.EXECUTION_FAILED,
@@ -194,12 +229,11 @@ async def on_execution_completed(
ack_policy=AckPolicy.ACK,
)
async def on_execution_failed(
- body: ExecutionFailedEvent,
- orchestrator: FromDishka[SagaOrchestrator],
- event_metrics: FromDishka[EventMetrics],
+ body: ExecutionFailedEvent,
+ orchestrator: FromDishka[SagaOrchestrator],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, "saga-orchestrator",
- orchestrator.handle_execution_failed(body))
+ await _track_consumed(event_metrics, body, "saga-orchestrator", orchestrator.handle_execution_failed(body))
@broker.subscriber(
EventType.EXECUTION_TIMEOUT,
@@ -207,12 +241,11 @@ async def on_execution_failed(
ack_policy=AckPolicy.ACK,
)
async def on_execution_timeout(
- body: ExecutionTimeoutEvent,
- orchestrator: FromDishka[SagaOrchestrator],
- event_metrics: FromDishka[EventMetrics],
+ body: ExecutionTimeoutEvent,
+ orchestrator: FromDishka[SagaOrchestrator],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, "saga-orchestrator",
- orchestrator.handle_execution_timeout(body))
+ await _track_consumed(event_metrics, body, "saga-orchestrator", orchestrator.handle_execution_timeout(body))
@broker.subscriber(
EventType.EXECUTION_CANCELLED,
@@ -220,12 +253,11 @@ async def on_execution_timeout(
ack_policy=AckPolicy.ACK,
)
async def on_execution_cancelled(
- body: ExecutionCancelledEvent,
- orchestrator: FromDishka[SagaOrchestrator],
- event_metrics: FromDishka[EventMetrics],
+ body: ExecutionCancelledEvent,
+ orchestrator: FromDishka[SagaOrchestrator],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, "saga-orchestrator",
- orchestrator.handle_execution_cancelled(body))
+ await _track_consumed(event_metrics, body, "saga-orchestrator", orchestrator.handle_execution_cancelled(body))
_SSE_EVENT_TYPES = [
@@ -260,14 +292,12 @@ def register_sse_subscriber(broker: KafkaBroker, settings: Settings) -> None:
max_workers=settings.SSE_CONSUMER_POOL_SIZE,
)
async def on_sse_event(
- body: DomainEvent,
- sse_bus: FromDishka[SSERedisBus],
+ body: DomainEvent,
+ sse_bus: FromDishka[SSERedisBus],
) -> None:
execution_id = getattr(body, "execution_id", None)
if execution_id:
- sse_data = SSEExecutionEventData(**{
- k: v for k, v in body.model_dump().items() if k in _sse_field_names
- })
+ sse_data = SSEExecutionEventData(**{k: v for k, v in body.model_dump().items() if k in _sse_field_names})
await sse_bus.publish_event(execution_id, sse_data)
@@ -282,12 +312,11 @@ def register_notification_subscriber(broker: KafkaBroker) -> None:
auto_offset_reset="latest",
)
async def on_execution_completed(
- body: ExecutionCompletedEvent,
- service: FromDishka[NotificationService],
- event_metrics: FromDishka[EventMetrics],
+ body: ExecutionCompletedEvent,
+ service: FromDishka[NotificationService],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, group_id,
- service.handle_execution_completed(body))
+ await _track_consumed(event_metrics, body, group_id, service.handle_execution_completed(body))
@broker.subscriber(
EventType.EXECUTION_FAILED,
@@ -297,12 +326,11 @@ async def on_execution_completed(
auto_offset_reset="latest",
)
async def on_execution_failed(
- body: ExecutionFailedEvent,
- service: FromDishka[NotificationService],
- event_metrics: FromDishka[EventMetrics],
+ body: ExecutionFailedEvent,
+ service: FromDishka[NotificationService],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, group_id,
- service.handle_execution_failed(body))
+ await _track_consumed(event_metrics, body, group_id, service.handle_execution_failed(body))
@broker.subscriber(
EventType.EXECUTION_TIMEOUT,
@@ -312,11 +340,8 @@ async def on_execution_failed(
auto_offset_reset="latest",
)
async def on_execution_timeout(
- body: ExecutionTimeoutEvent,
- service: FromDishka[NotificationService],
- event_metrics: FromDishka[EventMetrics],
+ body: ExecutionTimeoutEvent,
+ service: FromDishka[NotificationService],
+ event_metrics: FromDishka[EventMetrics],
) -> None:
- await _track_consumed(event_metrics, body, group_id,
- service.handle_execution_timeout(body))
-
-
+ await _track_consumed(event_metrics, body, group_id, service.handle_execution_timeout(body))
diff --git a/backend/app/infrastructure/kafka/topics.py b/backend/app/infrastructure/kafka/topics.py
index 4730806e..a4239270 100644
--- a/backend/app/infrastructure/kafka/topics.py
+++ b/backend/app/infrastructure/kafka/topics.py
@@ -54,18 +54,22 @@
# The pod monitor uses this to decide when to delete the pod.
# Note: EXECUTION_CANCELLED is excluded — cancellation triggers pod deletion
# via the saga's DeletePodCommandEvent compensation, not the monitor.
-EXECUTION_TERMINAL_EVENT_TYPES: frozenset[EventType] = frozenset({
- EventType.EXECUTION_COMPLETED,
- EventType.EXECUTION_FAILED,
- EventType.EXECUTION_TIMEOUT,
-})
+EXECUTION_TERMINAL_EVENT_TYPES: frozenset[EventType] = frozenset(
+ {
+ EventType.EXECUTION_COMPLETED,
+ EventType.EXECUTION_FAILED,
+ EventType.EXECUTION_TIMEOUT,
+ }
+)
# Events that signal the full execution pipeline has concluded
# (result stored or execution failed before result could be stored).
# The SSE service uses this to close the execution stream.
-EXECUTION_PIPELINE_TERMINAL_EVENT_TYPES: frozenset[EventType] = frozenset({
- EventType.RESULT_STORED,
- EventType.RESULT_FAILED,
- EventType.EXECUTION_FAILED,
- EventType.EXECUTION_TIMEOUT,
-})
+EXECUTION_PIPELINE_TERMINAL_EVENT_TYPES: frozenset[EventType] = frozenset(
+ {
+ EventType.RESULT_STORED,
+ EventType.RESULT_FAILED,
+ EventType.EXECUTION_FAILED,
+ EventType.EXECUTION_TIMEOUT,
+ }
+)
diff --git a/backend/app/runtime_registry.py b/backend/app/runtime_registry.py
index 49c53d78..d471a62c 100644
--- a/backend/app/runtime_registry.py
+++ b/backend/app/runtime_registry.py
@@ -8,6 +8,8 @@ class RuntimeConfig(NamedTuple):
image: str # Full Docker image reference
file_name: str # Name that will be mounted under /scripts/
command: list[str] # Entrypoint executed inside the container
+
+
# --8<-- [end:RuntimeConfig]
@@ -17,6 +19,8 @@ class LanguageSpec(TypedDict):
image_tpl: str
file_ext: str
interpreter: list[str]
+
+
# --8<-- [end:LanguageSpec]
diff --git a/backend/app/schemas_pydantic/saved_script.py b/backend/app/schemas_pydantic/saved_script.py
index 3979a848..bc60c474 100644
--- a/backend/app/schemas_pydantic/saved_script.py
+++ b/backend/app/schemas_pydantic/saved_script.py
@@ -44,6 +44,8 @@ class SavedScriptResponse(BaseModel):
updated_at: datetime
model_config = ConfigDict(from_attributes=True, json_schema_serialization_defaults_required=True)
+
+
# --8<-- [end:SavedScriptResponse]
diff --git a/backend/app/schemas_pydantic/user.py b/backend/app/schemas_pydantic/user.py
index bf8d1ea1..871b9112 100644
--- a/backend/app/schemas_pydantic/user.py
+++ b/backend/app/schemas_pydantic/user.py
@@ -114,8 +114,6 @@ class LoginResponse(BaseModel):
model_config = ConfigDict(from_attributes=True, json_schema_serialization_defaults_required=True)
-
-
class UnlockResponse(BaseModel):
"""Response model for account unlock."""
diff --git a/backend/app/schemas_pydantic/user_settings.py b/backend/app/schemas_pydantic/user_settings.py
index a2ee7e1c..ad9447c2 100644
--- a/backend/app/schemas_pydantic/user_settings.py
+++ b/backend/app/schemas_pydantic/user_settings.py
@@ -109,5 +109,3 @@ class RestoreSettingsRequest(BaseModel):
timestamp: datetime
reason: str | None = None
-
-
diff --git a/backend/app/services/admin/admin_events_service.py b/backend/app/services/admin/admin_events_service.py
index 18ca8848..de71ef16 100644
--- a/backend/app/services/admin/admin_events_service.py
+++ b/backend/app/services/admin/admin_events_service.py
@@ -66,14 +66,14 @@ def _filter_to_json_dict(f: EventFilter) -> dict[str, Any]:
class AdminReplayResult:
def __init__(
- self,
- *,
- dry_run: bool,
- total_events: int,
- replay_id: str,
- status: ReplayStatus,
- session_id: str | None = None,
- events_preview: list[EventSummary] | None = None,
+ self,
+ *,
+ dry_run: bool,
+ total_events: int,
+ replay_id: str,
+ status: ReplayStatus,
+ session_id: str | None = None,
+ events_preview: list[EventSummary] | None = None,
) -> None:
self.dry_run = dry_run
self.total_events = total_events
@@ -92,21 +92,21 @@ class ExportResult:
class AdminEventsService:
def __init__(
- self,
- repository: AdminEventsRepository,
- replay_service: EventReplayService,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ repository: AdminEventsRepository,
+ replay_service: EventReplayService,
+ logger: structlog.stdlib.BoundLogger,
) -> None:
self._repo = repository
self._replay_service = replay_service
self.logger = logger
async def browse_events(
- self,
- *,
- event_filter: EventFilter,
- skip: int,
- limit: int,
+ self,
+ *,
+ event_filter: EventFilter,
+ skip: int,
+ limit: int,
) -> EventBrowseResult:
return await self._repo.get_events_page(event_filter, skip=skip, limit=limit)
@@ -117,12 +117,12 @@ async def get_event_stats(self, *, hours: int) -> EventStatistics:
return await self._repo.get_event_stats(hours=hours)
async def prepare_or_schedule_replay(
- self,
- *,
- replay_filter: ReplayFilter,
- dry_run: bool,
- replay_id: str,
- target_service: str | None,
+ self,
+ *,
+ replay_filter: ReplayFilter,
+ dry_run: bool,
+ replay_id: str,
+ target_service: str | None,
) -> AdminReplayResult:
if replay_filter.is_empty():
raise ValidationError("Must specify at least one filter for replay")
@@ -241,7 +241,7 @@ def _estimate_completion(self, doc: ReplaySessionDocument, now: datetime) -> dat
return now + timedelta(seconds=remaining / rate) if rate > 0 else None
async def export_events(
- self, *, event_filter: EventFilter, limit: int, export_format: ExportFormat
+ self, *, event_filter: EventFilter, limit: int, export_format: ExportFormat
) -> ExportResult:
if export_format == ExportFormat.CSV:
return await self._export_csv(event_filter=event_filter, limit=limit)
diff --git a/backend/app/services/admin/admin_execution_service.py b/backend/app/services/admin/admin_execution_service.py
index 1624e84a..10042d1a 100644
--- a/backend/app/services/admin/admin_execution_service.py
+++ b/backend/app/services/admin/admin_execution_service.py
@@ -41,13 +41,18 @@ async def list_executions(
query["user_id"] = user_id
executions = await self._repo.get_executions(
- query=query, limit=limit, skip=skip, sort=[("created_at", -1)],
+ query=query,
+ limit=limit,
+ skip=skip,
+ sort=[("created_at", -1)],
)
total = await self._repo.count_executions(query)
return executions, total
async def update_priority(
- self, execution_id: str, new_priority: QueuePriority,
+ self,
+ execution_id: str,
+ new_priority: QueuePriority,
) -> DomainExecution:
updated = await self._repo.update_priority(execution_id, new_priority)
if not updated:
diff --git a/backend/app/services/admin/admin_user_service.py b/backend/app/services/admin/admin_user_service.py
index 56d68272..229a990a 100644
--- a/backend/app/services/admin/admin_user_service.py
+++ b/backend/app/services/admin/admin_user_service.py
@@ -133,7 +133,9 @@ async def list_users(
bypass_rate_limit=s.bypass_rate_limit,
global_multiplier=s.global_multiplier,
has_custom_limits=s.has_custom_limits,
- ) if (s := summaries.get(user.user_id)) else user
+ )
+ if (s := summaries.get(user.user_id))
+ else user
for user in result.users
]
@@ -166,9 +168,7 @@ async def create_user(
return created
async def get_user(self, *, admin_user_id: str, user_id: str) -> User | None:
- self.logger.info(
- "Admin getting user details", admin_user_id=admin_user_id, target_user_id=user_id
- )
+ self.logger.info("Admin getting user details", admin_user_id=admin_user_id, target_user_id=user_id)
return await self._users.get_user_by_id(user_id)
async def update_user(self, *, admin_user_id: str, user_id: str, update: UserUpdate) -> User | None:
@@ -202,9 +202,7 @@ async def delete_user(self, *, admin_user_id: str, user_id: str, cascade: bool)
return result
async def reset_user_password(self, *, admin_user_id: str, user_id: str, new_password: str) -> bool:
- self.logger.info(
- "Admin resetting user password", admin_user_id=admin_user_id, target_user_id=user_id
- )
+ self.logger.info("Admin resetting user password", admin_user_id=admin_user_id, target_user_id=user_id)
self._security_metrics.record_password_reset_request(method="admin")
hashed = self._security.get_password_hash(new_password)
pr = PasswordReset(user_id=user_id, new_password=hashed)
@@ -214,9 +212,7 @@ async def reset_user_password(self, *, admin_user_id: str, user_id: str, new_pas
return ok
async def get_user_rate_limits(self, *, admin_user_id: str, user_id: str) -> UserRateLimitsResult:
- self.logger.info(
- "Admin getting user rate limits", admin_user_id=admin_user_id, target_user_id=user_id
- )
+ self.logger.info("Admin getting user rate limits", admin_user_id=admin_user_id, target_user_id=user_id)
user_limit = await self._rate_limits.get_user_rate_limit(user_id)
usage_stats = await self._rate_limits.get_usage_stats(user_id)
return UserRateLimitsResult(
@@ -244,8 +240,6 @@ async def update_user_rate_limits(
return RateLimitUpdateResult(user_id=user_id, updated=True, config=config)
async def reset_user_rate_limits(self, *, admin_user_id: str, user_id: str) -> bool:
- self.logger.info(
- "Admin resetting user rate limits", admin_user_id=admin_user_id, target_user_id=user_id
- )
+ self.logger.info("Admin resetting user rate limits", admin_user_id=admin_user_id, target_user_id=user_id)
await self._rate_limits.reset_user_limits(user_id)
return True
diff --git a/backend/app/services/auth_service.py b/backend/app/services/auth_service.py
index 6233681e..a2df454b 100644
--- a/backend/app/services/auth_service.py
+++ b/backend/app/services/auth_service.py
@@ -85,7 +85,10 @@ async def get_current_user(self, request: Request) -> User:
async def get_admin(self, request: Request) -> User:
user = await self.get_current_user(request)
self.security_metrics.record_authorization_check(
- "/admin", request.method, user.role == UserRole.ADMIN, user_role=user.role,
+ "/admin",
+ request.method,
+ user.role == UserRole.ADMIN,
+ user_role=user.role,
)
if user.role != UserRole.ADMIN:
self.logger.warning("Admin access denied", username=user.username, role=user.role)
@@ -166,7 +169,8 @@ async def login(
access_token_expires = timedelta(minutes=session_timeout)
access_token = self.security_service.create_access_token(
- data={"sub": user.username}, expires_delta=access_token_expires,
+ data={"sub": user.username},
+ expires_delta=access_token_expires,
)
csrf_token = self.security_service.generate_csrf_token(access_token)
diff --git a/backend/app/services/event_replay/replay_service.py b/backend/app/services/event_replay/replay_service.py
index b92b5e73..13b11e71 100644
--- a/backend/app/services/event_replay/replay_service.py
+++ b/backend/app/services/event_replay/replay_service.py
@@ -120,9 +120,7 @@ async def pause_session(self, session_id: str) -> ReplayOperationResult:
if scheduler:
scheduler.remove_all_jobs()
await self._repository.update_session_status(session_id, ReplayStatus.PAUSED)
- return ReplayOperationResult(
- session_id=session_id, status=ReplayStatus.PAUSED, message="Replay session paused"
- )
+ return ReplayOperationResult(session_id=session_id, status=ReplayStatus.PAUSED, message="Replay session paused")
async def resume_session(self, session_id: str) -> ReplayOperationResult:
session = self.get_session(session_id)
diff --git a/backend/app/services/execution_queue.py b/backend/app/services/execution_queue.py
index ee5de60f..0f79ac1c 100644
--- a/backend/app/services/execution_queue.py
+++ b/backend/app/services/execution_queue.py
@@ -178,7 +178,9 @@ async def update_priority(self, execution_id: str, new_priority: QueuePriority)
if not result:
return False
self._logger.info(
- "Updated execution priority", execution_id=execution_id, new_priority=new_priority,
+ "Updated execution priority",
+ execution_id=execution_id,
+ new_priority=new_priority,
)
return True
diff --git a/backend/app/services/execution_service.py b/backend/app/services/execution_service.py
index 7d5a5334..a93b79e7 100644
--- a/backend/app/services/execution_service.py
+++ b/backend/app/services/execution_service.py
@@ -580,4 +580,3 @@ def _build_stats_query(
query["created_at"] = time_filter
return query
-
diff --git a/backend/app/services/idempotency/idempotency_manager.py b/backend/app/services/idempotency/idempotency_manager.py
index 29e3fcbc..c1bcc372 100644
--- a/backend/app/services/idempotency/idempotency_manager.py
+++ b/backend/app/services/idempotency/idempotency_manager.py
@@ -32,6 +32,8 @@ class IdempotencyConfig(BaseModel):
processing_timeout_seconds: int = 300
enable_result_caching: bool = True
max_result_size_bytes: int = 1048576
+
+
# --8<-- [end:IdempotencyConfig]
@@ -69,6 +71,7 @@ def _generate_key(
else:
raise ValueError(f"Invalid key strategy: {key_strategy}")
return f"{self.config.key_prefix}:{key}"
+
# --8<-- [end:generate_key]
async def check_and_reserve(
diff --git a/backend/app/services/idempotency/redis_repository.py b/backend/app/services/idempotency/redis_repository.py
index 8cdf0e83..149441af 100644
--- a/backend/app/services/idempotency/redis_repository.py
+++ b/backend/app/services/idempotency/redis_repository.py
@@ -100,6 +100,7 @@ async def insert_processing(self, record: IdempotencyRecord) -> None:
if not ok:
# Mirror Mongo behavior so manager's DuplicateKeyError path is reused
raise DuplicateKeyError("Key already exists")
+
# --8<-- [end:insert_processing]
async def update_record(self, record: IdempotencyRecord) -> int:
@@ -119,4 +120,3 @@ async def update_record(self, record: IdempotencyRecord) -> int:
else:
await self._r.set(k, payload)
return 1
-
diff --git a/backend/app/services/k8s_worker/worker.py b/backend/app/services/k8s_worker/worker.py
index c414a5cf..a9ab313b 100644
--- a/backend/app/services/k8s_worker/worker.py
+++ b/backend/app/services/k8s_worker/worker.py
@@ -35,11 +35,11 @@ class KubernetesWorker:
"""
def __init__(
- self,
- api_client: k8s_client.ApiClient,
- producer: UnifiedProducer,
- settings: Settings,
- logger: structlog.stdlib.BoundLogger,
+ self,
+ api_client: k8s_client.ApiClient,
+ producer: UnifiedProducer,
+ settings: Settings,
+ logger: structlog.stdlib.BoundLogger,
):
self.logger = logger
self.metrics = KubernetesMetrics(settings)
@@ -133,8 +133,7 @@ async def _create_pod_for_execution(self, command: CreatePodCommandEvent) -> Non
self.metrics.record_k8s_pod_created("success", command.language)
self.logger.info(
- f"Successfully created pod {pod.metadata.name} for execution {execution_id}. "
- f"Duration: {duration:.2f}s"
+ f"Successfully created pod {pod.metadata.name} for execution {execution_id}. Duration: {duration:.2f}s"
)
except Exception as e:
@@ -198,9 +197,7 @@ async def _create_pod(self, pod: k8s_client.V1Pod) -> k8s_client.V1Pod | None:
else:
raise
- async def _set_configmap_owner(
- self, config_map: k8s_client.V1ConfigMap, owner_pod: k8s_client.V1Pod
- ) -> None:
+ async def _set_configmap_owner(self, config_map: k8s_client.V1ConfigMap, owner_pod: k8s_client.V1Pod) -> None:
"""Patch the ConfigMap with an ownerReference pointing to the pod.
This makes K8s garbage-collect the ConfigMap automatically when the pod is deleted.
@@ -281,8 +278,11 @@ async def _ensure_executor_network_policy(self, namespace: str) -> None:
)
await self.networking_v1.patch_namespaced_network_policy( # type: ignore[call-arg]
- name=policy_name, namespace=namespace, body=policy,
- field_manager="integr8s", force=True,
+ name=policy_name,
+ namespace=namespace,
+ body=policy,
+ field_manager="integr8s",
+ force=True,
_content_type="application/apply-patch+yaml",
)
self.logger.info(f"NetworkPolicy '{policy_name}' applied in namespace {namespace}")
@@ -324,14 +324,16 @@ async def ensure_image_pre_puller_daemonset(self) -> None:
for i, image_ref in enumerate(sorted(all_images)):
sanitized_image_ref = image_ref.split("/")[-1].replace(":", "-").replace(".", "-").replace("_", "-")
self.logger.info(f"DAEMONSET: before: {image_ref} -> {sanitized_image_ref}")
- init_containers.append(k8s_client.V1Container(
- name=f"pull-{i}-{sanitized_image_ref}",
- image=image_ref,
- command=["/bin/sh", "-c", f'echo "Image {image_ref} pulled."'],
- image_pull_policy="Always",
- security_context=psa_security_context,
- resources=minimal_resources,
- ))
+ init_containers.append(
+ k8s_client.V1Container(
+ name=f"pull-{i}-{sanitized_image_ref}",
+ image=image_ref,
+ command=["/bin/sh", "-c", f'echo "Image {image_ref} pulled."'],
+ image_pull_policy="Always",
+ security_context=psa_security_context,
+ resources=minimal_resources,
+ )
+ )
daemonset = k8s_client.V1DaemonSet(
api_version="apps/v1",
@@ -343,11 +345,14 @@ async def ensure_image_pre_puller_daemonset(self) -> None:
metadata=k8s_client.V1ObjectMeta(labels={"name": daemonset_name}),
spec=k8s_client.V1PodSpec(
init_containers=init_containers,
- containers=[k8s_client.V1Container(
- name="pause", image="registry.k8s.io/pause:3.9",
- security_context=psa_security_context,
- resources=minimal_resources,
- )],
+ containers=[
+ k8s_client.V1Container(
+ name="pause",
+ image="registry.k8s.io/pause:3.9",
+ security_context=psa_security_context,
+ resources=minimal_resources,
+ )
+ ],
tolerations=[k8s_client.V1Toleration(operator="Exists")],
security_context=k8s_client.V1PodSecurityContext(
run_as_non_root=True,
@@ -361,8 +366,11 @@ async def ensure_image_pre_puller_daemonset(self) -> None:
)
await self.apps_v1.patch_namespaced_daemon_set( # type: ignore[call-arg]
- name=daemonset_name, namespace=namespace, body=daemonset,
- field_manager="integr8s", force=True,
+ name=daemonset_name,
+ namespace=namespace,
+ body=daemonset,
+ field_manager="integr8s",
+ force=True,
_content_type="application/apply-patch+yaml",
)
self.logger.info(f"DaemonSet '{daemonset_name}' applied successfully")
diff --git a/backend/app/services/notification_service.py b/backend/app/services/notification_service.py
index b9a63b83..634c0abf 100644
--- a/backend/app/services/notification_service.py
+++ b/backend/app/services/notification_service.py
@@ -86,6 +86,8 @@ async def clear(self) -> None:
"""Clear all throttle entries."""
async with self._lock:
self._entries.clear()
+
+
# --8<-- [end:ThrottleCache]
@@ -142,9 +144,7 @@ async def create_notification(
max_days = self.settings.NOTIF_MAX_SCHEDULE_DAYS
max_schedule = datetime.now(UTC) + timedelta(days=max_days)
if scheduled_for > max_schedule:
- raise NotificationValidationError(
- f"scheduled_for cannot exceed {max_days} days from now"
- )
+ raise NotificationValidationError(f"scheduled_for cannot exceed {max_days} days from now")
self.logger.info(
f"Creating notification for user {user_id}",
user_id=user_id,
@@ -311,9 +311,7 @@ async def _create_system_for_user(
)
return "created"
except Exception as e:
- self.logger.error(
- "Failed to create system notification for user", user_id=user_id, error=str(e)
- )
+ self.logger.error("Failed to create system notification for user", user_id=user_id, error=str(e))
return "failed"
async def _send_in_app(
@@ -357,11 +355,13 @@ async def _send_webhook(
webhook_host=safe_host,
)
- trace.get_current_span().set_attributes({
- "notification.id": str(notification.notification_id),
- "notification.channel": "webhook",
- "notification.webhook_host": safe_host,
- })
+ trace.get_current_span().set_attributes(
+ {
+ "notification.id": str(notification.notification_id),
+ "notification.channel": "webhook",
+ "notification.webhook_host": safe_host,
+ }
+ )
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(webhook_url, json=payload, headers=headers)
response.raise_for_status()
@@ -406,10 +406,12 @@ async def _send_slack(self, notification: DomainNotification, subscription: Doma
priority_color=self._get_slack_color(notification.severity),
)
- trace.get_current_span().set_attributes({
- "notification.id": str(notification.notification_id),
- "notification.channel": "slack",
- })
+ trace.get_current_span().set_attributes(
+ {
+ "notification.id": str(notification.notification_id),
+ "notification.channel": "slack",
+ }
+ )
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(subscription.slack_webhook, json=slack_message)
response.raise_for_status()
@@ -601,6 +603,7 @@ async def _should_skip_notification(
return f"Notification tags {notification.tags} excluded by preferences for {notification.channel}"
return None
+
# --8<-- [end:should_skip_notification]
async def _deliver_notification(self, notification: DomainNotification) -> bool:
diff --git a/backend/app/services/pod_monitor/event_mapper.py b/backend/app/services/pod_monitor/event_mapper.py
index 107eaafb..a14d8775 100644
--- a/backend/app/services/pod_monitor/event_mapper.py
+++ b/backend/app/services/pod_monitor/event_mapper.py
@@ -22,8 +22,12 @@
# Python 3.12 type aliases
type PodPhase = str
type PodMonitorEvent = (
- PodScheduledEvent | PodRunningEvent | PodTerminatedEvent
- | ExecutionCompletedEvent | ExecutionFailedEvent | ExecutionTimeoutEvent
+ PodScheduledEvent
+ | PodRunningEvent
+ | PodTerminatedEvent
+ | ExecutionCompletedEvent
+ | ExecutionFailedEvent
+ | ExecutionTimeoutEvent
)
type EventList = list[PodMonitorEvent]
type AsyncMapper = Callable[["PodContext"], Awaitable[PodMonitorEvent | None]]
@@ -199,7 +203,7 @@ def _is_duplicate(self, pod_name: str, phase: PodPhase) -> bool:
return True
if len(self._event_cache) >= self._MAX_CACHE_SIZE:
# Evict oldest half to amortise cleanup cost
- keys = list(self._event_cache)[:self._MAX_CACHE_SIZE // 2]
+ keys = list(self._event_cache)[: self._MAX_CACHE_SIZE // 2]
for k in keys:
del self._event_cache[k]
self._event_cache[pod_name] = phase
@@ -343,9 +347,7 @@ async def _check_timeout(self, ctx: PodContext) -> ExecutionTimeoutEvent | None:
# A pod can carry DeadlineExceeded while containers already exited 0.
# In that case, let the normal completed mapping path handle it.
if self._all_containers_succeeded(ctx.pod):
- self.logger.info(
- f"POD-EVENT: timeout ignored because containers succeeded exec={ctx.execution_id}"
- )
+ self.logger.info(f"POD-EVENT: timeout ignored because containers succeeded exec={ctx.execution_id}")
return None
logs = await self._extract_logs(ctx.pod)
@@ -519,5 +521,3 @@ def _parse_framed_output(logs: str) -> tuple[str, str]:
return "", ""
return stdout, stderr
-
-
diff --git a/backend/app/services/pod_monitor/monitor.py b/backend/app/services/pod_monitor/monitor.py
index 44ce1b16..2f893181 100644
--- a/backend/app/services/pod_monitor/monitor.py
+++ b/backend/app/services/pod_monitor/monitor.py
@@ -186,7 +186,9 @@ async def _delete_pod(self, pod: k8s_client.V1Pod) -> None:
pod_name = pod.metadata.name
try:
await self._v1.delete_namespaced_pod(
- name=pod_name, namespace=pod.metadata.namespace, grace_period_seconds=0,
+ name=pod_name,
+ namespace=pod.metadata.namespace,
+ grace_period_seconds=0,
)
self.logger.info("Deleted completed pod", pod_name=pod_name)
except ApiException as e:
diff --git a/backend/app/services/rate_limit_service.py b/backend/app/services/rate_limit_service.py
index a0ccb9c4..cad7ece8 100644
--- a/backend/app/services/rate_limit_service.py
+++ b/backend/app/services/rate_limit_service.py
@@ -106,9 +106,7 @@ async def check_rate_limit(
user_id, endpoint, effective_limit, rule.window_seconds, rule.burst_multiplier, rule
)
else:
- status = await self._check_sliding_window(
- user_id, endpoint, effective_limit, rule.window_seconds, rule
- )
+ status = await self._check_sliding_window(user_id, endpoint, effective_limit, rule.window_seconds, rule)
if status.allowed:
self.metrics.record_allowed(normalized_endpoint, rule.group)
@@ -207,7 +205,13 @@ async def _check_token_bucket(
# --8<-- [start:check_token_bucket]
result = await self.redis.eval( # type: ignore[misc]
- self._TOKEN_BUCKET_LUA, 1, key, max_tokens, refill_rate, now, window_seconds * 2,
+ self._TOKEN_BUCKET_LUA,
+ 1,
+ key,
+ max_tokens,
+ refill_rate,
+ now,
+ window_seconds * 2,
)
allowed = bool(result[0])
tokens = float(result[1])
@@ -357,9 +361,7 @@ async def get_usage_stats(self, user_id: str) -> dict[str, EndpointUsageStats]:
rule = self._find_matching_rule(endpoint, user_config, config)
limit = int(rule.requests * multiplier) if rule else 0
remaining = max(0, limit - count)
- stats[endpoint] = EndpointUsageStats(
- algorithm=RateLimitAlgorithm.SLIDING_WINDOW, remaining=remaining
- )
+ stats[endpoint] = EndpointUsageStats(algorithm=RateLimitAlgorithm.SLIDING_WINDOW, remaining=remaining)
elif algo == "tb":
bucket_data = await self.redis.get(key)
if bucket_data:
diff --git a/backend/app/services/result_processor/processor.py b/backend/app/services/result_processor/processor.py
index 59edb36d..5c73ceb0 100644
--- a/backend/app/services/result_processor/processor.py
+++ b/backend/app/services/result_processor/processor.py
@@ -21,12 +21,12 @@ class ResultProcessor:
"""Service for processing execution completion events and storing results."""
def __init__(
- self,
- execution_repo: ExecutionRepository,
- producer: UnifiedProducer,
- settings: Settings,
- logger: structlog.stdlib.BoundLogger,
- execution_metrics: ExecutionMetrics,
+ self,
+ execution_repo: ExecutionRepository,
+ producer: UnifiedProducer,
+ settings: Settings,
+ logger: structlog.stdlib.BoundLogger,
+ execution_metrics: ExecutionMetrics,
) -> None:
self._execution_repo = execution_repo
self._producer = producer
diff --git a/backend/app/services/saga/saga_orchestrator.py b/backend/app/services/saga/saga_orchestrator.py
index 06d2e416..affde5c3 100644
--- a/backend/app/services/saga/saga_orchestrator.py
+++ b/backend/app/services/saga/saga_orchestrator.py
@@ -90,13 +90,9 @@ async def handle_execution_cancelled(self, event: DomainEvent) -> None:
if not isinstance(event, ExecutionCancelledEvent):
raise TypeError(f"Expected ExecutionCancelledEvent, got {type(event).__name__}")
await self._queue.remove(event.execution_id)
- await self._resolve_completion(
- event.execution_id, SagaState.CANCELLED, f"cancelled by {event.cancelled_by}"
- )
+ await self._resolve_completion(event.execution_id, SagaState.CANCELLED, f"cancelled by {event.cancelled_by}")
- async def _resolve_completion(
- self, execution_id: str, state: SagaState, error_message: str | None = None
- ) -> None:
+ async def _resolve_completion(self, execution_id: str, state: SagaState, error_message: str | None = None) -> None:
"""Look up the active saga for an execution and transition it to a terminal state.
Always releases the queue slot and attempts to schedule the next pending
@@ -111,7 +107,10 @@ async def _resolve_completion(
else:
self.logger.info("Marking saga terminal state", saga_id=saga.saga_id, state=state)
await self._repo.save_saga(
- saga.saga_id, state=state, error_message=error_message, completed_at=datetime.now(UTC),
+ saga.saga_id,
+ state=state,
+ error_message=error_message,
+ completed_at=datetime.now(UTC),
)
await self._queue.release(execution_id)
@@ -144,7 +143,8 @@ async def try_schedule_from_queue(self) -> None:
execution_id=execution_id,
)
await self._resolve_completion(
- execution_id, SagaState.FAILED,
+ execution_id,
+ SagaState.FAILED,
f"Failed to start saga after {retry_count} attempts",
)
else:
@@ -208,9 +208,7 @@ async def _execute_saga(
saved = await self._repo.save_saga(instance.saga_id, current_step=step.name)
if saved.state.is_terminal:
- self.logger.info(
- "Saga no longer active, stopping", saga_id=instance.saga_id, state=saved.state
- )
+ self.logger.info("Saga no longer active, stopping", saga_id=instance.saga_id, state=saved.state)
return
self.logger.info("Executing saga step", step=step.name, saga_id=instance.saga_id)
@@ -231,9 +229,9 @@ async def _execute_saga(
await self._repo.save_saga(
instance.saga_id,
completed_steps=[*saved.completed_steps, step.name],
- context_data=SagaContextData(**{
- k: v for k, v in context.data.items() if k in SagaContextData.__dataclass_fields__
- }),
+ context_data=SagaContextData(
+ **{k: v for k, v in context.data.items() if k in SagaContextData.__dataclass_fields__}
+ ),
)
compensation = step.get_compensation()
@@ -318,7 +316,10 @@ async def _compensate_saga(self, saga_id: str, context: SagaContext) -> None:
async def _fail_saga(self, saga_id: str, error_message: str) -> None:
"""Mark saga as failed."""
await self._repo.save_saga(
- saga_id, state=SagaState.FAILED, error_message=error_message, completed_at=datetime.now(UTC),
+ saga_id,
+ state=SagaState.FAILED,
+ error_message=error_message,
+ completed_at=datetime.now(UTC),
)
self.logger.error("Saga failed", saga_id=saga_id, error_message=error_message)
@@ -388,7 +389,9 @@ async def cancel_saga(self, saga_id: str) -> None:
self.logger.info("Saga cancelled successfully", saga_id=saga_id)
async def _publish_saga_started_event(
- self, instance: Saga, trigger_event: ExecutionRequestedEvent,
+ self,
+ instance: Saga,
+ trigger_event: ExecutionRequestedEvent,
) -> None:
"""Publish saga started event after the document is persisted."""
try:
diff --git a/backend/app/services/saga/saga_service.py b/backend/app/services/saga/saga_service.py
index 05b8d92e..5b58b301 100644
--- a/backend/app/services/saga/saga_service.py
+++ b/backend/app/services/saga/saga_service.py
@@ -58,9 +58,7 @@ async def check_execution_access(self, execution_id: str, user: User) -> bool:
async def get_saga_with_access_check(self, saga_id: str, user: User) -> Saga:
"""Get saga with access control."""
- self.logger.debug(
- "Getting saga for user", saga_id=saga_id, user_id=user.user_id, user_role=user.role
- )
+ self.logger.debug("Getting saga for user", saga_id=saga_id, user_id=user.user_id, user_role=user.role)
saga = await self.saga_repo.get_saga(saga_id)
if not saga:
@@ -162,4 +160,3 @@ async def get_saga_statistics(self, user: User, include_all: bool = False) -> di
saga_filter = SagaFilter(execution_ids=user_execution_ids)
return await self.saga_repo.get_saga_statistics(saga_filter)
-
diff --git a/backend/app/services/saga/saga_step.py b/backend/app/services/saga/saga_step.py
index c0f560aa..bb3c136c 100644
--- a/backend/app/services/saga/saga_step.py
+++ b/backend/app/services/saga/saga_step.py
@@ -28,7 +28,6 @@ def add_compensation(self, compensation: "CompensationStep") -> None:
self.compensations.append(compensation)
-
class SagaStep(ABC, Generic[T]):
"""Base class for saga steps"""
diff --git a/backend/app/services/saved_script_service.py b/backend/app/services/saved_script_service.py
index da62c259..d3028417 100644
--- a/backend/app/services/saved_script_service.py
+++ b/backend/app/services/saved_script_service.py
@@ -35,6 +35,7 @@ async def create_saved_script(
script_name=created_script.name,
)
return created_script
+
# --8<-- [end:create_saved_script]
async def get_saved_script(self, script_id: str, user_id: str) -> DomainSavedScript:
diff --git a/backend/app/services/sse/sse_service.py b/backend/app/services/sse/sse_service.py
index 14a98c93..423c205d 100644
--- a/backend/app/services/sse/sse_service.py
+++ b/backend/app/services/sse/sse_service.py
@@ -48,16 +48,18 @@ async def create_execution_stream(
raise ForbiddenError("Access denied")
return self._execution_pipeline(execution)
- async def _execution_pipeline(
- self, execution: DomainExecution
- ) -> AsyncGenerator[dict[str, Any], None]:
+ async def _execution_pipeline(self, execution: DomainExecution) -> AsyncGenerator[dict[str, Any], None]:
execution_id = execution.execution_id
- yield {"data": _exec_adapter.dump_json(SSEExecutionEventData(
- event_type=SSEControlEvent.STATUS,
- execution_id=execution_id,
- timestamp=execution.updated_at,
- status=execution.status,
- )).decode()}
+ yield {
+ "data": _exec_adapter.dump_json(
+ SSEExecutionEventData(
+ event_type=SSEControlEvent.STATUS,
+ execution_id=execution_id,
+ timestamp=execution.updated_at,
+ status=execution.status,
+ )
+ ).decode()
+ }
async for event in self._bus.listen_execution(execution_id):
if event.event_type == EventType.RESULT_STORED:
result = await self._execution_repository.get_execution_result(execution_id)
@@ -80,9 +82,7 @@ async def create_replay_stream(
"""
return self._replay_pipeline(initial_status)
- async def _replay_pipeline(
- self, initial_status: DomainReplaySSEPayload
- ) -> AsyncGenerator[dict[str, Any], None]:
+ async def _replay_pipeline(self, initial_status: DomainReplaySSEPayload) -> AsyncGenerator[dict[str, Any], None]:
session_id = initial_status.session_id
yield {"data": _replay_adapter.dump_json(initial_status).decode()}
if initial_status.status.is_terminal:
diff --git a/backend/app/services/user_settings_service.py b/backend/app/services/user_settings_service.py
index 82abf731..3daad34c 100644
--- a/backend/app/services/user_settings_service.py
+++ b/backend/app/services/user_settings_service.py
@@ -79,6 +79,7 @@ async def get_user_settings_fresh(self, user_id: str) -> DomainUserSettings:
self._add_to_cache(user_id, settings)
return settings
+
# --8<-- [end:get_user_settings_fresh]
# --8<-- [start:update_user_settings]
@@ -92,12 +93,14 @@ async def update_user_settings(
if not changes:
return current
- new_settings = self._build_settings({
- **dataclasses.asdict(current),
- **changes,
- "version": (current.version or 0) + 1,
- "updated_at": datetime.now(timezone.utc),
- })
+ new_settings = self._build_settings(
+ {
+ **dataclasses.asdict(current),
+ **changes,
+ "version": (current.version or 0) + 1,
+ "updated_at": datetime.now(timezone.utc),
+ }
+ )
await self._publish_settings_event(user_id, changes, reason)
@@ -105,6 +108,7 @@ async def update_user_settings(
if (await self.repository.count_events_since_snapshot(user_id)) >= 10:
await self.repository.create_snapshot(new_settings)
return new_settings
+
# --8<-- [end:update_user_settings]
async def _publish_settings_event(self, user_id: str, changes: dict[str, Any], reason: str | None) -> None:
@@ -174,6 +178,7 @@ async def get_settings_history(self, user_id: str, limit: int = 50) -> list[Doma
)
)
return history
+
# --8<-- [end:get_settings_history]
async def restore_settings_to_point(self, user_id: str, timestamp: datetime) -> DomainUserSettings:
@@ -207,6 +212,7 @@ def _apply_event(self, settings: DomainUserSettings, event: DomainUserSettingsCh
"""Apply a settings update event via dict merge."""
event_data = {k: v for k, v in dataclasses.asdict(event).items() if v is not None}
return self._build_settings({**dataclasses.asdict(settings), **event_data, "updated_at": event.timestamp})
+
# --8<-- [end:apply_event]
@staticmethod
@@ -227,4 +233,3 @@ def _add_to_cache(self, user_id: str, settings: DomainUserSettings) -> None:
"""Add settings to TTL+LRU cache."""
self._cache[user_id] = settings
self.logger.debug(f"Cached settings for user {user_id}", cache_size=len(self._cache))
-
diff --git a/backend/app/settings.py b/backend/app/settings.py
index 96f6b930..f74472d1 100644
--- a/backend/app/settings.py
+++ b/backend/app/settings.py
@@ -180,16 +180,18 @@ def __init__(
SECURE_COOKIES: bool = True
# CORS allowed origins (overridable via config.toml)
- CORS_ORIGINS: list[str] = Field(default_factory=lambda: [
- "https://localhost:5001",
- "https://127.0.0.1:5001",
- "https://[::1]:5001",
- "https://localhost",
- "https://127.0.0.1",
- "https://localhost:443",
- "https://127.0.0.1:443",
- "https://[::1]:443",
- ])
+ CORS_ORIGINS: list[str] = Field(
+ default_factory=lambda: [
+ "https://localhost:5001",
+ "https://127.0.0.1:5001",
+ "https://[::1]:5001",
+ "https://localhost",
+ "https://127.0.0.1",
+ "https://localhost:443",
+ "https://127.0.0.1:443",
+ "https://[::1]:443",
+ ]
+ )
# Logging configuration
LOG_LEVEL: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"
diff --git a/backend/scripts/check_orphan_modules.py b/backend/scripts/check_orphan_modules.py
index 5319c42c..2796c6d3 100644
--- a/backend/scripts/check_orphan_modules.py
+++ b/backend/scripts/check_orphan_modules.py
@@ -8,15 +8,18 @@
Usage:
uv run python scripts/check_orphan_modules.py
"""
+
import ast
import sys
from pathlib import Path
import grimp
-ENTRY_POINTS: frozenset[str] = frozenset({
- "app.main",
-})
+ENTRY_POINTS: frozenset[str] = frozenset(
+ {
+ "app.main",
+ }
+)
def _is_empty_init(module: str) -> bool:
diff --git a/frontend/nginx.conf.template b/frontend/nginx.conf.template
index f77e5760..f4b445c4 100644
--- a/frontend/nginx.conf.template
+++ b/frontend/nginx.conf.template
@@ -20,7 +20,7 @@ server {
# --8<-- [end:nonce_injection]
# --8<-- [start:security_headers]
- add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'nonce-$request_id'; style-src-elem 'self' 'nonce-$request_id'; style-src-attr 'unsafe-inline'; img-src 'self' data: blob:; font-src 'self' data:; object-src 'none'; base-uri 'self'; form-action 'self'; frame-ancestors 'none'; connect-src 'self';";
+ add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'nonce-$request_id'; style-src-elem 'self' 'nonce-$request_id' https://fonts.googleapis.com; style-src-attr 'unsafe-inline'; img-src 'self' data: blob:; font-src 'self' data: https://fonts.gstatic.com; object-src 'none'; base-uri 'self'; form-action 'self'; frame-ancestors 'none'; connect-src 'self';";
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
add_header X-Frame-Options "DENY";
add_header X-Content-Type-Options "nosniff";
diff --git a/frontend/public/index.html b/frontend/public/index.html
index 7859f44b..109cd071 100644
--- a/frontend/public/index.html
+++ b/frontend/public/index.html
@@ -10,6 +10,10 @@
+
+
+
+