Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion

plugins {
id 'org.jetbrains.kotlin.jvm'
}

muzzle {
pass {
group = 'org.springframework'
module = 'spring-messaging'
versions = "[4.0.0.RELEASE,)"
assertInverse = true
// KotlinAwareHandlerInstrumentation references Publisher from reactive-streams,
// which is not bundled in spring-messaging but is always present when Spring Kafka is.
extraDependency 'org.reactivestreams:reactive-streams:1.0.4'
}
}

apply from: "$rootDir/gradle/java.gradle"
apply from: "$rootDir/gradle/test-with-kotlin.gradle"

testJvmConstraints {
minJavaVersion = JavaVersion.VERSION_17
Expand All @@ -16,13 +27,24 @@ testJvmConstraints {
addTestSuiteForDir('latestDepTest', 'test')

["compileTestGroovy", "compileLatestDepTestGroovy"].each { name ->
def kotlinTaskName = name.replace("Groovy", "Kotlin")
tasks.named(name, GroovyCompile) {
configureCompiler(it, 17)
classpath += files(tasks.named(kotlinTaskName).map { it.destinationDirectory })
}
}

kotlin {
compilerOptions {
jvmTarget = JvmTarget.JVM_1_8
apiVersion = KotlinVersion.KOTLIN_1_9
languageVersion = KotlinVersion.KOTLIN_1_9
}
}

dependencies {
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE'
compileOnly 'org.reactivestreams:reactive-streams:1.0.4'
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common')

// capture SQS send and receive spans, propagate trace details in messages
Expand All @@ -36,6 +58,32 @@ dependencies {
}
testImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.13', version: '1.2.3'

// Spring Kafka + embedded Kafka broker for coroutine tests
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.4', {
exclude group: 'org.apache.kafka'
}
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.3.4', {
exclude group: 'org.apache.kafka'
}

// KotlinAwareHandlerInstrumentation relies on the reactive-streams instrumentation
testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0')

testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test'
testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
testImplementation 'org.apache.kafka:kafka-clients:3.8.0:test'
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0'
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0:test'

testImplementation libs.kotlin
testImplementation "org.jetbrains.kotlin:kotlin-reflect"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.8.+"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.+"
testImplementation "io.projectreactor:reactor-core:3.+"

testRuntimeOnly project(':dd-java-agent:instrumentation:kotlin-coroutines-1.3')
testRuntimeOnly project(':dd-java-agent:instrumentation:kafka:kafka-clients-3.8')

latestDepTestImplementation group: 'org.springframework', name: 'spring-messaging', version: '6.+', {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package datadog.trace.instrumentation.springmessaging;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;

/**
* Instruments {@code KotlinAwareInvocableHandlerMethod.doInvoke()} to attach the current {@link
* Context} to the returned {@link Publisher} so that the reactive-streams instrumentation activates
* it during subscription.
*
* <p>When a Spring Kafka listener is a Kotlin {@code suspend fun}, {@code
* KotlinAwareInvocableHandlerMethod.doInvoke()} returns a cold {@code Mono} immediately, before the
* listener body runs. By the time the {@code Mono} is subscribed (and the underlying {@code
* AbstractCoroutine} is constructed), the {@code spring.consume} scope opened by {@link
* SpringMessageHandlerInstrumentation} has already been closed. This advice captures {@link
* Context#current()} at {@code doInvoke()} exit — while {@code spring.consume} is still active —
* and stores it on the Publisher. The reactive-streams {@code PublisherInstrumentation} then
* retrieves and activates it during subscription so that {@code DatadogThreadContextElement} picks
* up the correct parent context when the underlying {@code AbstractCoroutine} is constructed.
*/
@AutoService(InstrumenterModule.class)
public class KotlinAwareHandlerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public KotlinAwareHandlerInstrumentation() {
super("spring-messaging", "spring-messaging-4");
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
}

@Override
public List<Instrumenter> typeInstrumentations() {
return Collections.singletonList(new KotlinAwareHandlerInstrumentation());
}

@Override
public String instrumentedType() {
return "org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("doInvoke")),
KotlinAwareHandlerInstrumentation.class.getName() + "$DoInvokeAdvice");
}

public static class DoInvokeAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return Object result) {
if (result instanceof Publisher) {
InstrumentationContext.get(Publisher.class, Context.class)
.put((Publisher<?>) result, Context.current());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import datadog.trace.agent.test.InstrumentationSpecification
import datadog.trace.agent.test.asserts.TraceAssert
import listener.KafkaBatchCoroutineConfig
import listener.KafkaBatchCoroutineListener
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.core.DDSpan
import org.apache.kafka.clients.producer.ProducerRecord
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.kafka.config.KafkaListenerEndpointRegistry
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.utils.ContainerTestUtils

import java.util.concurrent.TimeUnit

class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification {

private static final String TOPIC = "batch-coroutine-topic"
private static final String CONSUMER_GROUP = "batch-coroutine-group"

def "batch @KafkaListener suspend fun - spans must be in the same trace as kafka.consume"() {
setup:
def appContext = new AnnotationConfigApplicationContext(KafkaBatchCoroutineConfig)
def listener = appContext.getBean(KafkaBatchCoroutineListener)
def template = appContext.getBean(KafkaTemplate)
def broker = appContext.getBean(EmbeddedKafkaBroker)
def registry = appContext.getBean(KafkaListenerEndpointRegistry)

// Wait until listener container has been assigned partitions before sending.
registry.listenerContainers.each { container ->
ContainerTestUtils.waitForAssignment(container, broker.partitionsPerTopic)
}

TEST_WRITER.clear()

when: "two messages are sent before the consumer polls so they arrive in one batch"
registry.listenerContainers.each { it.stop() }
template.send(new ProducerRecord(TOPIC, "key", "hello-batch"))
template.send(new ProducerRecord(TOPIC, "key", "hello-batch"))
template.flush()
registry.listenerContainers.each { it.start() }

then: "the listener processes the batch within 15 s"
listener.latch.await(15, TimeUnit.SECONDS)
listener.receivedValues == ["hello-batch", "hello-batch"]

and: "child.work is a child of spring.consume"
DDSpan produce1Span, produce2Span, springConsumeParent
assertTraces(10, SORT_TRACES_BY_ID) {
trace(1) {
produceSpan(it)
produce1Span = span(0)
}
trace(1) {
produceSpan(it)
produce2Span = span(0)
}

trace(1) { kafkaConsumeSpan(it, produce1Span, 0) }
trace(1) { kafkaConsumeSpan(it, produce2Span, 1) }
trace(1) { kafkaConsumeSpan(it, produce1Span, 0) }
trace(1) { kafkaConsumeSpan(it, produce2Span, 1) }

trace(1) {
// consume messages in one batch
springConsumeSpan(it)
springConsumeParent = span(0)
}
// child work span connected to the spring consume span
trace(1) { childWorkSpan(it, springConsumeParent) }

trace(1) { kafkaConsumeSpan(it, produce1Span, 0) }
trace(1) { kafkaConsumeSpan(it, produce2Span, 1) }
}

cleanup:
appContext.close()
}

private static void produceSpan(TraceAssert trace) {
trace.span {
operationName "kafka.produce"
resourceName "Produce Topic $TOPIC"
spanType "queue"
errored false
measured true
parent()
tags {
"$Tags.COMPONENT" "java-kafka"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" TOPIC
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" { String }
peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS)
defaultTags()
}
}
}

private static void kafkaConsumeSpan(TraceAssert trace, DDSpan parent, int offset) {
trace.span {
operationName "kafka.consume"
resourceName "Consume Topic $TOPIC"
spanType "queue"
errored false
measured true
childOf parent
tags {
"$Tags.COMPONENT" "java-kafka"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" TOPIC
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" { String }
peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS)
"$InstrumentationTags.CONSUMER_GROUP" CONSUMER_GROUP
"$InstrumentationTags.OFFSET" offset
"$InstrumentationTags.PARTITION" { Integer }
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { Long }
"$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { Long }
defaultTags(true)
}
}
}

private static void springConsumeSpan(TraceAssert trace) {
trace.span {
operationName "spring.consume"
resourceName "KafkaBatchCoroutineListener.consume"
spanType DDSpanTypes.MESSAGE_CONSUMER
errored false
measured true
parent()
tags {
"$Tags.COMPONENT" "spring-messaging"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
defaultTags(true)
}
}
}

private static void childWorkSpan(TraceAssert trace, DDSpan parent) {
trace.span {
operationName "child.work"
childOf parent
tags { defaultTags() }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package listener

import datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker
import org.springframework.kafka.test.utils.KafkaTestUtils
import org.springframework.stereotype.Component
import java.util.concurrent.CountDownLatch

const val BATCH_COROUTINE_TOPIC = "batch-coroutine-topic"

@Configuration(proxyBeanMethods = false)
@EnableKafka
class KafkaBatchCoroutineConfig {

@Bean(destroyMethod = "destroy")
fun embeddedKafkaBroker(): EmbeddedKafkaBroker {
val broker = EmbeddedKafkaKraftBroker(1, 2, BATCH_COROUTINE_TOPIC)
broker.afterPropertiesSet()
return broker
}

@Bean
fun producerFactory(broker: EmbeddedKafkaBroker): DefaultKafkaProducerFactory<String, String> {
val props = HashMap<String, Any>(KafkaTestUtils.producerProps(broker.brokersAsString))
props["key.serializer"] = StringSerializer::class.java.name
props["value.serializer"] = StringSerializer::class.java.name
return DefaultKafkaProducerFactory(props)
}

@Bean
fun kafkaTemplate(pf: DefaultKafkaProducerFactory<String, String>) = KafkaTemplate(pf)

@Bean
fun consumerFactory(broker: EmbeddedKafkaBroker): DefaultKafkaConsumerFactory<String, String> {
val props = HashMap<String, Any>(KafkaTestUtils.consumerProps("batch-coroutine-group", "false", broker))
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
return DefaultKafkaConsumerFactory(props)
}

@Bean
fun batchListenerContainerFactory(
cf: DefaultKafkaConsumerFactory<String, String>
): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = cf
factory.isBatchListener = true
return factory
}

@Bean
fun kafkaBatchCoroutineListener() = KafkaBatchCoroutineListener()
}

@Component
class KafkaBatchCoroutineListener {

val latch = CountDownLatch(1)
val receivedValues = mutableListOf<String>()

@KafkaListener(
topics = [BATCH_COROUTINE_TOPIC],
containerFactory = "batchListenerContainerFactory"
)
suspend fun consume(records: List<ConsumerRecord<String, String>>) {
Exception("consume records").printStackTrace(System.err)
// Create a child span inside the coroutine body.
// It should be linked to spring.consume, which should be linked to kafka.consume.
val childSpan = startSpan("child.work")
records.forEach { receivedValues.add(it.value()) }
childSpan.finish()
latch.countDown()
}
}
Loading