d33b0e2 to fe2c84b
MSK event headers delivered by the Java Lambda runtime use a JSON object with numeric string keys and decimal string values rather than an array of integers. Records are similarly delivered as an object with numeric string keys instead of an array. Update deserialization and carrier extraction to support both formats, and update the fixture and tests to reflect the real-world payload shape. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace `n as u8` cast with `u8::try_from(n).ok()` to avoid truncation
- Collapse nested `if let` blocks into a single `if let ... && let ...`
- Replace redundant closure `|o| o.len()` with `serde_json::Map::len`
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
…ssary sort in `headers_to_string_map`
performance enhancement from codex Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
performance gain during normalization Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Pull request overview
Adds support for MSK Lambda events where Kafka headers are serialized into JSON in multiple runtime-dependent shapes, enabling trace context extraction from those headers.
Changes:
- Add fixture payload representing the MSK “object format” headers shape (as observed in Java runtime).
- Extend MSK trigger parsing to handle headers in both array and object encodings, and extract Datadog trace context from them.
- Add tests covering object-format MSK events and selecting a record containing trace context.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| bottlecap/tests/payloads/msk_event_with_headers.json | New test payload covering the Java runtime’s object-shaped record/headers encoding. |
| bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | Adds header decoding/extraction logic, chooses a record with trace context, and updates tests accordingly. |
/// Scans all records in the records map and returns the `(topic_key, record_value)` of the first
/// record whose headers contain a tracecontext key. Returns `None` if none found.
fn find_record_with_trace_context(
    records_map: &serde_json::Map<String, Value>,
) -> Option<(String, Value)> {
    for (key, group) in records_map {
        match group {
            Value::Array(arr) => {
                for record in arr {
                    if let Some(headers) = record.get("headers")
                        && headers_has_trace_context(headers)
                    {
                        return Some((key.clone(), record.clone()));
                    }
                }
            }
            Value::Object(obj) => {
                for record in obj.values() {
                    if let Some(headers) = record.get("headers")
                        && headers_has_trace_context(headers)
                    {
                        return Some((key.clone(), record.clone()));
                    }
                }
            }
"The concern is real but the optimization is premature here. The Lambda payload cap is 6MB total, so even in the worst case the clone is bounded. The locator adds a non-trivial amount of complexity (new enum, more complex extraction logic) for a one-time cost per invocation on what's already a cold path. The current code is significantly easier to read and maintain. " by Claude Code
https://datadoghq.atlassian.net/browse/SLES-2739
In Kafka's wire protocol (KIP-82), header values are always byte[]. Every Kafka client library enforces this:
All three tracers accept string trace context values from the propagation layer, convert to UTF-8 bytes at the carrier adapter boundary, and hand byte[] to the Kafka client.
This isn't a quirk of Java's getBytes() — it's the only way Kafka headers work.
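To make the byte encoding concrete, here is a minimal Rust sketch of the round trip a trace context value takes at the carrier boundary. The function names `to_header_bytes` and `from_header_bytes` are hypothetical and not taken from any tracer; the sketch only illustrates that a string such as "369" travels as the bytes 51, 54, 57.

```rust
/// Hypothetical helper: encode a propagated string value as the
/// UTF-8 bytes that a Kafka header value carries.
fn to_header_bytes(value: &str) -> Vec<u8> {
    value.as_bytes().to_vec()
}

/// Hypothetical helper: recover the string on the consumer side.
/// Returns `None` when the bytes are not valid UTF-8.
fn from_header_bytes(bytes: &[u8]) -> Option<String> {
    String::from_utf8(bytes.to_vec()).ok()
}
```

The decimal values that show up in the MSK event JSON are these bytes written out one by one.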
What MSK Lambda does
When MSK triggers a Lambda, AWS serializes the Kafka record to JSON. Since header values are byte[] on the wire, AWS encodes them as decimal byte values. However, the exact JSON shape depends on the Lambda runtime:
Array of integers:
"headers": [{"x-datadog-trace-id": [51, 54, 57, ...]}]
Object with numeric string keys and decimal strings (as delivered by the Java runtime):
"records": {
  "topic-0": {
    "0": {
      "headers": {
        "0": {"someOtherHeader": ["70", "114", ...]},
        "2": {"x-datadog-trace-id": {"0":"52","1":"54",...}},
        "4": {"x-datadog-sampling-priority": ["49"]}
      }
    }
  }
}
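The shapes above can all be reduced to the same byte sequence before decoding. The following is a rough, standard-library-only sketch of that normalization. `RawHeaderValue` and `decode_header_value` are hypothetical names standing in for whatever JSON value type the real code matches on, and it assumes that the numeric string keys of the object shape are byte indices.

```rust
use std::collections::BTreeMap;
use std::convert::TryFrom;

/// Hypothetical stand-in for the JSON shapes a header value can take.
enum RawHeaderValue {
    /// e.g. `[51, 54, 57]`
    Ints(Vec<i64>),
    /// e.g. `["51", "54", "57"]`
    Strings(Vec<String>),
    /// e.g. `{"0": "51", "1": "54", "2": "57"}`
    Indexed(BTreeMap<String, String>),
}

/// Decode any of the shapes into a UTF-8 string.
/// Returns `None` if a value does not fit in a byte or the bytes are not UTF-8.
fn decode_header_value(raw: &RawHeaderValue) -> Option<String> {
    let bytes: Option<Vec<u8>> = match raw {
        // `u8::try_from` rejects out-of-range numbers instead of truncating them.
        RawHeaderValue::Ints(ns) => ns.iter().map(|&n| u8::try_from(n).ok()).collect(),
        RawHeaderValue::Strings(ss) => ss.iter().map(|s| s.parse::<u8>().ok()).collect(),
        RawHeaderValue::Indexed(map) => {
            // Assumption: keys are byte indices. Sort numerically, because
            // lexical order would place "10" before "2".
            let mut entries: Vec<(usize, u8)> = map
                .iter()
                .map(|(k, v)| Some((k.parse::<usize>().ok()?, v.parse::<u8>().ok()?)))
                .collect::<Option<_>>()?;
            entries.sort_by_key(|&(i, _)| i);
            Some(entries.into_iter().map(|(_, b)| b).collect())
        }
    };
    String::from_utf8(bytes?).ok()
}
```

Treating an out-of-range number as a decode failure, rather than casting it, matches the intent of the `u8::try_from(n).ok()` change listed in the commits.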
What's the difference between msk_event.json and the newly added msk_event_with_headers.json here?
…as Kafka headers. It includes non-Datadog headers at lower indices to verify that the carrier extraction correctly finds Datadog headers regardless of their position. (Source: I tested this against a real-world example; the evidence of testing is below.)
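One way to picture position-independent extraction is to fold every decodable header into a map and look keys up by name. This is a simplified sketch, not the code from the PR: `headers_to_carrier` is a hypothetical name, and the headers are assumed to be already normalized into `(name, bytes)` pairs.

```rust
use std::collections::HashMap;

/// Hypothetical helper: build a string carrier from `(name, bytes)` pairs.
/// Headers whose bytes are not valid UTF-8 are skipped.
fn headers_to_carrier(headers: &[(String, Vec<u8>)]) -> HashMap<String, String> {
    headers
        .iter()
        .filter_map(|(name, bytes)| {
            let value = String::from_utf8(bytes.clone()).ok()?;
            Some((name.clone(), value))
        })
        .collect()
}
```

Because lookup goes through the map, a Datadog header is found whether it sits first or after unrelated headers.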