Conversation
📝 Walkthrough

Refactors correlation_id propagation across services, adds OpenTelemetry consumer-span tracing to event handlers, changes DLQ and producer header/key behavior to use per-message EventMetadata, and updates infrastructure: kubectl bump, certificate filename change (mkcert-ca.pem → ca.pem), and dependency version bumps.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Producer
    participant Kafka
    participant Consumer
    participant Handler
    participant DLQ
    Producer->>Kafka: produce(event, headers with event_type)
    Kafka->>Consumer: deliver(message, headers)
    Consumer->>Handler: handle_message(msg) with _with_trace (extract headers, create span)
    alt handler succeeds
        Handler->>Producer: produce(result_event) -- includes per-message EventMetadata.correlation_id
    else handler fails
        Handler->>DLQ: send_to_dlq(msg, headers including event_type, key=event_id)
        DLQ->>Kafka: publish(dlq-message)
    end
```
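For readers unfamiliar with the `_with_trace` step in the diagram, here is a minimal sketch of consumer-span creation from Kafka headers, assuming the standard `opentelemetry-api` package; the function and variable names are illustrative, not the repository's actual helpers.

```python
from typing import Any, Awaitable, Callable

from opentelemetry import trace
from opentelemetry.propagate import extract
from opentelemetry.trace import SpanKind

tracer = trace.get_tracer("event-consumer")


async def with_trace(
    headers: dict[str, str],
    handler: Callable[[dict[str, Any]], Awaitable[None]],
    payload: dict[str, Any],
) -> None:
    """Extract the upstream trace context from decoded Kafka headers and run the
    handler inside a CONSUMER span so the trace continues across the broker."""
    ctx = extract(carrier=headers)  # reads traceparent/tracestate if present
    with tracer.start_as_current_span("event.consume", context=ctx, kind=SpanKind.CONSUMER):
        await handler(payload)
```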
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@frontend/package.json`:
- Line 62: The dependency entry for "@hey-api/openapi-ts" in package.json
changed from an exact pin to a caret range which can cause generator output
diffs; revert it to an exact version to preserve deterministic codegen (e.g.,
change "@hey-api/openapi-ts": "^0.90.10" back to the exact version you intend to
lock, such as "0.90.4" or "0.90.10"), update package.json accordingly, and run a
lockfile update (npm/yarn/pnpm) to ensure the locked dependency matches the
exact version.
🧹 Nitpick comments (5)
frontend/package.json (1)
72-73: `@typescript-eslint/parser` and `@typescript-eslint/eslint-plugin` version ranges are now slightly misaligned.

`parser` is bumped to `^8.53.1` while `eslint-plugin` remains at `^8.53.0`. The typescript-eslint project recommends keeping these in lockstep. Consider bumping the plugin to `^8.53.1` as well.

Proposed fix

```diff
-    "@typescript-eslint/eslint-plugin": "^8.53.0",
+    "@typescript-eslint/eslint-plugin": "^8.53.1",
     "@typescript-eslint/parser": "^8.53.1",
```

backend/app/services/saga/execution_saga.py (1)
170-176: Compensation correctly retrieves correlation_id from context.

The fallback to `""` on Line 175 would override `EventMetadata`'s `default_factory` (which generates a UUID) with an empty string if the key is missing. In practice this path is guarded by the `pod_creation_triggered` check on Line 162, so it should always be populated. Still, falling back to a generated UUID via a `None` sentinel with explicit handling (or simply omitting the kwarg) would be slightly safer.

backend/app/services/saga/saga_orchestrator.py (1)
314-321: Empty-string fallback for `correlation_id` suppresses the UUID default.

Same pattern as in `execution_saga.py`: if `context_data` lacks a `"correlation_id"` key (e.g., sagas persisted before this change is deployed), the fallback `""` overrides `EventMetadata`'s `default_factory` that would otherwise generate a valid UUID. Consider using `None` as the sentinel and only passing `correlation_id` when it's truthy, or letting the default factory handle it:

♻️ Suggested approach

```diff
- correlation_id = saga_instance.context_data.get("correlation_id", "") if saga_instance.context_data else ""
+ correlation_id = (saga_instance.context_data.get("correlation_id") or "") if saga_instance.context_data else ""
```

Even better, omit the kwarg entirely when empty so the default factory kicks in:

```python
correlation_id = saga_instance.context_data.get("correlation_id") if saga_instance.context_data else None
metadata_kwargs = {
    "service_name": "saga-orchestrator",
    "service_version": "1.0.0",
    "user_id": cancelled_by or "system",
}
if correlation_id:
    metadata_kwargs["correlation_id"] = correlation_id
metadata = EventMetadata(**metadata_kwargs)
```
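To make the default-factory behavior concrete, here is a small, hypothetical stand-in for `EventMetadata` (field names assumed from the comments above, not copied from the codebase) showing why passing `""` suppresses the generated UUID while omitting the kwarg does not:

```python
from uuid import uuid4

from pydantic import BaseModel, Field


class EventMetadata(BaseModel):  # simplified stand-in for the real model
    service_name: str
    service_version: str
    correlation_id: str = Field(default_factory=lambda: str(uuid4()))


# Omitting the kwarg lets the default factory generate a UUID:
print(EventMetadata(service_name="svc", service_version="1.0.0").correlation_id)  # e.g. "3f2b..."

# Passing "" is a perfectly valid value as far as the model is concerned, so the
# default factory never runs and the event goes out with a blank correlation_id:
print(EventMetadata(service_name="svc", service_version="1.0.0", correlation_id="").correlation_id)  # ""
```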
backend/app/events/handlers.py (2)

39-41: `_extract_headers`: `None` header values can leak into the returned `dict[str, str]`.

If a Kafka header has a `None` value (permitted by the protocol), the `isinstance(v, bytes)` check passes `None` through as-is. That is fine for building the dict, but it violates the annotated return type, and downstream callers like `extract_trace_context` or `.get()`-based lookups expect `str` values. Consider guarding:

♻️ Defensive tweak

```diff
 def _extract_headers(msg: StreamMessage[Any]) -> dict[str, str]:
     """Decode raw Kafka headers into a string dict for OTel extraction."""
-    return {k: v.decode() if isinstance(v, bytes) else v for k, v in (msg.raw_message.headers or [])}
+    return {
+        k: v.decode() if isinstance(v, bytes) else (v or "")
+        for k, v in (msg.raw_message.headers or [])
+    }
```
364-398: DLQ subscriber: `headers["failed_at"]` (Line 372) will `KeyError` if the header is missing.

Other header fields use `.get()` with defaults, but `failed_at` is accessed with direct indexing. If a DLQ message arrives without this header (e.g., from a legacy producer or a bug), the subscriber will crash and the message won't be ACKed.

If this is intentionally required (i.e., a missing `failed_at` is a programming error that should surface loudly), this is acceptable. Otherwise, consider a default:

♻️ Optional: safe fallback

```diff
- failed_at=datetime.fromisoformat(headers["failed_at"]),
+ failed_at=datetime.fromisoformat(headers.get("failed_at", datetime.now(timezone.utc).isoformat())),
```
1 issue found across 16 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/saga/saga_orchestrator.py">
<violation number="1" location="backend/app/services/saga/saga_orchestrator.py:316">
P2: Avoid overriding EventMetadata’s default correlation_id with an empty string; it can publish events with a blank correlation_id and break trace correlation. Generate a fallback ID (or only pass when present).</violation>
</file>
2 issues found across 4 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/events/handlers.py">
<violation number="1" location="backend/app/events/handlers.py:371">
P2: Directly indexing DLQ headers can raise KeyError and crash the consumer when a DLQ message lacks one of these headers (e.g., older messages). Restore safe defaults or guard missing headers to keep DLQ processing resilient.</violation>
</file>
<file name="backend/app/services/saga/saga_orchestrator.py">
<violation number="1" location="backend/app/services/saga/saga_orchestrator.py:315">
P2: Guard against saga_instance.context_data being None before calling .get; otherwise saga cancellation can fail with AttributeError for sagas without context_data.</violation>
</file>
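A minimal sketch of the suggested guard, with the helper name and shape assumed for illustration rather than taken from `saga_orchestrator.py`:

```python
from uuid import uuid4


def resolve_correlation_id(context_data: dict[str, str] | None) -> str:
    """Return the saga's correlation_id, tolerating missing context_data
    and blank values by generating a fresh fallback ID."""
    context = context_data or {}  # context_data may be None for older sagas
    return context.get("correlation_id") or str(uuid4())


print(resolve_correlation_id(None))                           # generated UUID
print(resolve_correlation_id({"correlation_id": "abc-123"}))  # "abc-123"
```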
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/events/handlers.py (1)
369-380: ⚠️ Potential issue | 🟠 Major

DLQ header access uses `[]`, which raises `KeyError` if any expected header is missing.

Lines 371–376 access `headers["original_topic"]`, `headers["error"]`, etc. with direct dict subscript. If a message reaches the DLQ with a missing header (e.g., from a different producer version, or header corruption), this will raise `KeyError` and the message will be NACKed/retried endlessly (or poison the consumer). Since this is the DLQ (the place broken messages land), defensive access is especially important here.

Proposed fix: validate required headers up front

```diff
         headers = _extract_headers(msg)
         raw = msg.raw_message
-        assert not isinstance(raw, tuple)  # single-message consumer, never batch
+        if isinstance(raw, tuple):
+            raise TypeError("Expected a single Kafka message, got a batch")
+
+        required = ("original_topic", "error", "retry_count", "failed_at", "status", "producer_id")
+        missing = [h for h in required if h not in headers]
+        if missing:
+            logger.error("DLQ message missing required headers", extra={"missing": missing})
+            return  # or send to a "poison pill" topic
 
         dlq_msg = DLQMessage(
             event=body,
```
🤖 Fix all issues with AI agents
In `@backend/app/events/handlers.py`:
- Around line 39-41: The _extract_headers function currently can include None
values in its returned dict, violating the annotated dict[str, str] and causing
downstream failures (e.g., DLQ subscriber / DLQMessage parsing that expects
strings). Update _extract_headers to either (a) filter out headers with None
values so they are not included in the returned dict, or (b) coerce None to a
safe string (e.g., "") before returning; implement this by changing the dict
comprehension in _extract_headers to skip entries where v is None or to convert
None -> "" and ensure the return type is preserved as dict[str, str].
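For illustration, here are both options from the prompt sketched as standalone functions (the names and the bytes-only header values are assumptions, not the repository's actual `_extract_headers`):

```python
def extract_headers_skip_none(headers: list[tuple[str, bytes | None]]) -> dict[str, str]:
    """Option (a): drop headers whose value is None entirely."""
    return {k: v.decode() for k, v in headers if v is not None}


def extract_headers_coerce_none(headers: list[tuple[str, bytes | None]]) -> dict[str, str]:
    """Option (b): keep every header, mapping None to an empty string."""
    return {k: v.decode() if v is not None else "" for k, v in headers}


raw = [("traceparent", b"00-abc-def-01"), ("event_type", None)]
print(extract_headers_skip_none(raw))    # {'traceparent': '00-abc-def-01'}
print(extract_headers_coerce_none(raw))  # {'traceparent': '00-abc-def-01', 'event_type': ''}
```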
🧹 Nitpick comments (3)
backend/app/dlq/manager.py (1)
100-104: Extract repeated `EventMetadata` construction to reduce duplication.

The same `service_name="dlq-manager"`, `service_version="1.0.0"` triple appears in `handle_message`, `retry_message`, and `discard_message`. A small helper (or class-level constants) would keep this DRY and make version bumps a single-line change.

♻️ Proposed refactor

Add constants or a helper method on `DLQManager`:

```diff
 class DLQManager:
+    SERVICE_NAME = "dlq-manager"
+    SERVICE_VERSION = "1.0.0"
+
+    def _make_metadata(self, correlation_id: str) -> EventMetadata:
+        return EventMetadata(
+            service_name=self.SERVICE_NAME,
+            service_version=self.SERVICE_VERSION,
+            correlation_id=correlation_id,
+        )
```

Then replace each inline construction, e.g.:

```diff
-            metadata=EventMetadata(
-                service_name="dlq-manager",
-                service_version="1.0.0",
-                correlation_id=message.event.metadata.correlation_id,
-            ),
+            metadata=self._make_metadata(message.event.metadata.correlation_id),
```

Also applies to: 161-165, 191-195
backend/app/events/handlers.py (2)
367-367: `assert` in production code can be stripped by `python -O`.

If the process runs with the `-O` flag, this assert is silently removed. Consider replacing it with an explicit check and `raise`.

Proposed fix

```diff
-        assert not isinstance(raw, tuple)  # single-message consumer, never batch
+        if isinstance(raw, tuple):
+            raise TypeError("Expected a single Kafka message, got a batch")
```
395-401: Metrics are only recorded on successful processing; exceptions cause silent loss.

If `manager.handle_message(dlq_msg)` raises, lines 395–401 are skipped. This means failed DLQ processing won't be counted in `record_dlq_message_received` or `record_dlq_processing_duration`. Consider wrapping in `try/finally` so at least the duration and receipt are always recorded.

Proposed fix

```diff
+        try:
             with get_tracer().start_as_current_span(
                 name="dlq.consume",
                 context=ctx,
                 kind=SpanKind.CONSUMER,
                 attributes={
                     EventAttributes.KAFKA_TOPIC: str(manager.dlq_topic),
                     EventAttributes.EVENT_TYPE: body.event_type,
                     EventAttributes.EVENT_ID: body.event_id,
                 },
             ):
                 await manager.handle_message(dlq_msg)
+        finally:
+            manager.metrics.record_dlq_message_received(dlq_msg.original_topic, body.event_type)
+            manager.metrics.record_dlq_message_age(
+                (datetime.now(timezone.utc) - dlq_msg.failed_at).total_seconds()
+            )
+            manager.metrics.record_dlq_processing_duration(
+                asyncio.get_running_loop().time() - start, "process"
+            )
-
-        manager.metrics.record_dlq_message_received(dlq_msg.original_topic, body.event_type)
-        manager.metrics.record_dlq_message_age(
-            (datetime.now(timezone.utc) - dlq_msg.failed_at).total_seconds()
-        )
-        manager.metrics.record_dlq_processing_duration(
-            asyncio.get_running_loop().time() - start, "process"
-        )
```



Summary by cubic
Stops relying on Kafka headers for business data and passes correlation_id through EventMetadata end to end, with new consumer OTel spans for accurate trace propagation. Also updates the cert setup (CA filename change) and bumps a few dependencies.
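As a rough sketch of the producer side of that flow, under the same assumptions as the snippets above (names and payload shape are illustrative): correlation_id travels inside the event's metadata, while only tracing headers are attached to the Kafka message.

```python
import json
from uuid import uuid4

from opentelemetry.propagate import inject


def build_event(event_type: str, payload: dict, correlation_id: str | None = None) -> dict:
    """Assemble an event whose metadata carries correlation_id end to end."""
    return {
        "event_id": str(uuid4()),
        "event_type": event_type,
        "metadata": {
            "service_name": "example-service",  # assumed service identity
            "service_version": "1.0.0",
            "correlation_id": correlation_id or str(uuid4()),
        },
        "payload": payload,
    }


event = build_event("execution.requested", {"script": "print('hi')"})
headers: dict[str, str] = {"event_type": event["event_type"]}
inject(carrier=headers)  # adds traceparent/tracestate when a span is active
key = event["event_id"].encode()
value = json.dumps(event).encode()
# producer.send(topic, key=key, value=value, headers=list(headers.items()))  # hypothetical producer call
```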
Written for commit 01b1c5c. Summary will update on new commits.