Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions backend/app/dlq/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def __init__(
]

self._dlq_events_topic = f"{settings.KAFKA_TOPIC_PREFIX}{KafkaTopic.DLQ_EVENTS}"
self._event_metadata = EventMetadata(service_name="dlq-manager", service_version="1.0.0")

def _filter_test_events(self, message: DLQMessage) -> bool:
event_id = message.event.event_id or ""
Expand Down Expand Up @@ -99,7 +98,11 @@ async def handle_message(self, message: DLQMessage) -> None:
retry_count=message.retry_count,
producer_id=message.producer_id,
failed_at=message.failed_at,
metadata=self._event_metadata,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
),
),
topic=self._dlq_events_topic,
)
Expand All @@ -126,9 +129,6 @@ async def retry_message(self, message: DLQMessage) -> None:
"""
hdrs: dict[str, str] = {
"event_type": message.event.event_type,
"dlq_retry_count": str(message.retry_count + 1),
"dlq_original_error": message.error,
"dlq_retry_timestamp": datetime.now(timezone.utc).isoformat(),
}
hdrs = inject_trace_context(hdrs)

Expand Down Expand Up @@ -159,7 +159,11 @@ async def retry_message(self, message: DLQMessage) -> None:
original_event_type=str(message.event.event_type),
retry_count=new_retry_count,
retry_topic=message.original_topic,
metadata=self._event_metadata,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
),
),
topic=self._dlq_events_topic,
)
Expand All @@ -185,7 +189,11 @@ async def discard_message(self, message: DLQMessage, reason: str) -> None:
original_event_type=str(message.event.event_type),
reason=reason,
retry_count=message.retry_count,
metadata=self._event_metadata,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
),
),
topic=self._dlq_events_topic,
)
Expand Down
2 changes: 0 additions & 2 deletions backend/app/events/core/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ async def produce(self, event_to_produce: DomainEvent, key: str) -> None:
try:
headers = inject_trace_context({
"event_type": event_to_produce.event_type,
"correlation_id": event_to_produce.metadata.correlation_id or "",
"service": event_to_produce.metadata.service_name,
})

# FastStream handles Pydantic → JSON serialization natively
Expand Down
101 changes: 77 additions & 24 deletions backend/app/events/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,32 @@
from app.settings import Settings


def _extract_headers(msg: StreamMessage[Any]) -> dict[str, str]:
    """Decode raw Kafka headers into a plain string dict for OTel context extraction.

    Raw headers arrive as an iterable of (key, value) pairs where values may be
    bytes; byte values are UTF-8 decoded, str values pass through unchanged.
    A missing/None headers attribute yields an empty dict. On duplicate keys the
    last occurrence wins, matching dict-comprehension semantics.
    """
    decoded: dict[str, str] = {}
    for key, value in (msg.raw_message.headers or []):
        decoded[key] = value.decode() if isinstance(value, bytes) else value
    return decoded
Comment thread
HardMax71 marked this conversation as resolved.


async def _with_trace(
    msg: StreamMessage[Any],
    span_name: str,
    body: DomainEvent,
    handler: Callable[[], Awaitable[None]],
) -> None:
    """Run ``handler`` inside an OTel CONSUMER span linked to the producer's trace.

    The parent context is recovered from the message's Kafka headers, so the
    consumer span is stitched onto the producer's trace rather than starting a
    fresh one. The event's type and id are attached as span attributes for
    searchability. Exceptions from ``handler`` propagate; the span is still
    ended by the context manager.
    """
    # Recover the upstream trace context carried in the Kafka headers.
    parent_ctx = extract_trace_context(_extract_headers(msg))
    span_attributes = {
        EventAttributes.EVENT_TYPE: body.event_type,
        EventAttributes.EVENT_ID: body.event_id,
    }
    tracer = get_tracer()
    with tracer.start_as_current_span(
        name=span_name,
        context=parent_ctx,
        kind=SpanKind.CONSUMER,
        attributes=span_attributes,
    ):
        await handler()


async def with_idempotency(
event: DomainEvent,
handler: Callable[..., Awaitable[None]],
Expand Down Expand Up @@ -78,46 +104,50 @@ def register_coordinator_subscriber(broker: KafkaBroker, settings: Settings) ->
@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_REQUESTED)
async def on_execution_requested(
body: ExecutionRequestedEvent,
msg: StreamMessage[Any],
coordinator: FromDishka[ExecutionCoordinator],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(
await _with_trace(msg, "coordinator.execution_requested", body, lambda: with_idempotency(
body, coordinator.handle_execution_requested, idem, KeyStrategy.EVENT_BASED, 7200, logger,
)
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_COMPLETED)
async def on_execution_completed(
body: ExecutionCompletedEvent,
msg: StreamMessage[Any],
coordinator: FromDishka[ExecutionCoordinator],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(
await _with_trace(msg, "coordinator.execution_completed", body, lambda: with_idempotency(
body, coordinator.handle_execution_completed, idem, KeyStrategy.EVENT_BASED, 7200, logger,
)
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_FAILED)
async def on_execution_failed(
body: ExecutionFailedEvent,
msg: StreamMessage[Any],
coordinator: FromDishka[ExecutionCoordinator],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(
await _with_trace(msg, "coordinator.execution_failed", body, lambda: with_idempotency(
body, coordinator.handle_execution_failed, idem, KeyStrategy.EVENT_BASED, 7200, logger,
)
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_CANCELLED)
async def on_execution_cancelled(
body: ExecutionCancelledEvent,
msg: StreamMessage[Any],
coordinator: FromDishka[ExecutionCoordinator],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(
await _with_trace(msg, "coordinator.execution_cancelled", body, lambda: with_idempotency(
body, coordinator.handle_execution_cancelled, idem, KeyStrategy.EVENT_BASED, 7200, logger,
)
))

@sub
async def on_unhandled(body: DomainEvent) -> None:
Expand All @@ -134,20 +164,26 @@ def register_k8s_worker_subscriber(broker: KafkaBroker, settings: Settings) -> N
@sub(filter=lambda msg: msg.headers["event_type"] == EventType.CREATE_POD_COMMAND)
async def on_create_pod(
body: CreatePodCommandEvent,
msg: StreamMessage[Any],
worker: FromDishka[KubernetesWorker],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(body, worker.handle_create_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger)
await _with_trace(msg, "k8s_worker.create_pod", body, lambda: with_idempotency(
body, worker.handle_create_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger,
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.DELETE_POD_COMMAND)
async def on_delete_pod(
body: DeletePodCommandEvent,
msg: StreamMessage[Any],
worker: FromDishka[KubernetesWorker],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(body, worker.handle_delete_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger)
await _with_trace(msg, "k8s_worker.delete_pod", body, lambda: with_idempotency(
body, worker.handle_delete_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger,
))

@sub
async def on_unhandled(body: DomainEvent) -> None:
Expand All @@ -166,29 +202,38 @@ def register_result_processor_subscriber(broker: KafkaBroker, settings: Settings
@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_COMPLETED)
async def on_execution_completed(
body: ExecutionCompletedEvent,
msg: StreamMessage[Any],
processor: FromDishka[ResultProcessor],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(body, processor.handle_execution_completed, idem, KeyStrategy.CONTENT_HASH, 7200, logger)
await _with_trace(msg, "result_processor.execution_completed", body, lambda: with_idempotency(
body, processor.handle_execution_completed, idem, KeyStrategy.CONTENT_HASH, 7200, logger,
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_FAILED)
async def on_execution_failed(
body: ExecutionFailedEvent,
msg: StreamMessage[Any],
processor: FromDishka[ResultProcessor],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(body, processor.handle_execution_failed, idem, KeyStrategy.CONTENT_HASH, 7200, logger)
await _with_trace(msg, "result_processor.execution_failed", body, lambda: with_idempotency(
body, processor.handle_execution_failed, idem, KeyStrategy.CONTENT_HASH, 7200, logger,
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_TIMEOUT)
async def on_execution_timeout(
body: ExecutionTimeoutEvent,
msg: StreamMessage[Any],
processor: FromDishka[ResultProcessor],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(body, processor.handle_execution_timeout, idem, KeyStrategy.CONTENT_HASH, 7200, logger)
await _with_trace(msg, "result_processor.execution_timeout", body, lambda: with_idempotency(
body, processor.handle_execution_timeout, idem, KeyStrategy.CONTENT_HASH, 7200, logger,
))

@sub
async def on_unhandled(body: DomainEvent) -> None:
Expand All @@ -205,30 +250,34 @@ def register_saga_subscriber(broker: KafkaBroker, settings: Settings) -> None:
@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_REQUESTED)
async def on_execution_requested(
body: ExecutionRequestedEvent,
msg: StreamMessage[Any],
orchestrator: FromDishka[SagaOrchestrator],
) -> None:
await orchestrator.handle_execution_requested(body)
await _with_trace(msg, "saga.execution_requested", body, lambda: orchestrator.handle_execution_requested(body))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_COMPLETED)
async def on_execution_completed(
body: ExecutionCompletedEvent,
msg: StreamMessage[Any],
orchestrator: FromDishka[SagaOrchestrator],
) -> None:
await orchestrator.handle_execution_completed(body)
await _with_trace(msg, "saga.execution_completed", body, lambda: orchestrator.handle_execution_completed(body))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_FAILED)
async def on_execution_failed(
body: ExecutionFailedEvent,
msg: StreamMessage[Any],
orchestrator: FromDishka[SagaOrchestrator],
) -> None:
await orchestrator.handle_execution_failed(body)
await _with_trace(msg, "saga.execution_failed", body, lambda: orchestrator.handle_execution_failed(body))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_TIMEOUT)
async def on_execution_timeout(
body: ExecutionTimeoutEvent,
msg: StreamMessage[Any],
orchestrator: FromDishka[SagaOrchestrator],
) -> None:
await orchestrator.handle_execution_timeout(body)
await _with_trace(msg, "saga.execution_timeout", body, lambda: orchestrator.handle_execution_timeout(body))

@sub
async def on_unhandled(body: DomainEvent) -> None:
Expand Down Expand Up @@ -264,23 +313,28 @@ def register_notification_subscriber(broker: KafkaBroker, settings: Settings) ->
@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_COMPLETED)
async def on_execution_completed(
body: ExecutionCompletedEvent,
msg: StreamMessage[Any],
service: FromDishka[NotificationService],
) -> None:
await service.handle_execution_completed(body)
await _with_trace(
msg, "notification.execution_completed", body, lambda: service.handle_execution_completed(body),
)

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_FAILED)
async def on_execution_failed(
body: ExecutionFailedEvent,
msg: StreamMessage[Any],
service: FromDishka[NotificationService],
) -> None:
await service.handle_execution_failed(body)
await _with_trace(msg, "notification.execution_failed", body, lambda: service.handle_execution_failed(body))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_TIMEOUT)
async def on_execution_timeout(
body: ExecutionTimeoutEvent,
msg: StreamMessage[Any],
service: FromDishka[NotificationService],
) -> None:
await service.handle_execution_timeout(body)
await _with_trace(msg, "notification.execution_timeout", body, lambda: service.handle_execution_timeout(body))

@sub
async def on_unhandled(body: DomainEvent) -> None:
Expand Down Expand Up @@ -308,8 +362,7 @@ async def on_dlq_message(
logger: FromDishka[logging.Logger],
) -> None:
start = asyncio.get_running_loop().time()
raw = msg.raw_message
headers = {k: v.decode() for k, v in (raw.headers or [])}
headers = _extract_headers(msg)

dlq_msg = DLQMessage(
event=body,
Expand All @@ -319,8 +372,8 @@ async def on_dlq_message(
failed_at=datetime.fromisoformat(headers["failed_at"]),
status=DLQMessageStatus(headers.get("status", "pending")),
producer_id=headers.get("producer_id", "unknown"),
dlq_offset=raw.offset,
dlq_partition=raw.partition,
dlq_offset=msg.raw_message.offset,
dlq_partition=msg.raw_message.partition,
headers=headers,
)

Expand Down
3 changes: 0 additions & 3 deletions backend/app/services/pod_monitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,6 @@ async def _process_pod_event(self, event: PodEvent) -> None:
async def _publish_event(self, event: DomainEvent, pod: k8s_client.V1Pod) -> None:
"""Publish event to Kafka and store in events collection."""
try:
if pod.metadata and pod.metadata.labels:
event.metadata.correlation_id = pod.metadata.labels.get("execution-id") or ""

execution_id = getattr(event, "execution_id", None) or event.aggregate_id
key = str(execution_id or (pod.metadata.name if pod.metadata else "unknown"))

Expand Down
18 changes: 10 additions & 8 deletions backend/app/services/result_processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ async def handle_execution_completed(self, event: DomainEvent) -> None:

try:
await self._execution_repo.write_terminal_result(result)
await self._publish_result_stored(result)
await self._publish_result_stored(result, event.metadata.correlation_id)
except Exception as e:
self.logger.error(f"Failed to handle ExecutionCompletedEvent: {e}", exc_info=True)
await self._publish_result_failed(event.execution_id, str(e))
await self._publish_result_failed(event.execution_id, str(e), event.metadata.correlation_id)

async def handle_execution_failed(self, event: DomainEvent) -> None:
"""Handle execution failed event."""
Expand All @@ -90,10 +90,10 @@ async def handle_execution_failed(self, event: DomainEvent) -> None:
result = ExecutionResultDomain(**event.model_dump(), status=ExecutionStatus.FAILED)
try:
await self._execution_repo.write_terminal_result(result)
await self._publish_result_stored(result)
await self._publish_result_stored(result, event.metadata.correlation_id)
except Exception as e:
self.logger.error(f"Failed to handle ExecutionFailedEvent: {e}", exc_info=True)
await self._publish_result_failed(event.execution_id, str(e))
await self._publish_result_failed(event.execution_id, str(e), event.metadata.correlation_id)

async def handle_execution_timeout(self, event: DomainEvent) -> None:
"""Handle execution timeout event."""
Expand All @@ -115,12 +115,12 @@ async def handle_execution_timeout(self, event: DomainEvent) -> None:
)
try:
await self._execution_repo.write_terminal_result(result)
await self._publish_result_stored(result)
await self._publish_result_stored(result, event.metadata.correlation_id)
except Exception as e:
self.logger.error(f"Failed to handle ExecutionTimeoutEvent: {e}", exc_info=True)
await self._publish_result_failed(event.execution_id, str(e))
await self._publish_result_failed(event.execution_id, str(e), event.metadata.correlation_id)

async def _publish_result_stored(self, result: ExecutionResultDomain) -> None:
async def _publish_result_stored(self, result: ExecutionResultDomain, correlation_id: str) -> None:
"""Publish result stored event."""
size_bytes = len(result.stdout) + len(result.stderr)
event = ResultStoredEvent(
Expand All @@ -131,18 +131,20 @@ async def _publish_result_stored(self, result: ExecutionResultDomain) -> None:
metadata=EventMetadata(
service_name=GroupId.RESULT_PROCESSOR,
service_version="1.0.0",
correlation_id=correlation_id,
),
)
await self._producer.produce(event_to_produce=event, key=result.execution_id)

async def _publish_result_failed(self, execution_id: str, error_message: str) -> None:
async def _publish_result_failed(self, execution_id: str, error_message: str, correlation_id: str) -> None:
"""Publish result processing failed event."""
event = ResultFailedEvent(
execution_id=execution_id,
error=error_message,
metadata=EventMetadata(
service_name=GroupId.RESULT_PROCESSOR,
service_version="1.0.0",
correlation_id=correlation_id,
),
)
await self._producer.produce(event_to_produce=event, key=execution_id)
Loading
Loading