diff --git a/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusMetricReaderTest.java b/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusMetricReaderTest.java
index 5ac24663c4b..57bcdaf3b5a 100644
--- a/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusMetricReaderTest.java
+++ b/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusMetricReaderTest.java
@@ -972,29 +972,23 @@ void createdTimestamp() throws IOException {
LongCounter counter = meter.counterBuilder("requests").build();
testClock.advance(Duration.ofMillis(1));
+ long bearStartNanos = testClock.now();
counter.add(3, Attributes.builder().put("animal", "bear").build());
testClock.advance(Duration.ofMillis(1));
+ long mouseStartNanos = testClock.now();
counter.add(2, Attributes.builder().put("animal", "mouse").build());
testClock.advance(Duration.ofMillis(1));
- // There is a curious difference between Prometheus and OpenTelemetry:
- // In Prometheus metrics the _created timestamp is per data point,
- // i.e. the _created timestamp says when this specific set of label values
- // was first observed.
- // In the OTel Java SDK the _created timestamp is the initialization time
- // of the SdkMeterProvider, i.e. all data points will have the same _created timestamp.
- // So we expect the _created timestamp to be the start time of the application,
- // not the timestamp when the counter or an individual data point was created.
String expected =
""
+ "# TYPE requests counter\n"
+ "requests_total{animal=\"bear\",otel_scope_name=\"test\"} 3.0\n"
+ "requests_created{animal=\"bear\",otel_scope_name=\"test\"} "
- + createdTimestamp
+ + convertTimestamp(bearStartNanos)
+ "\n"
+ "requests_total{animal=\"mouse\",otel_scope_name=\"test\"} 2.0\n"
+ "requests_created{animal=\"mouse\",otel_scope_name=\"test\"} "
- + createdTimestamp
+ + convertTimestamp(mouseStartNanos)
+ "\n"
+ "# TYPE target info\n"
+ "target_info{service_name=\"unknown_service:java\",telemetry_sdk_language=\"java\",telemetry_sdk_name=\"opentelemetry\",telemetry_sdk_version=\"1.x.x\"} 1\n"
diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java
index 436eeb9bbb1..34e57de3032 100644
--- a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java
+++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java
@@ -7,6 +7,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.common.Clock;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import org.openjdk.jmh.annotations.Benchmark;
@@ -40,7 +41,7 @@ public static class ThreadState {
@Setup(Level.Trial)
public final void setup() {
- aggregatorHandle = aggregation.getAggregator().createHandle();
+ aggregatorHandle = aggregation.getAggregator().createHandle(Clock.getDefault().now());
valueSupplier = valueGen.supplier();
}
diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java
index 456e8e9fd94..3f22ccca84b 100644
--- a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java
+++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java
@@ -7,6 +7,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.common.Clock;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import org.openjdk.jmh.annotations.Benchmark;
@@ -46,7 +47,7 @@ public static class ThreadState {
@Setup(Level.Invocation)
public final void setup() {
- aggregatorHandle = aggregation.getAggregator().createHandle();
+ aggregatorHandle = aggregation.getAggregator().createHandle(Clock.getDefault().now());
valueSupplier = valueGen.supplier();
}
diff --git a/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/InstrumentGarbageCollectionBenchmarkTest.java b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/InstrumentGarbageCollectionBenchmarkTest.java
index 6fba53e86cd..e06a5941c42 100644
--- a/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/InstrumentGarbageCollectionBenchmarkTest.java
+++ b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/InstrumentGarbageCollectionBenchmarkTest.java
@@ -30,8 +30,8 @@ public class InstrumentGarbageCollectionBenchmarkTest {
/**
* This test validates that in {@link MemoryMode#REUSABLE_DATA}, any {@link
- * MetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely allocates memory
- * which is then subsequently garbage collected. It is done so comparatively to {@link
+ * MetricStorage#collect(Resource, InstrumentationScopeInfo, long)} barely allocates memory which
+ * is then subsequently garbage collected. It is done so comparatively to {@link
* MemoryMode#IMMUTABLE_DATA},
*
 *
 * <p>It runs the JMH test {@link InstrumentGarbageCollectionBenchmark} with GC profiler, and
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java
index 01dead0ce85..8fb64ae7f38 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java
@@ -132,8 +132,7 @@ Collection collectAll(RegisteredReader registeredReader, long epochN
// Only invoke callbacks if meter is enabled
if (meterEnabled) {
for (CallbackRegistration callbackRegistration : currentRegisteredCallbacks) {
- callbackRegistration.invokeCallback(
- registeredReader, meterProviderSharedState.getStartEpochNanos(), epochNanos);
+ callbackRegistration.invokeCallback(registeredReader);
}
}
@@ -145,10 +144,7 @@ Collection collectAll(RegisteredReader registeredReader, long epochN
for (MetricStorage storage : storages) {
MetricData current =
storage.collect(
- meterProviderSharedState.getResource(),
- getInstrumentationScopeInfo(),
- meterProviderSharedState.getStartEpochNanos(),
- epochNanos);
+ meterProviderSharedState.getResource(), getInstrumentationScopeInfo(), epochNanos);
// Ignore if the metric data doesn't have any data points, for example when aggregation is
// Aggregation#drop()
if (!current.isEmpty()) {
@@ -288,6 +284,7 @@ WriteableMetricStorage registerSynchronousMetricStorage(InstrumentDescriptor ins
SynchronousMetricStorage.create(
reader,
registeredView,
+ meterProviderSharedState.getClock(),
instrument,
meterProviderSharedState.getExemplarFilter(),
meterEnabled)));
@@ -317,7 +314,11 @@ SdkObservableMeasurement registerObservableMeasurement(
registeredStorages.add(
registry.register(
AsynchronousMetricStorage.create(
- reader, registeredView, instrumentDescriptor, meterEnabled)));
+ reader,
+ registeredView,
+ meterProviderSharedState.getClock(),
+ instrumentDescriptor,
+ meterEnabled)));
}
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java
index a4aa68b5a23..1ec856f8c05 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java
@@ -72,7 +72,6 @@ public static SdkMeterProviderBuilder builder() {
Resource resource,
ExemplarFilterInternal exemplarFilter,
ScopeConfigurator meterConfigurator) {
- long startEpochNanos = clock.now();
this.registeredViews = registeredViews;
this.registeredReaders =
metricReaders.entrySet().stream()
@@ -83,8 +82,7 @@ public static SdkMeterProviderBuilder builder() {
ViewRegistry.create(entry.getKey(), entry.getValue(), registeredViews)))
.collect(toList());
this.metricProducers = metricProducers;
- this.sharedState =
- MeterProviderSharedState.create(clock, resource, exemplarFilter, startEpochNanos);
+ this.sharedState = MeterProviderSharedState.create(clock, resource, exemplarFilter);
this.registry =
new ComponentRegistry<>(
instrumentationLibraryInfo ->
@@ -99,7 +97,6 @@ public static SdkMeterProviderBuilder builder() {
readerMetricProducers.add(new LeasedMetricProducer(registry, sharedState, registeredReader));
MetricReader reader = registeredReader.getReader();
reader.register(new SdkCollectionRegistration(readerMetricProducers, sharedState));
- registeredReader.setLastCollectEpochNanos(startEpochNanos);
if (reader instanceof PeriodicMetricReader) {
setReaderMeterProvider((PeriodicMetricReader) reader, this);
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java
index 79492f61c66..b5c0b28c93f 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java
@@ -30,12 +30,16 @@ static Aggregator> drop() {
}
/**
- * Returns a new {@link AggregatorHandle}. This MUST by used by the synchronous to aggregate
- * recorded measurements during the collection cycle.
+ * Returns a new {@link AggregatorHandle}. Used by both synchronous and asynchronous metric
+ * storage to aggregate recorded measurements during the collection cycle.
*
+ * @param creationEpochNanos the epoch timestamp (nanos) at which the handle is being created,
+ * stored via {@link AggregatorHandle#getCreationEpochNanos()}. Whether this value is used as
+ * the start timestamp of reported data points depends on the instrument and temporality — see
+ * {@link AggregatorHandle#getCreationEpochNanos()} for details.
* @return a new {@link AggregatorHandle}.
*/
- AggregatorHandle createHandle();
+ AggregatorHandle createHandle(long creationEpochNanos);
/**
* Returns a new DELTA point by computing the difference between two cumulative points.
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java
index 6312e5415f7..2115d8dd3c8 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java
@@ -35,13 +35,16 @@ public abstract class AggregatorHandle {
private static final String UNSUPPORTED_DOUBLE_MESSAGE =
"This aggregator does not support double values.";
+ private final long creationEpochNanos;
// A reservoir of sampled exemplars for this time period.
@Nullable private final DoubleExemplarReservoir doubleReservoirFactory;
@Nullable private final LongExemplarReservoir longReservoirFactory;
private final boolean isDoubleType;
private volatile boolean valuesRecorded = false;
- protected AggregatorHandle(ExemplarReservoirFactory reservoirFactory, boolean isDoubleType) {
+ protected AggregatorHandle(
+ long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, boolean isDoubleType) {
+ this.creationEpochNanos = creationEpochNanos;
this.isDoubleType = isDoubleType;
if (isDoubleType) {
this.doubleReservoirFactory = reservoirFactory.createDoubleExemplarReservoir();
@@ -145,4 +148,22 @@ private static S throwUnsupportedIfNull(@Nullable S value, String message) {
}
return value;
}
+
+ /**
+ * Returns the epoch timestamp (nanos) at which this handle was created.
+ *
+ * <p>For cumulative synchronous instruments, this is the time of the first measurement for the
+ * series and is used as {@link PointData#getStartEpochNanos()}.
+ *
+ *
+ * <p>For cumulative asynchronous instruments, this is either the instrument creation time (if the
+ * series first appeared during the first collection cycle) or the preceding collection interval's
+ * timestamp (if the series appeared in a later cycle), and is used as {@link
+ * PointData#getStartEpochNanos()}.
+ *
+ *
+ * <p>Not used for delta instruments; their start epoch is computed directly from the reader's
+ * last collection time or instrument creation time.
+ */
+ public long getCreationEpochNanos() {
+ return creationEpochNanos;
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java
index 072f79a3e1c..fe5c301f1fe 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java
@@ -66,8 +66,9 @@ public DoubleBase2ExponentialHistogramAggregator(
}
@Override
- public AggregatorHandle createHandle() {
- return new Handle(reservoirFactory, maxBuckets, maxScale, recordMinMax, memoryMode);
+ public AggregatorHandle createHandle(long creationEpochNanos) {
+ return new Handle(
+ creationEpochNanos, reservoirFactory, maxBuckets, maxScale, recordMinMax, memoryMode);
}
@Override
@@ -104,12 +105,13 @@ static final class Handle extends AggregatorHandle createHandle() {
- return new Handle(boundaryList, boundaries, recordMinMax, reservoirFactory, memoryMode);
+ public AggregatorHandle createHandle(long creationEpochNanos) {
+ return new Handle(
+ creationEpochNanos, boundaryList, boundaries, recordMinMax, reservoirFactory, memoryMode);
}
@Override
@@ -120,12 +121,13 @@ static final class Handle extends AggregatorHandle {
@Nullable private final MutableHistogramPointData reusablePoint;
Handle(
+ long creationEpochNanos,
List boundaryList,
double[] boundaries,
boolean recordMinMax,
ExemplarReservoirFactory reservoirFactory,
MemoryMode memoryMode) {
- super(reservoirFactory, /* isDoubleType= */ true);
+ super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true);
this.boundaryList = boundaryList;
this.boundaries = boundaries;
this.recordMinMax = recordMinMax;
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java
index 1baadf0763d..0117db10f66 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java
@@ -50,8 +50,8 @@ public DoubleLastValueAggregator(
}
@Override
- public AggregatorHandle createHandle() {
- return new Handle(reservoirFactory, memoryMode);
+ public AggregatorHandle createHandle(long creationEpochNanos) {
+ return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}
@Override
@@ -99,8 +99,9 @@ static final class Handle extends AggregatorHandle {
// Only used when memoryMode is REUSABLE_DATA
@Nullable private final MutableDoublePointData reusablePoint;
- private Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
- super(reservoirFactory, /* isDoubleType= */ true);
+ private Handle(
+ long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
+ super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true);
if (memoryMode == MemoryMode.REUSABLE_DATA) {
reusablePoint = new MutableDoublePointData();
} else {
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java
index 29b444412f6..b22be360364 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java
@@ -55,8 +55,8 @@ public DoubleSumAggregator(
}
@Override
- public AggregatorHandle createHandle() {
- return new Handle(reservoirFactory, memoryMode);
+ public AggregatorHandle createHandle(long creationEpochNanos) {
+ return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}
@Override
@@ -112,8 +112,9 @@ static final class Handle extends AggregatorHandle {
// Only used if memoryMode == MemoryMode.REUSABLE_DATA
@Nullable private final MutableDoublePointData reusablePoint;
- Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
- super(reservoirFactory, /* isDoubleType= */ true);
+ Handle(
+ long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
+ super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true);
reusablePoint = memoryMode == MemoryMode.REUSABLE_DATA ? new MutableDoublePointData() : null;
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java
index 1813bef9fcc..b74e06a0791 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java
@@ -54,7 +54,7 @@ public List extends ExemplarData> getExemplars() {
private static final AggregatorHandle HANDLE =
new AggregatorHandle(
- ExemplarReservoirFactory.noSamples(), /* isDoubleType= */ true) {
+ 0, ExemplarReservoirFactory.noSamples(), /* isDoubleType= */ true) {
@Override
protected PointData doAggregateThenMaybeResetDoubles(
long startEpochNanos,
@@ -75,7 +75,7 @@ protected void doRecordDouble(double value) {}
private DropAggregator() {}
@Override
- public AggregatorHandle createHandle() {
+ public AggregatorHandle createHandle(long creationEpochNanos) {
return HANDLE;
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java
index bb38d32f061..62acee67e96 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java
@@ -47,8 +47,8 @@ public LongLastValueAggregator(ExemplarReservoirFactory reservoirFactory, Memory
}
@Override
- public AggregatorHandle createHandle() {
- return new Handle(reservoirFactory, memoryMode);
+ public AggregatorHandle createHandle(long creationEpochNanos) {
+ return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}
@Override
@@ -95,8 +95,9 @@ static final class Handle extends AggregatorHandle {
// Only used when memoryMode is REUSABLE_DATA
@Nullable private final MutableLongPointData reusablePoint;
- Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
- super(reservoirFactory, /* isDoubleType= */ false);
+ Handle(
+ long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
+ super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ false);
if (memoryMode == MemoryMode.REUSABLE_DATA) {
reusablePoint = new MutableLongPointData();
} else {
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java
index 6ea232036ff..92581f3b8d1 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java
@@ -48,8 +48,8 @@ public LongSumAggregator(
}
@Override
- public AggregatorHandle createHandle() {
- return new Handle(reservoirFactory, memoryMode);
+ public AggregatorHandle createHandle(long creationEpochNanos) {
+ return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}
@Override
@@ -105,8 +105,9 @@ static final class Handle extends AggregatorHandle {
// Only used if memoryMode == MemoryMode.REUSABLE_DATA
@Nullable private final MutableLongPointData reusablePointData;
- Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
- super(reservoirFactory, /* isDoubleType= */ false);
+ Handle(
+ long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
+ super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ false);
reusablePointData =
memoryMode == MemoryMode.REUSABLE_DATA ? new MutableLongPointData() : null;
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReader.java
index c56d01b9cda..bf414655695 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReader.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReader.java
@@ -26,7 +26,7 @@ public class RegisteredReader {
private final int id = ID_COUNTER.incrementAndGet();
private final MetricReader metricReader;
private final ViewRegistry viewRegistry;
- private volatile long lastCollectEpochNanos;
+ private volatile long lastCollectEpochNanos = -1;
/** Construct a new collection info object storing information for collection against a reader. */
public static RegisteredReader create(MetricReader reader, ViewRegistry viewRegistry) {
@@ -52,13 +52,14 @@ public void setLastCollectEpochNanos(long epochNanos) {
}
/**
- * Get the time of the last collection for the reader.
+ * Get the time of the last collection for the reader, or {@code defaultEpochNanos} if not set.
*
* Used to compute the {@link PointData#getStartEpochNanos()} for instruments aggregations with
* {@link AggregationTemporality#DELTA} temporality.
*/
- public long getLastCollectEpochNanos() {
- return lastCollectEpochNanos;
+ public long getLastCollectEpochNanosOrDefault(long defaultEpochNanos) {
+ long lastCollectEpochNanos = this.lastCollectEpochNanos;
+ return lastCollectEpochNanos == -1 ? defaultEpochNanos : lastCollectEpochNanos;
}
/** Get the {@link ViewRegistry} for the reader. */
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java
index 59bfd4275bf..3b4d1ac585a 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java
@@ -11,6 +11,7 @@
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.common.internal.ThrottlingLogger;
@@ -35,10 +36,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.annotation.Nullable;
/**
* Stores aggregated {@link MetricData} for asynchronous instruments.
@@ -46,16 +47,18 @@
 *
 * <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
-public final class AsynchronousMetricStorage implements MetricStorage {
+public abstract class AsynchronousMetricStorage implements MetricStorage {
private static final Logger logger = Logger.getLogger(AsynchronousMetricStorage.class.getName());
private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger);
- private final RegisteredReader registeredReader;
+ protected final RegisteredReader registeredReader;
private final MetricDescriptor metricDescriptor;
private final AggregationTemporality aggregationTemporality;
- private final Aggregator aggregator;
+ protected final Aggregator aggregator;
private final AttributesProcessor attributesProcessor;
- private final MemoryMode memoryMode;
+ protected final long instrumentCreationEpochNanos;
+
+ protected final MemoryMode memoryMode;
/**
* This field is set to 1 less than the actual intended cardinality limit, allowing the last slot
@@ -64,61 +67,34 @@ public final class AsynchronousMetricStorage implements Met
private final int maxCardinality;
// Handles responsible for aggregating data recorded during callbacks
- private final Map> aggregatorHandles;
-
- // Only populated if aggregationTemporality == DELTA
- private Map lastPoints;
-
- // Only populated if memoryMode == REUSABLE_DATA
- private final ObjectPool reusablePointsPool;
- private final ObjectPool> reusableHandlesPool;
- private final Function> handleBuilder;
- private final BiConsumer> handleReleaser;
- private final BiConsumer pointReleaser;
+ protected final Map> aggregatorHandles;
- private final List reusablePointsList = new ArrayList<>();
- // If aggregationTemporality == DELTA, this reference and lastPoints will be swapped at every
- // collection
- private Map reusablePointsMap = new PooledHashMap<>();
-
- // Time information relative to recording of data in aggregatorHandles, set while calling
- // callbacks
- private long startEpochNanos;
- private long epochNanos;
+ protected final List reusablePointsList = new ArrayList<>();
private volatile boolean enabled;
private AsynchronousMetricStorage(
RegisteredReader registeredReader,
MetricDescriptor metricDescriptor,
+ AggregationTemporality aggregationTemporality,
Aggregator aggregator,
AttributesProcessor attributesProcessor,
int maxCardinality,
+ Clock clock,
boolean enabled) {
this.registeredReader = registeredReader;
this.metricDescriptor = metricDescriptor;
- this.aggregationTemporality =
- registeredReader
- .getReader()
- .getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
+ this.aggregationTemporality = aggregationTemporality;
this.memoryMode = registeredReader.getReader().getMemoryMode();
this.aggregator = aggregator;
this.attributesProcessor = attributesProcessor;
+ this.instrumentCreationEpochNanos = clock.now();
this.maxCardinality = maxCardinality - 1;
this.enabled = enabled;
- this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint);
- this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle);
- this.handleBuilder = ignored -> reusableHandlesPool.borrowObject();
- this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle);
- this.pointReleaser = (ignored, point) -> reusablePointsPool.returnObject(point);
-
- if (memoryMode == REUSABLE_DATA) {
- this.lastPoints = new PooledHashMap<>();
- this.aggregatorHandles = new PooledHashMap<>();
- } else {
- this.lastPoints = new HashMap<>();
- this.aggregatorHandles = new HashMap<>();
- }
+
+ // Concurrent hashmap only used to allow for removal during iteration during collection.
+ this.aggregatorHandles =
+ memoryMode == REUSABLE_DATA ? new PooledHashMap<>() : new ConcurrentHashMap<>();
}
/**
@@ -128,49 +104,62 @@ private AsynchronousMetricStorage(
public static AsynchronousMetricStorage create(
RegisteredReader registeredReader,
RegisteredView registeredView,
+ Clock clock,
InstrumentDescriptor instrumentDescriptor,
boolean enabled) {
View view = registeredView.getView();
MetricDescriptor metricDescriptor =
MetricDescriptor.create(view, registeredView.getViewSourceInfo(), instrumentDescriptor);
+ AggregationTemporality aggregationTemporality =
+ registeredReader
+ .getReader()
+ .getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
Aggregator aggregator =
((AggregatorFactory) view.getAggregation())
.createAggregator(
instrumentDescriptor,
ExemplarFilterInternal.asExemplarFilterInternal(ExemplarFilter.alwaysOff()),
registeredReader.getReader().getMemoryMode());
- return new AsynchronousMetricStorage<>(
- registeredReader,
- metricDescriptor,
- aggregator,
- registeredView.getViewAttributesProcessor(),
- registeredView.getCardinalityLimit(),
- enabled);
+ AttributesProcessor attributesProcessor = registeredView.getViewAttributesProcessor();
+ int cardinalityLimit = registeredView.getCardinalityLimit();
+ return aggregationTemporality == AggregationTemporality.DELTA
+ ? new DeltaAsynchronousMetricStorage<>(
+ registeredReader,
+ metricDescriptor,
+ aggregator,
+ attributesProcessor,
+ cardinalityLimit,
+ clock,
+ enabled)
+ : new CumulativeAsynchronousMetricStorage<>(
+ registeredReader,
+ metricDescriptor,
+ aggregator,
+ attributesProcessor,
+ cardinalityLimit,
+ clock,
+ enabled);
}
/** Record callback measurement from {@link ObservableLongMeasurement}. */
void record(Attributes attributes, long value) {
- attributes = validateAndProcessAttributes(attributes);
- AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder);
+ AggregatorHandle handle = getAggregatorHandle(attributes);
handle.recordLong(value, attributes, Context.current());
}
/** Record callback measurement from {@link ObservableDoubleMeasurement}. */
void record(Attributes attributes, double value) {
- attributes = validateAndProcessAttributes(attributes);
- AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder);
+ AggregatorHandle handle = getAggregatorHandle(attributes);
handle.recordDouble(value, attributes, Context.current());
}
- void setEpochInformation(long startEpochNanos, long epochNanos) {
- this.startEpochNanos =
- aggregationTemporality == AggregationTemporality.DELTA
- ? registeredReader.getLastCollectEpochNanos()
- : startEpochNanos;
- this.epochNanos = epochNanos;
- }
-
- private Attributes validateAndProcessAttributes(Attributes attributes) {
+ private AggregatorHandle getAggregatorHandle(Attributes attributes) {
+ Context context = Context.current();
+ attributes = attributesProcessor.process(attributes, context);
+ AggregatorHandle handle = aggregatorHandles.get(attributes);
+ if (handle != null) {
+ return handle;
+ }
if (aggregatorHandles.size() >= maxCardinality) {
throttlingLogger.log(
Level.WARNING,
@@ -179,14 +168,34 @@ private Attributes validateAndProcessAttributes(Attributes attributes) {
+ " has exceeded the maximum allowed cardinality ("
+ maxCardinality
+ ").");
- return MetricStorage.CARDINALITY_OVERFLOW;
+ attributes = MetricStorage.CARDINALITY_OVERFLOW;
+ // Return handle for overflow series, first checking if a handle already exists for it
+ handle = aggregatorHandles.get(attributes);
+ if (handle != null) {
+ return handle;
+ }
}
+ // Get handle from pool if available, else create a new one.
+ // Note: pooled handles (used only for delta temporality) retain their original
+ // creationEpochNanos, but delta storage does not use the handle's creation epoch for the
+ // start epoch — it uses the reader's last collect time directly in doCollect(). So the stale
+ // creation epoch on a recycled handle does not affect correctness.
+ AggregatorHandle newHandle = maybeGetPooledAggregatorHandle();
+ if (newHandle == null) {
+ newHandle = createAggregatorHandle();
+ }
+ handle = aggregatorHandles.putIfAbsent(attributes, newHandle);
+ return handle != null ? handle : newHandle;
+ }
- Context context = Context.current();
- attributes = attributesProcessor.process(attributes, context);
- return attributes;
+ protected AggregatorHandle createAggregatorHandle() {
+ return aggregator.createHandle(
+ registeredReader.getLastCollectEpochNanosOrDefault(instrumentCreationEpochNanos));
}
+ @Nullable
+ abstract AggregatorHandle maybeGetPooledAggregatorHandle();
+
@Override
public MetricDescriptor getMetricDescriptor() {
return metricDescriptor;
@@ -197,20 +206,15 @@ public RegisteredReader getRegisteredReader() {
return registeredReader;
}
+ @Override
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
@Override
public MetricData collect(
- Resource resource,
- InstrumentationScopeInfo instrumentationScopeInfo,
- long startEpochNanos,
- long epochNanos) {
- Collection result =
- aggregationTemporality == AggregationTemporality.DELTA
- ? collectWithDeltaAggregationTemporality()
- : collectWithCumulativeAggregationTemporality();
-
- // collectWith*AggregationTemporality() methods are responsible for resetting the handle
- aggregatorHandles.forEach(handleReleaser);
- aggregatorHandles.clear();
+ Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) {
+ Collection result = doCollect(epochNanos);
return enabled
? aggregator.toMetricData(
@@ -218,106 +222,194 @@ public MetricData collect(
: EmptyMetricData.getInstance();
}
- private Collection collectWithDeltaAggregationTemporality() {
- Map currentPoints;
- if (memoryMode == REUSABLE_DATA) {
- // deltaPoints computed in the previous collection can be released
- reusablePointsList.forEach(reusablePointsPool::returnObject);
- reusablePointsList.clear();
+ abstract Collection doCollect(long epochNanos);
+
+ private static final class DeltaAsynchronousMetricStorage
+ extends AsynchronousMetricStorage {
+ // This reference and lastPoints will be swapped at every collection
+ private Map reusablePointsMap = new PooledHashMap<>();
+ private Map lastPoints;
+
+ private final ObjectPool reusablePointsPool;
+ private final ObjectPool> reusableHandlesPool;
+
+ DeltaAsynchronousMetricStorage(
+ RegisteredReader registeredReader,
+ MetricDescriptor metricDescriptor,
+ Aggregator aggregator,
+ AttributesProcessor attributesProcessor,
+ int maxCardinality,
+ Clock clock,
+ boolean enabled) {
+ super(
+ registeredReader,
+ metricDescriptor,
+ AggregationTemporality.DELTA,
+ aggregator,
+ attributesProcessor,
+ maxCardinality,
+ clock,
+ enabled);
+
+ this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint);
+ this.reusableHandlesPool = new ObjectPool<>(this::createAggregatorHandle);
+ // Concurrent hashmap only used to allow for removal during iteration during collection.
+ this.lastPoints =
+ memoryMode == REUSABLE_DATA ? new PooledHashMap<>() : new ConcurrentHashMap<>();
+ }
- currentPoints = reusablePointsMap;
- } else {
- currentPoints = new HashMap<>();
+ @Nullable
+ @Override
+ AggregatorHandle maybeGetPooledAggregatorHandle() {
+ return reusableHandlesPool.borrowObject();
}
- aggregatorHandles.forEach(
- (attributes, handle) -> {
- T point =
- handle.aggregateThenMaybeReset(
- this.startEpochNanos, this.epochNanos, attributes, /* reset= */ true);
-
- T pointForCurrentPoints;
- if (memoryMode == REUSABLE_DATA) {
- // AggregatorHandle is going to modify the point eventually, but we must persist its
- // value to used it at the next collection (within lastPoints). Thus, we make a copy.
- pointForCurrentPoints = reusablePointsPool.borrowObject();
- aggregator.copyPoint(point, pointForCurrentPoints);
- } else {
- pointForCurrentPoints = point;
- }
- currentPoints.put(attributes, pointForCurrentPoints);
- });
-
- List deltaPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList<>();
- currentPoints.forEach(
- (attributes, currentPoint) -> {
- T lastPoint = lastPoints.remove(attributes);
-
- T deltaPoint;
- if (lastPoint == null) {
+ @Override
+ Collection doCollect(long epochNanos) {
+ Map currentPoints;
+ if (memoryMode == REUSABLE_DATA) {
+ // deltaPoints computed in the previous collection can be released
+ reusablePointsList.forEach(reusablePointsPool::returnObject);
+ reusablePointsList.clear();
+
+ currentPoints = reusablePointsMap;
+ } else {
+ currentPoints = new HashMap<>();
+ }
+
+ // Start time for asynchronous delta instruments is the time of the last collection, or if no
+ // collection has yet taken place, the time the instrument was created.
+ long startEpochNanos =
+ registeredReader.getLastCollectEpochNanosOrDefault(instrumentCreationEpochNanos);
+
+ aggregatorHandles.forEach(
+ (attributes, handle) -> {
+ T point =
+ handle.aggregateThenMaybeReset(
+ startEpochNanos, epochNanos, attributes, /* reset= */ true);
+
+ T pointForCurrentPoints;
if (memoryMode == REUSABLE_DATA) {
- // All deltaPoints are released at the end of the collection. Thus, we need a copy
- // to make sure currentPoint can still be used within lastPoints during the next
- // collection.
- deltaPoint = reusablePointsPool.borrowObject();
- aggregator.copyPoint(currentPoint, deltaPoint);
+ // AggregatorHandle is going to modify the point eventually, but we must persist its
+                // value to use it at the next collection (within lastPoints). Thus, we make a copy.
+ pointForCurrentPoints = reusablePointsPool.borrowObject();
+ aggregator.copyPoint(point, pointForCurrentPoints);
} else {
- deltaPoint = currentPoint;
+ pointForCurrentPoints = point;
}
- } else {
- if (memoryMode == REUSABLE_DATA) {
- aggregator.diffInPlace(lastPoint, currentPoint);
- deltaPoint = lastPoint;
+ currentPoints.put(attributes, pointForCurrentPoints);
+ });
+
+ List deltaPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList<>();
+ currentPoints.forEach(
+ (attributes, currentPoint) -> {
+ T lastPoint = lastPoints.remove(attributes);
+
+ T deltaPoint;
+ if (lastPoint == null) {
+ if (memoryMode == REUSABLE_DATA) {
+ // All deltaPoints are released at the end of the collection. Thus, we need a copy
+ // to make sure currentPoint can still be used within lastPoints during the next
+ // collection.
+ deltaPoint = reusablePointsPool.borrowObject();
+ aggregator.copyPoint(currentPoint, deltaPoint);
+ } else {
+ deltaPoint = currentPoint;
+ }
} else {
- deltaPoint = aggregator.diff(lastPoint, currentPoint);
+ if (memoryMode == REUSABLE_DATA) {
+ aggregator.diffInPlace(lastPoint, currentPoint);
+ deltaPoint = lastPoint;
+ } else {
+ deltaPoint = aggregator.diff(lastPoint, currentPoint);
+ }
}
- }
- deltaPoints.add(deltaPoint);
- });
-
- if (memoryMode == REUSABLE_DATA) {
- // - If the point was used to compute a delta, it's now in deltaPoints (and thus in
- // reusablePointsList)
- // - If the point hasn't been used, it's still in lastPoints and can be returned
- lastPoints.forEach(pointReleaser);
- lastPoints.clear();
-
- Map tmp = lastPoints;
- lastPoints = reusablePointsMap;
- reusablePointsMap = tmp;
- } else {
- lastPoints = currentPoints;
+ deltaPoints.add(deltaPoint);
+ });
+
+ if (memoryMode == REUSABLE_DATA) {
+ // - If the point was used to compute a delta, it's now in deltaPoints (and thus in
+ // reusablePointsList)
+ // - If the point hasn't been used, it's still in lastPoints and can be returned
+ lastPoints.forEach((attributes, point) -> reusablePointsPool.returnObject(point));
+ lastPoints.clear();
+
+ Map tmp = lastPoints;
+ lastPoints = reusablePointsMap;
+ reusablePointsMap = tmp;
+ } else {
+ lastPoints = currentPoints;
+ }
+
+ aggregatorHandles.forEach(
+ (unused, aggregatorHandle) -> reusableHandlesPool.returnObject(aggregatorHandle));
+ aggregatorHandles.clear();
+
+ return deltaPoints;
}
-
- return deltaPoints;
}
- private Collection collectWithCumulativeAggregationTemporality() {
- List currentPoints;
- if (memoryMode == REUSABLE_DATA) {
- // We should not return the points in this list to the pool, they belong to the
- // AggregatorHandle
- reusablePointsList.clear();
- currentPoints = reusablePointsList;
- } else {
- currentPoints = new ArrayList<>();
+ private static final class CumulativeAsynchronousMetricStorage
+ extends AsynchronousMetricStorage {
+ CumulativeAsynchronousMetricStorage(
+ RegisteredReader registeredReader,
+ MetricDescriptor metricDescriptor,
+ Aggregator aggregator,
+ AttributesProcessor attributesProcessor,
+ int maxCardinality,
+ Clock clock,
+ boolean enabled) {
+ super(
+ registeredReader,
+ metricDescriptor,
+ AggregationTemporality.CUMULATIVE,
+ aggregator,
+ attributesProcessor,
+ maxCardinality,
+ clock,
+ enabled);
}
- aggregatorHandles.forEach(
- (attributes, handle) -> {
- T value =
- handle.aggregateThenMaybeReset(
- AsynchronousMetricStorage.this.startEpochNanos,
- AsynchronousMetricStorage.this.epochNanos,
- attributes,
- /* reset= */ true);
- currentPoints.add(value);
- });
- return currentPoints;
- }
+ @Nullable
+ @Override
+ AggregatorHandle maybeGetPooledAggregatorHandle() {
+ return null;
+ }
- @Override
- public void setEnabled(boolean enabled) {
- this.enabled = enabled;
+ @Override
+ Collection doCollect(long epochNanos) {
+ List currentPoints;
+ if (memoryMode == REUSABLE_DATA) {
+ // We should not return the points in this list to the pool, they belong to the
+ // AggregatorHandle
+ reusablePointsList.clear();
+ currentPoints = reusablePointsList;
+ } else {
+ currentPoints = new ArrayList<>();
+ }
+
+ // Asynchronous instruments manage their own state. If they stop reporting a measurement for a
+ // collection, the series ends. We retain aggregator handles across collections to allow
+ // series to report a consistent start time for their lifetime. While collecting, remove any
+ // series without measurements this collection.
+ // Start time for cumulative asynchronous instruments is:
+ // - The instrument creation time if the series first appeared during the first collection
+ // cycle
+ // - Otherwise, the preceding collection interval's timestamp
+ // This logic is handled in AggregatorHandle creation via #createAggregatorHandle()
+ aggregatorHandles.forEach(
+ (attributes, handle) -> {
+ if (!handle.hasRecordedValues()) {
+ aggregatorHandles.remove(attributes);
+ return;
+ }
+ T value =
+ handle.aggregateThenMaybeReset(
+ handle.getCreationEpochNanos(), epochNanos, attributes, /* reset= */ true);
+ currentPoints.add(value);
+ });
+
+ return currentPoints;
+ }
}
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistration.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistration.java
index b603de7fc21..5eb7f0b1eb6 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistration.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistration.java
@@ -56,8 +56,8 @@ private CallbackRegistration(
*
* The {@code observableMeasurements} define the set of measurements the {@code runnable} may
* record to. The active reader of each {@code observableMeasurements} is set via {@link
- * SdkObservableMeasurement#setActiveReader(RegisteredReader, long, long)} before {@code runnable}
- * is called, and set to {@code null} afterwards.
+ * SdkObservableMeasurement#setActiveReader(RegisteredReader)} before {@code runnable} is called,
+ * and set to {@code null} afterwards.
*
* @param observableMeasurements the measurements that the runnable may record to
* @param runnable the callback
@@ -73,7 +73,7 @@ public String toString() {
return "CallbackRegistration{instrumentDescriptors=" + instrumentDescriptors + "}";
}
- public void invokeCallback(RegisteredReader reader, long startEpochNanos, long epochNanos) {
+ public void invokeCallback(RegisteredReader reader) {
// Return early if no storages are registered
if (!hasStorages) {
return;
@@ -81,8 +81,7 @@ public void invokeCallback(RegisteredReader reader, long startEpochNanos, long e
// Set the active reader on each observable measurement so that measurements are only recorded
// to relevant storages
observableMeasurements.forEach(
- observableMeasurement ->
- observableMeasurement.setActiveReader(reader, startEpochNanos, epochNanos));
+ observableMeasurement -> observableMeasurement.setActiveReader(reader));
// Restore the context class loader that was active when the callback was registered.
Thread currentThread = Thread.currentThread();
ClassLoader previousContextClassLoader = currentThread.getContextClassLoader();
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java
index a746c00dec0..a44508d6659 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java
@@ -12,6 +12,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.common.internal.ThrottlingLogger;
@@ -49,6 +50,7 @@ public abstract class DefaultSynchronousMetricStorage
private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger);
private final AttributesProcessor attributesProcessor;
+ protected final Clock clock;
protected final MetricDescriptor metricDescriptor;
protected final Aggregator aggregator;
@@ -64,11 +66,13 @@ private DefaultSynchronousMetricStorage(
MetricDescriptor metricDescriptor,
Aggregator aggregator,
AttributesProcessor attributesProcessor,
+ Clock clock,
int maxCardinality,
boolean enabled) {
this.metricDescriptor = metricDescriptor;
this.aggregator = aggregator;
this.attributesProcessor = attributesProcessor;
+ this.clock = clock;
this.maxCardinality = maxCardinality - 1;
this.enabled = enabled;
}
@@ -79,6 +83,7 @@ static DefaultSynchronousMetricStorage create(
Aggregator aggregator,
AttributesProcessor processor,
int maxCardinality,
+ Clock clock,
boolean enabled) {
AggregationTemporality aggregationTemporality =
reader.getReader().getAggregationTemporality(descriptor.getSourceInstrument().getType());
@@ -87,11 +92,12 @@ static DefaultSynchronousMetricStorage create(
descriptor,
aggregator,
processor,
+ clock,
maxCardinality,
enabled,
reader.getReader().getMemoryMode())
: new DeltaSynchronousMetricStorage<>(
- reader, descriptor, aggregator, processor, maxCardinality, enabled);
+ reader, descriptor, aggregator, processor, clock, maxCardinality, enabled);
}
@Override
@@ -160,9 +166,13 @@ protected AggregatorHandle getAggregatorHandle(
}
}
// Get handle from pool if available, else create a new one.
+ // Note: pooled handles (used only for delta temporality) retain their original
+ // creationEpochNanos, but delta storage does not use the handle's creation time for the
+ // start epoch — it uses the reader's last collect time directly in collect(). So the stale
+ // creation time on a recycled handle does not affect correctness.
AggregatorHandle newHandle = maybeGetPooledAggregatorHandle();
if (newHandle == null) {
- newHandle = aggregator.createHandle();
+ newHandle = aggregator.createHandle(clock.now());
}
handle = aggregatorHandles.putIfAbsent(attributes, newHandle);
return handle != null ? handle : newHandle;
@@ -178,6 +188,7 @@ public MetricDescriptor getMetricDescriptor() {
private static class DeltaSynchronousMetricStorage
extends DefaultSynchronousMetricStorage {
+ private final long instrumentCreationEpochNanos;
private final RegisteredReader registeredReader;
private final MemoryMode memoryMode;
@@ -195,9 +206,11 @@ private static class DeltaSynchronousMetricStorage
MetricDescriptor metricDescriptor,
Aggregator aggregator,
AttributesProcessor attributesProcessor,
+ Clock clock,
int maxCardinality,
boolean enabled) {
- super(metricDescriptor, aggregator, attributesProcessor, maxCardinality, enabled);
+ super(metricDescriptor, aggregator, attributesProcessor, clock, maxCardinality, enabled);
+ this.instrumentCreationEpochNanos = clock.now();
this.registeredReader = registeredReader;
this.memoryMode = registeredReader.getReader().getMemoryMode();
}
@@ -262,10 +275,7 @@ private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) {
@Override
public MetricData collect(
- Resource resource,
- InstrumentationScopeInfo instrumentationScopeInfo,
- long startEpochNanos,
- long epochNanos) {
+ Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) {
ConcurrentHashMap> aggregatorHandles;
AggregatorHolder holder = this.aggregatorHolder;
this.aggregatorHolder =
@@ -316,6 +326,11 @@ public MetricData collect(
}
}
+ // Start time for synchronous delta instruments is the time of the last collection, or if no
+ // collection has yet taken place, the time the instrument was created.
+ long startEpochNanos =
+ registeredReader.getLastCollectEpochNanosOrDefault(instrumentCreationEpochNanos);
+
// Grab aggregated points.
aggregatorHandles.forEach(
(attributes, handle) -> {
@@ -324,10 +339,7 @@ public MetricData collect(
}
T point =
handle.aggregateThenMaybeReset(
- registeredReader.getLastCollectEpochNanos(),
- epochNanos,
- attributes,
- /* reset= */ true);
+ startEpochNanos, epochNanos, attributes, /* reset= */ true);
if (memoryMode == IMMUTABLE_DATA) {
// Return the aggregator to the pool.
@@ -402,10 +414,11 @@ private static class CumulativeSynchronousMetricStorage
MetricDescriptor metricDescriptor,
Aggregator aggregator,
AttributesProcessor attributesProcessor,
+ Clock clock,
int maxCardinality,
boolean enabled,
MemoryMode memoryMode) {
- super(metricDescriptor, aggregator, attributesProcessor, maxCardinality, enabled);
+ super(metricDescriptor, aggregator, attributesProcessor, clock, maxCardinality, enabled);
this.memoryMode = memoryMode;
}
@@ -430,10 +443,7 @@ AggregatorHandle maybeGetPooledAggregatorHandle() {
@Override
public MetricData collect(
- Resource resource,
- InstrumentationScopeInfo instrumentationScopeInfo,
- long startEpochNanos,
- long epochNanos) {
+ Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) {
List points;
if (memoryMode == REUSABLE_DATA) {
reusableResultList.clear();
@@ -448,9 +458,11 @@ public MetricData collect(
if (!handle.hasRecordedValues()) {
return;
}
+ // Start time for cumulative synchronous instruments is the time the first series
+ // measurement was recorded. I.e. the time the AggregatorHandle was created.
T point =
handle.aggregateThenMaybeReset(
- startEpochNanos, epochNanos, attributes, /* reset= */ false);
+ handle.getCreationEpochNanos(), epochNanos, attributes, /* reset= */ false);
if (point != null) {
points.add(point);
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java
index 9a0ed207877..c8580e79f61 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java
@@ -27,10 +27,7 @@ public MetricDescriptor getMetricDescriptor() {
@Override
public MetricData collect(
- Resource resource,
- InstrumentationScopeInfo instrumentationScopeInfo,
- long startEpochNanos,
- long epochNanos) {
+ Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) {
return EmptyMetricData.getInstance();
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterProviderSharedState.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterProviderSharedState.java
index 1e42b854463..0cd4895f068 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterProviderSharedState.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterProviderSharedState.java
@@ -23,9 +23,9 @@
public abstract class MeterProviderSharedState {
public static MeterProviderSharedState create(
- Clock clock, Resource resource, ExemplarFilterInternal exemplarFilter, long startEpochNanos) {
+ Clock clock, Resource resource, ExemplarFilterInternal exemplarFilter) {
MeterProviderSharedState sharedState =
- new AutoValue_MeterProviderSharedState(clock, resource, startEpochNanos, exemplarFilter);
+ new AutoValue_MeterProviderSharedState(clock, resource, exemplarFilter);
return sharedState;
}
@@ -37,9 +37,6 @@ public static MeterProviderSharedState create(
/** Returns the {@link Resource} to attach telemetry to. */
public abstract Resource getResource();
- /** Returns the timestamp when the {@link SdkMeterProvider} was started, in epoch nanos. */
- public abstract long getStartEpochNanos();
-
/** Returns the {@link ExemplarFilterInternal} for remembering synchronous measurements. */
public abstract ExemplarFilterInternal getExemplarFilter();
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java
index 6bada056d8a..b4ff10922fc 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java
@@ -38,15 +38,11 @@ public interface MetricStorage {
*
* @param resource The resource associated with the metrics.
* @param instrumentationScopeInfo The instrumentation scope generating the metrics.
- * @param startEpochNanos The start timestamp for this SDK.
* @param epochNanos The timestamp for this collection.
* @return The {@link MetricData} from this collection period.
*/
MetricData collect(
- Resource resource,
- InstrumentationScopeInfo instrumentationScopeInfo,
- long startEpochNanos,
- long epochNanos);
+ Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos);
void setEnabled(boolean enabled);
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java
index b16f396d57a..db8ef8c5bb1 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java
@@ -70,19 +70,11 @@ public InstrumentationScopeInfo getInstrumentationScopeInfo() {
* Set the active reader, and clock information. {@link #unsetActiveReader()} MUST be called
* after.
*/
- public void setActiveReader(
- RegisteredReader registeredReader, long startEpochNanos, long epochNanos) {
+ public void setActiveReader(RegisteredReader registeredReader) {
this.activeReader = registeredReader;
- for (AsynchronousMetricStorage> storage : storages) {
- if (storage.getRegisteredReader().equals(activeReader)) {
- storage.setEpochInformation(startEpochNanos, epochNanos);
- }
- }
}
- /**
- * Unset the active reader. Called after {@link #setActiveReader(RegisteredReader, long, long)}.
- */
+ /** Unset the active reader. Called after {@link #setActiveReader(RegisteredReader)}. */
public void unsetActiveReader() {
this.activeReader = null;
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java
index dc23f89c4d4..38c9552cbe0 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java
@@ -5,6 +5,7 @@
package io.opentelemetry.sdk.metrics.internal.state;
+import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
@@ -38,6 +39,7 @@ static SynchronousMetricStorage empty() {
static SynchronousMetricStorage create(
RegisteredReader registeredReader,
RegisteredView registeredView,
+ Clock clock,
InstrumentDescriptor instrumentDescriptor,
ExemplarFilterInternal exemplarFilter,
boolean enabled) {
@@ -58,6 +60,7 @@ static SynchronousMetricStorage create(
aggregator,
registeredView.getViewAttributesProcessor(),
registeredView.getCardinalityLimit(),
+ clock,
enabled);
}
}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/InstrumentBuilderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/InstrumentBuilderTest.java
index 6c54636ee4f..14ce8e8609e 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/InstrumentBuilderTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/InstrumentBuilderTest.java
@@ -23,8 +23,7 @@ class InstrumentBuilderTest {
MeterProviderSharedState.create(
TestClock.create(),
Resource.getDefault(),
- asExemplarFilterInternal(ExemplarFilter.alwaysOff()),
- 0);
+ asExemplarFilterInternal(ExemplarFilter.alwaysOff()));
static final InstrumentationScopeInfo SCOPE = InstrumentationScopeInfo.create("scope-name");
public static final SdkMeter SDK_METER =
new SdkMeter(
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java
index 34882efc751..5f421a1c3b0 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java
@@ -69,6 +69,7 @@ void collectMetrics_WithEmptyAttributes() {
testClock.advance(Duration.ofNanos(SECOND_NANOS));
doubleCounter.add(12d, Attributes.empty());
doubleCounter.add(12d);
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric ->
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java
index 740775e766f..c39558ce978 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java
@@ -103,6 +103,7 @@ void collectMetrics_WithEmptyAttributes() {
testClock.advance(Duration.ofNanos(SECOND_NANOS));
doubleGauge.set(12d, Attributes.empty());
doubleGauge.set(13d);
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
assertThat(cumulativeReader.collectAllMetrics())
.satisfiesExactly(
metric ->
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java
index c06948acf88..409607050fb 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java
@@ -74,6 +74,7 @@ void collectMetrics_WithEmptyAttributes() {
testClock.advance(Duration.ofNanos(SECOND_NANOS));
doubleHistogram.record(12d, Attributes.empty());
doubleHistogram.record(12d);
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric ->
@@ -203,8 +204,10 @@ void collectMetrics_ExponentialHistogramAggregation() {
.build();
testClock.advance(Duration.ofNanos(SECOND_NANOS));
doubleHistogram.record(12d, Attributes.builder().put("key", "value").build());
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
doubleHistogram.record(12d);
doubleHistogram.record(13d);
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric ->
@@ -243,7 +246,7 @@ void collectMetrics_ExponentialHistogramAggregation() {
.hasCounts(Collections.emptyList())),
point ->
point
- .hasStartEpochNanos(testClock.now() - SECOND_NANOS)
+ .hasStartEpochNanos(testClock.now() - 2 * SECOND_NANOS)
.hasEpochNanos(testClock.now())
.hasAttributes(
Attributes.builder().put("key", "value").build())
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java
index a15dcf422cc..244d3d3c5e2 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java
@@ -68,6 +68,7 @@ void collectMetrics_WithEmptyAttributes() {
testClock.advance(Duration.ofNanos(SECOND_NANOS));
doubleUpDownCounter.add(12d, Attributes.empty());
doubleUpDownCounter.add(12d);
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric ->
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java
index 160a07b9e48..fbfe9fb1803 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java
@@ -62,6 +62,7 @@ void collectMetrics_WithEmptyAttributes() {
testClock.advance(Duration.ofNanos(SECOND_NANOS));
longCounter.add(12, Attributes.empty());
longCounter.add(12);
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric ->
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java
index 3879f2e8a76..98d464a019a 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java
@@ -92,6 +92,7 @@ void collectMetrics_WithEmptyAttributes() {
testClock.advance(Duration.ofNanos(SECOND_NANOS));
longGauge.set(12, Attributes.empty());
longGauge.set(13);
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
assertThat(cumulativeReader.collectAllMetrics())
.satisfiesExactly(
metric ->
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java
index 1f0321f55bf..fabff7b312d 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java
@@ -74,6 +74,7 @@ void collectMetrics_WithEmptyAttributes() {
testClock.advance(Duration.ofNanos(SECOND_NANOS));
longHistogram.record(12, Attributes.empty());
longHistogram.record(12);
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
assertThat(reader.collectAllMetrics())
.satisfiesExactly(
metric ->
@@ -376,8 +377,10 @@ void collectMetrics_ExponentialHistogramAggregation() {
.build();
testClock.advance(Duration.ofNanos(SECOND_NANOS));
longHistogram.record(12L, Attributes.builder().put("key", "value").build());
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
longHistogram.record(12L);
longHistogram.record(13L);
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
assertThat(reader.collectAllMetrics())
.satisfiesExactly(
metric ->
@@ -416,7 +419,7 @@ void collectMetrics_ExponentialHistogramAggregation() {
.hasCounts(Collections.emptyList())),
point ->
point
- .hasStartEpochNanos(testClock.now() - SECOND_NANOS)
+ .hasStartEpochNanos(testClock.now() - 2 * SECOND_NANOS)
.hasEpochNanos(testClock.now())
.hasAttributes(
Attributes.builder().put("key", "value").build())
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java
index 39eb6500207..63bb43a1468 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java
@@ -61,6 +61,7 @@ void collectMetrics_WithEmptyAttributes() {
testClock.advance(Duration.ofNanos(SECOND_NANOS));
longUpDownCounter.add(12, Attributes.empty());
longUpDownCounter.add(12);
+ testClock.advance(Duration.ofNanos(SECOND_NANOS));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric ->
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java
index 5a96360ceb9..778aa5fd375 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java
@@ -39,6 +39,14 @@ class AggregatorHandleTest {
@Mock DoubleExemplarReservoir doubleReservoir;
@Mock LongExemplarReservoir longReservoir;
+ @Test
+ void testCreationTimeEpochNanos() {
+ long creationEpochNanos = 12345L;
+ TestDoubleAggregatorHandle handle =
+ new TestDoubleAggregatorHandle(doubleReservoir, creationEpochNanos);
+ assertThat(handle.getCreationEpochNanos()).isEqualTo(creationEpochNanos);
+ }
+
@Test
void testRecordings() {
TestLongAggregatorHandle testLongAggregator = new TestLongAggregatorHandle(longReservoir);
@@ -102,14 +110,18 @@ void testGenerateExemplarsOnCollect() {
private static class TestDoubleAggregatorHandle extends TestAggregatorHandle {
TestDoubleAggregatorHandle(DoubleExemplarReservoir reservoir) {
- super(reservoir, null, /* isDoubleType= */ true);
+ super(reservoir, null, /* isDoubleType= */ true, /* creationEpochNanos= */ 0);
+ }
+
+ TestDoubleAggregatorHandle(DoubleExemplarReservoir reservoir, long creationEpochNanos) {
+ super(reservoir, null, /* isDoubleType= */ true, creationEpochNanos);
}
}
private static class TestLongAggregatorHandle extends TestAggregatorHandle {
TestLongAggregatorHandle(LongExemplarReservoir reservoir) {
- super(null, reservoir, /* isDoubleType= */ false);
+ super(null, reservoir, /* isDoubleType= */ false, /* creationEpochNanos= */ 0);
}
}
@@ -121,8 +133,10 @@ private abstract static class TestAggregatorHandle extends AggregatorHandle handle = aggregator.createHandle();
+ AggregatorHandle> handle = aggregator.createHandle(0);
assertThat(handle).isInstanceOf(DoubleBase2ExponentialHistogramAggregator.Handle.class);
ExponentialHistogramPointData point =
((DoubleBase2ExponentialHistogramAggregator.Handle) handle)
@@ -127,7 +127,7 @@ void createHandle(MemoryMode memoryMode) {
void testRecordings(MemoryMode memoryMode) {
initialize(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(0.5, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(1.0, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(12.0, Attributes.empty(), Context.current());
@@ -172,7 +172,7 @@ void testRecordings(MemoryMode memoryMode) {
void testInvalidRecording(MemoryMode memoryMode) {
initialize(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
// Non-finite recordings should be ignored
aggregatorHandle.recordDouble(Double.POSITIVE_INFINITY, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(Double.NEGATIVE_INFINITY, Attributes.empty(), Context.current());
@@ -189,7 +189,7 @@ void testInvalidRecording(MemoryMode memoryMode) {
@ParameterizedTest
@MethodSource("provideAggregator")
void testRecordingsAtLimits(DoubleBase2ExponentialHistogramAggregator aggregator) {
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(Double.MIN_VALUE, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(Double.MAX_VALUE, Attributes.empty(), Context.current());
@@ -251,7 +251,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) {
List exemplars = Collections.singletonList(exemplar);
Mockito.when(reservoir.collectAndResetDoubles(Attributes.empty())).thenReturn(exemplars);
- AggregatorHandle aggregatorHandle = agg.createHandle();
+ AggregatorHandle aggregatorHandle = agg.createHandle(0);
aggregatorHandle.recordDouble(0, attributes, Context.root());
assertThat(
@@ -267,7 +267,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) {
void aggregateThenMaybeReset(MemoryMode memoryMode) {
initialize(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(5.0, Attributes.empty(), Context.current());
assertThat(
@@ -284,7 +284,7 @@ void aggregateThenMaybeReset(MemoryMode memoryMode) {
void testInsert1M(MemoryMode memoryMode) {
initialize(memoryMode);
- AggregatorHandle handle = aggregator.createHandle();
+ AggregatorHandle handle = aggregator.createHandle(0);
int n = 1024 * 1024 - 1;
double min = 16.0 / n;
@@ -310,7 +310,7 @@ void testDownScale(MemoryMode memoryMode) {
initialize(memoryMode);
DoubleBase2ExponentialHistogramAggregator.Handle handle =
- (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle();
+ (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(0);
// record a measurement to initialize positive buckets
handle.recordDouble(0.5, Attributes.empty(), Context.current());
@@ -357,7 +357,7 @@ void testToMetricData(MemoryMode memoryMode) {
reservoirFactory, 160, MAX_SCALE, /* recordMinMax= */ true, memoryMode);
AggregatorHandle aggregatorHandle =
- cumulativeAggregator.createHandle();
+ cumulativeAggregator.createHandle(0);
aggregatorHandle.recordDouble(0, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(0, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(123.456, Attributes.empty(), Context.current());
@@ -422,7 +422,7 @@ void testToMetricData(MemoryMode memoryMode) {
void testMultithreadedUpdates(MemoryMode memoryMode) throws InterruptedException {
initialize(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
ImmutableList updates = ImmutableList.of(0D, 0.1D, -0.1D, 1D, -1D, 100D);
int numberOfThreads = updates.size();
int numberOfUpdates = 10000;
@@ -492,7 +492,7 @@ public void verifyMutableDataUsedInReusableDataMemoryMode() {
initialize(MemoryMode.REUSABLE_DATA);
DoubleBase2ExponentialHistogramAggregator.Handle handle =
- (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle();
+ (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(0);
// record a measurement to initialize positive buckets
handle.recordDouble(0.5, Attributes.empty(), Context.current());
@@ -525,7 +525,7 @@ public void verifyImmutableDataUsedInImmutableDataMemoryMode() {
initialize(MemoryMode.IMMUTABLE_DATA);
DoubleBase2ExponentialHistogramAggregator.Handle handle =
- (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle();
+ (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(0);
// record a measurement to initialize positive buckets
handle.recordDouble(0.5, Attributes.empty(), Context.current());
@@ -548,7 +548,7 @@ public void reusablePoint_emptyFirstThenRecordAndCheck() {
initialize(MemoryMode.REUSABLE_DATA);
DoubleBase2ExponentialHistogramAggregator.Handle handle =
- (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle();
+ (DoubleBase2ExponentialHistogramAggregator.Handle) aggregator.createHandle(0);
// Let's create a point without buckets
ExponentialHistogramPointData point =
@@ -581,7 +581,7 @@ void testRecordMinMaxDisabled(MemoryMode memoryMode) {
DoubleBase2ExponentialHistogramAggregator aggregator =
new DoubleBase2ExponentialHistogramAggregator(
ExemplarReservoirFactory.noSamples(), 160, 20, /* recordMinMax= */ false, memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(0.5, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(1.0, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(12.0, Attributes.empty(), Context.current());
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java
index 66fd7dd43b6..0d12083f306 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java
@@ -77,7 +77,7 @@ private void init(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void createHandle(MemoryMode memoryMode) {
init(memoryMode);
- assertThat(aggregator.createHandle())
+ assertThat(aggregator.createHandle(0))
.isInstanceOf(DoubleExplicitBucketHistogramAggregator.Handle.class);
}
@@ -85,7 +85,7 @@ void createHandle(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void testRecordings(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(20, Attributes.empty(), Context.current());
aggregatorHandle.recordLong(5, Attributes.empty(), Context.current());
aggregatorHandle.recordLong(150, Attributes.empty(), Context.current());
@@ -125,7 +125,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) {
DoubleExplicitBucketHistogramAggregator aggregator =
new DoubleExplicitBucketHistogramAggregator(
boundaries, /* recordMinMax= */ true, reservoirFactory, memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(0, attributes, Context.root());
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
@@ -148,7 +148,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void aggregateThenMaybeReset(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(100, Attributes.empty(), Context.current());
assertThat(
@@ -187,7 +187,7 @@ void aggregateThenMaybeReset(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void toMetricData(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(10, Attributes.empty(), Context.current());
MetricData metricData =
@@ -257,7 +257,7 @@ void toMetricDataWithExemplars(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void testHistogramCounts(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(1.1, Attributes.empty(), Context.current());
HistogramPointData point =
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true);
@@ -268,7 +268,7 @@ void testHistogramCounts(MemoryMode memoryMode) {
@Test
void testReusableDataMemoryMode() {
init(MemoryMode.REUSABLE_DATA);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(10, Attributes.empty(), Context.current());
aggregatorHandle.recordLong(20, Attributes.empty(), Context.current());
aggregatorHandle.recordLong(30, Attributes.empty(), Context.current());
@@ -297,7 +297,7 @@ void testRecordMinMaxDisabled(MemoryMode memoryMode) {
/* recordMinMax= */ false,
ExemplarReservoirFactory.noSamples(),
memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(20, Attributes.empty(), Context.current());
aggregatorHandle.recordLong(5, Attributes.empty(), Context.current());
aggregatorHandle.recordLong(150, Attributes.empty(), Context.current());
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java
index ee8f3645a93..0828b93de25 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java
@@ -49,7 +49,7 @@ private void init(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void createHandle(MemoryMode memoryMode) {
init(memoryMode);
- assertThat(aggregator.createHandle()).isInstanceOf(DoubleLastValueAggregator.Handle.class);
+ assertThat(aggregator.createHandle(0)).isInstanceOf(DoubleLastValueAggregator.Handle.class);
}
@ParameterizedTest
@@ -57,7 +57,7 @@ void createHandle(MemoryMode memoryMode) {
void multipleRecords(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(12.1, Attributes.empty(), Context.current());
assertThat(
aggregatorHandle
@@ -78,7 +78,7 @@ void multipleRecords(MemoryMode memoryMode) {
void aggregateThenMaybeReset(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(13.1, Attributes.empty(), Context.current());
assertThat(
@@ -228,7 +228,7 @@ void copyPoint(MemoryMode memoryMode) {
void toMetricData(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(10, Attributes.empty(), Context.current());
MetricData metricData =
@@ -258,7 +258,7 @@ void toMetricData(MemoryMode memoryMode) {
@Test
void testReusableDataOnCollect() {
init(MemoryMode.REUSABLE_DATA);
- AggregatorHandle handle = aggregator.createHandle();
+ AggregatorHandle handle = aggregator.createHandle(0);
handle.recordDouble(1, Attributes.empty(), Context.current());
DoublePointData pointData =
handle.aggregateThenMaybeReset(0, 10, Attributes.empty(), /* reset= */ false);
@@ -279,7 +279,7 @@ void testReusableDataOnCollect() {
@Test
void testNullPointerExceptionWhenUnset() {
init(MemoryMode.REUSABLE_DATA);
- AggregatorHandle handle = aggregator.createHandle();
+ AggregatorHandle handle = aggregator.createHandle(0);
assertThatThrownBy(
() -> handle.aggregateThenMaybeReset(0, 10, Attributes.empty(), /* reset= */ true))
.isInstanceOf(NullPointerException.class);
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java
index 4afd1672bc5..1e4353b21fc 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java
@@ -84,14 +84,14 @@ private void init(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void createHandle(MemoryMode memoryMode) {
init(memoryMode);
- assertThat(aggregator.createHandle()).isInstanceOf(DoubleSumAggregator.Handle.class);
+ assertThat(aggregator.createHandle(0)).isInstanceOf(DoubleSumAggregator.Handle.class);
}
@ParameterizedTest
@EnumSource(MemoryMode.class)
void multipleRecords(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(12.1, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(12.1, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(12.1, Attributes.empty(), Context.current());
@@ -108,7 +108,7 @@ void multipleRecords(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void multipleRecords_WithNegatives(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(12, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(12, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(-23, Attributes.empty(), Context.current());
@@ -126,7 +126,7 @@ void multipleRecords_WithNegatives(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void aggregateThenMaybeReset(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(13, Attributes.empty(), Context.current());
aggregatorHandle.recordDouble(12, Attributes.empty(), Context.current());
@@ -172,7 +172,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) {
Advice.empty()),
reservoirFactory,
memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(0, attributes, Context.root());
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
@@ -315,7 +315,7 @@ void copyPoint(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void toMetricData(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(10, Attributes.empty(), Context.current());
MetricData metricData =
@@ -375,7 +375,7 @@ void toMetricDataWithExemplars(MemoryMode memoryMode) {
@Test
void sameObjectReturnedOnReusableDataMemoryMode() {
init(MemoryMode.REUSABLE_DATA);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordDouble(1.0, Attributes.empty(), Context.current());
DoublePointData firstCollection =
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java
index 1cf98c58aca..dc24e8011be 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java
@@ -50,14 +50,14 @@ private void init(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void createHandle(MemoryMode memoryMode) {
init(memoryMode);
- assertThat(aggregator.createHandle()).isInstanceOf(LongLastValueAggregator.Handle.class);
+ assertThat(aggregator.createHandle(0)).isInstanceOf(LongLastValueAggregator.Handle.class);
}
@ParameterizedTest
@EnumSource(MemoryMode.class)
void multipleRecords(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(12, Attributes.empty(), Context.current());
assertThat(
aggregatorHandle
@@ -77,7 +77,7 @@ void multipleRecords(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void aggregateThenMaybeReset(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(13, Attributes.empty(), Context.current());
assertThat(
@@ -190,7 +190,7 @@ void copyPoint(MemoryMode memoryMode) {
void toMetricData(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(10, Attributes.empty(), Context.current());
MetricData metricData =
@@ -218,7 +218,7 @@ void toMetricData(MemoryMode memoryMode) {
@Test
void testReusablePointOnCollect() {
init(MemoryMode.REUSABLE_DATA);
- AggregatorHandle handle = aggregator.createHandle();
+ AggregatorHandle handle = aggregator.createHandle(0);
handle.recordLong(1, Attributes.empty(), Context.current());
LongPointData pointData =
handle.aggregateThenMaybeReset(0, 10, Attributes.empty(), /* reset= */ false);
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java
index d2742050ea0..b984731190c 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java
@@ -83,14 +83,14 @@ private void init(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void createHandle(MemoryMode memoryMode) {
init(memoryMode);
- assertThat(aggregator.createHandle()).isInstanceOf(LongSumAggregator.Handle.class);
+ assertThat(aggregator.createHandle(0)).isInstanceOf(LongSumAggregator.Handle.class);
}
@ParameterizedTest
@EnumSource(MemoryMode.class)
void multipleRecords(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(12, Attributes.empty(), Context.current());
aggregatorHandle.recordLong(12, Attributes.empty(), Context.current());
aggregatorHandle.recordLong(12, Attributes.empty(), Context.current());
@@ -107,7 +107,7 @@ void multipleRecords(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void multipleRecords_WithNegatives(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(12, Attributes.empty(), Context.current());
aggregatorHandle.recordLong(12, Attributes.empty(), Context.current());
aggregatorHandle.recordLong(-23, Attributes.empty(), Context.current());
@@ -125,7 +125,7 @@ void multipleRecords_WithNegatives(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void aggregateThenMaybeReset(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(13, Attributes.empty(), Context.current());
aggregatorHandle.recordLong(12, Attributes.empty(), Context.current());
@@ -171,7 +171,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) {
Advice.empty()),
reservoirFactory,
memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(0, attributes, Context.root());
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
@@ -313,7 +313,7 @@ void copyPoint(MemoryMode memoryMode) {
@EnumSource(MemoryMode.class)
void toMetricData(MemoryMode memoryMode) {
init(memoryMode);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(10, Attributes.empty(), Context.current());
MetricData metricData =
@@ -374,7 +374,7 @@ void toMetricDataWithExemplars(MemoryMode memoryMode) {
@Test
void sameObjectReturnedOnReusableDataMemoryMode() {
init(MemoryMode.REUSABLE_DATA);
- AggregatorHandle aggregatorHandle = aggregator.createHandle();
+ AggregatorHandle aggregatorHandle = aggregator.createHandle(0);
aggregatorHandle.recordLong(1L, Attributes.empty(), Context.current());
LongPointData firstCollection =
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReaderTest.java
index e151876e6a9..a0a4cf7125b 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReaderTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/export/RegisteredReaderTest.java
@@ -38,13 +38,13 @@ void getReader() {
}
@Test
- void setAndGetLastCollectEpochNanos() {
+ void getLastCollectEpochNanosOrDefault() {
RegisteredReader registeredReader = RegisteredReader.create(reader, ViewRegistry.create());
- assertThat(registeredReader.getLastCollectEpochNanos()).isEqualTo(0);
+ assertThat(registeredReader.getLastCollectEpochNanosOrDefault(4)).isEqualTo(4);
registeredReader.setLastCollectEpochNanos(1);
- assertThat(registeredReader.getLastCollectEpochNanos()).isEqualTo(1);
+ assertThat(registeredReader.getLastCollectEpochNanosOrDefault(4)).isEqualTo(1);
registeredReader.setLastCollectEpochNanos(5);
- assertThat(registeredReader.getLastCollectEpochNanos()).isEqualTo(5);
+ assertThat(registeredReader.getLastCollectEpochNanosOrDefault(5)).isEqualTo(5);
}
}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java
index 860307d4777..e50539d52bf 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java
@@ -34,6 +34,8 @@
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.time.TestClock;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Collection;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -47,12 +49,14 @@
@ExtendWith(MockitoExtension.class)
class AsynchronousMetricStorageTest {
+ private static final long START_SECOND_NANOS = 1_000_000_000L;
+
private static final int CARDINALITY_LIMIT = 25;
@RegisterExtension
LogCapturer logs = LogCapturer.create().captureForType(AsynchronousMetricStorage.class);
- private final TestClock testClock = TestClock.create();
+ private final TestClock testClock = TestClock.create(Instant.ofEpochSecond(1));
private final Resource resource = Resource.empty();
private final InstrumentationScopeInfo scope = InstrumentationScopeInfo.empty();
private final InstrumentSelector selector = InstrumentSelector.builder().setName("*").build();
@@ -72,7 +76,11 @@ class AsynchronousMetricStorageTest {
// Not using @BeforeEach since many methods require executing them for each MemoryMode
void setup(MemoryMode memoryMode) {
- when(reader.getAggregationTemporality(any())).thenReturn(AggregationTemporality.CUMULATIVE);
+ setup(memoryMode, AggregationTemporality.CUMULATIVE);
+ }
+
+ void setup(MemoryMode memoryMode, AggregationTemporality temporality) {
+ when(reader.getAggregationTemporality(any())).thenReturn(temporality);
when(reader.getMemoryMode()).thenReturn(memoryMode);
registeredReader = RegisteredReader.create(reader, ViewRegistry.create());
@@ -80,6 +88,7 @@ void setup(MemoryMode memoryMode) {
AsynchronousMetricStorage.create(
registeredReader,
registeredView,
+ testClock,
InstrumentDescriptor.create(
"long-counter",
"description",
@@ -92,6 +101,7 @@ void setup(MemoryMode memoryMode) {
AsynchronousMetricStorage.create(
registeredReader,
registeredView,
+ testClock,
InstrumentDescriptor.create(
"double-counter",
"description",
@@ -107,12 +117,11 @@ void setup(MemoryMode memoryMode) {
void recordLong(MemoryMode memoryMode) {
setup(memoryMode);
- longCounterStorage.setEpochInformation(0, 1);
longCounterStorage.record(Attributes.builder().put("key", "a").build(), 1);
longCounterStorage.record(Attributes.builder().put("key", "b").build(), 2);
longCounterStorage.record(Attributes.builder().put("key", "c").build(), 3);
- assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()))
+ assertThat(longCounterStorage.collect(resource, scope, testClock.now()))
.satisfies(
metricData ->
assertThat(metricData)
@@ -133,12 +142,11 @@ void recordLong(MemoryMode memoryMode) {
void recordDouble(MemoryMode memoryMode) {
setup(memoryMode);
- longCounterStorage.setEpochInformation(0, 1);
doubleCounterStorage.record(Attributes.builder().put("key", "a").build(), 1.1);
doubleCounterStorage.record(Attributes.builder().put("key", "b").build(), 2.2);
doubleCounterStorage.record(Attributes.builder().put("key", "c").build(), 3.3);
- assertThat(doubleCounterStorage.collect(resource, scope, 0, testClock.nanoTime()))
+ assertThat(doubleCounterStorage.collect(resource, scope, testClock.now()))
.satisfies(
metricData ->
assertThat(metricData)
@@ -170,6 +178,7 @@ void record_ProcessesAttributes(MemoryMode memoryMode) {
AttributesProcessor.filterByKeyName(key -> key.equals("key1")),
CARDINALITY_LIMIT,
SourceInfo.noSourceInfo()),
+ testClock,
InstrumentDescriptor.create(
"name",
"description",
@@ -179,10 +188,9 @@ void record_ProcessesAttributes(MemoryMode memoryMode) {
Advice.empty()),
/* enabled= */ true);
- storage.setEpochInformation(0, 1);
storage.record(Attributes.builder().put("key1", "a").put("key2", "b").build(), 1);
- assertThat(storage.collect(resource, scope, 0, testClock.nanoTime()))
+ assertThat(storage.collect(resource, scope, testClock.now()))
.satisfies(
metricData ->
assertThat(metricData)
@@ -199,12 +207,11 @@ void record_ProcessesAttributes(MemoryMode memoryMode) {
void record_MaxCardinality(MemoryMode memoryMode) {
setup(memoryMode);
- longCounterStorage.setEpochInformation(0, 1);
for (int i = 0; i <= CARDINALITY_LIMIT + 1; i++) {
longCounterStorage.record(Attributes.builder().put("key" + i, "val").build(), 1);
}
- assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()))
+ assertThat(longCounterStorage.collect(resource, scope, testClock.now()))
.satisfies(
metricData ->
assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT));
@@ -213,28 +220,26 @@ void record_MaxCardinality(MemoryMode memoryMode) {
@ParameterizedTest
@EnumSource(MemoryMode.class)
- void record_HandlesSpotsAreReleasedAfterCollection(MemoryMode memoryMode) {
- setup(memoryMode);
+ void record_DeltaHandlesSpotsAreReleasedAfterCollection(MemoryMode memoryMode) {
+ setup(memoryMode, AggregationTemporality.DELTA);
- longCounterStorage.setEpochInformation(0, 1);
for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) {
longCounterStorage.record(Attributes.builder().put("key" + i, "val").build(), 1);
}
- assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()))
+ assertThat(longCounterStorage.collect(resource, scope, testClock.now()))
.satisfies(
metricData ->
assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT - 1));
logs.assertDoesNotContain(
"Instrument long-counter has exceeded the maximum allowed cardinality");
- longCounterStorage.setEpochInformation(1, 2);
for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) {
// Different attribute
longCounterStorage.record(Attributes.builder().put("key" + i, "val2").build(), 1);
}
- assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()))
+ assertThat(longCounterStorage.collect(resource, scope, testClock.now()))
.satisfies(
metricData ->
assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT - 1));
@@ -247,11 +252,10 @@ void record_HandlesSpotsAreReleasedAfterCollection(MemoryMode memoryMode) {
void record_DuplicateAttributes(MemoryMode memoryMode) {
setup(memoryMode);
- longCounterStorage.setEpochInformation(0, 1);
longCounterStorage.record(Attributes.builder().put("key1", "a").build(), 1);
longCounterStorage.record(Attributes.builder().put("key1", "a").build(), 2);
- assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()))
+ assertThat(longCounterStorage.collect(resource, scope, testClock.now()))
.satisfies(
metricData ->
assertThat(metricData)
@@ -267,64 +271,66 @@ void record_DuplicateAttributes(MemoryMode memoryMode) {
void collect_CumulativeReportsCumulativeObservations(MemoryMode memoryMode) {
setup(memoryMode);
- // Record measurement and collect at time 10
- longCounterStorage.setEpochInformation(0, 10);
+ // Record measurement and collect at time START_SECONDS + 10s
+ testClock.advance(Duration.ofSeconds(10));
longCounterStorage.record(Attributes.empty(), 3);
- assertThat(longCounterStorage.collect(resource, scope, 0, 0))
+ assertThat(longCounterStorage.collect(resource, scope, testClock.now()))
.hasLongSumSatisfying(
sum ->
sum.isCumulative()
.hasPointsSatisfying(
point ->
point
- .hasStartEpochNanos(0)
- .hasEpochNanos(10)
+ .hasStartEpochNanos(START_SECOND_NANOS)
+ .hasEpochNanos(testClock.now())
.hasValue(3)
.hasAttributes(Attributes.empty())));
- registeredReader.setLastCollectEpochNanos(10);
+ registeredReader.setLastCollectEpochNanos(testClock.now());
- // Record measurements and collect at time 30
- longCounterStorage.setEpochInformation(0, 30);
+ // Record measurements and collect at time START_SECONDS + 30s
+ testClock.advance(Duration.ofSeconds(20));
longCounterStorage.record(Attributes.empty(), 3);
longCounterStorage.record(Attributes.builder().put("key", "value1").build(), 6);
- assertThat(longCounterStorage.collect(resource, scope, 0, 0))
+ assertThat(longCounterStorage.collect(resource, scope, testClock.now()))
.hasLongSumSatisfying(
sum ->
sum.isCumulative()
.hasPointsSatisfying(
point ->
point
- .hasStartEpochNanos(0)
- .hasEpochNanos(30)
+ .hasStartEpochNanos(START_SECOND_NANOS)
+ .hasEpochNanos(testClock.now())
.hasValue(3)
.hasAttributes(Attributes.empty()),
point ->
point
- .hasStartEpochNanos(0)
- .hasEpochNanos(30)
+ .hasStartEpochNanos(
+ testClock.now() - Duration.ofSeconds(20).toNanos())
+ .hasEpochNanos(testClock.now())
.hasValue(6)
.hasAttributes(Attributes.builder().put("key", "value1").build())));
- registeredReader.setLastCollectEpochNanos(30);
+ registeredReader.setLastCollectEpochNanos(testClock.now());
- // Record measurement and collect at time 35
- longCounterStorage.setEpochInformation(0, 35);
+ // Record measurement and collect at time START_SECONDS + 35s
+ testClock.advance(Duration.ofSeconds(5));
longCounterStorage.record(Attributes.empty(), 4);
longCounterStorage.record(Attributes.builder().put("key", "value2").build(), 5);
- assertThat(longCounterStorage.collect(resource, scope, 0, 0))
+ assertThat(longCounterStorage.collect(resource, scope, testClock.now()))
.hasLongSumSatisfying(
sum ->
sum.isCumulative()
.hasPointsSatisfying(
point ->
point
- .hasStartEpochNanos(0)
- .hasEpochNanos(35)
+ .hasStartEpochNanos(START_SECOND_NANOS)
+ .hasEpochNanos(testClock.now())
.hasValue(4)
.hasAttributes(Attributes.empty()),
point ->
point
- .hasStartEpochNanos(0)
- .hasEpochNanos(35)
+ .hasStartEpochNanos(
+ testClock.now() - Duration.ofSeconds(5).toNanos())
+ .hasEpochNanos(testClock.now())
.hasValue(5)
.hasAttributes(Attributes.builder().put("key", "value2").build())));
}
@@ -339,6 +345,7 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) {
AsynchronousMetricStorage.create(
registeredReader,
registeredView,
+ testClock,
InstrumentDescriptor.create(
"long-counter",
"description",
@@ -349,26 +356,24 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) {
/* enabled= */ true);
// Record measurement and collect at time 10
- longCounterStorage.setEpochInformation(0, 10);
longCounterStorage.record(Attributes.empty(), 3);
- assertThat(longCounterStorage.collect(resource, scope, 0, 0))
+ assertThat(longCounterStorage.collect(resource, scope, 10))
.hasLongSumSatisfying(
sum ->
sum.isDelta()
.hasPointsSatisfying(
point ->
point
- .hasStartEpochNanos(0)
+ .hasStartEpochNanos(testClock.now())
.hasEpochNanos(10)
.hasValue(3)
.hasAttributes(Attributes.empty())));
registeredReader.setLastCollectEpochNanos(10);
// Record measurement and collect at time 30
- longCounterStorage.setEpochInformation(0, 30);
longCounterStorage.record(Attributes.empty(), 3);
longCounterStorage.record(Attributes.builder().put("key", "value1").build(), 6);
- assertThat(longCounterStorage.collect(resource, scope, 0, 0))
+ assertThat(longCounterStorage.collect(resource, scope, 30))
.hasLongSumSatisfying(
sum ->
sum.isDelta()
@@ -388,10 +393,9 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) {
registeredReader.setLastCollectEpochNanos(30);
// Record measurement and collect at time 35
- longCounterStorage.setEpochInformation(0, 35);
longCounterStorage.record(Attributes.empty(), 4);
longCounterStorage.record(Attributes.builder().put("key", "value2").build(), 5);
- assertThat(longCounterStorage.collect(resource, scope, 0, 0))
+ assertThat(longCounterStorage.collect(resource, scope, 35))
.hasLongSumSatisfying(
sum ->
sum.isDelta()
@@ -414,13 +418,12 @@ void collect_DeltaComputesDiff(MemoryMode memoryMode) {
void collect_reusableData_reusedObjectsAreReturnedOnSecondCall() {
setup(REUSABLE_DATA);
- longCounterStorage.setEpochInformation(0, 1);
longCounterStorage.record(Attributes.builder().put("key", "a").build(), 1);
longCounterStorage.record(Attributes.builder().put("key", "b").build(), 2);
longCounterStorage.record(Attributes.builder().put("key", "c").build(), 3);
MetricData firstCollectMetricData =
- longCounterStorage.collect(resource, scope, 0, testClock.nanoTime());
+ longCounterStorage.collect(resource, scope, testClock.now());
assertThat(firstCollectMetricData)
.satisfies(
metricData ->
@@ -445,7 +448,7 @@ void collect_reusableData_reusedObjectsAreReturnedOnSecondCall() {
.isInstanceOf(MutableLongPointData.class))));
MetricData secondCollectMetricData =
- longCounterStorage.collect(resource, scope, 0, testClock.nanoTime());
+ longCounterStorage.collect(resource, scope, testClock.now());
Collection<? extends PointData> secondCollectPoints =
secondCollectMetricData.getData().getPoints();
@@ -470,7 +473,7 @@ void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) {
longCounterStorage.record(Attributes.empty(), 10);
- assertThat(longCounterStorage.collect(resource, scope, 0, 0).isEmpty()).isTrue();
+ assertThat(longCounterStorage.collect(resource, scope, 0).isEmpty()).isTrue();
}
@ParameterizedTest
@@ -483,6 +486,6 @@ void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) {
longCounterStorage.record(Attributes.empty(), 10);
- assertThat(longCounterStorage.collect(resource, scope, 0, 0).isEmpty()).isFalse();
+ assertThat(longCounterStorage.collect(resource, scope, 0).isEmpty()).isFalse();
}
}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java
index c54466a0603..47f45028e6e 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java
@@ -144,14 +144,11 @@ void invokeCallback_Double() {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Collections.singletonList(measurement1), callback);
- callbackRegistration.invokeCallback(registeredReader, 0, 1);
+ callbackRegistration.invokeCallback(registeredReader);
assertThat(counter.get()).isEqualTo(1.1);
- verify(storage1).setEpochInformation(0, 1);
verify(storage1).record(Attributes.builder().put("key", "val").build(), 1.1);
- verify(storage2, never()).setEpochInformation(anyLong(), anyLong());
verify(storage2, never()).record(any(), anyDouble());
- verify(storage3, never()).setEpochInformation(anyLong(), anyLong());
verify(storage3, never()).record(any(), anyDouble());
}
@@ -165,14 +162,11 @@ void invokeCallback_Long() {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Collections.singletonList(measurement2), callback);
- callbackRegistration.invokeCallback(registeredReader, 0, 1);
+ callbackRegistration.invokeCallback(registeredReader);
assertThat(counter.get()).isEqualTo(1);
- verify(storage1, never()).setEpochInformation(anyLong(), anyLong());
verify(storage1, never()).record(any(), anyLong());
- verify(storage2).setEpochInformation(0, 1);
verify(storage2).record(Attributes.builder().put("key", "val").build(), 1);
- verify(storage3).setEpochInformation(0, 1);
verify(storage3).record(Attributes.builder().put("key", "val").build(), 1);
}
@@ -190,15 +184,12 @@ void invokeCallback_MultipleMeasurements() {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Arrays.asList(measurement1, measurement2), callback);
- callbackRegistration.invokeCallback(registeredReader, 0, 1);
+ callbackRegistration.invokeCallback(registeredReader);
assertThat(doubleCounter.get()).isEqualTo(1.1);
assertThat(longCounter.get()).isEqualTo(1);
- verify(storage1).setEpochInformation(0, 1);
verify(storage1).record(Attributes.builder().put("key", "val").build(), 1.1);
- verify(storage2).setEpochInformation(0, 1);
verify(storage2).record(Attributes.builder().put("key", "val").build(), 1);
- verify(storage3).setEpochInformation(0, 1);
verify(storage3).record(Attributes.builder().put("key", "val").build(), 1);
}
@@ -215,7 +206,7 @@ void invokeCallback_NoStorage() {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Collections.singletonList(measurement), callback);
- callbackRegistration.invokeCallback(registeredReader, 0, 1);
+ callbackRegistration.invokeCallback(registeredReader);
assertThat(counter.get()).isEqualTo(0);
}
@@ -236,7 +227,7 @@ void invokeCallback_RestoresContextClassLoader() {
// Simulate invocation on a thread with null context class loader (like DaemonThreadFactory)
Thread.currentThread().setContextClassLoader(null);
- callbackRegistration.invokeCallback(registeredReader, 0, 1);
+ callbackRegistration.invokeCallback(registeredReader);
// Callback should have seen the registration-time classloader
assertThat(observedClassLoader.get()).isSameAs(registrationClassLoader);
@@ -264,7 +255,7 @@ void invokeCallback_RestoresContextClassLoaderOnException() {
// Simulate invocation on a thread with null context class loader
Thread.currentThread().setContextClassLoader(null);
- callbackRegistration.invokeCallback(registeredReader, 0, 1);
+ callbackRegistration.invokeCallback(registeredReader);
// Context classloader should still be restored even after exception
assertThat(Thread.currentThread().getContextClassLoader()).isNull();
@@ -282,7 +273,7 @@ void invokeCallback_MultipleMeasurements_ThrowsException() {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Arrays.asList(measurement1, measurement2), callback);
- callbackRegistration.invokeCallback(registeredReader, 0, 1);
+ callbackRegistration.invokeCallback(registeredReader);
verify(storage1, never()).record(any(), anyLong());
verify(storage1, never()).record(any(), anyDouble());
@@ -302,7 +293,7 @@ void invokeCallback_SingleMeasurement_ThrowsException() {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Collections.singletonList(measurement2), callback);
- callbackRegistration.invokeCallback(registeredReader, 0, 1);
+ callbackRegistration.invokeCallback(registeredReader);
verify(storage1, never()).record(any(), anyLong());
verify(storage1, never()).record(any(), anyDouble());
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java
index 9f91d13fe24..f287b76b05e 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java
@@ -102,10 +102,7 @@ public MetricDescriptor getMetricDescriptor() {
@Override
public MetricData collect(
- Resource resource,
- InstrumentationScopeInfo instrumentationScopeInfo,
- long startEpochNanos,
- long epochNanos) {
+ Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) {
return null;
}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java
index b21797d9756..8b859f55479 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java
@@ -11,7 +11,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.github.netmikey.logunit.api.LogCapturer;
@@ -79,18 +78,7 @@ private void setup(MemoryMode memoryMode) {
void setupAndSetActiveReader(MemoryMode memoryMode) {
setup(memoryMode);
- sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10);
- }
-
- @Test
- void setActiveReader_SetsEpochInformation() {
- setup(MemoryMode.IMMUTABLE_DATA);
-
- sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10);
-
- verify(mockAsyncStorage1).setEpochInformation(0, 10);
- verify(mockAsyncStorage2).getRegisteredReader();
- verifyNoMoreInteractions(mockAsyncStorage2);
+ sdkObservableMeasurement.setActiveReader(registeredReader1);
}
@Test
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java
index 354c1d899e5..ec2ad73652f 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java
@@ -11,6 +11,7 @@
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
import static org.assertj.core.api.BDDAssertions.as;
import static org.assertj.core.api.InstanceOfAssertFactories.collection;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -44,6 +45,7 @@
import io.opentelemetry.sdk.testing.assertj.DoubleSumAssert;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.sdk.testing.time.TestClock;
+import java.time.Duration;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -110,14 +112,15 @@ void recordDouble_NaN(MemoryMode memoryMode) {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
storage.recordDouble(Double.NaN, Attributes.empty(), Context.current());
logs.assertContains(
"Instrument name has recorded measurement Not-a-Number (NaN) value with attributes {}. Dropping measurement.");
- verify(aggregator, never()).createHandle();
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
+ verify(aggregator, never()).createHandle(anyLong());
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10))
.isEqualTo(EmptyMetricData.getInstance());
}
@@ -137,9 +140,10 @@ void attributesProcessor_applied(MemoryMode memoryMode) {
aggregator,
spyAttributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
storage.recordDouble(1, attributes, Context.root());
- MetricData md = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now());
+ MetricData md = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, testClock.now());
assertThat(md)
.hasDoubleSumSatisfying(
sum ->
@@ -161,39 +165,109 @@ void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
// Record measurement and collect at time 10
storage.recordDouble(3, Attributes.empty(), Context.current());
- verify(aggregator, times(1)).createHandle();
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
+ verify(aggregator, times(1)).createHandle(testClock.now());
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10))
.hasDoubleSumSatisfying(
sum ->
sum.isCumulative()
.hasPointsSatisfying(
- point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3)));
+ point ->
+ point
+ .hasStartEpochNanos(testClock.now())
+ .hasEpochNanos(10)
+ .hasValue(3)));
cumulativeReader.setLastCollectEpochNanos(10);
// Record measurement and collect at time 30
storage.recordDouble(3, Attributes.empty(), Context.current());
- verify(aggregator, times(1)).createHandle();
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
+ verify(aggregator, times(1)).createHandle(testClock.now());
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30))
.hasDoubleSumSatisfying(
sum ->
sum.isCumulative()
.hasPointsSatisfying(
- point -> point.hasStartEpochNanos(0).hasEpochNanos(30).hasValue(6)));
+ point ->
+ point
+ .hasStartEpochNanos(testClock.now())
+ .hasEpochNanos(30)
+ .hasValue(6)));
cumulativeReader.setLastCollectEpochNanos(30);
// Record measurement and collect at time 35
storage.recordDouble(2, Attributes.empty(), Context.current());
- verify(aggregator, times(1)).createHandle();
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35))
+ verify(aggregator, times(1)).createHandle(testClock.now());
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 35))
+ .hasDoubleSumSatisfying(
+ sum ->
+ sum.isCumulative()
+ .hasPointsSatisfying(
+ point ->
+ point
+ .hasStartEpochNanos(testClock.now())
+ .hasEpochNanos(35)
+ .hasValue(8)));
+ }
+
+ @ParameterizedTest
+ @EnumSource(MemoryMode.class)
+ void recordAndCollect_CumulativeNewSeriesAfterFirstCollection(MemoryMode memoryMode) {
+ initialize(memoryMode);
+
+ DefaultSynchronousMetricStorage<?, ?> storage =
+ DefaultSynchronousMetricStorage.create(
+ cumulativeReader,
+ METRIC_DESCRIPTOR,
+ aggregator,
+ attributesProcessor,
+ CARDINALITY_LIMIT,
+ testClock,
+ /* enabled= */ true);
+
+ // Record for series A and collect at time 10
+ storage.recordDouble(3, Attributes.builder().put("series", "A").build(), Context.current());
+ long seriesACreationTime = testClock.now();
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10))
.hasDoubleSumSatisfying(
sum ->
sum.isCumulative()
.hasPointsSatisfying(
- point -> point.hasStartEpochNanos(0).hasEpochNanos(35).hasValue(8)));
+ point ->
+ point
+ .hasStartEpochNanos(seriesACreationTime)
+ .hasEpochNanos(10)
+ .hasValue(3)
+ .hasAttributes(Attributes.builder().put("series", "A").build())));
+ cumulativeReader.setLastCollectEpochNanos(10);
+
+ // Advance clock and record for both series A and a new series B
+ testClock.advance(Duration.ofSeconds(20));
+ storage.recordDouble(5, Attributes.builder().put("series", "A").build(), Context.current());
+ storage.recordDouble(7, Attributes.builder().put("series", "B").build(), Context.current());
+ long seriesBCreationTime = testClock.now();
+ // Series B's start time must be clock.now() at its first measurement, NOT instrument
+ // creation time or last collection time.
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30))
+ .hasDoubleSumSatisfying(
+ sum ->
+ sum.isCumulative()
+ .hasPointsSatisfying(
+ point ->
+ point
+ .hasStartEpochNanos(seriesACreationTime)
+ .hasEpochNanos(30)
+ .hasValue(8)
+ .hasAttributes(Attributes.builder().put("series", "A").build()),
+ point ->
+ point
+ .hasStartEpochNanos(seriesBCreationTime)
+ .hasEpochNanos(30)
+ .hasValue(7)
+ .hasAttributes(Attributes.builder().put("series", "B").build())));
}
@Test
@@ -207,20 +281,25 @@ void recordAndCollect_DeltaResets_ImmutableData() {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
// Record measurement and collect at time 10
storage.recordDouble(3, Attributes.empty(), Context.current());
- verify(aggregator, times(1)).createHandle();
+ verify(aggregator, times(1)).createHandle(testClock.now());
assertThat(storage)
.extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
.hasSize(0);
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
.hasPointsSatisfying(
- point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3)));
+ point ->
+ point
+ .hasStartEpochNanos(testClock.now())
+ .hasEpochNanos(10)
+ .hasValue(3)));
assertThat(storage)
.extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
.hasSize(1);
@@ -230,11 +309,11 @@ void recordAndCollect_DeltaResets_ImmutableData() {
storage.recordDouble(3, Attributes.empty(), Context.current());
// AggregatorHandle should be returned to the pool on reset so shouldn't create additional
// handles
- verify(aggregator, times(1)).createHandle();
+ verify(aggregator, times(1)).createHandle(testClock.now());
assertThat(storage)
.extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
.hasSize(0);
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
@@ -247,11 +326,11 @@ void recordAndCollect_DeltaResets_ImmutableData() {
// Record measurement and collect at time 35
storage.recordDouble(2, Attributes.empty(), Context.current());
- verify(aggregator, times(1)).createHandle();
+ verify(aggregator, times(1)).createHandle(testClock.now());
assertThat(storage)
.extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
.hasSize(0);
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35))
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 35))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
@@ -273,20 +352,25 @@ void recordAndCollect_DeltaResets_ReusableData() {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
// Record measurement and collect at time 10
storage.recordDouble(3, Attributes.empty(), Context.current());
- verify(aggregator, times(1)).createHandle();
+ verify(aggregator, times(1)).createHandle(testClock.now());
assertThat(storage)
.extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
.hasSize(0);
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
.hasPointsSatisfying(
- point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3)));
+ point ->
+ point
+ .hasStartEpochNanos(testClock.now())
+ .hasEpochNanos(10)
+ .hasValue(3)));
assertThat(storage)
.extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
.hasSize(0);
@@ -297,11 +381,11 @@ void recordAndCollect_DeltaResets_ReusableData() {
storage.recordDouble(3, Attributes.empty(), Context.current());
// We're switched to secondary map so a handle will be created
- verify(aggregator, times(2)).createHandle();
+ verify(aggregator, times(2)).createHandle(testClock.now());
assertThat(storage)
.extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
.hasSize(0);
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
@@ -320,12 +404,12 @@ void recordAndCollect_DeltaResets_ReusableData() {
// We don't delete aggregator handles unless max cardinality reached, hence
// aggregator handle is still there, thus no handle was created for empty(), but it will for
// the "foo"
- verify(aggregator, times(3)).createHandle();
+ verify(aggregator, times(3)).createHandle(testClock.now());
assertThat(storage)
.extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
.hasSize(0);
- MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35);
+ MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 35);
assertThat(metricData).hasDoubleSumSatisfying(DoubleSumAssert::isDelta);
assertThat(metricData)
.hasDoubleSumSatisfying(
@@ -358,7 +442,7 @@ void recordAndCollect_DeltaResets_ReusableData() {
deltaReader.setLastCollectEpochNanos(40);
storage.recordDouble(6, Attributes.of(AttributeKey.stringKey("foo"), "bar"), Context.current());
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 45))
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 45))
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
@@ -388,6 +472,7 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
// Record measurements for CARDINALITY_LIMIT - 1, since 1 slot is reserved for the overflow
@@ -396,8 +481,8 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
- verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle();
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
+ verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(testClock.now());
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10))
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
@@ -406,7 +491,7 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) {
.hasSize(CARDINALITY_LIMIT - 1)
.allSatisfy(
point -> {
- assertThat(point.getStartEpochNanos()).isEqualTo(0);
+ assertThat(point.getStartEpochNanos()).isEqualTo(testClock.now());
assertThat(point.getEpochNanos()).isEqualTo(10);
assertThat(point.getValue()).isEqualTo(3);
})));
@@ -417,8 +502,8 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + CARDINALITY_LIMIT).build(), Context.current());
// Should not create an additional handles for the overflow series
- verify(aggregator, times(CARDINALITY_LIMIT)).createHandle();
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20))
+ verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(testClock.now());
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20))
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
@@ -427,7 +512,7 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) {
.hasSize(CARDINALITY_LIMIT)
.allSatisfy(
point -> {
- assertThat(point.getStartEpochNanos()).isEqualTo(0);
+ assertThat(point.getStartEpochNanos()).isEqualTo(testClock.now());
assertThat(point.getEpochNanos()).isEqualTo(20);
assertThat(point.getValue()).isEqualTo(3);
})
@@ -456,6 +541,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
// Record measurements for CARDINALITY_LIMIT - 1, since 1 slot is reserved for the overflow
@@ -464,11 +550,11 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
- verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle();
+ verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(testClock.now());
assertThat(storage)
.extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
.hasSize(0);
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10))
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
@@ -477,7 +563,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
.hasSize(CARDINALITY_LIMIT - 1)
.allSatisfy(
point -> {
- assertThat(point.getStartEpochNanos()).isEqualTo(0);
+ assertThat(point.getStartEpochNanos()).isEqualTo(testClock.now());
assertThat(point.getEpochNanos()).isEqualTo(10);
assertThat(point.getValue()).isEqualTo(3);
})));
@@ -492,11 +578,11 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + CARDINALITY_LIMIT).build(), Context.current());
// Should use handle returned to pool instead of creating new ones
- verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle();
+ verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(testClock.now());
assertThat(storage)
.extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
.hasSize(CARDINALITY_LIMIT - 2);
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20))
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
@@ -523,11 +609,11 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
// Should use handles returned to pool instead of creating new ones
- verify(aggregator, times(CARDINALITY_LIMIT)).createHandle();
+ verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(testClock.now());
assertThat(storage)
.extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
.hasSize(0);
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30))
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
@@ -569,6 +655,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
// Record measurements for CARDINALITY_LIMIT - 1, since 1 slot is reserved for the overflow
@@ -577,10 +664,10 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
- verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle();
+ verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(testClock.now());
// First collect
- MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10);
+ MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10);
assertThat(metricData)
.hasDoubleSumSatisfying(
@@ -591,7 +678,8 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() {
.hasSize(CARDINALITY_LIMIT - 1)
.allSatisfy(
point -> {
- Assertions.assertThat(point.getStartEpochNanos()).isEqualTo(0);
+ Assertions.assertThat(point.getStartEpochNanos())
+ .isEqualTo(testClock.now());
Assertions.assertThat(point.getEpochNanos()).isEqualTo(10);
Assertions.assertThat(point.getValue()).isEqualTo(3);
})));
@@ -610,10 +698,10 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() {
// After first collection, we expect the secondary map which is empty to be used,
// hence handle creation will still take place
// The +1 is for the overflow handle
- verify(aggregator, times((CARDINALITY_LIMIT - 1) * 2 + 1)).createHandle();
+ verify(aggregator, times((CARDINALITY_LIMIT - 1) * 2 + 1)).createHandle(testClock.now());
// Second collect
- metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20);
+ metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20);
assertThat(metricData)
.hasDoubleSumSatisfying(
@@ -658,6 +746,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
// 1st recording: Recording goes to active map
@@ -668,7 +757,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() {
// This will switch next recordings to the secondary map (which is empty)
// by making it the active map
- storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10);
+ storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10);
// 2nd recording
deltaReader.setLastCollectEpochNanos(10);
@@ -678,7 +767,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() {
}
// This switches maps again, so next recordings will be to the first map
- storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10, 20);
+ storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20);
// 3rd recording: We're recording unseen attributes to a map we know is full,
// since it was filled during 1st recording
@@ -688,7 +777,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() {
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
- MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20, 30);
+ MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30);
assertOnlyOverflowWasRecorded(metricData, 20, 30, 15 * 3);
@@ -700,7 +789,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() {
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
- metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30, 40);
+ metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 40);
assertOnlyOverflowWasRecorded(metricData, 30, 40, 15 * 3);
@@ -712,7 +801,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() {
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
- metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 40, 50);
+ metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 50);
assertNumberOfPoints(metricData, 10);
assertAllPointsWithValue(metricData, 40, 50, 3);
@@ -726,7 +815,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() {
4, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
- metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 50, 60);
+ metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 60);
assertNumberOfPoints(metricData, 12);
assertAllPointsWithValue(metricData, 50, 60, 4);
@@ -803,6 +892,7 @@ void enabledThenDisable_isEnabled(MemoryMode memoryMode) {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
storage.setEnabled(false);
@@ -822,6 +912,7 @@ void enabledThenDisableThenEnable_isEnabled(MemoryMode memoryMode) {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
storage.setEnabled(false);
@@ -842,13 +933,14 @@ void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
storage.setEnabled(false);
storage.recordDouble(10d, Attributes.empty(), Context.current());
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10).isEmpty()).isTrue();
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10).isEmpty()).isTrue();
}
@ParameterizedTest
@@ -863,6 +955,7 @@ void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) {
aggregator,
attributesProcessor,
CARDINALITY_LIMIT,
+ testClock,
/* enabled= */ true);
storage.setEnabled(false);
@@ -870,6 +963,6 @@ void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) {
storage.recordDouble(10d, Attributes.empty(), Context.current());
- assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10).isEmpty()).isFalse();
+ assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10).isEmpty()).isFalse();
}
}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java
index 54a4c36e841..cf8c04ec44e 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java
@@ -63,7 +63,7 @@ void minimumBucketsCanAccommodateMaxRange() {
Advice.empty()),
asExemplarFilterInternal(ExemplarFilter.alwaysOff()),
MemoryMode.IMMUTABLE_DATA);
- AggregatorHandle handle = aggregator.createHandle();
+ AggregatorHandle handle = aggregator.createHandle(0);
// Record max range
handle.recordDouble(Double.MIN_VALUE, Attributes.empty(), Context.current());
handle.recordDouble(Double.MAX_VALUE, Attributes.empty(), Context.current());