diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 389bd80c..64d6afc6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -6,10 +6,18 @@ repos: # Local hooks using uv run to match CI exactly - repo: local hooks: - # Ruff - matches CI: cd backend && uv run ruff check . --config pyproject.toml + # Ruff format - auto-fix formatting + - id: ruff-format-backend + name: ruff format (backend) + entry: bash -c 'cd backend && uv run ruff format --config pyproject.toml .' + language: system + files: ^backend/.*\.py$ + pass_filenames: false + + # Ruff check - auto-fix safe lint issues (import sorting, etc.) - id: ruff-backend name: ruff check (backend) - entry: bash -c 'cd backend && uv run ruff check . --config pyproject.toml' + entry: bash -c 'cd backend && uv run ruff check --fix --config pyproject.toml .' language: system files: ^backend/.*\.py$ pass_filenames: false @@ -46,10 +54,10 @@ repos: files: ^frontend/src/.*\.css$ pass_filenames: false - # Prettier - matches CI: cd frontend && npx prettier --check + # Prettier - auto-fix formatting - id: prettier-frontend name: prettier (frontend) - entry: bash -c 'cd frontend && npx prettier --check "src/**/*.{ts,svelte,json}"' + entry: bash -c 'cd frontend && npx prettier --write "src/**/*.{ts,svelte,json}"' language: system files: ^frontend/src/.*\.(ts|svelte|json)$ pass_filenames: false diff --git a/backend/app/api/routes/admin/executions.py b/backend/app/api/routes/admin/executions.py index 02bf8526..8e11ce01 100644 --- a/backend/app/api/routes/admin/executions.py +++ b/backend/app/api/routes/admin/executions.py @@ -39,7 +39,11 @@ async def list_executions( skip: int = Query(0, ge=0), ) -> AdminExecutionListResponse: executions, total = await service.list_executions( - status=status, priority=priority, user_id=user_id, limit=limit, skip=skip, + status=status, + priority=priority, + user_id=user_id, + limit=limit, + skip=skip, ) return AdminExecutionListResponse( executions=[AdminExecutionResponse.model_validate(e) for e in executions], diff --git a/backend/app/api/routes/admin/users.py b/backend/app/api/routes/admin/users.py index 1209d776..1aa22a30 100644 --- a/backend/app/api/routes/admin/users.py +++ b/backend/app/api/routes/admin/users.py @@ -144,9 +144,7 @@ async def delete_user( if admin.user_id == user_id: raise HTTPException(status_code=400, detail="Cannot delete your own account") - result = await admin_user_service.delete_user( - admin_user_id=admin.user_id, user_id=user_id, cascade=cascade - ) + result = await admin_user_service.delete_user(admin_user_id=admin.user_id, user_id=user_id, cascade=cascade) return DeleteUserResponse.model_validate(result) diff --git a/backend/app/api/routes/execution.py b/backend/app/api/routes/execution.py index af8736fa..38ad192d 100644 --- a/backend/app/api/routes/execution.py +++ b/backend/app/api/routes/execution.py @@ -32,9 +32,9 @@ @inject async def get_execution_with_access( - execution_id: Annotated[str, Path()], - current_user: Annotated[User, Depends(current_user)], - execution_service: FromDishka[ExecutionService], + execution_id: Annotated[str, Path()], + current_user: Annotated[User, Depends(current_user)], + execution_service: FromDishka[ExecutionService], ) -> ExecutionInDB: domain_exec = await execution_service.get_execution_result(execution_id) @@ -50,22 +50,24 @@ async def get_execution_with_access( responses={500: {"model": ErrorResponse, "description": "Script execution failed"}}, ) async def create_execution( - request: Request, - current_user: Annotated[User, Depends(current_user)], - execution: ExecutionRequest, - execution_service: FromDishka[ExecutionService], - idempotency_key: Annotated[str | None, Header(alias="Idempotency-Key")] = None, + request: Request, + current_user: Annotated[User, Depends(current_user)], + execution: ExecutionRequest, + execution_service: FromDishka[ExecutionService], + idempotency_key: Annotated[str | None, Header(alias="Idempotency-Key")] = None, ) -> ExecutionResponse: """Submit a script for execution in an isolated Kubernetes pod.""" - trace.get_current_span().set_attributes({ - "http.method": "POST", - "http.route": "/api/v1/execute", - "execution.language": execution.lang, - "execution.language_version": execution.lang_version, - "execution.script_length": len(execution.script), - "user.id": current_user.user_id, - "client.address": get_client_ip(request), - }) + trace.get_current_span().set_attributes( + { + "http.method": "POST", + "http.route": "/api/v1/execute", + "execution.language": execution.lang, + "execution.language_version": execution.lang_version, + "execution.script_length": len(execution.script), + "user.id": current_user.user_id, + "client.address": get_client_ip(request), + } + ) exec_result = await execution_service.execute_script_idempotent( script=execution.script, @@ -83,7 +85,7 @@ async def create_execution( responses={403: {"model": ErrorResponse, "description": "Not the owner of this execution"}}, ) async def get_result( - execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)], + execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)], ) -> ExecutionResult: """Retrieve the result of a specific execution.""" return ExecutionResult.model_validate(execution) @@ -98,10 +100,10 @@ async def get_result( }, ) async def cancel_execution( - execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)], - current_user: Annotated[User, Depends(current_user)], - cancel_request: CancelExecutionRequest, - execution_service: FromDishka[ExecutionService], + execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)], + current_user: Annotated[User, Depends(current_user)], + cancel_request: CancelExecutionRequest, + execution_service: FromDishka[ExecutionService], ) -> CancelResponse: """Cancel a running or queued execution.""" result = await execution_service.cancel_execution( @@ -122,9 +124,9 @@ async def cancel_execution( }, ) async def retry_execution( - original_execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)], - current_user: Annotated[User, Depends(current_user)], - execution_service: FromDishka[ExecutionService], + original_execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)], + current_user: Annotated[User, Depends(current_user)], + execution_service: FromDishka[ExecutionService], ) -> ExecutionResponse: """Retry a failed or completed execution.""" @@ -146,10 +148,10 @@ async def retry_execution( responses={403: {"model": ErrorResponse, "description": "Not the owner of this execution"}}, ) async def get_execution_events( - execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)], - event_service: FromDishka[EventService], - event_types: Annotated[list[EventType] | None, Query(description="Event types to filter")] = None, - limit: Annotated[int, Query(ge=1, le=1000)] = 100, + execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)], + event_service: FromDishka[EventService], + event_types: Annotated[list[EventType] | None, Query(description="Event types to filter")] = None, + limit: Annotated[int, Query(ge=1, le=1000)] = 100, ) -> list[DomainEvent]: """Get all events for an execution.""" events = await event_service.get_events_by_execution_id( @@ -160,14 +162,14 @@ async def get_execution_events( @router.get("/user/executions", response_model=ExecutionListResponse) async def get_user_executions( - current_user: Annotated[User, Depends(current_user)], - execution_service: FromDishka[ExecutionService], - status: Annotated[ExecutionStatus | None, Query(description="Filter by execution status")] = None, - lang: Annotated[str | None, Query(description="Filter by programming language")] = None, - start_time: Annotated[datetime | None, Query(description="Filter executions created after this time")] = None, - end_time: Annotated[datetime | None, Query(description="Filter executions created before this time")] = None, - limit: Annotated[int, Query(ge=1, le=200)] = 50, - skip: Annotated[int, Query(ge=0)] = 0, + current_user: Annotated[User, Depends(current_user)], + execution_service: FromDishka[ExecutionService], + status: Annotated[ExecutionStatus | None, Query(description="Filter by execution status")] = None, + lang: Annotated[str | None, Query(description="Filter by programming language")] = None, + start_time: Annotated[datetime | None, Query(description="Filter executions created after this time")] = None, + end_time: Annotated[datetime | None, Query(description="Filter executions created before this time")] = None, + limit: Annotated[int, Query(ge=1, le=200)] = 50, + skip: Annotated[int, Query(ge=0)] = 0, ) -> ExecutionListResponse: """Get executions for the current user.""" @@ -194,7 +196,7 @@ async def get_user_executions( @router.get("/example-scripts", response_model=ExampleScripts) async def get_example_scripts( - execution_service: FromDishka[ExecutionService], + execution_service: FromDishka[ExecutionService], ) -> ExampleScripts: """Get example scripts for the code editor.""" scripts = await execution_service.get_example_scripts() @@ -203,7 +205,7 @@ async def get_example_scripts( @router.get("/k8s-limits", response_model=ResourceLimits) async def get_k8s_resource_limits( - execution_service: FromDishka[ExecutionService], + execution_service: FromDishka[ExecutionService], ) -> ResourceLimits: """Get Kubernetes resource limits for script execution.""" limits = await execution_service.get_k8s_resource_limits() @@ -212,9 +214,9 @@ async def get_k8s_resource_limits( @router.delete("/executions/{execution_id}", response_model=DeleteResponse) async def delete_execution( - execution_id: str, - admin: Annotated[User, Depends(admin_user)], - execution_service: FromDishka[ExecutionService], + execution_id: str, + admin: Annotated[User, Depends(admin_user)], + execution_service: FromDishka[ExecutionService], ) -> DeleteResponse: """Delete an execution and its associated data (admin only).""" await execution_service.delete_execution(execution_id, admin.user_id) diff --git a/backend/app/core/dishka_lifespan.py b/backend/app/core/dishka_lifespan.py index 730aaee6..921b304f 100644 --- a/backend/app/core/dishka_lifespan.py +++ b/backend/app/core/dishka_lifespan.py @@ -50,7 +50,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: await container.get(Tracer) FastAPIInstrumentor().instrument_app( - app, tracer_provider=trace.get_tracer_provider(), excluded_urls="health,metrics,docs,openapi.json", + app, + tracer_provider=trace.get_tracer_provider(), + excluded_urls="health,metrics,docs,openapi.json", ) logger.info("FastAPI OpenTelemetry instrumentation applied") diff --git a/backend/app/core/exceptions/handlers.py b/backend/app/core/exceptions/handlers.py index 7453817e..61842649 100644 --- a/backend/app/core/exceptions/handlers.py +++ b/backend/app/core/exceptions/handlers.py @@ -46,4 +46,6 @@ def _map_to_status_code(exc: DomainError) -> int: if isinstance(exc, InfrastructureError): return 500 return 500 + + # --8<-- [end:configure_exception_handlers] diff --git a/backend/app/core/logging.py b/backend/app/core/logging.py index 5dd70bb3..048c40fe 100644 --- a/backend/app/core/logging.py +++ b/backend/app/core/logging.py @@ -102,6 +102,8 @@ def setup_logger(log_level: str) -> structlog.stdlib.BoundLogger: logger: structlog.stdlib.BoundLogger = structlog.get_logger("integr8scode") return logger + + # --8<-- [end:setup_logger] @@ -122,13 +124,15 @@ def setup_log_exporter(settings: Settings, logger: structlog.stdlib.BoundLogger) if not settings.OTEL_EXPORTER_OTLP_ENDPOINT: return - resource = Resource.create({ - SERVICE_NAME: settings.SERVICE_NAME, - SERVICE_VERSION: settings.SERVICE_VERSION, - "service.namespace": "integr8scode", - "deployment.environment": settings.ENVIRONMENT, - "service.instance.id": settings.HOSTNAME, - }) + resource = Resource.create( + { + SERVICE_NAME: settings.SERVICE_NAME, + SERVICE_VERSION: settings.SERVICE_VERSION, + "service.namespace": "integr8scode", + "deployment.environment": settings.ENVIRONMENT, + "service.instance.id": settings.HOSTNAME, + } + ) endpoint = settings.OTEL_EXPORTER_OTLP_ENDPOINT log_exporter = OTLPLogExporter( diff --git a/backend/app/core/metrics/base.py b/backend/app/core/metrics/base.py index dd57512a..c5477707 100644 --- a/backend/app/core/metrics/base.py +++ b/backend/app/core/metrics/base.py @@ -18,6 +18,7 @@ def __init__(self, settings: Settings, meter_name: str | None = None): meter_name = meter_name or self.__class__.__name__ self._meter = metrics.get_meter(meter_name) self._create_instruments() + # --8<-- [end:init] def _create_instruments(self) -> None: diff --git a/backend/app/core/metrics/dlq.py b/backend/app/core/metrics/dlq.py index d4262223..ab35a054 100644 --- a/backend/app/core/metrics/dlq.py +++ b/backend/app/core/metrics/dlq.py @@ -65,4 +65,3 @@ def record_dlq_processing_error(self, original_topic: str, event_type: str, erro self.dlq_processing_errors.add( 1, attributes={"original_topic": original_topic, "event_type": event_type, "error_type": error_type} ) - diff --git a/backend/app/core/middlewares/metrics.py b/backend/app/core/middlewares/metrics.py index 69056af4..20a75f6e 100644 --- a/backend/app/core/middlewares/metrics.py +++ b/backend/app/core/middlewares/metrics.py @@ -118,6 +118,7 @@ def _get_path_template(path: str) -> str: path = re.sub(r"/[0-9a-f]{24}", "/{id}", path) return path + # --8<-- [end:path_template] @@ -208,4 +209,6 @@ def get_process_metrics(_: CallbackOptions) -> list[Observation]: meter.create_observable_gauge( name="process_metrics", callbacks=[get_process_metrics], description="Process-level metrics", unit="mixed" ) + + # --8<-- [end:system_metrics] diff --git a/backend/app/core/middlewares/rate_limit.py b/backend/app/core/middlewares/rate_limit.py index d1349b17..c35ac21a 100644 --- a/backend/app/core/middlewares/rate_limit.py +++ b/backend/app/core/middlewares/rate_limit.py @@ -110,6 +110,7 @@ def _extract_user_id(request: Request) -> str: craft arbitrary bucket keys to bypass IP-based limits. """ return f"ip:{get_client_ip(request)}" + # --8<-- [end:extract_user_id] async def _check_rate_limit(self, user_id: str, endpoint: str) -> RateLimitStatus: diff --git a/backend/app/core/middlewares/request_size_limit.py b/backend/app/core/middlewares/request_size_limit.py index a44c3ec0..0cd222ba 100644 --- a/backend/app/core/middlewares/request_size_limit.py +++ b/backend/app/core/middlewares/request_size_limit.py @@ -14,6 +14,7 @@ class RequestSizeLimitMiddleware: def __init__(self, app: ASGIApp, max_size_mb: int = 10) -> None: self.app = app self.max_size_bytes = max_size_mb * 1024 * 1024 + # --8<-- [end:RequestSizeLimitMiddleware] async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py index b4219689..1509d920 100644 --- a/backend/app/core/providers.py +++ b/backend/app/core/providers.py @@ -84,7 +84,10 @@ class BrokerProvider(Provider): @provide def get_broker( - self, settings: Settings, logger: structlog.stdlib.BoundLogger, _tracer: Tracer, + self, + settings: Settings, + logger: structlog.stdlib.BoundLogger, + _tracer: Tracer, ) -> KafkaBroker: broker = KafkaBroker( settings.KAFKA_BOOTSTRAP_SERVERS, @@ -112,7 +115,6 @@ def get_logger(self, settings: Settings) -> structlog.stdlib.BoundLogger: return setup_logger(settings.LOG_LEVEL) - class RedisProvider(Provider): scope = Scope.APP @@ -142,10 +144,10 @@ async def get_redis_client( @provide def get_rate_limit_service( - self, - redis_client: redis.Redis, - settings: Settings, - rate_limit_metrics: RateLimitMetrics, + self, + redis_client: redis.Redis, + settings: Settings, + rate_limit_metrics: RateLimitMetrics, ) -> RateLimitService: return RateLimitService(redis_client, settings, rate_limit_metrics) @@ -159,7 +161,9 @@ def get_security_service(self, settings: Settings, security_metrics: SecurityMet @provide def get_tracer( - self, settings: Settings, logger: structlog.stdlib.BoundLogger, + self, + settings: Settings, + logger: structlog.stdlib.BoundLogger, ) -> Tracer: return Tracer(settings, logger) @@ -169,17 +173,17 @@ class MessagingProvider(Provider): @provide def get_kafka_event_transport( - self, - broker: KafkaBroker, - logger: structlog.stdlib.BoundLogger, + self, + broker: KafkaBroker, + logger: structlog.stdlib.BoundLogger, ) -> KafkaEventTransport: return KafkaEventTransport(broker, logger) @provide def get_unified_producer( - self, - event_repository: EventRepository, - transport: KafkaEventTransport, + self, + event_repository: EventRepository, + transport: KafkaEventTransport, ) -> UnifiedProducer: return UnifiedProducer(event_repository, transport) @@ -189,10 +193,10 @@ def get_idempotency_repository(self, redis_client: redis.Redis) -> RedisIdempote @provide def get_idempotency_manager( - self, - repo: RedisIdempotencyRepository, - logger: structlog.stdlib.BoundLogger, - idempotency_metrics: IdempotencyMetrics, + self, + repo: RedisIdempotencyRepository, + logger: structlog.stdlib.BoundLogger, + idempotency_metrics: IdempotencyMetrics, ) -> IdempotencyManager: return IdempotencyManager(IdempotencyConfig(), repo, logger, idempotency_metrics) @@ -204,12 +208,12 @@ class DLQProvider(Provider): @provide def get_dlq_manager( - self, - broker: KafkaBroker, - settings: Settings, - logger: structlog.stdlib.BoundLogger, - dlq_metrics: DLQMetrics, - repository: DLQRepository, + self, + broker: KafkaBroker, + settings: Settings, + logger: structlog.stdlib.BoundLogger, + dlq_metrics: DLQMetrics, + repository: DLQRepository, ) -> DLQManager: return DLQManager( settings=settings, @@ -361,10 +365,10 @@ def get_sse_redis_bus( @provide def get_sse_service( - self, - bus: SSERedisBus, - execution_repository: ExecutionRepository, - logger: structlog.stdlib.BoundLogger, + self, + bus: SSERedisBus, + execution_repository: ExecutionRepository, + logger: structlog.stdlib.BoundLogger, ) -> SSEService: return SSEService(bus=bus, execution_repository=execution_repository, logger=logger) @@ -374,15 +378,15 @@ class AuthProvider(Provider): @provide def get_auth_service( - self, - user_repository: UserRepository, - security_service: SecurityService, - security_metrics: SecurityMetrics, - logger: structlog.stdlib.BoundLogger, - lockout_service: LoginLockoutService, - runtime_settings: RuntimeSettingsLoader, - producer: UnifiedProducer, - settings: Settings, + self, + user_repository: UserRepository, + security_service: SecurityService, + security_metrics: SecurityMetrics, + logger: structlog.stdlib.BoundLogger, + lockout_service: LoginLockoutService, + runtime_settings: RuntimeSettingsLoader, + producer: UnifiedProducer, + settings: Settings, ) -> AuthService: return AuthService( user_repo=user_repository, @@ -407,11 +411,11 @@ def get_event_service(self, event_repository: EventRepository) -> EventService: @provide def get_kafka_event_service( - self, - kafka_producer: UnifiedProducer, - settings: Settings, - logger: structlog.stdlib.BoundLogger, - event_metrics: EventMetrics, + self, + kafka_producer: UnifiedProducer, + settings: Settings, + logger: structlog.stdlib.BoundLogger, + event_metrics: EventMetrics, ) -> KafkaEventService: return KafkaEventService( kafka_producer=kafka_producer, @@ -426,11 +430,11 @@ class UserServicesProvider(Provider): @provide def get_user_settings_service( - self, - repository: UserSettingsRepository, - kafka_event_service: KafkaEventService, - settings: Settings, - logger: structlog.stdlib.BoundLogger, + self, + repository: UserSettingsRepository, + kafka_event_service: KafkaEventService, + settings: Settings, + logger: structlog.stdlib.BoundLogger, ) -> UserSettingsService: return UserSettingsService(repository, kafka_event_service, settings, logger) @@ -440,10 +444,10 @@ class RuntimeSettingsProvider(Provider): @provide def get_runtime_settings_loader( - self, - admin_settings_repository: AdminSettingsRepository, - settings: Settings, - logger: structlog.stdlib.BoundLogger, + self, + admin_settings_repository: AdminSettingsRepository, + settings: Settings, + logger: structlog.stdlib.BoundLogger, ) -> RuntimeSettingsLoader: return RuntimeSettingsLoader(admin_settings_repository, settings, logger) @@ -453,40 +457,40 @@ class AdminServicesProvider(Provider): @provide def get_login_lockout_service( - self, - redis_client: redis.Redis, - runtime_settings: RuntimeSettingsLoader, - logger: structlog.stdlib.BoundLogger, + self, + redis_client: redis.Redis, + runtime_settings: RuntimeSettingsLoader, + logger: structlog.stdlib.BoundLogger, ) -> LoginLockoutService: return LoginLockoutService(redis_client, runtime_settings, logger) @provide(scope=Scope.REQUEST) def get_admin_events_service( - self, - admin_events_repository: AdminEventsRepository, - event_replay_service: EventReplayService, - logger: structlog.stdlib.BoundLogger, + self, + admin_events_repository: AdminEventsRepository, + event_replay_service: EventReplayService, + logger: structlog.stdlib.BoundLogger, ) -> AdminEventsService: return AdminEventsService(admin_events_repository, event_replay_service, logger) @provide def get_admin_settings_service( - self, - admin_settings_repository: AdminSettingsRepository, - runtime_settings: RuntimeSettingsLoader, - logger: structlog.stdlib.BoundLogger, + self, + admin_settings_repository: AdminSettingsRepository, + runtime_settings: RuntimeSettingsLoader, + logger: structlog.stdlib.BoundLogger, ) -> AdminSettingsService: return AdminSettingsService(admin_settings_repository, runtime_settings, logger) @provide def get_notification_service( - self, - notification_repository: NotificationRepository, - kafka_event_service: KafkaEventService, - sse_redis_bus: SSERedisBus, - settings: Settings, - logger: structlog.stdlib.BoundLogger, - notification_metrics: NotificationMetrics, + self, + notification_repository: NotificationRepository, + kafka_event_service: KafkaEventService, + sse_redis_bus: SSERedisBus, + settings: Settings, + logger: structlog.stdlib.BoundLogger, + notification_metrics: NotificationMetrics, ) -> NotificationService: return NotificationService( notification_repository=notification_repository, @@ -499,10 +503,10 @@ def get_notification_service( @provide def get_notification_scheduler( - self, - notification_repository: NotificationRepository, - notification_service: NotificationService, - logger: structlog.stdlib.BoundLogger, + self, + notification_repository: NotificationRepository, + notification_service: NotificationService, + logger: structlog.stdlib.BoundLogger, ) -> NotificationScheduler: return NotificationScheduler( notification_repository=notification_repository, @@ -542,11 +546,11 @@ class BusinessServicesProvider(Provider): @provide def get_saga_service( - self, - saga_repository: SagaRepository, - execution_repository: ExecutionRepository, - saga_orchestrator: SagaOrchestrator, - logger: structlog.stdlib.BoundLogger, + self, + saga_repository: SagaRepository, + execution_repository: ExecutionRepository, + saga_orchestrator: SagaOrchestrator, + logger: structlog.stdlib.BoundLogger, ) -> SagaService: return SagaService( saga_repo=saga_repository, @@ -557,14 +561,14 @@ def get_saga_service( @provide def get_execution_service( - self, - execution_repository: ExecutionRepository, - kafka_producer: UnifiedProducer, - settings: Settings, - logger: structlog.stdlib.BoundLogger, - execution_metrics: ExecutionMetrics, - idempotency_manager: IdempotencyManager, - runtime_settings: RuntimeSettingsLoader, + self, + execution_repository: ExecutionRepository, + kafka_producer: UnifiedProducer, + settings: Settings, + logger: structlog.stdlib.BoundLogger, + execution_metrics: ExecutionMetrics, + idempotency_manager: IdempotencyManager, + runtime_settings: RuntimeSettingsLoader, ) -> ExecutionService: return ExecutionService( execution_repo=execution_repository, @@ -578,20 +582,20 @@ def get_execution_service( @provide def get_saved_script_service( - self, saved_script_repository: SavedScriptRepository, logger: structlog.stdlib.BoundLogger + self, saved_script_repository: SavedScriptRepository, logger: structlog.stdlib.BoundLogger ) -> SavedScriptService: return SavedScriptService(saved_script_repository, logger) @provide def get_admin_user_service( - self, - user_repository: UserRepository, - event_service: EventService, - execution_service: ExecutionService, - rate_limit_service: RateLimitService, - security_service: SecurityService, - security_metrics: SecurityMetrics, - logger: structlog.stdlib.BoundLogger, + self, + user_repository: UserRepository, + event_service: EventService, + execution_service: ExecutionService, + rate_limit_service: RateLimitService, + security_service: SecurityService, + security_metrics: SecurityMetrics, + logger: structlog.stdlib.BoundLogger, ) -> AdminUserService: return AdminUserService( user_repository=user_repository, @@ -605,11 +609,11 @@ def get_admin_user_service( @provide def get_admin_execution_service( - self, - execution_repository: ExecutionRepository, - queue_service: ExecutionQueueService, - runtime_settings: RuntimeSettingsLoader, - logger: structlog.stdlib.BoundLogger, + self, + execution_repository: ExecutionRepository, + queue_service: ExecutionQueueService, + runtime_settings: RuntimeSettingsLoader, + logger: structlog.stdlib.BoundLogger, ) -> AdminExecutionService: return AdminExecutionService( execution_repo=execution_repository, @@ -726,12 +730,12 @@ class EventReplayProvider(Provider): @provide def get_event_replay_service( - self, - replay_repository: ReplayRepository, - kafka_producer: UnifiedProducer, - replay_metrics: ReplayMetrics, - logger: structlog.stdlib.BoundLogger, - sse_bus: SSERedisBus, + self, + replay_repository: ReplayRepository, + kafka_producer: UnifiedProducer, + replay_metrics: ReplayMetrics, + logger: structlog.stdlib.BoundLogger, + sse_bus: SSERedisBus, ) -> EventReplayService: return EventReplayService( repository=replay_repository, diff --git a/backend/app/core/security.py b/backend/app/core/security.py index 6a70321d..488d9c33 100644 --- a/backend/app/core/security.py +++ b/backend/app/core/security.py @@ -21,15 +21,14 @@ def __init__(self, settings: Settings, security_metrics: SecurityMetrics) -> Non self.settings = settings self._security_metrics = security_metrics # --8<-- [start:password_hashing] - self._password_hash = PasswordHash(( - BcryptHasher(rounds=self.settings.BCRYPT_ROUNDS), - )) + self._password_hash = PasswordHash((BcryptHasher(rounds=self.settings.BCRYPT_ROUNDS),)) def verify_password(self, plain_password: str, hashed_password: str) -> bool: return self._password_hash.verify(plain_password, hashed_password) def get_password_hash(self, password: str) -> str: return self._password_hash.hash(password) + # --8<-- [end:password_hashing] # --8<-- [start:create_access_token] @@ -39,6 +38,7 @@ def create_access_token(self, data: dict[str, Any], expires_delta: timedelta) -> to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, self.settings.SECRET_KEY, algorithm=self.settings.ALGORITHM) return encoded_jwt + # --8<-- [end:create_access_token] def decode_token(self, token: str, *, allow_expired: bool = False) -> str: @@ -51,7 +51,10 @@ def decode_token(self, token: str, *, allow_expired: bool = False) -> str: try: options = {"verify_exp": not allow_expired} payload = jwt.decode( - token, self.settings.SECRET_KEY, algorithms=[self.settings.ALGORITHM], options=options, + token, + self.settings.SECRET_KEY, + algorithms=[self.settings.ALGORITHM], + options=options, ) username: str | None = payload.get("sub") if username is None: @@ -94,10 +97,12 @@ def validate_csrf_token(self, header_token: str, cookie_token: str) -> bool: return hmac.compare_digest(header_token, cookie_token) # Paths exempt from CSRF validation (auth handles its own security) - CSRF_EXEMPT_PATHS: frozenset[str] = frozenset({ - "/api/v1/auth/login", - "/api/v1/auth/register", - }) + CSRF_EXEMPT_PATHS: frozenset[str] = frozenset( + { + "/api/v1/auth/login", + "/api/v1/auth/register", + } + ) def validate_csrf_from_request(self, request: Request) -> str: """Validate CSRF token from HTTP request using double-submit cookie pattern. @@ -144,4 +149,5 @@ def validate_csrf_from_request(self, request: Request) -> str: raise CSRFValidationError("CSRF token signature invalid") return header_token + # --8<-- [end:csrf_validation] diff --git a/backend/app/core/tracing/tracer.py b/backend/app/core/tracing/tracer.py index 6889ff7b..0c7a85a1 100644 --- a/backend/app/core/tracing/tracer.py +++ b/backend/app/core/tracing/tracer.py @@ -21,13 +21,15 @@ def __init__(self, settings: Settings, logger: structlog.stdlib.BoundLogger) -> name = settings.TRACING_SERVICE_NAME rate = settings.TRACING_SAMPLING_RATE - resource = Resource.create({ - SERVICE_NAME: name, - SERVICE_VERSION: settings.TRACING_SERVICE_VERSION, - "deployment.environment": settings.ENVIRONMENT, - "service.namespace": "integr8scode", - "service.instance.id": settings.HOSTNAME, - }) + resource = Resource.create( + { + SERVICE_NAME: name, + SERVICE_VERSION: settings.TRACING_SERVICE_VERSION, + "deployment.environment": settings.ENVIRONMENT, + "service.namespace": "integr8scode", + "service.instance.id": settings.HOSTNAME, + } + ) sampler: Sampler if rate <= 0: @@ -41,10 +43,12 @@ def __init__(self, settings: Settings, logger: structlog.stdlib.BoundLogger) -> if settings.OTLP_TRACES_ENDPOINT: provider.add_span_processor( - BatchSpanProcessor(OTLPSpanExporter( - endpoint=settings.OTLP_TRACES_ENDPOINT, - insecure=settings.OTLP_TRACES_ENDPOINT.startswith("http://"), - )) + BatchSpanProcessor( + OTLPSpanExporter( + endpoint=settings.OTLP_TRACES_ENDPOINT, + insecure=settings.OTLP_TRACES_ENDPOINT.startswith("http://"), + ) + ) ) trace.set_tracer_provider(provider) diff --git a/backend/app/db/docs/event.py b/backend/app/db/docs/event.py index 514db9dc..65e7269a 100644 --- a/backend/app/db/docs/event.py +++ b/backend/app/db/docs/event.py @@ -71,6 +71,8 @@ class Settings: default_language="english", ), ] + + # --8<-- [end:EventDocument] diff --git a/backend/app/db/repositories/admin/admin_events_repository.py b/backend/app/db/repositories/admin/admin_events_repository.py index 05729bce..341c4bcc 100644 --- a/backend/app/db/repositories/admin/admin_events_repository.py +++ b/backend/app/db/repositories/admin/admin_events_repository.py @@ -180,23 +180,22 @@ async def get_event_stats(self, hours: int = 24) -> EventStatistics: error_rate = (error_count / total * 100) if total > 0 else 0 events_by_type = [ - EventTypeCount(event_type=EventType(t["_id"]), count=t["count"]) - for t in facet.get("by_type", []) + EventTypeCount(event_type=EventType(t["_id"]), count=t["count"]) for t in facet.get("by_type", []) ] events_by_hour = [HourlyEventCount(**doc) for doc in facet.get("by_hour", [])] - top_users = [ - UserEventCount(**doc) for doc in facet.get("by_user", []) if doc["user_id"] - ] + top_users = [UserEventCount(**doc) for doc in facet.get("by_user", []) if doc["user_id"]] # Separate collection — must be a separate query exec_time_field = S.field(ExecutionDocument.resource_usage.execution_time_wall_seconds) # type: ignore[union-attr] exec_pipeline = ( Pipeline() - .match({ - ExecutionDocument.created_at: {"$gte": start_time}, - ExecutionDocument.status: ExecutionStatus.COMPLETED, - ExecutionDocument.resource_usage.execution_time_wall_seconds: {"$exists": True}, # type: ignore[union-attr] - }) + .match( + { + ExecutionDocument.created_at: {"$gte": start_time}, + ExecutionDocument.status: ExecutionStatus.COMPLETED, + ExecutionDocument.resource_usage.execution_time_wall_seconds: {"$exists": True}, # type: ignore[union-attr] + } + ) .group(by=None, query={"avg_duration": S.avg(exec_time_field)}) ) exec_result = await ExecutionDocument.aggregate(exec_pipeline.export()).to_list() @@ -251,15 +250,18 @@ async def get_replay_session_sse_status(self, session_id: str) -> DomainReplaySS started_at=doc.started_at, completed_at=doc.completed_at, errors=[ - e if isinstance(e, DomainReplayError) - else DomainReplayError(**e) if isinstance(e, dict) + e + if isinstance(e, DomainReplayError) + else DomainReplayError(**e) + if isinstance(e, dict) else DomainReplayError(**dataclasses.asdict(e)) for e in doc.errors ], ) async def get_execution_results_for_filter( - self, replay_filter: ReplayFilter, + self, + replay_filter: ReplayFilter, ) -> list[ExecutionResultSummary]: mongo_query = replay_filter.to_mongo_query() if not mongo_query: @@ -268,9 +270,7 @@ async def get_execution_results_for_filter( exec_ids = list({e.execution_id for e in matched_events if e.execution_id})[:10] if not exec_ids: return [] - exec_docs = await ExecutionDocument.find( - In(ExecutionDocument.execution_id, exec_ids) - ).to_list() + exec_docs = await ExecutionDocument.find(In(ExecutionDocument.execution_id, exec_ids)).to_list() return [ ExecutionResultSummary( execution_id=d.execution_id, diff --git a/backend/app/db/repositories/dlq_repository.py b/backend/app/db/repositories/dlq_repository.py index 4fd23f05..10bbab2b 100644 --- a/backend/app/db/repositories/dlq_repository.py +++ b/backend/app/db/repositories/dlq_repository.py @@ -32,12 +32,12 @@ def _to_domain(doc: DLQMessageDocument) -> DLQMessage: return DLQMessage(**data) async def get_messages( - self, - status: DLQMessageStatus | None = None, - topic: str | None = None, - event_type: EventType | None = None, - limit: int = 50, - offset: int = 0, + self, + status: DLQMessageStatus | None = None, + topic: str | None = None, + event_type: EventType | None = None, + limit: int = 50, + offset: int = 0, ) -> DLQMessageListResult: conditions: list[Any] = [ DLQMessageDocument.status == status if status else None, diff --git a/backend/app/db/repositories/event_repository.py b/backend/app/db/repositories/event_repository.py index 9a510e77..f1194f8b 100644 --- a/backend/app/db/repositories/event_repository.py +++ b/backend/app/db/repositories/event_repository.py @@ -44,11 +44,13 @@ async def store_event(self, event: DomainEvent) -> str: data = event.model_dump(exclude_none=True) data.setdefault("stored_at", datetime.now(timezone.utc)) doc = EventDocument(**data) - trace.get_current_span().set_attributes({ - "event.type": event.event_type, - "event.id": event.event_id, - "execution.id": event.aggregate_id or "", - }) + trace.get_current_span().set_attributes( + { + "event.type": event.event_type, + "event.id": event.event_id, + "execution.id": event.aggregate_id or "", + } + ) try: await doc.insert() except DuplicateKeyError: @@ -73,7 +75,7 @@ async def get_event(self, event_id: str) -> DomainEvent | None: return DomainEventAdapter.validate_python(doc) async def get_events_by_aggregate( - self, aggregate_id: str, event_types: list[EventType] | None = None, limit: int = 100 + self, aggregate_id: str, event_types: list[EventType] | None = None, limit: int = 100 ) -> list[DomainEvent]: conditions: list[BaseFindOperator] = [Eq(EventDocument.aggregate_id, aggregate_id)] if event_types: @@ -84,7 +86,7 @@ async def get_events_by_aggregate( return [DomainEventAdapter.validate_python(d) for d in docs] async def get_events_by_execution_id( - self, execution_id: str, event_types: list[EventType] | None = None, limit: int = 100 + self, execution_id: str, event_types: list[EventType] | None = None, limit: int = 100 ) -> list[DomainEvent]: conditions: list[BaseFindOperator] = [Eq(EventDocument.execution_id, execution_id)] if event_types: @@ -95,12 +97,12 @@ async def get_events_by_execution_id( return [DomainEventAdapter.validate_python(d) for d in docs] async def get_execution_events( - self, - execution_id: str, - limit: int = 100, - skip: int = 0, - exclude_system_events: bool = False, - event_types: list[EventType] | None = None, + self, + execution_id: str, + limit: int = 100, + skip: int = 0, + exclude_system_events: bool = False, + event_types: list[EventType] | None = None, ) -> EventListResult: conditions: list[Any] = [ Or( @@ -116,7 +118,9 @@ async def get_execution_events( docs = ( await EventDocument.find(*conditions) .sort([("timestamp", SortDirection.ASCENDING)]) - .skip(skip).limit(limit).to_list() + .skip(skip) + .limit(limit) + .to_list() ) events = [DomainEventAdapter.validate_python(d) for d in docs] total_count = await EventDocument.find(*conditions).count() @@ -130,10 +134,10 @@ async def get_execution_events( ) async def get_event_statistics( - self, - start_time: datetime | None = None, - end_time: datetime | None = None, - match: dict[str, object] | None = None, + self, + start_time: datetime | None = None, + end_time: datetime | None = None, + match: dict[str, object] | None = None, ) -> EventStatistics: pipeline: list[Mapping[str, object]] = [] if match: @@ -199,26 +203,24 @@ async def get_event_statistics( async for doc in EventDocument.aggregate(pipeline): doc["events_by_type"] = [ - EventTypeCount(event_type=EventType(k), count=v) - for k, v in doc.get("events_by_type", {}).items() + EventTypeCount(event_type=EventType(k), count=v) for k, v in doc.get("events_by_type", {}).items() ] doc["events_by_service"] = [ - ServiceEventCount(service_name=k, count=v) - for k, v in doc.get("events_by_service", {}).items() + ServiceEventCount(service_name=k, count=v) for k, v in doc.get("events_by_service", {}).items() ] return EventStatistics(**doc) return EventStatistics(total_events=0, events_by_type=[], events_by_service=[], events_by_hour=[]) async def get_user_events_paginated( - self, - user_id: str, - event_types: list[EventType] | None = None, - start_time: datetime | None = None, - end_time: datetime | None = None, - limit: int = 100, - skip: int = 0, - sort_order: str = "desc", + self, + user_id: str, + event_types: list[EventType] | None = None, + start_time: datetime | None = None, + end_time: datetime | None = None, + limit: int = 100, + skip: int = 0, + sort_order: str = "desc", ) -> EventListResult: conditions = [ EventDocument.metadata.user_id == user_id, @@ -231,7 +233,9 @@ async def get_user_events_paginated( docs = ( await EventDocument.find(*conditions) .sort([("timestamp", sort_direction)]) - .skip(skip).limit(limit).to_list() + .skip(skip) + .limit(limit) + .to_list() ) events = [DomainEventAdapter.validate_python(d) for d in docs] total_count = await EventDocument.find(*conditions).count() @@ -248,17 +252,19 @@ async def count_events(self, *conditions: Any) -> int: return await EventDocument.find(*conditions).count() async def query_events( - self, - query: dict[str, Any], - sort_field: str = "timestamp", - skip: int = 0, - limit: int = 100, + self, + query: dict[str, Any], + sort_field: str = "timestamp", + skip: int = 0, + limit: int = 100, ) -> EventListResult: """Query events with filter, sort, and pagination. Always sorts descending (newest first).""" docs = ( await EventDocument.find(query) .sort([(sort_field, SortDirection.DESCENDING)]) - .skip(skip).limit(limit).to_list() + .skip(skip) + .limit(limit) + .to_list() ) events = [DomainEventAdapter.validate_python(d) for d in docs] total_count = await EventDocument.find(query).count() @@ -268,7 +274,7 @@ async def query_events( ) async def delete_event_with_archival( - self, event_id: str, deleted_by: str, deletion_reason: str = "Admin deletion via API" + self, event_id: str, deleted_by: str, deletion_reason: str = "Admin deletion via API" ) -> ArchivedEvent | None: doc = await EventDocument.find_one(EventDocument.event_id == event_id) if not doc: diff --git a/backend/app/db/repositories/execution_repository.py b/backend/app/db/repositories/execution_repository.py index ece33df0..68da731d 100644 --- a/backend/app/db/repositories/execution_repository.py +++ b/backend/app/db/repositories/execution_repository.py @@ -57,19 +57,22 @@ async def write_terminal_result(self, result: ExecutionResultDomain) -> bool: ExecutionDocument.execution_id == result.execution_id, In(ExecutionDocument.status, list(EXECUTION_ACTIVE)), ).update( - {"$set": { - "status": result.status, - "exit_code": result.exit_code, - "stdout": result.stdout, - "stderr": result.stderr, - "resource_usage": dataclasses.asdict(result.resource_usage) if result.resource_usage else None, - "error_type": result.error_type, - "updated_at": datetime.now(timezone.utc), - }} + { + "$set": { + "status": result.status, + "exit_code": result.exit_code, + "stdout": result.stdout, + "stderr": result.stderr, + "resource_usage": dataclasses.asdict(result.resource_usage) if result.resource_usage else None, + "error_type": result.error_type, + "updated_at": datetime.now(timezone.utc), + } + } ) if not update_result or getattr(update_result, "modified_count", 0) == 0: self.logger.warning( - "Execution not found or already in terminal state", execution_id=result.execution_id, + "Execution not found or already in terminal state", + execution_id=result.execution_id, ) return False return True @@ -112,33 +115,47 @@ async def aggregate_stats(self, query: dict[str, Any]) -> dict[str, Any]: if query: pipeline.append({"$match": query}) - pipeline.append({ - "$facet": { - "by_status": [{"$group": {"_id": "$status", "count": {"$sum": 1}}}], - "by_language": [ - {"$group": { - "_id": {"$concat": ["$lang", "-", "$lang_version"]}, - "count": {"$sum": 1}, - }}, - ], - "totals": [{"$group": { - "_id": None, - "total": {"$sum": 1}, - "successful": {"$sum": {"$cond": [{"$eq": ["$status", ExecutionStatus.COMPLETED]}, 1, 0]}}, - }}], - "avg_duration": [ - {"$match": { - "status": ExecutionStatus.COMPLETED, - "created_at": {"$ne": None}, - "updated_at": {"$ne": None}, - }}, - {"$group": { - "_id": None, - "avg_ms": {"$avg": {"$subtract": ["$updated_at", "$created_at"]}}, - }}, - ], - }, - }) + pipeline.append( + { + "$facet": { + "by_status": [{"$group": {"_id": "$status", "count": {"$sum": 1}}}], + "by_language": [ + { + "$group": { + "_id": {"$concat": ["$lang", "-", "$lang_version"]}, + "count": {"$sum": 1}, + } + }, + ], + "totals": [ + { + "$group": { + "_id": None, + "total": {"$sum": 1}, + "successful": { + "$sum": {"$cond": [{"$eq": ["$status", ExecutionStatus.COMPLETED]}, 1, 0]} + }, + } + } + ], + "avg_duration": [ + { + "$match": { + "status": ExecutionStatus.COMPLETED, + "created_at": {"$ne": None}, + "updated_at": {"$ne": None}, + } + }, + { + "$group": { + "_id": None, + "avg_ms": {"$avg": {"$subtract": ["$updated_at", "$created_at"]}}, + } + }, + ], + }, + } + ) collection = ExecutionDocument.get_pymongo_collection() cursor = await collection.aggregate(pipeline) diff --git a/backend/app/db/repositories/notification_repository.py b/backend/app/db/repositories/notification_repository.py index 4a33a067..783140f6 100644 --- a/backend/app/db/repositories/notification_repository.py +++ b/backend/app/db/repositories/notification_repository.py @@ -149,9 +149,7 @@ async def try_claim_pending(self, notification_id: str) -> bool: return bool(result and getattr(result, "modified_count", 0) > 0) # Subscriptions - async def get_subscription( - self, user_id: str, channel: NotificationChannel - ) -> DomainNotificationSubscription: + async def get_subscription(self, user_id: str, channel: NotificationChannel) -> DomainNotificationSubscription: """Get subscription for user/channel, returning default enabled subscription if none exists.""" doc = await NotificationSubscriptionDocument.find_one( NotificationSubscriptionDocument.user_id == user_id, @@ -189,8 +187,7 @@ async def get_all_subscriptions(self, user_id: str) -> list[DomainNotificationSu NotificationSubscriptionDocument.user_id == user_id, ).to_list() existing: dict[NotificationChannel, DomainNotificationSubscription] = { - doc.channel: DomainNotificationSubscription(**doc.model_dump(include=_sub_fields)) - for doc in docs + doc.channel: DomainNotificationSubscription(**doc.model_dump(include=_sub_fields)) for doc in docs } return [ existing.get(channel, DomainNotificationSubscription(user_id=user_id, channel=channel, enabled=True)) diff --git a/backend/app/db/repositories/saga_repository.py b/backend/app/db/repositories/saga_repository.py index c812381d..add76cb3 100644 --- a/backend/app/db/repositories/saga_repository.py +++ b/backend/app/db/repositories/saga_repository.py @@ -75,7 +75,10 @@ async def save_saga(self, saga_id: str, **updates: Any) -> Saga: return self._to_domain(doc) async def atomic_cancel_saga( - self, saga_id: str, error_message: str, completed_at: datetime, + self, + saga_id: str, + error_message: str, + completed_at: datetime, ) -> Saga: """Atomically cancel a saga using findOneAndUpdate. @@ -86,13 +89,15 @@ async def atomic_cancel_saga( SagaDocument.saga_id == saga_id, In(SagaDocument.state, list(SAGA_ACTIVE)), ).update( - Set({ # type: ignore[no-untyped-call] - SagaDocument.state: SagaState.CANCELLED, - SagaDocument.error_message: error_message, - SagaDocument.completed_at: completed_at, - SagaDocument.updated_at: datetime.now(timezone.utc), - "revision_id": uuid4(), - }), + Set( + { # type: ignore[no-untyped-call] + SagaDocument.state: SagaState.CANCELLED, + SagaDocument.error_message: error_message, + SagaDocument.completed_at: completed_at, + SagaDocument.updated_at: datetime.now(timezone.utc), + "revision_id": uuid4(), + } + ), response_type=UpdateResponse.NEW_DOCUMENT, ) if not doc: @@ -138,7 +143,7 @@ async def get_saga(self, saga_id: str) -> Saga | None: return self._to_domain(doc) if doc else None async def get_sagas_by_execution( - self, execution_id: str, state: SagaState | None = None, limit: int = 100, skip: int = 0 + self, execution_id: str, state: SagaState | None = None, limit: int = 100, skip: int = 0 ) -> SagaListResult: conditions: list[BaseFindOperator] = [Eq(SagaDocument.execution_id, execution_id)] if state: @@ -172,10 +177,10 @@ async def get_user_execution_ids(self, user_id: str) -> list[str]: return result async def find_timed_out_sagas( - self, - cutoff_time: datetime, - states: list[SagaState] | None = None, - limit: int = 100, + self, + cutoff_time: datetime, + states: list[SagaState] | None = None, + limit: int = 100, ) -> list[Saga]: states = states or [SagaState.RUNNING, SagaState.COMPENSATING] docs = ( @@ -207,9 +212,7 @@ async def get_saga_statistics(self, saga_filter: SagaFilter | None = None) -> di ] duration_pipeline = ( Pipeline() - .project( - duration={"$subtract": [S.field(SagaDocument.completed_at), S.field(SagaDocument.created_at)]} - ) + .project(duration={"$subtract": [S.field(SagaDocument.completed_at), S.field(SagaDocument.created_at)]}) .group(by=None, query={"avg_duration": S.avg("$duration")}) ) avg_duration = 0.0 diff --git a/backend/app/dlq/manager.py b/backend/app/dlq/manager.py index b7a85075..8721bf73 100644 --- a/backend/app/dlq/manager.py +++ b/backend/app/dlq/manager.py @@ -51,13 +51,18 @@ def __init__( self.repository = repository self._retry_overrides: dict[str, RetryPolicy] = {} - self._filters: list[Callable[[DLQMessage], bool]] = filters if filters is not None else [ - f for f in [ - None if settings.ENVIRONMENT == "test" else self._filter_test_events, - self._filter_old_messages, - ] if f is not None - ] - + self._filters: list[Callable[[DLQMessage], bool]] = ( + filters + if filters is not None + else [ + f + for f in [ + None if settings.ENVIRONMENT == "test" else self._filter_test_events, + self._filter_old_messages, + ] + if f is not None + ] + ) def _filter_test_events(self, message: DLQMessage) -> bool: return not message.event.event_id.startswith("test-") diff --git a/backend/app/dlq/models.py b/backend/app/dlq/models.py index f8928f17..d359939b 100644 --- a/backend/app/dlq/models.py +++ b/backend/app/dlq/models.py @@ -127,16 +127,25 @@ def get_next_retry_time(self, message: DLQMessage) -> datetime: AGGRESSIVE_RETRY = RetryPolicy( strategy=RetryStrategy.EXPONENTIAL_BACKOFF, - max_retries=5, base_delay_seconds=30, max_delay_seconds=300, retry_multiplier=2.0, + max_retries=5, + base_delay_seconds=30, + max_delay_seconds=300, + retry_multiplier=2.0, ) CAUTIOUS_RETRY = RetryPolicy( strategy=RetryStrategy.EXPONENTIAL_BACKOFF, - max_retries=3, base_delay_seconds=60, max_delay_seconds=600, retry_multiplier=3.0, + max_retries=3, + base_delay_seconds=60, + max_delay_seconds=600, + retry_multiplier=3.0, ) IMMEDIATE_RETRY = RetryPolicy(strategy=RetryStrategy.IMMEDIATE, max_retries=3) DEFAULT_RETRY = RetryPolicy( strategy=RetryStrategy.EXPONENTIAL_BACKOFF, - max_retries=4, base_delay_seconds=60, max_delay_seconds=1800, retry_multiplier=2.5, + max_retries=4, + base_delay_seconds=60, + max_delay_seconds=1800, + retry_multiplier=2.5, ) diff --git a/backend/app/domain/events/typed.py b/backend/app/domain/events/typed.py index 531cf125..787c66b1 100644 --- a/backend/app/domain/events/typed.py +++ b/backend/app/domain/events/typed.py @@ -241,6 +241,8 @@ class UserSettingsUpdatedEvent(BaseEvent): user_id: str changed_fields: list[str] = Field(default_factory=list) reason: str | None = None + + # --8<-- [end:UserSettingsUpdatedEvent] diff --git a/backend/app/domain/idempotency/models.py b/backend/app/domain/idempotency/models.py index 364cd8e8..832184cf 100644 --- a/backend/app/domain/idempotency/models.py +++ b/backend/app/domain/idempotency/models.py @@ -12,6 +12,8 @@ class IdempotencyStatus(StringEnum): PROCESSING = "processing" COMPLETED = "completed" FAILED = "failed" + + # --8<-- [end:IdempotencyStatus] diff --git a/backend/app/domain/notification/exceptions.py b/backend/app/domain/notification/exceptions.py index 8848cf8f..1aff9a0a 100644 --- a/backend/app/domain/notification/exceptions.py +++ b/backend/app/domain/notification/exceptions.py @@ -17,6 +17,8 @@ def __init__(self, user_id: str, limit: int, window_hours: int) -> None: self.limit = limit self.window_hours = window_hours super().__init__(f"Rate limit exceeded for user '{user_id}': max {limit} per {window_hours}h") + + # --8<-- [end:NotificationThrottledError] diff --git a/backend/app/domain/rate_limit/rate_limit_models.py b/backend/app/domain/rate_limit/rate_limit_models.py index c496143f..fcb7f781 100644 --- a/backend/app/domain/rate_limit/rate_limit_models.py +++ b/backend/app/domain/rate_limit/rate_limit_models.py @@ -22,6 +22,8 @@ class EndpointGroup(StringEnum): AUTH = "auth" PUBLIC = "public" API = "api" + + # --8<-- [end:EndpointGroup] @@ -32,6 +34,8 @@ class EndpointUsageStats: algorithm: RateLimitAlgorithm remaining: int + + # --8<-- [end:EndpointUsageStats] @@ -67,6 +71,8 @@ class UserRateLimit: created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) notes: str | None = None + + # --8<-- [end:UserRateLimit] diff --git a/backend/app/domain/replay/models.py b/backend/app/domain/replay/models.py index b085fb39..876e4718 100644 --- a/backend/app/domain/replay/models.py +++ b/backend/app/domain/replay/models.py @@ -41,6 +41,8 @@ class ReplayError: error: str error_type: str | None = None event_id: str | None = None + + # --8<-- [end:ReplayError] @@ -112,6 +114,8 @@ def to_mongo_query(self) -> dict[str, Any]: query["metadata.service_name"] = self.service_name return query + + # --8<-- [end:ReplayFilter] diff --git a/backend/app/domain/saga/exceptions.py b/backend/app/domain/saga/exceptions.py index 7ed2d63b..ddb41d42 100644 --- a/backend/app/domain/saga/exceptions.py +++ b/backend/app/domain/saga/exceptions.py @@ -17,6 +17,8 @@ def __init__(self, saga_id: str, user_id: str) -> None: self.saga_id = saga_id self.user_id = user_id super().__init__(f"Access denied to saga '{saga_id}' for user '{user_id}'") + + # --8<-- [end:SagaAccessDeniedError] diff --git a/backend/app/events/handlers.py b/backend/app/events/handlers.py index b2af375b..bfc8c9f3 100644 --- a/backend/app/events/handlers.py +++ b/backend/app/events/handlers.py @@ -32,31 +32,38 @@ async def _track_consumed( - metrics: EventMetrics, event: DomainEvent, consumer_group: str, coro: Awaitable[None], + metrics: EventMetrics, + event: DomainEvent, + consumer_group: str, + coro: Awaitable[None], ) -> None: """Await *coro* and record domain-level failure metric on error.""" try: await coro except Exception as e: metrics.record_events_processing_failed( - topic=event.event_type, event_type=event.event_type, - consumer_group=consumer_group, error_type=type(e).__name__, + topic=event.event_type, + event_type=event.event_type, + consumer_group=consumer_group, + error_type=type(e).__name__, ) raise # --8<-- [start:with_idempotency] async def with_idempotency( - event: DomainEvent, - handler: Callable[..., Awaitable[None]], - idem: IdempotencyManager, - key_strategy: KeyStrategy, - ttl_seconds: int, - logger: structlog.stdlib.BoundLogger, + event: DomainEvent, + handler: Callable[..., Awaitable[None]], + idem: IdempotencyManager, + key_strategy: KeyStrategy, + ttl_seconds: int, + logger: structlog.stdlib.BoundLogger, ) -> None: """Run *handler* inside an idempotency guard (check -> execute -> mark).""" result = await idem.check_and_reserve( - event=event, key_strategy=key_strategy, ttl_seconds=ttl_seconds, + event=event, + key_strategy=key_strategy, + ttl_seconds=ttl_seconds, ) if result.is_duplicate: logger.info(f"Duplicate event: {event.event_type} ({event.event_id})") @@ -66,9 +73,13 @@ async def with_idempotency( await idem.mark_completed(event=event, key_strategy=key_strategy) except Exception as e: await idem.mark_failed( - event=event, error=str(e), key_strategy=key_strategy, + event=event, + error=str(e), + key_strategy=key_strategy, ) raise + + # --8<-- [end:with_idempotency] @@ -79,14 +90,18 @@ def register_k8s_worker_subscriber(broker: KafkaBroker) -> None: ack_policy=AckPolicy.ACK, ) async def on_create_pod( - body: CreatePodCommandEvent, - worker: FromDishka[KubernetesWorker], - idem: FromDishka[IdempotencyManager], - logger: FromDishka[structlog.stdlib.BoundLogger], - event_metrics: FromDishka[EventMetrics], + body: CreatePodCommandEvent, + worker: FromDishka[KubernetesWorker], + idem: FromDishka[IdempotencyManager], + logger: FromDishka[structlog.stdlib.BoundLogger], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, "k8s-worker", - with_idempotency(body, worker.handle_create_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger)) + await _track_consumed( + event_metrics, + body, + "k8s-worker", + with_idempotency(body, worker.handle_create_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger), + ) @broker.subscriber( EventType.DELETE_POD_COMMAND, @@ -94,14 +109,18 @@ async def on_create_pod( ack_policy=AckPolicy.ACK, ) async def on_delete_pod( - body: DeletePodCommandEvent, - worker: FromDishka[KubernetesWorker], - idem: FromDishka[IdempotencyManager], - logger: FromDishka[structlog.stdlib.BoundLogger], - event_metrics: FromDishka[EventMetrics], + body: DeletePodCommandEvent, + worker: FromDishka[KubernetesWorker], + idem: FromDishka[IdempotencyManager], + logger: FromDishka[structlog.stdlib.BoundLogger], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, "k8s-worker", - with_idempotency(body, worker.handle_delete_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger)) + await _track_consumed( + event_metrics, + body, + "k8s-worker", + with_idempotency(body, worker.handle_delete_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger), + ) def register_result_processor_subscriber(broker: KafkaBroker) -> None: @@ -113,14 +132,18 @@ def register_result_processor_subscriber(broker: KafkaBroker) -> None: auto_offset_reset="earliest", ) async def on_execution_completed( - body: ExecutionCompletedEvent, - processor: FromDishka[ResultProcessor], - idem: FromDishka[IdempotencyManager], - logger: FromDishka[structlog.stdlib.BoundLogger], - event_metrics: FromDishka[EventMetrics], + body: ExecutionCompletedEvent, + processor: FromDishka[ResultProcessor], + idem: FromDishka[IdempotencyManager], + logger: FromDishka[structlog.stdlib.BoundLogger], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, "result-processor", - with_idempotency(body, processor.handle_execution_completed, idem, KeyStrategy.CONTENT_HASH, 7200, logger)) + await _track_consumed( + event_metrics, + body, + "result-processor", + with_idempotency(body, processor.handle_execution_completed, idem, KeyStrategy.CONTENT_HASH, 7200, logger), + ) @broker.subscriber( EventType.EXECUTION_FAILED, @@ -130,14 +153,18 @@ async def on_execution_completed( auto_offset_reset="earliest", ) async def on_execution_failed( - body: ExecutionFailedEvent, - processor: FromDishka[ResultProcessor], - idem: FromDishka[IdempotencyManager], - logger: FromDishka[structlog.stdlib.BoundLogger], - event_metrics: FromDishka[EventMetrics], + body: ExecutionFailedEvent, + processor: FromDishka[ResultProcessor], + idem: FromDishka[IdempotencyManager], + logger: FromDishka[structlog.stdlib.BoundLogger], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, "result-processor", - with_idempotency(body, processor.handle_execution_failed, idem, KeyStrategy.CONTENT_HASH, 7200, logger)) + await _track_consumed( + event_metrics, + body, + "result-processor", + with_idempotency(body, processor.handle_execution_failed, idem, KeyStrategy.CONTENT_HASH, 7200, logger), + ) @broker.subscriber( EventType.EXECUTION_TIMEOUT, @@ -147,14 +174,18 @@ async def on_execution_failed( auto_offset_reset="earliest", ) async def on_execution_timeout( - body: ExecutionTimeoutEvent, - processor: FromDishka[ResultProcessor], - idem: FromDishka[IdempotencyManager], - logger: FromDishka[structlog.stdlib.BoundLogger], - event_metrics: FromDishka[EventMetrics], + body: ExecutionTimeoutEvent, + processor: FromDishka[ResultProcessor], + idem: FromDishka[IdempotencyManager], + logger: FromDishka[structlog.stdlib.BoundLogger], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, "result-processor", - with_idempotency(body, processor.handle_execution_timeout, idem, KeyStrategy.CONTENT_HASH, 7200, logger)) + await _track_consumed( + event_metrics, + body, + "result-processor", + with_idempotency(body, processor.handle_execution_timeout, idem, KeyStrategy.CONTENT_HASH, 7200, logger), + ) def register_saga_subscriber(broker: KafkaBroker) -> None: @@ -164,14 +195,19 @@ def register_saga_subscriber(broker: KafkaBroker) -> None: ack_policy=AckPolicy.ACK, ) async def on_execution_requested( - body: ExecutionRequestedEvent, - orchestrator: FromDishka[SagaOrchestrator], - idem: FromDishka[IdempotencyManager], - logger: FromDishka[structlog.stdlib.BoundLogger], - event_metrics: FromDishka[EventMetrics], + body: ExecutionRequestedEvent, + orchestrator: FromDishka[SagaOrchestrator], + idem: FromDishka[IdempotencyManager], + logger: FromDishka[structlog.stdlib.BoundLogger], + event_metrics: FromDishka[EventMetrics], ) -> None: coro = with_idempotency( - body, orchestrator.handle_execution_requested, idem, KeyStrategy.EVENT_BASED, 3600, logger, + body, + orchestrator.handle_execution_requested, + idem, + KeyStrategy.EVENT_BASED, + 3600, + logger, ) await _track_consumed(event_metrics, body, "saga-orchestrator", coro) @@ -181,12 +217,11 @@ async def on_execution_requested( ack_policy=AckPolicy.ACK, ) async def on_execution_completed( - body: ExecutionCompletedEvent, - orchestrator: FromDishka[SagaOrchestrator], - event_metrics: FromDishka[EventMetrics], + body: ExecutionCompletedEvent, + orchestrator: FromDishka[SagaOrchestrator], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, "saga-orchestrator", - orchestrator.handle_execution_completed(body)) + await _track_consumed(event_metrics, body, "saga-orchestrator", orchestrator.handle_execution_completed(body)) @broker.subscriber( EventType.EXECUTION_FAILED, @@ -194,12 +229,11 @@ async def on_execution_completed( ack_policy=AckPolicy.ACK, ) async def on_execution_failed( - body: ExecutionFailedEvent, - orchestrator: FromDishka[SagaOrchestrator], - event_metrics: FromDishka[EventMetrics], + body: ExecutionFailedEvent, + orchestrator: FromDishka[SagaOrchestrator], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, "saga-orchestrator", - orchestrator.handle_execution_failed(body)) + await _track_consumed(event_metrics, body, "saga-orchestrator", orchestrator.handle_execution_failed(body)) @broker.subscriber( EventType.EXECUTION_TIMEOUT, @@ -207,12 +241,11 @@ async def on_execution_failed( ack_policy=AckPolicy.ACK, ) async def on_execution_timeout( - body: ExecutionTimeoutEvent, - orchestrator: FromDishka[SagaOrchestrator], - event_metrics: FromDishka[EventMetrics], + body: ExecutionTimeoutEvent, + orchestrator: FromDishka[SagaOrchestrator], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, "saga-orchestrator", - orchestrator.handle_execution_timeout(body)) + await _track_consumed(event_metrics, body, "saga-orchestrator", orchestrator.handle_execution_timeout(body)) @broker.subscriber( EventType.EXECUTION_CANCELLED, @@ -220,12 +253,11 @@ async def on_execution_timeout( ack_policy=AckPolicy.ACK, ) async def on_execution_cancelled( - body: ExecutionCancelledEvent, - orchestrator: FromDishka[SagaOrchestrator], - event_metrics: FromDishka[EventMetrics], + body: ExecutionCancelledEvent, + orchestrator: FromDishka[SagaOrchestrator], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, "saga-orchestrator", - orchestrator.handle_execution_cancelled(body)) + await _track_consumed(event_metrics, body, "saga-orchestrator", orchestrator.handle_execution_cancelled(body)) _SSE_EVENT_TYPES = [ @@ -260,14 +292,12 @@ def register_sse_subscriber(broker: KafkaBroker, settings: Settings) -> None: max_workers=settings.SSE_CONSUMER_POOL_SIZE, ) async def on_sse_event( - body: DomainEvent, - sse_bus: FromDishka[SSERedisBus], + body: DomainEvent, + sse_bus: FromDishka[SSERedisBus], ) -> None: execution_id = getattr(body, "execution_id", None) if execution_id: - sse_data = SSEExecutionEventData(**{ - k: v for k, v in body.model_dump().items() if k in _sse_field_names - }) + sse_data = SSEExecutionEventData(**{k: v for k, v in body.model_dump().items() if k in _sse_field_names}) await sse_bus.publish_event(execution_id, sse_data) @@ -282,12 +312,11 @@ def register_notification_subscriber(broker: KafkaBroker) -> None: auto_offset_reset="latest", ) async def on_execution_completed( - body: ExecutionCompletedEvent, - service: FromDishka[NotificationService], - event_metrics: FromDishka[EventMetrics], + body: ExecutionCompletedEvent, + service: FromDishka[NotificationService], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, group_id, - service.handle_execution_completed(body)) + await _track_consumed(event_metrics, body, group_id, service.handle_execution_completed(body)) @broker.subscriber( EventType.EXECUTION_FAILED, @@ -297,12 +326,11 @@ async def on_execution_completed( auto_offset_reset="latest", ) async def on_execution_failed( - body: ExecutionFailedEvent, - service: FromDishka[NotificationService], - event_metrics: FromDishka[EventMetrics], + body: ExecutionFailedEvent, + service: FromDishka[NotificationService], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, group_id, - service.handle_execution_failed(body)) + await _track_consumed(event_metrics, body, group_id, service.handle_execution_failed(body)) @broker.subscriber( EventType.EXECUTION_TIMEOUT, @@ -312,11 +340,8 @@ async def on_execution_failed( auto_offset_reset="latest", ) async def on_execution_timeout( - body: ExecutionTimeoutEvent, - service: FromDishka[NotificationService], - event_metrics: FromDishka[EventMetrics], + body: ExecutionTimeoutEvent, + service: FromDishka[NotificationService], + event_metrics: FromDishka[EventMetrics], ) -> None: - await _track_consumed(event_metrics, body, group_id, - service.handle_execution_timeout(body)) - - + await _track_consumed(event_metrics, body, group_id, service.handle_execution_timeout(body)) diff --git a/backend/app/infrastructure/kafka/topics.py b/backend/app/infrastructure/kafka/topics.py index 4730806e..a4239270 100644 --- a/backend/app/infrastructure/kafka/topics.py +++ b/backend/app/infrastructure/kafka/topics.py @@ -54,18 +54,22 @@ # The pod monitor uses this to decide when to delete the pod. # Note: EXECUTION_CANCELLED is excluded — cancellation triggers pod deletion # via the saga's DeletePodCommandEvent compensation, not the monitor. -EXECUTION_TERMINAL_EVENT_TYPES: frozenset[EventType] = frozenset({ - EventType.EXECUTION_COMPLETED, - EventType.EXECUTION_FAILED, - EventType.EXECUTION_TIMEOUT, -}) +EXECUTION_TERMINAL_EVENT_TYPES: frozenset[EventType] = frozenset( + { + EventType.EXECUTION_COMPLETED, + EventType.EXECUTION_FAILED, + EventType.EXECUTION_TIMEOUT, + } +) # Events that signal the full execution pipeline has concluded # (result stored or execution failed before result could be stored). # The SSE service uses this to close the execution stream. -EXECUTION_PIPELINE_TERMINAL_EVENT_TYPES: frozenset[EventType] = frozenset({ - EventType.RESULT_STORED, - EventType.RESULT_FAILED, - EventType.EXECUTION_FAILED, - EventType.EXECUTION_TIMEOUT, -}) +EXECUTION_PIPELINE_TERMINAL_EVENT_TYPES: frozenset[EventType] = frozenset( + { + EventType.RESULT_STORED, + EventType.RESULT_FAILED, + EventType.EXECUTION_FAILED, + EventType.EXECUTION_TIMEOUT, + } +) diff --git a/backend/app/runtime_registry.py b/backend/app/runtime_registry.py index 49c53d78..d471a62c 100644 --- a/backend/app/runtime_registry.py +++ b/backend/app/runtime_registry.py @@ -8,6 +8,8 @@ class RuntimeConfig(NamedTuple): image: str # Full Docker image reference file_name: str # Name that will be mounted under /scripts/ command: list[str] # Entrypoint executed inside the container + + # --8<-- [end:RuntimeConfig] @@ -17,6 +19,8 @@ class LanguageSpec(TypedDict): image_tpl: str file_ext: str interpreter: list[str] + + # --8<-- [end:LanguageSpec] diff --git a/backend/app/schemas_pydantic/saved_script.py b/backend/app/schemas_pydantic/saved_script.py index 3979a848..bc60c474 100644 --- a/backend/app/schemas_pydantic/saved_script.py +++ b/backend/app/schemas_pydantic/saved_script.py @@ -44,6 +44,8 @@ class SavedScriptResponse(BaseModel): updated_at: datetime model_config = ConfigDict(from_attributes=True, json_schema_serialization_defaults_required=True) + + # --8<-- [end:SavedScriptResponse] diff --git a/backend/app/schemas_pydantic/user.py b/backend/app/schemas_pydantic/user.py index bf8d1ea1..871b9112 100644 --- a/backend/app/schemas_pydantic/user.py +++ b/backend/app/schemas_pydantic/user.py @@ -114,8 +114,6 @@ class LoginResponse(BaseModel): model_config = ConfigDict(from_attributes=True, json_schema_serialization_defaults_required=True) - - class UnlockResponse(BaseModel): """Response model for account unlock.""" diff --git a/backend/app/schemas_pydantic/user_settings.py b/backend/app/schemas_pydantic/user_settings.py index a2ee7e1c..ad9447c2 100644 --- a/backend/app/schemas_pydantic/user_settings.py +++ b/backend/app/schemas_pydantic/user_settings.py @@ -109,5 +109,3 @@ class RestoreSettingsRequest(BaseModel): timestamp: datetime reason: str | None = None - - diff --git a/backend/app/services/admin/admin_events_service.py b/backend/app/services/admin/admin_events_service.py index 18ca8848..de71ef16 100644 --- a/backend/app/services/admin/admin_events_service.py +++ b/backend/app/services/admin/admin_events_service.py @@ -66,14 +66,14 @@ def _filter_to_json_dict(f: EventFilter) -> dict[str, Any]: class AdminReplayResult: def __init__( - self, - *, - dry_run: bool, - total_events: int, - replay_id: str, - status: ReplayStatus, - session_id: str | None = None, - events_preview: list[EventSummary] | None = None, + self, + *, + dry_run: bool, + total_events: int, + replay_id: str, + status: ReplayStatus, + session_id: str | None = None, + events_preview: list[EventSummary] | None = None, ) -> None: self.dry_run = dry_run self.total_events = total_events @@ -92,21 +92,21 @@ class ExportResult: class AdminEventsService: def __init__( - self, - repository: AdminEventsRepository, - replay_service: EventReplayService, - logger: structlog.stdlib.BoundLogger, + self, + repository: AdminEventsRepository, + replay_service: EventReplayService, + logger: structlog.stdlib.BoundLogger, ) -> None: self._repo = repository self._replay_service = replay_service self.logger = logger async def browse_events( - self, - *, - event_filter: EventFilter, - skip: int, - limit: int, + self, + *, + event_filter: EventFilter, + skip: int, + limit: int, ) -> EventBrowseResult: return await self._repo.get_events_page(event_filter, skip=skip, limit=limit) @@ -117,12 +117,12 @@ async def get_event_stats(self, *, hours: int) -> EventStatistics: return await self._repo.get_event_stats(hours=hours) async def prepare_or_schedule_replay( - self, - *, - replay_filter: ReplayFilter, - dry_run: bool, - replay_id: str, - target_service: str | None, + self, + *, + replay_filter: ReplayFilter, + dry_run: bool, + replay_id: str, + target_service: str | None, ) -> AdminReplayResult: if replay_filter.is_empty(): raise ValidationError("Must specify at least one filter for replay") @@ -241,7 +241,7 @@ def _estimate_completion(self, doc: ReplaySessionDocument, now: datetime) -> dat return now + timedelta(seconds=remaining / rate) if rate > 0 else None async def export_events( - self, *, event_filter: EventFilter, limit: int, export_format: ExportFormat + self, *, event_filter: EventFilter, limit: int, export_format: ExportFormat ) -> ExportResult: if export_format == ExportFormat.CSV: return await self._export_csv(event_filter=event_filter, limit=limit) diff --git a/backend/app/services/admin/admin_execution_service.py b/backend/app/services/admin/admin_execution_service.py index 1624e84a..10042d1a 100644 --- a/backend/app/services/admin/admin_execution_service.py +++ b/backend/app/services/admin/admin_execution_service.py @@ -41,13 +41,18 @@ async def list_executions( query["user_id"] = user_id executions = await self._repo.get_executions( - query=query, limit=limit, skip=skip, sort=[("created_at", -1)], + query=query, + limit=limit, + skip=skip, + sort=[("created_at", -1)], ) total = await self._repo.count_executions(query) return executions, total async def update_priority( - self, execution_id: str, new_priority: QueuePriority, + self, + execution_id: str, + new_priority: QueuePriority, ) -> DomainExecution: updated = await self._repo.update_priority(execution_id, new_priority) if not updated: diff --git a/backend/app/services/admin/admin_user_service.py b/backend/app/services/admin/admin_user_service.py index 56d68272..229a990a 100644 --- a/backend/app/services/admin/admin_user_service.py +++ b/backend/app/services/admin/admin_user_service.py @@ -133,7 +133,9 @@ async def list_users( bypass_rate_limit=s.bypass_rate_limit, global_multiplier=s.global_multiplier, has_custom_limits=s.has_custom_limits, - ) if (s := summaries.get(user.user_id)) else user + ) + if (s := summaries.get(user.user_id)) + else user for user in result.users ] @@ -166,9 +168,7 @@ async def create_user( return created async def get_user(self, *, admin_user_id: str, user_id: str) -> User | None: - self.logger.info( - "Admin getting user details", admin_user_id=admin_user_id, target_user_id=user_id - ) + self.logger.info("Admin getting user details", admin_user_id=admin_user_id, target_user_id=user_id) return await self._users.get_user_by_id(user_id) async def update_user(self, *, admin_user_id: str, user_id: str, update: UserUpdate) -> User | None: @@ -202,9 +202,7 @@ async def delete_user(self, *, admin_user_id: str, user_id: str, cascade: bool) return result async def reset_user_password(self, *, admin_user_id: str, user_id: str, new_password: str) -> bool: - self.logger.info( - "Admin resetting user password", admin_user_id=admin_user_id, target_user_id=user_id - ) + self.logger.info("Admin resetting user password", admin_user_id=admin_user_id, target_user_id=user_id) self._security_metrics.record_password_reset_request(method="admin") hashed = self._security.get_password_hash(new_password) pr = PasswordReset(user_id=user_id, new_password=hashed) @@ -214,9 +212,7 @@ async def reset_user_password(self, *, admin_user_id: str, user_id: str, new_pas return ok async def get_user_rate_limits(self, *, admin_user_id: str, user_id: str) -> UserRateLimitsResult: - self.logger.info( - "Admin getting user rate limits", admin_user_id=admin_user_id, target_user_id=user_id - ) + self.logger.info("Admin getting user rate limits", admin_user_id=admin_user_id, target_user_id=user_id) user_limit = await self._rate_limits.get_user_rate_limit(user_id) usage_stats = await self._rate_limits.get_usage_stats(user_id) return UserRateLimitsResult( @@ -244,8 +240,6 @@ async def update_user_rate_limits( return RateLimitUpdateResult(user_id=user_id, updated=True, config=config) async def reset_user_rate_limits(self, *, admin_user_id: str, user_id: str) -> bool: - self.logger.info( - "Admin resetting user rate limits", admin_user_id=admin_user_id, target_user_id=user_id - ) + self.logger.info("Admin resetting user rate limits", admin_user_id=admin_user_id, target_user_id=user_id) await self._rate_limits.reset_user_limits(user_id) return True diff --git a/backend/app/services/auth_service.py b/backend/app/services/auth_service.py index 6233681e..a2df454b 100644 --- a/backend/app/services/auth_service.py +++ b/backend/app/services/auth_service.py @@ -85,7 +85,10 @@ async def get_current_user(self, request: Request) -> User: async def get_admin(self, request: Request) -> User: user = await self.get_current_user(request) self.security_metrics.record_authorization_check( - "/admin", request.method, user.role == UserRole.ADMIN, user_role=user.role, + "/admin", + request.method, + user.role == UserRole.ADMIN, + user_role=user.role, ) if user.role != UserRole.ADMIN: self.logger.warning("Admin access denied", username=user.username, role=user.role) @@ -166,7 +169,8 @@ async def login( access_token_expires = timedelta(minutes=session_timeout) access_token = self.security_service.create_access_token( - data={"sub": user.username}, expires_delta=access_token_expires, + data={"sub": user.username}, + expires_delta=access_token_expires, ) csrf_token = self.security_service.generate_csrf_token(access_token) diff --git a/backend/app/services/event_replay/replay_service.py b/backend/app/services/event_replay/replay_service.py index b92b5e73..13b11e71 100644 --- a/backend/app/services/event_replay/replay_service.py +++ b/backend/app/services/event_replay/replay_service.py @@ -120,9 +120,7 @@ async def pause_session(self, session_id: str) -> ReplayOperationResult: if scheduler: scheduler.remove_all_jobs() await self._repository.update_session_status(session_id, ReplayStatus.PAUSED) - return ReplayOperationResult( - session_id=session_id, status=ReplayStatus.PAUSED, message="Replay session paused" - ) + return ReplayOperationResult(session_id=session_id, status=ReplayStatus.PAUSED, message="Replay session paused") async def resume_session(self, session_id: str) -> ReplayOperationResult: session = self.get_session(session_id) diff --git a/backend/app/services/execution_queue.py b/backend/app/services/execution_queue.py index ee5de60f..0f79ac1c 100644 --- a/backend/app/services/execution_queue.py +++ b/backend/app/services/execution_queue.py @@ -178,7 +178,9 @@ async def update_priority(self, execution_id: str, new_priority: QueuePriority) if not result: return False self._logger.info( - "Updated execution priority", execution_id=execution_id, new_priority=new_priority, + "Updated execution priority", + execution_id=execution_id, + new_priority=new_priority, ) return True diff --git a/backend/app/services/execution_service.py b/backend/app/services/execution_service.py index 7d5a5334..a93b79e7 100644 --- a/backend/app/services/execution_service.py +++ b/backend/app/services/execution_service.py @@ -580,4 +580,3 @@ def _build_stats_query( query["created_at"] = time_filter return query - diff --git a/backend/app/services/idempotency/idempotency_manager.py b/backend/app/services/idempotency/idempotency_manager.py index 29e3fcbc..c1bcc372 100644 --- a/backend/app/services/idempotency/idempotency_manager.py +++ b/backend/app/services/idempotency/idempotency_manager.py @@ -32,6 +32,8 @@ class IdempotencyConfig(BaseModel): processing_timeout_seconds: int = 300 enable_result_caching: bool = True max_result_size_bytes: int = 1048576 + + # --8<-- [end:IdempotencyConfig] @@ -69,6 +71,7 @@ def _generate_key( else: raise ValueError(f"Invalid key strategy: {key_strategy}") return f"{self.config.key_prefix}:{key}" + # --8<-- [end:generate_key] async def check_and_reserve( diff --git a/backend/app/services/idempotency/redis_repository.py b/backend/app/services/idempotency/redis_repository.py index 8cdf0e83..149441af 100644 --- a/backend/app/services/idempotency/redis_repository.py +++ b/backend/app/services/idempotency/redis_repository.py @@ -100,6 +100,7 @@ async def insert_processing(self, record: IdempotencyRecord) -> None: if not ok: # Mirror Mongo behavior so manager's DuplicateKeyError path is reused raise DuplicateKeyError("Key already exists") + # --8<-- [end:insert_processing] async def update_record(self, record: IdempotencyRecord) -> int: @@ -119,4 +120,3 @@ async def update_record(self, record: IdempotencyRecord) -> int: else: await self._r.set(k, payload) return 1 - diff --git a/backend/app/services/k8s_worker/worker.py b/backend/app/services/k8s_worker/worker.py index c414a5cf..a9ab313b 100644 --- a/backend/app/services/k8s_worker/worker.py +++ b/backend/app/services/k8s_worker/worker.py @@ -35,11 +35,11 @@ class KubernetesWorker: """ def __init__( - self, - api_client: k8s_client.ApiClient, - producer: UnifiedProducer, - settings: Settings, - logger: structlog.stdlib.BoundLogger, + self, + api_client: k8s_client.ApiClient, + producer: UnifiedProducer, + settings: Settings, + logger: structlog.stdlib.BoundLogger, ): self.logger = logger self.metrics = KubernetesMetrics(settings) @@ -133,8 +133,7 @@ async def _create_pod_for_execution(self, command: CreatePodCommandEvent) -> Non self.metrics.record_k8s_pod_created("success", command.language) self.logger.info( - f"Successfully created pod {pod.metadata.name} for execution {execution_id}. " - f"Duration: {duration:.2f}s" + f"Successfully created pod {pod.metadata.name} for execution {execution_id}. Duration: {duration:.2f}s" ) except Exception as e: @@ -198,9 +197,7 @@ async def _create_pod(self, pod: k8s_client.V1Pod) -> k8s_client.V1Pod | None: else: raise - async def _set_configmap_owner( - self, config_map: k8s_client.V1ConfigMap, owner_pod: k8s_client.V1Pod - ) -> None: + async def _set_configmap_owner(self, config_map: k8s_client.V1ConfigMap, owner_pod: k8s_client.V1Pod) -> None: """Patch the ConfigMap with an ownerReference pointing to the pod. This makes K8s garbage-collect the ConfigMap automatically when the pod is deleted. @@ -281,8 +278,11 @@ async def _ensure_executor_network_policy(self, namespace: str) -> None: ) await self.networking_v1.patch_namespaced_network_policy( # type: ignore[call-arg] - name=policy_name, namespace=namespace, body=policy, - field_manager="integr8s", force=True, + name=policy_name, + namespace=namespace, + body=policy, + field_manager="integr8s", + force=True, _content_type="application/apply-patch+yaml", ) self.logger.info(f"NetworkPolicy '{policy_name}' applied in namespace {namespace}") @@ -324,14 +324,16 @@ async def ensure_image_pre_puller_daemonset(self) -> None: for i, image_ref in enumerate(sorted(all_images)): sanitized_image_ref = image_ref.split("/")[-1].replace(":", "-").replace(".", "-").replace("_", "-") self.logger.info(f"DAEMONSET: before: {image_ref} -> {sanitized_image_ref}") - init_containers.append(k8s_client.V1Container( - name=f"pull-{i}-{sanitized_image_ref}", - image=image_ref, - command=["/bin/sh", "-c", f'echo "Image {image_ref} pulled."'], - image_pull_policy="Always", - security_context=psa_security_context, - resources=minimal_resources, - )) + init_containers.append( + k8s_client.V1Container( + name=f"pull-{i}-{sanitized_image_ref}", + image=image_ref, + command=["/bin/sh", "-c", f'echo "Image {image_ref} pulled."'], + image_pull_policy="Always", + security_context=psa_security_context, + resources=minimal_resources, + ) + ) daemonset = k8s_client.V1DaemonSet( api_version="apps/v1", @@ -343,11 +345,14 @@ async def ensure_image_pre_puller_daemonset(self) -> None: metadata=k8s_client.V1ObjectMeta(labels={"name": daemonset_name}), spec=k8s_client.V1PodSpec( init_containers=init_containers, - containers=[k8s_client.V1Container( - name="pause", image="registry.k8s.io/pause:3.9", - security_context=psa_security_context, - resources=minimal_resources, - )], + containers=[ + k8s_client.V1Container( + name="pause", + image="registry.k8s.io/pause:3.9", + security_context=psa_security_context, + resources=minimal_resources, + ) + ], tolerations=[k8s_client.V1Toleration(operator="Exists")], security_context=k8s_client.V1PodSecurityContext( run_as_non_root=True, @@ -361,8 +366,11 @@ async def ensure_image_pre_puller_daemonset(self) -> None: ) await self.apps_v1.patch_namespaced_daemon_set( # type: ignore[call-arg] - name=daemonset_name, namespace=namespace, body=daemonset, - field_manager="integr8s", force=True, + name=daemonset_name, + namespace=namespace, + body=daemonset, + field_manager="integr8s", + force=True, _content_type="application/apply-patch+yaml", ) self.logger.info(f"DaemonSet '{daemonset_name}' applied successfully") diff --git a/backend/app/services/notification_service.py b/backend/app/services/notification_service.py index b9a63b83..634c0abf 100644 --- a/backend/app/services/notification_service.py +++ b/backend/app/services/notification_service.py @@ -86,6 +86,8 @@ async def clear(self) -> None: """Clear all throttle entries.""" async with self._lock: self._entries.clear() + + # --8<-- [end:ThrottleCache] @@ -142,9 +144,7 @@ async def create_notification( max_days = self.settings.NOTIF_MAX_SCHEDULE_DAYS max_schedule = datetime.now(UTC) + timedelta(days=max_days) if scheduled_for > max_schedule: - raise NotificationValidationError( - f"scheduled_for cannot exceed {max_days} days from now" - ) + raise NotificationValidationError(f"scheduled_for cannot exceed {max_days} days from now") self.logger.info( f"Creating notification for user {user_id}", user_id=user_id, @@ -311,9 +311,7 @@ async def _create_system_for_user( ) return "created" except Exception as e: - self.logger.error( - "Failed to create system notification for user", user_id=user_id, error=str(e) - ) + self.logger.error("Failed to create system notification for user", user_id=user_id, error=str(e)) return "failed" async def _send_in_app( @@ -357,11 +355,13 @@ async def _send_webhook( webhook_host=safe_host, ) - trace.get_current_span().set_attributes({ - "notification.id": str(notification.notification_id), - "notification.channel": "webhook", - "notification.webhook_host": safe_host, - }) + trace.get_current_span().set_attributes( + { + "notification.id": str(notification.notification_id), + "notification.channel": "webhook", + "notification.webhook_host": safe_host, + } + ) async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post(webhook_url, json=payload, headers=headers) response.raise_for_status() @@ -406,10 +406,12 @@ async def _send_slack(self, notification: DomainNotification, subscription: Doma priority_color=self._get_slack_color(notification.severity), ) - trace.get_current_span().set_attributes({ - "notification.id": str(notification.notification_id), - "notification.channel": "slack", - }) + trace.get_current_span().set_attributes( + { + "notification.id": str(notification.notification_id), + "notification.channel": "slack", + } + ) async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post(subscription.slack_webhook, json=slack_message) response.raise_for_status() @@ -601,6 +603,7 @@ async def _should_skip_notification( return f"Notification tags {notification.tags} excluded by preferences for {notification.channel}" return None + # --8<-- [end:should_skip_notification] async def _deliver_notification(self, notification: DomainNotification) -> bool: diff --git a/backend/app/services/pod_monitor/event_mapper.py b/backend/app/services/pod_monitor/event_mapper.py index 107eaafb..a14d8775 100644 --- a/backend/app/services/pod_monitor/event_mapper.py +++ b/backend/app/services/pod_monitor/event_mapper.py @@ -22,8 +22,12 @@ # Python 3.12 type aliases type PodPhase = str type PodMonitorEvent = ( - PodScheduledEvent | PodRunningEvent | PodTerminatedEvent - | ExecutionCompletedEvent | ExecutionFailedEvent | ExecutionTimeoutEvent + PodScheduledEvent + | PodRunningEvent + | PodTerminatedEvent + | ExecutionCompletedEvent + | ExecutionFailedEvent + | ExecutionTimeoutEvent ) type EventList = list[PodMonitorEvent] type AsyncMapper = Callable[["PodContext"], Awaitable[PodMonitorEvent | None]] @@ -199,7 +203,7 @@ def _is_duplicate(self, pod_name: str, phase: PodPhase) -> bool: return True if len(self._event_cache) >= self._MAX_CACHE_SIZE: # Evict oldest half to amortise cleanup cost - keys = list(self._event_cache)[:self._MAX_CACHE_SIZE // 2] + keys = list(self._event_cache)[: self._MAX_CACHE_SIZE // 2] for k in keys: del self._event_cache[k] self._event_cache[pod_name] = phase @@ -343,9 +347,7 @@ async def _check_timeout(self, ctx: PodContext) -> ExecutionTimeoutEvent | None: # A pod can carry DeadlineExceeded while containers already exited 0. # In that case, let the normal completed mapping path handle it. if self._all_containers_succeeded(ctx.pod): - self.logger.info( - f"POD-EVENT: timeout ignored because containers succeeded exec={ctx.execution_id}" - ) + self.logger.info(f"POD-EVENT: timeout ignored because containers succeeded exec={ctx.execution_id}") return None logs = await self._extract_logs(ctx.pod) @@ -519,5 +521,3 @@ def _parse_framed_output(logs: str) -> tuple[str, str]: return "", "" return stdout, stderr - - diff --git a/backend/app/services/pod_monitor/monitor.py b/backend/app/services/pod_monitor/monitor.py index 44ce1b16..2f893181 100644 --- a/backend/app/services/pod_monitor/monitor.py +++ b/backend/app/services/pod_monitor/monitor.py @@ -186,7 +186,9 @@ async def _delete_pod(self, pod: k8s_client.V1Pod) -> None: pod_name = pod.metadata.name try: await self._v1.delete_namespaced_pod( - name=pod_name, namespace=pod.metadata.namespace, grace_period_seconds=0, + name=pod_name, + namespace=pod.metadata.namespace, + grace_period_seconds=0, ) self.logger.info("Deleted completed pod", pod_name=pod_name) except ApiException as e: diff --git a/backend/app/services/rate_limit_service.py b/backend/app/services/rate_limit_service.py index a0ccb9c4..cad7ece8 100644 --- a/backend/app/services/rate_limit_service.py +++ b/backend/app/services/rate_limit_service.py @@ -106,9 +106,7 @@ async def check_rate_limit( user_id, endpoint, effective_limit, rule.window_seconds, rule.burst_multiplier, rule ) else: - status = await self._check_sliding_window( - user_id, endpoint, effective_limit, rule.window_seconds, rule - ) + status = await self._check_sliding_window(user_id, endpoint, effective_limit, rule.window_seconds, rule) if status.allowed: self.metrics.record_allowed(normalized_endpoint, rule.group) @@ -207,7 +205,13 @@ async def _check_token_bucket( # --8<-- [start:check_token_bucket] result = await self.redis.eval( # type: ignore[misc] - self._TOKEN_BUCKET_LUA, 1, key, max_tokens, refill_rate, now, window_seconds * 2, + self._TOKEN_BUCKET_LUA, + 1, + key, + max_tokens, + refill_rate, + now, + window_seconds * 2, ) allowed = bool(result[0]) tokens = float(result[1]) @@ -357,9 +361,7 @@ async def get_usage_stats(self, user_id: str) -> dict[str, EndpointUsageStats]: rule = self._find_matching_rule(endpoint, user_config, config) limit = int(rule.requests * multiplier) if rule else 0 remaining = max(0, limit - count) - stats[endpoint] = EndpointUsageStats( - algorithm=RateLimitAlgorithm.SLIDING_WINDOW, remaining=remaining - ) + stats[endpoint] = EndpointUsageStats(algorithm=RateLimitAlgorithm.SLIDING_WINDOW, remaining=remaining) elif algo == "tb": bucket_data = await self.redis.get(key) if bucket_data: diff --git a/backend/app/services/result_processor/processor.py b/backend/app/services/result_processor/processor.py index 59edb36d..5c73ceb0 100644 --- a/backend/app/services/result_processor/processor.py +++ b/backend/app/services/result_processor/processor.py @@ -21,12 +21,12 @@ class ResultProcessor: """Service for processing execution completion events and storing results.""" def __init__( - self, - execution_repo: ExecutionRepository, - producer: UnifiedProducer, - settings: Settings, - logger: structlog.stdlib.BoundLogger, - execution_metrics: ExecutionMetrics, + self, + execution_repo: ExecutionRepository, + producer: UnifiedProducer, + settings: Settings, + logger: structlog.stdlib.BoundLogger, + execution_metrics: ExecutionMetrics, ) -> None: self._execution_repo = execution_repo self._producer = producer diff --git a/backend/app/services/saga/saga_orchestrator.py b/backend/app/services/saga/saga_orchestrator.py index 06d2e416..affde5c3 100644 --- a/backend/app/services/saga/saga_orchestrator.py +++ b/backend/app/services/saga/saga_orchestrator.py @@ -90,13 +90,9 @@ async def handle_execution_cancelled(self, event: DomainEvent) -> None: if not isinstance(event, ExecutionCancelledEvent): raise TypeError(f"Expected ExecutionCancelledEvent, got {type(event).__name__}") await self._queue.remove(event.execution_id) - await self._resolve_completion( - event.execution_id, SagaState.CANCELLED, f"cancelled by {event.cancelled_by}" - ) + await self._resolve_completion(event.execution_id, SagaState.CANCELLED, f"cancelled by {event.cancelled_by}") - async def _resolve_completion( - self, execution_id: str, state: SagaState, error_message: str | None = None - ) -> None: + async def _resolve_completion(self, execution_id: str, state: SagaState, error_message: str | None = None) -> None: """Look up the active saga for an execution and transition it to a terminal state. Always releases the queue slot and attempts to schedule the next pending @@ -111,7 +107,10 @@ async def _resolve_completion( else: self.logger.info("Marking saga terminal state", saga_id=saga.saga_id, state=state) await self._repo.save_saga( - saga.saga_id, state=state, error_message=error_message, completed_at=datetime.now(UTC), + saga.saga_id, + state=state, + error_message=error_message, + completed_at=datetime.now(UTC), ) await self._queue.release(execution_id) @@ -144,7 +143,8 @@ async def try_schedule_from_queue(self) -> None: execution_id=execution_id, ) await self._resolve_completion( - execution_id, SagaState.FAILED, + execution_id, + SagaState.FAILED, f"Failed to start saga after {retry_count} attempts", ) else: @@ -208,9 +208,7 @@ async def _execute_saga( saved = await self._repo.save_saga(instance.saga_id, current_step=step.name) if saved.state.is_terminal: - self.logger.info( - "Saga no longer active, stopping", saga_id=instance.saga_id, state=saved.state - ) + self.logger.info("Saga no longer active, stopping", saga_id=instance.saga_id, state=saved.state) return self.logger.info("Executing saga step", step=step.name, saga_id=instance.saga_id) @@ -231,9 +229,9 @@ async def _execute_saga( await self._repo.save_saga( instance.saga_id, completed_steps=[*saved.completed_steps, step.name], - context_data=SagaContextData(**{ - k: v for k, v in context.data.items() if k in SagaContextData.__dataclass_fields__ - }), + context_data=SagaContextData( + **{k: v for k, v in context.data.items() if k in SagaContextData.__dataclass_fields__} + ), ) compensation = step.get_compensation() @@ -318,7 +316,10 @@ async def _compensate_saga(self, saga_id: str, context: SagaContext) -> None: async def _fail_saga(self, saga_id: str, error_message: str) -> None: """Mark saga as failed.""" await self._repo.save_saga( - saga_id, state=SagaState.FAILED, error_message=error_message, completed_at=datetime.now(UTC), + saga_id, + state=SagaState.FAILED, + error_message=error_message, + completed_at=datetime.now(UTC), ) self.logger.error("Saga failed", saga_id=saga_id, error_message=error_message) @@ -388,7 +389,9 @@ async def cancel_saga(self, saga_id: str) -> None: self.logger.info("Saga cancelled successfully", saga_id=saga_id) async def _publish_saga_started_event( - self, instance: Saga, trigger_event: ExecutionRequestedEvent, + self, + instance: Saga, + trigger_event: ExecutionRequestedEvent, ) -> None: """Publish saga started event after the document is persisted.""" try: diff --git a/backend/app/services/saga/saga_service.py b/backend/app/services/saga/saga_service.py index 05b8d92e..5b58b301 100644 --- a/backend/app/services/saga/saga_service.py +++ b/backend/app/services/saga/saga_service.py @@ -58,9 +58,7 @@ async def check_execution_access(self, execution_id: str, user: User) -> bool: async def get_saga_with_access_check(self, saga_id: str, user: User) -> Saga: """Get saga with access control.""" - self.logger.debug( - "Getting saga for user", saga_id=saga_id, user_id=user.user_id, user_role=user.role - ) + self.logger.debug("Getting saga for user", saga_id=saga_id, user_id=user.user_id, user_role=user.role) saga = await self.saga_repo.get_saga(saga_id) if not saga: @@ -162,4 +160,3 @@ async def get_saga_statistics(self, user: User, include_all: bool = False) -> di saga_filter = SagaFilter(execution_ids=user_execution_ids) return await self.saga_repo.get_saga_statistics(saga_filter) - diff --git a/backend/app/services/saga/saga_step.py b/backend/app/services/saga/saga_step.py index c0f560aa..bb3c136c 100644 --- a/backend/app/services/saga/saga_step.py +++ b/backend/app/services/saga/saga_step.py @@ -28,7 +28,6 @@ def add_compensation(self, compensation: "CompensationStep") -> None: self.compensations.append(compensation) - class SagaStep(ABC, Generic[T]): """Base class for saga steps""" diff --git a/backend/app/services/saved_script_service.py b/backend/app/services/saved_script_service.py index da62c259..d3028417 100644 --- a/backend/app/services/saved_script_service.py +++ b/backend/app/services/saved_script_service.py @@ -35,6 +35,7 @@ async def create_saved_script( script_name=created_script.name, ) return created_script + # --8<-- [end:create_saved_script] async def get_saved_script(self, script_id: str, user_id: str) -> DomainSavedScript: diff --git a/backend/app/services/sse/sse_service.py b/backend/app/services/sse/sse_service.py index 14a98c93..423c205d 100644 --- a/backend/app/services/sse/sse_service.py +++ b/backend/app/services/sse/sse_service.py @@ -48,16 +48,18 @@ async def create_execution_stream( raise ForbiddenError("Access denied") return self._execution_pipeline(execution) - async def _execution_pipeline( - self, execution: DomainExecution - ) -> AsyncGenerator[dict[str, Any], None]: + async def _execution_pipeline(self, execution: DomainExecution) -> AsyncGenerator[dict[str, Any], None]: execution_id = execution.execution_id - yield {"data": _exec_adapter.dump_json(SSEExecutionEventData( - event_type=SSEControlEvent.STATUS, - execution_id=execution_id, - timestamp=execution.updated_at, - status=execution.status, - )).decode()} + yield { + "data": _exec_adapter.dump_json( + SSEExecutionEventData( + event_type=SSEControlEvent.STATUS, + execution_id=execution_id, + timestamp=execution.updated_at, + status=execution.status, + ) + ).decode() + } async for event in self._bus.listen_execution(execution_id): if event.event_type == EventType.RESULT_STORED: result = await self._execution_repository.get_execution_result(execution_id) @@ -80,9 +82,7 @@ async def create_replay_stream( """ return self._replay_pipeline(initial_status) - async def _replay_pipeline( - self, initial_status: DomainReplaySSEPayload - ) -> AsyncGenerator[dict[str, Any], None]: + async def _replay_pipeline(self, initial_status: DomainReplaySSEPayload) -> AsyncGenerator[dict[str, Any], None]: session_id = initial_status.session_id yield {"data": _replay_adapter.dump_json(initial_status).decode()} if initial_status.status.is_terminal: diff --git a/backend/app/services/user_settings_service.py b/backend/app/services/user_settings_service.py index 82abf731..3daad34c 100644 --- a/backend/app/services/user_settings_service.py +++ b/backend/app/services/user_settings_service.py @@ -79,6 +79,7 @@ async def get_user_settings_fresh(self, user_id: str) -> DomainUserSettings: self._add_to_cache(user_id, settings) return settings + # --8<-- [end:get_user_settings_fresh] # --8<-- [start:update_user_settings] @@ -92,12 +93,14 @@ async def update_user_settings( if not changes: return current - new_settings = self._build_settings({ - **dataclasses.asdict(current), - **changes, - "version": (current.version or 0) + 1, - "updated_at": datetime.now(timezone.utc), - }) + new_settings = self._build_settings( + { + **dataclasses.asdict(current), + **changes, + "version": (current.version or 0) + 1, + "updated_at": datetime.now(timezone.utc), + } + ) await self._publish_settings_event(user_id, changes, reason) @@ -105,6 +108,7 @@ async def update_user_settings( if (await self.repository.count_events_since_snapshot(user_id)) >= 10: await self.repository.create_snapshot(new_settings) return new_settings + # --8<-- [end:update_user_settings] async def _publish_settings_event(self, user_id: str, changes: dict[str, Any], reason: str | None) -> None: @@ -174,6 +178,7 @@ async def get_settings_history(self, user_id: str, limit: int = 50) -> list[Doma ) ) return history + # --8<-- [end:get_settings_history] async def restore_settings_to_point(self, user_id: str, timestamp: datetime) -> DomainUserSettings: @@ -207,6 +212,7 @@ def _apply_event(self, settings: DomainUserSettings, event: DomainUserSettingsCh """Apply a settings update event via dict merge.""" event_data = {k: v for k, v in dataclasses.asdict(event).items() if v is not None} return self._build_settings({**dataclasses.asdict(settings), **event_data, "updated_at": event.timestamp}) + # --8<-- [end:apply_event] @staticmethod @@ -227,4 +233,3 @@ def _add_to_cache(self, user_id: str, settings: DomainUserSettings) -> None: """Add settings to TTL+LRU cache.""" self._cache[user_id] = settings self.logger.debug(f"Cached settings for user {user_id}", cache_size=len(self._cache)) - diff --git a/backend/app/settings.py b/backend/app/settings.py index 96f6b930..f74472d1 100644 --- a/backend/app/settings.py +++ b/backend/app/settings.py @@ -180,16 +180,18 @@ def __init__( SECURE_COOKIES: bool = True # CORS allowed origins (overridable via config.toml) - CORS_ORIGINS: list[str] = Field(default_factory=lambda: [ - "https://localhost:5001", - "https://127.0.0.1:5001", - "https://[::1]:5001", - "https://localhost", - "https://127.0.0.1", - "https://localhost:443", - "https://127.0.0.1:443", - "https://[::1]:443", - ]) + CORS_ORIGINS: list[str] = Field( + default_factory=lambda: [ + "https://localhost:5001", + "https://127.0.0.1:5001", + "https://[::1]:5001", + "https://localhost", + "https://127.0.0.1", + "https://localhost:443", + "https://127.0.0.1:443", + "https://[::1]:443", + ] + ) # Logging configuration LOG_LEVEL: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO" diff --git a/backend/scripts/check_orphan_modules.py b/backend/scripts/check_orphan_modules.py index 5319c42c..2796c6d3 100644 --- a/backend/scripts/check_orphan_modules.py +++ b/backend/scripts/check_orphan_modules.py @@ -8,15 +8,18 @@ Usage: uv run python scripts/check_orphan_modules.py """ + import ast import sys from pathlib import Path import grimp -ENTRY_POINTS: frozenset[str] = frozenset({ - "app.main", -}) +ENTRY_POINTS: frozenset[str] = frozenset( + { + "app.main", + } +) def _is_empty_init(module: str) -> bool: diff --git a/frontend/nginx.conf.template b/frontend/nginx.conf.template index f77e5760..f4b445c4 100644 --- a/frontend/nginx.conf.template +++ b/frontend/nginx.conf.template @@ -20,7 +20,7 @@ server { # --8<-- [end:nonce_injection] # --8<-- [start:security_headers] - add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'nonce-$request_id'; style-src-elem 'self' 'nonce-$request_id'; style-src-attr 'unsafe-inline'; img-src 'self' data: blob:; font-src 'self' data:; object-src 'none'; base-uri 'self'; form-action 'self'; frame-ancestors 'none'; connect-src 'self';"; + add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'nonce-$request_id'; style-src-elem 'self' 'nonce-$request_id' https://fonts.googleapis.com; style-src-attr 'unsafe-inline'; img-src 'self' data: blob:; font-src 'self' data: https://fonts.gstatic.com; object-src 'none'; base-uri 'self'; form-action 'self'; frame-ancestors 'none'; connect-src 'self';"; add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always; add_header X-Frame-Options "DENY"; add_header X-Content-Type-Options "nosniff"; diff --git a/frontend/public/index.html b/frontend/public/index.html index 7859f44b..109cd071 100644 --- a/frontend/public/index.html +++ b/frontend/public/index.html @@ -10,6 +10,10 @@ + + + +