diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java
index adfb08b519e..a746c00dec0 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java
@@ -7,6 +7,7 @@
import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA;
import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA;
+import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE;
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA;
import io.opentelemetry.api.common.Attributes;
@@ -27,12 +28,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.annotation.Nullable;
/**
* Stores aggregated {@link MetricData} for synchronous instruments.
@@ -40,64 +41,57 @@
*
This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
-public final class DefaultSynchronousMetricStorage
+public abstract class DefaultSynchronousMetricStorage
implements SynchronousMetricStorage {
private static final Logger internalLogger =
Logger.getLogger(DefaultSynchronousMetricStorage.class.getName());
private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger);
- private final RegisteredReader registeredReader;
- private final MetricDescriptor metricDescriptor;
- private final AggregationTemporality aggregationTemporality;
- private final Aggregator aggregator;
- private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>();
private final AttributesProcessor attributesProcessor;
-
- private final MemoryMode memoryMode;
-
- // Only populated if memoryMode == REUSABLE_DATA
- private final ArrayList reusableResultList = new ArrayList<>();
-
- // Only populated if memoryMode == REUSABLE_DATA and
- // aggregationTemporality is DELTA
- private volatile ConcurrentHashMap>
- previousCollectionAggregatorHandles = new ConcurrentHashMap<>();
+ protected final MetricDescriptor metricDescriptor;
+ protected final Aggregator aggregator;
/**
* This field is set to 1 less than the actual intended cardinality limit, allowing the last slot
* to be filled by the {@link MetricStorage#CARDINALITY_OVERFLOW} series.
*/
- private final int maxCardinality;
+ protected final int maxCardinality;
- private final ConcurrentLinkedQueue> aggregatorHandlePool =
- new ConcurrentLinkedQueue<>();
+ protected volatile boolean enabled;
- private volatile boolean enabled;
-
- DefaultSynchronousMetricStorage(
- RegisteredReader registeredReader,
+ private DefaultSynchronousMetricStorage(
MetricDescriptor metricDescriptor,
Aggregator aggregator,
AttributesProcessor attributesProcessor,
int maxCardinality,
boolean enabled) {
- this.registeredReader = registeredReader;
this.metricDescriptor = metricDescriptor;
- this.aggregationTemporality =
- registeredReader
- .getReader()
- .getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
this.aggregator = aggregator;
this.attributesProcessor = attributesProcessor;
this.maxCardinality = maxCardinality - 1;
- this.memoryMode = registeredReader.getReader().getMemoryMode();
this.enabled = enabled;
}
- // Visible for testing
- Queue> getAggregatorHandlePool() {
- return aggregatorHandlePool;
+ static DefaultSynchronousMetricStorage create(
+ RegisteredReader reader,
+ MetricDescriptor descriptor,
+ Aggregator aggregator,
+ AttributesProcessor processor,
+ int maxCardinality,
+ boolean enabled) {
+ AggregationTemporality aggregationTemporality =
+ reader.getReader().getAggregationTemporality(descriptor.getSourceInstrument().getType());
+ return aggregationTemporality == CUMULATIVE
+ ? new CumulativeSynchronousMetricStorage<>(
+ descriptor,
+ aggregator,
+ processor,
+ maxCardinality,
+ enabled,
+ reader.getReader().getMemoryMode())
+ : new DeltaSynchronousMetricStorage<>(
+ reader, descriptor, aggregator, processor, maxCardinality, enabled);
}
@Override
@@ -105,14 +99,7 @@ public void recordLong(long value, Attributes attributes, Context context) {
if (!enabled) {
return;
}
- AggregatorHolder aggregatorHolder = getHolderForRecord();
- try {
- AggregatorHandle handle =
- getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
- handle.recordLong(value, attributes, context);
- } finally {
- releaseHolderForRecord(aggregatorHolder);
- }
+ doRecordLong(value, attributes, context);
}
@Override
@@ -130,16 +117,13 @@ public void recordDouble(double value, Attributes attributes, Context context) {
+ ". Dropping measurement.");
return;
}
- AggregatorHolder aggregatorHolder = getHolderForRecord();
- try {
- AggregatorHandle handle =
- getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
- handle.recordDouble(value, attributes, context);
- } finally {
- releaseHolderForRecord(aggregatorHolder);
- }
+ doRecordDouble(value, attributes, context);
}
+ abstract void doRecordLong(long value, Attributes attributes, Context context);
+
+ abstract void doRecordDouble(double value, Attributes attributes, Context context);
+
@Override
public void setEnabled(boolean enabled) {
this.enabled = enabled;
@@ -150,37 +134,7 @@ public boolean isEnabled() {
return enabled;
}
- /**
- * Obtain the AggregatorHolder for recording measurements, re-reading the volatile
- * this.aggregatorHolder until we access one where recordsInProgress is even. Collect sets
- * recordsInProgress to odd as a signal that AggregatorHolder is stale and is being replaced.
- * Record operations increment recordInProgress by 2. Callers MUST call {@link
- * #releaseHolderForRecord(AggregatorHolder)} when record operation completes to signal to that
- * its safe to proceed with Collect operations.
- */
- private AggregatorHolder getHolderForRecord() {
- do {
- AggregatorHolder aggregatorHolder = this.aggregatorHolder;
- int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2);
- if (recordsInProgress % 2 == 0) {
- return aggregatorHolder;
- } else {
- // Collect is in progress, decrement recordsInProgress to allow collect to proceed and
- // re-read aggregatorHolder
- aggregatorHolder.activeRecordingThreads.addAndGet(-2);
- }
- } while (true);
- }
-
- /**
- * Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to indicate
- * that recording is complete, and it is safe to collect.
- */
- private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) {
- aggregatorHolder.activeRecordingThreads.addAndGet(-2);
- }
-
- private AggregatorHandle getAggregatorHandle(
+ protected AggregatorHandle getAggregatorHandle(
ConcurrentHashMap> aggregatorHandles,
Attributes attributes,
Context context) {
@@ -206,7 +160,7 @@ private AggregatorHandle getAggregatorHandle(
}
}
// Get handle from pool if available, else create a new one.
- AggregatorHandle newHandle = aggregatorHandlePool.poll();
+ AggregatorHandle newHandle = maybeGetPooledAggregatorHandle();
if (newHandle == null) {
newHandle = aggregator.createHandle();
}
@@ -214,20 +168,105 @@ private AggregatorHandle getAggregatorHandle(
return handle != null ? handle : newHandle;
}
+ @Nullable
+ abstract AggregatorHandle maybeGetPooledAggregatorHandle();
+
@Override
- public MetricData collect(
- Resource resource,
- InstrumentationScopeInfo instrumentationScopeInfo,
- long startEpochNanos,
- long epochNanos) {
- boolean reset = aggregationTemporality == DELTA;
- long start =
- aggregationTemporality == DELTA
- ? registeredReader.getLastCollectEpochNanos()
- : startEpochNanos;
-
- ConcurrentHashMap> aggregatorHandles;
- if (reset) {
+ public MetricDescriptor getMetricDescriptor() {
+ return metricDescriptor;
+ }
+
+ private static class DeltaSynchronousMetricStorage
+ extends DefaultSynchronousMetricStorage {
+ private final RegisteredReader registeredReader;
+ private final MemoryMode memoryMode;
+
+ private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>();
+ // Only populated if memoryMode == REUSABLE_DATA
+ private volatile ConcurrentHashMap>
+ previousCollectionAggregatorHandles = new ConcurrentHashMap<>();
+ // Only populated if memoryMode == REUSABLE_DATA
+ private final ArrayList reusableResultList = new ArrayList<>();
+ private final ConcurrentLinkedQueue> aggregatorHandlePool =
+ new ConcurrentLinkedQueue<>();
+
+ DeltaSynchronousMetricStorage(
+ RegisteredReader registeredReader,
+ MetricDescriptor metricDescriptor,
+ Aggregator aggregator,
+ AttributesProcessor attributesProcessor,
+ int maxCardinality,
+ boolean enabled) {
+ super(metricDescriptor, aggregator, attributesProcessor, maxCardinality, enabled);
+ this.registeredReader = registeredReader;
+ this.memoryMode = registeredReader.getReader().getMemoryMode();
+ }
+
+ @Override
+ void doRecordLong(long value, Attributes attributes, Context context) {
+ AggregatorHolder holderForRecord = getHolderForRecord();
+ try {
+ getAggregatorHandle(holderForRecord.aggregatorHandles, attributes, context)
+ .recordLong(value, attributes, context);
+ } finally {
+ releaseHolderForRecord(holderForRecord);
+ }
+ }
+
+ @Override
+ void doRecordDouble(double value, Attributes attributes, Context context) {
+ AggregatorHolder holderForRecord = getHolderForRecord();
+ try {
+ getAggregatorHandle(holderForRecord.aggregatorHandles, attributes, context)
+ .recordDouble(value, attributes, context);
+ } finally {
+ releaseHolderForRecord(holderForRecord);
+ }
+ }
+
+ @Nullable
+ @Override
+ AggregatorHandle maybeGetPooledAggregatorHandle() {
+ return aggregatorHandlePool.poll();
+ }
+
+ /**
+ * Obtain the AggregatorHolder for recording measurements, re-reading the volatile
+ * this.aggregatorHolder until we access one where recordsInProgress is even. Collect sets
+ * recordsInProgress to odd as a signal that AggregatorHolder is stale and is being replaced.
+ * Record operations increment recordInProgress by 2. Callers MUST call {@link
+ * #releaseHolderForRecord(AggregatorHolder)} when record operation completes to signal to that
+ * its safe to proceed with Collect operations.
+ */
+ private AggregatorHolder getHolderForRecord() {
+ do {
+ AggregatorHolder aggregatorHolder = this.aggregatorHolder;
+ int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2);
+ if (recordsInProgress % 2 == 0) {
+ return aggregatorHolder;
+ } else {
+ // Collect is in progress, decrement recordsInProgress to allow collect to proceed and
+ // re-read aggregatorHolder
+ aggregatorHolder.activeRecordingThreads.addAndGet(-2);
+ }
+ } while (true);
+ }
+
+ /**
+ * Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to
+ * indicate that recording is complete, and it is safe to collect.
+ */
+ private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) {
+ aggregatorHolder.activeRecordingThreads.addAndGet(-2);
+ }
+
+ @Override
+ public MetricData collect(
+ Resource resource,
+ InstrumentationScopeInfo instrumentationScopeInfo,
+ long startEpochNanos,
+ long epochNanos) {
+ ConcurrentHashMap> aggregatorHandles;
AggregatorHolder holder = this.aggregatorHolder;
this.aggregatorHolder =
(memoryMode == REUSABLE_DATA)
@@ -243,87 +282,85 @@ public MetricData collect(
recordsInProgress = holder.activeRecordingThreads.get();
}
aggregatorHandles = holder.aggregatorHandles;
- } else {
- aggregatorHandles = this.aggregatorHolder.aggregatorHandles;
- }
- List points;
- if (memoryMode == REUSABLE_DATA) {
- reusableResultList.clear();
- points = reusableResultList;
- } else {
- points = new ArrayList<>(aggregatorHandles.size());
- }
-
- // In DELTA aggregation temporality each Attributes is reset to 0
- // every time we perform a collection (by definition of DELTA).
- // In IMMUTABLE_DATA MemoryMode, this is accomplished by removing all aggregator handles
- // (into which the values are recorded) effectively starting from 0
- // for each recorded Attributes.
- // In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing
- // a key-value from a map and putting it again on next recording will cost an allocation,
- // we are keeping the aggregator handles in their map, and only reset their value once
- // we finish collecting the aggregated value from each one.
- // The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory,
- // hence during collect(), when the map is at full capacity, we try to clear away unused
- // aggregator handles, so on next recording cycle using this map, there will be room for newly
- // recorded Attributes. This comes at the expanse of memory allocations. This can be avoided
- // if the user chooses to increase the maxCardinality.
- if (memoryMode == REUSABLE_DATA && reset) {
- if (aggregatorHandles.size() >= maxCardinality) {
- aggregatorHandles.forEach(
- (attribute, handle) -> {
- if (!handle.hasRecordedValues()) {
- aggregatorHandles.remove(attribute);
- }
- });
+ List points;
+ if (memoryMode == REUSABLE_DATA) {
+ reusableResultList.clear();
+ points = reusableResultList;
+ } else {
+ points = new ArrayList<>(aggregatorHandles.size());
}
- }
- // Grab aggregated points.
- aggregatorHandles.forEach(
- (attributes, handle) -> {
- if (!handle.hasRecordedValues()) {
- return;
- }
- T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset);
-
- if (reset && memoryMode == IMMUTABLE_DATA) {
- // Return the aggregator to the pool.
- // The pool is only used in DELTA temporality (since in CUMULATIVE the handler is
- // always used as it is the place accumulating the values and never resets)
- // AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid
- // using the pool since it allocates memory internally on each put() or remove()
- aggregatorHandlePool.offer(handle);
- }
-
- if (point != null) {
- points.add(point);
- }
- });
-
- // Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are
- // created during collection.
- int toDelete = aggregatorHandlePool.size() - (maxCardinality + 1);
- for (int i = 0; i < toDelete; i++) {
- aggregatorHandlePool.poll();
- }
+ // In DELTA aggregation temporality each Attributes is reset to 0
+ // every time we perform a collection (by definition of DELTA).
+ // In IMMUTABLE_DATA MemoryMode, this is accomplished by removing all aggregator handles
+ // (into which the values are recorded) effectively starting from 0
+ // for each recorded Attributes.
+ // In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing
+ // a key-value from a map and putting it again on next recording will cost an allocation,
+ // we are keeping the aggregator handles in their map, and only reset their value once
+ // we finish collecting the aggregated value from each one.
+ // The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory,
+ // hence during collect(), when the map is at full capacity, we try to clear away unused
+ // aggregator handles, so on next recording cycle using this map, there will be room for newly
+ // recorded Attributes. This comes at the expanse of memory allocations. This can be avoided
+ // if the user chooses to increase the maxCardinality.
+ if (memoryMode == REUSABLE_DATA) {
+ if (aggregatorHandles.size() >= maxCardinality) {
+ aggregatorHandles.forEach(
+ (attribute, handle) -> {
+ if (!handle.hasRecordedValues()) {
+ aggregatorHandles.remove(attribute);
+ }
+ });
+ }
+ }
- if (reset && memoryMode == REUSABLE_DATA) {
- previousCollectionAggregatorHandles = aggregatorHandles;
- }
+ // Grab aggregated points.
+ aggregatorHandles.forEach(
+ (attributes, handle) -> {
+ if (!handle.hasRecordedValues()) {
+ return;
+ }
+ T point =
+ handle.aggregateThenMaybeReset(
+ registeredReader.getLastCollectEpochNanos(),
+ epochNanos,
+ attributes,
+ /* reset= */ true);
+
+ if (memoryMode == IMMUTABLE_DATA) {
+ // Return the aggregator to the pool.
+ // The pool is only used in DELTA temporality (since in CUMULATIVE the handler is
+ // always used as it is the place accumulating the values and never resets)
+ // AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid
+ // using the pool since it allocates memory internally on each put() or remove()
+ aggregatorHandlePool.offer(handle);
+ }
+
+ if (point != null) {
+ points.add(point);
+ }
+ });
+
+ // Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are
+ // created during collection.
+ int toDelete = aggregatorHandlePool.size() - (maxCardinality + 1);
+ for (int i = 0; i < toDelete; i++) {
+ aggregatorHandlePool.poll();
+ }
- if (points.isEmpty() || !enabled) {
- return EmptyMetricData.getInstance();
- }
+ if (memoryMode == REUSABLE_DATA) {
+ previousCollectionAggregatorHandles = aggregatorHandles;
+ }
- return aggregator.toMetricData(
- resource, instrumentationScopeInfo, metricDescriptor, points, aggregationTemporality);
- }
+ if (points.isEmpty() || !enabled) {
+ return EmptyMetricData.getInstance();
+ }
- @Override
- public MetricDescriptor getMetricDescriptor() {
- return metricDescriptor;
+ return aggregator.toMetricData(
+ resource, instrumentationScopeInfo, metricDescriptor, points, DELTA);
+ }
}
private static class AggregatorHolder {
@@ -352,4 +389,80 @@ private AggregatorHolder(ConcurrentHashMap> aggr
this.aggregatorHandles = aggregatorHandles;
}
}
+
+ private static class CumulativeSynchronousMetricStorage
+ extends DefaultSynchronousMetricStorage {
+ private final MemoryMode memoryMode;
+ private final ConcurrentHashMap> aggregatorHandles =
+ new ConcurrentHashMap<>();
+ // Only populated if memoryMode == REUSABLE_DATA
+ private final ArrayList reusableResultList = new ArrayList<>();
+
+ CumulativeSynchronousMetricStorage(
+ MetricDescriptor metricDescriptor,
+ Aggregator aggregator,
+ AttributesProcessor attributesProcessor,
+ int maxCardinality,
+ boolean enabled,
+ MemoryMode memoryMode) {
+ super(metricDescriptor, aggregator, attributesProcessor, maxCardinality, enabled);
+ this.memoryMode = memoryMode;
+ }
+
+ @Override
+ void doRecordLong(long value, Attributes attributes, Context context) {
+ getAggregatorHandle(aggregatorHandles, attributes, context)
+ .recordLong(value, attributes, context);
+ }
+
+ @Override
+ void doRecordDouble(double value, Attributes attributes, Context context) {
+ getAggregatorHandle(aggregatorHandles, attributes, context)
+ .recordDouble(value, attributes, context);
+ }
+
+ @Nullable
+ @Override
+ AggregatorHandle maybeGetPooledAggregatorHandle() {
+ // No aggregator handle pooling for cumulative temporality
+ return null;
+ }
+
+ @Override
+ public MetricData collect(
+ Resource resource,
+ InstrumentationScopeInfo instrumentationScopeInfo,
+ long startEpochNanos,
+ long epochNanos) {
+ List points;
+ if (memoryMode == REUSABLE_DATA) {
+ reusableResultList.clear();
+ points = reusableResultList;
+ } else {
+ points = new ArrayList<>(aggregatorHandles.size());
+ }
+
+ // Grab aggregated points.
+ aggregatorHandles.forEach(
+ (attributes, handle) -> {
+ if (!handle.hasRecordedValues()) {
+ return;
+ }
+ T point =
+ handle.aggregateThenMaybeReset(
+ startEpochNanos, epochNanos, attributes, /* reset= */ false);
+
+ if (point != null) {
+ points.add(point);
+ }
+ });
+
+ if (points.isEmpty() || !enabled) {
+ return EmptyMetricData.getInstance();
+ }
+
+ return aggregator.toMetricData(
+ resource, instrumentationScopeInfo, metricDescriptor, points, CUMULATIVE);
+ }
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java
index 9df16e17911..dc23f89c4d4 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java
@@ -52,7 +52,7 @@ static SynchronousMetricStorage create(
if (Aggregator.drop() == aggregator) {
return empty();
}
- return new DefaultSynchronousMetricStorage<>(
+ return DefaultSynchronousMetricStorage.create(
registeredReader,
metricDescriptor,
aggregator,
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java
index 0d74e5ee9cb..354c1d899e5 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java
@@ -9,6 +9,8 @@
import static io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilterInternal.asExemplarFilterInternal;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
+import static org.assertj.core.api.BDDAssertions.as;
+import static org.assertj.core.api.InstanceOfAssertFactories.collection;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -30,6 +32,7 @@
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.Advice;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
@@ -101,7 +104,7 @@ private void initialize(MemoryMode memoryMode) {
void recordDouble_NaN(MemoryMode memoryMode) {
initialize(memoryMode);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
cumulativeReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -128,7 +131,7 @@ void attributesProcessor_applied(MemoryMode memoryMode) {
AttributesProcessor.append(Attributes.builder().put("modifiedK", "modifiedV").build());
AttributesProcessor spyAttributesProcessor = spy(attributesProcessor);
SynchronousMetricStorage storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
cumulativeReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -152,7 +155,7 @@ void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) {
initialize(memoryMode);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
cumulativeReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -163,7 +166,6 @@ void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) {
// Record measurement and collect at time 10
storage.recordDouble(3, Attributes.empty(), Context.current());
verify(aggregator, times(1)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
.hasDoubleSumSatisfying(
sum ->
@@ -175,7 +177,6 @@ void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) {
// Record measurement and collect at time 30
storage.recordDouble(3, Attributes.empty(), Context.current());
verify(aggregator, times(1)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
.hasDoubleSumSatisfying(
sum ->
@@ -187,7 +188,6 @@ void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) {
// Record measurement and collect at time 35
storage.recordDouble(2, Attributes.empty(), Context.current());
verify(aggregator, times(1)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35))
.hasDoubleSumSatisfying(
sum ->
@@ -201,7 +201,7 @@ void recordAndCollect_DeltaResets_ImmutableData() {
initialize(IMMUTABLE_DATA);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
deltaReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -212,14 +212,18 @@ void recordAndCollect_DeltaResets_ImmutableData() {
// Record measurement and collect at time 10
storage.recordDouble(3, Attributes.empty(), Context.current());
verify(aggregator, times(1)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
.hasPointsSatisfying(
point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3)));
- assertThat(storage.getAggregatorHandlePool()).hasSize(1);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(1);
deltaReader.setLastCollectEpochNanos(10);
// Record measurement and collect at time 30
@@ -227,27 +231,35 @@ void recordAndCollect_DeltaResets_ImmutableData() {
// AggregatorHandle should be returned to the pool on reset so shouldn't create additional
// handles
verify(aggregator, times(1)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
.hasPointsSatisfying(
point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(3)));
- assertThat(storage.getAggregatorHandlePool()).hasSize(1);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(1);
deltaReader.setLastCollectEpochNanos(30);
// Record measurement and collect at time 35
storage.recordDouble(2, Attributes.empty(), Context.current());
verify(aggregator, times(1)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
.hasPointsSatisfying(
point -> point.hasStartEpochNanos(30).hasEpochNanos(35).hasValue(2)));
- assertThat(storage.getAggregatorHandlePool()).hasSize(1);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(1);
}
@Test
@@ -255,7 +267,7 @@ void recordAndCollect_DeltaResets_ReusableData() {
initialize(MemoryMode.REUSABLE_DATA);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
deltaReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -266,14 +278,18 @@ void recordAndCollect_DeltaResets_ReusableData() {
// Record measurement and collect at time 10
storage.recordDouble(3, Attributes.empty(), Context.current());
verify(aggregator, times(1)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
.hasPointsSatisfying(
point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3)));
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(0);
deltaReader.setLastCollectEpochNanos(10);
@@ -282,14 +298,18 @@ void recordAndCollect_DeltaResets_ReusableData() {
// We're switched to secondary map so a handle will be created
verify(aggregator, times(2)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
.hasPointsSatisfying(
point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(3)));
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(0);
deltaReader.setLastCollectEpochNanos(30);
@@ -301,7 +321,9 @@ void recordAndCollect_DeltaResets_ReusableData() {
// aggregator handle is still there, thus no handle was created for empty(), but it will for
// the "foo"
verify(aggregator, times(3)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(0);
MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35);
assertThat(metricData).hasDoubleSumSatisfying(DoubleSumAssert::isDelta);
@@ -329,7 +351,9 @@ void recordAndCollect_DeltaResets_ReusableData() {
Attributes.of(AttributeKey.stringKey("foo"), "bar"));
})));
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(0);
deltaReader.setLastCollectEpochNanos(40);
storage.recordDouble(6, Attributes.of(AttributeKey.stringKey("foo"), "bar"), Context.current());
@@ -358,7 +382,7 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) {
initialize(memoryMode);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
cumulativeReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -373,7 +397,6 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) {
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
.hasDoubleSumSatisfying(
sum ->
@@ -387,7 +410,6 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) {
assertThat(point.getEpochNanos()).isEqualTo(10);
assertThat(point.getValue()).isEqualTo(3);
})));
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
assertThat(logs.getEvents()).isEmpty();
cumulativeReader.setLastCollectEpochNanos(10);
@@ -396,7 +418,6 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) {
3, Attributes.builder().put("key", "value" + CARDINALITY_LIMIT).build(), Context.current());
// Should not create an additional handles for the overflow series
verify(aggregator, times(CARDINALITY_LIMIT)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20))
.hasDoubleSumSatisfying(
sum ->
@@ -421,7 +442,6 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) {
point ->
assertThat(point.getAttributes())
.isEqualTo(MetricStorage.CARDINALITY_OVERFLOW))));
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
}
@@ -430,7 +450,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
initialize(IMMUTABLE_DATA);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
deltaReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -445,7 +465,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
.hasDoubleSumSatisfying(
sum ->
@@ -459,7 +481,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
assertThat(point.getEpochNanos()).isEqualTo(10);
assertThat(point.getValue()).isEqualTo(3);
})));
- assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT - 1);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(CARDINALITY_LIMIT - 1);
assertThat(logs.getEvents()).isEmpty();
deltaReader.setLastCollectEpochNanos(10);
@@ -469,7 +493,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
3, Attributes.builder().put("key", "value" + CARDINALITY_LIMIT).build(), Context.current());
// Should use handle returned to pool instead of creating new ones
verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT - 2);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(CARDINALITY_LIMIT - 2);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20))
.hasDoubleSumSatisfying(
sum ->
@@ -484,7 +510,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
Attributes.builder()
.put("key", "value" + CARDINALITY_LIMIT)
.build())));
- assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT - 1);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(CARDINALITY_LIMIT - 1);
assertThat(logs.getEvents()).isEmpty();
deltaReader.setLastCollectEpochNanos(20);
@@ -496,7 +524,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
}
// Should use handles returned to pool instead of creating new ones
verify(aggregator, times(CARDINALITY_LIMIT)).createHandle();
- assertThat(storage.getAggregatorHandlePool()).hasSize(0);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
.hasDoubleSumSatisfying(
sum ->
@@ -522,7 +552,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
assertThat(point.getAttributes())
.isEqualTo(MetricStorage.CARDINALITY_OVERFLOW))));
- assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT);
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .hasSize(CARDINALITY_LIMIT);
logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
}
@@ -531,7 +563,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() {
initialize(MemoryMode.REUSABLE_DATA);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
deltaReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -608,7 +640,9 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() {
assertThat(point.getAttributes())
.isEqualTo(MetricStorage.CARDINALITY_OVERFLOW))));
- assertThat(storage.getAggregatorHandlePool()).isEmpty();
+ assertThat(storage)
+ .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class)))
+ .isEmpty();
logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
}
@@ -618,7 +652,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() {
initialize(MemoryMode.REUSABLE_DATA);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
deltaReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -763,7 +797,7 @@ void enabledThenDisable_isEnabled(MemoryMode memoryMode) {
initialize(memoryMode);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
deltaReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -782,7 +816,7 @@ void enabledThenDisableThenEnable_isEnabled(MemoryMode memoryMode) {
initialize(memoryMode);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
deltaReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -802,7 +836,7 @@ void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) {
initialize(memoryMode);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
deltaReader,
METRIC_DESCRIPTOR,
aggregator,
@@ -823,7 +857,7 @@ void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) {
initialize(memoryMode);
DefaultSynchronousMetricStorage> storage =
- new DefaultSynchronousMetricStorage<>(
+ DefaultSynchronousMetricStorage.create(
deltaReader,
METRIC_DESCRIPTOR,
aggregator,