diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/FeatureFlaggingConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/FeatureFlaggingConfig.java index 28151f88864..5f567908254 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/FeatureFlaggingConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/FeatureFlaggingConfig.java @@ -3,4 +3,12 @@ public class FeatureFlaggingConfig { public static final String FLAGGING_PROVIDER_ENABLED = "experimental.flagging.provider.enabled"; + + /** + * Killswitch for the EVP {@code flagevaluation} emission path. Default: enabled. Disabling it + * turns off EVP flag-evaluation counts while leaving the OTel {@code feature_flag.evaluations} + * metric path untouched. Maps to {@code DD_FLAGGING_EVALUATION_COUNTS_ENABLED}. + */ + public static final String FLAGGING_EVALUATION_COUNTS_ENABLED = + "flagging.evaluation.counts.enabled"; } diff --git a/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java b/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java index 752adb8899d..6001a221a13 100644 --- a/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java +++ b/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java @@ -66,7 +66,9 @@ public enum AgentThread { LLMOBS_EVALS_PROCESSOR("dd-llmobs-evals-processor"), - FEATURE_FLAG_EXPOSURE_PROCESSOR("dd-ffe-exposure-processor"); + FEATURE_FLAG_EXPOSURE_PROCESSOR("dd-ffe-exposure-processor"), + + FEATURE_FLAG_EVALUATION_PROCESSOR("dd-ffe-evaluation-processor"); public final String threadName; diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index e2d171c3c3f..403f471dffc 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -1505,6 +1505,14 @@ "aliases": [] } ], + "DD_FLAGGING_EVALUATION_COUNTS_ENABLED": [ + { + "version": "A", + "type": "boolean", + "default": "true", + "aliases": [] + } + ], "DD_FORCE_CLEAR_TEXT_HTTP_FOR_INTAKE_CLIENT": [ { "version": "A", diff --git a/products/feature-flagging/feature-flagging-agent/src/main/java/com/datadog/featureflag/FeatureFlaggingSystem.java b/products/feature-flagging/feature-flagging-agent/src/main/java/com/datadog/featureflag/FeatureFlaggingSystem.java index 02689767bad..d50bdaf92ee 100644 --- a/products/feature-flagging/feature-flagging-agent/src/main/java/com/datadog/featureflag/FeatureFlaggingSystem.java +++ b/products/feature-flagging/feature-flagging-agent/src/main/java/com/datadog/featureflag/FeatureFlaggingSystem.java @@ -2,6 +2,8 @@ import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; +import datadog.trace.api.config.FeatureFlaggingConfig; +import datadog.trace.api.featureflag.flagevaluation.FlagEvaluationWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,6 +13,7 @@ public class FeatureFlaggingSystem { private static volatile RemoteConfigService CONFIG_SERVICE; private static volatile ExposureWriter EXPOSURE_WRITER; + private static volatile FlagEvaluationWriter FLAG_EVAL_WRITER; private FeatureFlaggingSystem() {} @@ -27,10 +30,31 @@ public static void start(final SharedCommunicationObjects sco) { EXPOSURE_WRITER = new ExposureWriterImpl(sco, config); EXPOSURE_WRITER.init(); + // EVP flagevaluation writer — gated by the killswitch + // DD_FLAGGING_EVALUATION_COUNTS_ENABLED (default: on), read through the tracer config system. + final boolean evalCountsEnabled = + config + .configProvider() + .getBoolean(FeatureFlaggingConfig.FLAGGING_EVALUATION_COUNTS_ENABLED, true); + if (evalCountsEnabled) { + final FlagEvaluationWriterImpl evalWriter = new FlagEvaluationWriterImpl(sco, config); + evalWriter.start(); // registers with FeatureFlaggingGateway + FLAG_EVAL_WRITER = evalWriter; + LOGGER.debug("Flag evaluation EVP writer started"); + } else { + LOGGER.debug( + "Flag evaluation EVP writer disabled ({}=false)", + FeatureFlaggingConfig.FLAGGING_EVALUATION_COUNTS_ENABLED); + } + LOGGER.debug("Feature Flagging system started"); } public static void stop() { + if (FLAG_EVAL_WRITER != null) { + FLAG_EVAL_WRITER.close(); // also deregisters from gateway + FLAG_EVAL_WRITER = null; + } if (EXPOSURE_WRITER != null) { EXPOSURE_WRITER.close(); EXPOSURE_WRITER = null; diff --git a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/DDEvaluator.java b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/DDEvaluator.java index 91c0aafdc7a..762ffe5997c 100644 --- a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/DDEvaluator.java +++ b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/DDEvaluator.java @@ -387,11 +387,15 @@ private static ProviderEvaluation resolveVariant( + e.getMessage()); } + // Stamp eval-time at the resolution point so first/last_evaluation reflect evaluation time, + // not hook-fire time. Passed to the hook via provider metadata "dd.eval.timestamp_ms". + final long evalTimestampMs = System.currentTimeMillis(); final ImmutableMetadata.ImmutableMetadataBuilder metadataBuilder = ImmutableMetadata.builder() .addString("flagKey", flag.key) .addString("variationType", flag.variationType.name()) - .addString("allocationKey", allocation.key); + .addString("allocationKey", allocation.key) + .addLong("dd.eval.timestamp_ms", evalTimestampMs); final ProviderEvaluation result = ProviderEvaluation.builder() .value(mappedValue) diff --git a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalEVPHook.java b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalEVPHook.java new file mode 100644 index 00000000000..089d77044ac --- /dev/null +++ b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalEVPHook.java @@ -0,0 +1,115 @@ +package datadog.trace.api.openfeature; + +import datadog.trace.api.featureflag.FeatureFlaggingGateway; +import datadog.trace.api.featureflag.flagevaluation.FlagEvalEvent; +import datadog.trace.api.featureflag.flagevaluation.FlagEvaluationWriter; +import dev.openfeature.sdk.FlagEvaluationDetails; +import dev.openfeature.sdk.Hook; +import dev.openfeature.sdk.HookContext; +import dev.openfeature.sdk.ImmutableMetadata; +import java.util.Collections; +import java.util.Map; + +/** + * OpenFeature {@code Hook} that captures flag evaluation events for EVP {@code flagevaluation} + * emission. + * + *

Contract: {@code finallyAfter} does ONLY cheap scalar extraction + a non-blocking offer to the + * writer's bounded queue. No inline aggregation on the hook thread. + * + *

This hook is registered alongside the existing OTel {@link FlagEvalHook} — it does NOT replace + * it (the existing OTel metrics hook is left unchanged). + * + *

The writer is resolved lazily from {@link FeatureFlaggingGateway#getFlagEvalWriter()} on each + * call, so the hook is always safe to register — if the writer is absent (killswitch off or not yet + * started) it is a no-op. + */ +class FlagEvalEVPHook implements Hook { + + /** + * Singleton instance: always registered when the provider is created; harmless when writer=null + * (killswitch off or not yet started). + */ + static final FlagEvalEVPHook INSTANCE = new FlagEvalEVPHook<>(null); + + /** + * Optional injected writer (test-only). When non-null, bypasses the gateway lookup. Production + * instances use {@code null} (resolved via gateway). + */ + private final FlagEvaluationWriter injectedWriter; + + /** Production constructor — resolves writer from gateway. */ + FlagEvalEVPHook() { + this.injectedWriter = null; + } + + /** Test-only constructor — injects a writer directly, bypassing the gateway. */ + FlagEvalEVPHook(final FlagEvaluationWriter writer) { + this.injectedWriter = writer; + } + + /** + * Cheap capture + non-blocking enqueue only. Runs at the {@code finally} stage so it covers + * success, error, and default-value paths. + */ + @Override + public void finallyAfter( + final HookContext ctx, + final FlagEvaluationDetails details, + final Map hints) { + // Resolve writer: prefer injected (test), then gateway + final FlagEvaluationWriter w = + injectedWriter != null ? injectedWriter : FeatureFlaggingGateway.getFlagEvalWriter(); + if (w == null || details == null) { + return; + } + try { + // Cheap scalar extraction — no JSON, no map lookups beyond metadata.asMap() + final String flagKey = details.getFlagKey(); + final ImmutableMetadata metadata = details.getFlagMetadata(); + + // allocationKey: "allocationKey" (camelCase) — consistent with FlagEvalHook.java + final String allocationKey = metadata != null ? metadata.getString("allocationKey") : null; + + // eval-time: from flag metadata "dd.eval.timestamp_ms" (Long), fallback to hook-fire time. + // ImmutableMetadata.getLong available since sdk 1.4+. + final Long evalTimeObj = metadata != null ? metadata.getLong("dd.eval.timestamp_ms") : null; + final long evalTimeMs = evalTimeObj != null ? evalTimeObj : System.currentTimeMillis(); + + // variant: the OpenFeature variant key (same source as the OTel FlagEvalHook), NOT the + // evaluated value. A null variant means no variant was selected (runtime default). + final String variant = details.getVariant(); + + // error message: prefer the human-readable message; fall back to the error code name when + // the message is empty (some providers populate only the code). null on success. + String errorMessage = details.getErrorMessage(); + if ((errorMessage == null || errorMessage.isEmpty()) && details.getErrorCode() != null) { + errorMessage = details.getErrorCode().name(); + } + if (errorMessage != null && errorMessage.isEmpty()) { + errorMessage = null; + } + + // targetingKey from evaluation context + final String targetingKey = + ctx != null && ctx.getCtx() != null ? ctx.getCtx().getTargetingKey() : null; + + // attrs: flatten EvaluationContext attributes for the full-tier canonical key + final Map attrs = extractAttrs(ctx); + + w.enqueue( + new FlagEvalEvent( + flagKey, variant, allocationKey, targetingKey, errorMessage, evalTimeMs, attrs)); + } catch (Exception e) { + // Never let EVP recording break flag evaluation + } + } + + /** Extracts converted, flattened attributes from the evaluation context. */ + private Map extractAttrs(final HookContext ctx) { + if (ctx == null || ctx.getCtx() == null) { + return Collections.emptyMap(); + } + return DDEvaluator.flattenContext(ctx.getCtx()); + } +} diff --git a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/Provider.java b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/Provider.java index c492ef49c69..e43cb7e7de3 100644 --- a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/Provider.java +++ b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/Provider.java @@ -16,6 +16,7 @@ import dev.openfeature.sdk.exceptions.OpenFeatureError; import dev.openfeature.sdk.exceptions.ProviderNotReadyError; import java.lang.reflect.Constructor; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -168,10 +169,14 @@ private Evaluator buildEvaluator() throws Exception { @Override public List getProviderHooks() { - if (flagEvalHook == null) { - return Collections.emptyList(); + final List hooks = new ArrayList<>(2); + if (flagEvalHook != null) { + hooks.add(flagEvalHook); } - return Collections.singletonList(flagEvalHook); + // EVP flagevaluation hook: always registered; no-op when writer is absent (killswitch off). + // Writer is resolved lazily from FeatureFlaggingGateway.getFlagEvalWriter() on each call. + hooks.add(FlagEvalEVPHook.INSTANCE); + return Collections.unmodifiableList(hooks); } @Override diff --git a/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalEVPHookTest.java b/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalEVPHookTest.java new file mode 100644 index 00000000000..599d47f8da0 --- /dev/null +++ b/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalEVPHookTest.java @@ -0,0 +1,366 @@ +package datadog.trace.api.openfeature; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +import datadog.trace.api.featureflag.flagevaluation.FlagEvalEvent; +import datadog.trace.api.featureflag.flagevaluation.FlagEvaluationWriter; +import dev.openfeature.sdk.ErrorCode; +import dev.openfeature.sdk.FlagEvaluationDetails; +import dev.openfeature.sdk.FlagValueType; +import dev.openfeature.sdk.HookContext; +import dev.openfeature.sdk.ImmutableMetadata; +import dev.openfeature.sdk.MutableContext; +import dev.openfeature.sdk.Reason; +import dev.openfeature.sdk.Value; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link FlagEvalEVPHook}: cheap capture, non-blocking enqueue, eval-time metadata, + * absent-variant detection, and killswitch-via-writer-null behaviour. + */ +class FlagEvalEVPHookTest { + + // ---- helpers ---- + + /** + * Creates a writer that captures the enqueued event for assertion. Uses an anonymous class since + * FlagEvaluationWriter has multiple abstract methods. + */ + private FlagEvaluationWriter capturingWriter(final AtomicReference ref) { + return new FlagEvaluationWriter() { + @Override + public void enqueue(final FlagEvalEvent event) { + ref.set(event); + } + + @Override + public void start() {} + + @Override + public void close() {} + }; + } + + private static FlagEvalEVPHook hookWithWriter(final FlagEvaluationWriter writer) { + return new FlagEvalEVPHook<>(writer); + } + + private static FlagEvaluationDetails details( + final String flagKey, + final Object value, + final String variant, + final String reason, + final ImmutableMetadata metadata) { + final FlagEvaluationDetails.FlagEvaluationDetailsBuilder builder = + FlagEvaluationDetails.builder().flagKey(flagKey).value(value).reason(reason); + if (variant != null) { + builder.variant(variant); + } + if (metadata != null) { + builder.flagMetadata(metadata); + } + return builder.build(); + } + + private static HookContext hookCtxWithTargetingKey( + final String flagKey, final String targetingKey) { + final MutableContext ctx = new MutableContext(targetingKey); + return HookContext.builder() + .flagKey(flagKey) + .type(FlagValueType.STRING) + .defaultValue("default") + .ctx(ctx) + .build(); + } + + // ---- test: hook calls writer.enqueue once with flagKey, variant, allocationKey ---- + + @Test + void finallyAfterEnqueuesEventWithAllBasicFields() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalEVPHook hook = hookWithWriter(capturingWriter(captured)); + + final FlagEvaluationDetails det = + details( + "my-flag", + "on-value", + "on", + Reason.TARGETING_MATCH.name(), + ImmutableMetadata.builder().addString("allocationKey", "alloc-1").build()); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get(), "writer.enqueue must be called once"); + final FlagEvalEvent e = captured.get(); + assertEquals("my-flag", e.flagKey); + assertEquals("on", e.variant, "variant must be the OpenFeature variant key"); + assertEquals("alloc-1", e.allocationKey); + } + + // ---- variant comes from details.getVariant(), NOT details.getValue() ---- + + @Test + void variantIsTheVariantKeyNotTheEvaluatedValue() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalEVPHook hook = hookWithWriter(capturingWriter(captured)); + + // value and variant DIFFER, so a value-vs-variant mistake is detectable. + final FlagEvaluationDetails det = + details( + "g1-flag", + "the-evaluated-value", // value + "the-variant-key", // variant + Reason.TARGETING_MATCH.name(), + null); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals( + "the-variant-key", + captured.get().variant, + "variant must be sourced from details.getVariant(), not details.getValue()"); + } + + // ---- test: evalTimeMs from metadata "dd.eval.timestamp_ms" ---- + + @Test + void evalTimeMsComesFromMetadataWhenPresent() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalEVPHook hook = hookWithWriter(capturingWriter(captured)); + + final long expectedTimestamp = 1_700_000_000_000L; + final FlagEvaluationDetails det = + details( + "ts-flag", + "v", + "v", + Reason.SPLIT.name(), + ImmutableMetadata.builder() + .addString("allocationKey", "a") + .addLong("dd.eval.timestamp_ms", expectedTimestamp) + .build()); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals( + expectedTimestamp, + captured.get().evalTimeMs, + "evalTimeMs must come from dd.eval.timestamp_ms metadata when present"); + } + + // ---- test: evalTimeMs falls back to System.currentTimeMillis() when absent ---- + + @Test + void evalTimeMsFallsBackToCurrentTimeWhenMetadataAbsent() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalEVPHook hook = hookWithWriter(capturingWriter(captured)); + + final long before = System.currentTimeMillis(); + final FlagEvaluationDetails det = + details("ts-flag", "v", "v", Reason.SPLIT.name(), null); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + final long after = System.currentTimeMillis(); + assertNotNull(captured.get()); + final long ts = captured.get().evalTimeMs; + assertTrue( + ts >= before && ts <= after, + "evalTimeMs must fall back to hook-fire time when metadata absent. got: " + ts); + } + + // ---- test: absent variant -> variant is null -> runtime default ---- + + @Test + void absentVariantProducesNullVariant() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalEVPHook hook = hookWithWriter(capturingWriter(captured)); + + // A runtime default returns the default value but no variant. + final FlagEvaluationDetails det = + details("def-flag", "default-value", null, Reason.DEFAULT.name(), null); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertNull(captured.get().variant, "Absent variant must stay null (runtime default)"); + } + + // ---- test: error message captured from details (error object support) ---- + + @Test + void errorMessageCapturedFromDetails() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalEVPHook hook = hookWithWriter(capturingWriter(captured)); + + final FlagEvaluationDetails det = + FlagEvaluationDetails.builder() + .flagKey("err-flag") + .value("default") + .reason(Reason.ERROR.name()) + .errorCode(ErrorCode.TYPE_MISMATCH) + .errorMessage("value does not match declared type") + .build(); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals( + "value does not match declared type", + captured.get().errorMessage, + "errorMessage must be captured from the evaluation details"); + } + + // ---- test: error code used as fallback message when error message is empty ---- + + @Test + void errorCodeUsedAsFallbackWhenMessageEmpty() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalEVPHook hook = hookWithWriter(capturingWriter(captured)); + + final FlagEvaluationDetails det = + FlagEvaluationDetails.builder() + .flagKey("err-flag") + .value("default") + .reason(Reason.ERROR.name()) + .errorCode(ErrorCode.FLAG_NOT_FOUND) + .build(); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals( + "FLAG_NOT_FOUND", + captured.get().errorMessage, + "error code name must be used when no error message is present"); + } + + // ---- test: success path has no error message ---- + + @Test + void successPathHasNullErrorMessage() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalEVPHook hook = hookWithWriter(capturingWriter(captured)); + + final FlagEvaluationDetails det = + details("ok-flag", "v", "v", Reason.TARGETING_MATCH.name(), null); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertNull(captured.get().errorMessage, "success path must have no error message"); + } + + // ---- test: hook does NO aggregation on the hook thread ---- + + @Test + void finallyAfterOnlyCallsEnqueueNoOtherWriterMethods() { + final FlagEvaluationWriter writer = mock(FlagEvaluationWriter.class); + final FlagEvalEVPHook hook = hookWithWriter(writer); + + final FlagEvaluationDetails det = + details("flag", "v", "v", Reason.TARGETING_MATCH.name(), null); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + // Exactly one enqueue call, no start/close/aggregate + verify(writer, times(1)).enqueue(any(FlagEvalEvent.class)); + verify(writer, never()).close(); + verify(writer, never()).start(); + } + + // ---- test: writer=null -> no-op (killswitch off / not yet started) ---- + + @Test + void writerNullIsNoOp() { + final FlagEvalEVPHook hook = hookWithWriter(null); + final FlagEvaluationDetails det = + details("flag", "v", "v", Reason.TARGETING_MATCH.name(), null); + + // Must not throw; nothing is enqueued + hook.finallyAfter(null, det, Collections.emptyMap()); + } + + // ---- test: details=null -> no-op ---- + + @Test + void detailsNullIsNoOp() { + final FlagEvaluationWriter writer = mock(FlagEvaluationWriter.class); + final FlagEvalEVPHook hook = hookWithWriter(writer); + + // Should not throw + hook.finallyAfter(null, null, Collections.emptyMap()); + + verifyNoInteractions(writer); + } + + // ---- test: targetingKey extracted from evaluation context ---- + + @Test + void targetingKeyExtractedFromContext() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalEVPHook hook = hookWithWriter(capturingWriter(captured)); + + final FlagEvaluationDetails det = + details("ctx-flag", "v", "v", Reason.SPLIT.name(), null); + + final HookContext hookCtx = hookCtxWithTargetingKey("ctx-flag", "user-42"); + + hook.finallyAfter(hookCtx, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals( + "user-42", + captured.get().targetingKey, + "targetingKey must be extracted from the evaluation context"); + } + + @Test + void contextAttributesAreFlattenedAndConvertedBeforeEnqueue() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalEVPHook hook = hookWithWriter(capturingWriter(captured)); + + final Map profile = new HashMap<>(); + profile.put("tier", "gold"); + final Map attributes = new HashMap<>(); + attributes.put("score", 42); + attributes.put("profile", profile); + final MutableContext context = + new MutableContext(Value.objectToValue(attributes).asStructure().asMap()); + context.setTargetingKey("user-42"); + + final HookContext hookCtx = + HookContext.builder() + .flagKey("ctx-flag") + .type(FlagValueType.STRING) + .defaultValue("default") + .ctx(context) + .build(); + final FlagEvaluationDetails det = + details("ctx-flag", "v", "v", Reason.TARGETING_MATCH.name(), null); + + hook.finallyAfter(hookCtx, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals(42, captured.get().attrs.get("score")); + assertEquals("gold", captured.get().attrs.get("profile.tier")); + assertTrue( + captured.get().attrs.values().stream().noneMatch(Value.class::isInstance), + "context attrs must contain converted scalar values, not OpenFeature Value wrappers"); + } +} diff --git a/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/ProviderTest.java b/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/ProviderTest.java index 27d4dd5d2b5..e9394d8934c 100644 --- a/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/ProviderTest.java +++ b/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/ProviderTest.java @@ -330,8 +330,10 @@ public void testGetProviderHooksReturnsFlagEvalHook() { Provider provider = new Provider(new Options().initTimeout(10, MILLISECONDS), mock(Evaluator.class)); List hooks = provider.getProviderHooks(); - assertThat(hooks.size(), equalTo(1)); + // Two hooks: OTel FlagEvalHook (index 0) + EVP FlagEvalEVPHook (index 1) + assertThat(hooks.size(), equalTo(2)); assertThat(hooks.get(0) instanceof FlagEvalHook, equalTo(true)); + assertThat(hooks.get(1) instanceof FlagEvalEVPHook, equalTo(true)); } @Test @@ -343,9 +345,8 @@ public void testShutdownCleansUpMetrics() throws Exception { provider.initialize(null); provider.shutdown(); verify(evaluator).shutdown(); - // After shutdown, getProviderHooks still returns a list (hook is still present but metrics is - // shut down) - assertThat(provider.getProviderHooks().size(), equalTo(1)); + // After shutdown, getProviderHooks still returns a list with both OTel + EVP hooks + assertThat(provider.getProviderHooks().size(), equalTo(2)); } public interface EvaluateMethod { diff --git a/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/FeatureFlaggingGateway.java b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/FeatureFlaggingGateway.java index b9d73ffa7ab..8eec08c19fa 100644 --- a/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/FeatureFlaggingGateway.java +++ b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/FeatureFlaggingGateway.java @@ -1,6 +1,7 @@ package datadog.trace.api.featureflag; import datadog.trace.api.featureflag.exposure.ExposureEvent; +import datadog.trace.api.featureflag.flagevaluation.FlagEvaluationWriter; import datadog.trace.api.featureflag.ufc.v1.ServerConfiguration; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -19,6 +20,15 @@ public interface ExposureListener extends Consumer {} private static final AtomicReference CURRENT_CONFIG = new AtomicReference<>(); + /** + * The active EVP flagevaluation writer. Registered by {@code FlagEvaluationWriterImpl.start()} + * when the killswitch {@code DD_FLAGGING_EVALUATION_COUNTS_ENABLED} is on (default). Read by + * {@code FlagEvalEVPHook} to route evaluations into the two-tier aggregator. {@code null} when + * the EVP path is disabled. + */ + private static final AtomicReference FLAG_EVAL_WRITER = + new AtomicReference<>(); + private FeatureFlaggingGateway() {} public static void addConfigListener(final ConfigListener listener) { @@ -49,4 +59,23 @@ public static void removeExposureListener(final ExposureListener listener) { public static void dispatch(final ExposureEvent event) { EXPOSURE_LISTENERS.forEach(listener -> listener.accept(event)); } + + /** + * Registers the active EVP flagevaluation writer. Called by {@code + * FlagEvaluationWriterImpl.start()} when the feature is enabled. Replaces any previously + * registered writer. + * + * @param writer the writer to register, or {@code null} to deregister + */ + public static void setFlagEvalWriter(final FlagEvaluationWriter writer) { + FLAG_EVAL_WRITER.set(writer); + } + + /** + * Returns the active EVP flagevaluation writer, or {@code null} when disabled (killswitch off or + * not yet started). + */ + public static FlagEvaluationWriter getFlagEvalWriter() { + return FLAG_EVAL_WRITER.get(); + } } diff --git a/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvalEvent.java b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvalEvent.java new file mode 100644 index 00000000000..83a7c14148b --- /dev/null +++ b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvalEvent.java @@ -0,0 +1,76 @@ +package datadog.trace.api.featureflag.flagevaluation; + +import java.util.Collections; +import java.util.Map; + +/** + * Lightweight data record capturing a single flag evaluation for EVP flagevaluation emission. + * + *

This is the currency passed from the {@code FlagEvalEVPHook} (feature-flagging-api) to the + * {@code FlagEvaluationWriter} (feature-flagging-lib) via a non-blocking bounded queue. + * + *

All fields captured at hook-fire time on the evaluation thread. No aggregation happens here. + */ +public final class FlagEvalEvent { + + /** The feature flag key. Never null. */ + public final String flagKey; + + /** + * The evaluated variant/value as a string. {@code null} means the default value was returned + * (runtime default). + */ + public final String variant; + + /** The allocation key from flag metadata ("allocationKey"). May be null. */ + public final String allocationKey; + + /** The targeting key from the evaluation context. May be null. */ + public final String targetingKey; + + /** + * The evaluation error message when the evaluation failed, else {@code null}. Sourced from the + * OpenFeature evaluation details (error message, falling back to the error code). + */ + public final String errorMessage; + + /** + * Evaluation timestamp in milliseconds since epoch. Stamped at eval-entry time from flag metadata + * key {@code "dd.eval.timestamp_ms"}, or falls back to hook-fire time when absent. This ensures + * first/last_evaluation reflect evaluation time, not hook-fire time. + */ + public final long evalTimeMs; + + /** + * Flattened evaluation context attributes. Used for the full-tier canonical context key. May be + * empty but never null. + */ + public final Map attrs; + + public FlagEvalEvent( + final String flagKey, + final String variant, + final String allocationKey, + final String targetingKey, + final long evalTimeMs, + final Map attrs) { + this(flagKey, variant, allocationKey, targetingKey, null, evalTimeMs, attrs); + } + + public FlagEvalEvent( + final String flagKey, + final String variant, + final String allocationKey, + final String targetingKey, + final String errorMessage, + final long evalTimeMs, + final Map attrs) { + this.flagKey = flagKey; + this.variant = variant; + this.allocationKey = allocationKey; + this.targetingKey = targetingKey; + this.errorMessage = errorMessage; + this.evalTimeMs = evalTimeMs; + this.attrs = attrs != null ? attrs : Collections.emptyMap(); + } +} diff --git a/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvaluationWriter.java b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvaluationWriter.java new file mode 100644 index 00000000000..9bfe40d11ca --- /dev/null +++ b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvaluationWriter.java @@ -0,0 +1,27 @@ +package datadog.trace.api.featureflag.flagevaluation; + +/** + * Defines an EVP flagevaluation writer responsible for aggregating flag evaluation events and + * flushing them to the EVP proxy. + * + *

Implementations must use a background thread (serializing handler) for aggregation and + * transport. The {@link #enqueue(FlagEvalEvent)} method must be non-blocking and callable from the + * OpenFeature hook thread without backpressure. + */ +public interface FlagEvaluationWriter extends AutoCloseable { + + /** + * Non-blocking enqueue of a flag evaluation event. May silently drop the event if the internal + * bounded queue is full (best-effort, observable via drop counter). + * + * @param event the flag evaluation event captured at hook-fire time + */ + void enqueue(FlagEvalEvent event); + + /** Starts the background serializing thread. Must be called once after construction. */ + void start(); + + /** Stops the background thread and releases resources. */ + @Override + void close(); +} diff --git a/products/feature-flagging/feature-flagging-lib/build.gradle.kts b/products/feature-flagging/feature-flagging-lib/build.gradle.kts index 2888ba1b25c..68a1ec654c5 100644 --- a/products/feature-flagging/feature-flagging-lib/build.gradle.kts +++ b/products/feature-flagging/feature-flagging-lib/build.gradle.kts @@ -1,6 +1,7 @@ plugins { `java-library` id("dd-trace-java.version-file") + id("me.champeau.jmh") } apply(from = "$rootDir/gradle/java.gradle") @@ -27,6 +28,14 @@ dependencies { testImplementation(libs.bundles.junit5) testImplementation(libs.bundles.mockito) + testImplementation("com.github.java-json-tools:json-schema-validator:2.2.10") { + exclude(group = "com.google.guava", module = "guava") + } testImplementation(project(":utils:test-utils")) testImplementation(project(":dd-java-agent:testing")) } + +jmh { + jmhVersion = libs.versions.jmh.get() + duplicateClassesStrategy = DuplicatesStrategy.EXCLUDE +} diff --git a/products/feature-flagging/feature-flagging-lib/gradle.lockfile b/products/feature-flagging/feature-flagging-lib/gradle.lockfile index 89e7e49d3f3..4c41577a4c0 100644 --- a/products/feature-flagging/feature-flagging-lib/gradle.lockfile +++ b/products/feature-flagging/feature-flagging-lib/gradle.lockfile @@ -1,57 +1,73 @@ # This is a Gradle generated file for dependency locking. # Manual edits can break the build and are not advised. # This file is expected to be part of source control. -cafe.cryptography:curve25519-elisabeth:0.1.0=runtimeClasspath,testRuntimeClasspath -cafe.cryptography:ed25519-elisabeth:0.1.0=runtimeClasspath,testRuntimeClasspath -ch.qos.logback:logback-classic:1.2.13=testCompileClasspath,testRuntimeClasspath -ch.qos.logback:logback-core:1.2.13=testCompileClasspath,testRuntimeClasspath -com.blogspot.mydailyjava:weak-lock-free:0.17=testCompileClasspath,testRuntimeClasspath -com.datadoghq.okhttp3:okhttp:3.12.15=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -com.datadoghq.okio:okio:1.17.6=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -com.datadoghq:dd-instrument-java:0.0.4=testCompileClasspath,testRuntimeClasspath -com.datadoghq:dd-javac-plugin-client:0.2.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -com.datadoghq:java-dogstatsd-client:4.4.5=runtimeClasspath,testRuntimeClasspath -com.datadoghq:sketches-java:0.8.3=runtimeClasspath,testRuntimeClasspath +cafe.cryptography:curve25519-elisabeth:0.1.0=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +cafe.cryptography:ed25519-elisabeth:0.1.0=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +ch.qos.logback:logback-classic:1.2.13=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +ch.qos.logback:logback-core:1.2.13=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.blogspot.mydailyjava:weak-lock-free:0.17=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.datadoghq.okhttp3:okhttp:3.12.15=compileClasspath,jmhCompileClasspath,jmhRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.datadoghq.okio:okio:1.17.6=compileClasspath,jmhCompileClasspath,jmhRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.datadoghq:dd-instrument-java:0.0.4=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.datadoghq:dd-javac-plugin-client:0.2.2=compileClasspath,jmhCompileClasspath,jmhRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.datadoghq:java-dogstatsd-client:4.4.5=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +com.datadoghq:sketches-java:0.8.3=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +com.fasterxml.jackson.core:jackson-annotations:2.2.3=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.fasterxml.jackson.core:jackson-core:2.2.3=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.fasterxml.jackson.core:jackson-databind:2.2.3=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.github.fge:btf:1.2=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.github.fge:msg-simple:1.1=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.github.fge:uri-template:0.9=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.github.java-json-tools:jackson-coreutils:1.9=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.github.java-json-tools:json-schema-core:1.2.10=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.github.java-json-tools:json-schema-validator:2.2.10=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath com.github.javaparser:javaparser-core:3.25.6=codenarc -com.github.jnr:jffi:1.3.15=runtimeClasspath,testRuntimeClasspath -com.github.jnr:jnr-a64asm:1.0.0=runtimeClasspath,testRuntimeClasspath -com.github.jnr:jnr-constants:0.10.4=runtimeClasspath,testRuntimeClasspath -com.github.jnr:jnr-enxio:0.32.20=runtimeClasspath,testRuntimeClasspath -com.github.jnr:jnr-ffi:2.2.19=runtimeClasspath,testRuntimeClasspath -com.github.jnr:jnr-posix:3.1.22=runtimeClasspath,testRuntimeClasspath -com.github.jnr:jnr-unixsocket:0.38.25=runtimeClasspath,testRuntimeClasspath -com.github.jnr:jnr-x86asm:1.0.2=runtimeClasspath,testRuntimeClasspath -com.github.spotbugs:spotbugs-annotations:4.9.8=compileClasspath,spotbugs,testCompileClasspath,testRuntimeClasspath +com.github.jnr:jffi:1.3.15=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-a64asm:1.0.0=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-constants:0.10.4=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-enxio:0.32.20=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-ffi:2.2.19=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-posix:3.1.22=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-unixsocket:0.38.25=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +com.github.jnr:jnr-x86asm:1.0.2=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +com.github.spotbugs:spotbugs-annotations:4.9.8=compileClasspath,jmhCompileClasspath,jmhRuntimeClasspath,spotbugs,testCompileClasspath,testRuntimeClasspath com.github.spotbugs:spotbugs:4.9.8=spotbugs com.github.stephenc.jcip:jcip-annotations:1.0-1=spotbugs -com.google.code.findbugs:jsr305:3.0.2=compileClasspath,spotbugs,testCompileClasspath,testRuntimeClasspath +com.google.code.findbugs:jsr305:3.0.2=compileClasspath,jmhCompileClasspath,jmhRuntimeClasspath,spotbugs,testCompileClasspath,testRuntimeClasspath com.google.code.gson:gson:2.13.2=spotbugs com.google.errorprone:error_prone_annotations:2.41.0=spotbugs -com.google.guava:guava:20.0=testCompileClasspath,testRuntimeClasspath -com.google.re2j:re2j:1.7=testRuntimeClasspath -com.squareup.moshi:moshi:1.11.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -com.squareup.okhttp3:logging-interceptor:3.12.12=testCompileClasspath,testRuntimeClasspath -com.squareup.okhttp3:okhttp:3.12.12=testCompileClasspath,testRuntimeClasspath -com.squareup.okio:okio:1.17.5=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.google.guava:guava:20.0=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.google.re2j:re2j:1.7=jmhRuntimeClasspath,testRuntimeClasspath +com.googlecode.libphonenumber:libphonenumber:8.0.0=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.squareup.moshi:moshi:1.11.0=compileClasspath,jmhCompileClasspath,jmhRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.squareup.okhttp3:logging-interceptor:3.12.12=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.squareup.okhttp3:okhttp:3.12.12=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +com.squareup.okio:okio:1.17.5=compileClasspath,jmhCompileClasspath,jmhRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath com.thoughtworks.qdox:qdox:1.12.1=codenarc -commons-fileupload:commons-fileupload:1.5=testCompileClasspath,testRuntimeClasspath -commons-io:commons-io:2.11.0=testCompileClasspath,testRuntimeClasspath +commons-fileupload:commons-fileupload:1.5=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +commons-io:commons-io:2.11.0=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath commons-io:commons-io:2.20.0=spotbugs -de.thetaphi:forbiddenapis:3.10=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.leangen.geantyref:geantyref:1.3.16=testRuntimeClasspath -io.sqreen:libsqreen:17.3.0=testRuntimeClasspath -javax.servlet:javax.servlet-api:3.1.0=testCompileClasspath,testRuntimeClasspath +de.thetaphi:forbiddenapis:3.10=compileClasspath,jmhCompileClasspath,jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +io.leangen.geantyref:geantyref:1.3.16=jmhRuntimeClasspath,testRuntimeClasspath +io.sqreen:libsqreen:17.3.0=jmhRuntimeClasspath,testRuntimeClasspath +javax.activation:activation:1.1=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +javax.mail:mailapi:1.4.3=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +javax.servlet:javax.servlet-api:3.1.0=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath jaxen:jaxen:2.0.0=spotbugs -junit:junit:4.13.2=testRuntimeClasspath -net.bytebuddy:byte-buddy-agent:1.18.10=testCompileClasspath,testRuntimeClasspath -net.bytebuddy:byte-buddy:1.18.10=testCompileClasspath,testRuntimeClasspath -net.java.dev.jna:jna-platform:5.8.0=testRuntimeClasspath -net.java.dev.jna:jna:5.8.0=testRuntimeClasspath +joda-time:joda-time:2.9.7=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +junit:junit:4.13.2=jmhRuntimeClasspath,testRuntimeClasspath +net.bytebuddy:byte-buddy-agent:1.18.10=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +net.bytebuddy:byte-buddy:1.18.10=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +net.java.dev.jna:jna-platform:5.8.0=jmhRuntimeClasspath,testRuntimeClasspath +net.java.dev.jna:jna:5.8.0=jmhRuntimeClasspath,testRuntimeClasspath +net.sf.jopt-simple:jopt-simple:5.0.3=testCompileClasspath,testRuntimeClasspath +net.sf.jopt-simple:jopt-simple:5.0.4=jmh,jmhCompileClasspath,jmhRuntimeClasspath net.sf.saxon:Saxon-HE:12.9=spotbugs org.apache.ant:ant-antlr:1.10.14=codenarc org.apache.ant:ant-junit:1.10.14=codenarc org.apache.bcel:bcel:6.11.0=spotbugs org.apache.commons:commons-lang3:3.19.0=spotbugs +org.apache.commons:commons-math3:3.6.1=jmh,jmhCompileClasspath,jmhRuntimeClasspath org.apache.commons:commons-text:1.14.0=spotbugs org.apache.logging.log4j:log4j-api:2.25.2=spotbugs org.apache.logging.log4j:log4j-core:2.25.2=spotbugs @@ -60,60 +76,65 @@ org.codehaus.groovy:groovy-ant:3.0.23=codenarc org.codehaus.groovy:groovy-docgenerator:3.0.23=codenarc org.codehaus.groovy:groovy-groovydoc:3.0.23=codenarc org.codehaus.groovy:groovy-json:3.0.23=codenarc -org.codehaus.groovy:groovy-json:3.0.25=testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy-json:3.0.25=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath org.codehaus.groovy:groovy-templates:3.0.23=codenarc org.codehaus.groovy:groovy-xml:3.0.23=codenarc org.codehaus.groovy:groovy:3.0.23=codenarc -org.codehaus.groovy:groovy:3.0.25=testCompileClasspath,testRuntimeClasspath +org.codehaus.groovy:groovy:3.0.25=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath org.codenarc:CodeNarc:3.7.0=codenarc org.dom4j:dom4j:2.2.0=spotbugs org.gmetrics:GMetrics:2.1.0=codenarc -org.hamcrest:hamcrest-core:1.3=testRuntimeClasspath -org.hamcrest:hamcrest:3.0=testCompileClasspath,testRuntimeClasspath +org.hamcrest:hamcrest-core:1.3=jmhRuntimeClasspath,testRuntimeClasspath +org.hamcrest:hamcrest:3.0=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath org.jacoco:org.jacoco.agent:0.8.14=jacocoAgent,jacocoAnt org.jacoco:org.jacoco.ant:0.8.14=jacocoAnt org.jacoco:org.jacoco.core:0.8.14=jacocoAnt org.jacoco:org.jacoco.report:0.8.14=jacocoAnt -org.jctools:jctools-core-jdk11:4.0.6=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.jctools:jctools-core:4.0.6=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.junit.jupiter:junit-jupiter-api:5.14.1=testCompileClasspath,testRuntimeClasspath -org.junit.jupiter:junit-jupiter-engine:5.14.1=testRuntimeClasspath -org.junit.jupiter:junit-jupiter-params:5.14.1=testCompileClasspath,testRuntimeClasspath -org.junit.jupiter:junit-jupiter:5.14.1=testCompileClasspath,testRuntimeClasspath -org.junit.platform:junit-platform-commons:1.14.1=testCompileClasspath,testRuntimeClasspath -org.junit.platform:junit-platform-engine:1.14.1=testCompileClasspath,testRuntimeClasspath -org.junit.platform:junit-platform-launcher:1.14.1=testRuntimeClasspath -org.junit.platform:junit-platform-runner:1.14.1=testRuntimeClasspath -org.junit.platform:junit-platform-suite-api:1.14.1=testRuntimeClasspath -org.junit.platform:junit-platform-suite-commons:1.14.1=testRuntimeClasspath -org.junit:junit-bom:5.14.0=spotbugs -org.junit:junit-bom:5.14.1=testCompileClasspath,testRuntimeClasspath -org.mockito:mockito-core:4.4.0=testCompileClasspath,testRuntimeClasspath -org.mockito:mockito-junit-jupiter:4.4.0=testCompileClasspath,testRuntimeClasspath -org.objenesis:objenesis:3.3=testCompileClasspath,testRuntimeClasspath -org.opentest4j:opentest4j:1.3.0=testCompileClasspath,testRuntimeClasspath -org.ow2.asm:asm-analysis:9.7.1=runtimeClasspath,testRuntimeClasspath +org.jctools:jctools-core-jdk11:4.0.6=compileClasspath,jmhCompileClasspath,jmhRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.jctools:jctools-core:4.0.6=compileClasspath,jmhCompileClasspath,jmhRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.junit.jupiter:junit-jupiter-api:5.14.1=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.junit.jupiter:junit-jupiter-engine:5.14.1=jmhRuntimeClasspath,testRuntimeClasspath +org.junit.jupiter:junit-jupiter-params:5.14.1=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.junit.jupiter:junit-jupiter:5.14.1=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-commons:1.14.1=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-engine:1.14.1=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-launcher:1.14.1=jmhRuntimeClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-runner:1.14.1=jmhRuntimeClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-suite-api:1.14.1=jmhRuntimeClasspath,testRuntimeClasspath +org.junit.platform:junit-platform-suite-commons:1.14.1=jmhRuntimeClasspath,testRuntimeClasspath +org.junit:junit-bom:5.14.1=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.mockito:mockito-core:4.4.0=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.mockito:mockito-junit-jupiter:4.4.0=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.mozilla:rhino:1.7.7.1=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.objenesis:objenesis:3.3=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.openjdk.jmh:jmh-core:1.37=jmh,jmhCompileClasspath,jmhRuntimeClasspath +org.openjdk.jmh:jmh-generator-asm:1.37=jmh,jmhCompileClasspath,jmhRuntimeClasspath +org.openjdk.jmh:jmh-generator-bytecode:1.37=jmh,jmhCompileClasspath,jmhRuntimeClasspath +org.openjdk.jmh:jmh-generator-reflection:1.37=jmh,jmhCompileClasspath,jmhRuntimeClasspath +org.opentest4j:opentest4j:1.3.0=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.ow2.asm:asm-analysis:9.7.1=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath org.ow2.asm:asm-analysis:9.9=spotbugs -org.ow2.asm:asm-commons:9.7.1=runtimeClasspath,testRuntimeClasspath +org.ow2.asm:asm-commons:9.7.1=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath org.ow2.asm:asm-commons:9.9=jacocoAnt,spotbugs -org.ow2.asm:asm-tree:9.7.1=runtimeClasspath,testRuntimeClasspath +org.ow2.asm:asm-tree:9.7.1=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath org.ow2.asm:asm-tree:9.9=jacocoAnt,spotbugs -org.ow2.asm:asm-util:9.7.1=runtimeClasspath,testRuntimeClasspath +org.ow2.asm:asm-util:9.7.1=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath org.ow2.asm:asm-util:9.9=spotbugs +org.ow2.asm:asm:9.0=jmh,jmhCompileClasspath org.ow2.asm:asm:9.7.1=runtimeClasspath org.ow2.asm:asm:9.9=jacocoAnt,spotbugs -org.ow2.asm:asm:9.9.1=testCompileClasspath,testRuntimeClasspath -org.slf4j:jcl-over-slf4j:1.7.30=testCompileClasspath,testRuntimeClasspath -org.slf4j:jul-to-slf4j:1.7.30=testCompileClasspath,testRuntimeClasspath -org.slf4j:log4j-over-slf4j:1.7.30=testCompileClasspath,testRuntimeClasspath -org.slf4j:slf4j-api:1.7.30=compileClasspath,runtimeClasspath -org.slf4j:slf4j-api:1.7.32=testCompileClasspath,testRuntimeClasspath +org.ow2.asm:asm:9.9.1=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.slf4j:jcl-over-slf4j:1.7.30=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.slf4j:jul-to-slf4j:1.7.30=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.slf4j:log4j-over-slf4j:1.7.30=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.slf4j:slf4j-api:1.7.30=compileClasspath,jmhCompileClasspath,runtimeClasspath +org.slf4j:slf4j-api:1.7.32=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath org.slf4j:slf4j-api:2.0.17=spotbugs,spotbugsSlf4j org.slf4j:slf4j-simple:2.0.17=spotbugsSlf4j -org.snakeyaml:snakeyaml-engine:2.9=runtimeClasspath,testRuntimeClasspath -org.spockframework:spock-bom:2.4-groovy-3.0=testCompileClasspath,testRuntimeClasspath -org.spockframework:spock-core:2.4-groovy-3.0=testCompileClasspath,testRuntimeClasspath -org.tabletest:tabletest-junit:1.2.1=testCompileClasspath,testRuntimeClasspath -org.tabletest:tabletest-parser:1.2.0=testCompileClasspath,testRuntimeClasspath +org.snakeyaml:snakeyaml-engine:2.9=jmhRuntimeClasspath,runtimeClasspath,testRuntimeClasspath +org.spockframework:spock-bom:2.4-groovy-3.0=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.spockframework:spock-core:2.4-groovy-3.0=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.tabletest:tabletest-junit:1.2.1=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath +org.tabletest:tabletest-parser:1.2.0=jmhRuntimeClasspath,testCompileClasspath,testRuntimeClasspath org.xmlresolver:xmlresolver:5.3.3=spotbugs -empty=annotationProcessor,spotbugsPlugins,testAnnotationProcessor +empty=annotationProcessor,jmhAnnotationProcessor,spotbugsPlugins,testAnnotationProcessor diff --git a/products/feature-flagging/feature-flagging-lib/src/jmh/java/com/datadog/featureflag/FlagEvaluationHotPathBenchmark.java b/products/feature-flagging/feature-flagging-lib/src/jmh/java/com/datadog/featureflag/FlagEvaluationHotPathBenchmark.java new file mode 100644 index 00000000000..996e38bcd39 --- /dev/null +++ b/products/feature-flagging/feature-flagging-lib/src/jmh/java/com/datadog/featureflag/FlagEvaluationHotPathBenchmark.java @@ -0,0 +1,98 @@ +package com.datadog.featureflag; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.communication.BackendApiFactory; +import datadog.trace.api.Config; +import datadog.trace.api.featureflag.flagevaluation.FlagEvalEvent; +import java.util.HashMap; +import java.util.Map; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Hot-path benchmark for EVP {@code flagevaluation} recording. + * + *

The OpenFeature {@code Finally} hook runs synchronously on the caller's evaluation thread, so + * the cost it charges the user's evaluation must stay flat. This benchmark isolates the two stages: + * + *

    + *
  • {@code evalThreadCapture}: what the evaluation thread pays — building the lightweight + * {@link FlagEvalEvent} snapshot (the hook's scalar/shallow capture) plus the non-blocking + * {@code enqueue}. This is the cost that must stay flat under load. + *
  • {@code workerAggregate}: the deferred work that runs off the evaluation thread on the + * background worker — deterministic context prune + canonical-context key + two-tier map + * aggregation. Measured so the off-thread cost is characterized too. + *
+ * + *

Run: {@code ./gradlew :products:feature-flagging:feature-flagging-lib:jmh} (optionally {@code + * -PjmhIncludes=FlagEvaluationHotPathBenchmark}). Use {@code -prof gc} via JMH args for allocs. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 3, time = 2, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(NANOSECONDS) +@Fork(value = 1) +public class FlagEvaluationHotPathBenchmark { + + private Map attrs; + + // Eval-thread stage: a writer whose bounded queue is large and never drained, so enqueue cost is + // measured without a competing consumer thread. + private FlagEvaluationWriterImpl writer; + + // Worker stage: the aggregating handler, exercised directly (off the eval thread in production). + private FlagEvaluationWriterImpl.SerializingHandlerForTest handler; + + @Setup(Level.Iteration) + public void setUp() { + attrs = new HashMap<>(); + attrs.put("tier", "enterprise"); + attrs.put("region", "us-east-1"); + attrs.put("seats", 42); + attrs.put("beta", Boolean.TRUE); + + final Config config = Config.get(); + // The factory is never invoked here (neither the writer worker nor the handler is started), + // so a plain factory suffices; createBackendApi() is only called from run(). + final BackendApiFactory factory = new BackendApiFactory(config, null); + final Map ddContext = new HashMap<>(); + ddContext.put("service", "bench-service"); + handler = FlagEvaluationWriterImpl.createHandlerForTest(factory, ddContext); + + // Capacity large enough that the benchmark never overflows within a measurement window. + writer = new FlagEvaluationWriterImpl(1 << 20, Long.MAX_VALUE, NANOSECONDS, factory, config); + } + + /** Eval-thread cost: build the capture snapshot + non-blocking enqueue. */ + @Benchmark + public void evalThreadCapture(final Blackhole blackhole) { + final FlagEvalEvent ev = + new FlagEvalEvent( + "checkout-flag", "treatment", "alloc-7", "user-123", null, 1_700_000_000_000L, attrs); + writer.enqueue(ev); + blackhole.consume(ev); + } + + /** Off-thread worker cost: deterministic prune + canonical key + two-tier aggregation. */ + @Benchmark + public void workerAggregate(final Blackhole blackhole) { + final FlagEvalEvent ev = + new FlagEvalEvent( + "checkout-flag", "treatment", "alloc-7", "user-123", null, 1_700_000_000_000L, attrs); + handler.aggregateEvent(ev); + blackhole.consume(handler.fullTier.size()); + } +} diff --git a/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FlagEvaluationWriterImpl.java b/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FlagEvaluationWriterImpl.java new file mode 100644 index 00000000000..a847394ba5f --- /dev/null +++ b/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FlagEvaluationWriterImpl.java @@ -0,0 +1,804 @@ +package com.datadog.featureflag; + +import static datadog.trace.util.AgentThreadFactory.AgentThread.FEATURE_FLAG_EVALUATION_PROCESSOR; +import static datadog.trace.util.AgentThreadFactory.newAgentThread; +import static java.util.concurrent.TimeUnit.SECONDS; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; +import datadog.common.queue.MessagePassingBlockingQueue; +import datadog.common.queue.Queues; +import datadog.communication.BackendApi; +import datadog.communication.BackendApiFactory; +import datadog.communication.ddagent.SharedCommunicationObjects; +import datadog.trace.api.Config; +import datadog.trace.api.featureflag.FeatureFlaggingGateway; +import datadog.trace.api.featureflag.flagevaluation.FlagEvalEvent; +import datadog.trace.api.featureflag.flagevaluation.FlagEvaluationWriter; +import datadog.trace.api.intake.Intake; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import okhttp3.RequestBody; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * EVP {@code flagevaluation} writer for Java. + * + *

Structural copy of {@link ExposureWriterImpl} with two-tier aggregation replacing the + * single-exposure buffer. Routes to {@code /evp_proxy/v2/api/v2/flagevaluations} via {@code + * BackendApiFactory.createBackendApi(Intake.EVENT_PLATFORM)}. + * + *

Two-tier aggregation contract: + * + *

    + *
  • Two-tier aggregation: full → degraded → drop-counted (no ultra-degraded). + *
  • Full key: (flagKey, variant, allocationKey, runtimeDefault, errorMessage, targetingKey, + * canonical-context-key). + *
  • Degraded key: (flagKey, variant, allocationKey, runtimeDefault, errorMessage) — no + * targetingKey/context. + *
  • Canonical context key: sorted entries, type-tagged length-delimited encoding — NOT a hash + * (collision-safe, comparable string identity). + *
  • Context pruning: deterministic (sort before cut), ≤256 fields, string values ≤256 chars; + * the pruned attributes are what gets aggregated and serialized. + *
  • Caps: globalCap=131072, perFlagCap=10000, degradedCap=32768. + *
  • Eval-time: min/max of firstEvalMs/lastEvalMs across events in the same bucket. + *
  • Runtime default: absent variant means {@code runtimeDefaultUsed=true}. + *
  • Flush interval: 10 seconds. + *
  • Queue: bounded MessagePassingBlockingQueue (capacity 2^16), non-blocking offer; on overflow + * the event is dropped and the {@code droppedQueueOverflow} counter is incremented and + * surfaced on flush. + *
  • Shutdown: {@link #close()} drains the queue and performs a final flush before the worker + * thread exits. + *
+ */ +public class FlagEvaluationWriterImpl implements FlagEvaluationWriter { + + private static final Logger LOGGER = LoggerFactory.getLogger(FlagEvaluationWriterImpl.class); + + static final int DEFAULT_CAPACITY = 1 << 16; // 65536 elements + static final int FLUSH_INTERVAL_SECONDS = 10; + static final int GLOBAL_CAP = 131_072; + static final int PER_FLAG_CAP = 10_000; + static final int DEGRADED_CAP = 32_768; + + // Context pruning limits: max fields and max string value length + static final int MAX_CONTEXT_FIELDS = 256; + static final int MAX_FIELD_LENGTH = 256; + + // Type tags for canonical context key (type-tagged length-delimited encoding) + private static final byte CTX_TAG_STRING = 's'; + private static final byte CTX_TAG_BOOL = 'b'; + private static final byte CTX_TAG_INT = 'i'; + private static final byte CTX_TAG_LONG = 'l'; + private static final byte CTX_TAG_FLOAT = 'f'; + private static final byte CTX_TAG_DOUBLE = 'd'; + private static final byte CTX_TAG_OTHER = 'o'; + + private final MessagePassingBlockingQueue queue; + private final FlagEvaluationSerializingHandler serializer; + private final Thread serializerThread; + + /** + * Observable counter for events dropped because the bounded hand-off queue was full when the hook + * tried to enqueue (backpressure). Incremented on the hook thread, surfaced on flush. + */ + private final AtomicLong droppedQueueOverflow = new AtomicLong(0); + + public FlagEvaluationWriterImpl(final SharedCommunicationObjects sco, final Config config) { + this(DEFAULT_CAPACITY, FLUSH_INTERVAL_SECONDS, SECONDS, sco, config); + } + + FlagEvaluationWriterImpl( + final int capacity, + final long flushInterval, + final TimeUnit timeUnit, + final SharedCommunicationObjects sco, + final Config config) { + this(capacity, flushInterval, timeUnit, new BackendApiFactory(config, sco), config); + } + + /** Package-private constructor allowing a {@link BackendApiFactory} to be injected for tests. */ + FlagEvaluationWriterImpl( + final int capacity, + final long flushInterval, + final TimeUnit timeUnit, + final BackendApiFactory backendApiFactory, + final Config config) { + this.queue = Queues.mpscBlockingConsumerArrayQueue(capacity); + final Map context = new HashMap<>(4); + context.put("service", config.getServiceName() == null ? "unknown" : config.getServiceName()); + if (config.getEnv() != null) { + context.put("env", config.getEnv()); + } + if (config.getVersion() != null) { + context.put("version", config.getVersion()); + } + this.serializer = + new FlagEvaluationSerializingHandler( + backendApiFactory, + queue, + flushInterval, + timeUnit, + context, + droppedQueueOverflow, + this::close); + this.serializerThread = newAgentThread(FEATURE_FLAG_EVALUATION_PROCESSOR, serializer); + } + + @Override + public void start() { + // Register with the gateway so FlagEvalEVPHook can route evaluations to this writer + FeatureFlaggingGateway.setFlagEvalWriter(this); + this.serializerThread.start(); + } + + /** Test seam: starts the worker thread WITHOUT registering with the global gateway. */ + void startForTest() { + this.serializerThread.start(); + } + + /** Test seam: current full-tier bucket count in the worker's aggregator. */ + int aggregatorFullTierSizeForTest() { + return serializer.fullTier.size(); + } + + @Override + public void close() { + // Deregister from the gateway so no new events are enqueued. + FeatureFlaggingGateway.setFlagEvalWriter(null); + if (!this.serializerThread.isAlive()) { + return; + } + // Ask the worker to drain the queue and final-flush, then interrupt to wake it from poll(). + serializer.requestShutdown(); + this.serializerThread.interrupt(); + try { + // Bounded wait for the worker's final flush so queued events are not lost on shutdown. + this.serializerThread.join(TimeUnit.SECONDS.toMillis(5)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void enqueue(final FlagEvalEvent event) { + if (event == null) { + return; + } + // Non-blocking offer. Bound the context snapshot before the queue, then count overflow so loss + // is observable rather than silent; the count is surfaced on the next flush. + final FlagEvalEvent boundedEvent = + new FlagEvalEvent( + event.flagKey, + event.variant, + event.allocationKey, + event.targetingKey, + event.errorMessage, + event.evalTimeMs, + pruneContext(event.attrs)); + if (!queue.offer(boundedEvent)) { + droppedQueueOverflow.incrementAndGet(); + } + } + + /** Returns the count of events dropped due to queue-overflow backpressure (observable). */ + long droppedQueueOverflow() { + return droppedQueueOverflow.get(); + } + + /** Test seam: returns one queued event without starting the worker. */ + FlagEvalEvent pollQueuedEventForTest() { + return queue.poll(); + } + + // ---- Deterministic context pruning ---- + + /** + * Produces the deterministically-pruned context map used for both the canonical key and the + * serialized payload. Keys are sorted FIRST, then the first {@link #MAX_CONTEXT_FIELDS} + * non-oversized values are kept, so two logically-identical contexts always prune to the same + * subset (and therefore the same canonical key). Oversized string values (>{@link + * #MAX_FIELD_LENGTH} chars) are skipped. Returns an empty map for null/empty input. + */ + static Map pruneContext(final Map attrs) { + if (attrs == null || attrs.isEmpty()) { + return java.util.Collections.emptyMap(); + } + // TreeMap gives a deterministic, sorted ordering BEFORE we apply the field cap. + final TreeMap out = new TreeMap<>(); + final TreeMap sorted = new TreeMap<>(attrs); + int count = 0; + for (final Map.Entry entry : sorted.entrySet()) { + if (count >= MAX_CONTEXT_FIELDS) { + break; + } + final Object v = entry.getValue(); + if (v instanceof String && ((String) v).length() > MAX_FIELD_LENGTH) { + continue; // skip oversized string values + } + out.put(entry.getKey(), v); + count++; + } + return out; + } + + // ---- Canonical context key ---- + // Sorted entries, type-tagged length-delimited encoding. NOT a hash (collision-safe string key). + // Implementation mirrors dd-trace-go/openfeature/flagevaluation.go canonicalContextKey(). + + /** + * Builds the canonical context key from the already-pruned context map for the full-tier bucket + * identity. Expects a pruned, sorted map (e.g. produced by {@link #pruneContext(Map)}); iterating + * a {@link TreeMap} yields keys in sorted order, so the encoding is deterministic. + * + *

Returns an empty string for null/empty context. + */ + static String canonicalContextKey(final Map prunedAttrs) { + if (prunedAttrs == null || prunedAttrs.isEmpty()) { + return ""; + } + // Ensure deterministic ordering even if a non-sorted map was passed. + final Map sorted = + (prunedAttrs instanceof TreeMap) ? prunedAttrs : new TreeMap<>(prunedAttrs); + final StringBuilder sb = new StringBuilder(); + for (final Map.Entry entry : sorted.entrySet()) { + appendLengthDelimited(sb, entry.getKey()); + appendContextValue(sb, entry.getValue()); + } + return sb.toString(); + } + + private static void appendLengthDelimited(final StringBuilder sb, final String s) { + // Encode as big-endian 8-hex-char length prefix + raw chars (deterministic, unambiguous + // field boundary regardless of value content). + sb.append(String.format("%08x", (long) s.length())); + sb.append(s); + } + + private static void appendContextValue(final StringBuilder sb, final Object v) { + if (v instanceof Boolean) { + sb.append((char) CTX_TAG_BOOL); + appendLengthDelimited(sb, v.toString()); + } else if (v instanceof Long) { + sb.append((char) CTX_TAG_LONG); + appendLengthDelimited(sb, v.toString()); + } else if (v instanceof Integer) { + sb.append((char) CTX_TAG_INT); + appendLengthDelimited(sb, v.toString()); + } else if (v instanceof Float) { + sb.append((char) CTX_TAG_FLOAT); + appendLengthDelimited(sb, v.toString()); + } else if (v instanceof Double) { + sb.append((char) CTX_TAG_DOUBLE); + appendLengthDelimited(sb, v.toString()); + } else if (v instanceof String) { + sb.append((char) CTX_TAG_STRING); + appendLengthDelimited(sb, (String) v); + } else { + sb.append((char) CTX_TAG_OTHER); + appendLengthDelimited(sb, v == null ? "" : v.toString()); + } + } + + // ---- Data classes ---- + + /** Aggregation bucket for a single (full or degraded) key. */ + static class EvalBucket { + long count; + long firstEvalMs; + long lastEvalMs; + boolean runtimeDefaultUsed; + String flagKey; + String variant; + String allocationKey; + String targetingKey; + String errorMessage; + Map prunedAttrs; // pruned context for serialization (full tier only) + + EvalBucket( + final String flagKey, + final String variant, + final String allocationKey, + final String targetingKey, + final String errorMessage, + final long evalTimeMs, + final boolean runtimeDefaultUsed, + final Map prunedAttrs) { + this.flagKey = flagKey; + this.variant = variant; + this.allocationKey = allocationKey; + this.targetingKey = targetingKey; + this.errorMessage = errorMessage; + this.firstEvalMs = evalTimeMs; + this.lastEvalMs = evalTimeMs; + this.count = 1; + this.runtimeDefaultUsed = runtimeDefaultUsed; + this.prunedAttrs = prunedAttrs; + } + + /** Number of context fields stored in this bucket (after pruning). */ + int prunedContextFieldCount() { + return prunedAttrs == null ? 0 : prunedAttrs.size(); + } + + void merge(final long evalTimeMs, final boolean isDefault) { + count++; + if (evalTimeMs < firstEvalMs) firstEvalMs = evalTimeMs; + if (evalTimeMs > lastEvalMs) lastEvalMs = evalTimeMs; + if (isDefault) runtimeDefaultUsed = true; + } + } + + /** Snapshot produced by {@link SerializingHandlerForTest#drainAndAggregate()} for tests. */ + static class AggregatedState { + final Map fullTier; + final Map degradedTier; + final long droppedDegradedOverflow; + + AggregatedState( + final Map fullTier, + final Map degradedTier, + final long droppedDegradedOverflow) { + this.fullTier = fullTier; + this.degradedTier = degradedTier; + this.droppedDegradedOverflow = droppedDegradedOverflow; + } + } + + // ---- Serializing handler (background thread logic) ---- + + static class FlagEvaluationSerializingHandler implements Runnable { + private final MessagePassingBlockingQueue queue; + private final long ticksRequiredToFlush; + + @SuppressFBWarnings( + value = "AT_NONATOMIC_64BIT_PRIMITIVE", + justification = "the field is confined to the single serializer thread") + private long lastTicks; + + private final JsonAdapter jsonAdapter; + final BackendApiFactory backendApiFactory; + final Map context; + private final AtomicLong droppedQueueOverflow; + private final Runnable errorCallback; + + // Two-tier aggregation maps (accessed only from the serializer thread; package-private for + // test) + final Map fullTier = new HashMap<>(); + final Map degradedTier = new HashMap<>(); + + // Per-flag full-tier count tracking + final Map perFlagCount = new HashMap<>(); + + // Global full-tier count + @SuppressFBWarnings( + value = "AT_STALE_THREAD_WRITE_OF_PRIMITIVE", + justification = "the field is confined to the single serializer thread") + int globalFullCount = 0; + + // Observable counter for events dropped when both tiers are at capacity + final AtomicLong droppedDegradedOverflow = new AtomicLong(0); + + // Shutdown coordination: set by close(), drives a final drain+flush before the worker exits. + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final CountDownLatch finalFlushDone = new CountDownLatch(1); + + FlagEvaluationSerializingHandler( + final BackendApiFactory backendApiFactory, + final MessagePassingBlockingQueue queue, + final long flushInterval, + final TimeUnit timeUnit, + final Map context, + final AtomicLong droppedQueueOverflow, + final Runnable errorCallback) { + this.queue = queue; + this.jsonAdapter = new Moshi.Builder().build().adapter(FlagEvaluationsRequest.class); + this.backendApiFactory = backendApiFactory; + this.context = context; + this.droppedQueueOverflow = droppedQueueOverflow; + this.lastTicks = System.nanoTime(); + this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval); + this.errorCallback = errorCallback; + LOGGER.debug("starting flag evaluation serializer"); + } + + /** Signals the worker to drain the queue and perform a final flush before exiting. */ + void requestShutdown() { + shutdownRequested.set(true); + } + + @Override + public void run() { + final BackendApi evp = backendApiFactory.createBackendApi(Intake.EVENT_PLATFORM); + if (evp == null) { + finalFlushDone.countDown(); + errorCallback.run(); + throw new IllegalArgumentException("EVP Proxy not available"); + } + try { + runDutyCycle(evp); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + // On exit (interrupt or shutdown request), drain everything still buffered and flush it so + // queued events are not lost on shutdown. + drainAndFlush(evp); + finalFlushDone.countDown(); + } + LOGGER.debug("flag evaluation processor worker exited."); + } + + private void runDutyCycle(final BackendApi evp) throws InterruptedException { + final Thread thread = Thread.currentThread(); + while (!thread.isInterrupted() && !shutdownRequested.get()) { + FlagEvalEvent event; + while ((event = queue.poll(100, TimeUnit.MILLISECONDS)) != null) { + aggregateEvent(event); + } + flushIfNecessary(evp); + } + } + + /** Drains all remaining queued events and performs a final flush. Used on shutdown. */ + void drainAndFlush(final BackendApi evp) { + FlagEvalEvent event; + while ((event = queue.poll()) != null) { + aggregateEvent(event); + } + flush(evp); + } + + // ---- Aggregation logic ---- + + /** Routes an event into the full tier or degraded tier, or drops and counts on overflow. */ + void aggregateEvent(final FlagEvalEvent event) { + final boolean isDefault = event.variant == null; + + // Prune the context deterministically ONCE; both the canonical key and the stored payload + // use this pruned view (never the raw attrs). + final Map prunedAttrs = pruneContext(event.attrs); + final String ctxKey = canonicalContextKey(prunedAttrs); + final String fullKey = buildFullKey(event, ctxKey); + + // Try full tier + if (fullTier.containsKey(fullKey)) { + fullTier.get(fullKey).merge(event.evalTimeMs, isDefault); + return; + } + + // Check full-tier caps before inserting + final int flagCount = perFlagCount.getOrDefault(event.flagKey, 0); + if (globalFullCount < GLOBAL_CAP && flagCount < PER_FLAG_CAP) { + final EvalBucket bucket = + new EvalBucket( + event.flagKey, + event.variant, + event.allocationKey, + event.targetingKey, + event.errorMessage, + event.evalTimeMs, + isDefault, + prunedAttrs); + fullTier.put(fullKey, bucket); + globalFullCount++; + perFlagCount.put(event.flagKey, flagCount + 1); + return; + } + + // Full tier saturated — route to degraded tier + final String degradedKey = buildDegradedKey(event); + if (degradedTier.containsKey(degradedKey)) { + degradedTier.get(degradedKey).merge(event.evalTimeMs, isDefault); + return; + } + + if (degradedTier.size() < DEGRADED_CAP) { + final EvalBucket bucket = + new EvalBucket( + event.flagKey, + event.variant, + event.allocationKey, + null, // degraded omits targetingKey + event.errorMessage, + event.evalTimeMs, + isDefault, + null); // degraded omits context + degradedTier.put(degradedKey, bucket); + return; + } + + // Both tiers saturated — drop and count + droppedDegradedOverflow.incrementAndGet(); + } + + private static String buildFullKey(final FlagEvalEvent event, final String ctxKey) { + return event.flagKey + + '\0' + + nullToEmpty(event.variant) + + '\0' + + nullToEmpty(event.allocationKey) + + '\0' + + (event.variant == null ? "1" : "0") + + '\0' + + nullToEmpty(event.errorMessage) + + '\0' + + nullToEmpty(event.targetingKey) + + '\0' + + ctxKey; + } + + private static String buildDegradedKey(final FlagEvalEvent event) { + return event.flagKey + + '\0' + + nullToEmpty(event.variant) + + '\0' + + nullToEmpty(event.allocationKey) + + '\0' + + (event.variant == null ? "1" : "0") + + '\0' + + nullToEmpty(event.errorMessage); + } + + private static String nullToEmpty(final String s) { + return s != null ? s : ""; + } + + // ---- Flush logic ---- + + void flushIfNecessary(final BackendApi evp) { + if (fullTier.isEmpty() && degradedTier.isEmpty() && droppedQueueOverflow.get() == 0) { + return; + } + if (shouldFlush()) { + flush(evp); + } + } + + void flush(final BackendApi evp) { + // Surface backpressure (queue-overflow) drops as an observable warning even when there is + // nothing else to flush. + final long qDrops = droppedQueueOverflow.getAndSet(0); + if (qDrops > 0) { + LOGGER.warn( + "flag evaluation queue full — dropped {} evaluation(s) under backpressure" + + " (best-effort telemetry)", + qDrops); + } + final long dgDrops = droppedDegradedOverflow.getAndSet(0); + if (dgDrops > 0) { + LOGGER.warn( + "degraded aggregation tier full — dropped {} evaluation(s); raise degraded cap" + + " (best-effort telemetry)", + dgDrops); + } + + if (fullTier.isEmpty() && degradedTier.isEmpty()) { + return; + } + try { + final List events = buildEventList(); + if (events.isEmpty()) { + return; + } + final FlagEvaluationsRequest request = new FlagEvaluationsRequest(context, events); + final String json = jsonAdapter.toJson(request); + final RequestBody requestBody = + RequestBody.create(okhttp3.MediaType.parse("application/json"), json); + // Routes to /evp_proxy/v2/api/v2/flagevaluations via BackendApiFactory(EVENT_PLATFORM) + evp.post("flagevaluations", requestBody, stream -> null, null, false); + fullTier.clear(); + degradedTier.clear(); + perFlagCount.clear(); + globalFullCount = 0; + lastTicks = System.nanoTime(); + } catch (Exception e) { + LOGGER.error("Could not submit flag evaluations", e); + } + } + + private List buildEventList() { + final List events = + new ArrayList<>(fullTier.size() + degradedTier.size()); + for (final EvalBucket bucket : fullTier.values()) { + events.add(FlagEvaluationEvent.fromBucket(bucket, true)); + } + for (final EvalBucket bucket : degradedTier.values()) { + events.add(FlagEvaluationEvent.fromBucket(bucket, false)); + } + return events; + } + + private boolean shouldFlush() { + final long nanoTime = System.nanoTime(); + final long ticks = nanoTime - lastTicks; + if (ticks > ticksRequiredToFlush) { + lastTicks = nanoTime; + return true; + } + return false; + } + } + + // ---- Test-seam inner class (package-private) ---- + + /** + * Test-accessible handler that exposes {@link #drainAndAggregate()} and {@link + * #flush(BackendApi)} without starting a real background thread. + */ + static class SerializingHandlerForTest extends FlagEvaluationSerializingHandler { + + SerializingHandlerForTest(final BackendApiFactory factory, final Map context) { + super( + factory, + Queues.mpscBlockingConsumerArrayQueue(DEFAULT_CAPACITY), + Long.MAX_VALUE, // effectively never auto-flush + TimeUnit.NANOSECONDS, + context, + new AtomicLong(0), + () -> {}); + } + + private final List staged = new ArrayList<>(); + + /** Adds an event to the staged list (simulates hook enqueue). */ + void add(final FlagEvalEvent event) { + staged.add(event); + } + + /** Aggregates all staged events and returns the current aggregation state. */ + AggregatedState drainAndAggregate() { + for (final FlagEvalEvent e : staged) { + aggregateEvent(e); + } + staged.clear(); + return new AggregatedState( + new HashMap<>(fullTier), new HashMap<>(degradedTier), droppedDegradedOverflow.get()); + } + + /** Simulates filling the full tier to GLOBAL_CAP by injecting synthetic distinct buckets. */ + void simulateFullTierAtCap() { + for (int i = globalFullCount; i < GLOBAL_CAP; i++) { + final String key = "synthetic-full-" + i; + fullTier.put(key, new EvalBucket(key, "on", "alloc", null, null, 1L, false, null)); + globalFullCount++; + perFlagCount.merge(key, 1, Integer::sum); + } + } + + /** + * Simulates filling the degraded tier to DEGRADED_CAP by injecting synthetic distinct buckets. + */ + void simulateDegradedTierAtCap() { + for (int i = degradedTier.size(); i < DEGRADED_CAP; i++) { + final String key = "synthetic-dg-" + i; + degradedTier.put(key, new EvalBucket(key, "on", "alloc", null, null, 1L, false, null)); + } + } + } + + /** Factory method for test use — creates a SerializingHandlerForTest. */ + static SerializingHandlerForTest createHandlerForTest( + final BackendApiFactory factory, final Map context) { + return new SerializingHandlerForTest(factory, context); + } + + // ---- Request/response DTOs for JSON serialization ---- + // Structure mirrors the flageval-worker batchedflagevaluations.json schema: + // variant/allocation/flag are {"key": ...} objects, error is {"message": ...}, and per-event + // context nests evaluation attributes under {"evaluation": {...}}. + + static class FlagEvaluationsRequest { + public final Map context; + public final List flagEvaluations; + + FlagEvaluationsRequest( + final Map context, final List flagEvaluations) { + this.context = context; + this.flagEvaluations = flagEvaluations; + } + } + + static class FlagEvaluationEvent { + public final long timestamp; + public final FlagKeyObject flag; + public final long first_evaluation; + public final long last_evaluation; + public final long evaluation_count; + public final KeyObject variant; // null = omitted (degraded tier or runtime default) + public final KeyObject allocation; // null = omitted + public final String targeting_key; // null = omitted (degraded tier) + public final Boolean runtime_default_used; // null = omitted when false + public final EventContext context; // null = omitted (degraded tier) + public final ErrorObject error; // null = omitted (no error) + + FlagEvaluationEvent( + final long timestamp, + final String flagKey, + final long firstEvalMs, + final long lastEvalMs, + final long count, + final String variant, + final String allocation, + final String targetingKey, + final boolean runtimeDefaultUsed, + final String errorMessage, + final Map evaluationAttrs) { + this.timestamp = timestamp; + this.flag = new FlagKeyObject(flagKey); + this.first_evaluation = firstEvalMs; + this.last_evaluation = lastEvalMs; + this.evaluation_count = count; + this.variant = (variant != null && !variant.isEmpty()) ? new KeyObject(variant) : null; + this.allocation = + (allocation != null && !allocation.isEmpty()) ? new KeyObject(allocation) : null; + this.targeting_key = targetingKey; + this.runtime_default_used = runtimeDefaultUsed ? Boolean.TRUE : null; + this.context = + (evaluationAttrs != null && !evaluationAttrs.isEmpty()) + ? new EventContext(evaluationAttrs) + : null; + this.error = + (errorMessage != null && !errorMessage.isEmpty()) ? new ErrorObject(errorMessage) : null; + } + + static FlagEvaluationEvent fromBucket(final EvalBucket bucket, final boolean isFullTier) { + return new FlagEvaluationEvent( + bucket.lastEvalMs, + bucket.flagKey, + bucket.firstEvalMs, + bucket.lastEvalMs, + bucket.count, + bucket.variant, + bucket.allocationKey, + isFullTier ? bucket.targetingKey : null, // degraded omits targetingKey + bucket.runtimeDefaultUsed, + bucket.errorMessage, + isFullTier ? bucket.prunedAttrs : null); // degraded omits context + } + } + + /** {@code {"key": "..."}} object for variant / allocation. */ + static class KeyObject { + public final String key; + + KeyObject(final String key) { + this.key = key; + } + } + + /** {@code {"key": "..."}} object for the flag. */ + static class FlagKeyObject { + public final String key; + + FlagKeyObject(final String key) { + this.key = key; + } + } + + /** {@code {"message": "..."}} object for an evaluation error. */ + static class ErrorObject { + public final String message; + + ErrorObject(final String message) { + this.message = message; + } + } + + /** Per-event {@code context} object: evaluation attributes nest under {@code evaluation}. */ + static class EventContext { + public final Map evaluation; + + EventContext(final Map evaluation) { + this.evaluation = evaluation; + } + } +} diff --git a/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/FlagEvaluationWriterImplTest.java b/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/FlagEvaluationWriterImplTest.java new file mode 100644 index 00000000000..95a8ac85c58 --- /dev/null +++ b/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/FlagEvaluationWriterImplTest.java @@ -0,0 +1,638 @@ +package com.datadog.featureflag; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.github.fge.jackson.JsonLoader; +import com.github.fge.jsonschema.core.exceptions.ProcessingException; +import com.github.fge.jsonschema.core.report.ProcessingReport; +import com.github.fge.jsonschema.main.JsonSchema; +import com.github.fge.jsonschema.main.JsonSchemaFactory; +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; +import com.squareup.moshi.Types; +import datadog.communication.BackendApi; +import datadog.communication.BackendApiFactory; +import datadog.trace.api.featureflag.flagevaluation.FlagEvalEvent; +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import okhttp3.RequestBody; +import okio.Buffer; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for FlagEvaluationWriterImpl: two-tier aggregation, canonical context key, caps, + * deterministic pruning, schema-structural payload validation, and EVP transport. + */ +class FlagEvaluationWriterImplTest { + + // Moshi adapter for parsing the emitted JSON back into a Map for structural assertions. + private static final JsonAdapter> JSON_MAP; + private static final long REALISTIC_EVAL_MS = 1_760_000_000_000L; + + static { + final Moshi moshi = new Moshi.Builder().build(); + final Type type = Types.newParameterizedType(Map.class, String.class, Object.class); + JSON_MAP = moshi.adapter(type); + } + + // ---- helpers ---- + + private static FlagEvalEvent event( + String flagKey, + String variant, + String allocationKey, + String targetingKey, + long evalTimeMs, + Map attrs) { + return new FlagEvalEvent(flagKey, variant, allocationKey, targetingKey, evalTimeMs, attrs); + } + + private static FlagEvalEvent errorEvent(String flagKey, String errorMessage, long evalTimeMs) { + return new FlagEvalEvent( + flagKey, null, null, null, errorMessage, evalTimeMs, Collections.emptyMap()); + } + + private static FlagEvalEvent simpleEvent(String flagKey, String variant) { + return event(flagKey, variant, "alloc1", "user-1", 1000L, Collections.emptyMap()); + } + + private static String repeat(char c, int count) { + char[] chars = new char[count]; + Arrays.fill(chars, c); + return new String(chars); + } + + private TestWriterSetup buildTestWriter(BackendApi mockEvp) { + BackendApiFactory factory = mock(BackendApiFactory.class); + when(factory.createBackendApi(any())).thenReturn(mockEvp); + + Map context = new HashMap<>(); + context.put("service", "test-service"); + + FlagEvaluationWriterImpl.SerializingHandlerForTest handler = + FlagEvaluationWriterImpl.createHandlerForTest(factory, context); + + return new TestWriterSetup(handler, mockEvp); + } + + static class TestWriterSetup { + final FlagEvaluationWriterImpl.SerializingHandlerForTest handler; + final BackendApi mockEvp; + + TestWriterSetup( + FlagEvaluationWriterImpl.SerializingHandlerForTest handler, BackendApi mockEvp) { + this.handler = handler; + this.mockEvp = mockEvp; + } + } + + static class CapturedJson { + final String raw; + final Map parsed; + + CapturedJson(final String raw, final Map parsed) { + this.raw = raw; + this.parsed = parsed; + } + } + + /** Captures the JSON body posted by a flush(), preserving raw JSON for schema validation. */ + private CapturedJson flushAndCapture(TestWriterSetup setup) throws Exception { + final RequestBody[] captured = {null}; + when(setup.mockEvp.post(eq("flagevaluations"), any(RequestBody.class), any(), any(), eq(false))) + .thenAnswer( + inv -> { + captured[0] = inv.getArgument(1); + return null; + }); + setup.handler.drainAndAggregate(); + setup.handler.flush(setup.mockEvp); + assertNotNull(captured[0], "RequestBody must have been posted"); + Buffer buf = new Buffer(); + captured[0].writeTo(buf); + String raw = buf.readUtf8(); + return new CapturedJson(raw, JSON_MAP.fromJson(raw)); + } + + /** Captures the JSON body posted by a flush(), parsed into a Map. */ + private Map flushAndCaptureJson(TestWriterSetup setup) throws Exception { + return flushAndCapture(setup).parsed; + } + + // ---- test: two identical events -> one full-tier bucket, count 2, first <= last ---- + + @Test + void identicalEventsAggregateIntoOneBucketWithCount2() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add(event("flag-a", "on", "alloc1", "user-1", 1000L, Collections.emptyMap())); + setup.handler.add(event("flag-a", "on", "alloc1", "user-1", 2000L, Collections.emptyMap())); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertEquals( + 1, state.fullTier.size(), "Identical events must produce exactly 1 full-tier bucket"); + FlagEvaluationWriterImpl.EvalBucket bucket = state.fullTier.values().iterator().next(); + assertEquals(2, bucket.count, "Count must be 2"); + assertEquals(1000L, bucket.firstEvalMs, "first_evaluation must be min(1000, 2000)=1000"); + assertEquals(2000L, bucket.lastEvalMs, "last_evaluation must be max(1000, 2000)=2000"); + assertTrue(bucket.firstEvalMs <= bucket.lastEvalMs); + } + + // ---- variant comes from the OpenFeature variant, distinct from the evaluated value ---- + + @Test + void variantIsTheVariantKeyNotTheEvaluatedValue() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + // The event already carries the variant key ("on"); the evaluated value ("on-value") is + // intentionally NOT part of the event — the hook is responsible for sourcing variant from + // details.getVariant(). Here we assert the emitted variant.key == the variant key, never a + // value. + setup.handler.add(event("flag-v", "on", "alloc1", "user-1", 1000L, Collections.emptyMap())); + + Map json = flushAndCaptureJson(setup); + Map ev = firstEvent(json); + Map variantObj = (Map) ev.get("variant"); + assertNotNull(variantObj, "variant object must be present"); + assertEquals("on", variantObj.get("key"), "variant.key must be the variant key"); + } + + // ---- type-tagged canonical key distinguishes int 1 vs string "1" ---- + + @Test + void differentValueTypesProduceDifferentBuckets() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + Map attrsInt = new HashMap<>(); + attrsInt.put("score", 1); + Map attrsStr = new HashMap<>(); + attrsStr.put("score", "1"); + + setup.handler.add(event("flag-b", "on", "alloc1", "user-1", 1000L, attrsInt)); + setup.handler.add(event("flag-b", "on", "alloc1", "user-1", 1000L, attrsStr)); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertEquals( + 2, + state.fullTier.size(), + "int 1 vs String \"1\" must produce 2 distinct buckets (type-tagged canonical key)"); + } + + // ---- full-tier overflow past GLOBAL_CAP routes to degraded ---- + + @Test + void globalCapOverflowRoutesToDegradedTier() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.simulateFullTierAtCap(); + setup.handler.add(simpleEvent("extra-flag", "on")); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertTrue(state.degradedTier.size() > 0, "Overflow past GLOBAL_CAP must route to degraded"); + assertEquals(0, state.droppedDegradedOverflow, "No drops yet (degraded not full)"); + } + + // ---- degraded-tier overflow increments the observable dropped counter ---- + + @Test + void degradedCapOverflowIncrementsDroppedCounter() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.simulateFullTierAtCap(); + setup.handler.simulateDegradedTierAtCap(); + setup.handler.add(simpleEvent("drop-flag", "on")); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertTrue( + state.droppedDegradedOverflow > 0, + "Beyond DEGRADED_CAP must increment droppedDegradedOverflow counter"); + } + + // ---- queue-overflow backpressure increments an observable drop counter on enqueue ---- + + @Test + void queueOverflowIncrementsObservableDropCounter() { + // Tiny capacity so we can overflow deterministically. Capacity must be a power of two. + BackendApi mockEvp = mock(BackendApi.class); + BackendApiFactory factory = mock(BackendApiFactory.class); + when(factory.createBackendApi(any())).thenReturn(mockEvp); + FlagEvaluationWriterImpl writer = + new FlagEvaluationWriterImpl(2, 10L, java.util.concurrent.TimeUnit.SECONDS, factory, cfg()); + + // Never start() the worker, so nothing drains the queue; offers past capacity must be counted. + for (int i = 0; i < 100; i++) { + writer.enqueue(simpleEvent("of-flag", "on")); + } + + assertTrue( + writer.droppedQueueOverflow() > 0, + "queue-overflow drops must be counted in the observable droppedQueueOverflow counter"); + } + + private static datadog.trace.api.Config cfg() { + datadog.trace.api.Config config = mock(datadog.trace.api.Config.class); + when(config.getServiceName()).thenReturn("test-service"); + return config; + } + + // ---- enqueue does NOT aggregate (no buckets formed) — aggregation is the worker's job ---- + + @Test + void enqueueDoesNotAggregateOnTheCallingThread() { + BackendApi mockEvp = mock(BackendApi.class); + BackendApiFactory factory = mock(BackendApiFactory.class); + when(factory.createBackendApi(any())).thenReturn(mockEvp); + FlagEvaluationWriterImpl writer = + new FlagEvaluationWriterImpl( + 16, Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS, factory, cfg()); + + // Do NOT start the worker. enqueue() must only offer to the queue; the aggregation maps stay + // empty because nothing on the calling thread aggregates. + writer.enqueue(simpleEvent("g2-flag", "on")); + writer.enqueue(simpleEvent("g2-flag", "on")); + + assertEquals( + 0, + writer.aggregatorFullTierSizeForTest(), + "enqueue must not build aggregation buckets on the calling thread"); + assertEquals(0, writer.droppedQueueOverflow(), "no overflow expected for 2 events in cap 16"); + } + + @Test + void enqueueQueuesPrunedContextSnapshotBeforeBuffering() { + BackendApi mockEvp = mock(BackendApi.class); + BackendApiFactory factory = mock(BackendApiFactory.class); + when(factory.createBackendApi(any())).thenReturn(mockEvp); + FlagEvaluationWriterImpl writer = + new FlagEvaluationWriterImpl( + 16, Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS, factory, cfg()); + + Map rawAttrs = new HashMap<>(); + rawAttrs.put("oversized", repeat('x', 300)); + for (int i = 0; i < 300; i++) { + rawAttrs.put(String.format("k%03d", i), "v" + i); + } + + writer.enqueue(event("bounded-flag", "on", "alloc1", "user-1", 1000L, rawAttrs)); + + FlagEvalEvent queued = writer.pollQueuedEventForTest(); + assertNotNull(queued, "enqueue must offer one event to the bounded queue"); + assertEquals(256, queued.attrs.size(), "queued attrs must be pruned before buffering"); + assertFalse(queued.attrs.containsKey("oversized"), "oversized string must not be queued"); + assertTrue(queued.attrs.containsKey("k000"), "deterministic first sorted key survives"); + assertFalse(queued.attrs.containsKey("k299"), "fields beyond the 256-field cap are not queued"); + } + + // ---- close() drains the queue and final-flushes before the worker exits ---- + + @Test + void closeDrainsAndFinalFlushesQueuedEvents() throws Exception { + final java.util.concurrent.CountDownLatch posted = new java.util.concurrent.CountDownLatch(1); + final RequestBody[] captured = {null}; + BackendApi mockEvp = mock(BackendApi.class); + when(mockEvp.post(eq("flagevaluations"), any(RequestBody.class), any(), any(), eq(false))) + .thenAnswer( + inv -> { + captured[0] = inv.getArgument(1); + posted.countDown(); + return null; + }); + BackendApiFactory factory = mock(BackendApiFactory.class); + when(factory.createBackendApi(any())).thenReturn(mockEvp); + + // Large flush interval so the ONLY flush that can happen is the shutdown drain-flush. + FlagEvaluationWriterImpl writer = + new FlagEvaluationWriterImpl( + 64, + java.util.concurrent.TimeUnit.DAYS.toSeconds(1), + java.util.concurrent.TimeUnit.SECONDS, + factory, + cfg()); + writer.startForTest(); + writer.enqueue(simpleEvent("shutdown-flag", "on")); + + // close() must drain + final-flush, not just interrupt. + writer.close(); + + assertTrue( + posted.await(5, java.util.concurrent.TimeUnit.SECONDS), + "close() must drain the queue and final-flush before exit"); + assertNotNull(captured[0], "a flag evaluation payload must have been posted on shutdown"); + Buffer buf = new Buffer(); + captured[0].writeTo(buf); + @SuppressWarnings("unchecked") + Map json = JSON_MAP.fromJson(buf.readUtf8()); + assertNotNull( + eventForFlag(json, "shutdown-flag"), "the queued event must be in the final flush"); + } + + @Test + void emittedTimestampUsesLastEvaluationTimeNotFlushTime() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add( + event("ts-flag", "on", "alloc1", "user-1", REALISTIC_EVAL_MS, Collections.emptyMap())); + setup.handler.add( + event("ts-flag", "on", "alloc1", "user-1", REALISTIC_EVAL_MS + 10, Collections.emptyMap())); + + Map json = flushAndCaptureJson(setup); + Map ev = firstEvent(json); + assertEquals( + (double) (REALISTIC_EVAL_MS + 10), + ((Number) ev.get("timestamp")).doubleValue(), + 0.0, + "event timestamp must use the aggregate evaluation time, not flush time"); + } + + // ---- absent variant -> runtimeDefaultUsed=true ---- + + @Test + void absentVariantSetsRuntimeDefaultUsed() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add(event("flag-c", null, "alloc1", "user-1", 1000L, Collections.emptyMap())); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertEquals(1, state.fullTier.size()); + FlagEvaluationWriterImpl.EvalBucket bucket = state.fullTier.values().iterator().next(); + assertTrue(bucket.runtimeDefaultUsed, "Absent variant must set runtimeDefaultUsed=true"); + } + + // ---- degraded event omits targeting_key + context ---- + + @Test + void degradedTierEventOmitsTargetingKeyAndContext() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.simulateFullTierAtCap(); + + Map attrs = new HashMap<>(); + attrs.put("region", "us-east-1"); + setup.handler.add(event("dg-flag", "on", "alloc1", "user-99", 1000L, attrs)); + + Map json = flushAndCaptureJson(setup); + Map dg = eventForFlag(json, "dg-flag"); + assertNotNull(dg, "degraded event must be emitted"); + assertNull(dg.get("targeting_key"), "Degraded event must omit targeting_key"); + assertNull(dg.get("context"), "Degraded event must omit context"); + } + + // ---- context >256 fields is pruned to <=256 and stored pruned ---- + + @Test + void contextExceeding256FieldsIsPrunedToStoredPrunedAttrs() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + Map hugeAttrs = new HashMap<>(); + for (int i = 0; i < 300; i++) { + hugeAttrs.put("key" + i, "v" + i); + } + setup.handler.add(event("flag-d", "on", "alloc1", "user-1", 1000L, hugeAttrs)); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertEquals(1, state.fullTier.size()); + FlagEvaluationWriterImpl.EvalBucket bucket = state.fullTier.values().iterator().next(); + assertEquals( + 256, + bucket.prunedContextFieldCount(), + "Context must be pruned to exactly 256 fields and the PRUNED attrs stored"); + assertEquals(256, bucket.prunedAttrs.size(), "stored prunedAttrs map must be the pruned view"); + } + + // ---- pruning is deterministic — same input -> same kept subset across runs ---- + + @Test + void pruningIsDeterministicSortBeforeCut() { + Map attrs = new HashMap<>(); + for (int i = 0; i < 300; i++) { + attrs.put(String.format("k%03d", i), "v" + i); + } + Map p1 = FlagEvaluationWriterImpl.pruneContext(attrs); + Map p2 = FlagEvaluationWriterImpl.pruneContext(new HashMap<>(attrs)); + assertEquals(256, p1.size()); + assertEquals(p1.keySet(), p2.keySet(), "Pruned key set must be deterministic across calls"); + // Sorted-before-cut means the first 256 sorted keys survive: k000..k255, never k256+. + assertTrue(p1.containsKey("k000"), "lowest sorted key must survive"); + assertTrue(p1.containsKey("k255"), "256th sorted key must survive"); + assertFalse(p1.containsKey("k256"), "257th sorted key must be cut"); + assertFalse(p1.containsKey("k299"), "highest sorted key must be cut"); + } + + // ---- string context value >256 chars is skipped from the pruned attrs ---- + + @Test + void contextValueExceeding256CharsIsSkippedFromPrunedAttrs() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + StringBuilder longValBuilder = new StringBuilder(300); + for (int i = 0; i < 300; i++) longValBuilder.append('x'); + String longVal = longValBuilder.toString(); + + Map attrs = new HashMap<>(); + attrs.put("long-val", longVal); // >256 chars + attrs.put("short-val", "ok"); + setup.handler.add(event("flag-e", "on", "alloc1", "user-1", 1000L, attrs)); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertEquals(1, state.fullTier.size()); + FlagEvaluationWriterImpl.EvalBucket bucket = state.fullTier.values().iterator().next(); + assertFalse( + bucket.prunedAttrs.containsKey("long-val"), + "Oversized string value must be skipped from the pruned attrs"); + assertTrue(bucket.prunedAttrs.containsKey("short-val"), "Normal value must survive pruning"); + } + + // ---- flush posts to the "flagevaluations" endpoint ---- + + @Test + void flushPostsToFlagevaluationsEndpoint() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add(event("flag-f", "on", "alloc1", "user-1", 1000L, Collections.emptyMap())); + setup.handler.drainAndAggregate(); + setup.handler.flush(mockEvp); + + verify(mockEvp).post(eq("flagevaluations"), any(RequestBody.class), any(), any(), eq(false)); + } + + // ---- emitted full-tier payload validates against the flageval-worker schema shape ---- + + @Test + void fullTierPayloadValidatesAgainstWorkerSchema() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + Map attrs = new HashMap<>(); + attrs.put("region", "us-east-1"); + setup.handler.add(event("my-flag", "on", "alloc-x", "user-1", REALISTIC_EVAL_MS, attrs)); + + CapturedJson captured = flushAndCapture(setup); + FlagEvalSchema.assertValidJson(captured.raw); + + // And spot-check the object-vs-string shape the prior code got wrong: + Map json = captured.parsed; + Map ev = firstEvent(json); + assertObjectWithKey(ev.get("variant"), "on", "variant must serialize as {\"key\":...}"); + assertObjectWithKey( + ev.get("allocation"), "alloc-x", "allocation must serialize as {\"key\":...}"); + assertObjectWithKey(ev.get("flag"), "my-flag", "flag must serialize as {\"key\":...}"); + // context must nest under "evaluation", not raw at top level (additionalProperties:false). + Map ctx = (Map) ev.get("context"); + assertNotNull(ctx, "full-tier context must be present"); + Map evalAttrs = (Map) ctx.get("evaluation"); + assertNotNull(evalAttrs, "context.evaluation must hold the attributes"); + assertEquals("us-east-1", evalAttrs.get("region")); + } + + // ---- error path serializes error as {"message":...} and validates against the schema ---- + + @Test + void errorPayloadSerializesErrorObjectAndValidatesSchema() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add(errorEvent("err-flag", "type mismatch", REALISTIC_EVAL_MS)); + + CapturedJson captured = flushAndCapture(setup); + FlagEvalSchema.assertValidJson(captured.raw); + + Map json = captured.parsed; + Map ev = firstEvent(json); + Map error = (Map) ev.get("error"); + assertNotNull(error, "error must be present on the error path"); + assertEquals( + "type mismatch", error.get("message"), "error must serialize as {\"message\":...}"); + // runtime default: absent variant -> runtime_default_used true. + assertEquals(Boolean.TRUE, ev.get("runtime_default_used")); + } + + @Test + void workerSchemaRejectsTopLevelReason() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add( + event("reason-schema-flag", "on", "alloc-x", "user-1", REALISTIC_EVAL_MS, new HashMap<>())); + + CapturedJson captured = flushAndCapture(setup); + JsonNode json = JsonLoader.fromString(captured.raw).deepCopy(); + ((ObjectNode) json.get("flagEvaluations").get(0)).put("reason", "targeting_match"); + + FlagEvalSchema.assertInvalidJson(json.toString()); + } + + @Test + void flagEvalEventDoesNotCarryReason() { + boolean hasReasonField = + Arrays.stream(FlagEvalEvent.class.getDeclaredFields()) + .anyMatch(field -> field.getName().equals("reason")); + + assertFalse(hasReasonField, "EVP event snapshots must not carry hidden reason cardinality"); + } + + private static void assertObjectWithKey(Object o, String expectedKey, String msg) { + assertTrue(o instanceof Map, msg + " (must be a JSON object, not a bare string)"); + assertEquals(expectedKey, ((Map) o).get("key"), msg); + } + + @SuppressWarnings("unchecked") + private static Map firstEvent(Map batch) { + List events = (List) batch.get("flagEvaluations"); + assertNotNull(events, "flagEvaluations array must be present"); + assertFalse(events.isEmpty(), "flagEvaluations must not be empty"); + return (Map) events.get(0); + } + + @SuppressWarnings("unchecked") + private static Map eventForFlag(Map batch, String flagKey) { + List events = (List) batch.get("flagEvaluations"); + for (Object o : events) { + Map ev = (Map) o; + Map flag = (Map) ev.get("flag"); + if (flag != null && flagKey.equals(flag.get("key"))) { + return ev; + } + } + return null; + } + + static final class FlagEvalSchema { + private static final JsonSchema WORKER_SCHEMA = loadSchema(); + + static void assertValidJson(String json) { + ProcessingReport report = validate(json); + assertTrue(report.isSuccess(), "worker schema errors:\n" + report); + } + + static void assertInvalidJson(String json) { + ProcessingReport report = validate(json); + assertFalse(report.isSuccess(), "worker schema unexpectedly accepted payload"); + } + + private static ProcessingReport validate(String json) { + try { + JsonNode payload = JsonLoader.fromString(json); + return WORKER_SCHEMA.validate(payload); + } catch (IOException | ProcessingException e) { + throw new AssertionError("worker schema validation failed", e); + } + } + + private static JsonSchema loadSchema() { + try { + JsonNode schema = + JsonLoader.fromResource("/flagevaluation/batchedflagevaluations.json").deepCopy(); + stripEmptyRequired(schema); + return JsonSchemaFactory.byDefault().getJsonSchema(schema); + } catch (IOException | ProcessingException e) { + throw new ExceptionInInitializerError(e); + } + } + + private static void stripEmptyRequired(JsonNode node) { + if (node instanceof ObjectNode) { + ObjectNode object = (ObjectNode) node; + JsonNode required = object.get("required"); + if (required != null && required.isArray() && required.size() == 0) { + object.remove("required"); + } + Iterator> fields = object.fields(); + while (fields.hasNext()) { + stripEmptyRequired(fields.next().getValue()); + } + } else if (node != null && node.isArray()) { + for (JsonNode child : node) { + stripEmptyRequired(child); + } + } + } + } +} diff --git a/products/feature-flagging/feature-flagging-lib/src/test/resources/flagevaluation/batchedflagevaluations.json b/products/feature-flagging/feature-flagging-lib/src/test/resources/flagevaluation/batchedflagevaluations.json new file mode 100644 index 00000000000..b3a4b1fda67 --- /dev/null +++ b/products/feature-flagging/feature-flagging-lib/src/test/resources/flagevaluation/batchedflagevaluations.json @@ -0,0 +1,184 @@ +{ + "title": "BatchedFlagEvaluations", + "type": "object", + "properties": { + "context": { + "title": "InputContextDatadog", + "type": "object", + "properties": { + "geo": { + "type": "object", + "properties": { + "country_iso_code": { "type": "string" }, + "country": { "type": "string" } + }, + "required": [], + "additionalProperties": false + }, + "rum": { + "type": "object", + "properties": { + "application": { + "type": "object", + "properties": { + "id": { "type": "string" } + }, + "required": [], + "additionalProperties": false + }, + "view": { + "type": "object", + "properties": { + "url": { "type": "string" } + }, + "required": [], + "additionalProperties": false + } + }, + "required": [], + "additionalProperties": false + }, + "service": { "type": "string" }, + "version": { "type": "string" }, + "env": { "type": "string" }, + "device": { + "type": "object", + "properties": { + "name": { "type": "string" }, + "type": { "type": "string" }, + "brand": { "type": "string" }, + "model": { "type": "string" } + }, + "required": [], + "additionalProperties": false + }, + "os": { + "type": "object", + "properties": { + "name": { "type": "string" }, + "version": { "type": "string" } + }, + "required": [], + "additionalProperties": false + } + }, + "required": [], + "additionalProperties": false + }, + "flagEvaluations": { + "type": "array", + "items": { + "type": "object", + "properties": { + "timestamp": { + "type": "integer", + "description": "The timestamp (milliseconds since epoch) at which the evaluation occurred." + }, + "flag": { + "type": "object", + "properties": { + "key": { "type": "string" } + }, + "required": ["key"], + "additionalProperties": false + }, + "first_evaluation": { + "type": "integer", + "minimum": 1759276800000 + }, + "last_evaluation": { + "type": "integer", + "minimum": 1759276800000 + }, + "evaluation_count": { + "type": "integer", + "minimum": 1 + }, + "runtime_default_used": { "type": "boolean" }, + "targeting_key": { "type": "string" }, + "context": { + "type": "object", + "properties": { + "evaluation": { "type": "object" }, + "dd": { + "type": "object", + "properties": { + "service": { "type": "string" }, + "rum": { + "type": "object", + "properties": { + "application": { + "type": "object", + "properties": { + "id": { "type": "string" } + }, + "required": [], + "additionalProperties": false + }, + "view": { + "type": "object", + "properties": { + "url": { "type": "string" } + }, + "required": [], + "additionalProperties": false + } + }, + "required": [], + "additionalProperties": false + } + }, + "required": [], + "additionalProperties": true + } + }, + "required": [], + "additionalProperties": false + }, + "variant": { + "type": "object", + "properties": { + "key": { "type": "string" } + }, + "required": ["key"], + "additionalProperties": false + }, + "allocation": { + "type": "object", + "properties": { + "key": { "type": "string" } + }, + "required": ["key"], + "additionalProperties": false + }, + "targeting_rule": { + "type": "object", + "properties": { + "key": { "type": "string" } + }, + "required": ["key"], + "additionalProperties": false + }, + "error": { + "type": "object", + "properties": { + "message": { "type": "string" } + }, + "required": ["message"], + "additionalProperties": false + } + }, + "required": [ + "timestamp", + "flag", + "first_evaluation", + "last_evaluation", + "evaluation_count" + ], + "additionalProperties": false + } + } + }, + "required": ["flagEvaluations"], + "additionalProperties": false +}