diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index adfb08b519e..a746c00dec0 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -7,6 +7,7 @@ import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA; import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; import io.opentelemetry.api.common.Attributes; @@ -27,12 +28,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Stores aggregated {@link MetricData} for synchronous instruments. @@ -40,64 +41,57 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class DefaultSynchronousMetricStorage +public abstract class DefaultSynchronousMetricStorage implements SynchronousMetricStorage { private static final Logger internalLogger = Logger.getLogger(DefaultSynchronousMetricStorage.class.getName()); private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); - private final RegisteredReader registeredReader; - private final MetricDescriptor metricDescriptor; - private final AggregationTemporality aggregationTemporality; - private final Aggregator aggregator; - private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>(); private final AttributesProcessor attributesProcessor; - - private final MemoryMode memoryMode; - - // Only populated if memoryMode == REUSABLE_DATA - private final ArrayList reusableResultList = new ArrayList<>(); - - // Only populated if memoryMode == REUSABLE_DATA and - // aggregationTemporality is DELTA - private volatile ConcurrentHashMap> - previousCollectionAggregatorHandles = new ConcurrentHashMap<>(); + protected final MetricDescriptor metricDescriptor; + protected final Aggregator aggregator; /** * This field is set to 1 less than the actual intended cardinality limit, allowing the last slot * to be filled by the {@link MetricStorage#CARDINALITY_OVERFLOW} series. */ - private final int maxCardinality; + protected final int maxCardinality; - private final ConcurrentLinkedQueue> aggregatorHandlePool = - new ConcurrentLinkedQueue<>(); + protected volatile boolean enabled; - private volatile boolean enabled; - - DefaultSynchronousMetricStorage( - RegisteredReader registeredReader, + private DefaultSynchronousMetricStorage( MetricDescriptor metricDescriptor, Aggregator aggregator, AttributesProcessor attributesProcessor, int maxCardinality, boolean enabled) { - this.registeredReader = registeredReader; this.metricDescriptor = metricDescriptor; - this.aggregationTemporality = - registeredReader - .getReader() - .getAggregationTemporality(metricDescriptor.getSourceInstrument().getType()); this.aggregator = aggregator; this.attributesProcessor = attributesProcessor; this.maxCardinality = maxCardinality - 1; - this.memoryMode = registeredReader.getReader().getMemoryMode(); this.enabled = enabled; } - // Visible for testing - Queue> getAggregatorHandlePool() { - return aggregatorHandlePool; + static DefaultSynchronousMetricStorage create( + RegisteredReader reader, + MetricDescriptor descriptor, + Aggregator aggregator, + AttributesProcessor processor, + int maxCardinality, + boolean enabled) { + AggregationTemporality aggregationTemporality = + reader.getReader().getAggregationTemporality(descriptor.getSourceInstrument().getType()); + return aggregationTemporality == CUMULATIVE + ? new CumulativeSynchronousMetricStorage<>( + descriptor, + aggregator, + processor, + maxCardinality, + enabled, + reader.getReader().getMemoryMode()) + : new DeltaSynchronousMetricStorage<>( + reader, descriptor, aggregator, processor, maxCardinality, enabled); } @Override @@ -105,14 +99,7 @@ public void recordLong(long value, Attributes attributes, Context context) { if (!enabled) { return; } - AggregatorHolder aggregatorHolder = getHolderForRecord(); - try { - AggregatorHandle handle = - getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); - handle.recordLong(value, attributes, context); - } finally { - releaseHolderForRecord(aggregatorHolder); - } + doRecordLong(value, attributes, context); } @Override @@ -130,16 +117,13 @@ public void recordDouble(double value, Attributes attributes, Context context) { + ". Dropping measurement."); return; } - AggregatorHolder aggregatorHolder = getHolderForRecord(); - try { - AggregatorHandle handle = - getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); - handle.recordDouble(value, attributes, context); - } finally { - releaseHolderForRecord(aggregatorHolder); - } + doRecordDouble(value, attributes, context); } + abstract void doRecordLong(long value, Attributes attributes, Context context); + + abstract void doRecordDouble(double value, Attributes attributes, Context context); + @Override public void setEnabled(boolean enabled) { this.enabled = enabled; @@ -150,37 +134,7 @@ public boolean isEnabled() { return enabled; } - /** - * Obtain the AggregatorHolder for recording measurements, re-reading the volatile - * this.aggregatorHolder until we access one where recordsInProgress is even. Collect sets - * recordsInProgress to odd as a signal that AggregatorHolder is stale and is being replaced. - * Record operations increment recordInProgress by 2. Callers MUST call {@link - * #releaseHolderForRecord(AggregatorHolder)} when record operation completes to signal to that - * its safe to proceed with Collect operations. - */ - private AggregatorHolder getHolderForRecord() { - do { - AggregatorHolder aggregatorHolder = this.aggregatorHolder; - int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2); - if (recordsInProgress % 2 == 0) { - return aggregatorHolder; - } else { - // Collect is in progress, decrement recordsInProgress to allow collect to proceed and - // re-read aggregatorHolder - aggregatorHolder.activeRecordingThreads.addAndGet(-2); - } - } while (true); - } - - /** - * Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to indicate - * that recording is complete, and it is safe to collect. - */ - private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { - aggregatorHolder.activeRecordingThreads.addAndGet(-2); - } - - private AggregatorHandle getAggregatorHandle( + protected AggregatorHandle getAggregatorHandle( ConcurrentHashMap> aggregatorHandles, Attributes attributes, Context context) { @@ -206,7 +160,7 @@ private AggregatorHandle getAggregatorHandle( } } // Get handle from pool if available, else create a new one. - AggregatorHandle newHandle = aggregatorHandlePool.poll(); + AggregatorHandle newHandle = maybeGetPooledAggregatorHandle(); if (newHandle == null) { newHandle = aggregator.createHandle(); } @@ -214,20 +168,105 @@ private AggregatorHandle getAggregatorHandle( return handle != null ? handle : newHandle; } + @Nullable + abstract AggregatorHandle maybeGetPooledAggregatorHandle(); + @Override - public MetricData collect( - Resource resource, - InstrumentationScopeInfo instrumentationScopeInfo, - long startEpochNanos, - long epochNanos) { - boolean reset = aggregationTemporality == DELTA; - long start = - aggregationTemporality == DELTA - ? registeredReader.getLastCollectEpochNanos() - : startEpochNanos; - - ConcurrentHashMap> aggregatorHandles; - if (reset) { + public MetricDescriptor getMetricDescriptor() { + return metricDescriptor; + } + + private static class DeltaSynchronousMetricStorage + extends DefaultSynchronousMetricStorage { + private final RegisteredReader registeredReader; + private final MemoryMode memoryMode; + + private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>(); + // Only populated if memoryMode == REUSABLE_DATA + private volatile ConcurrentHashMap> + previousCollectionAggregatorHandles = new ConcurrentHashMap<>(); + // Only populated if memoryMode == REUSABLE_DATA + private final ArrayList reusableResultList = new ArrayList<>(); + private final ConcurrentLinkedQueue> aggregatorHandlePool = + new ConcurrentLinkedQueue<>(); + + DeltaSynchronousMetricStorage( + RegisteredReader registeredReader, + MetricDescriptor metricDescriptor, + Aggregator aggregator, + AttributesProcessor attributesProcessor, + int maxCardinality, + boolean enabled) { + super(metricDescriptor, aggregator, attributesProcessor, maxCardinality, enabled); + this.registeredReader = registeredReader; + this.memoryMode = registeredReader.getReader().getMemoryMode(); + } + + @Override + void doRecordLong(long value, Attributes attributes, Context context) { + AggregatorHolder holderForRecord = getHolderForRecord(); + try { + getAggregatorHandle(holderForRecord.aggregatorHandles, attributes, context) + .recordLong(value, attributes, context); + } finally { + releaseHolderForRecord(holderForRecord); + } + } + + @Override + void doRecordDouble(double value, Attributes attributes, Context context) { + AggregatorHolder holderForRecord = getHolderForRecord(); + try { + getAggregatorHandle(holderForRecord.aggregatorHandles, attributes, context) + .recordDouble(value, attributes, context); + } finally { + releaseHolderForRecord(holderForRecord); + } + } + + @Nullable + @Override + AggregatorHandle maybeGetPooledAggregatorHandle() { + return aggregatorHandlePool.poll(); + } + + /** + * Obtain the AggregatorHolder for recording measurements, re-reading the volatile + * this.aggregatorHolder until we access one where recordsInProgress is even. Collect sets + * recordsInProgress to odd as a signal that AggregatorHolder is stale and is being replaced. + * Record operations increment recordInProgress by 2. Callers MUST call {@link + * #releaseHolderForRecord(AggregatorHolder)} when record operation completes to signal to that + * its safe to proceed with Collect operations. + */ + private AggregatorHolder getHolderForRecord() { + do { + AggregatorHolder aggregatorHolder = this.aggregatorHolder; + int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2); + if (recordsInProgress % 2 == 0) { + return aggregatorHolder; + } else { + // Collect is in progress, decrement recordsInProgress to allow collect to proceed and + // re-read aggregatorHolder + aggregatorHolder.activeRecordingThreads.addAndGet(-2); + } + } while (true); + } + + /** + * Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to + * indicate that recording is complete, and it is safe to collect. + */ + private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { + aggregatorHolder.activeRecordingThreads.addAndGet(-2); + } + + @Override + public MetricData collect( + Resource resource, + InstrumentationScopeInfo instrumentationScopeInfo, + long startEpochNanos, + long epochNanos) { + ConcurrentHashMap> aggregatorHandles; AggregatorHolder holder = this.aggregatorHolder; this.aggregatorHolder = (memoryMode == REUSABLE_DATA) @@ -243,87 +282,85 @@ public MetricData collect( recordsInProgress = holder.activeRecordingThreads.get(); } aggregatorHandles = holder.aggregatorHandles; - } else { - aggregatorHandles = this.aggregatorHolder.aggregatorHandles; - } - List points; - if (memoryMode == REUSABLE_DATA) { - reusableResultList.clear(); - points = reusableResultList; - } else { - points = new ArrayList<>(aggregatorHandles.size()); - } - - // In DELTA aggregation temporality each Attributes is reset to 0 - // every time we perform a collection (by definition of DELTA). - // In IMMUTABLE_DATA MemoryMode, this is accomplished by removing all aggregator handles - // (into which the values are recorded) effectively starting from 0 - // for each recorded Attributes. - // In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing - // a key-value from a map and putting it again on next recording will cost an allocation, - // we are keeping the aggregator handles in their map, and only reset their value once - // we finish collecting the aggregated value from each one. - // The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory, - // hence during collect(), when the map is at full capacity, we try to clear away unused - // aggregator handles, so on next recording cycle using this map, there will be room for newly - // recorded Attributes. This comes at the expanse of memory allocations. This can be avoided - // if the user chooses to increase the maxCardinality. - if (memoryMode == REUSABLE_DATA && reset) { - if (aggregatorHandles.size() >= maxCardinality) { - aggregatorHandles.forEach( - (attribute, handle) -> { - if (!handle.hasRecordedValues()) { - aggregatorHandles.remove(attribute); - } - }); + List points; + if (memoryMode == REUSABLE_DATA) { + reusableResultList.clear(); + points = reusableResultList; + } else { + points = new ArrayList<>(aggregatorHandles.size()); } - } - // Grab aggregated points. - aggregatorHandles.forEach( - (attributes, handle) -> { - if (!handle.hasRecordedValues()) { - return; - } - T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset); - - if (reset && memoryMode == IMMUTABLE_DATA) { - // Return the aggregator to the pool. - // The pool is only used in DELTA temporality (since in CUMULATIVE the handler is - // always used as it is the place accumulating the values and never resets) - // AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid - // using the pool since it allocates memory internally on each put() or remove() - aggregatorHandlePool.offer(handle); - } - - if (point != null) { - points.add(point); - } - }); - - // Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are - // created during collection. - int toDelete = aggregatorHandlePool.size() - (maxCardinality + 1); - for (int i = 0; i < toDelete; i++) { - aggregatorHandlePool.poll(); - } + // In DELTA aggregation temporality each Attributes is reset to 0 + // every time we perform a collection (by definition of DELTA). + // In IMMUTABLE_DATA MemoryMode, this is accomplished by removing all aggregator handles + // (into which the values are recorded) effectively starting from 0 + // for each recorded Attributes. + // In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing + // a key-value from a map and putting it again on next recording will cost an allocation, + // we are keeping the aggregator handles in their map, and only reset their value once + // we finish collecting the aggregated value from each one. + // The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory, + // hence during collect(), when the map is at full capacity, we try to clear away unused + // aggregator handles, so on next recording cycle using this map, there will be room for newly + // recorded Attributes. This comes at the expanse of memory allocations. This can be avoided + // if the user chooses to increase the maxCardinality. + if (memoryMode == REUSABLE_DATA) { + if (aggregatorHandles.size() >= maxCardinality) { + aggregatorHandles.forEach( + (attribute, handle) -> { + if (!handle.hasRecordedValues()) { + aggregatorHandles.remove(attribute); + } + }); + } + } - if (reset && memoryMode == REUSABLE_DATA) { - previousCollectionAggregatorHandles = aggregatorHandles; - } + // Grab aggregated points. + aggregatorHandles.forEach( + (attributes, handle) -> { + if (!handle.hasRecordedValues()) { + return; + } + T point = + handle.aggregateThenMaybeReset( + registeredReader.getLastCollectEpochNanos(), + epochNanos, + attributes, + /* reset= */ true); + + if (memoryMode == IMMUTABLE_DATA) { + // Return the aggregator to the pool. + // The pool is only used in DELTA temporality (since in CUMULATIVE the handler is + // always used as it is the place accumulating the values and never resets) + // AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid + // using the pool since it allocates memory internally on each put() or remove() + aggregatorHandlePool.offer(handle); + } + + if (point != null) { + points.add(point); + } + }); + + // Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are + // created during collection. + int toDelete = aggregatorHandlePool.size() - (maxCardinality + 1); + for (int i = 0; i < toDelete; i++) { + aggregatorHandlePool.poll(); + } - if (points.isEmpty() || !enabled) { - return EmptyMetricData.getInstance(); - } + if (memoryMode == REUSABLE_DATA) { + previousCollectionAggregatorHandles = aggregatorHandles; + } - return aggregator.toMetricData( - resource, instrumentationScopeInfo, metricDescriptor, points, aggregationTemporality); - } + if (points.isEmpty() || !enabled) { + return EmptyMetricData.getInstance(); + } - @Override - public MetricDescriptor getMetricDescriptor() { - return metricDescriptor; + return aggregator.toMetricData( + resource, instrumentationScopeInfo, metricDescriptor, points, DELTA); + } } private static class AggregatorHolder { @@ -352,4 +389,80 @@ private AggregatorHolder(ConcurrentHashMap> aggr this.aggregatorHandles = aggregatorHandles; } } + + private static class CumulativeSynchronousMetricStorage + extends DefaultSynchronousMetricStorage { + private final MemoryMode memoryMode; + private final ConcurrentHashMap> aggregatorHandles = + new ConcurrentHashMap<>(); + // Only populated if memoryMode == REUSABLE_DATA + private final ArrayList reusableResultList = new ArrayList<>(); + + CumulativeSynchronousMetricStorage( + MetricDescriptor metricDescriptor, + Aggregator aggregator, + AttributesProcessor attributesProcessor, + int maxCardinality, + boolean enabled, + MemoryMode memoryMode) { + super(metricDescriptor, aggregator, attributesProcessor, maxCardinality, enabled); + this.memoryMode = memoryMode; + } + + @Override + void doRecordLong(long value, Attributes attributes, Context context) { + getAggregatorHandle(aggregatorHandles, attributes, context) + .recordLong(value, attributes, context); + } + + @Override + void doRecordDouble(double value, Attributes attributes, Context context) { + getAggregatorHandle(aggregatorHandles, attributes, context) + .recordDouble(value, attributes, context); + } + + @Nullable + @Override + AggregatorHandle maybeGetPooledAggregatorHandle() { + // No aggregator handle pooling for cumulative temporality + return null; + } + + @Override + public MetricData collect( + Resource resource, + InstrumentationScopeInfo instrumentationScopeInfo, + long startEpochNanos, + long epochNanos) { + List points; + if (memoryMode == REUSABLE_DATA) { + reusableResultList.clear(); + points = reusableResultList; + } else { + points = new ArrayList<>(aggregatorHandles.size()); + } + + // Grab aggregated points. + aggregatorHandles.forEach( + (attributes, handle) -> { + if (!handle.hasRecordedValues()) { + return; + } + T point = + handle.aggregateThenMaybeReset( + startEpochNanos, epochNanos, attributes, /* reset= */ false); + + if (point != null) { + points.add(point); + } + }); + + if (points.isEmpty() || !enabled) { + return EmptyMetricData.getInstance(); + } + + return aggregator.toMetricData( + resource, instrumentationScopeInfo, metricDescriptor, points, CUMULATIVE); + } + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java index 9df16e17911..dc23f89c4d4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java @@ -52,7 +52,7 @@ static SynchronousMetricStorage create( if (Aggregator.drop() == aggregator) { return empty(); } - return new DefaultSynchronousMetricStorage<>( + return DefaultSynchronousMetricStorage.create( registeredReader, metricDescriptor, aggregator, diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index 0d74e5ee9cb..354c1d899e5 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -9,6 +9,8 @@ import static io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilterInternal.asExemplarFilterInternal; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry; +import static org.assertj.core.api.BDDAssertions.as; +import static org.assertj.core.api.InstanceOfAssertFactories.collection; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -30,6 +32,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; +import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle; import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; import io.opentelemetry.sdk.metrics.internal.descriptor.Advice; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; @@ -101,7 +104,7 @@ private void initialize(MemoryMode memoryMode) { void recordDouble_NaN(MemoryMode memoryMode) { initialize(memoryMode); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( cumulativeReader, METRIC_DESCRIPTOR, aggregator, @@ -128,7 +131,7 @@ void attributesProcessor_applied(MemoryMode memoryMode) { AttributesProcessor.append(Attributes.builder().put("modifiedK", "modifiedV").build()); AttributesProcessor spyAttributesProcessor = spy(attributesProcessor); SynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( cumulativeReader, METRIC_DESCRIPTOR, aggregator, @@ -152,7 +155,7 @@ void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) { initialize(memoryMode); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( cumulativeReader, METRIC_DESCRIPTOR, aggregator, @@ -163,7 +166,6 @@ void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) { // Record measurement and collect at time 10 storage.recordDouble(3, Attributes.empty(), Context.current()); verify(aggregator, times(1)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) .hasDoubleSumSatisfying( sum -> @@ -175,7 +177,6 @@ void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) { // Record measurement and collect at time 30 storage.recordDouble(3, Attributes.empty(), Context.current()); verify(aggregator, times(1)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) .hasDoubleSumSatisfying( sum -> @@ -187,7 +188,6 @@ void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) { // Record measurement and collect at time 35 storage.recordDouble(2, Attributes.empty(), Context.current()); verify(aggregator, times(1)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35)) .hasDoubleSumSatisfying( sum -> @@ -201,7 +201,7 @@ void recordAndCollect_DeltaResets_ImmutableData() { initialize(IMMUTABLE_DATA); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( deltaReader, METRIC_DESCRIPTOR, aggregator, @@ -212,14 +212,18 @@ void recordAndCollect_DeltaResets_ImmutableData() { // Record measurement and collect at time 10 storage.recordDouble(3, Attributes.empty(), Context.current()); verify(aggregator, times(1)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) .hasDoubleSumSatisfying( sum -> sum.isDelta() .hasPointsSatisfying( point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); - assertThat(storage.getAggregatorHandlePool()).hasSize(1); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(1); deltaReader.setLastCollectEpochNanos(10); // Record measurement and collect at time 30 @@ -227,27 +231,35 @@ void recordAndCollect_DeltaResets_ImmutableData() { // AggregatorHandle should be returned to the pool on reset so shouldn't create additional // handles verify(aggregator, times(1)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) .hasDoubleSumSatisfying( sum -> sum.isDelta() .hasPointsSatisfying( point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(3))); - assertThat(storage.getAggregatorHandlePool()).hasSize(1); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(1); deltaReader.setLastCollectEpochNanos(30); // Record measurement and collect at time 35 storage.recordDouble(2, Attributes.empty(), Context.current()); verify(aggregator, times(1)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35)) .hasDoubleSumSatisfying( sum -> sum.isDelta() .hasPointsSatisfying( point -> point.hasStartEpochNanos(30).hasEpochNanos(35).hasValue(2))); - assertThat(storage.getAggregatorHandlePool()).hasSize(1); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(1); } @Test @@ -255,7 +267,7 @@ void recordAndCollect_DeltaResets_ReusableData() { initialize(MemoryMode.REUSABLE_DATA); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( deltaReader, METRIC_DESCRIPTOR, aggregator, @@ -266,14 +278,18 @@ void recordAndCollect_DeltaResets_ReusableData() { // Record measurement and collect at time 10 storage.recordDouble(3, Attributes.empty(), Context.current()); verify(aggregator, times(1)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) .hasDoubleSumSatisfying( sum -> sum.isDelta() .hasPointsSatisfying( point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(0); deltaReader.setLastCollectEpochNanos(10); @@ -282,14 +298,18 @@ void recordAndCollect_DeltaResets_ReusableData() { // We're switched to secondary map so a handle will be created verify(aggregator, times(2)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) .hasDoubleSumSatisfying( sum -> sum.isDelta() .hasPointsSatisfying( point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(3))); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(0); deltaReader.setLastCollectEpochNanos(30); @@ -301,7 +321,9 @@ void recordAndCollect_DeltaResets_ReusableData() { // aggregator handle is still there, thus no handle was created for empty(), but it will for // the "foo" verify(aggregator, times(3)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(0); MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35); assertThat(metricData).hasDoubleSumSatisfying(DoubleSumAssert::isDelta); @@ -329,7 +351,9 @@ void recordAndCollect_DeltaResets_ReusableData() { Attributes.of(AttributeKey.stringKey("foo"), "bar")); }))); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(0); deltaReader.setLastCollectEpochNanos(40); storage.recordDouble(6, Attributes.of(AttributeKey.stringKey("foo"), "bar"), Context.current()); @@ -358,7 +382,7 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { initialize(memoryMode); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( cumulativeReader, METRIC_DESCRIPTOR, aggregator, @@ -373,7 +397,6 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) .hasDoubleSumSatisfying( sum -> @@ -387,7 +410,6 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { assertThat(point.getEpochNanos()).isEqualTo(10); assertThat(point.getValue()).isEqualTo(3); }))); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); assertThat(logs.getEvents()).isEmpty(); cumulativeReader.setLastCollectEpochNanos(10); @@ -396,7 +418,6 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { 3, Attributes.builder().put("key", "value" + CARDINALITY_LIMIT).build(), Context.current()); // Should not create an additional handles for the overflow series verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20)) .hasDoubleSumSatisfying( sum -> @@ -421,7 +442,6 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { point -> assertThat(point.getAttributes()) .isEqualTo(MetricStorage.CARDINALITY_OVERFLOW)))); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); logs.assertContains("Instrument name has exceeded the maximum allowed cardinality"); } @@ -430,7 +450,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { initialize(IMMUTABLE_DATA); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( deltaReader, METRIC_DESCRIPTOR, aggregator, @@ -445,7 +465,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) .hasDoubleSumSatisfying( sum -> @@ -459,7 +481,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { assertThat(point.getEpochNanos()).isEqualTo(10); assertThat(point.getValue()).isEqualTo(3); }))); - assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT - 1); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(CARDINALITY_LIMIT - 1); assertThat(logs.getEvents()).isEmpty(); deltaReader.setLastCollectEpochNanos(10); @@ -469,7 +493,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { 3, Attributes.builder().put("key", "value" + CARDINALITY_LIMIT).build(), Context.current()); // Should use handle returned to pool instead of creating new ones verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT - 2); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(CARDINALITY_LIMIT - 2); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20)) .hasDoubleSumSatisfying( sum -> @@ -484,7 +510,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { Attributes.builder() .put("key", "value" + CARDINALITY_LIMIT) .build()))); - assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT - 1); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(CARDINALITY_LIMIT - 1); assertThat(logs.getEvents()).isEmpty(); deltaReader.setLastCollectEpochNanos(20); @@ -496,7 +524,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { } // Should use handles returned to pool instead of creating new ones verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) .hasDoubleSumSatisfying( sum -> @@ -522,7 +552,9 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { assertThat(point.getAttributes()) .isEqualTo(MetricStorage.CARDINALITY_OVERFLOW)))); - assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .hasSize(CARDINALITY_LIMIT); logs.assertContains("Instrument name has exceeded the maximum allowed cardinality"); } @@ -531,7 +563,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() { initialize(MemoryMode.REUSABLE_DATA); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( deltaReader, METRIC_DESCRIPTOR, aggregator, @@ -608,7 +640,9 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() { assertThat(point.getAttributes()) .isEqualTo(MetricStorage.CARDINALITY_OVERFLOW)))); - assertThat(storage.getAggregatorHandlePool()).isEmpty(); + assertThat(storage) + .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .isEmpty(); logs.assertContains("Instrument name has exceeded the maximum allowed cardinality"); } @@ -618,7 +652,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() { initialize(MemoryMode.REUSABLE_DATA); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( deltaReader, METRIC_DESCRIPTOR, aggregator, @@ -763,7 +797,7 @@ void enabledThenDisable_isEnabled(MemoryMode memoryMode) { initialize(memoryMode); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( deltaReader, METRIC_DESCRIPTOR, aggregator, @@ -782,7 +816,7 @@ void enabledThenDisableThenEnable_isEnabled(MemoryMode memoryMode) { initialize(memoryMode); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( deltaReader, METRIC_DESCRIPTOR, aggregator, @@ -802,7 +836,7 @@ void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) { initialize(memoryMode); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( deltaReader, METRIC_DESCRIPTOR, aggregator, @@ -823,7 +857,7 @@ void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) { initialize(memoryMode); DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( + DefaultSynchronousMetricStorage.create( deltaReader, METRIC_DESCRIPTOR, aggregator,