diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle
index 336ea73a0ec..66fc04a0396 100644
--- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle
+++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle
@@ -1,13 +1,24 @@
+import org.jetbrains.kotlin.gradle.dsl.JvmTarget
+import org.jetbrains.kotlin.gradle.dsl.KotlinVersion
+
+plugins {
+ id 'org.jetbrains.kotlin.jvm'
+}
+
muzzle {
pass {
group = 'org.springframework'
module = 'spring-messaging'
versions = "[4.0.0.RELEASE,)"
assertInverse = true
+ // KotlinAwareHandlerInstrumentation references Publisher from reactive-streams,
+ // which is not bundled in spring-messaging but is always present when Spring Kafka is.
+ extraDependency 'org.reactivestreams:reactive-streams:1.0.4'
}
}
apply from: "$rootDir/gradle/java.gradle"
+apply from: "$rootDir/gradle/test-with-kotlin.gradle"
testJvmConstraints {
minJavaVersion = JavaVersion.VERSION_17
@@ -16,13 +27,24 @@ testJvmConstraints {
addTestSuiteForDir('latestDepTest', 'test')
["compileTestGroovy", "compileLatestDepTestGroovy"].each { name ->
+ def kotlinTaskName = name.replace("Groovy", "Kotlin")
tasks.named(name, GroovyCompile) {
configureCompiler(it, 17)
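+ // let the Groovy test compilation see classes produced by the matching Kotlin compile task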
+ classpath += files(tasks.named(kotlinTaskName).map { it.destinationDirectory })
+ }
+}
+
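+// Kotlin sources (test-only in this module) target Java 8 bytecode and the Kotlin 1.9 language/API level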
+kotlin {
+ compilerOptions {
+ jvmTarget = JvmTarget.JVM_1_8
+ apiVersion = KotlinVersion.KOTLIN_1_9
+ languageVersion = KotlinVersion.KOTLIN_1_9
}
}
dependencies {
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE'
+ compileOnly 'org.reactivestreams:reactive-streams:1.0.4'
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common')
// capture SQS send and receive spans, propagate trace details in messages
@@ -36,6 +58,32 @@ dependencies {
}
testImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.13', version: '1.2.3'
+ // Spring Kafka + embedded Kafka broker for coroutine tests
+ testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.4', {
+ exclude group: 'org.apache.kafka'
+ }
+ testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.3.4', {
+ exclude group: 'org.apache.kafka'
+ }
+
+ // KotlinAwareHandlerInstrumentation relies on the reactive-streams instrumentation
+ testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0')
+
+ testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test'
+ testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
+ testImplementation 'org.apache.kafka:kafka-clients:3.8.0:test'
+ testImplementation 'org.apache.kafka:kafka_2.13:3.8.0'
+ testImplementation 'org.apache.kafka:kafka_2.13:3.8.0:test'
+
+ testImplementation libs.kotlin
+ testImplementation "org.jetbrains.kotlin:kotlin-reflect"
+ testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.8.+"
+ testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.+"
+ testImplementation "io.projectreactor:reactor-core:3.+"
+
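+ // instrumentation modules the test agent needs at runtime: coroutine context propagation and Kafka 3.8 client tracing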
+ testRuntimeOnly project(':dd-java-agent:instrumentation:kotlin-coroutines-1.3')
+ testRuntimeOnly project(':dd-java-agent:instrumentation:kafka:kafka-clients-3.8')
+
latestDepTestImplementation group: 'org.springframework', name: 'spring-messaging', version: '6.+', {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java
new file mode 100644
index 00000000000..72f2c181996
--- /dev/null
+++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java
@@ -0,0 +1,71 @@
+package datadog.trace.instrumentation.springmessaging;
+
+import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.isMethod;
+
+import com.google.auto.service.AutoService;
+import datadog.context.Context;
+import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.bootstrap.InstrumentationContext;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import net.bytebuddy.asm.Advice;
+import org.reactivestreams.Publisher;
+
+/**
+ * Instruments {@code KotlinAwareInvocableHandlerMethod.doInvoke()} to attach the current {@link
+ * Context} to the returned {@link Publisher} so that the reactive-streams instrumentation activates
+ * it during subscription.
+ *
+ * <p>When a Spring Kafka listener is a Kotlin {@code
+ * KotlinAwareInvocableHandlerMethod.doInvoke()} returns a cold {@code Mono} immediately, before the
+ * listener body runs. By the time the {@code Mono} is subscribed (and the underlying {@code
+ * AbstractCoroutine} is constructed), the {@code spring.consume} scope opened by {@link
+ * SpringMessageHandlerInstrumentation} has already been closed. This advice captures {@link
+ * Context#current()} at {@code doInvoke()} exit — while {@code spring.consume} is still active —
+ * and stores it on the Publisher. The reactive-streams {@code PublisherInstrumentation} then
+ * retrieves and activates it during subscription so that {@code DatadogThreadContextElement} picks
+ * up the correct parent context when the underlying {@code AbstractCoroutine} is constructed.
+ */
+@AutoService(InstrumenterModule.class)
+public class KotlinAwareHandlerInstrumentation extends InstrumenterModule.Tracing
+ implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
+
+ public KotlinAwareHandlerInstrumentation() {
+ super("spring-messaging", "spring-messaging-4");
+ }
+
+ @Override
+ public Map<String, String> contextStore() {
+ return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
+ }
+
+ @Override
+ public List<Instrumenter> typeInstrumentations() {
+ return Collections.singletonList(this);
+ }
+
+ @Override
+ public String instrumentedType() {
+ return "org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod";
+ }
+
+ @Override
+ public void methodAdvice(MethodTransformer transformer) {
+ transformer.applyAdvice(
+ isMethod().and(named("doInvoke")),
+ KotlinAwareHandlerInstrumentation.class.getName() + "$DoInvokeAdvice");
+ }
+
+ public static class DoInvokeAdvice {
+ @Advice.OnMethodExit(suppress = Throwable.class)
+ public static void onExit(@Advice.Return Object result) {
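+ // Capture the active context (spring.consume is still open here) and stash it on the returned Publisher.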
+ if (result instanceof Publisher) {
+ InstrumentationContext.get(Publisher.class, Context.class)
+ .put((Publisher<?>) result, Context.current());
+ }
+ }
+ }
+}
diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy
new file mode 100644
index 00000000000..834b31f2a62
--- /dev/null
+++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy
@@ -0,0 +1,148 @@
+import datadog.trace.agent.test.InstrumentationSpecification
+import datadog.trace.agent.test.asserts.TraceAssert
+import listener.KafkaBatchCoroutineConfig
+import listener.KafkaBatchCoroutineListener
+import datadog.trace.api.DDSpanTypes
+import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
+import datadog.trace.bootstrap.instrumentation.api.Tags
+import datadog.trace.core.DDSpan
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.springframework.context.annotation.AnnotationConfigApplicationContext
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.test.EmbeddedKafkaBroker
+import org.springframework.kafka.test.utils.ContainerTestUtils
+
+import java.util.concurrent.TimeUnit
+
+class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification {
+
+ private static final String TOPIC = "batch-coroutine-topic"
+ private static final String CONSUMER_GROUP = "batch-coroutine-group"
+
+ def "batch @KafkaListener suspend fun - spans must be in the same trace as kafka.consume"() {
+ setup:
+ def appContext = new AnnotationConfigApplicationContext(KafkaBatchCoroutineConfig)
+ def listener = appContext.getBean(KafkaBatchCoroutineListener)
+ def template = appContext.getBean(KafkaTemplate)
+ def broker = appContext.getBean(EmbeddedKafkaBroker)
+ def registry = appContext.getBean(KafkaListenerEndpointRegistry)
+
+ // Wait until listener container has been assigned partitions before sending.
+ registry.listenerContainers.each { container ->
+ ContainerTestUtils.waitForAssignment(container, broker.partitionsPerTopic)
+ }
+
+ TEST_WRITER.clear()
+
+ when: "two messages are sent before the consumer polls so they arrive in one batch"
+ registry.listenerContainers.each { it.stop() }
+ template.send(new ProducerRecord(TOPIC, "key", "hello-batch"))
+ template.send(new ProducerRecord(TOPIC, "key", "hello-batch"))
+ template.flush()
+ registry.listenerContainers.each { it.start() }
+
+ then: "the listener processes the batch within 15 s"
+ listener.latch.await(15, TimeUnit.SECONDS)
+ listener.receivedValues == ["hello-batch", "hello-batch"]
+
+ and: "child.work is a child of spring.consume"
+ DDSpan produce1Span, produce2Span, springConsumeParent
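+ // expected: 2 kafka.produce + 6 kafka.consume + 1 spring.consume + 1 child.work traces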
+ assertTraces(10, SORT_TRACES_BY_ID) {
+ trace(1) {
+ produceSpan(it)
+ produce1Span = span(0)
+ }
+ trace(1) {
+ produceSpan(it)
+ produce2Span = span(0)
+ }
+
+ trace(1) { kafkaConsumeSpan(it, produce1Span, 0) }
+ trace(1) { kafkaConsumeSpan(it, produce2Span, 1) }
+ trace(1) { kafkaConsumeSpan(it, produce1Span, 0) }
+ trace(1) { kafkaConsumeSpan(it, produce2Span, 1) }
+
+ trace(1) {
+ // consume messages in one batch
+ springConsumeSpan(it)
+ springConsumeParent = span(0)
+ }
+ // child work span connected to the spring consume span
+ trace(1) { childWorkSpan(it, springConsumeParent) }
+
+ trace(1) { kafkaConsumeSpan(it, produce1Span, 0) }
+ trace(1) { kafkaConsumeSpan(it, produce2Span, 1) }
+ }
+
+ cleanup:
+ appContext.close()
+ }
+
+ private static void produceSpan(TraceAssert trace) {
+ trace.span {
+ operationName "kafka.produce"
+ resourceName "Produce Topic $TOPIC"
+ spanType "queue"
+ errored false
+ measured true
+ parent()
+ tags {
+ "$Tags.COMPONENT" "java-kafka"
+ "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
+ "$InstrumentationTags.MESSAGING_DESTINATION_NAME" TOPIC
+ "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" { String }
+ peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS)
+ defaultTags()
+ }
+ }
+ }
+
+ private static void kafkaConsumeSpan(TraceAssert trace, DDSpan parent, int offset) {
+ trace.span {
+ operationName "kafka.consume"
+ resourceName "Consume Topic $TOPIC"
+ spanType "queue"
+ errored false
+ measured true
+ childOf parent
+ tags {
+ "$Tags.COMPONENT" "java-kafka"
+ "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
+ "$InstrumentationTags.MESSAGING_DESTINATION_NAME" TOPIC
+ "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" { String }
+ peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS)
+ "$InstrumentationTags.CONSUMER_GROUP" CONSUMER_GROUP
+ "$InstrumentationTags.OFFSET" offset
+ "$InstrumentationTags.PARTITION" { Integer }
+ "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { Long }
+ "$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { Long }
+ defaultTags(true)
+ }
+ }
+ }
+
+ private static void springConsumeSpan(TraceAssert trace) {
+ trace.span {
+ operationName "spring.consume"
+ resourceName "KafkaBatchCoroutineListener.consume"
+ spanType DDSpanTypes.MESSAGE_CONSUMER
+ errored false
+ measured true
+ parent()
+ tags {
+ "$Tags.COMPONENT" "spring-messaging"
+ "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
+ defaultTags(true)
+ }
+ }
+ }
+
+ private static void childWorkSpan(TraceAssert trace, DDSpan parent) {
+ trace.span {
+ operationName "child.work"
+ childOf parent
+ tags { defaultTags() }
+ }
+ }
+}
diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt
new file mode 100644
index 00000000000..3272a3371a6
--- /dev/null
+++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt
@@ -0,0 +1,88 @@
+package listener
+
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.annotation.EnableKafka
+import org.springframework.kafka.annotation.KafkaListener
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.test.EmbeddedKafkaBroker
+import org.springframework.kafka.test.EmbeddedKafkaKraftBroker
+import org.springframework.kafka.test.utils.KafkaTestUtils
+import org.springframework.stereotype.Component
+import java.util.concurrent.CountDownLatch
+
+const val BATCH_COROUTINE_TOPIC = "batch-coroutine-topic"
+
+@Configuration(proxyBeanMethods = false)
+@EnableKafka
+class KafkaBatchCoroutineConfig {
+
+ @Bean(destroyMethod = "destroy")
+ fun embeddedKafkaBroker(): EmbeddedKafkaBroker {
+ val broker = EmbeddedKafkaKraftBroker(1, 2, BATCH_COROUTINE_TOPIC)
+ broker.afterPropertiesSet()
+ return broker
+ }
+
+ @Bean
+ fun producerFactory(broker: EmbeddedKafkaBroker): DefaultKafkaProducerFactory<String, String> {
+ val props = HashMap(KafkaTestUtils.producerProps(broker.brokersAsString))
+ props["key.serializer"] = StringSerializer::class.java.name
+ props["value.serializer"] = StringSerializer::class.java.name
+ return DefaultKafkaProducerFactory<String, String>(props)
+ }
+
+ @Bean
+ fun kafkaTemplate(pf: DefaultKafkaProducerFactory<String, String>) = KafkaTemplate(pf)
+
+ @Bean
+ fun consumerFactory(broker: EmbeddedKafkaBroker): DefaultKafkaConsumerFactory<String, String> {
+ val props = HashMap(KafkaTestUtils.consumerProps("batch-coroutine-group", "false", broker))
+ props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
+ props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
+ props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
+ return DefaultKafkaConsumerFactory<String, String>(props)
+ }
+
+ @Bean
+ fun batchListenerContainerFactory(
+ cf: DefaultKafkaConsumerFactory<String, String>
+ ): ConcurrentKafkaListenerContainerFactory<String, String> {
+ val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
+ factory.consumerFactory = cf
+ factory.isBatchListener = true
+ return factory
+ }
+
+ @Bean
+ fun kafkaBatchCoroutineListener() = KafkaBatchCoroutineListener()
+}
+
+@Component
+class KafkaBatchCoroutineListener {
+
+ val latch = CountDownLatch(1)
+ val receivedValues = mutableListOf<String>()
+
+ @KafkaListener(
+ topics = [BATCH_COROUTINE_TOPIC],
+ containerFactory = "batchListenerContainerFactory"
+ )
+ suspend fun consume(records: List<ConsumerRecord<String, String>>) {
+ // Create a child span inside the coroutine body.
+ // It should be linked to spring.consume, which should be linked to kafka.consume.
+ val childSpan = startSpan("child.work")
+ records.forEach { receivedValues.add(it.value()) }
+ childSpan.finish()
+ latch.countDown()
+ }
+}