From 8cae3cf256103dbc92cc00082b31f067a16eff7c Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 13 Mar 2026 10:02:02 -0700 Subject: [PATCH 1/6] Add failing tests for missing APM header injection into SQS _datadog attribute MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Documents the missing send-side feature: Java never writes x-datadog-trace-id or x-datadog-parent-id into _datadog, breaking Java→other-tracer propagation. Co-Authored-By: Claude Sonnet 4.6 --- .../src/test/groovy/SqsClientTest.groovy | 65 ++++++++++++++++++ .../src/test/groovy/SqsClientTest.groovy | 66 +++++++++++++++++++ 2 files changed, 131 insertions(+) diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy index 43bff3872e3..a9b8709caae 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy @@ -264,6 +264,71 @@ abstract class SqsClientTest extends VersionedNamingTestBase { client.shutdown() } + def "APM trace context is injected into _datadog message attribute on send"() { + setup: + def client = AmazonSQSClientBuilder.standard() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() + def queueUrl = client.createQueue('somequeue').queueUrl + TEST_WRITER.clear() + + when: + TraceUtils.runUnderTrace('parent', { + client.sendMessage(queueUrl, 'sometext') + }) + def messages = client.receiveMessage(queueUrl).messages + + then: + def sendSpan + assertTraces(1) { + trace(2) { + basicSpan(it, 'parent') + span { + serviceName expectedService("SQS", "SendMessage") + operationName expectedOperation("SQS", "SendMessage") + resourceName "SQS.SendMessage" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + childOf(span(0)) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_URL" "http://localhost:${address.port}/" + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" address.port + "$Tags.PEER_HOSTNAME" "localhost" + "aws.service" "AmazonSQS" + "aws_service" "sqs" + "aws.endpoint" "http://localhost:${address.port}" + "aws.operation" "SendMessageRequest" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + if (isDataStreamsEnabled()) { + "$DDTags.PATHWAY_HASH" { String } + } + serviceNameSource("java-aws-sdk") + defaultTags() + } + } + sendSpan = span(1) + } + } + + def ddAttr = messages[0].messageAttributes['_datadog'] + ddAttr != null + ddAttr.dataType == 'String' + ddAttr.stringValue.contains('"x-datadog-trace-id"') + ddAttr.stringValue.contains(sendSpan.traceId.toString()) + ddAttr.stringValue.contains('"x-datadog-parent-id"') + ddAttr.stringValue.contains(Long.toUnsignedString(sendSpan.spanId)) + + cleanup: + client.shutdown() + } + @IgnoreIf({ instance.isDataStreamsEnabled() }) def "trace details propagated via embedded SQS message attribute (string)"() { setup: diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy index 067397524e7..638b7fd47f6 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy @@ -214,6 +214,72 @@ abstract class SqsClientTest extends VersionedNamingTestBase { cleanup: client.close() } + def "APM trace context is injected into _datadog message attribute on send"() { + setup: + def client = SqsClient.builder() + .region(Region.EU_CENTRAL_1) + .endpointOverride(endpoint) + .credentialsProvider(credentialsProvider) + .build() + def queueUrl = client.createQueue(CreateQueueRequest.builder().queueName('somequeue').build()).queueUrl() + TEST_WRITER.clear() + + when: + TraceUtils.runUnderTrace('parent', { + client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody('sometext').build()) + }) + def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages() + + then: + def sendSpan + assertTraces(1) { + trace(2) { + basicSpan(it, 'parent') + span { + serviceName expectedService("Sqs", "SendMessage") + operationName expectedOperation("Sqs", "SendMessage") + resourceName "Sqs.SendMessage" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + childOf(span(0)) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" address.port + "$Tags.PEER_HOSTNAME" "localhost" + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "SendMessage" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + if (isDataStreamsEnabled()) { + "$DDTags.PATHWAY_HASH" { String } + } + urlTags("http://localhost:${address.port}/", ExpectedQueryParams.getExpectedQueryParams("SendMessage")) + serviceNameSource("java-aws-sdk") + defaultTags() + } + } + sendSpan = span(1) + } + } + + def ddAttr = messages[0].messageAttributes()['_datadog'] + ddAttr != null + ddAttr.dataType() == 'String' + ddAttr.stringValue().contains('"x-datadog-trace-id"') + ddAttr.stringValue().contains(sendSpan.traceId.toString()) + ddAttr.stringValue().contains('"x-datadog-parent-id"') + ddAttr.stringValue().contains(Long.toUnsignedString(sendSpan.spanId)) + + cleanup: + client.close() + } + @IgnoreIf({instance.isDataStreamsEnabled()}) def "trace details propagated via embedded SQS message attribute (string)"() { setup: From b73646f93b2c613cd966cfe61a7c50314f2c7a78 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 13 Mar 2026 12:25:33 -0700 Subject: [PATCH 2/6] Inject APM trace headers into SQS _datadog message attribute on send Previously, the Java SQS instrumentation never wrote APM trace context (x-datadog-trace-id, x-datadog-parent-id) into the _datadog message attribute, breaking distributed tracing from Java to other language tracers (Python, Node, etc.) that rely on that attribute. Changes: - SqsInterceptor (v1 + v2): call defaultPropagator().inject() before DSM inject so APM headers are written into _datadog on SendMessage and SendMessageBatch - MessageAttributeInjector (v1 + v2): accumulate multiple key-value pairs into the _datadog JSON blob instead of short-circuiting on the second key - SqsClientInstrumentation (v1 + v2): register SqsInterceptor when isSqsInjectDatadogAttributeEnabled() is true, not only when DSM is enabled, so APM injection works even without Data Streams - Tests: add APM trace context injection test for both v1 and v2, covering DSM-on and DSM-off variants Co-Authored-By: Claude Sonnet 4.6 --- .../aws/v1/sqs/MessageAttributeInjector.java | 25 ++++++++++--- .../aws/v1/sqs/SqsClientInstrumentation.java | 3 +- .../aws/v1/sqs/SqsInterceptor.java | 3 ++ .../src/test/groovy/SqsClientTest.groovy | 33 +++++++++++++++-- .../aws/v2/sqs/MessageAttributeInjector.java | 27 ++++++++++---- .../aws/v2/sqs/SqsClientInstrumentation.java | 3 +- .../aws/v2/sqs/SqsInterceptor.java | 3 ++ .../src/test/groovy/SqsClientTest.groovy | 35 +++++++++++++++++-- 8 files changed, 115 insertions(+), 17 deletions(-) diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java index f3302307aca..5df88dc95fd 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java @@ -14,13 +14,28 @@ public class MessageAttributeInjector implements CarrierSetter carrier, final String key, final String value) { - if (carrier.size() < 10 - && !carrier.containsKey(DATADOG_KEY) - && Config.get().isSqsInjectDatadogAttributeEnabled()) { - String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value); + if (!Config.get().isSqsInjectDatadogAttributeEnabled()) { + return; + } + if (!carrier.containsKey(DATADOG_KEY)) { + if (carrier.size() >= 10) { + return; + } carrier.put( DATADOG_KEY, - new MessageAttributeValue().withDataType("String").withStringValue(jsonPathway)); + new MessageAttributeValue() + .withDataType("String") + .withStringValue(String.format("{\"%s\": \"%s\"}", key, value))); + } else { + String existing = carrier.get(DATADOG_KEY).getStringValue(); + int closingBrace = existing.lastIndexOf('}'); + if (closingBrace >= 0) { + String updated = + existing.substring(0, closingBrace) + String.format(", \"%s\": \"%s\"}", key, value); + carrier.put( + DATADOG_KEY, + new MessageAttributeValue().withDataType("String").withStringValue(updated)); + } } } } diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java index 2e7dee5f360..8c47ff8f660 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java @@ -52,7 +52,8 @@ public Map contextStore() { public static class HandlerChainAdvice { @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void addHandler(@Advice.Return final List handlers) { - if (Config.get().isDataStreamsEnabled()) { + if (Config.get().isDataStreamsEnabled() + || Config.get().isSqsInjectDatadogAttributeEnabled()) { for (RequestHandler2 interceptor : handlers) { if (interceptor instanceof SqsInterceptor) { return; // list already has our interceptor, return to builder diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java index 3534e1ed9e2..d23f7d23dcd 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.aws.v1.sqs; +import static datadog.context.propagation.Propagators.defaultPropagator; import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY; @@ -50,6 +51,7 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request // a kind of ImmutableMap Map messageAttributes = new HashMap<>(smRequest.getMessageAttributes()); + defaultPropagator().inject(context, messageAttributes, SETTER); dsmPropagator.inject(context, messageAttributes, SETTER); // note: modifying message attributes has to be done before marshalling, otherwise the changes // are not reflected in the actual request (and the MD5 check on send will fail). @@ -65,6 +67,7 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request for (SendMessageBatchRequestEntry entry : smbRequest.getEntries()) { Map messageAttributes = new HashMap<>(entry.getMessageAttributes()); + defaultPropagator().inject(context, messageAttributes, SETTER); dsmPropagator.inject(context, messageAttributes, SETTER); entry.setMessageAttributes(messageAttributes); } diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy index a9b8709caae..8b695edbdc6 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy @@ -278,10 +278,15 @@ abstract class SqsClientTest extends VersionedNamingTestBase { client.sendMessage(queueUrl, 'sometext') }) def messages = client.receiveMessage(queueUrl).messages + messages.forEach {/* consume to create message spans */ } + + if (isDataStreamsEnabled()) { + TEST_DATA_STREAMS_WRITER.waitForGroups(2) + } then: def sendSpan - assertTraces(1) { + assertTraces(2) { trace(2) { basicSpan(it, 'parent') span { @@ -306,7 +311,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase { "aws.operation" "SendMessageRequest" "aws.agent" "java-aws-sdk" "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" - if (isDataStreamsEnabled()) { + if ({ isDataStreamsEnabled() }) { "$DDTags.PATHWAY_HASH" { String } } serviceNameSource("java-aws-sdk") @@ -315,6 +320,30 @@ abstract class SqsClientTest extends VersionedNamingTestBase { } sendSpan = span(1) } + trace(1) { + span { + serviceName expectedService("SQS", "ReceiveMessage") + operationName expectedOperation("SQS", "ReceiveMessage") + resourceName "SQS.ReceiveMessage" + spanType DDSpanTypes.MESSAGE_CONSUMER + errored false + measured true + childOf(sendSpan) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "aws.service" "AmazonSQS" + "aws_service" "sqs" + "aws.operation" "ReceiveMessageRequest" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + if ({ isDataStreamsEnabled() }) { + "$DDTags.PATHWAY_HASH" { String } + } + defaultTags(true) + } + } + } } def ddAttr = messages[0].messageAttributes['_datadog'] diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java index 889ff8e94c6..f6377e62337 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java @@ -16,14 +16,29 @@ public class MessageAttributeInjector implements CarrierSetter carrier, final String key, final String value) { - if (carrier.size() < 10 - && !carrier.containsKey(DATADOG_KEY) - && Config.get().isSqsInjectDatadogAttributeEnabled()) { - - String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value); + if (!Config.get().isSqsInjectDatadogAttributeEnabled()) { + return; + } + if (!carrier.containsKey(DATADOG_KEY)) { + if (carrier.size() >= 10) { + return; + } carrier.put( DATADOG_KEY, - MessageAttributeValue.builder().dataType("String").stringValue(jsonPathway).build()); + MessageAttributeValue.builder() + .dataType("String") + .stringValue(String.format("{\"%s\": \"%s\"}", key, value)) + .build()); + } else { + String existing = carrier.get(DATADOG_KEY).stringValue(); + int closingBrace = existing.lastIndexOf('}'); + if (closingBrace >= 0) { + String updated = + existing.substring(0, closingBrace) + String.format(", \"%s\": \"%s\"}", key, value); + carrier.put( + DATADOG_KEY, + MessageAttributeValue.builder().dataType("String").stringValue(updated).build()); + } } } } diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsClientInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsClientInstrumentation.java index 7ce1a46f2d3..f71df8eacee 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsClientInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsClientInstrumentation.java @@ -28,7 +28,8 @@ public void methodAdvice(MethodTransformer transformer) { public static class AwsSqsBuilderAdvice { @Advice.OnMethodExit(suppress = Throwable.class) public static void methodExit(@Advice.Return final List interceptors) { - if (Config.get().isDataStreamsEnabled()) { + if (Config.get().isDataStreamsEnabled() + || Config.get().isSqsInjectDatadogAttributeEnabled()) { for (ExecutionInterceptor interceptor : interceptors) { if (interceptor instanceof SqsInterceptor) { return; // list already has our interceptor, return to builder diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java index 14bf6591938..1d7dc954ed9 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.aws.v2.sqs; +import static datadog.context.propagation.Propagators.defaultPropagator; import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY; @@ -51,6 +52,7 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); Map messageAttributes = new HashMap<>(request.messageAttributes()); + defaultPropagator().inject(ctx, messageAttributes, SETTER); dsmPropagator.inject(ctx, messageAttributes, SETTER); return request.toBuilder().messageAttributes(messageAttributes).build(); @@ -69,6 +71,7 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu for (SendMessageBatchRequestEntry entry : request.entries()) { Map messageAttributes = new HashMap<>(entry.messageAttributes()); + defaultPropagator().inject(ctx, messageAttributes, SETTER); dsmPropagator.inject(ctx, messageAttributes, SETTER); entries.add(entry.toBuilder().messageAttributes(messageAttributes).build()); } diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy index 638b7fd47f6..139e71cd6af 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy @@ -230,9 +230,15 @@ abstract class SqsClientTest extends VersionedNamingTestBase { }) def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages() + if (isDataStreamsEnabled()) { + TEST_DATA_STREAMS_WRITER.waitForGroups(2) + } + + messages.forEach {/* consume to create message spans */ } + then: def sendSpan - assertTraces(1) { + assertTraces(2) { trace(2) { basicSpan(it, 'parent') span { @@ -256,7 +262,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase { "aws.agent" "java-aws-sdk" "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" "aws.requestId" "00000000-0000-0000-0000-000000000000" - if (isDataStreamsEnabled()) { + if ({ isDataStreamsEnabled() }) { "$DDTags.PATHWAY_HASH" { String } } urlTags("http://localhost:${address.port}/", ExpectedQueryParams.getExpectedQueryParams("SendMessage")) @@ -266,6 +272,31 @@ abstract class SqsClientTest extends VersionedNamingTestBase { } sendSpan = span(1) } + trace(1) { + span { + serviceName expectedService("Sqs", "ReceiveMessage") + operationName expectedOperation("Sqs", "ReceiveMessage") + resourceName "Sqs.ReceiveMessage" + spanType DDSpanTypes.MESSAGE_CONSUMER + errored false + measured true + childOf(sendSpan) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "ReceiveMessage" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + if ({ isDataStreamsEnabled() }) { + "$DDTags.PATHWAY_HASH" { String } + } + defaultTags(true) + } + } + } } def ddAttr = messages[0].messageAttributes()['_datadog'] From f6a7103ea3441d54672f5599de0a35f42af392f9 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 13 Mar 2026 12:38:53 -0700 Subject: [PATCH 3/6] Guard against null stringValue in MessageAttributeInjector When a message is sent with an existing _datadog attribute of Binary type (e.g. from the SQS-JMS library), getStringValue() returns null. Skip injection in that case rather than NPE. Co-Authored-By: Claude Sonnet 4.6 --- .../instrumentation/aws/v1/sqs/MessageAttributeInjector.java | 3 +++ .../instrumentation/aws/v2/sqs/MessageAttributeInjector.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java index 5df88dc95fd..20be6934327 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java @@ -28,6 +28,9 @@ public void set( .withStringValue(String.format("{\"%s\": \"%s\"}", key, value))); } else { String existing = carrier.get(DATADOG_KEY).getStringValue(); + if (existing == null) { + return; + } int closingBrace = existing.lastIndexOf('}'); if (closingBrace >= 0) { String updated = diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java index f6377e62337..0e34db12015 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java @@ -31,6 +31,9 @@ public void set( .build()); } else { String existing = carrier.get(DATADOG_KEY).stringValue(); + if (existing == null) { + return; + } int closingBrace = existing.lastIndexOf('}'); if (closingBrace >= 0) { String updated = From 5ebeb03bcb929089d71a4f2de126a0ddbd1a1f80 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 13 Mar 2026 13:06:19 -0700 Subject: [PATCH 4/6] Remove redundant dsmPropagator.inject() in SqsInterceptor DSM_CONCERN is registered with usedAsDefault=true, so defaultPropagator() already includes DSM injection when Data Streams is enabled. The separate dsmPropagator.inject() call was therefore always redundant: a no-op when DSM is off, a duplicate when DSM is on. Co-Authored-By: Claude Sonnet 4.6 --- .../trace/instrumentation/aws/v1/sqs/SqsInterceptor.java | 7 ------- .../trace/instrumentation/aws/v2/sqs/SqsInterceptor.java | 7 ------- 2 files changed, 14 deletions(-) diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java index d23f7d23dcd..a95bdea5b8b 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java @@ -4,7 +4,6 @@ import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY; -import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName; import static datadog.trace.instrumentation.aws.v1.sqs.MessageAttributeInjector.SETTER; @@ -17,8 +16,6 @@ import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import com.amazonaws.services.sqs.model.SendMessageRequest; import datadog.context.Context; -import datadog.context.propagation.Propagator; -import datadog.context.propagation.Propagators; import datadog.trace.api.Config; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.DataStreamsTags; @@ -45,14 +42,12 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request String queueUrl = smRequest.getQueueUrl(); if (queueUrl == null) return request; - Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); Context context = newContext(request, queueUrl); // making a copy of the MessageAttributes before modifying them because they can be stored in // a kind of ImmutableMap Map messageAttributes = new HashMap<>(smRequest.getMessageAttributes()); defaultPropagator().inject(context, messageAttributes, SETTER); - dsmPropagator.inject(context, messageAttributes, SETTER); // note: modifying message attributes has to be done before marshalling, otherwise the changes // are not reflected in the actual request (and the MD5 check on send will fail). smRequest.setMessageAttributes(messageAttributes); @@ -62,13 +57,11 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request String queueUrl = smbRequest.getQueueUrl(); if (queueUrl == null) return request; - Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); Context context = newContext(request, queueUrl); for (SendMessageBatchRequestEntry entry : smbRequest.getEntries()) { Map messageAttributes = new HashMap<>(entry.getMessageAttributes()); defaultPropagator().inject(context, messageAttributes, SETTER); - dsmPropagator.inject(context, messageAttributes, SETTER); entry.setMessageAttributes(messageAttributes); } } else if (request instanceof ReceiveMessageRequest) { diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java index 1d7dc954ed9..3ee78a14739 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java @@ -4,13 +4,10 @@ import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY; -import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName; import static datadog.trace.instrumentation.aws.v2.sqs.MessageAttributeInjector.SETTER; import datadog.context.Context; -import datadog.context.propagation.Propagator; -import datadog.context.propagation.Propagators; import datadog.trace.api.Config; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.DataStreamsTags; @@ -48,12 +45,10 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu return request; } - Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); Map messageAttributes = new HashMap<>(request.messageAttributes()); defaultPropagator().inject(ctx, messageAttributes, SETTER); - dsmPropagator.inject(ctx, messageAttributes, SETTER); return request.toBuilder().messageAttributes(messageAttributes).build(); @@ -64,7 +59,6 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu return request; } - Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); List entries = new ArrayList<>(); @@ -72,7 +66,6 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu Map messageAttributes = new HashMap<>(entry.messageAttributes()); defaultPropagator().inject(ctx, messageAttributes, SETTER); - dsmPropagator.inject(ctx, messageAttributes, SETTER); entries.add(entry.toBuilder().messageAttributes(messageAttributes).build()); } From ad0d115b56690749c7ec3b633c83844f297f3178 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 13 Mar 2026 13:26:19 -0700 Subject: [PATCH 5/6] Don't overwrite pre-existing _datadog attribute; clarify accumulation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SqsInterceptor: skip injection if _datadog is already present in the message attributes. Appending to a caller-provided attribute could produce duplicate JSON keys or corrupt propagation context set intentionally upstream. MessageAttributeInjector: add comments explaining why set() accumulates into the existing _datadog value rather than overwriting — a single propagator.inject() call fires set() once per header key, so all keys (x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64, ...) must land in the same JSON attribute. Co-Authored-By: Claude Sonnet 4.6 --- .../aws/v1/sqs/MessageAttributeInjector.java | 4 ++++ .../instrumentation/aws/v1/sqs/SqsInterceptor.java | 12 ++++++++---- .../aws/v2/sqs/MessageAttributeInjector.java | 4 ++++ .../instrumentation/aws/v2/sqs/SqsInterceptor.java | 12 ++++++++---- 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java index 20be6934327..f1b3861e670 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java @@ -17,6 +17,9 @@ public void set( if (!Config.get().isSqsInjectDatadogAttributeEnabled()) { return; } + // A single propagator.inject() call invokes set() once per header key (e.g. + // x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64). All of them must be + // accumulated into the same _datadog JSON attribute rather than overwriting each other. if (!carrier.containsKey(DATADOG_KEY)) { if (carrier.size() >= 10) { return; @@ -27,6 +30,7 @@ public void set( .withDataType("String") .withStringValue(String.format("{\"%s\": \"%s\"}", key, value))); } else { + // _datadog was created by an earlier set() call in this same inject session; append to it. String existing = carrier.get(DATADOG_KEY).getStringValue(); if (existing == null) { return; diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java index a95bdea5b8b..a73597d0ac1 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java @@ -42,12 +42,14 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request String queueUrl = smRequest.getQueueUrl(); if (queueUrl == null) return request; - Context context = newContext(request, queueUrl); // making a copy of the MessageAttributes before modifying them because they can be stored in // a kind of ImmutableMap Map messageAttributes = new HashMap<>(smRequest.getMessageAttributes()); - defaultPropagator().inject(context, messageAttributes, SETTER); + if (!messageAttributes.containsKey(DATADOG_KEY)) { + Context context = newContext(request, queueUrl); + defaultPropagator().inject(context, messageAttributes, SETTER); + } // note: modifying message attributes has to be done before marshalling, otherwise the changes // are not reflected in the actual request (and the MD5 check on send will fail). smRequest.setMessageAttributes(messageAttributes); @@ -57,11 +59,13 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request String queueUrl = smbRequest.getQueueUrl(); if (queueUrl == null) return request; - Context context = newContext(request, queueUrl); for (SendMessageBatchRequestEntry entry : smbRequest.getEntries()) { Map messageAttributes = new HashMap<>(entry.getMessageAttributes()); - defaultPropagator().inject(context, messageAttributes, SETTER); + if (!messageAttributes.containsKey(DATADOG_KEY)) { + Context context = newContext(request, queueUrl); + defaultPropagator().inject(context, messageAttributes, SETTER); + } entry.setMessageAttributes(messageAttributes); } } else if (request instanceof ReceiveMessageRequest) { diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java index 0e34db12015..59dfb4f9e74 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java @@ -19,6 +19,9 @@ public void set( if (!Config.get().isSqsInjectDatadogAttributeEnabled()) { return; } + // A single propagator.inject() call invokes set() once per header key (e.g. + // x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64). All of them must be + // accumulated into the same _datadog JSON attribute rather than overwriting each other. if (!carrier.containsKey(DATADOG_KEY)) { if (carrier.size() >= 10) { return; @@ -30,6 +33,7 @@ public void set( .stringValue(String.format("{\"%s\": \"%s\"}", key, value)) .build()); } else { + // _datadog was created by an earlier set() call in this same inject session; append to it. String existing = carrier.get(DATADOG_KEY).stringValue(); if (existing == null) { return; diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java index 3ee78a14739..9217ac72b7d 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java @@ -45,10 +45,12 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu return request; } - Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); Map messageAttributes = new HashMap<>(request.messageAttributes()); - defaultPropagator().inject(ctx, messageAttributes, SETTER); + if (!messageAttributes.containsKey(DATADOG_KEY)) { + Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); + defaultPropagator().inject(ctx, messageAttributes, SETTER); + } return request.toBuilder().messageAttributes(messageAttributes).build(); @@ -59,13 +61,15 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu return request; } - Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); List entries = new ArrayList<>(); for (SendMessageBatchRequestEntry entry : request.entries()) { Map messageAttributes = new HashMap<>(entry.messageAttributes()); - defaultPropagator().inject(ctx, messageAttributes, SETTER); + if (!messageAttributes.containsKey(DATADOG_KEY)) { + Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); + defaultPropagator().inject(ctx, messageAttributes, SETTER); + } entries.add(entry.toBuilder().messageAttributes(messageAttributes).build()); } From 8f7f04ca71f8b24654213cfc3871321b324c3e0d Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 13 Mar 2026 15:25:05 -0700 Subject: [PATCH 6/6] Fix span leak in SQS SendMessageBatch: create context once per batch Moving newContext/getContext outside the per-entry loop prevents creating a new span per entry where each overwrites the same contextStore slot, leaking all but the last span and injecting mismatched parent IDs into earlier batch messages. Co-Authored-By: Claude Sonnet 4.6 --- .../trace/instrumentation/aws/v1/sqs/SqsInterceptor.java | 2 +- .../trace/instrumentation/aws/v2/sqs/SqsInterceptor.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java index a73597d0ac1..0179ac4338c 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java @@ -59,11 +59,11 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request String queueUrl = smbRequest.getQueueUrl(); if (queueUrl == null) return request; + Context context = newContext(request, queueUrl); for (SendMessageBatchRequestEntry entry : smbRequest.getEntries()) { Map messageAttributes = new HashMap<>(entry.getMessageAttributes()); if (!messageAttributes.containsKey(DATADOG_KEY)) { - Context context = newContext(request, queueUrl); defaultPropagator().inject(context, messageAttributes, SETTER); } entry.setMessageAttributes(messageAttributes); diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java index 9217ac72b7d..6be07206130 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java @@ -61,13 +61,13 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu return request; } + Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); List entries = new ArrayList<>(); for (SendMessageBatchRequestEntry entry : request.entries()) { Map messageAttributes = new HashMap<>(entry.messageAttributes()); if (!messageAttributes.containsKey(DATADOG_KEY)) { - Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); defaultPropagator().inject(ctx, messageAttributes, SETTER); } entries.add(entry.toBuilder().messageAttributes(messageAttributes).build());