Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -972,29 +972,23 @@ void createdTimestamp() throws IOException {

LongCounter counter = meter.counterBuilder("requests").build();
testClock.advance(Duration.ofMillis(1));
long bearStartNanos = testClock.now();
counter.add(3, Attributes.builder().put("animal", "bear").build());
testClock.advance(Duration.ofMillis(1));
long mouseStartNanos = testClock.now();
counter.add(2, Attributes.builder().put("animal", "mouse").build());
testClock.advance(Duration.ofMillis(1));

// There is a curious difference between Prometheus and OpenTelemetry:
// In Prometheus metrics the _created timestamp is per data point,
// i.e. the _created timestamp says when this specific set of label values
// was first observed.
// In the OTel Java SDK the _created timestamp is the initialization time
// of the SdkMeterProvider, i.e. all data points will have the same _created timestamp.
// So we expect the _created timestamp to be the start time of the application,
// not the timestamp when the counter or an individual data point was created.
String expected =
""
+ "# TYPE requests counter\n"
+ "requests_total{animal=\"bear\",otel_scope_name=\"test\"} 3.0\n"
+ "requests_created{animal=\"bear\",otel_scope_name=\"test\"} "
+ createdTimestamp
+ convertTimestamp(bearStartNanos)
+ "\n"
+ "requests_total{animal=\"mouse\",otel_scope_name=\"test\"} 2.0\n"
+ "requests_created{animal=\"mouse\",otel_scope_name=\"test\"} "
+ createdTimestamp
+ convertTimestamp(mouseStartNanos)
+ "\n"
+ "# TYPE target info\n"
+ "target_info{service_name=\"unknown_service:java\",telemetry_sdk_language=\"java\",telemetry_sdk_name=\"opentelemetry\",telemetry_sdk_version=\"1.x.x\"} 1\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.Clock;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -40,7 +41,7 @@ public static class ThreadState {

@Setup(Level.Trial)
public final void setup() {
aggregatorHandle = aggregation.getAggregator().createHandle();
aggregatorHandle = aggregation.getAggregator().createHandle(Clock.getDefault().now());
valueSupplier = valueGen.supplier();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.Clock;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -46,7 +47,7 @@ public static class ThreadState {

@Setup(Level.Invocation)
public final void setup() {
aggregatorHandle = aggregation.getAggregator().createHandle();
aggregatorHandle = aggregation.getAggregator().createHandle(Clock.getDefault().now());
valueSupplier = valueGen.supplier();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public class InstrumentGarbageCollectionBenchmarkTest {

/**
* This test validates that in {@link MemoryMode#REUSABLE_DATA}, any {@link
* MetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely allocates memory
* which is then subsequently garbage collected. It is done so comparatively to {@link
* MetricStorage#collect(Resource, InstrumentationScopeInfo, long)} barely allocates memory which
* is then subsequently garbage collected. It is done so comparatively to {@link
* MemoryMode#IMMUTABLE_DATA},
*
* <p>It runs the JMH test {@link InstrumentGarbageCollectionBenchmark} with GC profiler, and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ Collection<MetricData> collectAll(RegisteredReader registeredReader, long epochN
// Only invoke callbacks if meter is enabled
if (meterEnabled) {
for (CallbackRegistration callbackRegistration : currentRegisteredCallbacks) {
callbackRegistration.invokeCallback(
registeredReader, meterProviderSharedState.getStartEpochNanos(), epochNanos);
callbackRegistration.invokeCallback(registeredReader);
}
}

Expand All @@ -145,10 +144,7 @@ Collection<MetricData> collectAll(RegisteredReader registeredReader, long epochN
for (MetricStorage storage : storages) {
MetricData current =
storage.collect(
meterProviderSharedState.getResource(),
getInstrumentationScopeInfo(),
meterProviderSharedState.getStartEpochNanos(),
epochNanos);
meterProviderSharedState.getResource(), getInstrumentationScopeInfo(), epochNanos);
// Ignore if the metric data doesn't have any data points, for example when aggregation is
// Aggregation#drop()
if (!current.isEmpty()) {
Expand Down Expand Up @@ -288,6 +284,7 @@ WriteableMetricStorage registerSynchronousMetricStorage(InstrumentDescriptor ins
SynchronousMetricStorage.create(
reader,
registeredView,
meterProviderSharedState.getClock(),
instrument,
meterProviderSharedState.getExemplarFilter(),
meterEnabled)));
Expand Down Expand Up @@ -317,7 +314,11 @@ SdkObservableMeasurement registerObservableMeasurement(
registeredStorages.add(
registry.register(
AsynchronousMetricStorage.create(
reader, registeredView, instrumentDescriptor, meterEnabled)));
reader,
registeredView,
meterProviderSharedState.getClock(),
instrumentDescriptor,
meterEnabled)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public static SdkMeterProviderBuilder builder() {
Resource resource,
ExemplarFilterInternal exemplarFilter,
ScopeConfigurator<MeterConfig> meterConfigurator) {
long startEpochNanos = clock.now();
this.registeredViews = registeredViews;
this.registeredReaders =
metricReaders.entrySet().stream()
Expand All @@ -83,8 +82,7 @@ public static SdkMeterProviderBuilder builder() {
ViewRegistry.create(entry.getKey(), entry.getValue(), registeredViews)))
.collect(toList());
this.metricProducers = metricProducers;
this.sharedState =
MeterProviderSharedState.create(clock, resource, exemplarFilter, startEpochNanos);
this.sharedState = MeterProviderSharedState.create(clock, resource, exemplarFilter);
this.registry =
new ComponentRegistry<>(
instrumentationLibraryInfo ->
Expand All @@ -99,7 +97,6 @@ public static SdkMeterProviderBuilder builder() {
readerMetricProducers.add(new LeasedMetricProducer(registry, sharedState, registeredReader));
MetricReader reader = registeredReader.getReader();
reader.register(new SdkCollectionRegistration(readerMetricProducers, sharedState));
registeredReader.setLastCollectEpochNanos(startEpochNanos);
if (reader instanceof PeriodicMetricReader) {
setReaderMeterProvider((PeriodicMetricReader) reader, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ static Aggregator<?> drop() {
}

/**
* Returns a new {@link AggregatorHandle}. This MUST by used by the synchronous to aggregate
* recorded measurements during the collection cycle.
* Returns a new {@link AggregatorHandle}. Used by both synchronous and asynchronous metric
* storage to aggregate recorded measurements during the collection cycle.
*
* @param creationEpochNanos the epoch timestamp (nanos) at which the handle is being created,
* stored via {@link AggregatorHandle#getCreationEpochNanos()}. Whether this value is used as
* the start timestamp of reported data points depends on the instrument and temporality — see
* {@link AggregatorHandle#getCreationEpochNanos()} for details.
* @return a new {@link AggregatorHandle}.
*/
AggregatorHandle<T> createHandle();
AggregatorHandle<T> createHandle(long creationEpochNanos);

/**
* Returns a new DELTA point by computing the difference between two cumulative points.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ public abstract class AggregatorHandle<T extends PointData> {
private static final String UNSUPPORTED_DOUBLE_MESSAGE =
"This aggregator does not support double values.";

private final long creationEpochNanos;
// A reservoir of sampled exemplars for this time period.
@Nullable private final DoubleExemplarReservoir doubleReservoirFactory;
@Nullable private final LongExemplarReservoir longReservoirFactory;
private final boolean isDoubleType;
private volatile boolean valuesRecorded = false;

protected AggregatorHandle(ExemplarReservoirFactory reservoirFactory, boolean isDoubleType) {
protected AggregatorHandle(
long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, boolean isDoubleType) {
this.creationEpochNanos = creationEpochNanos;
this.isDoubleType = isDoubleType;
if (isDoubleType) {
this.doubleReservoirFactory = reservoirFactory.createDoubleExemplarReservoir();
Expand Down Expand Up @@ -145,4 +148,22 @@ private static <S> S throwUnsupportedIfNull(@Nullable S value, String message) {
}
return value;
}

/**
* Returns the epoch timestamp (nanos) at which this handle was created.
*
* <p>For cumulative synchronous instruments, this is the time of the first measurement for the
* series and is used as {@link PointData#getStartEpochNanos()}.
*
* <p>For cumulative asynchronous instruments, this is either the instrument creation time (if the
* series first appeared during the first collection cycle) or the preceding collection interval's
* timestamp (if the series appeared in a later cycle), and is used as {@link
* PointData#getStartEpochNanos()}.
*
* <p>Not used for delta instruments; their start epoch is computed directly from the reader's
* last collection time or instrument creation time.
*/
public long getCreationEpochNanos() {
return creationEpochNanos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ public DoubleBase2ExponentialHistogramAggregator(
}

@Override
public AggregatorHandle<ExponentialHistogramPointData> createHandle() {
return new Handle(reservoirFactory, maxBuckets, maxScale, recordMinMax, memoryMode);
public AggregatorHandle<ExponentialHistogramPointData> createHandle(long creationEpochNanos) {
return new Handle(
creationEpochNanos, reservoirFactory, maxBuckets, maxScale, recordMinMax, memoryMode);
}

@Override
Expand Down Expand Up @@ -104,12 +105,13 @@ static final class Handle extends AggregatorHandle<ExponentialHistogramPointData
@Nullable private final MutableExponentialHistogramPointData reusablePoint;

Handle(
long creationEpochNanos,
ExemplarReservoirFactory reservoirFactory,
int maxBuckets,
int maxScale,
boolean recordMinMax,
MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ true);
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true);
this.maxBuckets = maxBuckets;
this.maxScale = maxScale;
this.recordMinMax = recordMinMax;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ public DoubleExplicitBucketHistogramAggregator(
}

@Override
public AggregatorHandle<HistogramPointData> createHandle() {
return new Handle(boundaryList, boundaries, recordMinMax, reservoirFactory, memoryMode);
public AggregatorHandle<HistogramPointData> createHandle(long creationEpochNanos) {
return new Handle(
creationEpochNanos, boundaryList, boundaries, recordMinMax, reservoirFactory, memoryMode);
}

@Override
Expand Down Expand Up @@ -120,12 +121,13 @@ static final class Handle extends AggregatorHandle<HistogramPointData> {
@Nullable private final MutableHistogramPointData reusablePoint;

Handle(
long creationEpochNanos,
List<Double> boundaryList,
double[] boundaries,
boolean recordMinMax,
ExemplarReservoirFactory reservoirFactory,
MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ true);
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true);
this.boundaryList = boundaryList;
this.boundaries = boundaries;
this.recordMinMax = recordMinMax;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public DoubleLastValueAggregator(
}

@Override
public AggregatorHandle<DoublePointData> createHandle() {
return new Handle(reservoirFactory, memoryMode);
public AggregatorHandle<DoublePointData> createHandle(long creationEpochNanos) {
return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}

@Override
Expand Down Expand Up @@ -99,8 +99,9 @@ static final class Handle extends AggregatorHandle<DoublePointData> {
// Only used when memoryMode is REUSABLE_DATA
@Nullable private final MutableDoublePointData reusablePoint;

private Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ true);
private Handle(
long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true);
if (memoryMode == MemoryMode.REUSABLE_DATA) {
reusablePoint = new MutableDoublePointData();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public DoubleSumAggregator(
}

@Override
public AggregatorHandle<DoublePointData> createHandle() {
return new Handle(reservoirFactory, memoryMode);
public AggregatorHandle<DoublePointData> createHandle(long creationEpochNanos) {
return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}

@Override
Expand Down Expand Up @@ -112,8 +112,9 @@ static final class Handle extends AggregatorHandle<DoublePointData> {
// Only used if memoryMode == MemoryMode.REUSABLE_DATA
@Nullable private final MutableDoublePointData reusablePoint;

Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ true);
Handle(
long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true);
reusablePoint = memoryMode == MemoryMode.REUSABLE_DATA ? new MutableDoublePointData() : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public List<? extends ExemplarData> getExemplars() {

private static final AggregatorHandle<PointData> HANDLE =
new AggregatorHandle<PointData>(
ExemplarReservoirFactory.noSamples(), /* isDoubleType= */ true) {
0, ExemplarReservoirFactory.noSamples(), /* isDoubleType= */ true) {
@Override
protected PointData doAggregateThenMaybeResetDoubles(
long startEpochNanos,
Expand All @@ -75,7 +75,7 @@ protected void doRecordDouble(double value) {}
private DropAggregator() {}

@Override
public AggregatorHandle<PointData> createHandle() {
public AggregatorHandle<PointData> createHandle(long creationEpochNanos) {
return HANDLE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public LongLastValueAggregator(ExemplarReservoirFactory reservoirFactory, Memory
}

@Override
public AggregatorHandle<LongPointData> createHandle() {
return new Handle(reservoirFactory, memoryMode);
public AggregatorHandle<LongPointData> createHandle(long creationEpochNanos) {
return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}

@Override
Expand Down Expand Up @@ -95,8 +95,9 @@ static final class Handle extends AggregatorHandle<LongPointData> {
// Only used when memoryMode is REUSABLE_DATA
@Nullable private final MutableLongPointData reusablePoint;

Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ false);
Handle(
long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ false);
if (memoryMode == MemoryMode.REUSABLE_DATA) {
reusablePoint = new MutableLongPointData();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public LongSumAggregator(
}

@Override
public AggregatorHandle<LongPointData> createHandle() {
return new Handle(reservoirFactory, memoryMode);
public AggregatorHandle<LongPointData> createHandle(long creationEpochNanos) {
return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}

@Override
Expand Down Expand Up @@ -105,8 +105,9 @@ static final class Handle extends AggregatorHandle<LongPointData> {
// Only used if memoryMode == MemoryMode.REUSABLE_DATA
@Nullable private final MutableLongPointData reusablePointData;

Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ false);
Handle(
long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ false);
reusablePointData =
memoryMode == MemoryMode.REUSABLE_DATA ? new MutableLongPointData() : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class RegisteredReader {
private final int id = ID_COUNTER.incrementAndGet();
private final MetricReader metricReader;
private final ViewRegistry viewRegistry;
private volatile long lastCollectEpochNanos;
private volatile long lastCollectEpochNanos = -1;

/** Construct a new collection info object storing information for collection against a reader. */
public static RegisteredReader create(MetricReader reader, ViewRegistry viewRegistry) {
Expand All @@ -52,13 +52,14 @@ public void setLastCollectEpochNanos(long epochNanos) {
}

/**
* Get the time of the last collection for the reader.
* Get the time of the last collection for the reader, or {@code defaultEpochNanos} if not set.
*
* <p>Used to compute the {@link PointData#getStartEpochNanos()} for instruments aggregations with
* {@link AggregationTemporality#DELTA} temporality.
*/
public long getLastCollectEpochNanos() {
return lastCollectEpochNanos;
public long getLastCollectEpochNanosOrDefault(long defaultEpochNanos) {
long lastCollectEpochNanos = this.lastCollectEpochNanos;
return lastCollectEpochNanos == -1 ? defaultEpochNanos : lastCollectEpochNanos;
}

/** Get the {@link ViewRegistry} for the reader. */
Expand Down
Loading
Loading