Skip to content

feat(triggers): handle MSK events#1066

Open
joeyzhao2018 wants to merge 25 commits intomainfrom
joey/handle_msk
Open

feat(triggers): handle MSK events#1066
joeyzhao2018 wants to merge 25 commits intomainfrom
joey/handle_msk

Conversation

@joeyzhao2018
Copy link
Contributor

@joeyzhao2018 joeyzhao2018 commented Mar 5, 2026

https://datadoghq.atlassian.net/browse/SLES-2739

In Kafka's wire protocol (KIP-82), header values are always byte[]. Every Kafka client library enforces this:

Tracer Injection code Mechanism
dd-trace-java headers.add(key, value.getBytes(UTF_8)) String.getBytes() → byte[]
dd-trace-go Value: []byte(val) Go type conversion → []byte
dd-trace-dotnet _headers.Add(name, Encoding.UTF8.GetBytes(value)) UTF8.GetBytes() → byte[]

All three tracers accept string trace context values from the propagation layer, convert to UTF-8 bytes at the carrier adapter boundary, and hand byte[] to the Kafka client.
This isn't a quirk of Java's getBytes() — it's the only way Kafka headers work.

What MSK Lambda does

When MSK triggers a Lambda, AWS serializes the Kafka record to JSON. Since header values are byte[] on the wire, AWS encodes them as decimal byte values. However, the exact JSON
shape depends on the Lambda runtime:

  • Array format (observed in the existing msk_event.json testing payloads, i didn't change the support for this to be safe): byte values as a JSON array of integers
    "headers": [{"x-datadog-trace-id": [51, 54, 57, ...]}]
  • Object format (observed with the Java Lambda runtime): both the records list and the per-header byte values are JSON objects with numeric string keys, and byte values are
    decimal strings
    "records": {
    "topic-0": {
    "0": {
    "headers": {
    "0": {"someOtherHeader": ["70", "114", ...]},
    "2": {"x-datadog-trace-id": {"0":"52","1":"54",...}},
    "4": {"x-datadog-sampling-priority": ["49"]}
    }
    }
    }
    }
  • Note that Datadog headers can appear at any index — non-instrumentation headers may precede them.

What's the difference between the msk_event.json and the newly added msk_event_with_headers.json here?

  • msk_event.json represents a standard MSK trigger where the producer didn't attach any Kafka headers — i.e. no Datadog tracer was running on the producer side (or it's a non-instrumented producer like a raw Kafka client, a Kinesis Firehose delivery stream, or a schema-registry message). In those cases Lambda still delivers the event but with "headers": []. It's also the format you get when testing MSK triggers manually in the AWS console, which doesn't inject headers. ( source: Claude Code)
  • msk_event_with_headers.json reflects the real-world object format produced by the Java Lambda runtime, with a producer instrumented with a Datadog tracer injecting trace context
    as Kafka headers. It includes non-Datadog headers at lower indices to verify that the carrier extraction correctly finds Datadog headers regardless of their position. (source: I did a real world example and below is the evidence of testing)
Screenshot 2026-03-12 at 11 14 33 PM

joeyzhao2018 and others added 10 commits March 9, 2026 15:32
MSK event headers delivered by the Java Lambda runtime use a JSON object
with numeric string keys and decimal string values rather than an array
of integers. Records are similarly delivered as an object with numeric
string keys instead of an array.

Update deserialization and carrier extraction to support both formats,
and update the fixture and tests to reflect the real-world payload shape.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace `n as u8` cast with `u8::try_from(n).ok()` to avoid truncation
- Collapse nested `if let` blocks into a single `if let ... && let ...`
- Replace redundant closure `|o| o.len()` with `serde_json::Map::len`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@joeyzhao2018 joeyzhao2018 marked this pull request as ready for review March 13, 2026 15:45
@joeyzhao2018 joeyzhao2018 requested a review from a team as a code owner March 13, 2026 15:45
@joeyzhao2018 joeyzhao2018 requested a review from duncanista March 13, 2026 15:45
@duncanista duncanista changed the title feature: handle MSK events feat(triggers): handle MSK events Mar 13, 2026
@duncanista duncanista requested a review from Copilot March 16, 2026 16:07

This comment was marked as resolved.

joeyzhao2018 and others added 4 commits March 16, 2026 12:26

This comment was marked as duplicate.

This comment was marked as resolved.

This comment was marked as resolved.

joeyzhao2018 and others added 4 commits March 18, 2026 11:34
performance enhancement from codex

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
performance gain during normalization

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds support for MSK Lambda events where Kafka headers are serialized into JSON in multiple runtime-dependent shapes, enabling trace context extraction from those headers.

Changes:

  • Add fixture payload representing the MSK “object format” headers shape (as observed in Java runtime).
  • Extend MSK trigger parsing to handle headers in both array and object encodings, and extract Datadog trace context from them.
  • Add tests covering object-format MSK events and selecting a record containing trace context.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.

File Description
bottlecap/tests/payloads/msk_event_with_headers.json New test payload covering the Java runtime’s object-shaped record/headers encoding.
bottlecap/src/lifecycle/invocation/triggers/msk_event.rs Adds header decoding/extraction logic, chooses a record with trace context, and updates tests accordingly.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines +86 to +110
/// Scans all records in the records map and returns the `(topic_key, record_value)` of the first
/// record whose headers contain a tracecontext key. Returns `None` if none found.
fn find_record_with_trace_context(
records_map: &serde_json::Map<String, Value>,
) -> Option<(String, Value)> {
for (key, group) in records_map {
match group {
Value::Array(arr) => {
for record in arr {
if let Some(headers) = record.get("headers")
&& headers_has_trace_context(headers)
{
return Some((key.clone(), record.clone()));
}
}
}
Value::Object(obj) => {
for record in obj.values() {
if let Some(headers) = record.get("headers")
&& headers_has_trace_context(headers)
{
return Some((key.clone(), record.clone()));
}
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"The concern is real but the optimization is premature here. The Lambda payload cap is 6MB total, so even in the worst case the clone is bounded. The locator adds a non-trivial amount of complexity (new enum, more complex extraction logic) for a one-time cost per invocation on what's already a cold path. The current code is significantly easier to read and maintain. " by Claude Code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants