Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,35 @@ public class MessageAttributeInjector implements CarrierSetter<Map<String, Messa
@Override
public void set(
final Map<String, MessageAttributeValue> carrier, final String key, final String value) {
if (carrier.size() < 10
&& !carrier.containsKey(DATADOG_KEY)
&& Config.get().isSqsInjectDatadogAttributeEnabled()) {
String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value);
if (!Config.get().isSqsInjectDatadogAttributeEnabled()) {
return;
}
// A single propagator.inject() call invokes set() once per header key (e.g.
// x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64). All of them must be
// accumulated into the same _datadog JSON attribute rather than overwriting each other.
if (!carrier.containsKey(DATADOG_KEY)) {
if (carrier.size() >= 10) {
return;
}
carrier.put(
DATADOG_KEY,
new MessageAttributeValue().withDataType("String").withStringValue(jsonPathway));
new MessageAttributeValue()
.withDataType("String")
.withStringValue(String.format("{\"%s\": \"%s\"}", key, value)));
} else {
// _datadog was created by an earlier set() call in this same inject session; append to it.
String existing = carrier.get(DATADOG_KEY).getStringValue();
if (existing == null) {
return;
}
int closingBrace = existing.lastIndexOf('}');
if (closingBrace >= 0) {
String updated =
existing.substring(0, closingBrace) + String.format(", \"%s\": \"%s\"}", key, value);
carrier.put(
DATADOG_KEY,
new MessageAttributeValue().withDataType("String").withStringValue(updated));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public Map<String, String> contextStore() {
public static class HandlerChainAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void addHandler(@Advice.Return final List<RequestHandler2> handlers) {
if (Config.get().isDataStreamsEnabled()) {
if (Config.get().isDataStreamsEnabled()
|| Config.get().isSqsInjectDatadogAttributeEnabled()) {
for (RequestHandler2 interceptor : handlers) {
if (interceptor instanceof SqsInterceptor) {
return; // list already has our interceptor, return to builder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package datadog.trace.instrumentation.aws.v1.sqs;

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
import static datadog.trace.api.datastreams.DataStreamsTags.create;
import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName;
import static datadog.trace.instrumentation.aws.v1.sqs.MessageAttributeInjector.SETTER;
Expand All @@ -16,8 +16,6 @@
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import datadog.context.Context;
import datadog.context.propagation.Propagator;
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
Expand All @@ -44,13 +42,14 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request
String queueUrl = smRequest.getQueueUrl();
if (queueUrl == null) return request;

Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
Context context = newContext(request, queueUrl);
// making a copy of the MessageAttributes before modifying them because they can be stored in
// a kind of ImmutableMap
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(smRequest.getMessageAttributes());
dsmPropagator.inject(context, messageAttributes, SETTER);
if (!messageAttributes.containsKey(DATADOG_KEY)) {
Context context = newContext(request, queueUrl);
defaultPropagator().inject(context, messageAttributes, SETTER);
}
// note: modifying message attributes has to be done before marshalling, otherwise the changes
// are not reflected in the actual request (and the MD5 check on send will fail).
smRequest.setMessageAttributes(messageAttributes);
Expand All @@ -60,12 +59,13 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request
String queueUrl = smbRequest.getQueueUrl();
if (queueUrl == null) return request;

Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
Context context = newContext(request, queueUrl);
for (SendMessageBatchRequestEntry entry : smbRequest.getEntries()) {
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(entry.getMessageAttributes());
dsmPropagator.inject(context, messageAttributes, SETTER);
if (!messageAttributes.containsKey(DATADOG_KEY)) {
defaultPropagator().inject(context, messageAttributes, SETTER);
}
entry.setMessageAttributes(messageAttributes);
}
} else if (request instanceof ReceiveMessageRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,100 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
client.shutdown()
}

def "APM trace context is injected into _datadog message attribute on send"() {
setup:
def client = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(endpoint)
.withCredentials(credentialsProvider)
.build()
def queueUrl = client.createQueue('somequeue').queueUrl
TEST_WRITER.clear()

when:
TraceUtils.runUnderTrace('parent', {
client.sendMessage(queueUrl, 'sometext')
})
def messages = client.receiveMessage(queueUrl).messages
messages.forEach {/* consume to create message spans */ }

if (isDataStreamsEnabled()) {
TEST_DATA_STREAMS_WRITER.waitForGroups(2)
}

then:
def sendSpan
assertTraces(2) {
trace(2) {
basicSpan(it, 'parent')
span {
serviceName expectedService("SQS", "SendMessage")
operationName expectedOperation("SQS", "SendMessage")
resourceName "SQS.SendMessage"
spanType DDSpanTypes.HTTP_CLIENT
errored false
measured true
childOf(span(0))
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.HTTP_URL" "http://localhost:${address.port}/"
"$Tags.HTTP_METHOD" "POST"
"$Tags.HTTP_STATUS" 200
"$Tags.PEER_PORT" address.port
"$Tags.PEER_HOSTNAME" "localhost"
"aws.service" "AmazonSQS"
"aws_service" "sqs"
"aws.endpoint" "http://localhost:${address.port}"
"aws.operation" "SendMessageRequest"
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue"
if ({ isDataStreamsEnabled() }) {
"$DDTags.PATHWAY_HASH" { String }
}
serviceNameSource("java-aws-sdk")
defaultTags()
}
}
sendSpan = span(1)
}
trace(1) {
span {
serviceName expectedService("SQS", "ReceiveMessage")
operationName expectedOperation("SQS", "ReceiveMessage")
resourceName "SQS.ReceiveMessage"
spanType DDSpanTypes.MESSAGE_CONSUMER
errored false
measured true
childOf(sendSpan)
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
"aws.service" "AmazonSQS"
"aws_service" "sqs"
"aws.operation" "ReceiveMessageRequest"
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue"
if ({ isDataStreamsEnabled() }) {
"$DDTags.PATHWAY_HASH" { String }
}
defaultTags(true)
}
}
}
}

def ddAttr = messages[0].messageAttributes['_datadog']
ddAttr != null
ddAttr.dataType == 'String'
ddAttr.stringValue.contains('"x-datadog-trace-id"')
ddAttr.stringValue.contains(sendSpan.traceId.toString())
ddAttr.stringValue.contains('"x-datadog-parent-id"')
ddAttr.stringValue.contains(Long.toUnsignedString(sendSpan.spanId))

cleanup:
client.shutdown()
}

@IgnoreIf({ instance.isDataStreamsEnabled() })
def "trace details propagated via embedded SQS message attribute (string)"() {
setup:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,36 @@ public class MessageAttributeInjector implements CarrierSetter<Map<String, Messa
@Override
public void set(
final Map<String, MessageAttributeValue> carrier, final String key, final String value) {
if (carrier.size() < 10
&& !carrier.containsKey(DATADOG_KEY)
&& Config.get().isSqsInjectDatadogAttributeEnabled()) {

String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value);
if (!Config.get().isSqsInjectDatadogAttributeEnabled()) {
return;
}
// A single propagator.inject() call invokes set() once per header key (e.g.
// x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64). All of them must be
// accumulated into the same _datadog JSON attribute rather than overwriting each other.
if (!carrier.containsKey(DATADOG_KEY)) {
if (carrier.size() >= 10) {
return;
}
carrier.put(
DATADOG_KEY,
MessageAttributeValue.builder().dataType("String").stringValue(jsonPathway).build());
MessageAttributeValue.builder()
.dataType("String")
.stringValue(String.format("{\"%s\": \"%s\"}", key, value))
.build());
} else {
// _datadog was created by an earlier set() call in this same inject session; append to it.
String existing = carrier.get(DATADOG_KEY).stringValue();
if (existing == null) {
return;
}
int closingBrace = existing.lastIndexOf('}');
if (closingBrace >= 0) {
String updated =
existing.substring(0, closingBrace) + String.format(", \"%s\": \"%s\"}", key, value);
carrier.put(
DATADOG_KEY,
MessageAttributeValue.builder().dataType("String").stringValue(updated).build());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public void methodAdvice(MethodTransformer transformer) {
public static class AwsSqsBuilderAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit(@Advice.Return final List<ExecutionInterceptor> interceptors) {
if (Config.get().isDataStreamsEnabled()) {
if (Config.get().isDataStreamsEnabled()
|| Config.get().isSqsInjectDatadogAttributeEnabled()) {
for (ExecutionInterceptor interceptor : interceptors) {
if (interceptor instanceof SqsInterceptor) {
return; // list already has our interceptor, return to builder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
import static datadog.trace.api.datastreams.DataStreamsTags.create;
import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName;
import static datadog.trace.instrumentation.aws.v2.sqs.MessageAttributeInjector.SETTER;

import datadog.context.Context;
import datadog.context.propagation.Propagator;
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
Expand Down Expand Up @@ -47,11 +45,12 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
return request;
}

Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
Context ctx = getContext(executionAttributes, optionalQueueUrl.get());
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());
dsmPropagator.inject(ctx, messageAttributes, SETTER);
if (!messageAttributes.containsKey(DATADOG_KEY)) {
Context ctx = getContext(executionAttributes, optionalQueueUrl.get());
defaultPropagator().inject(ctx, messageAttributes, SETTER);
}

return request.toBuilder().messageAttributes(messageAttributes).build();

Expand All @@ -62,14 +61,15 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
return request;
}

Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
Context ctx = getContext(executionAttributes, optionalQueueUrl.get());
List<SendMessageBatchRequestEntry> entries = new ArrayList<>();

for (SendMessageBatchRequestEntry entry : request.entries()) {
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(entry.messageAttributes());
dsmPropagator.inject(ctx, messageAttributes, SETTER);
if (!messageAttributes.containsKey(DATADOG_KEY)) {
defaultPropagator().inject(ctx, messageAttributes, SETTER);
}
entries.add(entry.toBuilder().messageAttributes(messageAttributes).build());
}

Expand Down
Loading
Loading