43 changes: 42 additions & 1 deletion mdbook/src/chapter_5/chapter_5_4.md
@@ -224,7 +224,48 @@ The `<T>` in the stream names is the Rust type name of the dataflow's timestamp,

**`TimelyProgressEvent<T>`** captures the exchange of progress information between operators. Each event records whether it is a send or receive (`is_send`), the `source` worker, the `channel` and `seq_no`, the `identifier` of the operator, and two lists of updates: `messages` (updates to message counts at targets) and `internal` (updates to capabilities at sources). Each update is a tuple `(node, port, timestamp, delta)`. These are primarily useful for debugging the progress tracking protocol.

**`TrackerEvent<T>`** records updates to the reachability tracker, which maintains the set of timestamps that could still arrive at each operator input. The variants are `SourceUpdate` and `TargetUpdate`, each carrying the node, port, timestamp, and delta of the update.
**`TrackerEvent<T>`** records updates to the reachability tracker, which maintains the set of timestamps that could still arrive at each operator port. Each scope (subgraph) has its own tracker, identified by `tracker_id`, which is the worker-unique `id` of the scope operator (the same `id` reported in its `OperatesEvent`).

The tracker monitors two kinds of locations:

- **Targets** (operator input ports): timestamps of messages that may still arrive.
- **Sources** (operator output ports): timestamps of capabilities that operators still hold.

The `TrackerEvent` enum has two variants:

| Variant | Fields | Description |
|---------|--------|-------------|
| `SourceUpdate` | `tracker_id`, `updates` | Changes to capability counts at operator output ports. |
| `TargetUpdate` | `tracker_id`, `updates` | Changes to message counts at operator input ports. |

Each entry in `updates` is a tuple `(node, port, timestamp, delta)`:

| Field | Type | Description |
|-------|------|-------------|
| `node` | `usize` | Scope-local operator index (same convention as `ChannelsEvent` source/target indices, including 0 for the scope boundary). |
| `port` | `usize` | Port index on that operator. |
| `timestamp` | `T` | The timestamp being updated. |
| `delta` | `i64` | The change in count: positive means a new capability or pending message; negative means one was retired. |

A `SourceUpdate` with positive `delta` means an operator has acquired (or retained) a capability to produce data at that timestamp on that output port. A negative `delta` means it has released one. Similarly, a `TargetUpdate` with positive `delta` means messages at that timestamp may still arrive at that input port; negative means some have been accounted for.

The frontier at a location is the set of minimal timestamps with a positive accumulated count. Once the count for a timestamp at a target reaches zero, and no upstream capability could still produce that timestamp, the operator knows no more messages at that timestamp will arrive. This is the mechanism by which operators learn they can "close" a timestamp and make progress.
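
As a rough illustration of the frontier at a single location, the sketch below accumulates `(timestamp, delta)` pairs and keeps only the minimal timestamps whose net count is positive, since any later timestamp is already covered by an earlier one. The function name `frontier` and the caller-supplied `less_equal` ordering are assumptions made for this example, not part of timely's API. The sketch also looks only at the raw counts of one location and does not propagate capabilities through the graph the way the real tracker does.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical helper: sums the deltas for one location and returns the
/// minimal timestamps whose net count is positive.
/// `less_equal` is the partial order on timestamps, supplied by the caller.
fn frontier<T: Clone + Eq + Hash>(
    updates: &[(T, i64)],
    less_equal: impl Fn(&T, &T) -> bool,
) -> Vec<T> {
    // Net count per timestamp.
    let mut counts: HashMap<T, i64> = HashMap::new();
    for (time, delta) in updates {
        *counts.entry(time.clone()).or_insert(0) += *delta;
    }
    // Timestamps that are still outstanding.
    let live: Vec<T> = counts
        .into_iter()
        .filter(|(_, count)| *count > 0)
        .map(|(time, _)| time)
        .collect();
    // Keep a timestamp only if no other live timestamp precedes it.
    live.iter()
        .filter(|t| !live.iter().any(|o| o != *t && less_equal(o, *t)))
        .cloned()
        .collect()
}
```

With pairs ordered component-wise, calling `frontier(&updates, |a, b| a.0 <= b.0 && a.1 <= b.1)` treats `(1, 1)` as covered by `(1, 0)`, while `(0, 2)` and `(1, 0)` are incomparable and both remain in the result. The order of the returned vector is unspecified.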

### Using Reachability Logging for Debugging

The `TrackerEvent` stream is particularly useful for diagnosing progress-tracking issues, such as a dataflow that appears stuck or a timestamp that never completes.

**Reconstructing capability state.** Since each event carries a `delta`, you can reconstruct the current capability state at any point by accumulating deltas. For each `(tracker_id, node, port, timestamp)`, sum the deltas from all `SourceUpdate` events. A positive sum means the operator currently holds a capability at that timestamp on that port. When the sum reaches zero, the capability has been fully released.

The same applies to `TargetUpdate` events for message counts: a positive accumulated count at a target means messages at that timestamp may still be in flight.
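
The accumulation described above might look like the following minimal sketch. The `TrackerEvent` enum here is a simplified local stand-in carrying only the fields listed in the tables (the real type lives in timely and may be shaped differently), and `accumulate` and `Counts` are hypothetical names introduced for illustration.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Simplified stand-in for the logged event, NOT the real timely type.
enum TrackerEvent<T> {
    SourceUpdate { tracker_id: usize, updates: Vec<(usize, usize, T, i64)> },
    TargetUpdate { tracker_id: usize, updates: Vec<(usize, usize, T, i64)> },
}

/// Net counts keyed by (tracker_id, node, port, timestamp).
type Counts<T> = HashMap<(usize, usize, usize, T), i64>;

/// Replays events in order and returns (source counts, target counts).
/// Entries whose net count returns to zero are dropped.
fn accumulate<T: Clone + Eq + Hash>(events: &[TrackerEvent<T>]) -> (Counts<T>, Counts<T>) {
    let mut sources: Counts<T> = HashMap::new();
    let mut targets: Counts<T> = HashMap::new();
    for event in events {
        // Pick the map that matches the event variant.
        let (tracker_id, updates, counts) = match event {
            TrackerEvent::SourceUpdate { tracker_id, updates } => (*tracker_id, updates, &mut sources),
            TrackerEvent::TargetUpdate { tracker_id, updates } => (*tracker_id, updates, &mut targets),
        };
        for (node, port, time, delta) in updates {
            let key = (tracker_id, *node, *port, time.clone());
            let count = counts.entry(key.clone()).or_insert(0);
            *count += *delta;
            if *count == 0 {
                counts.remove(&key);
            }
        }
    }
    (sources, targets)
}
```

Whatever remains in the first map after replaying the log is the set of capabilities still held; the second map holds the message counts still outstanding at targets.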

**Identifying a stuck dataflow.** When a dataflow hangs, the accumulated state tells you exactly which operators hold capabilities and at which timestamps. Cross-reference the `tracker_id` with `OperatesEvent` to identify the scope, and the `node` with operator addresses within that scope (recall that `node` is a scope-local index, and the operator's full address is the scope's `addr` with `node` appended).

For example, if accumulated `SourceUpdate` deltas show that node 5 in tracker 7 holds a capability at timestamp `(42, 3)`, and `OperatesEvent` tells you tracker 7 is the scope at address `[0, 2]`, then the operator at address `[0, 2, 5]` holds the capability. Look up its name in the `OperatesEvent` log to identify it.
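
The lookup in this example can be sketched as a small helper. `Operates` is a simplified stand-in holding only the `id`, `addr`, and `name` fields of an `OperatesEvent`, and `resolve` is a hypothetical function name; both are assumptions of this sketch.

```rust
/// Simplified stand-in for the `id`, `addr`, and `name` of an `OperatesEvent`.
struct Operates {
    id: usize,
    addr: Vec<usize>,
    name: String,
}

/// Maps a (tracker_id, node) pair from a tracker update to the logged operator.
/// Returns `None` if the scope is unknown or no operator has that address,
/// which is the case for node 0 (the scope boundary).
fn resolve<'a>(operates: &'a [Operates], tracker_id: usize, node: usize) -> Option<&'a Operates> {
    // The tracker id is the worker-unique id of the scope operator.
    let scope = operates.iter().find(|op| op.id == tracker_id)?;
    // The operator's full address is the scope's address with `node` appended.
    let mut addr = scope.addr.clone();
    addr.push(node);
    operates.iter().find(|op| op.addr == addr)
}
```

Given the logged operators collected into a slice, `resolve(&ops, 7, 5)` would return the entry whose address is `[0, 2, 5]`, from which the operator's `name` can be read.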

**Understanding why a frontier hasn't advanced.** The frontier at an operator's input can only advance when all upstream capabilities that could produce data at the current frontier timestamps have been released. The `SourceUpdate` events let you identify which operators still hold such capabilities. Trace the graph (using `ChannelsEvent` and operator summaries from `OperatesSummaryEvent`) from those capabilities forward to the stuck operator's input to understand the dependency chain.
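
One way to narrow down the dependency chain is to walk the channel graph backward from the stuck operator and report which capability holders can reach it. The sketch below assumes channels within a single scope have been reduced to `(source node, target node)` pairs and that every operator connects all of its inputs to all of its outputs. It ignores ports and operator summaries, so it may over-report. `upstream_holders` is a hypothetical name.

```rust
use std::collections::{HashSet, VecDeque};

/// Returns the nodes in `holders` that can reach `stuck` along `channels`,
/// where each channel is a (source node, target node) pair in one scope.
fn upstream_holders(
    channels: &[(usize, usize)],
    holders: &HashSet<usize>,
    stuck: usize,
) -> Vec<usize> {
    let mut seen = HashSet::new();
    let mut queue = VecDeque::from([stuck]);
    let mut found = Vec::new();
    // Breadth-first search against the direction of the channels.
    while let Some(node) = queue.pop_front() {
        for &(src, tgt) in channels {
            if tgt == node && seen.insert(src) {
                if holders.contains(&src) {
                    found.push(src);
                }
                queue.push_back(src);
            }
        }
    }
    found.sort_unstable();
    found
}
```

The set of `holders` would come from the accumulated `SourceUpdate` counts, restricted to timestamps at or before the frontier that is not advancing.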

**Matching scopes to log streams.** Each scope has its own tracker and its own log stream. A dataflow using `usize` timestamps with a nested iterative scope would produce two reachability streams: `"timely/reachability/usize"` for the root scope, and something like `"timely/reachability/Product<usize, u64>"` for the iterative scope (where the `u64` is the iteration counter). Register loggers for each stream you want to observe.
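
Since the stream name embeds the Rust type name of the timestamp, a name for a given timestamp type could be built along these lines. This is a sketch that assumes the suffix matches what `std::any::type_name` prints; `reachability_stream_name` is a hypothetical helper, not a timely function. The standard library does not guarantee the exact output of `type_name`, and for non-primitive types it may include the full module path.

```rust
use std::any::type_name;

/// Hypothetical helper: builds the reachability stream name for timestamp type `T`,
/// assuming the suffix is whatever `std::any::type_name::<T>()` prints.
fn reachability_stream_name<T>() -> String {
    format!("timely/reachability/{}", type_name::<T>())
}
```

Printing the result for the actual timestamp types of a dataflow is a quick way to check which stream names to register loggers for.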

## Registering a Logger
