-
Notifications
You must be signed in to change notification settings - Fork 81
timeseries [3/6] RUM-13949: Add CPU timeseries data-point reader and serializer
#3433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/timeseries
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| /* | ||
| * Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. | ||
| * This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
| * Copyright 2016-Present Datadog, Inc. | ||
| */ | ||
|
|
||
| package com.datadog.android.rum.internal.timeseries.provider | ||
|
|
||
| import com.datadog.android.api.InternalLogger | ||
| import com.datadog.android.core.internal.persistence.file.canReadSafe | ||
| import com.datadog.android.core.internal.persistence.file.existsSafe | ||
| import com.datadog.android.core.internal.persistence.file.readTextSafe | ||
| import com.datadog.android.internal.time.TimeProvider | ||
| import java.io.File | ||
|
|
||
| /** | ||
| * Reads CPU usage as a percentage by computing successive deltas of the utime field | ||
| * from `/proc/self/stat`. | ||
| * | ||
| * CLK_TCK = 100 Hz on Android bionic: 100 ticks/s ≡ 100% single-core CPU. | ||
| * Multi-core spikes are clamped to 100.0. The first call always returns null. | ||
| */ | ||
| internal class CpuDatapointReader( | ||
| internal val statFile: File = STAT_FILE, | ||
| private val cpuTimeProvider: TimeProvider, | ||
| override val intervalMs: Long, | ||
| private val internalLogger: InternalLogger | ||
| ) : DataPointsReader<Double>(cpuTimeProvider) { | ||
|
|
||
| private var lastUtime: Double? = null | ||
| private var lastTimestampMs: Long? = null | ||
|
|
||
| @Suppress("ReturnCount") | ||
| override fun readValue(): Double? { | ||
| val nowMs = cpuTimeProvider.getDeviceTimestampMillis() | ||
| val cpuTicks = readCpuTicks() ?: return null | ||
| val prevTicks = lastUtime | ||
| val prevMs = lastTimestampMs | ||
| lastUtime = cpuTicks | ||
| lastTimestampMs = nowMs | ||
| if (prevTicks == null || prevMs == null) return null | ||
| val elapsedMs = (nowMs - prevMs).coerceAtLeast(1L) | ||
| // CLK_TCK = 100 Hz on Android bionic → 100 ticks/s = 100% CPU on one core | ||
| return ((cpuTicks - prevTicks) * MS_PER_SECOND / elapsedMs).coerceIn(0.0, MAX_CPU_PERCENT) | ||
| } | ||
|
|
||
| @Suppress("ReturnCount") | ||
| private fun readCpuTicks(): Double? { | ||
| if (!statFile.existsSafe(internalLogger) || !statFile.canReadSafe(internalLogger)) return null | ||
| val stat = statFile.readTextSafe(internalLogger = internalLogger) ?: return null | ||
| val tokens = stat.split(' ') | ||
| return when { | ||
| tokens.size <= UTIME_IDX -> null | ||
| tokens.size <= STIME_IDX -> tokens[UTIME_IDX].toDoubleOrNull() | ||
| else -> { | ||
| val utime = tokens[UTIME_IDX].toDoubleOrNull() ?: return null | ||
| val stime = tokens[STIME_IDX].toDoubleOrNull() ?: return null | ||
| utime + stime | ||
| } | ||
| } | ||
| } | ||
|
|
||
| companion object { | ||
| private const val STAT_PATH = "/proc/self/stat" | ||
| internal val STAT_FILE = File(STAT_PATH) | ||
| private const val UTIME_IDX = 13 | ||
| private const val STIME_IDX = 14 | ||
| private const val MS_PER_SECOND = 1000.0 | ||
| private const val MAX_CPU_PERCENT = 100.0 | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| /* | ||
| * Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. | ||
| * This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
| * Copyright 2016-Present Datadog, Inc. | ||
| */ | ||
|
|
||
| package com.datadog.android.rum.internal.timeseries.serializer | ||
|
|
||
| import com.datadog.android.internal.time.TimeProvider | ||
| import com.datadog.android.rum.RumSessionType | ||
| import com.datadog.android.rum.internal.timeseries.DataPoint | ||
| import com.datadog.android.rum.internal.timeseries.DeltaCompression | ||
| import com.datadog.android.rum.internal.timeseries.DeltaCompression.mapToDeltaCompressed | ||
| import com.datadog.android.rum.internal.timeseries.DeltaCompression.roundToLongSafely | ||
| import com.datadog.android.rum.internal.toTimeseriesCpuSessionType | ||
| import com.datadog.android.rum.model.TimeseriesCpuEvent | ||
| import com.google.gson.JsonObject | ||
| import java.util.UUID | ||
|
|
||
| internal class CpuEventSerializer( | ||
| private val sessionId: String, | ||
| private val applicationId: String, | ||
| private val sessionType: RumSessionType, | ||
| private val timeProvider: TimeProvider, | ||
| private val useDeltaCompression: Boolean = false | ||
| ) : JsonSerializer<Double> { | ||
|
|
||
| override fun serialize(dataPoints: List<DataPoint<Double>>): JsonObject? { | ||
| if (dataPoints.isEmpty()) return null | ||
| val data = dataPoints.map { sample -> | ||
| TimeseriesCpuEvent.Data( | ||
| timestamp = sample.timestampNs, | ||
| dataPoint = TimeseriesCpuEvent.DataPoint(sample.value) | ||
| ) | ||
| } | ||
| val start = data.firstOrNull()?.timestamp ?: 0L | ||
| val end = data.lastOrNull()?.timestamp ?: 0L | ||
| val deltaEncoded = if (useDeltaCompression) encodeDelta(data) else null | ||
| val schema = if (deltaEncoded != null) { | ||
| TimeseriesCpuEvent.Schema.DELTA_SCALAR | ||
| } else { | ||
| TimeseriesCpuEvent.Schema.OBJECT | ||
| } | ||
| val json = TimeseriesCpuEvent( | ||
| dd = TimeseriesCpuEvent.Dd(), | ||
| application = TimeseriesCpuEvent.Application(id = applicationId), | ||
| session = TimeseriesCpuEvent.Session( | ||
| id = sessionId, | ||
| type = sessionType.toTimeseriesCpuSessionType() | ||
| ), | ||
| source = TimeseriesCpuEvent.Source.ANDROID, | ||
| date = timeProvider.getDeviceTimestampMillis(), | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When a CPU batch is flushed after buffering samples or at session stop, this sets the top-level Useful? React with 👍 / 👎. |
||
| service = null, | ||
| version = null, | ||
| timeseries = TimeseriesCpuEvent.Timeseries( | ||
| id = UUID.randomUUID().toString(), | ||
| schema = schema, | ||
| start = start, | ||
| end = end, | ||
| data = data | ||
| ) | ||
| ).toJson() as JsonObject | ||
|
|
||
| if (deltaEncoded != null) { | ||
| val timeseriesJson = json.getAsJsonObject("timeseries") | ||
| timeseriesJson.remove("data") | ||
| timeseriesJson.add("data", deltaEncoded) | ||
| } | ||
| return json | ||
| } | ||
|
|
||
| private fun encodeDelta(data: List<TimeseriesCpuEvent.Data>): JsonObject? { | ||
| if (data.size <= 1) return null | ||
|
|
||
| val ts = data.mapToDeltaCompressed { it.timestamp } | ||
| val cpuUsageArray = data.mapToDeltaCompressed { | ||
| roundToLongSafely(it.dataPoint.cpuUsage.toDouble(), replaceNaNWith = 0L) | ||
| } | ||
|
|
||
| return JsonObject().apply { | ||
| addProperty("precision", DeltaCompression.PRECISION) | ||
| addProperty("resolution", RESOLUTION_NS) | ||
| add("ts", ts) | ||
| add("value", cpuUsageArray) | ||
| } | ||
| } | ||
|
|
||
| companion object { | ||
| private const val RESOLUTION_NS = "ns" | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the device wall clock is adjusted while a RUM session is sampling, this timestamp can jump backward or forward because
getDeviceTimestampMillis()is backed bySystem.currentTimeMillis(). That makeselapsedMsnegative/huge, so a backward correction is coerced to 1 ms and reports a clamped 100% CPU spike, while a forward correction suppresses CPU usage. The CPU delta interval should be measured with the monotonic elapsed time provider and only use wall-clock time for the emitted sample timestamp.Useful? React with 👍 / 👎.