Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import datadog.communication.serialization.msgpack.MsgPackWriter;
import datadog.trace.api.ProcessTags;
import datadog.trace.api.WellKnownTags;
import datadog.trace.api.git.GitInfo;
import datadog.trace.api.git.GitInfoProvider;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import java.util.List;

Expand Down Expand Up @@ -38,6 +40,7 @@ public final class SerializingMetricWriter implements MetricWriter {
private static final byte[] HTTP_METHOD = "HTTPMethod".getBytes(ISO_8859_1);
private static final byte[] HTTP_ENDPOINT = "HTTPEndpoint".getBytes(ISO_8859_1);
private static final byte[] SERVICE_SOURCE = "srv_src".getBytes(ISO_8859_1);
private static final byte[] GIT_COMMIT_SHA = "GitCommitSha".getBytes(ISO_8859_1);

// Constant declared here for compile-time folding
public static final int TRISTATE_TRUE = TriState.TRUE.serialValue;
Expand All @@ -49,6 +52,26 @@ public final class SerializingMetricWriter implements MetricWriter {
private final GrowableBuffer buffer;
private long sequence = 0;

static class GitLazyInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: I'd move this class outside as this is not really a serialization concern.

static GitLazyInfo INSTANCE = new GitLazyInfo();
final UTF8BytesString commitSha;

private GitLazyInfo() {
final GitInfo gitInfo = GitInfoProvider.INSTANCE.getGitInfo();
if (gitInfo != null && gitInfo.getCommit() != null && gitInfo.getCommit().getSha() != null) {
commitSha = UTF8BytesString.create(gitInfo.getCommit().getSha());
} else {
commitSha = null;
}
}

// @VisibleForTesting
static void reset() {
GitInfoProvider.INSTANCE.invalidateCache();
INSTANCE = new GitLazyInfo();
}
}

public SerializingMetricWriter(WellKnownTags wellKnownTags, Sink sink) {
this(wellKnownTags, sink, 512 * 1024);
}
Expand All @@ -64,7 +87,9 @@ public SerializingMetricWriter(WellKnownTags wellKnownTags, Sink sink, int initi
public void startBucket(int metricCount, long start, long duration) {
final UTF8BytesString processTags = ProcessTags.getTagsForSerialization();
final boolean writeProcessTags = processTags != null;
writer.startMap(7 + (writeProcessTags ? 1 : 0));
final UTF8BytesString gitSha = GitLazyInfo.INSTANCE.commitSha;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Going on the same topic, I think the git commit hash should be passed via its constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SerializingMetricWriter is initialised at startup time, which is why I used a lazy singleton. Otherwise I would not have added this complexity.

final boolean writeGitCommitSha = gitSha != null;
writer.startMap(7 + (writeProcessTags ? 1 : 0) + (writeGitCommitSha ? 1 : 0));

writer.writeUTF8(RUNTIME_ID);
writer.writeUTF8(wellKnownTags.getRuntimeId());
Expand All @@ -89,6 +114,11 @@ public void startBucket(int metricCount, long start, long duration) {
writer.writeUTF8(processTags);
}

if (writeGitCommitSha) {
writer.writeUTF8(GIT_COMMIT_SHA);
writer.writeUTF8(gitSha);
}

writer.writeUTF8(STATS);

writer.startArray(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.common.metrics

import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED
import static datadog.trace.api.git.UserSuppliedGitInfoBuilder.DD_GIT_COMMIT_SHA
import static java.util.concurrent.TimeUnit.MILLISECONDS
import static java.util.concurrent.TimeUnit.SECONDS

Expand All @@ -10,6 +11,8 @@ import datadog.trace.api.Config
import datadog.trace.api.Pair
import datadog.trace.api.ProcessTags
import datadog.trace.api.WellKnownTags
import datadog.trace.api.env.CapturedEnvironment
import datadog.trace.api.git.GitInfoProvider
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString
import datadog.trace.test.util.DDSpecification
import java.nio.ByteBuffer
Expand All @@ -25,8 +28,8 @@ class SerializingMetricWriterTest extends DDSpecification {

def "should produce correct message #iterationIndex with process tags enabled #withProcessTags" () {
setup:
if (withProcessTags) {
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "true")
if (!withProcessTags) {
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "false")
}
ProcessTags.reset()
long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
Expand All @@ -45,6 +48,9 @@ class SerializingMetricWriterTest extends DDSpecification {
then:
sink.validatedInput()

cleanup:
removeSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED)
ProcessTags.reset()

where:
content << [
Expand Down Expand Up @@ -132,8 +138,108 @@ class SerializingMetricWriterTest extends DDSpecification {
withProcessTags << [true, false]
}

def "ServiceSource optional in the payload"() {
  setup:
  // Serialize one key without and one key with a service source; the
  // ValidatingSink verifies the resulting payload structure.
  long bucketStart = MILLISECONDS.toNanos(System.currentTimeMillis())
  long bucketDuration = SECONDS.toNanos(10)
  WellKnownTags tags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")

  def noSourceKey = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users")
  def sourcedKey = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null)

  def content = [
    Pair.of(noSourceKey, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
    Pair.of(sourcedKey, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
  ]

  ValidatingSink sink = new ValidatingSink(tags, bucketStart, bucketDuration, content)
  SerializingMetricWriter writer = new SerializingMetricWriter(tags, sink, 128)

  when:
  writer.startBucket(content.size(), bucketStart, bucketDuration)
  content.each { writer.add(it.getLeft(), it.getRight()) }
  writer.finishBucket()

  then:
  sink.validatedInput()
}

def "HTTPMethod and HTTPEndpoint fields are optional in payload"() {
  setup:
  // Cover every present/absent combination of HTTPMethod and HTTPEndpoint;
  // ValidatingSink checks the map header size matches the fields actually written.
  long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
  long duration = SECONDS.toNanos(10)
  WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")

  // Create keys with different combinations of HTTP fields
  def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users")
  def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null)
  def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders")
  def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null)

  def content = [
    Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
    Pair.of(keyWithMethodOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
    Pair.of(keyWithEndpointOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
    Pair.of(keyWithNeither, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L)))
  ]

  ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
  SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)

  when:
  writer.startBucket(content.size(), startTime, duration)
  for (Pair<MetricKey, AggregateMetric> pair : content) {
    writer.add(pair.getLeft(), pair.getRight())
  }
  writer.finishBucket()

  then:
  sink.validatedInput()
  // Test passes if validation in ValidatingSink succeeds
  // ValidatingSink verifies that map size matches actual number of fields
  // and that HTTPMethod/HTTPEndpoint are only present when non-empty
}

def "add git sha commit info when supplied is #userSupplied"() {
  setup:
  // Optionally inject a user-supplied git sha, then reset the lazy singleton
  // so it re-reads GitInfoProvider for this iteration.
  if (userSupplied != null) {
    CapturedEnvironment.get().properties.put(DD_GIT_COMMIT_SHA, userSupplied)
  }
  SerializingMetricWriter.GitLazyInfo.reset()
  long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
  long duration = SECONDS.toNanos(10)
  WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")

  // A single representative key; ValidatingSink asserts the GitCommitSha field
  // is present in the header iff a sha is resolvable.
  def key = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users")

  def content = [Pair.of(key, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),]

  ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
  SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)

  when:
  writer.startBucket(content.size(), startTime, duration)
  for (Pair<MetricKey, AggregateMetric> pair : content) {
    writer.add(pair.getLeft(), pair.getRight())
  }
  writer.finishBucket()

  then:
  sink.validatedInput()

  cleanup:
  // Undo the env mutation and restore the singleton for other tests.
  CapturedEnvironment.get().properties.remove(DD_GIT_COMMIT_SHA)
  SerializingMetricWriter.GitLazyInfo.reset()

  where:
  userSupplied << [null, "123456"]
}

static class ValidatingSink implements Sink {

private final WellKnownTags wellKnownTags
private final long startTimeNanos
Expand All @@ -157,7 +263,9 @@ class SerializingMetricWriterTest extends DDSpecification {
void accept(int messageCount, ByteBuffer buffer) {
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(buffer)
int mapSize = unpacker.unpackMapHeader()
assert mapSize == (7 + (Config.get().isExperimentalPropagateProcessTagsEnabled() ? 1 : 0))
String gitCommitSha = GitInfoProvider.INSTANCE.getGitInfo()?.getCommit()?.getSha()
assert mapSize == (7 + (Config.get().isExperimentalPropagateProcessTagsEnabled() ? 1 : 0)
+ (gitCommitSha != null ? 1 : 0))
assert unpacker.unpackString() == "RuntimeID"
assert unpacker.unpackString() == wellKnownTags.getRuntimeId() as String
assert unpacker.unpackString() == "Sequence"
Expand All @@ -174,6 +282,10 @@ class SerializingMetricWriterTest extends DDSpecification {
assert unpacker.unpackString() == "ProcessTags"
assert unpacker.unpackString() == ProcessTags.tagsForSerialization as String
}
if (gitCommitSha != null) {
assert unpacker.unpackString() == "GitCommitSha"
assert unpacker.unpackString() == gitCommitSha
}
assert unpacker.unpackString() == "Stats"
int outerLength = unpacker.unpackArrayHeader()
assert outerLength == 1
Expand Down Expand Up @@ -278,69 +390,4 @@ class SerializingMetricWriterTest extends DDSpecification {
return validated
}
}

// NOTE(review): this looks like the pre-move copy of a test that also appears
// earlier in this diff — confirm only one copy remains after the merge.
def "ServiceSource optional in the payload"() {
setup:
long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
long duration = SECONDS.toNanos(10)
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")

// Create keys with different combinations of HTTP fields
def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users")
def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null)

def content = [
Pair.of(keyWithNoSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
Pair.of(keyWithSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
]

ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)

when:
writer.startBucket(content.size(), startTime, duration)
for (Pair<MetricKey, AggregateMetric> pair : content) {
writer.add(pair.getLeft(), pair.getRight())
}
writer.finishBucket()

then:
sink.validatedInput()
}

// NOTE(review): this looks like the pre-move copy of a test that also appears
// earlier in this diff — confirm only one copy remains after the merge.
def "HTTPMethod and HTTPEndpoint fields are optional in payload"() {
setup:
long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
long duration = SECONDS.toNanos(10)
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")

// Create keys with different combinations of HTTP fields
def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users")
def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null)
def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders")
def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null)

def content = [
Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
Pair.of(keyWithMethodOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
Pair.of(keyWithEndpointOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
Pair.of(keyWithNeither, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L)))
]

ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)

when:
writer.startBucket(content.size(), startTime, duration)
for (Pair<MetricKey, AggregateMetric> pair : content) {
writer.add(pair.getLeft(), pair.getRight())
}
writer.finishBucket()

then:
sink.validatedInput()
// Test passes if validation in ValidatingSink succeeds
// ValidatingSink verifies that map size matches actual number of fields
// and that HTTPMethod/HTTPEndpoint are only present when non-empty
}
}
Loading