Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ public boolean isDirty() {

@Override
public void mark() {
mark = buffer.position();
++messageCount;
int current = buffer.position();
if (current != mark) {
// count only non-empty messages
++messageCount;
mark = current;
}
}

@Override
Expand Down Expand Up @@ -101,4 +105,9 @@ public void reset() {
buffer.limit(buffer.capacity());
mark = 0;
}

// for tests only
int getMessageCount() {
return messageCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,45 @@ public class FlushingBufferTest {
public void testBufferCapacity() {
assertEquals(5, new FlushingBuffer(5, (messageCount, buffer) -> {}).capacity());
}

@Test
public void testMessageCount() {
FlushingBuffer fb = new FlushingBuffer(10, (messageCount, buffer) -> {});

// initial counter
assertEquals(0, fb.getMessageCount());

fb.mark();
fb.mark();

// counter doesn't change if no data pushed into the buffer
assertEquals(0, fb.getMessageCount());

fb.put((byte) 1);
// still zero because the message counter increases on mark
assertEquals(0, fb.getMessageCount());

fb.mark();
// expect increased message counter
assertEquals(1, fb.getMessageCount());

fb.mark();
fb.mark();
// no change to the counter expected for consecutive mark calls

fb.putChar('a');
fb.putChar('b');
fb.putChar('c');
// no change to the message counter expected before mark call
assertEquals(1, fb.getMessageCount());

fb.mark();
// expect increased message counter
assertEquals(2, fb.getMessageCount());

fb.mark();
fb.mark();
// no change to the counter expected for consecutive mark calls
assertEquals(2, fb.getMessageCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,20 @@ public void accept(int messageCount, ByteBuffer buffer) {
mapper.reset();
if (response.success()) {
if (log.isDebugEnabled()) {
log.debug("Successfully sent {} traces to the API", messageCount);
log.debug(
"Successfully sent {} traces {} bytes to the API {}",
messageCount,
buffer.position(),
mapper.endpoint());
}
healthMetrics.onSend(messageCount, sizeInBytes, response);
} else {
if (log.isDebugEnabled()) {
log.debug(
"Failed to send {} traces of size {} bytes to the API", messageCount, sizeInBytes);
"Failed to send {} traces of size {} bytes to the API {}",
messageCount,
sizeInBytes,
mapper.endpoint());
}
healthMetrics.onFailedSend(messageCount, sizeInBytes, response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
List<? extends CoreSpan<?>> llmobsSpans =
trace.stream().filter(LLMObsSpanMapper::isLLMObsSpan).collect(Collectors.toList());

if (llmobsSpans.isEmpty()) {
// do nothing if no llmobs spans in the trace
return;
}

writable.startMap(3);

writable.writeUTF8(EVENT_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,36 @@ class LLMObsSpanMapperTest extends DDCoreSpecification {
spanData["tags"].contains("language:jvm")
}

def "test LLMObsSpanMapper writes no spans when none are LLMObs spans"() {
setup:
def mapper = new LLMObsSpanMapper()
def tracer = tracerBuilder().writer(new ListWriter()).build()

def regularSpan1 = tracer.buildSpan("http.request")
.withResourceName("GET /api/users")
.withTag("http.method", "GET")
.withTag("http.url", "https://example.com/api/users")
.start()
regularSpan1.finish()

def regularSpan2 = tracer.buildSpan("database.query")
.withResourceName("SELECT * FROM users")
.withTag("db.type", "postgresql")
.start()
regularSpan2.finish()

def trace = [regularSpan1, regularSpan2]
CapturingByteBufferConsumer sink = new CapturingByteBufferConsumer()
MsgPackWriter packer = new MsgPackWriter(new FlushingBuffer(1024, sink))

when:
packer.format(trace, mapper)
packer.flush()

then:
sink.captured == null
}

static class CapturingByteBufferConsumer implements ByteBufferConsumer {

ByteBuffer captured
Expand Down