timeseries [2/6] RUM-13949: Implement timeseries pipeline infrastructure#3432
timeseries [2/6] RUM-13949: Implement timeseries pipeline infrastructure#3432satween wants to merge 2 commits into
timeseries [2/6] RUM-13949: Implement timeseries pipeline infrastructure#3432Conversation
timeseries RUM-13949: Implement timeseries pipeline infrastructure
cacd357 to
1a6f200
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## feature/timeseries #3432 +/- ##
======================================================
- Coverage 72.27% 72.27% -0.00%
======================================================
Files 964 972 +8
Lines 35554 35670 +116
Branches 5922 5944 +22
======================================================
+ Hits 25694 25777 +83
- Misses 8249 8281 +32
- Partials 1611 1612 +1
🚀 New features to boost your workflow:
|
1a6f200 to
7bb00bc
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7bb00bc938
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
7bb00bc to
f1063fa
Compare
kikoveiga
left a comment
There was a problem hiding this comment.
Nice! Just left some small comments, worth to check what Codex said about some concurrency issues.
This comment has been minimized.
This comment has been minimized.
fa10624 to
5968d81
Compare
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5968d81b3b
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| ) { | ||
| if (!isActive(generation)) return@scheduleSafe | ||
| try { | ||
| synchronized(pipeline, pipeline::execute) |
There was a problem hiding this comment.
Re-check stop state after taking the pipeline lock
If a scheduled tick passes the isActive(generation) check and then onSessionStop() runs before this lock is acquired, the stop path sets the state to STOPPED and flushes the same pipeline first; after that, this tick still executes pipeline.execute(), can append a new sample, and will not reschedule or flush it because the final isActive check is false. In that stop/race window the last sample is silently left in the buffer, so the runnable should re-check activity after acquiring the lock or the stop path should otherwise prevent post-flush execution.
Useful? React with 👍 / 👎.
timeseries RUM-13949: Implement timeseries pipeline infrastructuretimeseries [2/5] RUM-13949: Implement timeseries pipeline infrastructure
timeseries [2/5] RUM-13949: Implement timeseries pipeline infrastructuretimeseries [2/6] RUM-13949: Implement timeseries pipeline infrastructure
| - "java.util.concurrent.atomic.AtomicBoolean.constructor(kotlin.Boolean)" | ||
| - "java.util.concurrent.atomic.AtomicBoolean.get()" | ||
| - "java.util.concurrent.atomic.AtomicBoolean.getAndSet(kotlin.Boolean)" | ||
| - "java.util.concurrent.atomic.AtomicReference.getAndSet(com.datadog.android.rum.internal.timeseries.RumSessionScopeTimeseries.State?)" |
| - "kotlin.collections.MutableList.add(kotlin.Int, com.datadog.android.sessionreplay.internal.recorder.mapper.MapperTypeWrapper)" | ||
| - "kotlin.collections.MutableList.add(kotlin.Int, com.datadog.android.sessionreplay.model.MobileSegment.Wireframe)" | ||
| - "kotlin.collections.MutableList.add(kotlin.Int, com.datadog.android.sessionreplay.model.MobileSegment.Wireframe.ShapeWireframe)" | ||
| - "kotlin.collections.MutableList.add(com.datadog.android.rum.internal.timeseries.DataPoint)" |
|
|
||
| @Test | ||
| fun `M keep session context W handleEvent(action) {before expiration}`( | ||
| forge: Forge |
There was a problem hiding this comment.
Is it the change by some custom formatter? ktlint 1.5.0 + config in develop branch doesn't do this. It seems that there are many formatting changes in this PR not related to the feature scope actually.
It makes diff to review bigger.
| whenever(mockRumFeatureScope.withWriteContext(any(), any())) doAnswer { inv -> | ||
| inv.getArgument<(DatadogContext, EventWriteScope) -> Unit>(inv.arguments.lastIndex) |
There was a problem hiding this comment.
by some reason Github code highlighter thinks that inv is a language keyword and highlights it respectively, although it is not in the list https://kotlinlang.org/docs/keyword-reference.html
| testedPipeline.flush() | ||
|
|
||
| // Then | ||
| verify(mockDataWriter, never()).write(any(), any(), any()) |
There was a problem hiding this comment.
| verify(mockDataWriter, never()).write(any(), any(), any()) | |
| verifyNoInteractions(mockDataWriter) |
same maybe can be applied in the tests above
| val pipelineB = mockPipeline() | ||
| val provider: (String, String, RumSessionType) -> List<Pipeline<*>> = | ||
| mock<(String, String, RumSessionType) -> List<Pipeline<*>>>().apply { | ||
| org.mockito.kotlin.whenever(this(fakeSessionId, fakeApplicationId, fakeSessionType)) |
There was a problem hiding this comment.
| org.mockito.kotlin.whenever(this(fakeSessionId, fakeApplicationId, fakeSessionType)) | |
| whenever(this(fakeSessionId, fakeApplicationId, fakeSessionType)) |
Introduces the core primitives for the timeseries data pipeline: `Timeseries` interface (with `@NoOpImplementation`), `DataPoint`, `Buffer`, `DeltaCompression`, `Pipeline`, `EventWriter`, `JsonSerializer` interface, `DataPointsReader` interface, `VitalReaderWrapper`, and the generic `RumSessionScopeTimeseries` per-session collector. Also adds detekt safe-call allowlist entries for the new APIs. ### What does this PR do? Adds the foundational building blocks shared by all timeseries metric types: the `Timeseries` lifecycle interface, immutable `DataPoint`, the `Buffer`/`DeltaCompression`/`Pipeline` data-processing chain, the `EventWriter` flush boundary, `JsonSerializer` interface, `DataPointsReader`/`VitalReaderWrapper` for OS-metric abstraction, and `RumSessionScopeTimeseries` — the generic per-session scheduler that drives sampling, batching, and writing for any metric type. ### Motivation Establishes the shared infrastructure that CPU and memory collectors will extend, keeping primitive types and pipeline logic isolated so they can be reviewed without metric-specific details. ### Additional Notes Unit tests cover all primitives. `DataPointForgeryFactory` and the `Configurator` registration are included so subsequent commits can write tests against generated `DataPoint` values. `RumSessionTypeExt` session-type mappings are added here (used by both CPU and memory serializers in later commits).
49598a9
5968d81 to
49598a9
Compare
- Switch `RumSessionScopeTimeseries` from owning a per-session executor to accepting a shared `ScheduledExecutorService` (no creation, no shutdown). - Add `RumSessionScopeTimeseriesFactory` so reviewers can see how `RumSessionScopeTimeseries` is instantiated. - Wire timeseries `start`/`stop` lifecycle into `RumSessionScope` (`startTimeseries`/`stopTimeseries` called from `renewSession`/`stopSession`). - Remove 3 unused entries added to `detekt_custom_safe_calls.yml` in this branch.
49598a9 to
76026c3
Compare

What does this PR do?
What does this PR do?
Adds the foundational building blocks shared by all timeseries metric types: the
Timeserieslifecycle interface, immutableDataPoint, theBuffer/DeltaCompression/Pipelinedata-processing chain, theEventWriterflush boundary,JsonSerializerinterface,DataPointsReader/VitalReaderWrapperfor OS-metric abstraction, andRumSessionScopeTimeseries— the generic per-session scheduler that drives sampling, batching, and writing for any metric type.Motivation
Establishes the shared infrastructure that CPU and memory collectors will extend, keeping primitive types and pipeline logic isolated so they can be reviewed without metric-specific details.
Review checklist (to be filled by reviewers)