This is inspired by TimeLine Analytics and its extension, but most importantly, the system is backed by Golem, a new agentic runtime that is also a durable execution engine.
Watch Afsal's talk at LambdaConf 2024 (Estes Park, Colorado), or refer to the presentation slides here.
You will get a better idea of the dashboard soon.
A text-based DSL for expressing temporal analytics over event streams. Write a query, deploy it, and the system materializes a durable agent graph that processes events in real time.
A user is experiencing connection-induced rebuffering when:
- They started playing at some point (`has_existed`)
- There was no recent seek event (`!has_existed_within`, which rules out seek-induced buffering)
- The current player state is `"buffer"` (`latest_event_to_state == "buffer"`)
All three must be true simultaneously, and we measure how long that condition holds.
duration_where(
has_existed(playerStateChange == "play")
&& !has_existed_within(playerStateChange == "seek", 5)
&& latest_event_to_state(playerStateChange) == "buffer"
)

With cross-session aggregation per CDN:
duration_where(
has_existed(playerStateChange == "play")
&& !has_existed_within(playerStateChange == "seek", 5)
&& latest_event_to_state(playerStateChange) == "buffer"
) | aggregate(group_by(cdn), count, sum, avg)

[Timeline diagram: the raw playerStateChange events (play, seek, buffer) plotted over t1 to t10; the derived timelines for has_existed (true from t1), has_existed_within (seek extended by 5 seconds, t2 to t7), and latest_event_to_state (buffer from t3); and the resulting duration climbing from 0 to 3 seconds between t7 and t10.]
The summary of the above timeline is as follows:
The user started playing at some point. After playing, the user performed a seek. We extend the seek event by a configurable 5 seconds. Even after that extension, 3 seconds of buffering remain, which suggests the buffering is not a direct outcome of the seek and therefore counts toward connection-induced rebuffering.
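The worked timeline above can be replayed with a small brute-force evaluator. This is only a sketch under stated assumptions, not how the system computes the result (the real system is incremental and push-based): events are `(time, value)` pairs on a single `playerStateChange` column, time advances in whole seconds, and `has_existed_within(p, w)` is assumed to hold at `t` when a matching event occurred at `ts` with `t - ts < w`. All function names here are hypothetical.

```rust
/// One event on the playerStateChange column: (timestamp in seconds, value).
type Event = (u64, &'static str);

/// True if `value` was observed at or before time `t`.
fn has_existed(events: &[Event], t: u64, value: &str) -> bool {
    events.iter().any(|&(ts, v)| ts <= t && v == value)
}

/// True if `value` was observed less than `window` seconds before `t`
/// (assumed boundary rule: t - ts < window).
fn has_existed_within(events: &[Event], t: u64, value: &str, window: u64) -> bool {
    events.iter().any(|&(ts, v)| ts <= t && t - ts < window && v == value)
}

/// The most recent value at or before time `t`, if any event has arrived yet.
fn latest_event_to_state(events: &[Event], t: u64) -> Option<&'static str> {
    events
        .iter()
        .filter(|&&(ts, _)| ts <= t)
        .max_by_key(|&&(ts, _)| ts)
        .map(|&(_, v)| v)
}

/// The CIRR predicate evaluated at a single instant.
fn cirr_at(events: &[Event], t: u64) -> bool {
    has_existed(events, t, "play")
        && !has_existed_within(events, t, "seek", 5)
        && latest_event_to_state(events, t) == Some("buffer")
}

/// Seconds in [0, until) during which the predicate holds, sampled once per second.
fn cirr_duration(events: &[Event], until: u64) -> u64 {
    (0..until).filter(|&t| cirr_at(events, t)).count() as u64
}
```

With play at t1, seek at t2, and buffer at t3, `cirr_duration(&[(1, "play"), (2, "seek"), (3, "buffer")], 10)` evaluates to 3 under these assumptions, matching the 3 seconds between t7 and t10.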
Time spent idle per region:
duration_in_cur_state(
latest_event_to_state(status) == "idle"
) | aggregate(group_by(region), count, avg, max)

Credit card location anomaly (location changed too quickly):
duration_in_cur_state(
latest_event_to_state(location)
) < 600

If the cardholder has been at the current location for less than 600 seconds, the location just changed, so flag it as a potential anomaly (e.g., New York → London in 10 minutes).
User engaged: played and never errored:
has_existed(playerStateChange == "play") && !has_existed(error == "fatal")

Run the full CIRR pipeline end-to-end with a single command. Requires Docker (for Kafka) and Golem CLI 1.4.1+.
cargo make demo

This will:
- Start a local Golem server and Kafka broker
- Build and deploy the timeline WASM component
- Initialize 9 CIRR sessions across 3 CDNs (akamai, cloudfront, fastly)
- Feed realistic playback events (init → play → buffer → play) to each session
- Open a dashboard at http://localhost:3000
The CIRR expression compiles into 8 Golem agents per session. The Computation Progress tab in the dashboard lets you query every sub-computation's result and its progress at any point in time:
duration-where-1 ← root: cumulative seconds where CIRR is true
and-2 ← all 3 conditions combined
and-3 ← has-existed ∧ ¬has-existed-within
has-existed-4 ← LEAF: has playerStateChange == "play" ever occurred?
not-5 ← negation of has-existed-within-6
has-existed-within-6 ← LEAF: was there a seek within the last 5 time units?
equal-to-7 ← is the current player state "buffer"?
latest-event-to-state-8 ← LEAF: what is the latest playerStateChange value?
Once the demo is running and the dashboard is open at http://localhost:3000:
- Click the Computation Progress tab
- Click the `demo-akamai-1` preset button (session ID is pre-filled)
- Leave query time at 250 and click Query All Nodes
You'll immediately see every sub-computation's result:
- latest-event-to-state-8 (leaf) shows `"buffer"`: the latest `playerStateChange` value
- has-existed-4 (leaf) shows `true`: "play" has existed at some point
- has-existed-within-6 (leaf) shows `false`: no seek event within the last 5 time units
- not-5 (derived) shows `true`: negation of has-existed-within-6 (¬false = true)
- equal-to-7 (derived) shows `true`: latest state equals "buffer"
- and-3 (derived) shows `true`: has_existed ∧ ¬has_existed_within
- and-2 (derived) shows `true`: all three CIRR conditions are met
- duration-where-1 (root) shows the cumulative rebuffering duration in seconds
Try changing the query time to 120 (before the buffer event) and clicking again — you'll see how the sub-computations differ.
- Leaf nodes (green) hold raw state computed from ingested events
- Derived nodes (blue) hold recomputed results from their children
Each result is a local point-in-time lookup (~10ms per agent invoke) — no cascading RPC. The push cascade has already propagated all changes upward.
Once you've seen the per-session sub-computations, switch to aggregated metrics:
- Click the Live Dashboard tab
- Click `cdn: akamai`; results appear immediately
The metrics show cross-session totals for all 3 akamai sessions:
- Count = 3 (three sessions on this CDN)
- Sum — total CIRR rebuffering duration (seconds) across all 3 sessions
- Avg — average CIRR per session (sum ÷ 3)
Click cdn: cloudfront or cdn: fastly to compare CDNs.
Press Ctrl+C to stop the dashboard. The demo script automatically stops Golem and Kafka.
The project merges the ideas of the TimeLine DSL, a composable language for expressing temporal analytics over event streams, into Golem's runtime. Each node in a timeline expression maps to a durable Golem agent (worker). The architecture is fully push-based: leaf nodes ingest events and push state changes upward through the agent tree. Derived nodes recompute incrementally on each notification and cascade changes to their parents. Point-in-time queries are local lookups on precomputed state, so no cascading RPC is required at query time.
┌─────────────────┐
│ TimelineDriver │ (1) Walks the DSL tree, spawns agents,
│ (orchestrator) │ wires ParentRef / AggregatorRef
└────────┬────────┘
│ spawns & wires
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ EventProcessor │ │ EventProcessor │ │ EventProcessor │
│ (leaf: has_exist)│ │ (leaf: latest) │ │ (leaf: within) │
└────────┬─────────┘ └────────┬─────────┘ └────────┬─────────┘
│ push │ push │ push
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐
│TimelineProcessor │ │TimelineProcessor │ (2) Receives on_child_state_changed,
│ (derived: And) │ │ (derived: EqualTo│ recomputes, pushes to parent
└────────┬─────────┘ └────────┬─────────┘
│ push │ push
└────────┬───────────┘
▼
┌──────────────────┐ ┌──────────────────┐
│TimelineProcessor │──────▶│ Aggregator │ (3) Root pushes deltas
│ (root: Duration) │ delta │ (cross-session) │ to aggregator
└──────────────────┘ └──────────────────┘

- Event ingestion: An event arrives at an `EventProcessor` via `add_event`.
- Leaf computation: The leaf evaluates its operation (e.g., "has `status == error` ever been true?") and records the result in its local `StateDynamicsTimeLine`.
- Parent notification: If the state changed, the leaf calls `on_child_state_changed` on its parent `TimelineProcessor` (identified by the `ParentRef` wired during initialization).
- Derived recomputation: The `TimelineProcessor` updates its own state and, if changed, pushes upward to its parent. This cascade continues until the root node is reached.
- Aggregator update: If the root node has an `AggregatorRef`, it computes the delta between the old and new numeric value and calls `on_delta` on the `Aggregator` agent. The aggregator maintains only running accumulators (sum, count), so each session contributes O(1) memory.
- Query: `get_leaf_result(t)` or `get_derived_result(t)` performs a local point lookup on the precomputed `StateDynamicsTimeLine`. No RPC cascade is needed.
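The "recompute, then push only if changed" rule for derived nodes can be sketched as follows. This is a simplified illustration, not the project's implementation: it ignores timestamps and the `StateDynamicsTimeLine`, and the `AndNode` and `Child` types are hypothetical. Only the method name `on_child_state_changed` comes from the description above; its signature here is an assumption.

```rust
/// Which input of a binary node a notification came from.
#[derive(Debug, Clone, Copy)]
enum Child {
    Left,
    Right,
}

/// A derived And node that caches the last state pushed by each child.
#[derive(Debug, Default)]
struct AndNode {
    left: bool,
    right: bool,
    value: bool,
}

impl AndNode {
    /// Handles a child notification. Returns Some(new_value) only when this
    /// node's own value changed, which is the case where the parent must be
    /// notified in turn; None means the cascade stops here.
    fn on_child_state_changed(&mut self, child: Child, state: bool) -> Option<bool> {
        match child {
            Child::Left => self.left = state,
            Child::Right => self.right = state,
        }
        let recomputed = self.left && self.right;
        if recomputed != self.value {
            self.value = recomputed;
            Some(recomputed)
        } else {
            None
        }
    }
}
```

Returning `None` when nothing changed is what keeps the cascade short: a notification that does not alter a node's value never reaches that node's parent.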
Use the REPL or golem agent invoke to submit a timeline expression to the driver.
For example, the CIRR expression EqualTo(TlLatestEventToState("playerStateChange"), "play"):
golem agent invoke \
'timeline-driver("cirr")' \
'timeline:core/timeline-driver.{initialize-timeline}' \
'{nodes: [comparison(equal-to, 1, string-value("play")), tl-latest-event-to-state("playerStateChange")]}'

This spawns the required EventProcessor and TimelineProcessor agents, wired together.
To aggregate CIRR duration across CDN sessions, pass an AggregationConfig alongside the timeline.
For example, to group by the cdn column in event data and compute Count, Sum, and Avg:
golem agent invoke \
'timeline-driver("cirr-cdn-x-session-1")' \
'timeline:core/timeline-driver.{initialize-timeline}' \
'{nodes: [tl-duration-where(1), and(2, 3), comparison(equal-to, 4, string-value("buffer")), tl-latest-event-to-state("playerStateChange"), tl-has-existed(col-name: "playerStateChange", value: string-value("play"), op: equal)]}' \
'{group-by-column: "cdn", aggregations: [count, sum, avg]}'

Each session's root node pushes deltas to the shared aggregator-cdn-x agent.
Query the aggregated metrics across all sessions for that CDN:
golem agent invoke \
'aggregator("aggregator-cdn-x")' \
'timeline:core/aggregator.{get-aggregation-result}'

This returns { count, sum, avg, min, max }: the running aggregate of CIRR duration
across all sessions grouped under cdn-x.
Once the timeline is initialized, feed events to the leaf EventProcessor agents. (This explicit initialization step may become unnecessary in the near future, since initialization is idempotent in Golem.) The driver logs which agent names it created; use those to target events:
golem agent invoke \
'event-processor("cirr-latest-event-to-state-8")' \
'timeline:core/event-processor.{add-event}' \
'{time: 1, event: [("playerStateChange", string-value("play"))]}'
golem agent invoke \
'event-processor("cirr-latest-event-to-state-8")' \
'timeline:core/event-processor.{add-event}' \
'{time: 5, event: [("playerStateChange", string-value("pause"))]}'

golem agent invoke \
'event-processor("cirr-latest-event-to-state-8")' \
'timeline:core/event-processor.{get-leaf-result}' \
'3'

This section walks through a realistic end-to-end deployment of CIRR at a hypothetical streaming platform (think Disney+, Netflix, etc.) where player telemetry events flow through Pulsar or Kafka.
┌────────────────┐ ┌───────────────┐ ┌────────────────────────────────────┐
│ Video Players │────▶│ Pulsar/Kafka │────▶ │ Feeder (Pulsar Consumer) │
│ (millions of │ │ Topic: │ │ │
│ sessions) │ │ player-events│ │ 1. Extract session_id from msg │
└────────────────┘ └───────────────┘ │ 2. If new session: │
│ → initialize_timeline(sess_id) │
│ 3. Route event to leaf agents │
│ using naming convention: │
│ "{sess_id}-node-{N}" │
└──────────────┬─────────────────────┘
│
┌──────────────▼─────────────────────┐
│ Golem Cloud (K8s) │
│ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Leaf │ │ Leaf │ │
│ │ Agents │──▶ │ Derived │──▶ ..│
│ │(per sess)│ │ Agents │ │
│ └──────────┘ └────┬─────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ Aggregator │ │
│ │ (per CDN) │ │
│ └─────────────┘ │
└────────────────────────────────────┘
The feeder is a standalone process (not a Golem agent) that bridges the message broker and Golem. Here is the event routing logic:
┌─────────────────────────────────────------------------------─┐
│ Feeder (Consumer) │
│ │
│ Event arrives from Pulsar: │
│ { session_id: "sess-42", │
│ time: 7, │
│ event: [("playerStateChange", │
│ "buffer")] } │
│ │
│ 1. session_id = "sess-42" │
│ │
│ 2. First event for this session? │
│ YES → call initialize_timeline │
│ on TimelineDriver("sess-42") │
│ NO → skip (already initialized) │
│ │
│ 3. Column is "playerStateChange" │
│ → route to has-existed-4 AND latest-event-to-state-8 │
│ │
│ If column were "userAction" │
│ → route to has-existed-within-6 only │
└────────────┬─────────---─┬───────────────────────────────────┘
│ │
┌──────────────────▼---┐ ┌─────▼────────────────────────────-─┐
│ EventProcessor │ │ EventProcessor │
│ "sess-42-has- │ │ "sess-42-latest-event-to-state-8" │
│ existed-4" │ │ (TlLatestEventToState) │
│ (TlHasExisted) │ │ │
└─────────────────────-┘ └────────────────────────────────────┘
Note that one event can fan out to multiple leaves. A playerStateChange event
must be sent to both has-existed-4 (which checks "has play ever existed?") and latest-event-to-state-8
(which tracks the latest state). The feeder is responsible for this fan-out.
The feeder only needs to track which sessions have been initialized (a simple
HashSet<SessionId>, not a full plan per session). The routing logic is static
and identical for every session:
// Static routing table — derived once from the CIRR workflow
fn route_event(session_id: &str, column: &str) -> Vec<String> {
match column {
"playerStateChange" => vec![
format!("{session_id}-has-existed-4"),
format!("{session_id}-latest-event-to-state-8"),
],
"userAction" => vec![
format!("{session_id}-has-existed-within-6"),
],
_ => vec![], // unknown column, ignore
}
}

Timeline: 8 PM Friday
Afsal starts "The Mandalorian" John starts "Moana"
session_id = "afsal-mando-1" session_id = "john-moana-1"
│ │
▼ ▼
┌─── Pulsar Topic: player-events ─────────────────────────────────┐
│ {sid:"afsal-mando-1", time:1, playerStateChange:"play"} │
│ {sid:"john-moana-1", time:1, playerStateChange:"play"} │
│ {sid:"afsal-mando-1", time:5, playerStateChange:"buffer"} │
│ {sid:"john-moana-1", time:3, userAction:"seek"} │
│ {sid:"afsal-mando-1", time:7, userAction:"seek"} │
│ ... │
└──────────────────────────┬──────────────────────────────────────┘
│
┌──────▼───────┐
│ Feeder │
└──────┬───────┘
│
┌──────────────┼──────────────┐
▼ ▼
Afsal's agent tree: John's agent tree:
afsal-mando-1-duration-where-1 (root) john-moana-1-duration-where-1 (root)
afsal-mando-1-and-2 john-moana-1-and-2
... ...
afsal-mando-1-latest-event-to-state-8 john-moana-1-latest-event-to-state-8
│ │
▼ ▼
┌───────────────────┐ ┌───────────────────┐
│ aggregator-cdn-a │ │ aggregator-cdn-b │
│ (Akamai) │ │ (Cloudfront) │
└───────────────────┘ └───────────────────┘
At any given instant, most of these agents are suspended (not in memory).
Only the agents currently processing a push notification are active. When Afsal's
playerStateChange:"buffer" event arrives:
- Feeder calls `add_event` on `afsal-mando-1-has-existed-4` and `afsal-mando-1-latest-event-to-state-8`
- `has-existed-4` wakes (~1ms), evaluates "has play existed?" → yes, pushes `true` to `and-3`, suspends
- `and-3` wakes (~1ms), evaluates `And(true, ...)` → pushes to `and-2`, suspends
- ... cascade continues to `duration-where-1` (TlDurationWhere) → pushes delta to `aggregator-cdn-a`
- Meanwhile, `latest-event-to-state-8` wakes (~1ms), records latest state as `"buffer"`, pushes to `equal-to-7`, suspends
- `equal-to-7` evaluates `EqualTo("buffer", "buffer")` → `true`, pushes to `and-2`
- All agents suspend. Total wall time: ~5–10ms. Total agents in memory during this: ~5
All of John's agents remain completely suspended during this — zero cost.
Per-session query — "What is Afsal's CIRR duration right now?"
golem agent invoke \
'timeline-processor("afsal-mando-1-duration-where-1")' \
'timeline:core/timeline-processor.{get-derived-result}' \
'100'

This is a local point lookup on duration-where-1's precomputed state, with no RPC cascade.
Cross-session query — "What is the average CIRR across all Akamai sessions?"
golem agent invoke \
'aggregator("aggregator-cdn-a")' \
'timeline:core/aggregator.{get-aggregation-result}'

| Operation | Type | Description |
|---|---|---|
| `TlLatestEventToState(col)` | Leaf | Track latest event value for a column as state |
| `TlHasExisted(predicate)` | Leaf | Has the predicate ever been true? (cumulative OR) |
| `TlHasExistedWithin(predicate, duration)` | Leaf | Has the predicate been true within a time window? |
| `EqualTo(timeline, value)` | Derived | Is the timeline state equal to a constant? |
| `GreaterThan(timeline, value)` | Derived | Is the timeline state greater than a constant? |
| `GreaterThanOrEqual(timeline, value)` | Derived | Is the timeline state ≥ a constant? |
| `LessThan(timeline, value)` | Derived | Is the timeline state less than a constant? |
| `LessThanOrEqual(timeline, value)` | Derived | Is the timeline state ≤ a constant? |
| `And(left, right)` | Derived | Boolean AND of two timelines |
| `Or(left, right)` | Derived | Boolean OR of two timelines |
| `Not(timeline)` | Derived | Negate a boolean timeline |
| `TlDurationWhere(timeline)` | Derived | Cumulative duration where timeline is true. Uses a `DurationState` (`Climbing { base, since }` while true, `Flat { value }` while false), so queries at time t return `base + (t − since)` without storing per-tick data. |
| `TlDurationInCurState(timeline)` | Derived | Duration in the current state. Resets to `Climbing { base: 0, since: t }` on every state change; queries return elapsed time since the last transition. |
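The `DurationState` described for `TlDurationWhere` can be sketched as a two-variant enum. This is a minimal illustration under assumptions: the variant and field names follow the table, but the field types (`u64` seconds) and the methods `value_at` and `on_input` are hypothetical and may not match the project's actual definitions.

```rust
/// Compact representation of an accumulated duration.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DurationState {
    /// The input is currently true: the value grows with time.
    Climbing { base: u64, since: u64 },
    /// The input is currently false: the value is frozen.
    Flat { value: u64 },
}

impl DurationState {
    /// Point query: base + (t - since) while climbing, the frozen value otherwise.
    fn value_at(&self, t: u64) -> u64 {
        match *self {
            DurationState::Climbing { base, since } => base + t.saturating_sub(since),
            DurationState::Flat { value } => value,
        }
    }

    /// Transition applied when the boolean input is reported as `input` at time `t`.
    fn on_input(self, t: u64, input: bool) -> DurationState {
        match (self, input) {
            // false -> true: start climbing from the frozen value.
            (DurationState::Flat { value }, true) => {
                DurationState::Climbing { base: value, since: t }
            }
            // true -> false: freeze the value reached at t.
            (DurationState::Climbing { .. }, false) => {
                DurationState::Flat { value: self.value_at(t) }
            }
            // No change in the input: keep the current state.
            (unchanged, _) => unchanged,
        }
    }
}
```

Because only the last transition is stored, a query at any later time is a constant-time computation rather than a scan over per-tick samples.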
The Aggregator agent enables metrics across multiple independent sessions (e.g., "average CIRR
across all sessions on CDN X"). Each session's root TimelineProcessor is wired to a shared
Aggregator agent during initialization.
How it works:
- When the root node's value changes (e.g., CIRR duration goes from 2 → 5), it computes the delta (5 − 2 = 3) and calls `on_delta(3.0)` on the aggregator.
- The aggregator maintains only running accumulators (`sum`, `count`), so adding more sessions costs O(1) memory per session; no per-session history is stored in the aggregator.
- `register_session` is called once per session during initialization to increment the count.
- Query `get_aggregation_result` at any time to get `{ count, sum, avg, min, max }`.
| Aggregation | Description |
|---|---|
| Count | Number of registered sessions |
| Sum | Running sum of all deltas |
| Avg | sum / count |
| Min | Minimum value seen (not yet tracked) |
| Max | Maximum value seen (not yet tracked) |
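The delta-based accumulation can be sketched in a few lines. This is an in-memory illustration, not the agent's actual code: the method names `register_session` and `on_delta` come from the description above, while the struct layout, the `avg` method, and the `push_change` helper are assumptions. Min and Max are omitted because the table marks them as not yet tracked.

```rust
/// Running accumulators only: no per-session history is kept.
#[derive(Debug, Default)]
struct Aggregator {
    count: u64,
    sum: f64,
}

impl Aggregator {
    /// Called once per session during initialization.
    fn register_session(&mut self) {
        self.count += 1;
    }

    /// Called by a session's root node with (new value - old value).
    fn on_delta(&mut self, delta: f64) {
        self.sum += delta;
    }

    /// sum / count, or None before any session has registered.
    fn avg(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// Root-side helper (hypothetical): turn a value change into a delta.
fn push_change(aggregator: &mut Aggregator, old: f64, new: f64) {
    aggregator.on_delta(new - old);
}
```

Sending deltas rather than absolute values is what lets the aggregator stay stateless with respect to individual sessions: it never needs to know a session's previous value to correct the sum.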
This section walks through what a production deployment looks like at streaming-platform scale (e.g., Disney+, Netflix), how many agents exist, how many are actually in memory at any instant, and what the Kubernetes deployment looks like.
Consider the full CIRR expression from the examples above:
TlDurationWhere(
And(
And(
TlHasExisted(play),
Not(TlHasExistedWithin(seek, 5))
),
EqualTo(TlLatestEventToState("playerStateChange"), "buffer")
)
)

The TimelineDriver walks this tree and spawns one agent per node:
TlDurationWhere ← TimelineProcessor
│
And ← TimelineProcessor
/ \
And EqualTo ← TimelineProcessor × 2
/ \ \
TlHasExisted Not TlLatestEvent ← EventProcessor, TimelineProcessor, EventProcessor
|
TlHasExistedWithin ← EventProcessor

| Agent type | Count per session |
|---|---|
| EventProcessor (leaf) | 3 |
| TimelineProcessor (derived) | 5 |
| Total per session | 8 |
Plus 1 shared Aggregator per CDN (not per session).
| Parameter | Estimate |
|---|---|
| Total subscribers | ~150 M |
| Peak concurrent streams | ~10 M |
| Agents per session | 8 |
| Total agents at peak | ~80 M |
| CDNs (aggregator agents) | ~10–50 |
80 million agents sounds enormous — but the critical insight is that Golem suspends idle agents to durable storage. An agent that isn't actively processing is not in memory. It's persisted and can be resumed on demand.
When Afsal presses play, an event hits his session's EventProcessor leaf. The push cascade
wakes agents one at a time up the tree:
t=0ms EventProcessor (leaf) ← wakes, processes event, pushes to parent, suspends
t=10ms TimelineProcessor (And) ← wakes, recomputes, pushes to parent, suspends
t=20ms TimelineProcessor (And) ← wakes, recomputes, pushes to parent, suspends
t=30ms TimelineProcessor (Duration) ← wakes, recomputes, pushes delta to aggregator, suspends
t=40ms Aggregator ← wakes, adds delta to sum, suspends
Each agent invoke takes ~10ms (wake + execute + persist + suspend). At any instant, only the agents currently processing a push notification are in memory. The rest — including all agents for sessions where no events are arriving — are suspended.
Rough estimate of in-memory agents:
| Parameter | Value |
|---|---|
| Events per session per minute | ~2–5 (state changes are sparse) |
| Processing time per agent per event | ~10 ms |
| Agents woken per event (cascade depth) | ~5 (for CIRR) |
| Active time per event | ~50 ms total across the chain |
| Peak events/sec across 10M sessions | ~300K–800K events/sec |
| In-memory agents at any instant | ~15,000–40,000 |
That is: out of 80M total agents, only tens of thousands are in memory at any moment. The rest cost nothing beyond durable storage.
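The in-memory estimate follows from multiplying the event rate by the time each event keeps agents busy (Little's law). The sketch below only reproduces the arithmetic from the table; the function names are hypothetical and the inputs are the table's rough estimates, not measurements.

```rust
/// Events per second across all sessions, given a per-session rate per minute.
fn events_per_sec(sessions: u64, events_per_session_per_min: u64) -> u64 {
    sessions * events_per_session_per_min / 60
}

/// Average number of agents in memory at once:
/// arrival rate × agents woken per event × time each agent stays active.
fn in_memory_agents(events_per_sec: u64, agents_per_event: u64, ms_per_agent: u64) -> u64 {
    events_per_sec * agents_per_event * ms_per_agent / 1000
}
```

For 10M sessions at 2 to 5 events per minute, `events_per_sec` gives roughly 333K to 833K events/sec, in line with the ~300K–800K range above. Feeding 300,000 and 800,000 events/sec into `in_memory_agents` with 5 agents per event and 10 ms each gives 15,000 and 40,000.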
Golem Cloud runs on Kubernetes. The key resources to size are:
These are the pods that execute agent (worker) code. Each pod hosts many agents concurrently.
| Resource | Estimate | Notes |
|---|---|---|
| Memory per active agent | ~1–5 MB | StateDynamicsTimeLine + DurationState + WASM runtime overhead |
| In-memory agents per pod | ~500–2,000 | Depends on pod memory limit |
| Pod memory | 4–8 GB | Standard for worker executor pods |
| Pods needed (steady state) | 8–20 | For ~40,000 concurrently active agents |
| Pods needed (burst/headroom) | 20–50 | For event spikes (e.g., popular show premiere) |
All 80M suspended agents live in Golem's durable persistence layer (e.g., Redis, blob store, or Golem's built-in storage). Each suspended agent is a serialized snapshot:
| Resource | Estimate |
|---|---|
| Serialized size per agent | ~0.5–2 KB (state timeline + config) |
| Total storage at peak | ~40–160 GB |
This is modest — a single cloud storage volume handles it comfortably.
The push cascade means each event triggers a chain of ~5 agent-to-agent calls. At 500K events/sec:
| Parameter | Value |
|---|---|
| Internal RPC calls/sec | ~2.5M |
| Payload per call | ~100–200 bytes (time + EventValue) |
| Bandwidth | ~250–500 MB/s internal |
This is within the capacity of a standard Kubernetes cluster's pod-to-pod network, but worth monitoring. Golem's worker executor pods colocate agents, so many of these calls are in-process and never hit the network.
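The storage and network figures above are straightforward products of the stated estimates. The helpers below are hypothetical and use decimal units (1 KB = 1,000 bytes); they only make the arithmetic explicit.

```rust
/// Agent-to-agent calls per second: each event triggers one call per cascade step.
fn rpc_calls_per_sec(events_per_sec: u64, cascade_depth: u64) -> u64 {
    events_per_sec * cascade_depth
}

/// Internal bandwidth in MB/s for a given call rate and payload size.
fn bandwidth_mb_per_sec(calls_per_sec: u64, payload_bytes: u64) -> u64 {
    calls_per_sec * payload_bytes / 1_000_000
}

/// Total durable storage in GB for a given number of serialized agents.
fn storage_gb(agents: u64, bytes_per_agent: u64) -> u64 {
    agents * bytes_per_agent / 1_000_000_000
}
```

At 500K events/sec and a cascade depth of 5, this gives 2.5M calls/sec and 250 to 500 MB/s for 100 to 200 byte payloads; 80M agents at 0.5 to 2 KB each come to 40 to 160 GB.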
8 PM Friday — a popular new show drops on Disney+ or Netflix:
- 2M users start streaming within 5 minutes → 2M new sessions → 16M new agents created.
- The `TimelineDriver` for each session spawns 8 agents and wires them. This is a burst of creation, but each driver runs once and suspends. Golem can spread creation across executor pods.
- Events start flowing: ~10M events/min across 2M sessions. The push cascade processes each event in ~50ms end-to-end (5 agents × ~10ms each). Around 50,000 agents are in memory at any instant.
- Each session's root pushes deltas to its CDN's `Aggregator`. With 10 CDNs, each aggregator handles ~1M sessions but only processes one `on_delta` call at a time; it's a simple `sum += delta`, so it never becomes a bottleneck.
- An hour later, 1M users stop watching. Their agents remain suspended in storage but cost zero memory and zero compute. They can be resumed if needed for historical queries.
| Challenge | Mitigation |
|---|---|
| Agent creation burst (millions of agents at once) | Golem lazy-creates agents on first invocation. The TimelineDriver itself can be parallelized across sessions. Rate-limit session initialization if needed. |
| Storage growth (80M serialized agents) | Serialized state is small (~1 KB). Implement TTL-based cleanup for completed sessions. Golem's persistence layer supports compaction. |
| Hot aggregator (one aggregator per CDN receiving millions of deltas) | on_delta is O(1) — a single addition. If a single CDN has 5M sessions and each emits ~2 events/min, that's ~170K deltas/sec to one aggregator. May need sharding (e.g., aggregator-cdn-x-shard-0) for extreme cases. |
| Cold-start latency (resuming a suspended agent) | Golem's resume time is ~10ms. For latency-sensitive paths, keep agents warm with periodic heartbeats. |
| Event ordering across leaves | The push-based model processes events per-leaf independently. Two leaves in the same session may receive events at different wall-clock times. The And/Or nodes use time + 1 lookups to see the latest state, which handles this correctly for monotonically increasing timestamps. |
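The aggregator sharding mentioned in the table could look roughly like this. It is a sketch of one possible scheme, not something the project is known to implement: sessions are assigned to a shard by hashing the session ID, and a reader merges the per-shard accumulators. The function names are hypothetical; the shard naming follows the `aggregator-cdn-x-shard-0` example.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks the aggregator shard for a session. `shards` must be greater than zero.
/// Hashing the session ID keeps all deltas of one session on the same shard.
fn shard_name(cdn: &str, session_id: &str, shards: u64) -> String {
    let mut hasher = DefaultHasher::new();
    session_id.hash(&mut hasher);
    let shard = hasher.finish() % shards;
    format!("aggregator-{cdn}-shard-{shard}")
}

/// Combines per-shard (count, sum) accumulators into (count, sum, avg).
fn merge_shards(shards: &[(u64, f64)]) -> (u64, f64, Option<f64>) {
    let count: u64 = shards.iter().map(|&(c, _)| c).sum();
    let sum: f64 = shards.iter().map(|&(_, s)| s).sum();
    let avg = if count == 0 { None } else { Some(sum / count as f64) };
    (count, sum, avg)
}
```

Count and sum merge by simple addition, so sharding costs one extra read per shard at query time. `DefaultHasher` is used here only for brevity; a production scheme would want a hash that is specified to be stable across builds.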
The timeline-core WASM component is deployed once to the Golem cluster. It contains all four
agent types (EventProcessor, TimelineProcessor, TimelineDriver, Aggregator) — these are general-purpose
building blocks, not metric-specific. Every metric expression (CIRR, Time-to-First-Play, engagement
scores, etc.) uses the same deployed component. There is no per-metric deployment.
┌────────────────────────────────────────────────────────────────┐
│ Golem Cluster │
│ │
│ timeline-core (1 component, 1 deployment) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ EventProcessor · TimelineProcessor · Aggregator · Driver│ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Metric: CIRR Metric: Time-to-First-Play │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ 8 agents per session │ │ 2 agents per session │ │
│ │ sess-42, sess-99 ... │ │ sess-42, sess-99 ... │ │
│ └──────────────────────┘ └──────────────────────┘ │
└────────────────────────────────────────────────────────────────┘
| Role | Can do | Cannot do |
|---|---|---|
| Admin | Deploy/update timeline-core to Golem. Manage the cluster, feeders, and Kafka infrastructure. | - |
| Developer | Write a metric expression in the DSL. Click "Deploy Metric" to register it. View sub-computation results, session lists, and aggregation data. | Deploy or modify the timeline-core component. |
Admins deploy the platform. Developers use it.
When a developer writes a metric and clicks deploy, the system does not deploy a new component. Instead:
- Registers the metric expression: stores the DSL text and its compiled `TimelineOpGraph` plus optional `AggregationConfig` (e.g., `group_by(cdn), count, sum, avg`).
- Derives the leaf routing table: the system walks the compiled graph to identify which nodes are leaves and what each leaf needs from the event stream. For the CIRR metric:

      node-4 tl-has-existed → needs events containing "playerStateChange"
      node-6 tl-has-existed-within → needs events containing "playerStateChange"
      node-8 latest-event-to-state → needs events containing "playerStateChange"

  This leaf routing table tells the feeder: "for every event in a session, send it to these leaf agents."
- Feeder picks up the new metric: the feeder (Kafka consumer) must learn about the new metric's leaf routing table so it can start sending events to the right agents. For every incoming event on a session:
  - If the feeder hasn't seen this session+metric before, it first invokes `timeline-driver("{session-id}")` → `initialize-timeline(graph, aggregation)` to spawn and wire all agents for that session.
  - It then sends the event to every leaf agent for that metric: `event-processor("{session-id}-has-existed-4").add-event(...)`, `event-processor("{session-id}-has-existed-within-6").add-event(...)`, `event-processor("{session-id}-latest-event-to-state-8").add-event(...)`.
  - The push cascade takes care of the rest: each leaf pushes state changes upward through the derived nodes to the root and aggregator.
- Multiple metrics, same event stream: the feeder routes each event to the leaf agents of every registered metric. If CIRR has 3 leaves and Time-to-First-Play has 1 leaf, each event triggers 4 leaf invocations (3 + 1). Adding a new metric adds more leaf invocations per event but does not require redeploying the component.
The timeline project has no knowledge of where feeders run. Feeders are external infrastructure —
standalone binaries (kafka-consumer-feeder) that consume from Kafka and invoke Golem agents.
The deployment workflow must somehow get the leaf routing table to the feeders.
What the feeder currently does:
The feeder takes a hardcoded list of leaf agent names (e.g., sess-1-has-existed-4, sess-1-has-existed-within-6)
and sends every consumed event to every listed leaf. It has no concept of metrics, graphs, or
templates — it's just a static event router.
The driver returns the leaf agent names:
initialize_timeline returns an InitializeResult containing the root agent name and
all leaf agent names. The feeder calls the driver on every event for a session and gets
back exactly which event-processor agents to send events to — no graph walking, no
tracking, no in-memory state. The driver is a durable Golem agent: the first call spawns
and wires all agents; subsequent calls are idempotent and return the same result.
The hard problem — how does a running feeder learn about a new metric?
| Approach | Trade-off |
|---|---|
| Feeder polls a config endpoint | Simple but introduces latency (if polling every 30s, the developer waits 30s). The feeder is a blackbox — no feedback on whether it picked up the metric. |
| Push via a config Kafka topic | The "Deploy Metric" action publishes the graph + leaf table to a dedicated Kafka topic (e.g., timeline-metrics-config). The feeder already knows how to consume Kafka — it consumes from both the events topic and the config topic. Instant pickup, and Kafka provides durability + replay. |
| New feeder per metric | Each deploy spins up a new feeder instance. Simple isolation but expensive — more processes, more Kafka consumers. |
The config Kafka topic approach is likely the best fit: feeders already consume from Kafka,
adding a second topic is minimal work, delivery is instant, and Kafka's log provides an audit
trail of all registered metrics. The feeder consumes from timeline-metrics-config on startup
(replays all registered metrics) and watches for new entries. When a new metric arrives, the
feeder immediately starts routing events to its leaf agents — no polling delay, no blackbox.
A concrete example of the CIRR metric with events arriving from Kafka:
Feeder startup:
1. Consumes CIRR graph + aggregation config from timeline-metrics-config topic
2. Ready to process events
Event stream from Kafka (timeline-events topic):
event1: {session: "sess-42", time: 100, playerStateChange: "play", cdn: "akamai"}
event2: {session: "sess-42", time: 200, playerStateChange: "buffer", cdn: "akamai"}
event3: {session: "sess-99", time: 100, playerStateChange: "init", cdn: "fastly"}
Processing event1 (sess-42):
1. Calls: timeline-driver("sess-42").initialize-timeline(cirr_graph, agg_config)
→ Driver spawns 8 agents, wires parent refs
→ Returns InitializeResult:
root_agent: "sess-42-duration-where-1"
leaf_agents: ["sess-42-has-existed-4", "sess-42-has-existed-within-6", "sess-42-latest-event-to-state-8"]
2. Sends event to each leaf from the result:
event-processor("sess-42-has-existed-4").add-event(...)
event-processor("sess-42-has-existed-within-6").add-event(...)
event-processor("sess-42-latest-event-to-state-8").add-event(...)
Processing event2 (sess-42 again):
1. Calls: timeline-driver("sess-42").initialize-timeline(cirr_graph, agg_config)
→ Driver is durable — agents already exist, idempotent, no re-wiring
→ Returns same InitializeResult:
root_agent: "sess-42-duration-where-1"
leaf_agents: ["sess-42-has-existed-4", "sess-42-has-existed-within-6", "sess-42-latest-event-to-state-8"]
2. Sends event to the 3 leaves
Processing event3 (sess-99):
1. Calls: timeline-driver("sess-99").initialize-timeline(...)
→ New session — spawns 8 fresh agents
→ Returns InitializeResult:
root_agent: "sess-99-duration-where-1"
leaf_agents: ["sess-99-has-existed-4", "sess-99-has-existed-within-6", "sess-99-latest-event-to-state-8"]
2. Sends event to sess-99's leaves
Key point: the feeder calls initialize_timeline on every event. No in-memory state,
no tracking which sessions exist. The driver is a durable Golem worker — first call spawns
agents, subsequent calls are idempotent and return the same InitializeResult. The feeder
gets the leaf agent names directly from the return value.
Golem workers are durable. Calling initialize_timeline on an already-initialized session
simply overwrites operation, children, and parent with the same values (same graph =
same configuration). The accumulated StateDynamicsTimeLine state is untouched — initialize_leaf
and initialize_derived don't clear it. So the feeder can safely call the driver on every
event without tracking which sessions have been initialized.
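Why re-initialization is harmless can be shown with a simplified leaf agent. This is a sketch under assumptions: `LeafAgent` and its fields are illustrative, and the `timeline` vector is only a stand-in for the real `StateDynamicsTimeLine`.

```rust
/// Parent wiring: which agent to notify, and which child slot this agent fills.
#[derive(Clone, Debug, PartialEq)]
pub struct ParentRef {
    pub agent_name: String,
    pub child_index: u32,
}

/// Simplified leaf agent. `timeline` is a stand-in for StateDynamicsTimeLine.
#[derive(Default)]
pub struct LeafAgent {
    pub operation: Option<String>,
    pub parent: Option<ParentRef>,
    pub timeline: Vec<(u64, String)>,
}

impl LeafAgent {
    /// Overwrites configuration only; accumulated timeline state is left alone.
    pub fn initialize_leaf(&mut self, operation: &str, parent: ParentRef) {
        self.operation = Some(operation.to_string());
        self.parent = Some(parent);
    }

    /// Appends one observed event to the accumulated state.
    pub fn add_event(&mut self, time: u64, value: &str) {
        self.timeline.push((time, value.to_string()));
    }
}
```

Calling `initialize_leaf` a second time with the same arguments leaves `timeline` exactly as it was, which is the property the feeder relies on.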
There is no separate session registry. The feeder discovers session IDs from the event stream (e.g., Kafka message keys) and calls the driver on every event, so the first event for a session is what initializes that session's agents. To inspect sub-computation results, a developer looks up a specific session ID; enumerating all sessions is unnecessary and would be impractical at scale. The developer either already knows the session ID they want to debug or finds it in their own application logs or event stream.
If two developers register the same metric expression (identical DSL text and aggregation config), they share the same computation. The system recognizes the duplicate and points both developers at the same set of agents. No new agents are spawned — both developers see the same sub-computation results, the same sessions, and the same aggregation data.
This is metric-level reuse: identical expression = shared agents. Sub-expression reuse across
different metrics (e.g., CIRR and Time-to-First-Play sharing a has_existed(play) subtree) is
a future design goal documented below.
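Metric-level reuse amounts to a lookup keyed on the registration itself. The sketch below assumes a hypothetical `MetricRegistry` keyed on the raw DSL text and aggregation config; how the real system stores registrations is not specified here.

```rust
use std::collections::HashMap;

/// Hypothetical registry: identical DSL text + aggregation config map to one metric.
#[derive(Default)]
pub struct MetricRegistry {
    by_key: HashMap<(String, String), u64>,
    next_id: u64,
}

impl MetricRegistry {
    /// Returns (metric_id, newly_created). Whitespace is not normalised here,
    /// so only byte-identical registrations are treated as duplicates.
    pub fn register(&mut self, dsl: &str, agg_config: &str) -> (u64, bool) {
        let key = (dsl.to_string(), agg_config.to_string());
        if let Some(&id) = self.by_key.get(&key) {
            return (id, false);
        }
        let id = self.next_id;
        self.next_id += 1;
        self.by_key.insert(key, id);
        (id, true)
    }
}
```

Because the key is the exact text, two expressions that differ only in formatting would register as separate metrics in this sketch; keying on a parsed, normalised form would be one way to widen the match.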
The TimelineDriver names inner agents using pre-order depth-first numbering:
{session-id}-{operation}-{counter}. Each node is a separate Golem agent with its own agent type.
The developer sees a template of all agents their metric will create per session, with
{session-id} as a placeholder. To inspect or query a session, they substitute a real
session ID, for example one taken from their application logs.
For the CIRR metric, the full agent template is:
| Golem agent ID | Agent name | Operation | Description |
|---|---|---|---|
| `timeline-driver("{session-id}")` | `{session-id}` | - | Orchestrator (runs once to spawn and wire all agents) |
| `timeline-processor("{session-id}-duration-where-1")` | `{session-id}-duration-where-1` | duration-where | Root: cumulative CIRR rebuffering seconds |
| `timeline-processor("{session-id}-and-2")` | `{session-id}-and-2` | and | All 3 conditions combined |
| `timeline-processor("{session-id}-and-3")` | `{session-id}-and-3` | and | has-existed ∧ ¬has-existed-within |
| `event-processor("{session-id}-has-existed-4")` | `{session-id}-has-existed-4` | tl-has-existed | Leaf: has play ever occurred? |
| `timeline-processor("{session-id}-not-5")` | `{session-id}-not-5` | not | ¬has-existed-within(seek, 5) |
| `event-processor("{session-id}-has-existed-within-6")` | `{session-id}-has-existed-within-6` | tl-has-existed-within | Leaf: was there a recent seek? |
| `timeline-processor("{session-id}-equal-to-7")` | `{session-id}-equal-to-7` | equal-to("buffer") | Is current state "buffer"? |
| `event-processor("{session-id}-latest-event-to-state-8")` | `{session-id}-latest-event-to-state-8` | latest-event-to-state | Leaf: latest playerStateChange |
| `aggregator("aggregator-cdn-{value}")` | `aggregator-cdn-{value}` | aggregator | Shared across sessions per CDN value |
For session sess-42 on CDN akamai, the actual agents are:
timeline-driver("sess-42"), timeline-processor("sess-42-duration-where-1"), ...,
event-processor("sess-42-has-existed-4"), ..., aggregator("aggregator-cdn-akamai").
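The pre-order depth-first numbering behind these names can be sketched in a few lines. `Node`, `agent_names`, and `cirr` are illustrative names, not the TimelineDriver's actual types; the tree shape is taken from the CIRR agent template.

```rust
/// Minimal expression tree; only the operation label and children matter for naming.
pub struct Node {
    pub operation: &'static str,
    pub children: Vec<Node>,
}

fn node(operation: &'static str, children: Vec<Node>) -> Node {
    Node { operation, children }
}

/// Pre-order depth-first walk: name the node first, then its children left to right.
fn assign(session_id: &str, n: &Node, counter: &mut u32, out: &mut Vec<String>) {
    *counter += 1;
    out.push(format!("{}-{}-{}", session_id, n.operation, *counter));
    for child in &n.children {
        assign(session_id, child, counter, out);
    }
}

/// Returns the agent names for one session, in pre-order.
pub fn agent_names(session_id: &str, root: &Node) -> Vec<String> {
    let mut out = Vec::new();
    let mut counter = 0;
    assign(session_id, root, &mut counter, &mut out);
    out
}

/// The CIRR expression, shaped as in the agent template.
pub fn cirr() -> Node {
    node("duration-where", vec![node("and", vec![
        node("and", vec![
            node("has-existed", vec![]),
            node("not", vec![node("has-existed-within", vec![])]),
        ]),
        node("equal-to", vec![node("latest-event-to-state", vec![])]),
    ])])
}
```

For `agent_names("sess-42", &cirr())` the walk yields the eight processor names from `sess-42-duration-where-1` through `sess-42-latest-event-to-state-8`; the driver and aggregator are named separately.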
The developer doesn't need to construct these names manually. The dashboard shows:
- The agent template for their metric (the full table above with business descriptions)
- Sub-computation results for any session ID the developer looks up (Computation Progress)
- Aggregation results across sessions (Live Dashboard)
1. Developer writes a metric in the DSL editor:

       duration_where(
         has_existed(playerStateChange == "play")
         && latest_event_to_state(playerStateChange) == "buffer"
       ) | aggregate(group_by(cdn), count, sum, avg)

2. Clicks Deploy Metric. The system registers it, names it (e.g., `buffering-duration`), and shows the agent template:

       duration-where-1               → How long has the condition been true?
         and-2                        → Are both sub-conditions true?
           has-existed-3 (leaf)       → Has the user ever pressed play?
           equal-to-4 "buffer"        → Is the player currently buffering?
             latest-event-to-state-5  → What is the current player state?

3. Events start flowing. The developer enters a session ID they want to debug (e.g., `sess-42` from their application logs) and a query time.
4. Computation Progress shows all sub-computation results for that session at that point in time. Live Dashboard shows aggregation results across all sessions.
5. Another developer registers the exact same DSL expression → they see the same metric, same sessions, same results. No duplicate computation.
Timeline expressions naturally share sub-expressions. Consider a streaming platform that runs two workflows for the same session:
Workflow A (CIRR):
TlDurationWhere(
  And(
    EqualTo(TlLatestEventToState("playerStateChange"), "buffer"),   ← subtree X
    TlHasExisted(playerStateChange == "play")                       ← subtree Y
  )
)
Workflow B (Time-to-First-Play):
TlHasExisted(playerStateChange == "play")                           ← same as Y
Subtree Y is identical in both workflows. Ideally, a single EventProcessor agent
computes Y once, and both Workflow A's And node and Workflow B's root consume its
output. This is compute reuse — the agent DAG shares common sub-expressions instead
of duplicating them.
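Finding the sub-expressions two workflows have in common is a set intersection over their subtrees. The sketch below assumes a simplified `Expr` enum that borrows the operator names from the workflows above; it is not the project's expression type.

```rust
use std::collections::HashSet;

/// Simplified timeline expression, using the operator names from the workflows above.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Expr {
    TlHasExisted { column: String, value: String },
    TlLatestEventToState { column: String },
    EqualTo(Box<Expr>, String),
    And(Box<Expr>, Box<Expr>),
    TlDurationWhere(Box<Expr>),
}

/// Collects every sub-expression of `e` (including `e` itself).
pub fn subtrees(e: &Expr, out: &mut HashSet<Expr>) {
    out.insert(e.clone());
    match e {
        Expr::EqualTo(inner, _) | Expr::TlDurationWhere(inner) => subtrees(inner, out),
        Expr::And(l, r) => {
            subtrees(l, out);
            subtrees(r, out);
        }
        Expr::TlHasExisted { .. } | Expr::TlLatestEventToState { .. } => {}
    }
}

/// Sub-expressions that appear in both workflows: the candidates for sharing.
pub fn shared(a: &Expr, b: &Expr) -> HashSet<Expr> {
    let (mut sa, mut sb) = (HashSet::new(), HashSet::new());
    subtrees(a, &mut sa);
    subtrees(b, &mut sb);
    sa.intersection(&sb).cloned().collect()
}
```

Applied to Workflow A and Workflow B, `shared` returns exactly one expression, subtree Y.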
Without reuse (current):                  With reuse (future):
  Workflow A     Workflow B                 Workflow A     Workflow B
  ┌───────┐          │                      ┌───────┐
  │ DurWh │          │                      │ DurWh │
  └───┬───┘          │                      └───┬───┘
  ┌───┴───┐          │                      ┌───┴───┐
  │  And  │          │                      │  And  │
  └─┬───┬─┘          │                      └─┬───┬─┘
    │   │            │                        │   │
  ┌─┴─┐┌┴──┐      ┌──┴──┐                   ┌─┴─┐ │
  │ X ││ Y │      │ Y'  │ ← duplicate       │ X │ │
  └───┘└───┘      └─────┘                   └───┘ │
                                                  │   ┌───────┐
  2 leaves compute Y                              └───┤   Y   ├──── Workflow B root
  same events sent twice                              └───────┘
                                                      1 leaf, shared
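At Disney+ or Netflix scale, with 10M sessions and 3 workflows sharing 2 common sub-expressions, the estimate at the end of this section puts the saving at 8 agents per session (24 versus 16), or roughly 80M redundant agents, and reuse also eliminates the duplicate event processing.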
Three structural limitations prevent compute reuse in the current implementation:
1. ParentRef is singular
Each agent has at most one parent:
pub struct ParentRef {
    pub agent_name: String,
    pub child_index: u32,
}
If Y pushes to Workflow A's And node, it cannot also push to Workflow B's root.
Sharing requires fan-out to multiple parents.
2. Agent naming is positional, not content-addressed
The TimelineDriver names agents by traversal order:
let agent_name = format!("{}-{}-{}", self.name, operation, counter);
Two workflows traversing the same sub-expression Y produce different names
(sess-1-has-existed-4 vs sess-1-has-existed-7). There is no way to detect that they
compute the same thing.
3. The driver always re-initializes
Calling initialize_leaf on an existing agent overwrites its operation and parent ref.
There is no "add another parent" operation.
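Limitations 2 and 3 can be reproduced in isolation. In this sketch `positional_names` and `Agent` are hypothetical simplifications: the first takes a pre-order list of operation labels rather than a real expression graph, and the second keeps only the parent field.

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct ParentRef {
    pub agent_name: String,
    pub child_index: u32,
}

/// Positional naming: the counter is simply the node's 1-based pre-order position.
pub fn positional_names(session_id: &str, preorder_ops: &[&str]) -> Vec<String> {
    preorder_ops
        .iter()
        .enumerate()
        .map(|(i, op)| format!("{}-{}-{}", session_id, op, i + 1))
        .collect()
}

/// Single-parent agent with overwrite-on-initialize semantics.
#[derive(Default)]
pub struct Agent {
    pub parent: Option<ParentRef>,
}

impl Agent {
    pub fn initialize_leaf(&mut self, parent: ParentRef) {
        self.parent = Some(parent); // last caller wins; the earlier parent is lost
    }
}
```

With Workflow A's pre-order (`duration-where`, `and`, `equal-to`, `latest-event-to-state`, `has-existed`) the shared leaf is named `sess-1-has-existed-5`, while Workflow B names the same computation `sess-1-has-existed-1`. Initializing one `Agent` from both workflows leaves only the second parent registered.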
Current                       Future
┌──────────────────┐          ┌──────────────────┐
│  ParentRef       │          │  Vec<ParentRef>  │
│  (single)        │          │  (fan-out)       │
└──────────────────┘          └──────────────────┘
┌──────────────────┐          ┌───────────────────┐
│  "{sid}-node-{N}"│          │  content-addressed│
│  (positional)    │          │  naming (hash of  │
└──────────────────┘          │  operation + cols)│
                              └───────────────────┘
┌──────────────────┐          ┌──────────────────┐
│  initialize_leaf │          │  add_parent_ref  │
│  (overwrite)     │          │  (append)        │
└──────────────────┘          └──────────────────┘
Step 1: Content-addressed agent names
Name agents by what they compute, not where they appear in the tree:
// Instead of: "{sid}-has-existed-4"
// Use: "{sid}-leaf-{hash(TlHasExisted, playerStateChange, play)}"
fn agent_name(session_id: &str, op: &TimeLineOp) -> String {
    let hash = hash_operation(op); // deterministic hash of the subtree
    format!("{}-{}", session_id, hash)
}
Two workflows requesting TlHasExisted(playerStateChange == "play") for the same
session produce the same agent name → same agent.
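One possible shape for `hash_operation` is sketched below. Everything here is an assumption for illustration: the `TimeLineOp` enum is a cut-down stand-in for the real type, the canonical text form is invented, and 64-bit FNV-1a is simply one hash whose output is fixed by its definition. That stability matters because agent names are durable, whereas the algorithm behind Rust's `DefaultHasher` is not guaranteed to stay the same across releases.

```rust
/// Simplified, hypothetical stand-in for the document's TimeLineOp.
pub enum TimeLineOp {
    TlHasExisted { column: String, value: String },
    TlLatestEventToState { column: String },
    EqualTo(Box<TimeLineOp>, String),
    And(Box<TimeLineOp>, Box<TimeLineOp>),
}

/// Canonical text form: two structurally equal subtrees always render identically.
fn canonical(op: &TimeLineOp) -> String {
    match op {
        TimeLineOp::TlHasExisted { column, value } => {
            format!("has-existed({:?},{:?})", column, value)
        }
        TimeLineOp::TlLatestEventToState { column } => {
            format!("latest-event-to-state({:?})", column)
        }
        TimeLineOp::EqualTo(inner, v) => format!("equal-to({},{:?})", canonical(inner), v),
        TimeLineOp::And(l, r) => format!("and({},{})", canonical(l), canonical(r)),
    }
}

/// 64-bit FNV-1a over the canonical form.
pub fn hash_operation(op: &TimeLineOp) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for byte in canonical(op).bytes() {
        hash ^= byte as u64;
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Content-addressed name: session prefix plus the subtree hash in hex.
pub fn agent_name(session_id: &str, op: &TimeLineOp) -> String {
    format!("{}-{:016x}", session_id, hash_operation(op))
}
```

A 64-bit hash can collide in principle, so a production scheme might prefer a wider digest or embed the canonical form itself in the name.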
Step 2: Fan-out parent refs
// Before:
pub parent: Option<ParentRef>,
// After:
pub parents: Vec<ParentRef>,
notify_parent becomes notify_parents, which iterates over all registered parents:
async fn notify_parents(parents: &[ParentRef], time: u64, value: EventValue) {
    for parent in parents {
        let mut client = TimelineProcessorClient::get(parent.agent_name.clone());
        client.on_child_state_changed(parent.child_index, time, value.clone()).await;
    }
}
Step 3: Idempotent initialization with parent accumulation
The driver checks if an agent already exists before creating it:
if agent_exists(&agent_name) {
    // Agent already computing this: just add our parent ref
    add_parent_ref(&agent_name, new_parent_ref).await;
} else {
    // First time: create and initialize
    initialize_leaf(&agent_name, operation).await;
    set_parent(&agent_name, new_parent_ref).await;
}

| Benefit | Cost |
|---|---|
| Fewer agents (saves memory + storage) | Agent naming becomes content-dependent — harder to debug |
| Less duplicate event processing | Fan-out notifications add latency (serial parent pushes) |
| Fewer events routed by feeder | Cleanup requires reference counting (can't delete a shared agent until all workflows are done) |
| Natural DAG structure | The tree becomes a DAG — cycle detection may be needed for safety |
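
The fan-out, parent accumulation, and reference-counting points above interact: if the feeder keeps calling the driver on every event, `add_parent_ref` has to be idempotent as well, and the parent list can double as the reference count for cleanup. A minimal synchronous sketch, assuming a hypothetical `SharedAgent` and a `remove_parent_ref` operation that the current design does not define:

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct ParentRef {
    pub agent_name: String,
    pub child_index: u32,
}

/// Hypothetical shared agent: the parent list doubles as the reference count.
#[derive(Default)]
pub struct SharedAgent {
    pub parents: Vec<ParentRef>,
}

impl SharedAgent {
    /// Idempotent append: re-registering the same parent is a no-op, so the
    /// driver can still be called on every event without growing the list.
    pub fn add_parent_ref(&mut self, parent: ParentRef) {
        if !self.parents.contains(&parent) {
            self.parents.push(parent);
        }
    }

    /// Removes one parent; returns true when no parents remain and the agent
    /// is safe to delete.
    pub fn remove_parent_ref(&mut self, parent: &ParentRef) -> bool {
        self.parents.retain(|p| p != parent);
        self.parents.is_empty()
    }

    /// Fan-out: one (parent name, child index, value) notification per parent.
    pub fn notify_parents(&self, value: bool) -> Vec<(String, u32, bool)> {
        self.parents
            .iter()
            .map(|p| (p.agent_name.clone(), p.child_index, value))
            .collect()
    }
}
```

Here notifications are returned as data rather than sent; in the real agents each entry would correspond to one `on_child_state_changed` call.
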
| Metric | Without reuse | With reuse |
|---|---|---|
| Agents per session (3 workflows, 2 shared subtrees) | 24 | 16 |
| Total agents at 10M sessions | 240M | 160M |
| Events processed per input event | 3× (duplicated leaves) | 1× |
| Aggregator delta calls | Same | Same |




