diff --git a/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusMetricReaderTest.java b/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusMetricReaderTest.java index 5ac24663c4b..57bcdaf3b5a 100644 --- a/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusMetricReaderTest.java +++ b/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusMetricReaderTest.java @@ -972,29 +972,23 @@ void createdTimestamp() throws IOException { LongCounter counter = meter.counterBuilder("requests").build(); testClock.advance(Duration.ofMillis(1)); + long bearStartNanos = testClock.now(); counter.add(3, Attributes.builder().put("animal", "bear").build()); testClock.advance(Duration.ofMillis(1)); + long mouseStartNanos = testClock.now(); counter.add(2, Attributes.builder().put("animal", "mouse").build()); testClock.advance(Duration.ofMillis(1)); - // There is a curious difference between Prometheus and OpenTelemetry: - // In Prometheus metrics the _created timestamp is per data point, - // i.e. the _created timestamp says when this specific set of label values - // was first observed. - // In the OTel Java SDK the _created timestamp is the initialization time - // of the SdkMeterProvider, i.e. all data points will have the same _created timestamp. - // So we expect the _created timestamp to be the start time of the application, - // not the timestamp when the counter or an individual data point was created. String expected = "" + "# TYPE requests counter\n" + "requests_total{animal=\"bear\",otel_scope_name=\"test\"} 3.0\n" + "requests_created{animal=\"bear\",otel_scope_name=\"test\"} " - + createdTimestamp + + convertTimestamp(bearStartNanos) + "\n" + "requests_total{animal=\"mouse\",otel_scope_name=\"test\"} 2.0\n" + "requests_created{animal=\"mouse\",otel_scope_name=\"test\"} " - + createdTimestamp + + convertTimestamp(mouseStartNanos) + "\n" + "# TYPE target info\n" + "target_info{service_name=\"unknown_service:java\",telemetry_sdk_language=\"java\",telemetry_sdk_name=\"opentelemetry\",telemetry_sdk_version=\"1.x.x\"} 1\n" diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java index 436eeb9bbb1..34e57de3032 100644 --- a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java @@ -7,6 +7,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.Clock; import java.util.concurrent.TimeUnit; import java.util.function.DoubleSupplier; import org.openjdk.jmh.annotations.Benchmark; @@ -40,7 +41,7 @@ public static class ThreadState { @Setup(Level.Trial) public final void setup() { - aggregatorHandle = aggregation.getAggregator().createHandle(); + aggregatorHandle = aggregation.getAggregator().createHandle(Clock.getDefault().now()); valueSupplier = valueGen.supplier(); } diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java index 456e8e9fd94..3f22ccca84b 100644 --- a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java @@ -7,6 +7,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.Clock; import java.util.concurrent.TimeUnit; import java.util.function.DoubleSupplier; import org.openjdk.jmh.annotations.Benchmark; @@ -46,7 +47,7 @@ public static class ThreadState { @Setup(Level.Invocation) public final void setup() { - aggregatorHandle = aggregation.getAggregator().createHandle(); + aggregatorHandle = aggregation.getAggregator().createHandle(Clock.getDefault().now()); valueSupplier = valueGen.supplier(); } diff --git a/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/InstrumentGarbageCollectionBenchmarkTest.java b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/InstrumentGarbageCollectionBenchmarkTest.java index 6fba53e86cd..e06a5941c42 100644 --- a/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/InstrumentGarbageCollectionBenchmarkTest.java +++ b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/InstrumentGarbageCollectionBenchmarkTest.java @@ -30,8 +30,8 @@ public class InstrumentGarbageCollectionBenchmarkTest { /** * This test validates that in {@link MemoryMode#REUSABLE_DATA}, any {@link - * MetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely allocates memory - * which is then subsequently garbage collected. It is done so comparatively to {@link + * MetricStorage#collect(Resource, InstrumentationScopeInfo, long)} barely allocates memory which + * is then subsequently garbage collected. It is done so comparatively to {@link * MemoryMode#IMMUTABLE_DATA}, * *

It runs the JMH test {@link InstrumentGarbageCollectionBenchmark} with GC profiler, and diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java index 01dead0ce85..8fb64ae7f38 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java @@ -132,8 +132,7 @@ Collection collectAll(RegisteredReader registeredReader, long epochN // Only invoke callbacks if meter is enabled if (meterEnabled) { for (CallbackRegistration callbackRegistration : currentRegisteredCallbacks) { - callbackRegistration.invokeCallback( - registeredReader, meterProviderSharedState.getStartEpochNanos(), epochNanos); + callbackRegistration.invokeCallback(registeredReader); } } @@ -145,10 +144,7 @@ Collection collectAll(RegisteredReader registeredReader, long epochN for (MetricStorage storage : storages) { MetricData current = storage.collect( - meterProviderSharedState.getResource(), - getInstrumentationScopeInfo(), - meterProviderSharedState.getStartEpochNanos(), - epochNanos); + meterProviderSharedState.getResource(), getInstrumentationScopeInfo(), epochNanos); // Ignore if the metric data doesn't have any data points, for example when aggregation is // Aggregation#drop() if (!current.isEmpty()) { @@ -288,6 +284,7 @@ WriteableMetricStorage registerSynchronousMetricStorage(InstrumentDescriptor ins SynchronousMetricStorage.create( reader, registeredView, + meterProviderSharedState.getClock(), instrument, meterProviderSharedState.getExemplarFilter(), meterEnabled))); @@ -317,7 +314,11 @@ SdkObservableMeasurement registerObservableMeasurement( registeredStorages.add( registry.register( AsynchronousMetricStorage.create( - reader, registeredView, instrumentDescriptor, meterEnabled))); + reader, + registeredView, + meterProviderSharedState.getClock(), + instrumentDescriptor, + meterEnabled))); } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java index a4aa68b5a23..1ec856f8c05 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java @@ -72,7 +72,6 @@ public static SdkMeterProviderBuilder builder() { Resource resource, ExemplarFilterInternal exemplarFilter, ScopeConfigurator meterConfigurator) { - long startEpochNanos = clock.now(); this.registeredViews = registeredViews; this.registeredReaders = metricReaders.entrySet().stream() @@ -83,8 +82,7 @@ public static SdkMeterProviderBuilder builder() { ViewRegistry.create(entry.getKey(), entry.getValue(), registeredViews))) .collect(toList()); this.metricProducers = metricProducers; - this.sharedState = - MeterProviderSharedState.create(clock, resource, exemplarFilter, startEpochNanos); + this.sharedState = MeterProviderSharedState.create(clock, resource, exemplarFilter); this.registry = new ComponentRegistry<>( instrumentationLibraryInfo -> @@ -99,7 +97,6 @@ public static SdkMeterProviderBuilder builder() { readerMetricProducers.add(new LeasedMetricProducer(registry, sharedState, registeredReader)); MetricReader reader = registeredReader.getReader(); reader.register(new SdkCollectionRegistration(readerMetricProducers, sharedState)); - registeredReader.setLastCollectEpochNanos(startEpochNanos); if (reader instanceof PeriodicMetricReader) { setReaderMeterProvider((PeriodicMetricReader) reader, this); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java index 79492f61c66..b5c0b28c93f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java @@ -30,12 +30,16 @@ static Aggregator drop() { } /** - * Returns a new {@link AggregatorHandle}. This MUST by used by the synchronous to aggregate - * recorded measurements during the collection cycle. + * Returns a new {@link AggregatorHandle}. Used by both synchronous and asynchronous metric + * storage to aggregate recorded measurements during the collection cycle. * + * @param creationEpochNanos the epoch timestamp (nanos) at which the handle is being created, + * stored via {@link AggregatorHandle#getCreationEpochNanos()}. Whether this value is used as + * the start timestamp of reported data points depends on the instrument and temporality — see + * {@link AggregatorHandle#getCreationEpochNanos()} for details. * @return a new {@link AggregatorHandle}. */ - AggregatorHandle createHandle(); + AggregatorHandle createHandle(long creationEpochNanos); /** * Returns a new DELTA point by computing the difference between two cumulative points. diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java index 6312e5415f7..2115d8dd3c8 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java @@ -35,13 +35,16 @@ public abstract class AggregatorHandle { private static final String UNSUPPORTED_DOUBLE_MESSAGE = "This aggregator does not support double values."; + private final long creationEpochNanos; // A reservoir of sampled exemplars for this time period. @Nullable private final DoubleExemplarReservoir doubleReservoirFactory; @Nullable private final LongExemplarReservoir longReservoirFactory; private final boolean isDoubleType; private volatile boolean valuesRecorded = false; - protected AggregatorHandle(ExemplarReservoirFactory reservoirFactory, boolean isDoubleType) { + protected AggregatorHandle( + long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, boolean isDoubleType) { + this.creationEpochNanos = creationEpochNanos; this.isDoubleType = isDoubleType; if (isDoubleType) { this.doubleReservoirFactory = reservoirFactory.createDoubleExemplarReservoir(); @@ -145,4 +148,22 @@ private static S throwUnsupportedIfNull(@Nullable S value, String message) { } return value; } + + /** + * Returns the epoch timestamp (nanos) at which this handle was created. + * + *

For cumulative synchronous instruments, this is the time of the first measurement for the + * series and is used as {@link PointData#getStartEpochNanos()}. + * + *

For cumulative asynchronous instruments, this is either the instrument creation time (if the + * series first appeared during the first collection cycle) or the preceding collection interval's + * timestamp (if the series appeared in a later cycle), and is used as {@link + * PointData#getStartEpochNanos()}. + * + *

Not used for delta instruments; their start epoch is computed directly from the reader's + * last collection time or instrument creation time. + */ + public long getCreationEpochNanos() { + return creationEpochNanos; + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java index 072f79a3e1c..fe5c301f1fe 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java @@ -66,8 +66,9 @@ public DoubleBase2ExponentialHistogramAggregator( } @Override - public AggregatorHandle createHandle() { - return new Handle(reservoirFactory, maxBuckets, maxScale, recordMinMax, memoryMode); + public AggregatorHandle createHandle(long creationEpochNanos) { + return new Handle( + creationEpochNanos, reservoirFactory, maxBuckets, maxScale, recordMinMax, memoryMode); } @Override @@ -104,12 +105,13 @@ static final class Handle extends AggregatorHandle createHandle() { - return new Handle(boundaryList, boundaries, recordMinMax, reservoirFactory, memoryMode); + public AggregatorHandle createHandle(long creationEpochNanos) { + return new Handle( + creationEpochNanos, boundaryList, boundaries, recordMinMax, reservoirFactory, memoryMode); } @Override @@ -120,12 +121,13 @@ static final class Handle extends AggregatorHandle { @Nullable private final MutableHistogramPointData reusablePoint; Handle( + long creationEpochNanos, List boundaryList, double[] boundaries, boolean recordMinMax, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) { - super(reservoirFactory, /* isDoubleType= */ true); + super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true); this.boundaryList = boundaryList; this.boundaries = boundaries; this.recordMinMax = recordMinMax; diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java index 1baadf0763d..0117db10f66 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java @@ -50,8 +50,8 @@ public DoubleLastValueAggregator( } @Override - public AggregatorHandle createHandle() { - return new Handle(reservoirFactory, memoryMode); + public AggregatorHandle createHandle(long creationEpochNanos) { + return new Handle(creationEpochNanos, reservoirFactory, memoryMode); } @Override @@ -99,8 +99,9 @@ static final class Handle extends AggregatorHandle { // Only used when memoryMode is REUSABLE_DATA @Nullable private final MutableDoublePointData reusablePoint; - private Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) { - super(reservoirFactory, /* isDoubleType= */ true); + private Handle( + long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) { + super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true); if (memoryMode == MemoryMode.REUSABLE_DATA) { reusablePoint = new MutableDoublePointData(); } else { diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java index 29b444412f6..b22be360364 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java @@ -55,8 +55,8 @@ public DoubleSumAggregator( } @Override - public AggregatorHandle createHandle() { - return new Handle(reservoirFactory, memoryMode); + public AggregatorHandle createHandle(long creationEpochNanos) { + return new Handle(creationEpochNanos, reservoirFactory, memoryMode); } @Override @@ -112,8 +112,9 @@ static final class Handle extends AggregatorHandle { // Only used if memoryMode == MemoryMode.REUSABLE_DATA @Nullable private final MutableDoublePointData reusablePoint; - Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) { - super(reservoirFactory, /* isDoubleType= */ true); + Handle( + long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) { + super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true); reusablePoint = memoryMode == MemoryMode.REUSABLE_DATA ? new MutableDoublePointData() : null; } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java index 1813bef9fcc..b74e06a0791 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java @@ -54,7 +54,7 @@ public List getExemplars() { private static final AggregatorHandle HANDLE = new AggregatorHandle( - ExemplarReservoirFactory.noSamples(), /* isDoubleType= */ true) { + 0, ExemplarReservoirFactory.noSamples(), /* isDoubleType= */ true) { @Override protected PointData doAggregateThenMaybeResetDoubles( long startEpochNanos, @@ -75,7 +75,7 @@ protected void doRecordDouble(double value) {} private DropAggregator() {} @Override - public AggregatorHandle createHandle() { + public AggregatorHandle createHandle(long creationEpochNanos) { return HANDLE; } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java index bb38d32f061..62acee67e96 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java @@ -47,8 +47,8 @@ public LongLastValueAggregator(ExemplarReservoirFactory reservoirFactory, Memory } @Override - public AggregatorHandle createHandle() { - return new Handle(reservoirFactory, memoryMode); + public AggregatorHandle createHandle(long creationEpochNanos) { + return new Handle(creationEpochNanos, reservoirFactory, memoryMode); } @Override @@ -95,8 +95,9 @@ static final class Handle extends AggregatorHandle { // Only used when memoryMode is REUSABLE_DATA @Nullable private final MutableLongPointData reusablePoint; - Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) { - super(reservoirFactory, /* isDoubleType= */ false); + Handle( + long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) { + super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ false); if (memoryMode == MemoryMode.REUSABLE_DATA) { reusablePoint = new MutableLongPointData(); } else { diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java index 6ea232036ff..92581f3b8d1 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java @@ -48,8 +48,8 @@ public LongSumAggregator( } @Override - public AggregatorHandle createHandle() { - return new Handle(reservoirFactory, memoryMode); + public AggregatorHandle createHandle(long creationEpochNanos) { + return new Handle(creationEpochNanos, reservoirFactory, memoryMode); } @Override @@ -105,8 +105,9 @@ static final class Handle extends AggregatorHandle { // Only used if memoryMode == MemoryMode.REUSABLE_DATA @Nullable private final MutableLongPointData reusablePointData; - Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) { - super(reservoirFactory, /* isDoubleType= */ false); + Handle( + long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) { + super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ false); reusablePointData = memoryMode == MemoryMode.REUSABLE_DATA ? new MutableLongPointData() : null; } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReader.java index c56d01b9cda..bf414655695 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReader.java @@ -26,7 +26,7 @@ public class RegisteredReader { private final int id = ID_COUNTER.incrementAndGet(); private final MetricReader metricReader; private final ViewRegistry viewRegistry; - private volatile long lastCollectEpochNanos; + private volatile long lastCollectEpochNanos = -1; /** Construct a new collection info object storing information for collection against a reader. */ public static RegisteredReader create(MetricReader reader, ViewRegistry viewRegistry) { @@ -52,13 +52,14 @@ public void setLastCollectEpochNanos(long epochNanos) { } /** - * Get the time of the last collection for the reader. + * Get the time of the last collection for the reader, or {@code defaultEpochNanos} if not set. * *

Used to compute the {@link PointData#getStartEpochNanos()} for instruments aggregations with * {@link AggregationTemporality#DELTA} temporality. */ - public long getLastCollectEpochNanos() { - return lastCollectEpochNanos; + public long getLastCollectEpochNanosOrDefault(long defaultEpochNanos) { + long lastCollectEpochNanos = this.lastCollectEpochNanos; + return lastCollectEpochNanos == -1 ? defaultEpochNanos : lastCollectEpochNanos; } /** Get the {@link ViewRegistry} for the reader. */ diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 59bfd4275bf..3b4d1ac585a 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -11,6 +11,7 @@ import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.common.internal.ThrottlingLogger; @@ -35,10 +36,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.BiConsumer; -import java.util.function.Function; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Stores aggregated {@link MetricData} for asynchronous instruments. @@ -46,16 +47,18 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class AsynchronousMetricStorage implements MetricStorage { +public abstract class AsynchronousMetricStorage implements MetricStorage { private static final Logger logger = Logger.getLogger(AsynchronousMetricStorage.class.getName()); private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger); - private final RegisteredReader registeredReader; + protected final RegisteredReader registeredReader; private final MetricDescriptor metricDescriptor; private final AggregationTemporality aggregationTemporality; - private final Aggregator aggregator; + protected final Aggregator aggregator; private final AttributesProcessor attributesProcessor; - private final MemoryMode memoryMode; + protected final long instrumentCreationEpochNanos; + + protected final MemoryMode memoryMode; /** * This field is set to 1 less than the actual intended cardinality limit, allowing the last slot @@ -64,61 +67,34 @@ public final class AsynchronousMetricStorage implements Met private final int maxCardinality; // Handles responsible for aggregating data recorded during callbacks - private final Map> aggregatorHandles; - - // Only populated if aggregationTemporality == DELTA - private Map lastPoints; - - // Only populated if memoryMode == REUSABLE_DATA - private final ObjectPool reusablePointsPool; - private final ObjectPool> reusableHandlesPool; - private final Function> handleBuilder; - private final BiConsumer> handleReleaser; - private final BiConsumer pointReleaser; + protected final Map> aggregatorHandles; - private final List reusablePointsList = new ArrayList<>(); - // If aggregationTemporality == DELTA, this reference and lastPoints will be swapped at every - // collection - private Map reusablePointsMap = new PooledHashMap<>(); - - // Time information relative to recording of data in aggregatorHandles, set while calling - // callbacks - private long startEpochNanos; - private long epochNanos; + protected final List reusablePointsList = new ArrayList<>(); private volatile boolean enabled; private AsynchronousMetricStorage( RegisteredReader registeredReader, MetricDescriptor metricDescriptor, + AggregationTemporality aggregationTemporality, Aggregator aggregator, AttributesProcessor attributesProcessor, int maxCardinality, + Clock clock, boolean enabled) { this.registeredReader = registeredReader; this.metricDescriptor = metricDescriptor; - this.aggregationTemporality = - registeredReader - .getReader() - .getAggregationTemporality(metricDescriptor.getSourceInstrument().getType()); + this.aggregationTemporality = aggregationTemporality; this.memoryMode = registeredReader.getReader().getMemoryMode(); this.aggregator = aggregator; this.attributesProcessor = attributesProcessor; + this.instrumentCreationEpochNanos = clock.now(); this.maxCardinality = maxCardinality - 1; this.enabled = enabled; - this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint); - this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle); - this.handleBuilder = ignored -> reusableHandlesPool.borrowObject(); - this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle); - this.pointReleaser = (ignored, point) -> reusablePointsPool.returnObject(point); - - if (memoryMode == REUSABLE_DATA) { - this.lastPoints = new PooledHashMap<>(); - this.aggregatorHandles = new PooledHashMap<>(); - } else { - this.lastPoints = new HashMap<>(); - this.aggregatorHandles = new HashMap<>(); - } + + // Concurrent hashmap only used to allow for removal during iteration during collection. + this.aggregatorHandles = + memoryMode == REUSABLE_DATA ? new PooledHashMap<>() : new ConcurrentHashMap<>(); } /** @@ -128,49 +104,62 @@ private AsynchronousMetricStorage( public static AsynchronousMetricStorage create( RegisteredReader registeredReader, RegisteredView registeredView, + Clock clock, InstrumentDescriptor instrumentDescriptor, boolean enabled) { View view = registeredView.getView(); MetricDescriptor metricDescriptor = MetricDescriptor.create(view, registeredView.getViewSourceInfo(), instrumentDescriptor); + AggregationTemporality aggregationTemporality = + registeredReader + .getReader() + .getAggregationTemporality(metricDescriptor.getSourceInstrument().getType()); Aggregator aggregator = ((AggregatorFactory) view.getAggregation()) .createAggregator( instrumentDescriptor, ExemplarFilterInternal.asExemplarFilterInternal(ExemplarFilter.alwaysOff()), registeredReader.getReader().getMemoryMode()); - return new AsynchronousMetricStorage<>( - registeredReader, - metricDescriptor, - aggregator, - registeredView.getViewAttributesProcessor(), - registeredView.getCardinalityLimit(), - enabled); + AttributesProcessor attributesProcessor = registeredView.getViewAttributesProcessor(); + int cardinalityLimit = registeredView.getCardinalityLimit(); + return aggregationTemporality == AggregationTemporality.DELTA + ? new DeltaAsynchronousMetricStorage<>( + registeredReader, + metricDescriptor, + aggregator, + attributesProcessor, + cardinalityLimit, + clock, + enabled) + : new CumulativeAsynchronousMetricStorage<>( + registeredReader, + metricDescriptor, + aggregator, + attributesProcessor, + cardinalityLimit, + clock, + enabled); } /** Record callback measurement from {@link ObservableLongMeasurement}. */ void record(Attributes attributes, long value) { - attributes = validateAndProcessAttributes(attributes); - AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder); + AggregatorHandle handle = getAggregatorHandle(attributes); handle.recordLong(value, attributes, Context.current()); } /** Record callback measurement from {@link ObservableDoubleMeasurement}. */ void record(Attributes attributes, double value) { - attributes = validateAndProcessAttributes(attributes); - AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder); + AggregatorHandle handle = getAggregatorHandle(attributes); handle.recordDouble(value, attributes, Context.current()); } - void setEpochInformation(long startEpochNanos, long epochNanos) { - this.startEpochNanos = - aggregationTemporality == AggregationTemporality.DELTA - ? registeredReader.getLastCollectEpochNanos() - : startEpochNanos; - this.epochNanos = epochNanos; - } - - private Attributes validateAndProcessAttributes(Attributes attributes) { + private AggregatorHandle getAggregatorHandle(Attributes attributes) { + Context context = Context.current(); + attributes = attributesProcessor.process(attributes, context); + AggregatorHandle handle = aggregatorHandles.get(attributes); + if (handle != null) { + return handle; + } if (aggregatorHandles.size() >= maxCardinality) { throttlingLogger.log( Level.WARNING, @@ -179,14 +168,34 @@ private Attributes validateAndProcessAttributes(Attributes attributes) { + " has exceeded the maximum allowed cardinality (" + maxCardinality + ")."); - return MetricStorage.CARDINALITY_OVERFLOW; + attributes = MetricStorage.CARDINALITY_OVERFLOW; + // Return handle for overflow series, first checking if a handle already exists for it + handle = aggregatorHandles.get(attributes); + if (handle != null) { + return handle; + } } + // Get handle from pool if available, else create a new one. + // Note: pooled handles (used only for delta temporality) retain their original + // creationEpochNanos, but delta storage does not use the handle's creation epoch for the + // start epoch — it uses the reader's last collect time directly in doCollect(). So the stale + // creation epoch on a recycled handle does not affect correctness. + AggregatorHandle newHandle = maybeGetPooledAggregatorHandle(); + if (newHandle == null) { + newHandle = createAggregatorHandle(); + } + handle = aggregatorHandles.putIfAbsent(attributes, newHandle); + return handle != null ? handle : newHandle; + } - Context context = Context.current(); - attributes = attributesProcessor.process(attributes, context); - return attributes; + protected AggregatorHandle createAggregatorHandle() { + return aggregator.createHandle( + registeredReader.getLastCollectEpochNanosOrDefault(instrumentCreationEpochNanos)); } + @Nullable + abstract AggregatorHandle maybeGetPooledAggregatorHandle(); + @Override public MetricDescriptor getMetricDescriptor() { return metricDescriptor; @@ -197,20 +206,15 @@ public RegisteredReader getRegisteredReader() { return registeredReader; } + @Override + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + @Override public MetricData collect( - Resource resource, - InstrumentationScopeInfo instrumentationScopeInfo, - long startEpochNanos, - long epochNanos) { - Collection result = - aggregationTemporality == AggregationTemporality.DELTA - ? collectWithDeltaAggregationTemporality() - : collectWithCumulativeAggregationTemporality(); - - // collectWith*AggregationTemporality() methods are responsible for resetting the handle - aggregatorHandles.forEach(handleReleaser); - aggregatorHandles.clear(); + Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) { + Collection result = doCollect(epochNanos); return enabled ? aggregator.toMetricData( @@ -218,106 +222,194 @@ public MetricData collect( : EmptyMetricData.getInstance(); } - private Collection collectWithDeltaAggregationTemporality() { - Map currentPoints; - if (memoryMode == REUSABLE_DATA) { - // deltaPoints computed in the previous collection can be released - reusablePointsList.forEach(reusablePointsPool::returnObject); - reusablePointsList.clear(); + abstract Collection doCollect(long epochNanos); + + private static final class DeltaAsynchronousMetricStorage + extends AsynchronousMetricStorage { + // This reference and lastPoints will be swapped at every collection + private Map reusablePointsMap = new PooledHashMap<>(); + private Map lastPoints; + + private final ObjectPool reusablePointsPool; + private final ObjectPool> reusableHandlesPool; + + DeltaAsynchronousMetricStorage( + RegisteredReader registeredReader, + MetricDescriptor metricDescriptor, + Aggregator aggregator, + AttributesProcessor attributesProcessor, + int maxCardinality, + Clock clock, + boolean enabled) { + super( + registeredReader, + metricDescriptor, + AggregationTemporality.DELTA, + aggregator, + attributesProcessor, + maxCardinality, + clock, + enabled); + + this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint); + this.reusableHandlesPool = new ObjectPool<>(this::createAggregatorHandle); + // Concurrent hashmap only used to allow for removal during iteration during collection. + this.lastPoints = + memoryMode == REUSABLE_DATA ? new PooledHashMap<>() : new ConcurrentHashMap<>(); + } - currentPoints = reusablePointsMap; - } else { - currentPoints = new HashMap<>(); + @Nullable + @Override + AggregatorHandle maybeGetPooledAggregatorHandle() { + return reusableHandlesPool.borrowObject(); } - aggregatorHandles.forEach( - (attributes, handle) -> { - T point = - handle.aggregateThenMaybeReset( - this.startEpochNanos, this.epochNanos, attributes, /* reset= */ true); - - T pointForCurrentPoints; - if (memoryMode == REUSABLE_DATA) { - // AggregatorHandle is going to modify the point eventually, but we must persist its - // value to used it at the next collection (within lastPoints). Thus, we make a copy. - pointForCurrentPoints = reusablePointsPool.borrowObject(); - aggregator.copyPoint(point, pointForCurrentPoints); - } else { - pointForCurrentPoints = point; - } - currentPoints.put(attributes, pointForCurrentPoints); - }); - - List deltaPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList<>(); - currentPoints.forEach( - (attributes, currentPoint) -> { - T lastPoint = lastPoints.remove(attributes); - - T deltaPoint; - if (lastPoint == null) { + @Override + Collection doCollect(long epochNanos) { + Map currentPoints; + if (memoryMode == REUSABLE_DATA) { + // deltaPoints computed in the previous collection can be released + reusablePointsList.forEach(reusablePointsPool::returnObject); + reusablePointsList.clear(); + + currentPoints = reusablePointsMap; + } else { + currentPoints = new HashMap<>(); + } + + // Start time for asynchronous delta instruments is the time of the last collection, or if no + // collection has yet taken place, the time the instrument was created. + long startEpochNanos = + registeredReader.getLastCollectEpochNanosOrDefault(instrumentCreationEpochNanos); + + aggregatorHandles.forEach( + (attributes, handle) -> { + T point = + handle.aggregateThenMaybeReset( + startEpochNanos, epochNanos, attributes, /* reset= */ true); + + T pointForCurrentPoints; if (memoryMode == REUSABLE_DATA) { - // All deltaPoints are released at the end of the collection. Thus, we need a copy - // to make sure currentPoint can still be used within lastPoints during the next - // collection. - deltaPoint = reusablePointsPool.borrowObject(); - aggregator.copyPoint(currentPoint, deltaPoint); + // AggregatorHandle is going to modify the point eventually, but we must persist its + // value to used it at the next collection (within lastPoints). Thus, we make a copy. + pointForCurrentPoints = reusablePointsPool.borrowObject(); + aggregator.copyPoint(point, pointForCurrentPoints); } else { - deltaPoint = currentPoint; + pointForCurrentPoints = point; } - } else { - if (memoryMode == REUSABLE_DATA) { - aggregator.diffInPlace(lastPoint, currentPoint); - deltaPoint = lastPoint; + currentPoints.put(attributes, pointForCurrentPoints); + }); + + List deltaPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList<>(); + currentPoints.forEach( + (attributes, currentPoint) -> { + T lastPoint = lastPoints.remove(attributes); + + T deltaPoint; + if (lastPoint == null) { + if (memoryMode == REUSABLE_DATA) { + // All deltaPoints are released at the end of the collection. Thus, we need a copy + // to make sure currentPoint can still be used within lastPoints during the next + // collection. + deltaPoint = reusablePointsPool.borrowObject(); + aggregator.copyPoint(currentPoint, deltaPoint); + } else { + deltaPoint = currentPoint; + } } else { - deltaPoint = aggregator.diff(lastPoint, currentPoint); + if (memoryMode == REUSABLE_DATA) { + aggregator.diffInPlace(lastPoint, currentPoint); + deltaPoint = lastPoint; + } else { + deltaPoint = aggregator.diff(lastPoint, currentPoint); + } } - } - deltaPoints.add(deltaPoint); - }); - - if (memoryMode == REUSABLE_DATA) { - // - If the point was used to compute a delta, it's now in deltaPoints (and thus in - // reusablePointsList) - // - If the point hasn't been used, it's still in lastPoints and can be returned - lastPoints.forEach(pointReleaser); - lastPoints.clear(); - - Map tmp = lastPoints; - lastPoints = reusablePointsMap; - reusablePointsMap = tmp; - } else { - lastPoints = currentPoints; + deltaPoints.add(deltaPoint); + }); + + if (memoryMode == REUSABLE_DATA) { + // - If the point was used to compute a delta, it's now in deltaPoints (and thus in + // reusablePointsList) + // - If the point hasn't been used, it's still in lastPoints and can be returned + lastPoints.forEach((attributes, point) -> reusablePointsPool.returnObject(point)); + lastPoints.clear(); + + Map tmp = lastPoints; + lastPoints = reusablePointsMap; + reusablePointsMap = tmp; + } else { + lastPoints = currentPoints; + } + + aggregatorHandles.forEach( + (unused, aggregatorHandle) -> reusableHandlesPool.returnObject(aggregatorHandle)); + aggregatorHandles.clear(); + + return deltaPoints; } - - return deltaPoints; } - private Collection collectWithCumulativeAggregationTemporality() { - List currentPoints; - if (memoryMode == REUSABLE_DATA) { - // We should not return the points in this list to the pool, they belong to the - // AggregatorHandle - reusablePointsList.clear(); - currentPoints = reusablePointsList; - } else { - currentPoints = new ArrayList<>(); + private static final class CumulativeAsynchronousMetricStorage + extends AsynchronousMetricStorage { + CumulativeAsynchronousMetricStorage( + RegisteredReader registeredReader, + MetricDescriptor metricDescriptor, + Aggregator aggregator, + AttributesProcessor attributesProcessor, + int maxCardinality, + Clock clock, + boolean enabled) { + super( + registeredReader, + metricDescriptor, + AggregationTemporality.CUMULATIVE, + aggregator, + attributesProcessor, + maxCardinality, + clock, + enabled); } - aggregatorHandles.forEach( - (attributes, handle) -> { - T value = - handle.aggregateThenMaybeReset( - AsynchronousMetricStorage.this.startEpochNanos, - AsynchronousMetricStorage.this.epochNanos, - attributes, - /* reset= */ true); - currentPoints.add(value); - }); - return currentPoints; - } + @Nullable + @Override + AggregatorHandle maybeGetPooledAggregatorHandle() { + return null; + } - @Override - public void setEnabled(boolean enabled) { - this.enabled = enabled; + @Override + Collection doCollect(long epochNanos) { + List currentPoints; + if (memoryMode == REUSABLE_DATA) { + // We should not return the points in this list to the pool, they belong to the + // AggregatorHandle + reusablePointsList.clear(); + currentPoints = reusablePointsList; + } else { + currentPoints = new ArrayList<>(); + } + + // Asynchronous instruments manage their own state. If they stop reporting a measurement for a + // collection, the series ends. We retain aggregator handles across collections to allow + // series to report a consistent start time for their lifetime. While collecting, remove any + // series without measurements this collection. + // Start time for cumulative asynchronous instruments is: + // - The instrument creation time if the series first appeared during the first collection + // cycle + // - Otherwise, the preceding collection interval's timestamp + // This logic is handled in AggregatorHandle creation via #createAggregatorHandle() + aggregatorHandles.forEach( + (attributes, handle) -> { + if (!handle.hasRecordedValues()) { + aggregatorHandles.remove(attributes); + return; + } + T value = + handle.aggregateThenMaybeReset( + handle.getCreationEpochNanos(), epochNanos, attributes, /* reset= */ true); + currentPoints.add(value); + }); + + return currentPoints; + } } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistration.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistration.java index b603de7fc21..5eb7f0b1eb6 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistration.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistration.java @@ -56,8 +56,8 @@ private CallbackRegistration( * *

The {@code observableMeasurements} define the set of measurements the {@code runnable} may * record to. The active reader of each {@code observableMeasurements} is set via {@link - * SdkObservableMeasurement#setActiveReader(RegisteredReader, long, long)} before {@code runnable} - * is called, and set to {@code null} afterwards. + * SdkObservableMeasurement#setActiveReader(RegisteredReader)} before {@code runnable} is called, + * and set to {@code null} afterwards. * * @param observableMeasurements the measurements that the runnable may record to * @param runnable the callback @@ -73,7 +73,7 @@ public String toString() { return "CallbackRegistration{instrumentDescriptors=" + instrumentDescriptors + "}"; } - public void invokeCallback(RegisteredReader reader, long startEpochNanos, long epochNanos) { + public void invokeCallback(RegisteredReader reader) { // Return early if no storages are registered if (!hasStorages) { return; @@ -81,8 +81,7 @@ public void invokeCallback(RegisteredReader reader, long startEpochNanos, long e // Set the active reader on each observable measurement so that measurements are only recorded // to relevant storages observableMeasurements.forEach( - observableMeasurement -> - observableMeasurement.setActiveReader(reader, startEpochNanos, epochNanos)); + observableMeasurement -> observableMeasurement.setActiveReader(reader)); // Restore the context class loader that was active when the callback was registered. Thread currentThread = Thread.currentThread(); ClassLoader previousContextClassLoader = currentThread.getContextClassLoader(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index a746c00dec0..a44508d6659 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -12,6 +12,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.common.internal.ThrottlingLogger; @@ -49,6 +50,7 @@ public abstract class DefaultSynchronousMetricStorage private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); private final AttributesProcessor attributesProcessor; + protected final Clock clock; protected final MetricDescriptor metricDescriptor; protected final Aggregator aggregator; @@ -64,11 +66,13 @@ private DefaultSynchronousMetricStorage( MetricDescriptor metricDescriptor, Aggregator aggregator, AttributesProcessor attributesProcessor, + Clock clock, int maxCardinality, boolean enabled) { this.metricDescriptor = metricDescriptor; this.aggregator = aggregator; this.attributesProcessor = attributesProcessor; + this.clock = clock; this.maxCardinality = maxCardinality - 1; this.enabled = enabled; } @@ -79,6 +83,7 @@ static DefaultSynchronousMetricStorage create( Aggregator aggregator, AttributesProcessor processor, int maxCardinality, + Clock clock, boolean enabled) { AggregationTemporality aggregationTemporality = reader.getReader().getAggregationTemporality(descriptor.getSourceInstrument().getType()); @@ -87,11 +92,12 @@ static DefaultSynchronousMetricStorage create( descriptor, aggregator, processor, + clock, maxCardinality, enabled, reader.getReader().getMemoryMode()) : new DeltaSynchronousMetricStorage<>( - reader, descriptor, aggregator, processor, maxCardinality, enabled); + reader, descriptor, aggregator, processor, clock, maxCardinality, enabled); } @Override @@ -160,9 +166,13 @@ protected AggregatorHandle getAggregatorHandle( } } // Get handle from pool if available, else create a new one. + // Note: pooled handles (used only for delta temporality) retain their original + // creationEpochNanos, but delta storage does not use the handle's creation time for the + // start epoch — it uses the reader's last collect time directly in collect(). So the stale + // creation time on a recycled handle does not affect correctness. AggregatorHandle newHandle = maybeGetPooledAggregatorHandle(); if (newHandle == null) { - newHandle = aggregator.createHandle(); + newHandle = aggregator.createHandle(clock.now()); } handle = aggregatorHandles.putIfAbsent(attributes, newHandle); return handle != null ? handle : newHandle; @@ -178,6 +188,7 @@ public MetricDescriptor getMetricDescriptor() { private static class DeltaSynchronousMetricStorage extends DefaultSynchronousMetricStorage { + private final long instrumentCreationEpochNanos; private final RegisteredReader registeredReader; private final MemoryMode memoryMode; @@ -195,9 +206,11 @@ private static class DeltaSynchronousMetricStorage MetricDescriptor metricDescriptor, Aggregator aggregator, AttributesProcessor attributesProcessor, + Clock clock, int maxCardinality, boolean enabled) { - super(metricDescriptor, aggregator, attributesProcessor, maxCardinality, enabled); + super(metricDescriptor, aggregator, attributesProcessor, clock, maxCardinality, enabled); + this.instrumentCreationEpochNanos = clock.now(); this.registeredReader = registeredReader; this.memoryMode = registeredReader.getReader().getMemoryMode(); } @@ -262,10 +275,7 @@ private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { @Override public MetricData collect( - Resource resource, - InstrumentationScopeInfo instrumentationScopeInfo, - long startEpochNanos, - long epochNanos) { + Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) { ConcurrentHashMap> aggregatorHandles; AggregatorHolder holder = this.aggregatorHolder; this.aggregatorHolder = @@ -316,6 +326,11 @@ public MetricData collect( } } + // Start time for synchronous delta instruments is the time of the last collection, or if no + // collection has yet taken place, the time the instrument was created. + long startEpochNanos = + registeredReader.getLastCollectEpochNanosOrDefault(instrumentCreationEpochNanos); + // Grab aggregated points. aggregatorHandles.forEach( (attributes, handle) -> { @@ -324,10 +339,7 @@ public MetricData collect( } T point = handle.aggregateThenMaybeReset( - registeredReader.getLastCollectEpochNanos(), - epochNanos, - attributes, - /* reset= */ true); + startEpochNanos, epochNanos, attributes, /* reset= */ true); if (memoryMode == IMMUTABLE_DATA) { // Return the aggregator to the pool. @@ -402,10 +414,11 @@ private static class CumulativeSynchronousMetricStorage MetricDescriptor metricDescriptor, Aggregator aggregator, AttributesProcessor attributesProcessor, + Clock clock, int maxCardinality, boolean enabled, MemoryMode memoryMode) { - super(metricDescriptor, aggregator, attributesProcessor, maxCardinality, enabled); + super(metricDescriptor, aggregator, attributesProcessor, clock, maxCardinality, enabled); this.memoryMode = memoryMode; } @@ -430,10 +443,7 @@ AggregatorHandle maybeGetPooledAggregatorHandle() { @Override public MetricData collect( - Resource resource, - InstrumentationScopeInfo instrumentationScopeInfo, - long startEpochNanos, - long epochNanos) { + Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) { List points; if (memoryMode == REUSABLE_DATA) { reusableResultList.clear(); @@ -448,9 +458,11 @@ public MetricData collect( if (!handle.hasRecordedValues()) { return; } + // Start time for cumulative synchronous instruments is the time the first series + // measurement was recorded. I.e. the time the AggregatorHandle was created. T point = handle.aggregateThenMaybeReset( - startEpochNanos, epochNanos, attributes, /* reset= */ false); + handle.getCreationEpochNanos(), epochNanos, attributes, /* reset= */ false); if (point != null) { points.add(point); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java index 9a0ed207877..c8580e79f61 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java @@ -27,10 +27,7 @@ public MetricDescriptor getMetricDescriptor() { @Override public MetricData collect( - Resource resource, - InstrumentationScopeInfo instrumentationScopeInfo, - long startEpochNanos, - long epochNanos) { + Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) { return EmptyMetricData.getInstance(); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterProviderSharedState.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterProviderSharedState.java index 1e42b854463..0cd4895f068 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterProviderSharedState.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterProviderSharedState.java @@ -23,9 +23,9 @@ public abstract class MeterProviderSharedState { public static MeterProviderSharedState create( - Clock clock, Resource resource, ExemplarFilterInternal exemplarFilter, long startEpochNanos) { + Clock clock, Resource resource, ExemplarFilterInternal exemplarFilter) { MeterProviderSharedState sharedState = - new AutoValue_MeterProviderSharedState(clock, resource, startEpochNanos, exemplarFilter); + new AutoValue_MeterProviderSharedState(clock, resource, exemplarFilter); return sharedState; } @@ -37,9 +37,6 @@ public static MeterProviderSharedState create( /** Returns the {@link Resource} to attach telemetry to. */ public abstract Resource getResource(); - /** Returns the timestamp when the {@link SdkMeterProvider} was started, in epoch nanos. */ - public abstract long getStartEpochNanos(); - /** Returns the {@link ExemplarFilterInternal} for remembering synchronous measurements. */ public abstract ExemplarFilterInternal getExemplarFilter(); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java index 6bada056d8a..b4ff10922fc 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java @@ -38,15 +38,11 @@ public interface MetricStorage { * * @param resource The resource associated with the metrics. * @param instrumentationScopeInfo The instrumentation scope generating the metrics. - * @param startEpochNanos The start timestamp for this SDK. * @param epochNanos The timestamp for this collection. * @return The {@link MetricData} from this collection period. */ MetricData collect( - Resource resource, - InstrumentationScopeInfo instrumentationScopeInfo, - long startEpochNanos, - long epochNanos); + Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos); void setEnabled(boolean enabled); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java index b16f396d57a..db8ef8c5bb1 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java @@ -70,19 +70,11 @@ public InstrumentationScopeInfo getInstrumentationScopeInfo() { * Set the active reader, and clock information. {@link #unsetActiveReader()} MUST be called * after. */ - public void setActiveReader( - RegisteredReader registeredReader, long startEpochNanos, long epochNanos) { + public void setActiveReader(RegisteredReader registeredReader) { this.activeReader = registeredReader; - for (AsynchronousMetricStorage storage : storages) { - if (storage.getRegisteredReader().equals(activeReader)) { - storage.setEpochInformation(startEpochNanos, epochNanos); - } - } } - /** - * Unset the active reader. Called after {@link #setActiveReader(RegisteredReader, long, long)}. - */ + /** Unset the active reader. Called after {@link #setActiveReader(RegisteredReader)}. */ public void unsetActiveReader() { this.activeReader = null; } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java index dc23f89c4d4..38c9552cbe0 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java @@ -5,6 +5,7 @@ package io.opentelemetry.sdk.metrics.internal.state; +import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.metrics.View; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; @@ -38,6 +39,7 @@ static SynchronousMetricStorage empty() { static SynchronousMetricStorage create( RegisteredReader registeredReader, RegisteredView registeredView, + Clock clock, InstrumentDescriptor instrumentDescriptor, ExemplarFilterInternal exemplarFilter, boolean enabled) { @@ -58,6 +60,7 @@ static SynchronousMetricStorage create( aggregator, registeredView.getViewAttributesProcessor(), registeredView.getCardinalityLimit(), + clock, enabled); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/InstrumentBuilderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/InstrumentBuilderTest.java index 6c54636ee4f..14ce8e8609e 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/InstrumentBuilderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/InstrumentBuilderTest.java @@ -23,8 +23,7 @@ class InstrumentBuilderTest { MeterProviderSharedState.create( TestClock.create(), Resource.getDefault(), - asExemplarFilterInternal(ExemplarFilter.alwaysOff()), - 0); + asExemplarFilterInternal(ExemplarFilter.alwaysOff())); static final InstrumentationScopeInfo SCOPE = InstrumentationScopeInfo.create("scope-name"); public static final SdkMeter SDK_METER = new SdkMeter( diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java index 34882efc751..5f421a1c3b0 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java @@ -69,6 +69,7 @@ void collectMetrics_WithEmptyAttributes() { testClock.advance(Duration.ofNanos(SECOND_NANOS)); doubleCounter.add(12d, Attributes.empty()); doubleCounter.add(12d); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); assertThat(sdkMeterReader.collectAllMetrics()) .satisfiesExactly( metric -> diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java index 740775e766f..c39558ce978 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java @@ -103,6 +103,7 @@ void collectMetrics_WithEmptyAttributes() { testClock.advance(Duration.ofNanos(SECOND_NANOS)); doubleGauge.set(12d, Attributes.empty()); doubleGauge.set(13d); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); assertThat(cumulativeReader.collectAllMetrics()) .satisfiesExactly( metric -> diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java index c06948acf88..409607050fb 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java @@ -74,6 +74,7 @@ void collectMetrics_WithEmptyAttributes() { testClock.advance(Duration.ofNanos(SECOND_NANOS)); doubleHistogram.record(12d, Attributes.empty()); doubleHistogram.record(12d); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); assertThat(sdkMeterReader.collectAllMetrics()) .satisfiesExactly( metric -> @@ -203,8 +204,10 @@ void collectMetrics_ExponentialHistogramAggregation() { .build(); testClock.advance(Duration.ofNanos(SECOND_NANOS)); doubleHistogram.record(12d, Attributes.builder().put("key", "value").build()); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); doubleHistogram.record(12d); doubleHistogram.record(13d); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); assertThat(sdkMeterReader.collectAllMetrics()) .satisfiesExactly( metric -> @@ -243,7 +246,7 @@ void collectMetrics_ExponentialHistogramAggregation() { .hasCounts(Collections.emptyList())), point -> point - .hasStartEpochNanos(testClock.now() - SECOND_NANOS) + .hasStartEpochNanos(testClock.now() - 2 * SECOND_NANOS) .hasEpochNanos(testClock.now()) .hasAttributes( Attributes.builder().put("key", "value").build()) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java index a15dcf422cc..244d3d3c5e2 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java @@ -68,6 +68,7 @@ void collectMetrics_WithEmptyAttributes() { testClock.advance(Duration.ofNanos(SECOND_NANOS)); doubleUpDownCounter.add(12d, Attributes.empty()); doubleUpDownCounter.add(12d); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); assertThat(sdkMeterReader.collectAllMetrics()) .satisfiesExactly( metric -> diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java index 160a07b9e48..fbfe9fb1803 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java @@ -62,6 +62,7 @@ void collectMetrics_WithEmptyAttributes() { testClock.advance(Duration.ofNanos(SECOND_NANOS)); longCounter.add(12, Attributes.empty()); longCounter.add(12); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); assertThat(sdkMeterReader.collectAllMetrics()) .satisfiesExactly( metric -> diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java index 3879f2e8a76..98d464a019a 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java @@ -92,6 +92,7 @@ void collectMetrics_WithEmptyAttributes() { testClock.advance(Duration.ofNanos(SECOND_NANOS)); longGauge.set(12, Attributes.empty()); longGauge.set(13); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); assertThat(cumulativeReader.collectAllMetrics()) .satisfiesExactly( metric -> diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java index 1f0321f55bf..fabff7b312d 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java @@ -74,6 +74,7 @@ void collectMetrics_WithEmptyAttributes() { testClock.advance(Duration.ofNanos(SECOND_NANOS)); longHistogram.record(12, Attributes.empty()); longHistogram.record(12); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); assertThat(reader.collectAllMetrics()) .satisfiesExactly( metric -> @@ -376,8 +377,10 @@ void collectMetrics_ExponentialHistogramAggregation() { .build(); testClock.advance(Duration.ofNanos(SECOND_NANOS)); longHistogram.record(12L, Attributes.builder().put("key", "value").build()); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); longHistogram.record(12L); longHistogram.record(13L); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); assertThat(reader.collectAllMetrics()) .satisfiesExactly( metric -> @@ -416,7 +419,7 @@ void collectMetrics_ExponentialHistogramAggregation() { .hasCounts(Collections.emptyList())), point -> point - .hasStartEpochNanos(testClock.now() - SECOND_NANOS) + .hasStartEpochNanos(testClock.now() - 2 * SECOND_NANOS) .hasEpochNanos(testClock.now()) .hasAttributes( Attributes.builder().put("key", "value").build()) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java index 39eb6500207..63bb43a1468 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java @@ -61,6 +61,7 @@ void collectMetrics_WithEmptyAttributes() { testClock.advance(Duration.ofNanos(SECOND_NANOS)); longUpDownCounter.add(12, Attributes.empty()); longUpDownCounter.add(12); + testClock.advance(Duration.ofNanos(SECOND_NANOS)); assertThat(sdkMeterReader.collectAllMetrics()) .satisfiesExactly( metric -> diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java index 5a96360ceb9..778aa5fd375 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java @@ -39,6 +39,14 @@ class AggregatorHandleTest { @Mock DoubleExemplarReservoir doubleReservoir; @Mock LongExemplarReservoir longReservoir; + @Test + void testCreationTimeEpochNanos() { + long creationEpochNanos = 12345L; + TestDoubleAggregatorHandle handle = + new TestDoubleAggregatorHandle(doubleReservoir, creationEpochNanos); + assertThat(handle.getCreationEpochNanos()).isEqualTo(creationEpochNanos); + } + @Test void testRecordings() { TestLongAggregatorHandle testLongAggregator = new TestLongAggregatorHandle(longReservoir); @@ -102,14 +110,18 @@ void testGenerateExemplarsOnCollect() { private static class TestDoubleAggregatorHandle extends TestAggregatorHandle { TestDoubleAggregatorHandle(DoubleExemplarReservoir reservoir) { - super(reservoir, null, /* isDoubleType= */ true); + super(reservoir, null, /* isDoubleType= */ true, /* creationEpochNanos= */ 0); + } + + TestDoubleAggregatorHandle(DoubleExemplarReservoir reservoir, long creationEpochNanos) { + super(reservoir, null, /* isDoubleType= */ true, creationEpochNanos); } } private static class TestLongAggregatorHandle extends TestAggregatorHandle { TestLongAggregatorHandle(LongExemplarReservoir reservoir) { - super(null, reservoir, /* isDoubleType= */ false); + super(null, reservoir, /* isDoubleType= */ false, /* creationEpochNanos= */ 0); } } @@ -121,8 +133,10 @@ private abstract static class TestAggregatorHandle extends AggregatorHandle handle = aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(0); assertThat(handle).isInstanceOf(DoubleBase2ExponentialHistogramAggregator.Handle.class); ExponentialHistogramPointData point = ((DoubleBase2ExponentialHistogramAggregator.Handle) handle) @@ -127,7 +127,7 @@ void createHandle(MemoryMode memoryMode) { void testRecordings(MemoryMode memoryMode) { initialize(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(0.5, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(1.0, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(12.0, Attributes.empty(), Context.current()); @@ -172,7 +172,7 @@ void testRecordings(MemoryMode memoryMode) { void testInvalidRecording(MemoryMode memoryMode) { initialize(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); // Non-finite recordings should be ignored aggregatorHandle.recordDouble(Double.POSITIVE_INFINITY, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(Double.NEGATIVE_INFINITY, Attributes.empty(), Context.current()); @@ -189,7 +189,7 @@ void testInvalidRecording(MemoryMode memoryMode) { @ParameterizedTest @MethodSource("provideAggregator") void testRecordingsAtLimits(DoubleBase2ExponentialHistogramAggregator aggregator) { - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(Double.MIN_VALUE, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(Double.MAX_VALUE, Attributes.empty(), Context.current()); @@ -251,7 +251,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { List exemplars = Collections.singletonList(exemplar); Mockito.when(reservoir.collectAndResetDoubles(Attributes.empty())).thenReturn(exemplars); - AggregatorHandle aggregatorHandle = agg.createHandle(); + AggregatorHandle aggregatorHandle = agg.createHandle(0); aggregatorHandle.recordDouble(0, attributes, Context.root()); assertThat( @@ -267,7 +267,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { void aggregateThenMaybeReset(MemoryMode memoryMode) { initialize(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(5.0, Attributes.empty(), Context.current()); assertThat( @@ -284,7 +284,7 @@ void aggregateThenMaybeReset(MemoryMode memoryMode) { void testInsert1M(MemoryMode memoryMode) { initialize(memoryMode); - AggregatorHandle handle = aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(0); int n = 1024 * 1024 - 1; double min = 16.0 / n; @@ -310,7 +310,7 @@ void testDownScale(MemoryMode memoryMode) { initialize(memoryMode); DoubleBase2ExponentialHistogramAggregator.Handle handle = - (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(); + (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(0); // record a measurement to initialize positive buckets handle.recordDouble(0.5, Attributes.empty(), Context.current()); @@ -357,7 +357,7 @@ void testToMetricData(MemoryMode memoryMode) { reservoirFactory, 160, MAX_SCALE, /* recordMinMax= */ true, memoryMode); AggregatorHandle aggregatorHandle = - cumulativeAggregator.createHandle(); + cumulativeAggregator.createHandle(0); aggregatorHandle.recordDouble(0, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(0, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(123.456, Attributes.empty(), Context.current()); @@ -422,7 +422,7 @@ void testToMetricData(MemoryMode memoryMode) { void testMultithreadedUpdates(MemoryMode memoryMode) throws InterruptedException { initialize(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); ImmutableList updates = ImmutableList.of(0D, 0.1D, -0.1D, 1D, -1D, 100D); int numberOfThreads = updates.size(); int numberOfUpdates = 10000; @@ -492,7 +492,7 @@ public void verifyMutableDataUsedInReusableDataMemoryMode() { initialize(MemoryMode.REUSABLE_DATA); DoubleBase2ExponentialHistogramAggregator.Handle handle = - (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(); + (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(0); // record a measurement to initialize positive buckets handle.recordDouble(0.5, Attributes.empty(), Context.current()); @@ -525,7 +525,7 @@ public void verifyImmutableDataUsedInImmutableDataMemoryMode() { initialize(MemoryMode.IMMUTABLE_DATA); DoubleBase2ExponentialHistogramAggregator.Handle handle = - (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(); + (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(0); // record a measurement to initialize positive buckets handle.recordDouble(0.5, Attributes.empty(), Context.current()); @@ -548,7 +548,7 @@ public void reusablePoint_emptyFirstThenRecordAndCheck() { initialize(MemoryMode.REUSABLE_DATA); DoubleBase2ExponentialHistogramAggregator.Handle handle = - (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(); + (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(0); // Let's create a point without buckets ExponentialHistogramPointData point = @@ -581,7 +581,7 @@ void testRecordMinMaxDisabled(MemoryMode memoryMode) { DoubleBase2ExponentialHistogramAggregator aggregator = new DoubleBase2ExponentialHistogramAggregator( ExemplarReservoirFactory.noSamples(), 160, 20, /* recordMinMax= */ false, memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(0.5, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(1.0, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(12.0, Attributes.empty(), Context.current()); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java index 66fd7dd43b6..0d12083f306 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java @@ -77,7 +77,7 @@ private void init(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void createHandle(MemoryMode memoryMode) { init(memoryMode); - assertThat(aggregator.createHandle()) + assertThat(aggregator.createHandle(0)) .isInstanceOf(DoubleExplicitBucketHistogramAggregator.Handle.class); } @@ -85,7 +85,7 @@ void createHandle(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void testRecordings(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(20, Attributes.empty(), Context.current()); aggregatorHandle.recordLong(5, Attributes.empty(), Context.current()); aggregatorHandle.recordLong(150, Attributes.empty(), Context.current()); @@ -125,7 +125,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { DoubleExplicitBucketHistogramAggregator aggregator = new DoubleExplicitBucketHistogramAggregator( boundaries, /* recordMinMax= */ true, reservoirFactory, memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(0, attributes, Context.root()); assertThat( aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)) @@ -148,7 +148,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void aggregateThenMaybeReset(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(100, Attributes.empty(), Context.current()); assertThat( @@ -187,7 +187,7 @@ void aggregateThenMaybeReset(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void toMetricData(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(10, Attributes.empty(), Context.current()); MetricData metricData = @@ -257,7 +257,7 @@ void toMetricDataWithExemplars(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void testHistogramCounts(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(1.1, Attributes.empty(), Context.current()); HistogramPointData point = aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true); @@ -268,7 +268,7 @@ void testHistogramCounts(MemoryMode memoryMode) { @Test void testReusableDataMemoryMode() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(10, Attributes.empty(), Context.current()); aggregatorHandle.recordLong(20, Attributes.empty(), Context.current()); aggregatorHandle.recordLong(30, Attributes.empty(), Context.current()); @@ -297,7 +297,7 @@ void testRecordMinMaxDisabled(MemoryMode memoryMode) { /* recordMinMax= */ false, ExemplarReservoirFactory.noSamples(), memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(20, Attributes.empty(), Context.current()); aggregatorHandle.recordLong(5, Attributes.empty(), Context.current()); aggregatorHandle.recordLong(150, Attributes.empty(), Context.current()); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java index ee8f3645a93..0828b93de25 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java @@ -49,7 +49,7 @@ private void init(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void createHandle(MemoryMode memoryMode) { init(memoryMode); - assertThat(aggregator.createHandle()).isInstanceOf(DoubleLastValueAggregator.Handle.class); + assertThat(aggregator.createHandle(0)).isInstanceOf(DoubleLastValueAggregator.Handle.class); } @ParameterizedTest @@ -57,7 +57,7 @@ void createHandle(MemoryMode memoryMode) { void multipleRecords(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(12.1, Attributes.empty(), Context.current()); assertThat( aggregatorHandle @@ -78,7 +78,7 @@ void multipleRecords(MemoryMode memoryMode) { void aggregateThenMaybeReset(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(13.1, Attributes.empty(), Context.current()); assertThat( @@ -228,7 +228,7 @@ void copyPoint(MemoryMode memoryMode) { void toMetricData(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(10, Attributes.empty(), Context.current()); MetricData metricData = @@ -258,7 +258,7 @@ void toMetricData(MemoryMode memoryMode) { @Test void testReusableDataOnCollect() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle handle = aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(0); handle.recordDouble(1, Attributes.empty(), Context.current()); DoublePointData pointData = handle.aggregateThenMaybeReset(0, 10, Attributes.empty(), /* reset= */ false); @@ -279,7 +279,7 @@ void testReusableDataOnCollect() { @Test void testNullPointerExceptionWhenUnset() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle handle = aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(0); assertThatThrownBy( () -> handle.aggregateThenMaybeReset(0, 10, Attributes.empty(), /* reset= */ true)) .isInstanceOf(NullPointerException.class); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java index 4afd1672bc5..1e4353b21fc 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java @@ -84,14 +84,14 @@ private void init(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void createHandle(MemoryMode memoryMode) { init(memoryMode); - assertThat(aggregator.createHandle()).isInstanceOf(DoubleSumAggregator.Handle.class); + assertThat(aggregator.createHandle(0)).isInstanceOf(DoubleSumAggregator.Handle.class); } @ParameterizedTest @EnumSource(MemoryMode.class) void multipleRecords(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(12.1, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(12.1, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(12.1, Attributes.empty(), Context.current()); @@ -108,7 +108,7 @@ void multipleRecords(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void multipleRecords_WithNegatives(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(12, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(12, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(-23, Attributes.empty(), Context.current()); @@ -126,7 +126,7 @@ void multipleRecords_WithNegatives(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void aggregateThenMaybeReset(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(13, Attributes.empty(), Context.current()); aggregatorHandle.recordDouble(12, Attributes.empty(), Context.current()); @@ -172,7 +172,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { Advice.empty()), reservoirFactory, memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(0, attributes, Context.root()); assertThat( aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)) @@ -315,7 +315,7 @@ void copyPoint(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void toMetricData(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(10, Attributes.empty(), Context.current()); MetricData metricData = @@ -375,7 +375,7 @@ void toMetricDataWithExemplars(MemoryMode memoryMode) { @Test void sameObjectReturnedOnReusableDataMemoryMode() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordDouble(1.0, Attributes.empty(), Context.current()); DoublePointData firstCollection = diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java index 1cf98c58aca..dc24e8011be 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java @@ -50,14 +50,14 @@ private void init(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void createHandle(MemoryMode memoryMode) { init(memoryMode); - assertThat(aggregator.createHandle()).isInstanceOf(LongLastValueAggregator.Handle.class); + assertThat(aggregator.createHandle(0)).isInstanceOf(LongLastValueAggregator.Handle.class); } @ParameterizedTest @EnumSource(MemoryMode.class) void multipleRecords(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(12, Attributes.empty(), Context.current()); assertThat( aggregatorHandle @@ -77,7 +77,7 @@ void multipleRecords(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void aggregateThenMaybeReset(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(13, Attributes.empty(), Context.current()); assertThat( @@ -190,7 +190,7 @@ void copyPoint(MemoryMode memoryMode) { void toMetricData(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(10, Attributes.empty(), Context.current()); MetricData metricData = @@ -218,7 +218,7 @@ void toMetricData(MemoryMode memoryMode) { @Test void testReusablePointOnCollect() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle handle = aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(0); handle.recordLong(1, Attributes.empty(), Context.current()); LongPointData pointData = handle.aggregateThenMaybeReset(0, 10, Attributes.empty(), /* reset= */ false); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java index d2742050ea0..b984731190c 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java @@ -83,14 +83,14 @@ private void init(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void createHandle(MemoryMode memoryMode) { init(memoryMode); - assertThat(aggregator.createHandle()).isInstanceOf(LongSumAggregator.Handle.class); + assertThat(aggregator.createHandle(0)).isInstanceOf(LongSumAggregator.Handle.class); } @ParameterizedTest @EnumSource(MemoryMode.class) void multipleRecords(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(12, Attributes.empty(), Context.current()); aggregatorHandle.recordLong(12, Attributes.empty(), Context.current()); aggregatorHandle.recordLong(12, Attributes.empty(), Context.current()); @@ -107,7 +107,7 @@ void multipleRecords(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void multipleRecords_WithNegatives(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(12, Attributes.empty(), Context.current()); aggregatorHandle.recordLong(12, Attributes.empty(), Context.current()); aggregatorHandle.recordLong(-23, Attributes.empty(), Context.current()); @@ -125,7 +125,7 @@ void multipleRecords_WithNegatives(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void aggregateThenMaybeReset(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(13, Attributes.empty(), Context.current()); aggregatorHandle.recordLong(12, Attributes.empty(), Context.current()); @@ -171,7 +171,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { Advice.empty()), reservoirFactory, memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(0, attributes, Context.root()); assertThat( aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)) @@ -313,7 +313,7 @@ void copyPoint(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void toMetricData(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(10, Attributes.empty(), Context.current()); MetricData metricData = @@ -374,7 +374,7 @@ void toMetricDataWithExemplars(MemoryMode memoryMode) { @Test void sameObjectReturnedOnReusableDataMemoryMode() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(0); aggregatorHandle.recordLong(1L, Attributes.empty(), Context.current()); LongPointData firstCollection = diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReaderTest.java index e151876e6a9..a0a4cf7125b 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReaderTest.java @@ -38,13 +38,13 @@ void getReader() { } @Test - void setAndGetLastCollectEpochNanos() { + void getLastCollectEpochNanosOrDefault() { RegisteredReader registeredReader = RegisteredReader.create(reader, ViewRegistry.create()); - assertThat(registeredReader.getLastCollectEpochNanos()).isEqualTo(0); + assertThat(registeredReader.getLastCollectEpochNanosOrDefault(4)).isEqualTo(4); registeredReader.setLastCollectEpochNanos(1); - assertThat(registeredReader.getLastCollectEpochNanos()).isEqualTo(1); + assertThat(registeredReader.getLastCollectEpochNanosOrDefault(4)).isEqualTo(1); registeredReader.setLastCollectEpochNanos(5); - assertThat(registeredReader.getLastCollectEpochNanos()).isEqualTo(5); + assertThat(registeredReader.getLastCollectEpochNanosOrDefault(5)).isEqualTo(5); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index 860307d4777..e50539d52bf 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -34,6 +34,8 @@ import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.testing.time.TestClock; +import java.time.Duration; +import java.time.Instant; import java.util.Collection; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -47,12 +49,14 @@ @ExtendWith(MockitoExtension.class) class AsynchronousMetricStorageTest { + private static final long START_SECOND_NANOS = 1_000_000_000L; + private static final int CARDINALITY_LIMIT = 25; @RegisterExtension LogCapturer logs = LogCapturer.create().captureForType(AsynchronousMetricStorage.class); - private final TestClock testClock = TestClock.create(); + private final TestClock testClock = TestClock.create(Instant.ofEpochSecond(1)); private final Resource resource = Resource.empty(); private final InstrumentationScopeInfo scope = InstrumentationScopeInfo.empty(); private final InstrumentSelector selector = InstrumentSelector.builder().setName("*").build(); @@ -72,7 +76,11 @@ class AsynchronousMetricStorageTest { // Not using @BeforeEach since many methods require executing them for each MemoryMode void setup(MemoryMode memoryMode) { - when(reader.getAggregationTemporality(any())).thenReturn(AggregationTemporality.CUMULATIVE); + setup(memoryMode, AggregationTemporality.CUMULATIVE); + } + + void setup(MemoryMode memoryMode, AggregationTemporality temporality) { + when(reader.getAggregationTemporality(any())).thenReturn(temporality); when(reader.getMemoryMode()).thenReturn(memoryMode); registeredReader = RegisteredReader.create(reader, ViewRegistry.create()); @@ -80,6 +88,7 @@ void setup(MemoryMode memoryMode) { AsynchronousMetricStorage.create( registeredReader, registeredView, + testClock, InstrumentDescriptor.create( "long-counter", "description", @@ -92,6 +101,7 @@ void setup(MemoryMode memoryMode) { AsynchronousMetricStorage.create( registeredReader, registeredView, + testClock, InstrumentDescriptor.create( "double-counter", "description", @@ -107,12 +117,11 @@ void setup(MemoryMode memoryMode) { void recordLong(MemoryMode memoryMode) { setup(memoryMode); - longCounterStorage.setEpochInformation(0, 1); longCounterStorage.record(Attributes.builder().put("key", "a").build(), 1); longCounterStorage.record(Attributes.builder().put("key", "b").build(), 2); longCounterStorage.record(Attributes.builder().put("key", "c").build(), 3); - assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) + assertThat(longCounterStorage.collect(resource, scope, testClock.now())) .satisfies( metricData -> assertThat(metricData) @@ -133,12 +142,11 @@ void recordLong(MemoryMode memoryMode) { void recordDouble(MemoryMode memoryMode) { setup(memoryMode); - longCounterStorage.setEpochInformation(0, 1); doubleCounterStorage.record(Attributes.builder().put("key", "a").build(), 1.1); doubleCounterStorage.record(Attributes.builder().put("key", "b").build(), 2.2); doubleCounterStorage.record(Attributes.builder().put("key", "c").build(), 3.3); - assertThat(doubleCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) + assertThat(doubleCounterStorage.collect(resource, scope, testClock.now())) .satisfies( metricData -> assertThat(metricData) @@ -170,6 +178,7 @@ void record_ProcessesAttributes(MemoryMode memoryMode) { AttributesProcessor.filterByKeyName(key -> key.equals("key1")), CARDINALITY_LIMIT, SourceInfo.noSourceInfo()), + testClock, InstrumentDescriptor.create( "name", "description", @@ -179,10 +188,9 @@ void record_ProcessesAttributes(MemoryMode memoryMode) { Advice.empty()), /* enabled= */ true); - storage.setEpochInformation(0, 1); storage.record(Attributes.builder().put("key1", "a").put("key2", "b").build(), 1); - assertThat(storage.collect(resource, scope, 0, testClock.nanoTime())) + assertThat(storage.collect(resource, scope, testClock.now())) .satisfies( metricData -> assertThat(metricData) @@ -199,12 +207,11 @@ void record_ProcessesAttributes(MemoryMode memoryMode) { void record_MaxCardinality(MemoryMode memoryMode) { setup(memoryMode); - longCounterStorage.setEpochInformation(0, 1); for (int i = 0; i <= CARDINALITY_LIMIT + 1; i++) { longCounterStorage.record(Attributes.builder().put("key" + i, "val").build(), 1); } - assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) + assertThat(longCounterStorage.collect(resource, scope, testClock.now())) .satisfies( metricData -> assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT)); @@ -213,28 +220,26 @@ void record_MaxCardinality(MemoryMode memoryMode) { @ParameterizedTest @EnumSource(MemoryMode.class) - void record_HandlesSpotsAreReleasedAfterCollection(MemoryMode memoryMode) { - setup(memoryMode); + void record_DeltaHandlesSpotsAreReleasedAfterCollection(MemoryMode memoryMode) { + setup(memoryMode, AggregationTemporality.DELTA); - longCounterStorage.setEpochInformation(0, 1); for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) { longCounterStorage.record(Attributes.builder().put("key" + i, "val").build(), 1); } - assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) + assertThat(longCounterStorage.collect(resource, scope, testClock.now())) .satisfies( metricData -> assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT - 1)); logs.assertDoesNotContain( "Instrument long-counter has exceeded the maximum allowed cardinality"); - longCounterStorage.setEpochInformation(1, 2); for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) { // Different attribute longCounterStorage.record(Attributes.builder().put("key" + i, "val2").build(), 1); } - assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) + assertThat(longCounterStorage.collect(resource, scope, testClock.now())) .satisfies( metricData -> assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT - 1)); @@ -247,11 +252,10 @@ void record_HandlesSpotsAreReleasedAfterCollection(MemoryMode memoryMode) { void record_DuplicateAttributes(MemoryMode memoryMode) { setup(memoryMode); - longCounterStorage.setEpochInformation(0, 1); longCounterStorage.record(Attributes.builder().put("key1", "a").build(), 1); longCounterStorage.record(Attributes.builder().put("key1", "a").build(), 2); - assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) + assertThat(longCounterStorage.collect(resource, scope, testClock.now())) .satisfies( metricData -> assertThat(metricData) @@ -267,64 +271,66 @@ void record_DuplicateAttributes(MemoryMode memoryMode) { void collect_CumulativeReportsCumulativeObservations(MemoryMode memoryMode) { setup(memoryMode); - // Record measurement and collect at time 10 - longCounterStorage.setEpochInformation(0, 10); + // Record measurement and collect at time START_SECONDS + 10s + testClock.advance(Duration.ofSeconds(10)); longCounterStorage.record(Attributes.empty(), 3); - assertThat(longCounterStorage.collect(resource, scope, 0, 0)) + assertThat(longCounterStorage.collect(resource, scope, testClock.now())) .hasLongSumSatisfying( sum -> sum.isCumulative() .hasPointsSatisfying( point -> point - .hasStartEpochNanos(0) - .hasEpochNanos(10) + .hasStartEpochNanos(START_SECOND_NANOS) + .hasEpochNanos(testClock.now()) .hasValue(3) .hasAttributes(Attributes.empty()))); - registeredReader.setLastCollectEpochNanos(10); + registeredReader.setLastCollectEpochNanos(testClock.now()); - // Record measurements and collect at time 30 - longCounterStorage.setEpochInformation(0, 30); + // Record measurements and collect at time START_SECONDS + 30s + testClock.advance(Duration.ofSeconds(20)); longCounterStorage.record(Attributes.empty(), 3); longCounterStorage.record(Attributes.builder().put("key", "value1").build(), 6); - assertThat(longCounterStorage.collect(resource, scope, 0, 0)) + assertThat(longCounterStorage.collect(resource, scope, testClock.now())) .hasLongSumSatisfying( sum -> sum.isCumulative() .hasPointsSatisfying( point -> point - .hasStartEpochNanos(0) - .hasEpochNanos(30) + .hasStartEpochNanos(START_SECOND_NANOS) + .hasEpochNanos(testClock.now()) .hasValue(3) .hasAttributes(Attributes.empty()), point -> point - .hasStartEpochNanos(0) - .hasEpochNanos(30) + .hasStartEpochNanos( + testClock.now() - Duration.ofSeconds(20).toNanos()) + .hasEpochNanos(testClock.now()) .hasValue(6) .hasAttributes(Attributes.builder().put("key", "value1").build()))); - registeredReader.setLastCollectEpochNanos(30); + registeredReader.setLastCollectEpochNanos(testClock.now()); - // Record measurement and collect at time 35 - longCounterStorage.setEpochInformation(0, 35); + // Record measurement and collect at time START_SECONDS + 35s + testClock.advance(Duration.ofSeconds(5)); longCounterStorage.record(Attributes.empty(), 4); longCounterStorage.record(Attributes.builder().put("key", "value2").build(), 5); - assertThat(longCounterStorage.collect(resource, scope, 0, 0)) + assertThat(longCounterStorage.collect(resource, scope, testClock.now())) .hasLongSumSatisfying( sum -> sum.isCumulative() .hasPointsSatisfying( point -> point - .hasStartEpochNanos(0) - .hasEpochNanos(35) + .hasStartEpochNanos(START_SECOND_NANOS) + .hasEpochNanos(testClock.now()) .hasValue(4) .hasAttributes(Attributes.empty()), point -> point - .hasStartEpochNanos(0) - .hasEpochNanos(35) + .hasStartEpochNanos( + testClock.now() - Duration.ofSeconds(5).toNanos()) + .hasEpochNanos(testClock.now()) .hasValue(5) .hasAttributes(Attributes.builder().put("key", "value2").build()))); } @@ -339,6 +345,7 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { AsynchronousMetricStorage.create( registeredReader, registeredView, + testClock, InstrumentDescriptor.create( "long-counter", "description", @@ -349,26 +356,24 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { /* enabled= */ true); // Record measurement and collect at time 10 - longCounterStorage.setEpochInformation(0, 10); longCounterStorage.record(Attributes.empty(), 3); - assertThat(longCounterStorage.collect(resource, scope, 0, 0)) + assertThat(longCounterStorage.collect(resource, scope, 10)) .hasLongSumSatisfying( sum -> sum.isDelta() .hasPointsSatisfying( point -> point - .hasStartEpochNanos(0) + .hasStartEpochNanos(testClock.now()) .hasEpochNanos(10) .hasValue(3) .hasAttributes(Attributes.empty()))); registeredReader.setLastCollectEpochNanos(10); // Record measurement and collect at time 30 - longCounterStorage.setEpochInformation(0, 30); longCounterStorage.record(Attributes.empty(), 3); longCounterStorage.record(Attributes.builder().put("key", "value1").build(), 6); - assertThat(longCounterStorage.collect(resource, scope, 0, 0)) + assertThat(longCounterStorage.collect(resource, scope, 30)) .hasLongSumSatisfying( sum -> sum.isDelta() @@ -388,10 +393,9 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { registeredReader.setLastCollectEpochNanos(30); // Record measurement and collect at time 35 - longCounterStorage.setEpochInformation(0, 35); longCounterStorage.record(Attributes.empty(), 4); longCounterStorage.record(Attributes.builder().put("key", "value2").build(), 5); - assertThat(longCounterStorage.collect(resource, scope, 0, 0)) + assertThat(longCounterStorage.collect(resource, scope, 35)) .hasLongSumSatisfying( sum -> sum.isDelta() @@ -414,13 +418,12 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) { void collect_reusableData_reusedObjectsAreReturnedOnSecondCall() { setup(REUSABLE_DATA); - longCounterStorage.setEpochInformation(0, 1); longCounterStorage.record(Attributes.builder().put("key", "a").build(), 1); longCounterStorage.record(Attributes.builder().put("key", "b").build(), 2); longCounterStorage.record(Attributes.builder().put("key", "c").build(), 3); MetricData firstCollectMetricData = - longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()); + longCounterStorage.collect(resource, scope, testClock.now()); assertThat(firstCollectMetricData) .satisfies( metricData -> @@ -445,7 +448,7 @@ void collect_reusableData_reusedObjectsAreReturnedOnSecondCall() { .isInstanceOf(MutableLongPointData.class)))); MetricData secondCollectMetricData = - longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()); + longCounterStorage.collect(resource, scope, testClock.now()); Collection secondCollectPoints = secondCollectMetricData.getData().getPoints(); @@ -470,7 +473,7 @@ void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) { longCounterStorage.record(Attributes.empty(), 10); - assertThat(longCounterStorage.collect(resource, scope, 0, 0).isEmpty()).isTrue(); + assertThat(longCounterStorage.collect(resource, scope, 0).isEmpty()).isTrue(); } @ParameterizedTest @@ -483,6 +486,6 @@ void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) { longCounterStorage.record(Attributes.empty(), 10); - assertThat(longCounterStorage.collect(resource, scope, 0, 0).isEmpty()).isFalse(); + assertThat(longCounterStorage.collect(resource, scope, 0).isEmpty()).isFalse(); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java index c54466a0603..47f45028e6e 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java @@ -144,14 +144,11 @@ void invokeCallback_Double() { CallbackRegistration callbackRegistration = CallbackRegistration.create(Collections.singletonList(measurement1), callback); - callbackRegistration.invokeCallback(registeredReader, 0, 1); + callbackRegistration.invokeCallback(registeredReader); assertThat(counter.get()).isEqualTo(1.1); - verify(storage1).setEpochInformation(0, 1); verify(storage1).record(Attributes.builder().put("key", "val").build(), 1.1); - verify(storage2, never()).setEpochInformation(anyLong(), anyLong()); verify(storage2, never()).record(any(), anyDouble()); - verify(storage3, never()).setEpochInformation(anyLong(), anyLong()); verify(storage3, never()).record(any(), anyDouble()); } @@ -165,14 +162,11 @@ void invokeCallback_Long() { CallbackRegistration callbackRegistration = CallbackRegistration.create(Collections.singletonList(measurement2), callback); - callbackRegistration.invokeCallback(registeredReader, 0, 1); + callbackRegistration.invokeCallback(registeredReader); assertThat(counter.get()).isEqualTo(1); - verify(storage1, never()).setEpochInformation(anyLong(), anyLong()); verify(storage1, never()).record(any(), anyLong()); - verify(storage2).setEpochInformation(0, 1); verify(storage2).record(Attributes.builder().put("key", "val").build(), 1); - verify(storage3).setEpochInformation(0, 1); verify(storage3).record(Attributes.builder().put("key", "val").build(), 1); } @@ -190,15 +184,12 @@ void invokeCallback_MultipleMeasurements() { CallbackRegistration callbackRegistration = CallbackRegistration.create(Arrays.asList(measurement1, measurement2), callback); - callbackRegistration.invokeCallback(registeredReader, 0, 1); + callbackRegistration.invokeCallback(registeredReader); assertThat(doubleCounter.get()).isEqualTo(1.1); assertThat(longCounter.get()).isEqualTo(1); - verify(storage1).setEpochInformation(0, 1); verify(storage1).record(Attributes.builder().put("key", "val").build(), 1.1); - verify(storage2).setEpochInformation(0, 1); verify(storage2).record(Attributes.builder().put("key", "val").build(), 1); - verify(storage3).setEpochInformation(0, 1); verify(storage3).record(Attributes.builder().put("key", "val").build(), 1); } @@ -215,7 +206,7 @@ void invokeCallback_NoStorage() { CallbackRegistration callbackRegistration = CallbackRegistration.create(Collections.singletonList(measurement), callback); - callbackRegistration.invokeCallback(registeredReader, 0, 1); + callbackRegistration.invokeCallback(registeredReader); assertThat(counter.get()).isEqualTo(0); } @@ -236,7 +227,7 @@ void invokeCallback_RestoresContextClassLoader() { // Simulate invocation on a thread with null context class loader (like DaemonThreadFactory) Thread.currentThread().setContextClassLoader(null); - callbackRegistration.invokeCallback(registeredReader, 0, 1); + callbackRegistration.invokeCallback(registeredReader); // Callback should have seen the registration-time classloader assertThat(observedClassLoader.get()).isSameAs(registrationClassLoader); @@ -264,7 +255,7 @@ void invokeCallback_RestoresContextClassLoaderOnException() { // Simulate invocation on a thread with null context class loader Thread.currentThread().setContextClassLoader(null); - callbackRegistration.invokeCallback(registeredReader, 0, 1); + callbackRegistration.invokeCallback(registeredReader); // Context classloader should still be restored even after exception assertThat(Thread.currentThread().getContextClassLoader()).isNull(); @@ -282,7 +273,7 @@ void invokeCallback_MultipleMeasurements_ThrowsException() { CallbackRegistration callbackRegistration = CallbackRegistration.create(Arrays.asList(measurement1, measurement2), callback); - callbackRegistration.invokeCallback(registeredReader, 0, 1); + callbackRegistration.invokeCallback(registeredReader); verify(storage1, never()).record(any(), anyLong()); verify(storage1, never()).record(any(), anyDouble()); @@ -302,7 +293,7 @@ void invokeCallback_SingleMeasurement_ThrowsException() { CallbackRegistration callbackRegistration = CallbackRegistration.create(Collections.singletonList(measurement2), callback); - callbackRegistration.invokeCallback(registeredReader, 0, 1); + callbackRegistration.invokeCallback(registeredReader); verify(storage1, never()).record(any(), anyLong()); verify(storage1, never()).record(any(), anyDouble()); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java index 9f91d13fe24..f287b76b05e 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java @@ -102,10 +102,7 @@ public MetricDescriptor getMetricDescriptor() { @Override public MetricData collect( - Resource resource, - InstrumentationScopeInfo instrumentationScopeInfo, - long startEpochNanos, - long epochNanos) { + Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) { return null; } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java index b21797d9756..8b859f55479 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java @@ -11,7 +11,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.github.netmikey.logunit.api.LogCapturer; @@ -79,18 +78,7 @@ private void setup(MemoryMode memoryMode) { void setupAndSetActiveReader(MemoryMode memoryMode) { setup(memoryMode); - sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); - } - - @Test - void setActiveReader_SetsEpochInformation() { - setup(MemoryMode.IMMUTABLE_DATA); - - sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); - - verify(mockAsyncStorage1).setEpochInformation(0, 10); - verify(mockAsyncStorage2).getRegisteredReader(); - verifyNoMoreInteractions(mockAsyncStorage2); + sdkObservableMeasurement.setActiveReader(registeredReader1); } @Test diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index 354c1d899e5..ec2ad73652f 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -11,6 +11,7 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry; import static org.assertj.core.api.BDDAssertions.as; import static org.assertj.core.api.InstanceOfAssertFactories.collection; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -44,6 +45,7 @@ import io.opentelemetry.sdk.testing.assertj.DoubleSumAssert; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; +import java.time.Duration; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -110,14 +112,15 @@ void recordDouble_NaN(MemoryMode memoryMode) { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); storage.recordDouble(Double.NaN, Attributes.empty(), Context.current()); logs.assertContains( "Instrument name has recorded measurement Not-a-Number (NaN) value with attributes {}. Dropping measurement."); - verify(aggregator, never()).createHandle(); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) + verify(aggregator, never()).createHandle(anyLong()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10)) .isEqualTo(EmptyMetricData.getInstance()); } @@ -137,9 +140,10 @@ void attributesProcessor_applied(MemoryMode memoryMode) { aggregator, spyAttributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); storage.recordDouble(1, attributes, Context.root()); - MetricData md = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now()); + MetricData md = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, testClock.now()); assertThat(md) .hasDoubleSumSatisfying( sum -> @@ -161,39 +165,109 @@ void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); // Record measurement and collect at time 10 storage.recordDouble(3, Attributes.empty(), Context.current()); - verify(aggregator, times(1)).createHandle(); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) + verify(aggregator, times(1)).createHandle(testClock.now()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10)) .hasDoubleSumSatisfying( sum -> sum.isCumulative() .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); + point -> + point + .hasStartEpochNanos(testClock.now()) + .hasEpochNanos(10) + .hasValue(3))); cumulativeReader.setLastCollectEpochNanos(10); // Record measurement and collect at time 30 storage.recordDouble(3, Attributes.empty(), Context.current()); - verify(aggregator, times(1)).createHandle(); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) + verify(aggregator, times(1)).createHandle(testClock.now()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30)) .hasDoubleSumSatisfying( sum -> sum.isCumulative() .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(30).hasValue(6))); + point -> + point + .hasStartEpochNanos(testClock.now()) + .hasEpochNanos(30) + .hasValue(6))); cumulativeReader.setLastCollectEpochNanos(30); // Record measurement and collect at time 35 storage.recordDouble(2, Attributes.empty(), Context.current()); - verify(aggregator, times(1)).createHandle(); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35)) + verify(aggregator, times(1)).createHandle(testClock.now()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 35)) + .hasDoubleSumSatisfying( + sum -> + sum.isCumulative() + .hasPointsSatisfying( + point -> + point + .hasStartEpochNanos(testClock.now()) + .hasEpochNanos(35) + .hasValue(8))); + } + + @ParameterizedTest + @EnumSource(MemoryMode.class) + void recordAndCollect_CumulativeNewSeriesAfterFirstCollection(MemoryMode memoryMode) { + initialize(memoryMode); + + DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage.create( + cumulativeReader, + METRIC_DESCRIPTOR, + aggregator, + attributesProcessor, + CARDINALITY_LIMIT, + testClock, + /* enabled= */ true); + + // Record for series A and collect at time 10 + storage.recordDouble(3, Attributes.builder().put("series", "A").build(), Context.current()); + long seriesACreationTime = testClock.now(); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10)) .hasDoubleSumSatisfying( sum -> sum.isCumulative() .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(35).hasValue(8))); + point -> + point + .hasStartEpochNanos(seriesACreationTime) + .hasEpochNanos(10) + .hasValue(3) + .hasAttributes(Attributes.builder().put("series", "A").build()))); + cumulativeReader.setLastCollectEpochNanos(10); + + // Advance clock and record for both series A and a new series B + testClock.advance(Duration.ofSeconds(20)); + storage.recordDouble(5, Attributes.builder().put("series", "A").build(), Context.current()); + storage.recordDouble(7, Attributes.builder().put("series", "B").build(), Context.current()); + long seriesBCreationTime = testClock.now(); + // Series B's start time must be clock.now() at its first measurement, NOT instrument + // creation time or last collection time. + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30)) + .hasDoubleSumSatisfying( + sum -> + sum.isCumulative() + .hasPointsSatisfying( + point -> + point + .hasStartEpochNanos(seriesACreationTime) + .hasEpochNanos(30) + .hasValue(8) + .hasAttributes(Attributes.builder().put("series", "A").build()), + point -> + point + .hasStartEpochNanos(seriesBCreationTime) + .hasEpochNanos(30) + .hasValue(7) + .hasAttributes(Attributes.builder().put("series", "B").build()))); } @Test @@ -207,20 +281,25 @@ void recordAndCollect_DeltaResets_ImmutableData() { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); // Record measurement and collect at time 10 storage.recordDouble(3, Attributes.empty(), Context.current()); - verify(aggregator, times(1)).createHandle(); + verify(aggregator, times(1)).createHandle(testClock.now()); assertThat(storage) .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) .hasSize(0); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10)) .hasDoubleSumSatisfying( sum -> sum.isDelta() .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); + point -> + point + .hasStartEpochNanos(testClock.now()) + .hasEpochNanos(10) + .hasValue(3))); assertThat(storage) .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) .hasSize(1); @@ -230,11 +309,11 @@ void recordAndCollect_DeltaResets_ImmutableData() { storage.recordDouble(3, Attributes.empty(), Context.current()); // AggregatorHandle should be returned to the pool on reset so shouldn't create additional // handles - verify(aggregator, times(1)).createHandle(); + verify(aggregator, times(1)).createHandle(testClock.now()); assertThat(storage) .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) .hasSize(0); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30)) .hasDoubleSumSatisfying( sum -> sum.isDelta() @@ -247,11 +326,11 @@ void recordAndCollect_DeltaResets_ImmutableData() { // Record measurement and collect at time 35 storage.recordDouble(2, Attributes.empty(), Context.current()); - verify(aggregator, times(1)).createHandle(); + verify(aggregator, times(1)).createHandle(testClock.now()); assertThat(storage) .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) .hasSize(0); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35)) + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 35)) .hasDoubleSumSatisfying( sum -> sum.isDelta() @@ -273,20 +352,25 @@ void recordAndCollect_DeltaResets_ReusableData() { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); // Record measurement and collect at time 10 storage.recordDouble(3, Attributes.empty(), Context.current()); - verify(aggregator, times(1)).createHandle(); + verify(aggregator, times(1)).createHandle(testClock.now()); assertThat(storage) .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) .hasSize(0); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10)) .hasDoubleSumSatisfying( sum -> sum.isDelta() .hasPointsSatisfying( - point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); + point -> + point + .hasStartEpochNanos(testClock.now()) + .hasEpochNanos(10) + .hasValue(3))); assertThat(storage) .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) .hasSize(0); @@ -297,11 +381,11 @@ void recordAndCollect_DeltaResets_ReusableData() { storage.recordDouble(3, Attributes.empty(), Context.current()); // We're switched to secondary map so a handle will be created - verify(aggregator, times(2)).createHandle(); + verify(aggregator, times(2)).createHandle(testClock.now()); assertThat(storage) .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) .hasSize(0); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30)) .hasDoubleSumSatisfying( sum -> sum.isDelta() @@ -320,12 +404,12 @@ void recordAndCollect_DeltaResets_ReusableData() { // We don't delete aggregator handles unless max cardinality reached, hence // aggregator handle is still there, thus no handle was created for empty(), but it will for // the "foo" - verify(aggregator, times(3)).createHandle(); + verify(aggregator, times(3)).createHandle(testClock.now()); assertThat(storage) .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) .hasSize(0); - MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35); + MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 35); assertThat(metricData).hasDoubleSumSatisfying(DoubleSumAssert::isDelta); assertThat(metricData) .hasDoubleSumSatisfying( @@ -358,7 +442,7 @@ void recordAndCollect_DeltaResets_ReusableData() { deltaReader.setLastCollectEpochNanos(40); storage.recordDouble(6, Attributes.of(AttributeKey.stringKey("foo"), "bar"), Context.current()); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 45)) + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 45)) .hasDoubleSumSatisfying( sum -> sum.satisfies( @@ -388,6 +472,7 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); // Record measurements for CARDINALITY_LIMIT - 1, since 1 slot is reserved for the overflow @@ -396,8 +481,8 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { storage.recordDouble( 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } - verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) + verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(testClock.now()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10)) .hasDoubleSumSatisfying( sum -> sum.satisfies( @@ -406,7 +491,7 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { .hasSize(CARDINALITY_LIMIT - 1) .allSatisfy( point -> { - assertThat(point.getStartEpochNanos()).isEqualTo(0); + assertThat(point.getStartEpochNanos()).isEqualTo(testClock.now()); assertThat(point.getEpochNanos()).isEqualTo(10); assertThat(point.getValue()).isEqualTo(3); }))); @@ -417,8 +502,8 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { storage.recordDouble( 3, Attributes.builder().put("key", "value" + CARDINALITY_LIMIT).build(), Context.current()); // Should not create an additional handles for the overflow series - verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20)) + verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(testClock.now()); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20)) .hasDoubleSumSatisfying( sum -> sum.satisfies( @@ -427,7 +512,7 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { .hasSize(CARDINALITY_LIMIT) .allSatisfy( point -> { - assertThat(point.getStartEpochNanos()).isEqualTo(0); + assertThat(point.getStartEpochNanos()).isEqualTo(testClock.now()); assertThat(point.getEpochNanos()).isEqualTo(20); assertThat(point.getValue()).isEqualTo(3); }) @@ -456,6 +541,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); // Record measurements for CARDINALITY_LIMIT - 1, since 1 slot is reserved for the overflow @@ -464,11 +550,11 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { storage.recordDouble( 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } - verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(); + verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(testClock.now()); assertThat(storage) .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) .hasSize(0); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10)) .hasDoubleSumSatisfying( sum -> sum.satisfies( @@ -477,7 +563,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { .hasSize(CARDINALITY_LIMIT - 1) .allSatisfy( point -> { - assertThat(point.getStartEpochNanos()).isEqualTo(0); + assertThat(point.getStartEpochNanos()).isEqualTo(testClock.now()); assertThat(point.getEpochNanos()).isEqualTo(10); assertThat(point.getValue()).isEqualTo(3); }))); @@ -492,11 +578,11 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { storage.recordDouble( 3, Attributes.builder().put("key", "value" + CARDINALITY_LIMIT).build(), Context.current()); // Should use handle returned to pool instead of creating new ones - verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(); + verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(testClock.now()); assertThat(storage) .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) .hasSize(CARDINALITY_LIMIT - 2); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20)) + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20)) .hasDoubleSumSatisfying( sum -> sum.isDelta() @@ -523,11 +609,11 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } // Should use handles returned to pool instead of creating new ones - verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(); + verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(testClock.now()); assertThat(storage) .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) .hasSize(0); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30)) .hasDoubleSumSatisfying( sum -> sum.satisfies( @@ -569,6 +655,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); // Record measurements for CARDINALITY_LIMIT - 1, since 1 slot is reserved for the overflow @@ -577,10 +664,10 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() { storage.recordDouble( 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } - verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(); + verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(testClock.now()); // First collect - MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10); + MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10); assertThat(metricData) .hasDoubleSumSatisfying( @@ -591,7 +678,8 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() { .hasSize(CARDINALITY_LIMIT - 1) .allSatisfy( point -> { - Assertions.assertThat(point.getStartEpochNanos()).isEqualTo(0); + Assertions.assertThat(point.getStartEpochNanos()) + .isEqualTo(testClock.now()); Assertions.assertThat(point.getEpochNanos()).isEqualTo(10); Assertions.assertThat(point.getValue()).isEqualTo(3); }))); @@ -610,10 +698,10 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() { // After first collection, we expect the secondary map which is empty to be used, // hence handle creation will still take place // The +1 is for the overflow handle - verify(aggregator, times((CARDINALITY_LIMIT - 1) * 2 + 1)).createHandle(); + verify(aggregator, times((CARDINALITY_LIMIT - 1) * 2 + 1)).createHandle(testClock.now()); // Second collect - metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20); + metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20); assertThat(metricData) .hasDoubleSumSatisfying( @@ -658,6 +746,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); // 1st recording: Recording goes to active map @@ -668,7 +757,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() { // This will switch next recordings to the secondary map (which is empty) // by making it the active map - storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10); + storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10); // 2nd recording deltaReader.setLastCollectEpochNanos(10); @@ -678,7 +767,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() { } // This switches maps again, so next recordings will be to the first map - storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10, 20); + storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20); // 3rd recording: We're recording unseen attributes to a map we know is full, // since it was filled during 1st recording @@ -688,7 +777,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() { 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } - MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20, 30); + MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30); assertOnlyOverflowWasRecorded(metricData, 20, 30, 15 * 3); @@ -700,7 +789,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() { 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } - metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30, 40); + metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 40); assertOnlyOverflowWasRecorded(metricData, 30, 40, 15 * 3); @@ -712,7 +801,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() { 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } - metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 40, 50); + metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 50); assertNumberOfPoints(metricData, 10); assertAllPointsWithValue(metricData, 40, 50, 3); @@ -726,7 +815,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() { 4, Attributes.builder().put("key", "value" + i).build(), Context.current()); } - metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 50, 60); + metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 60); assertNumberOfPoints(metricData, 12); assertAllPointsWithValue(metricData, 50, 60, 4); @@ -803,6 +892,7 @@ void enabledThenDisable_isEnabled(MemoryMode memoryMode) { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); storage.setEnabled(false); @@ -822,6 +912,7 @@ void enabledThenDisableThenEnable_isEnabled(MemoryMode memoryMode) { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); storage.setEnabled(false); @@ -842,13 +933,14 @@ void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); storage.setEnabled(false); storage.recordDouble(10d, Attributes.empty(), Context.current()); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10).isEmpty()).isTrue(); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10).isEmpty()).isTrue(); } @ParameterizedTest @@ -863,6 +955,7 @@ void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) { aggregator, attributesProcessor, CARDINALITY_LIMIT, + testClock, /* enabled= */ true); storage.setEnabled(false); @@ -870,6 +963,6 @@ void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) { storage.recordDouble(10d, Attributes.empty(), Context.current()); - assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10).isEmpty()).isFalse(); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10).isEmpty()).isFalse(); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java index 54a4c36e841..cf8c04ec44e 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java @@ -63,7 +63,7 @@ void minimumBucketsCanAccommodateMaxRange() { Advice.empty()), asExemplarFilterInternal(ExemplarFilter.alwaysOff()), MemoryMode.IMMUTABLE_DATA); - AggregatorHandle handle = aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(0); // Record max range handle.recordDouble(Double.MIN_VALUE, Attributes.empty(), Context.current()); handle.recordDouble(Double.MAX_VALUE, Attributes.empty(), Context.current());