diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java index f3302307aca..f1b3861e670 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java @@ -14,13 +14,35 @@ public class MessageAttributeInjector implements CarrierSetter carrier, final String key, final String value) { - if (carrier.size() < 10 - && !carrier.containsKey(DATADOG_KEY) - && Config.get().isSqsInjectDatadogAttributeEnabled()) { - String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value); + if (!Config.get().isSqsInjectDatadogAttributeEnabled()) { + return; + } + // A single propagator.inject() call invokes set() once per header key (e.g. + // x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64). All of them must be + // accumulated into the same _datadog JSON attribute rather than overwriting each other. + if (!carrier.containsKey(DATADOG_KEY)) { + if (carrier.size() >= 10) { + return; + } carrier.put( DATADOG_KEY, - new MessageAttributeValue().withDataType("String").withStringValue(jsonPathway)); + new MessageAttributeValue() + .withDataType("String") + .withStringValue(String.format("{\"%s\": \"%s\"}", key, value))); + } else { + // _datadog was created by an earlier set() call in this same inject session; append to it. + String existing = carrier.get(DATADOG_KEY).getStringValue(); + if (existing == null) { + return; + } + int closingBrace = existing.lastIndexOf('}'); + if (closingBrace >= 0) { + String updated = + existing.substring(0, closingBrace) + String.format(", \"%s\": \"%s\"}", key, value); + carrier.put( + DATADOG_KEY, + new MessageAttributeValue().withDataType("String").withStringValue(updated)); + } } } } diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java index 2e7dee5f360..8c47ff8f660 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java @@ -52,7 +52,8 @@ public Map contextStore() { public static class HandlerChainAdvice { @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void addHandler(@Advice.Return final List handlers) { - if (Config.get().isDataStreamsEnabled()) { + if (Config.get().isDataStreamsEnabled() + || Config.get().isSqsInjectDatadogAttributeEnabled()) { for (RequestHandler2 interceptor : handlers) { if (interceptor instanceof SqsInterceptor) { return; // list already has our interceptor, return to builder diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java index 3534e1ed9e2..0179ac4338c 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java @@ -1,9 +1,9 @@ package datadog.trace.instrumentation.aws.v1.sqs; +import static datadog.context.propagation.Propagators.defaultPropagator; import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY; -import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName; import static datadog.trace.instrumentation.aws.v1.sqs.MessageAttributeInjector.SETTER; @@ -16,8 +16,6 @@ import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import com.amazonaws.services.sqs.model.SendMessageRequest; import datadog.context.Context; -import datadog.context.propagation.Propagator; -import datadog.context.propagation.Propagators; import datadog.trace.api.Config; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.DataStreamsTags; @@ -44,13 +42,14 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request String queueUrl = smRequest.getQueueUrl(); if (queueUrl == null) return request; - Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); - Context context = newContext(request, queueUrl); // making a copy of the MessageAttributes before modifying them because they can be stored in // a kind of ImmutableMap Map messageAttributes = new HashMap<>(smRequest.getMessageAttributes()); - dsmPropagator.inject(context, messageAttributes, SETTER); + if (!messageAttributes.containsKey(DATADOG_KEY)) { + Context context = newContext(request, queueUrl); + defaultPropagator().inject(context, messageAttributes, SETTER); + } // note: modifying message attributes has to be done before marshalling, otherwise the changes // are not reflected in the actual request (and the MD5 check on send will fail). smRequest.setMessageAttributes(messageAttributes); @@ -60,12 +59,13 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request String queueUrl = smbRequest.getQueueUrl(); if (queueUrl == null) return request; - Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); Context context = newContext(request, queueUrl); for (SendMessageBatchRequestEntry entry : smbRequest.getEntries()) { Map messageAttributes = new HashMap<>(entry.getMessageAttributes()); - dsmPropagator.inject(context, messageAttributes, SETTER); + if (!messageAttributes.containsKey(DATADOG_KEY)) { + defaultPropagator().inject(context, messageAttributes, SETTER); + } entry.setMessageAttributes(messageAttributes); } } else if (request instanceof ReceiveMessageRequest) { diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy index 43bff3872e3..8b695edbdc6 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy @@ -264,6 +264,100 @@ abstract class SqsClientTest extends VersionedNamingTestBase { client.shutdown() } + def "APM trace context is injected into _datadog message attribute on send"() { + setup: + def client = AmazonSQSClientBuilder.standard() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() + def queueUrl = client.createQueue('somequeue').queueUrl + TEST_WRITER.clear() + + when: + TraceUtils.runUnderTrace('parent', { + client.sendMessage(queueUrl, 'sometext') + }) + def messages = client.receiveMessage(queueUrl).messages + messages.forEach {/* consume to create message spans */ } + + if (isDataStreamsEnabled()) { + TEST_DATA_STREAMS_WRITER.waitForGroups(2) + } + + then: + def sendSpan + assertTraces(2) { + trace(2) { + basicSpan(it, 'parent') + span { + serviceName expectedService("SQS", "SendMessage") + operationName expectedOperation("SQS", "SendMessage") + resourceName "SQS.SendMessage" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + childOf(span(0)) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_URL" "http://localhost:${address.port}/" + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" address.port + "$Tags.PEER_HOSTNAME" "localhost" + "aws.service" "AmazonSQS" + "aws_service" "sqs" + "aws.endpoint" "http://localhost:${address.port}" + "aws.operation" "SendMessageRequest" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + if ({ isDataStreamsEnabled() }) { + "$DDTags.PATHWAY_HASH" { String } + } + serviceNameSource("java-aws-sdk") + defaultTags() + } + } + sendSpan = span(1) + } + trace(1) { + span { + serviceName expectedService("SQS", "ReceiveMessage") + operationName expectedOperation("SQS", "ReceiveMessage") + resourceName "SQS.ReceiveMessage" + spanType DDSpanTypes.MESSAGE_CONSUMER + errored false + measured true + childOf(sendSpan) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "aws.service" "AmazonSQS" + "aws_service" "sqs" + "aws.operation" "ReceiveMessageRequest" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + if ({ isDataStreamsEnabled() }) { + "$DDTags.PATHWAY_HASH" { String } + } + defaultTags(true) + } + } + } + } + + def ddAttr = messages[0].messageAttributes['_datadog'] + ddAttr != null + ddAttr.dataType == 'String' + ddAttr.stringValue.contains('"x-datadog-trace-id"') + ddAttr.stringValue.contains(sendSpan.traceId.toString()) + ddAttr.stringValue.contains('"x-datadog-parent-id"') + ddAttr.stringValue.contains(Long.toUnsignedString(sendSpan.spanId)) + + cleanup: + client.shutdown() + } + @IgnoreIf({ instance.isDataStreamsEnabled() }) def "trace details propagated via embedded SQS message attribute (string)"() { setup: diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java index 889ff8e94c6..59dfb4f9e74 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java @@ -16,14 +16,36 @@ public class MessageAttributeInjector implements CarrierSetter carrier, final String key, final String value) { - if (carrier.size() < 10 - && !carrier.containsKey(DATADOG_KEY) - && Config.get().isSqsInjectDatadogAttributeEnabled()) { - - String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value); + if (!Config.get().isSqsInjectDatadogAttributeEnabled()) { + return; + } + // A single propagator.inject() call invokes set() once per header key (e.g. + // x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64). All of them must be + // accumulated into the same _datadog JSON attribute rather than overwriting each other. + if (!carrier.containsKey(DATADOG_KEY)) { + if (carrier.size() >= 10) { + return; + } carrier.put( DATADOG_KEY, - MessageAttributeValue.builder().dataType("String").stringValue(jsonPathway).build()); + MessageAttributeValue.builder() + .dataType("String") + .stringValue(String.format("{\"%s\": \"%s\"}", key, value)) + .build()); + } else { + // _datadog was created by an earlier set() call in this same inject session; append to it. + String existing = carrier.get(DATADOG_KEY).stringValue(); + if (existing == null) { + return; + } + int closingBrace = existing.lastIndexOf('}'); + if (closingBrace >= 0) { + String updated = + existing.substring(0, closingBrace) + String.format(", \"%s\": \"%s\"}", key, value); + carrier.put( + DATADOG_KEY, + MessageAttributeValue.builder().dataType("String").stringValue(updated).build()); + } } } } diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsClientInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsClientInstrumentation.java index 7ce1a46f2d3..f71df8eacee 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsClientInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsClientInstrumentation.java @@ -28,7 +28,8 @@ public void methodAdvice(MethodTransformer transformer) { public static class AwsSqsBuilderAdvice { @Advice.OnMethodExit(suppress = Throwable.class) public static void methodExit(@Advice.Return final List interceptors) { - if (Config.get().isDataStreamsEnabled()) { + if (Config.get().isDataStreamsEnabled() + || Config.get().isSqsInjectDatadogAttributeEnabled()) { for (ExecutionInterceptor interceptor : interceptors) { if (interceptor instanceof SqsInterceptor) { return; // list already has our interceptor, return to builder diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java index 14bf6591938..6be07206130 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java @@ -1,15 +1,13 @@ package datadog.trace.instrumentation.aws.v2.sqs; +import static datadog.context.propagation.Propagators.defaultPropagator; import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY; -import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName; import static datadog.trace.instrumentation.aws.v2.sqs.MessageAttributeInjector.SETTER; import datadog.context.Context; -import datadog.context.propagation.Propagator; -import datadog.context.propagation.Propagators; import datadog.trace.api.Config; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.DataStreamsTags; @@ -47,11 +45,12 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu return request; } - Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); - Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); Map messageAttributes = new HashMap<>(request.messageAttributes()); - dsmPropagator.inject(ctx, messageAttributes, SETTER); + if (!messageAttributes.containsKey(DATADOG_KEY)) { + Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); + defaultPropagator().inject(ctx, messageAttributes, SETTER); + } return request.toBuilder().messageAttributes(messageAttributes).build(); @@ -62,14 +61,15 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu return request; } - Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); Context ctx = getContext(executionAttributes, optionalQueueUrl.get()); List entries = new ArrayList<>(); for (SendMessageBatchRequestEntry entry : request.entries()) { Map messageAttributes = new HashMap<>(entry.messageAttributes()); - dsmPropagator.inject(ctx, messageAttributes, SETTER); + if (!messageAttributes.containsKey(DATADOG_KEY)) { + defaultPropagator().inject(ctx, messageAttributes, SETTER); + } entries.add(entry.toBuilder().messageAttributes(messageAttributes).build()); } diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy index 067397524e7..139e71cd6af 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy @@ -214,6 +214,103 @@ abstract class SqsClientTest extends VersionedNamingTestBase { cleanup: client.close() } + def "APM trace context is injected into _datadog message attribute on send"() { + setup: + def client = SqsClient.builder() + .region(Region.EU_CENTRAL_1) + .endpointOverride(endpoint) + .credentialsProvider(credentialsProvider) + .build() + def queueUrl = client.createQueue(CreateQueueRequest.builder().queueName('somequeue').build()).queueUrl() + TEST_WRITER.clear() + + when: + TraceUtils.runUnderTrace('parent', { + client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody('sometext').build()) + }) + def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages() + + if (isDataStreamsEnabled()) { + TEST_DATA_STREAMS_WRITER.waitForGroups(2) + } + + messages.forEach {/* consume to create message spans */ } + + then: + def sendSpan + assertTraces(2) { + trace(2) { + basicSpan(it, 'parent') + span { + serviceName expectedService("Sqs", "SendMessage") + operationName expectedOperation("Sqs", "SendMessage") + resourceName "Sqs.SendMessage" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + childOf(span(0)) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" address.port + "$Tags.PEER_HOSTNAME" "localhost" + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "SendMessage" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + if ({ isDataStreamsEnabled() }) { + "$DDTags.PATHWAY_HASH" { String } + } + urlTags("http://localhost:${address.port}/", ExpectedQueryParams.getExpectedQueryParams("SendMessage")) + serviceNameSource("java-aws-sdk") + defaultTags() + } + } + sendSpan = span(1) + } + trace(1) { + span { + serviceName expectedService("Sqs", "ReceiveMessage") + operationName expectedOperation("Sqs", "ReceiveMessage") + resourceName "Sqs.ReceiveMessage" + spanType DDSpanTypes.MESSAGE_CONSUMER + errored false + measured true + childOf(sendSpan) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "ReceiveMessage" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + if ({ isDataStreamsEnabled() }) { + "$DDTags.PATHWAY_HASH" { String } + } + defaultTags(true) + } + } + } + } + + def ddAttr = messages[0].messageAttributes()['_datadog'] + ddAttr != null + ddAttr.dataType() == 'String' + ddAttr.stringValue().contains('"x-datadog-trace-id"') + ddAttr.stringValue().contains(sendSpan.traceId.toString()) + ddAttr.stringValue().contains('"x-datadog-parent-id"') + ddAttr.stringValue().contains(Long.toUnsignedString(sendSpan.spanId)) + + cleanup: + client.close() + } + @IgnoreIf({instance.isDataStreamsEnabled()}) def "trace details propagated via embedded SQS message attribute (string)"() { setup: