diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle index 336ea73a0ec..66fc04a0396 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle @@ -1,13 +1,24 @@ +import org.jetbrains.kotlin.gradle.dsl.JvmTarget +import org.jetbrains.kotlin.gradle.dsl.KotlinVersion + +plugins { + id 'org.jetbrains.kotlin.jvm' +} + muzzle { pass { group = 'org.springframework' module = 'spring-messaging' versions = "[4.0.0.RELEASE,)" assertInverse = true + // KotlinAwareHandlerInstrumentation references Publisher from reactive-streams, + // which is not bundled in spring-messaging but is always present when Spring Kafka is. + extraDependency 'org.reactivestreams:reactive-streams:1.0.4' } } apply from: "$rootDir/gradle/java.gradle" +apply from: "$rootDir/gradle/test-with-kotlin.gradle" testJvmConstraints { minJavaVersion = JavaVersion.VERSION_17 @@ -16,13 +27,24 @@ testJvmConstraints { addTestSuiteForDir('latestDepTest', 'test') ["compileTestGroovy", "compileLatestDepTestGroovy"].each { name -> + def kotlinTaskName = name.replace("Groovy", "Kotlin") tasks.named(name, GroovyCompile) { configureCompiler(it, 17) + classpath += files(tasks.named(kotlinTaskName).map { it.destinationDirectory }) + } +} + +kotlin { + compilerOptions { + jvmTarget = JvmTarget.JVM_1_8 + apiVersion = KotlinVersion.KOTLIN_1_9 + languageVersion = KotlinVersion.KOTLIN_1_9 } } dependencies { compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE' + compileOnly 'org.reactivestreams:reactive-streams:1.0.4' testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common') // capture SQS send and receive spans, propagate trace details in messages @@ -36,6 +58,32 @@ dependencies { } testImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.13', version: '1.2.3' + // Spring Kafka + embedded Kafka broker for coroutine tests + testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.4', { + exclude group: 'org.apache.kafka' + } + testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.3.4', { + exclude group: 'org.apache.kafka' + } + + // KotlinAwareHandlerInstrumentation relies on the reactive-streams instrumentation + testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0') + + testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test' + testImplementation 'org.apache.kafka:kafka-clients:3.8.0' + testImplementation 'org.apache.kafka:kafka-clients:3.8.0:test' + testImplementation 'org.apache.kafka:kafka_2.13:3.8.0' + testImplementation 'org.apache.kafka:kafka_2.13:3.8.0:test' + + testImplementation libs.kotlin + testImplementation "org.jetbrains.kotlin:kotlin-reflect" + testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.8.+" + testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.+" + testImplementation "io.projectreactor:reactor-core:3.+" + + testRuntimeOnly project(':dd-java-agent:instrumentation:kotlin-coroutines-1.3') + testRuntimeOnly project(':dd-java-agent:instrumentation:kafka:kafka-clients-3.8') + latestDepTestImplementation group: 'org.springframework', name: 'spring-messaging', version: '6.+', { exclude group: 'org.slf4j', module: 'slf4j-api' } diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java new file mode 100644 index 00000000000..72f2c181996 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java @@ -0,0 +1,71 @@ +package datadog.trace.instrumentation.springmessaging; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.google.auto.service.AutoService; +import datadog.context.Context; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import org.reactivestreams.Publisher; + +/** + * Instruments {@code KotlinAwareInvocableHandlerMethod.doInvoke()} to attach the current {@link + * Context} to the returned {@link Publisher} so that the reactive-streams instrumentation activates + * it during subscription. + * + *

When a Spring Kafka listener is a Kotlin {@code suspend fun}, {@code + * KotlinAwareInvocableHandlerMethod.doInvoke()} returns a cold {@code Mono} immediately, before the + * listener body runs. By the time the {@code Mono} is subscribed (and the underlying {@code + * AbstractCoroutine} is constructed), the {@code spring.consume} scope opened by {@link + * SpringMessageHandlerInstrumentation} has already been closed. This advice captures {@link + * Context#current()} at {@code doInvoke()} exit — while {@code spring.consume} is still active — + * and stores it on the Publisher. The reactive-streams {@code PublisherInstrumentation} then + * retrieves and activates it during subscription so that {@code DatadogThreadContextElement} picks + * up the correct parent context when the underlying {@code AbstractCoroutine} is constructed. + */ +@AutoService(InstrumenterModule.class) +public class KotlinAwareHandlerInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public KotlinAwareHandlerInstrumentation() { + super("spring-messaging", "spring-messaging-4"); + } + + @Override + public Map contextStore() { + return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName()); + } + + @Override + public List typeInstrumentations() { + return Collections.singletonList(new KotlinAwareHandlerInstrumentation()); + } + + @Override + public String instrumentedType() { + return "org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("doInvoke")), + KotlinAwareHandlerInstrumentation.class.getName() + "$DoInvokeAdvice"); + } + + public static class DoInvokeAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Return Object result) { + if (result instanceof Publisher) { + InstrumentationContext.get(Publisher.class, Context.class) + .put((Publisher) result, Context.current()); + } + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy new file mode 100644 index 00000000000..834b31f2a62 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy @@ -0,0 +1,148 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.agent.test.asserts.TraceAssert +import listener.KafkaBatchCoroutineConfig +import listener.KafkaBatchCoroutineListener +import datadog.trace.api.DDSpanTypes +import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.core.DDSpan +import org.apache.kafka.clients.producer.ProducerRecord +import org.springframework.context.annotation.AnnotationConfigApplicationContext +import org.springframework.kafka.config.KafkaListenerEndpointRegistry +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.test.EmbeddedKafkaBroker +import org.springframework.kafka.test.utils.ContainerTestUtils + +import java.util.concurrent.TimeUnit + +class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification { + + private static final String TOPIC = "batch-coroutine-topic" + private static final String CONSUMER_GROUP = "batch-coroutine-group" + + def "batch @KafkaListener suspend fun - spans must be in the same trace as kafka.consume"() { + setup: + def appContext = new AnnotationConfigApplicationContext(KafkaBatchCoroutineConfig) + def listener = appContext.getBean(KafkaBatchCoroutineListener) + def template = appContext.getBean(KafkaTemplate) + def broker = appContext.getBean(EmbeddedKafkaBroker) + def registry = appContext.getBean(KafkaListenerEndpointRegistry) + + // Wait until listener container has been assigned partitions before sending. + registry.listenerContainers.each { container -> + ContainerTestUtils.waitForAssignment(container, broker.partitionsPerTopic) + } + + TEST_WRITER.clear() + + when: "two messages are sent before the consumer polls so they arrive in one batch" + registry.listenerContainers.each { it.stop() } + template.send(new ProducerRecord(TOPIC, "key", "hello-batch")) + template.send(new ProducerRecord(TOPIC, "key", "hello-batch")) + template.flush() + registry.listenerContainers.each { it.start() } + + then: "the listener processes the batch within 15 s" + listener.latch.await(15, TimeUnit.SECONDS) + listener.receivedValues == ["hello-batch", "hello-batch"] + + and: "child.work is a child of spring.consume" + DDSpan produce1Span, produce2Span, springConsumeParent + assertTraces(10, SORT_TRACES_BY_ID) { + trace(1) { + produceSpan(it) + produce1Span = span(0) + } + trace(1) { + produceSpan(it) + produce2Span = span(0) + } + + trace(1) { kafkaConsumeSpan(it, produce1Span, 0) } + trace(1) { kafkaConsumeSpan(it, produce2Span, 1) } + trace(1) { kafkaConsumeSpan(it, produce1Span, 0) } + trace(1) { kafkaConsumeSpan(it, produce2Span, 1) } + + trace(1) { + // consume messages in one batch + springConsumeSpan(it) + springConsumeParent = span(0) + } + // child work span connected to the spring consume span + trace(1) { childWorkSpan(it, springConsumeParent) } + + trace(1) { kafkaConsumeSpan(it, produce1Span, 0) } + trace(1) { kafkaConsumeSpan(it, produce2Span, 1) } + } + + cleanup: + appContext.close() + } + + private static void produceSpan(TraceAssert trace) { + trace.span { + operationName "kafka.produce" + resourceName "Produce Topic $TOPIC" + spanType "queue" + errored false + measured true + parent() + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" TOPIC + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" { String } + peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS) + defaultTags() + } + } + } + + private static void kafkaConsumeSpan(TraceAssert trace, DDSpan parent, int offset) { + trace.span { + operationName "kafka.consume" + resourceName "Consume Topic $TOPIC" + spanType "queue" + errored false + measured true + childOf parent + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" TOPIC + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" { String } + peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS) + "$InstrumentationTags.CONSUMER_GROUP" CONSUMER_GROUP + "$InstrumentationTags.OFFSET" offset + "$InstrumentationTags.PARTITION" { Integer } + "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { Long } + "$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { Long } + defaultTags(true) + } + } + } + + private static void springConsumeSpan(TraceAssert trace) { + trace.span { + operationName "spring.consume" + resourceName "KafkaBatchCoroutineListener.consume" + spanType DDSpanTypes.MESSAGE_CONSUMER + errored false + measured true + parent() + tags { + "$Tags.COMPONENT" "spring-messaging" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + defaultTags(true) + } + } + } + + private static void childWorkSpan(TraceAssert trace, DDSpan parent) { + trace.span { + operationName "child.work" + childOf parent + tags { defaultTags() } + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt new file mode 100644 index 00000000000..3272a3371a6 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt @@ -0,0 +1,88 @@ +package listener + +import datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.test.EmbeddedKafkaBroker +import org.springframework.kafka.test.EmbeddedKafkaKraftBroker +import org.springframework.kafka.test.utils.KafkaTestUtils +import org.springframework.stereotype.Component +import java.util.concurrent.CountDownLatch + +const val BATCH_COROUTINE_TOPIC = "batch-coroutine-topic" + +@Configuration(proxyBeanMethods = false) +@EnableKafka +class KafkaBatchCoroutineConfig { + + @Bean(destroyMethod = "destroy") + fun embeddedKafkaBroker(): EmbeddedKafkaBroker { + val broker = EmbeddedKafkaKraftBroker(1, 2, BATCH_COROUTINE_TOPIC) + broker.afterPropertiesSet() + return broker + } + + @Bean + fun producerFactory(broker: EmbeddedKafkaBroker): DefaultKafkaProducerFactory { + val props = HashMap(KafkaTestUtils.producerProps(broker.brokersAsString)) + props["key.serializer"] = StringSerializer::class.java.name + props["value.serializer"] = StringSerializer::class.java.name + return DefaultKafkaProducerFactory(props) + } + + @Bean + fun kafkaTemplate(pf: DefaultKafkaProducerFactory) = KafkaTemplate(pf) + + @Bean + fun consumerFactory(broker: EmbeddedKafkaBroker): DefaultKafkaConsumerFactory { + val props = HashMap(KafkaTestUtils.consumerProps("batch-coroutine-group", "false", broker)) + props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name + props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name + return DefaultKafkaConsumerFactory(props) + } + + @Bean + fun batchListenerContainerFactory( + cf: DefaultKafkaConsumerFactory + ): ConcurrentKafkaListenerContainerFactory { + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = cf + factory.isBatchListener = true + return factory + } + + @Bean + fun kafkaBatchCoroutineListener() = KafkaBatchCoroutineListener() +} + +@Component +class KafkaBatchCoroutineListener { + + val latch = CountDownLatch(1) + val receivedValues = mutableListOf() + + @KafkaListener( + topics = [BATCH_COROUTINE_TOPIC], + containerFactory = "batchListenerContainerFactory" + ) + suspend fun consume(records: List>) { + Exception("consume records").printStackTrace(System.err) + // Create a child span inside the coroutine body. + // It should be linked to spring.consume, which should be linked to kafka.consume. + val childSpan = startSpan("child.work") + records.forEach { receivedValues.add(it.value()) } + childSpan.finish() + latch.countDown() + } +}