From 167c6c540810eb44fc876c36b04f05cbbf8eb408 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Tue, 13 Jan 2026 17:05:40 -0800 Subject: [PATCH] Avoid sending empty payloads to the LLMObs endpoint --- .../serialization/FlushingBuffer.java | 13 +++++- .../serialization/FlushingBufferTest.java | 41 +++++++++++++++++++ .../common/writer/PayloadDispatcherImpl.java | 11 ++++- .../writer/ddintake/LLMObsSpanMapper.java | 5 +++ .../ddintake/LLMObsSpanMapperTest.groovy | 30 ++++++++++++++ 5 files changed, 96 insertions(+), 4 deletions(-) diff --git a/communication/src/main/java/datadog/communication/serialization/FlushingBuffer.java b/communication/src/main/java/datadog/communication/serialization/FlushingBuffer.java index d23f5b2358e..332434ad46a 100644 --- a/communication/src/main/java/datadog/communication/serialization/FlushingBuffer.java +++ b/communication/src/main/java/datadog/communication/serialization/FlushingBuffer.java @@ -27,8 +27,12 @@ public boolean isDirty() { @Override public void mark() { - mark = buffer.position(); - ++messageCount; + int current = buffer.position(); + if (current != mark) { + // count only non-empty messages + ++messageCount; + mark = current; + } } @Override @@ -101,4 +105,9 @@ public void reset() { buffer.limit(buffer.capacity()); mark = 0; } + + // for tests only + int getMessageCount() { + return messageCount; + } } diff --git a/communication/src/test/groovy/datadog/communication/serialization/FlushingBufferTest.java b/communication/src/test/groovy/datadog/communication/serialization/FlushingBufferTest.java index 33b1a61a27e..708a5cb8514 100644 --- a/communication/src/test/groovy/datadog/communication/serialization/FlushingBufferTest.java +++ b/communication/src/test/groovy/datadog/communication/serialization/FlushingBufferTest.java @@ -10,4 +10,45 @@ public class FlushingBufferTest { public void testBufferCapacity() { assertEquals(5, new FlushingBuffer(5, (messageCount, buffer) -> {}).capacity()); } + + @Test + public void testMessageCount() { + FlushingBuffer fb = new FlushingBuffer(10, (messageCount, buffer) -> {}); + + // initial counter + assertEquals(0, fb.getMessageCount()); + + fb.mark(); + fb.mark(); + + // counter doesn't change if no data pushed into the buffer + assertEquals(0, fb.getMessageCount()); + + fb.put((byte) 1); + // still zero because the message counter increases on mark + assertEquals(0, fb.getMessageCount()); + + fb.mark(); + // expect increased message counter + assertEquals(1, fb.getMessageCount()); + + fb.mark(); + fb.mark(); + // no change to the counter expected for consecutive mark calls + + fb.putChar('a'); + fb.putChar('b'); + fb.putChar('c'); + // no change to the message counter expected before mark call + assertEquals(1, fb.getMessageCount()); + + fb.mark(); + // expect increased message counter + assertEquals(2, fb.getMessageCount()); + + fb.mark(); + fb.mark(); + // no change to the counter expected for consecutive mark calls + assertEquals(2, fb.getMessageCount()); + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/PayloadDispatcherImpl.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/PayloadDispatcherImpl.java index a0011216770..77fbd5c5a74 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/PayloadDispatcherImpl.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/PayloadDispatcherImpl.java @@ -111,13 +111,20 @@ public void accept(int messageCount, ByteBuffer buffer) { mapper.reset(); if (response.success()) { if (log.isDebugEnabled()) { - log.debug("Successfully sent {} traces to the API", messageCount); + log.debug( + "Successfully sent {} traces {} bytes to the API {}", + messageCount, + buffer.position(), + mapper.endpoint()); } healthMetrics.onSend(messageCount, sizeInBytes, response); } else { if (log.isDebugEnabled()) { log.debug( - "Failed to send {} traces of size {} bytes to the API", messageCount, sizeInBytes); + "Failed to send {} traces of size {} bytes to the API {}", + messageCount, + sizeInBytes, + mapper.endpoint()); } healthMetrics.onFailedSend(messageCount, sizeInBytes, response); } diff --git a/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java index fce921787e2..214062e6d84 100644 --- a/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java +++ b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java @@ -93,6 +93,11 @@ public void map(List> trace, Writable writable) { List> llmobsSpans = trace.stream().filter(LLMObsSpanMapper::isLLMObsSpan).collect(Collectors.toList()); + if (llmobsSpans.isEmpty()) { + // do nothing if no llmobs spans in the trace + return; + } + writable.startMap(3); writable.writeUTF8(EVENT_TYPE); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapperTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapperTest.groovy index 52a40ff1d26..aa6856fece5 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapperTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapperTest.groovy @@ -110,6 +110,36 @@ class LLMObsSpanMapperTest extends DDCoreSpecification { spanData["tags"].contains("language:jvm") } + def "test LLMObsSpanMapper writes no spans when none are LLMObs spans"() { + setup: + def mapper = new LLMObsSpanMapper() + def tracer = tracerBuilder().writer(new ListWriter()).build() + + def regularSpan1 = tracer.buildSpan("http.request") + .withResourceName("GET /api/users") + .withTag("http.method", "GET") + .withTag("http.url", "https://example.com/api/users") + .start() + regularSpan1.finish() + + def regularSpan2 = tracer.buildSpan("database.query") + .withResourceName("SELECT * FROM users") + .withTag("db.type", "postgresql") + .start() + regularSpan2.finish() + + def trace = [regularSpan1, regularSpan2] + CapturingByteBufferConsumer sink = new CapturingByteBufferConsumer() + MsgPackWriter packer = new MsgPackWriter(new FlushingBuffer(1024, sink)) + + when: + packer.format(trace, mapper) + packer.flush() + + then: + sink.captured == null + } + static class CapturingByteBufferConsumer implements ByteBufferConsumer { ByteBuffer captured