From 3d5f897d0885ce0e8acdaa0a093d639b2ab04419 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 18 Mar 2026 22:16:10 -0400 Subject: [PATCH 1/3] Live visualizer --- mdbook/src/SUMMARY.md | 1 + mdbook/src/chapter_5/chapter_5_4.md | 385 ++++--- timely/Cargo.toml | 2 + timely/examples/logging-live.rs | 77 ++ timely/src/lib.rs | 3 + timely/src/visualizer.rs | 232 ++++ visualizer/index.html | 1643 +++++++++++++++++++++++++++ 7 files changed, 2158 insertions(+), 185 deletions(-) create mode 100644 timely/examples/logging-live.rs create mode 100644 timely/src/visualizer.rs create mode 100644 visualizer/index.html diff --git a/mdbook/src/SUMMARY.md b/mdbook/src/SUMMARY.md index 14197b95c..fc0ba42c2 100644 --- a/mdbook/src/SUMMARY.md +++ b/mdbook/src/SUMMARY.md @@ -36,4 +36,5 @@ - [Internals](./chapter_5/chapter_5.md) - [Communication](./chapter_5/chapter_5_1.md) - [Progress Tracking](./chapter_5/chapter_5_2.md) + - [Containers](./chapter_5/chapter_5_3.md) - [Logging](./chapter_5/chapter_5_4.md) diff --git a/mdbook/src/chapter_5/chapter_5_4.md b/mdbook/src/chapter_5/chapter_5_4.md index 6e8e888ca..b330de194 100644 --- a/mdbook/src/chapter_5/chapter_5_4.md +++ b/mdbook/src/chapter_5/chapter_5_4.md @@ -1,172 +1,205 @@ # Logging -Timely dataflow has a built-in logging infrastructure that lets you observe the internal behavior of a running computation. Every worker maintains a `Registry` of named logging streams. You can tap into these streams by registering closures that receive batches of events, and you can also create your own custom logging streams for application-level instrumentation. +Timely dataflow provides a comprehensive logging infrastructure that records structural and runtime events as the dataflow executes. +These events allow you to reconstruct the dataflow graph, understand how data flows across scope boundaries, and profile operator execution. 
-## Tapping into logging +All events are logged to named log streams, and each event carries a `Duration` timestamp (elapsed time since the worker started). +The primary log stream is `"timely"`, which carries `TimelyEvent` variants. +Additional typed log streams exist for progress, summary, and reachability information. -Each timely worker has a logging registry accessible via `worker.log_register()`. You register a logging callback by calling `insert` on the registry, providing a string name and a closure. The string name identifies which logging stream you want to listen to, and the closure is called with batches of events. +## Structural Events -Here is a minimal example that prints all timely system events: +These events describe the shape of the dataflow graph. They are logged once during construction. -```rust,no_run -use timely::logging::TimelyEventBuilder; +### OperatesEvent -timely::execute_from_args(std::env::args(), |worker| { +Logged when an operator is created within a scope. - // Register a callback for the "timely" logging stream. - worker.log_register().unwrap() - .insert::<TimelyEventBuilder, _>("timely", |time, data| { - if let Some(data) = data { - for event in data.iter() { - println!("{:?}", event); - } - } - }); +| Field | Type | Description | +|--------|--------------|-------------| +| `id` | `usize` | Worker-unique identifier for the operator, allocated by the worker. | +| `addr` | `Vec<usize>` | Hierarchical address: the path from the root scope to this operator. | +| `name` | `String` | Human-readable name (e.g. `"Map"`, `"Feedback"`, `"Subgraph"`). | - worker.dataflow::<usize, _, _>(|scope| { - // ... build your dataflow here ... - }); +The `addr` field encodes the nesting structure. +For example, an address of `[0, 2, 1]` means: child 0 of the root, then child 2 within that scope, then child 1 within that. +Within any scope, child indices start at 1 for actual operators; index 0 is reserved (see [Scope Boundary Conventions](#scope-boundary-conventions) below). 
-}).unwrap(); -``` +The `id` field is a flat, worker-unique integer. +It is the key used by all other events (`ScheduleEvent`, `ShutdownEvent`, `MessagesEvent` via channels, etc.) to refer to this operator. +Two different workers will generally assign different `id` values to corresponding operators, but the `addr` will be the same. -The `insert` method is generic over two type parameters: a *container builder* type that describes how events are batched, and the closure type. Each built-in logging stream has a corresponding builder type alias (like `TimelyEventBuilder`) that you should use. +### ChannelsEvent -The closure signature is `FnMut(&Duration, &mut Option)`: +Logged when a data channel is created between two operators (or between an operator and a scope boundary). -- The `&Duration` is the elapsed time since worker startup. -- `&mut Option` is `Some(container)` when delivering a batch of events, or `None` to signal a flush (e.g., when the worker is about to park or the stream is closing). - -Each event in the container is a tuple `(Duration, Event)` where the `Duration` is the timestamp at which the event was logged. +| Field | Type | Description | +|--------------|------------------|-------------| +| `id` | `usize` | Worker-unique channel identifier. | +| `scope_addr` | `Vec` | Address of the scope that *contains* this channel. | +| `source` | `(usize, usize)` | `(operator_index, output_port)` of the source within the containing scope. | +| `target` | `(usize, usize)` | `(operator_index, input_port)` of the target within the containing scope. | +| `typ` | `String` | The container type transported on this channel, as a string. | -## Retrieving loggers for custom events +The `source` and `target` tuples use **scope-local** operator indices (not the worker-unique `id` from `OperatesEvent`). +To resolve them, find the `OperatesEvent` whose `addr` equals `scope_addr` with the operator index appended. 
+For example, if `scope_addr` is `[0, 2]` and `source` is `(3, 0)`, the source operator has address `[0, 2, 3]` and you want output port 0. -You can also create your own logging streams. Register a stream with `insert`, then retrieve a `Logger` handle with `get` to log events from within your dataflow: - -```rust,no_run -use std::time::Duration; -use timely::container::CapacityContainerBuilder; +When either the source or target operator index is 0, the channel crosses a scope boundary. See [Scope Boundary Conventions](#scope-boundary-conventions). -timely::execute_from_args(std::env::args(), |worker| { +### CommChannelsEvent - // Define a container builder for your event type. - type MyBuilder = CapacityContainerBuilder>; +Logged when a communication channel (for inter-worker exchange) is established. - // Register the logging stream with a callback. - worker.log_register().unwrap() - .insert::("my-app/events", |_time, data| { - if let Some(data) = data { - for (ts, msg) in data.iter() { - println!("[{:?}] {}", ts, msg); - } - } - }); +| Field | Type | Description | +|--------------|-------------------|-------------| +| `identifier` | `usize` | Communication channel identifier. | +| `kind` | `CommChannelKind` | Either `Progress` or `Data`. | - // Retrieve a Logger handle to use later. - let my_logger = worker.log_register().unwrap() - .get::("my-app/events") - .expect("Logger was just registered"); +## Runtime Events - worker.dataflow::(|scope| { - // ... build your dataflow ... - }); +These events describe what happens as the dataflow executes. - // Log events from your application logic. - my_logger.log("something happened".to_string()); +### ScheduleEvent -}).unwrap(); -``` +Logged when an operator begins or finishes a scheduling invocation. -The `Logger` buffers events internally and flushes them to the registered closure when the buffer reaches capacity, when `flush()` is called explicitly, or when the logger is dropped. 
+| Field | Type | Description | +|--------------|-------------|-------------| +| `id` | `usize` | Worker-unique operator identifier (same as `OperatesEvent::id`). | +| `start_stop` | `StartStop` | `Start` when the operator begins executing, `Stop` when it returns. | -## Logging stream names +A matched pair of `Start` and `Stop` events brackets one invocation of the operator's `schedule()` method. +These pairs let you measure per-operator execution time. -Timely uses string names to identify logging streams. The built-in stream names are: +### MessagesEvent -| Stream name | Builder type | Event type | Description | -|---|---|---|---| -| `"timely"` | `TimelyEventBuilder` | `TimelyEvent` | Core system events: operator lifecycle, scheduling, messages, channels | -| `"timely/progress/{T}"` | `TimelyProgressEventBuilder` | `TimelyProgressEvent` | Progress protocol messages between operators | -| `"timely/summary/{T}"` | `TimelySummaryEventBuilder` | `OperatesSummaryEvent` | Operator connectivity summaries | -| `"timely/reachability/{T}"` | `TrackerEventBuilder` | `TrackerEvent` | Reachability tracker updates | +Logged when a batch of data is sent or received on a channel. -The `{T}` in the progress, summary, and reachability stream names is the Rust type name of the dataflow's timestamp, obtained from `std::any::type_name::()` (e.g., `"timely/progress/usize"` for a dataflow using `usize` timestamps). This is because these events are generic over the timestamp type. Note that `type_name` is best-effort and not guaranteed to be stable across compiler versions, so these stream names should be treated accordingly. +| Field | Type | Description | +|----------------|---------|-------------| +| `is_send` | `bool` | `true` for a send, `false` for a receive. | +| `channel` | `usize` | Channel identifier (same as `ChannelsEvent::id`). | +| `source` | `usize` | Source worker index. | +| `target` | `usize` | Target worker index. 
| +| `seq_no` | `usize` | Sequence number for this (source, target) pair on this channel. | +| `record_count` | `i64` | Number of records in the batch. | -You can register a callback for these at any point before or after building a dataflow. If you register a callback for a stream name that is already in use, the new callback takes effect for subsequently created loggers but existing loggers continue to use the old callback. +For channels that stay within a single worker, `source` and `target` will be the same worker index. +For exchange (inter-worker) channels, they may differ. +The `record_count` comes from the container's `Accountable` trait implementation (e.g. `Vec::len()` cast to `i64`). -## The `TimelyEvent` variants +### ShutdownEvent -The `TimelyEvent` enum is the primary event type for the `"timely"` logging stream. Its variants capture the lifecycle and activity of a timely computation: +Logged when an operator is permanently shut down. -### `Operates(OperatesEvent)` +| Field | Type | Description | +|-------|---------|-------------| +| `id` | `usize` | Worker-unique operator identifier. | -Logged when an operator is created. Contains the operator's worker-unique `id`, its hierarchical `addr` (the address of the scope containing the operator, matching the convention used by `ChannelsEvent`), and its `name`. The full address of the operator itself can be reconstructed as `addr ++ [id]`. +### PushProgressEvent -### `Channels(ChannelsEvent)` +Logged when frontier changes are pushed to an operator. -Logged when a channel is created between operators. Contains the channel `id`, the `scope_addr` of the containing scope (not including the channel's own id), `source` and `target` descriptors (each an `(operator_index, port)` pair), and a `typ` string naming the data type carried on the channel. +| Field | Type | Description | +|---------|---------|-------------| +| `op_id` | `usize` | Worker-unique operator identifier. 
| -### `Schedule(ScheduleEvent)` +### ParkEvent -Logged when an operator starts or stops executing. Contains the operator `id` and a `start_stop` field that is either `StartStop::Start` or `StartStop::Stop`. These events bracket each invocation of an operator's logic, and can be used to measure how much time each operator consumes. +Logged when a worker parks (goes idle waiting for external events) or wakes up. -### `Messages(MessagesEvent)` +| Variant | Description | +|--------------------|-------------| +| `Park(Option<Duration>)` | Worker parks, with an optional maximum sleep duration. | +| `Unpark` | Worker wakes from a parked state. | -Logged when a batch of messages is sent or received on a channel. Contains `is_send` (true for send, false for receive), the `channel` identifier, `source` and `target` worker indices, a `seq_no` sequence number, and the `record_count` of records in the batch. +### Text(String) -### `PushProgress(PushProgressEvent)` +An unstructured text event for ad-hoc logging. -Logged when external progress updates are pushed onto an operator. Contains the `op_id` of the operator receiving the update. +## Scope Boundary Conventions -### `Shutdown(ShutdownEvent)` +Understanding scope boundaries is essential for interpreting `ChannelsEvent` data and reconstructing the full dataflow graph across nested scopes. -Logged when an operator is shut down. Contains the operator `id`. +### Child Zero -### `Park(ParkEvent)` +By convention, **child index 0** within any scope is a pseudo-operator representing the scope's own boundary — its interface with its parent. +It is not a real operator; you will not see an `OperatesEvent` for child zero. +Instead, child zero is the mechanism by which channels inside a scope connect to channels outside. 
+Child zero's ports are **inverted** relative to the scope's external interface: -### `CommChannels(CommChannelsEvent)` +- **Child zero's outputs** are the scope's **inputs** (data arriving from the parent). +- **Child zero's inputs** are the scope's **outputs** (data leaving to the parent). -Logged when a communication channel is established. Contains an `identifier` and a `kind` that is either `CommChannelKind::Progress` or `CommChannelKind::Data`. +This inversion makes the internal wiring uniform: every channel inside a scope connects an operator output to an operator input, even when one end is the scope boundary. -### `Text(String)` +### Connecting Parent and Child -An unstructured text event for ad-hoc logging. +When a scope (say, an iterative scope) appears as operator `K` in its parent, and you look inside that scope, the relationship is: -## Connecting operators and channels +| Parent perspective | Child perspective | +|--------------------|-------------------| +| Operator `K`, input port `i` | Child zero, output port `i` | +| Operator `K`, output port `j` | Child zero, input port `j` | -The `OperatesEvent` and `ChannelsEvent` logs together describe the structure of a dataflow graph. To make sense of them, it helps to understand how addresses, operator indices, and ports relate to each other. +So if you see a `ChannelsEvent` in the parent scope with `target: (K, i)`, the data enters the child scope and appears as if it came from child zero's output port `i`. +Inside the child scope, a `ChannelsEvent` with `source: (0, i)` connects that incoming data to whatever internal operator consumes it. -### From a channel to its operators +Similarly, data produced inside the child scope that should leave the scope is connected via a `ChannelsEvent` with `target: (0, j)` inside the child scope, and emerges as output port `j` of operator `K` in the parent scope. 
-Both `OperatesEvent` and `ChannelsEvent` use the same convention: their address field (`addr` and `scope_addr`, respectively) is the address of the *containing scope*, not the entity itself. For an operator, the full address can be reconstructed as `addr ++ [id]`. For a channel, the `scope_addr` tells you which scope the channel lives in, and the `source`/`target` descriptors of the form `(operator_index, port)` identify operators within that scope. +### Worked Example -To find the full address of a channel's source or target operator, concatenate the channel's `scope_addr` with the `operator_index`: +Consider a dataflow with an iterative scope: ```text -operator address = scope_addr ++ [operator_index] +worker.dataflow(|scope| { // root scope, addr [0] + input // operator at [0, 1] + .enter(scope.iterative(|inner| { // iterative scope at [0, 2] + inner + .map(...) // operator at [0, 2, 1] + .filter(...) // operator at [0, 2, 2] + })) + .inspect(...) // operator at [0, 3] +}); ``` -For example, if a channel has `scope_addr: [0]`, `source: (3, 0)`, and `target: (5, 1)`, then: -- The source operator has address `[0, 3]`, output port `0`. -- The target operator has address `[0, 5]`, input port `1`. +You would see structural events like: + +1. `OperatesEvent { id: _, addr: [0], name: "Dataflow" }` — the root scope itself. +2. `OperatesEvent { id: _, addr: [0, 1], name: "Input" }` — the input operator. +3. `OperatesEvent { id: _, addr: [0, 2], name: "Iterative" }` — the iterative scope (appears as an operator in the root). +4. `OperatesEvent { id: _, addr: [0, 2, 1], name: "Map" }` — the map, inside the iterative scope. +5. `OperatesEvent { id: _, addr: [0, 2, 2], name: "Filter" }` — the filter, inside the iterative scope. +6. `OperatesEvent { id: _, addr: [0, 3], name: "Inspect" }` — the inspect, in the root scope. 
+ +Channel events in the root scope (`scope_addr: [0]`) connecting `Input` to the iterative scope: +- `ChannelsEvent { scope_addr: [0], source: (1, 0), target: (2, 0), ... }` — from Input's output 0 to the iterative scope's input 0. + +Channel events inside the iterative scope (`scope_addr: [0, 2]`): +- `ChannelsEvent { scope_addr: [0, 2], source: (0, 0), target: (1, 0), ... }` — from child zero's output 0 (= scope input 0) to Map's input 0. +- `ChannelsEvent { scope_addr: [0, 2], source: (1, 0), target: (2, 0), ... }` — from Map's output 0 to Filter's input 0. +- `ChannelsEvent { scope_addr: [0, 2], source: (2, 0), target: (0, 0), ... }` — from Filter's output 0 to child zero's input 0 (= scope output 0). -These operators will have `addr: [0]` in their `OperatesEvent` logs (the containing scope), with `id` values corresponding to the operator indices `3` and `5`. +And back in the root scope: +- `ChannelsEvent { scope_addr: [0], source: (2, 0), target: (3, 0), ... }` — from the iterative scope's output 0 to Inspect's input 0. -### Child 0: the scope boundary +This chain shows data flowing: Input → [into scope via child zero] → Map → Filter → [out of scope via child zero] → Inspect. -Every subgraph has a special "child 0" that represents the boundary between the subgraph and its surrounding scope. Child 0 is not a real operator; it is a stand-in for the scope that contains the subgraph. You will not see an `OperatesEvent` for child 0, but you will see channels that connect to it. +### Reconstructing the Full Graph -- **Child 0's outputs** correspond to data *entering* the subgraph. Output port `k` of child 0 is the `k`-th input of the subgraph operator as seen from the outer scope. +To reconstruct the dataflow graph from logged events: -- **Child 0's inputs** correspond to data *leaving* the subgraph. Input port `k` of child 0 is the `k`-th output of the subgraph operator as seen from the outer scope. +1. 
**Build the operator tree** from `OperatesEvent` entries, using `addr` to establish parent-child relationships. Any operator whose `addr` has length `n` is a child of the operator (scope) whose `addr` is the first `n-1` elements. -So a channel with `target: (0, 2)` means "data leaves this subgraph and emerges at output port 2 of the operator that hosts this subgraph, in the outer scope." A channel with `source: (0, 1)` means "data arrives from input port 1 of the hosting operator, entering the subgraph." +2. **Build per-scope channel graphs** from `ChannelsEvent` entries. Group channels by `scope_addr`. Within each scope, the `source` and `target` pairs give you directed edges between scope-local operator indices. -This convention lets you trace data flow across scope boundaries: follow a channel to child 0, then find the corresponding port on the subgraph operator in the parent scope. +3. **Stitch across scope boundaries** using child zero. When a channel in scope `S` has source or target operator index 0, it connects to the scope's external interface. Find the operator in `S`'s parent whose `addr` equals `S`, and link the corresponding port. -### Operator summaries: internal connectivity +4. **Correlate runtime events** using the worker-unique `id` from `OperatesEvent` to join `ScheduleEvent`, `ShutdownEvent`, and other events. Use `ChannelsEvent::id` to join `MessagesEvent` records to their channel. + +### Operator Summaries: Internal Connectivity While `ChannelsEvent` logs describe the *external* wiring between operators, an `OperatesSummaryEvent` describes an operator's *internal* topology: which of its inputs can result in data at which of its outputs, and what transformation is applied to timestamps along the way. This is the information reported by each operator during initialization, and it is what the progress tracking protocol uses to reason about which timestamps may still appear at downstream operators. 
@@ -176,120 +209,102 @@ For example, a `map` operator has one input and one output with an identity summ This information is essential for understanding the progress guarantees of a dataflow. If you are trying to understand why a particular timestamp is not yet "complete" at some point in the graph, the operator summaries tell you which paths could still produce data at that timestamp. -## Other event types - -### `TimelyProgressEvent` - -Events on the `"timely/progress/{T}"` streams capture the exchange of progress information between operators. Each event records whether it is a send or receive (`is_send`), the `source` worker, the `channel` and `seq_no`, the `identifier` of the operator, and two lists of updates: `messages` (updates to message counts at targets) and `internal` (updates to capabilities at sources). Each update is a tuple `(node, port, timestamp, delta)`. +## Additional Log Streams -These events are primarily useful for debugging the progress tracking protocol or understanding how capabilities flow through a dataflow. +Beyond the main `"timely"` stream, there are typed log streams for deeper introspection: -### `TrackerEvent` +| Stream name | Builder type | Event type | Description | +|---|---|---|---| +| `"timely"` | `TimelyEventBuilder` | `TimelyEvent` | Core system events: operator lifecycle, scheduling, messages, channels | +| `"timely/progress/<T>"` | `TimelyProgressEventBuilder` | `TimelyProgressEvent` | Progress protocol messages between operators | +| `"timely/summary/<T>"` | `TimelySummaryEventBuilder` | `OperatesSummaryEvent` | Operator connectivity summaries (see above) | +| `"timely/reachability/<T>"` | `TrackerEventBuilder` | `TrackerEvent` | Reachability tracker updates | -Events on the `"timely/reachability/{T}"` streams record updates to the reachability tracker, which maintains the set of timestamps that could still arrive at each operator input. 
The variants are `SourceUpdate` and `TargetUpdate`, each carrying the node, port, timestamp, and delta of the update. +The `<T>` in the stream names is the Rust type name of the dataflow's timestamp, obtained from `std::any::type_name::<T>()` (e.g., `"timely/progress/usize"` for a dataflow using `usize` timestamps). Note that `type_name` is best-effort and not guaranteed to be stable across compiler versions, so these stream names should be treated accordingly. -### `OperatesSummaryEvent` +**`TimelyProgressEvent`** captures the exchange of progress information between operators. Each event records whether it is a send or receive (`is_send`), the `source` worker, the `channel` and `seq_no`, the `identifier` of the operator, and two lists of updates: `messages` (updates to message counts at targets) and `internal` (updates to capabilities at sources). Each update is a tuple `(node, port, timestamp, delta)`. These are primarily useful for debugging the progress tracking protocol. -Events on the `"timely/summary/{T}"` streams record the internal connectivity summary of each operator. See [Operator summaries: internal connectivity](#operator-summaries-internal-connectivity) above for a detailed explanation. +**`TrackerEvent`** records updates to the reachability tracker, which maintains the set of timestamps that could still arrive at each operator input. The variants are `SourceUpdate` and `TargetUpdate`, each carrying the node, port, timestamp, and delta of the update. -## Example: logging multiple streams +## Registering a Logger -The following example registers callbacks for all four built-in logging streams and also creates a custom application-level logger: +To consume logging events, register a callback with the worker's log registry. 
The `insert` method takes a string name and a closure: ```rust,no_run -use std::time::Duration; -use timely::logging::{ - TimelyEventBuilder, TimelyProgressEventBuilder, TimelySummaryEventBuilder, -}; -use timely::container::CapacityContainerBuilder; -use timely::progress::reachability::logging::TrackerEventBuilder; +use timely::logging::TimelyEventBuilder; timely::execute_from_args(std::env::args(), |worker| { - // Core timely events. worker.log_register().unwrap() - .insert::("timely", |_time, data| { + .insert::("timely", |time, data| { if let Some(data) = data { - for event in data.iter() { - println!("TIMELY: {:?}", event); + for (elapsed, event) in data.iter() { + println!("{elapsed:?}\t{event:?}"); } } }); - // Progress tracking events (for usize timestamps). - worker.log_register().unwrap() - .insert::, _>( - "timely/progress/usize", |_time, data| { - if let Some(data) = data { - for event in data.iter() { - println!("PROGRESS: {:?}", event); - } - } - } - ); + worker.dataflow::(|scope| { + // ... build your dataflow ... + }); - // Reachability tracker events. - worker.log_register().unwrap() - .insert::, _>( - "timely/reachability/usize", |_time, data| { - if let Some(data) = data { - for event in data.iter() { - println!("REACHABILITY: {:?}", event); - } - } - } - ); +}).unwrap(); +``` - // Operator summary events. - worker.log_register().unwrap() - .insert::, _>( - "timely/summary/usize", |_time, data| { - if let Some(data) = data { - for (_, event) in data.iter() { - println!("SUMMARY: {:?}", event); - } - } - } - ); +The `insert` method is generic over two type parameters: a *container builder* type that describes how events are batched, and the closure type. Each built-in logging stream has a corresponding builder type alias (like `TimelyEventBuilder`) that you should use. + +The closure signature is `FnMut(&Duration, &mut Option)`: + +- The `&Duration` is the elapsed time since worker startup. 
+- `&mut Option` is `Some(container)` when delivering a batch of events, or `None` to signal a flush (e.g., when the worker is about to park or the stream is closing). + +Each event in the container is a tuple `(Duration, Event)` where the `Duration` is the timestamp at which the event was logged. - // Custom application-level logger. - type RoundBuilder = CapacityContainerBuilder>; +You can register a callback for a stream at any point before or after building a dataflow. If you register a callback for a stream name that is already in use, the new callback takes effect for subsequently created loggers but existing loggers continue to use the old callback. + +## Custom Logging Streams + +You can create your own logging streams. Register a stream with `insert`, then retrieve a `Logger` handle with `get` to log events from your application code: + +```rust,no_run +use std::time::Duration; +use timely::container::CapacityContainerBuilder; + +timely::execute_from_args(std::env::args(), |worker| { + + // Define a container builder for your event type. + type MyBuilder = CapacityContainerBuilder>; + + // Register the logging stream with a callback. worker.log_register().unwrap() - .insert::("my-app/rounds", |_time, data| { + .insert::("my-app/events", |_time, data| { if let Some(data) = data { - for (ts, round) in data.iter() { - println!("[{:?}] completed round {}", ts, round); + for (ts, msg) in data.iter() { + println!("[{:?}] {}", ts, msg); } } }); - let round_logger = worker.log_register().unwrap() - .get::("my-app/rounds") - .expect("Round logger absent"); + // Retrieve a Logger handle to use later. + let my_logger = worker.log_register().unwrap() + .get::("my-app/events") + .expect("Logger was just registered"); worker.dataflow::(|scope| { // ... build your dataflow ... }); - for round in 0..10 { - // ... do work ... - round_logger.log(round); - } + // Log events from your application logic. 
+ my_logger.log("something happened".to_string()); }).unwrap(); ``` -## The `BatchLogger` adapter - -If you want to feed logging events into a timely dataflow stream (for example, to analyze logs in real time or write them to durable storage), the `BatchLogger` struct bridges the two. It wraps an `EventPusher` and converts logging callbacks into a stream of timely `Event`s with progress information: - -```rust,no_run -use timely::logging::BatchLogger; -``` +The `Logger` buffers events internally and flushes them to the registered closure when the buffer reaches capacity, when `flush()` is called explicitly, or when the logger is dropped. -`BatchLogger::publish_batch` is called with each `(&Duration, &mut Option)` from the logging closure, and it translates these into `Event::Messages` and `Event::Progress` updates suitable for consumption by `capture` and `replay` infrastructure. +You can also use `BatchLogger` to forward events into a timely capture stream for downstream processing via the `capture` and `replay` infrastructure. -## Communication thread logging +## Communication Thread Logging The logging described above all runs on worker threads and is accessed through the worker's `Registry`. In multi-process (cluster) deployments, timely also runs dedicated send and receive threads for TCP networking. These threads have their own logging, configured separately. @@ -297,10 +312,10 @@ Communication logging is configured via the `log_fn` field in `Config::Cluster`. The `CommunicationEvent` enum has three variants: -- **`Setup(CommunicationSetup)`**: Identifies the thread, recording whether it is a `sender` or receiver, the local `process` id, and the `remote` process id (if any). - -- **`State(StateEvent)`**: Logged when a communication thread starts or stops. Contains `send` (whether this is a send thread), `process` and `remote` ids, and `start` (true when starting, false when stopping). 
- -- **`Message(MessageEvent)`**: Logged for each message sent or received over the network. Contains `is_send` and the message `header` (which includes the channel, source, target, and length). +| Variant | Description | +|---------|-------------| +| `Setup(CommunicationSetup)` | Identifies the thread: whether it is a `sender` or receiver, the local `process` id, and the `remote` process id. | +| `State(StateEvent)` | Thread start or stop. Contains `send` (is this a send thread), `process` and `remote` ids, and `start` (true when starting, false when stopping). | +| `Message(MessageEvent)` | Message send or receive. Contains `is_send` and the message `header` (which includes channel, source, target, and length). | These events are only relevant when running across multiple processes. For single-process or single-thread configurations, no communication threads are created and these events will not appear. diff --git a/timely/Cargo.toml b/timely/Cargo.toml index f45a04b40..e72c05cc2 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -21,6 +21,7 @@ workspace = true [features] default = ["getopts"] getopts = ["dep:getopts", "timely_communication/getopts"] +visualizer = ["dep:tungstenite"] [dependencies] columnar = { workspace = true } @@ -35,3 +36,4 @@ timely_logging = { path = "../logging", version = "0.27" } timely_communication = { path = "../communication", version = "0.27", default-features = false } timely_container = { path = "../container", version = "0.27" } smallvec = { version = "1.15.1", features = ["serde", "const_generics"] } +tungstenite = { version = "0.26", optional = true } diff --git a/timely/examples/logging-live.rs b/timely/examples/logging-live.rs new file mode 100644 index 000000000..3c5a397ed --- /dev/null +++ b/timely/examples/logging-live.rs @@ -0,0 +1,77 @@ +//! Streams timely events over WebSocket for live visualization. +//! +//! Usage: +//! cargo run --example logging-live --features visualizer [-- timely args] +//! +//! 
Then open `visualizer/index.html` in a browser and connect to `ws://localhost:51371`. + +use timely::dataflow::operators::{Input, Exchange, Enter, Leave, Inspect, Probe, Feedback, ConnectLoop, Concat}; +use timely::dataflow::operators::vec::{Map, Filter}; +use timely::dataflow::{InputHandle, Scope}; +use timely::visualizer::Server; + +fn main() { + let port = std::env::var("TIMELY_VIS_PORT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(51371); + + let server = Server::start(port); + + timely::execute_from_args(std::env::args(), move |worker| { + + server.register(worker); + + // A richer dataflow to exercise the visualizer: + // + // Input -> Map -> Exchange -> Filter ----> Region[ Inspect ] -> Probe + // \ ^ + // +-> Feedback ----+ + // + // This gives us: multiple pipeline stages, an exchange, branching, + // a nested scope (region), a feedback loop with a back-edge, and + // real data flowing through channels. + let mut input = InputHandle::new(); + let mut probe = timely::dataflow::ProbeHandle::new(); + worker.dataflow(|scope| { + let (handle, cycle) = scope.feedback::>(1); + + let mapped = scope + .input_from(&mut input) + .container::>() + .map(|x: u64| x.wrapping_mul(17).wrapping_add(3)) + .exchange(|&x: &u64| x); + + let filtered = mapped.filter(|&x: &u64| x % 2 == 0); + + let looped = filtered.clone().concat(cycle); + + scope.region(|inner| { + looped + .enter(inner) + .inspect(|_x| { }) + .leave() + }) + .probe_with(&mut probe); + + // Feed back values that haven't reached zero yet. + filtered + .map(|x: u64| x / 4) + .filter(|&x: &u64| x > 0) + .connect_loop(handle); + }); + + // Continuously feed data so events keep flowing. 
+ let mut round = 0u64; + loop { + for i in 0..100u64 { + input.send(round * 100 + i); + } + round += 1; + input.advance_to(round); + while probe.less_than(input.time()) { + worker.step(); + } + } + }).unwrap(); +} diff --git a/timely/src/lib.rs b/timely/src/lib.rs index 39625f7e3..e4de275c4 100644 --- a/timely/src/lib.rs +++ b/timely/src/lib.rs @@ -96,6 +96,9 @@ pub mod order; pub mod logging; // pub mod log_events; +#[cfg(feature = "visualizer")] +pub mod visualizer; + pub mod scheduling; /// A composite trait for types usable as containers in timely dataflow. diff --git a/timely/src/visualizer.rs b/timely/src/visualizer.rs new file mode 100644 index 000000000..1979d740a --- /dev/null +++ b/timely/src/visualizer.rs @@ -0,0 +1,232 @@ +//! Live dataflow visualization over WebSocket. +//! +//! This module provides a [`Server`] that streams timely logging events as JSON +//! over WebSocket to the browser-based visualizer (`visualizer/index.html`). +//! +//! # Usage +//! +//! ```ignore +//! use timely::visualizer::Server; +//! +//! let server = Server::start(51371); +//! +//! timely::execute_from_args(std::env::args(), move |worker| { +//! server.register(worker); +//! // ... build and run dataflows ... +//! }).unwrap(); +//! ``` +//! +//! Then open `visualizer/index.html` in a browser and connect to `ws://localhost:51371`. +//! +//! Requires the `visualizer` feature flag. + +use std::net::{TcpListener, TcpStream}; +use std::sync::{Arc, Mutex, mpsc}; +use std::thread; +use std::time::{Duration, Instant}; + +use tungstenite::{accept, Message}; + +use crate::logging::{TimelyEvent, TimelyEventBuilder, StartStop}; +use crate::worker::Worker; +use crate::communication::Allocate; + +/// A handle to the visualization WebSocket server. +/// +/// The server accepts browser connections and streams timely events as JSON. +/// Structural events (`Operates`, `Channels`) are replayed to late-connecting +/// clients so they can reconstruct the dataflow graph. 
+/// +/// `Server` is cheaply cloneable — all clones share the same underlying channel +/// and server thread. The server thread runs until all `Server` handles (and +/// their associated senders) are dropped. +#[derive(Clone)] +pub struct Server { + tx: Arc>>, +} + +impl Server { + /// Start the WebSocket visualization server on the given port. + /// + /// Spawns a background thread that accepts WebSocket connections and + /// broadcasts events. The thread exits when all `Server` handles are + /// dropped. + /// + /// # Panics + /// + /// Panics if the port cannot be bound. + pub fn start(port: u16) -> Self { + let (tx, rx) = mpsc::channel::(); + + thread::spawn(move || run_ws_server(port, rx)); + + eprintln!("Visualizer WebSocket server on ws://localhost:{port}"); + eprintln!("Open visualizer/index.html and connect to the address above."); + + Server { + tx: Arc::new(Mutex::new(tx)), + } + } + + /// Register the timely event logger for this worker. + /// + /// This installs a logging callback on the `"timely"` log stream that + /// serializes events as JSON and sends them to the WebSocket server. + pub fn register(&self, worker: &mut Worker) { + let tx = Arc::clone(&self.tx); + let worker_index = worker.index(); + + worker.log_register().unwrap().insert::( + "timely", + move |_time, data| { + if let Some(data) = data { + let tx = tx.lock().unwrap(); + for (elapsed, event) in data.iter() { + let duration_ns = + elapsed.as_secs() as u64 * 1_000_000_000 + + elapsed.subsec_nanos() as u64; + if let Some(json) = event_to_json(worker_index, duration_ns, event) { + let _ = tx.send(json); + } + } + } + }, + ); + } +} + +/// Convert a timely event to a JSON string: `[worker, duration_ns, event]`. 
+fn event_to_json(worker: usize, duration_ns: u64, event: &TimelyEvent) -> Option { + let event_json = match event { + TimelyEvent::Operates(e) => { + let addr: Vec = e.addr.iter().map(|a| a.to_string()).collect(); + let name = e.name.replace('\\', "\\\\").replace('"', "\\\""); + format!( + r#"{{"Operates": {{"id": {}, "addr": [{}], "name": "{}"}}}}"#, + e.id, addr.join(", "), name, + ) + } + TimelyEvent::Channels(e) => { + let scope_addr: Vec = e.scope_addr.iter().map(|a| a.to_string()).collect(); + let typ = e.typ.replace('\\', "\\\\").replace('"', "\\\""); + format!( + r#"{{"Channels": {{"id": {}, "scope_addr": [{}], "source": [{}, {}], "target": [{}, {}], "typ": "{}"}}}}"#, + e.id, scope_addr.join(", "), + e.source.0, e.source.1, e.target.0, e.target.1, typ, + ) + } + TimelyEvent::Schedule(e) => { + let ss = match e.start_stop { + StartStop::Start => "\"Start\"", + StartStop::Stop => "\"Stop\"", + }; + format!( + r#"{{"Schedule": {{"id": {}, "start_stop": {}}}}}"#, + e.id, ss, + ) + } + TimelyEvent::Messages(e) => { + format!( + r#"{{"Messages": {{"is_send": {}, "channel": {}, "source": {}, "target": {}, "seq_no": {}, "record_count": {}}}}}"#, + e.is_send, e.channel, e.source, e.target, e.seq_no, e.record_count, + ) + } + TimelyEvent::Shutdown(e) => { + format!(r#"{{"Shutdown": {{"id": {}}}}}"#, e.id) + } + _ => return None, + }; + Some(format!("[{worker}, {duration_ns}, {event_json}]")) +} + +const FLUSH_INTERVAL: Duration = Duration::from_millis(250); + +/// Run the WebSocket server. Batches events and replays structural events to +/// late-connecting clients. 
+fn run_ws_server(port: u16, rx: mpsc::Receiver) { + let listener = TcpListener::bind(format!("0.0.0.0:{port}")) + .unwrap_or_else(|e| panic!("Failed to bind to port {port}: {e}")); + listener.set_nonblocking(true).expect("Cannot set non-blocking"); + + let mut clients: Vec> = Vec::new(); + let mut batch: Vec = Vec::new(); + let mut replay: Vec = Vec::new(); + let mut done = false; + + loop { + let deadline = Instant::now() + FLUSH_INTERVAL; + + // Accept pending connections (non-blocking). + loop { + match listener.accept() { + Ok((stream, addr)) => { + eprintln!("Visualizer client connected from {addr}"); + stream.set_nonblocking(false).ok(); + match accept(stream) { + Ok(mut ws) => { + if !replay.is_empty() { + let payload = format!("[{}]", replay.join(",")); + let msg = Message::Text(payload.into()); + if ws.send(msg).is_err() { + eprintln!("Failed to send replay; dropping client"); + continue; + } + } + clients.push(ws); + } + Err(e) => eprintln!("WebSocket handshake failed: {e}"), + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => { + eprintln!("Accept error: {e}"); + break; + } + } + } + + // Drain the channel until the flush deadline. + loop { + let remaining = deadline.saturating_duration_since(Instant::now()); + match rx.recv_timeout(remaining) { + Ok(json) => { + if json.contains("\"Operates\"") || json.contains("\"Channels\"") { + replay.push(json.clone()); + } + batch.push(json); + } + Err(mpsc::RecvTimeoutError::Timeout) => break, + Err(mpsc::RecvTimeoutError::Disconnected) => { done = true; break; } + } + } + + // Flush batch to all connected clients. 
+ if !batch.is_empty() && !clients.is_empty() { + let payload = format!("[{}]", batch.join(",")); + let msg = Message::Text(payload.into()); + clients.retain_mut(|ws| { + match ws.send(msg.clone()) { + Ok(_) => true, + Err(_) => { + eprintln!("Visualizer client disconnected"); + false + } + } + }); + } + batch.clear(); + + if done { break; } + } + + // Close all clients gracefully. + for ws in clients.iter_mut() { + let _ = ws.close(None); + loop { + match ws.read() { + Ok(_) => continue, + Err(_) => break, + } + } + } +} diff --git a/visualizer/index.html b/visualizer/index.html new file mode 100644 index 000000000..ba9cb7f4c --- /dev/null +++ b/visualizer/index.html @@ -0,0 +1,1643 @@ + + + + + + Timely Dataflow Visualizer + + + + + +
+
+ + + or + + +
+
+ Drop a JSON log file here, or click to browse +
+
+ Live: run cargo run --example logging-live --features visualizer and click Connect.
+ File: a JSON array of [worker, duration_ns, event] triples + (generate with cargo run --example logging-json > events.json). +
+
+ + + +
+ + + + From dd5f439b4c2961e290bf58d807f45faf28c18759 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 20 Mar 2026 12:59:36 -0400 Subject: [PATCH 2/3] Further updates, gate example --- mdbook/src/chapter_5/chapter_5_4.md | 4 ++-- timely/Cargo.toml | 4 ++++ timely/src/logging.rs | 4 ++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/mdbook/src/chapter_5/chapter_5_4.md b/mdbook/src/chapter_5/chapter_5_4.md index b330de194..fcaa2bf94 100644 --- a/mdbook/src/chapter_5/chapter_5_4.md +++ b/mdbook/src/chapter_5/chapter_5_4.md @@ -175,7 +175,7 @@ You would see structural events like: 6. `OperatesEvent { id: _, addr: [0, 3], name: "Inspect" }` — the inspect, in the root scope. Channel events in the root scope (`scope_addr: [0]`) connecting `Input` to the iterative scope: -- `ChannelsEvent { scope_addr: [0], source: (1, 0), target: (2, 0), ... }` — from Input's output 0 to the iterative scope's input 0. +- `ChannelsEvent { scope_addr: [0], source: (1, 0), target: (2, 0), ... }` — from Input (index 1) output 0 to the iterative scope (index 2) input 0. Channel events inside the iterative scope (`scope_addr: [0, 2]`): - `ChannelsEvent { scope_addr: [0, 2], source: (0, 0), target: (1, 0), ... }` — from child zero's output 0 (= scope input 0) to Map's input 0. @@ -195,7 +195,7 @@ To reconstruct the dataflow graph from logged events: 2. **Build per-scope channel graphs** from `ChannelsEvent` entries. Group channels by `scope_addr`. Within each scope, the `source` and `target` pairs give you directed edges between scope-local operator indices. -3. **Stitch across scope boundaries** using child zero. When a channel in scope `S` has source or target operator index 0, it connects to the scope's external interface. Find the operator in `S`'s parent whose `addr` equals `S`, and link the corresponding port. +3. **Stitch across scope boundaries** using child zero. 
When a channel in scope `S` has source or target operator index 0, it connects to the scope's external interface. Find the operator in `S`'s parent that represents this scope, and link the corresponding port. 4. **Correlate runtime events** using the worker-unique `id` from `OperatesEvent` to join `ScheduleEvent`, `ShutdownEvent`, and other events. Use `ChannelsEvent::id` to join `MessagesEvent` records to their channel. diff --git a/timely/Cargo.toml b/timely/Cargo.toml index e72c05cc2..21868e2c1 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -37,3 +37,7 @@ timely_communication = { path = "../communication", version = "0.27", default-fe timely_container = { path = "../container", version = "0.27" } smallvec = { version = "1.15.1", features = ["serde", "const_generics"] } tungstenite = { version = "0.26", optional = true } + +[[example]] +name = "logging-live" +required-features = ["visualizer"] diff --git a/timely/src/logging.rs b/timely/src/logging.rs index 0b3a7a061..0b2fb499f 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -64,7 +64,7 @@ impl Drop for BatchLogger where P: EventPusher { pub struct OperatesEvent { /// Worker-unique identifier for the operator. pub id: usize, - /// Sequence of nested scope identifiers indicating the path from the root to the scope containing this operator. + /// Sequence of nested scope identifiers indicating the path from the root to this operator. pub addr: Vec, /// A helpful name. pub name: String, @@ -85,7 +85,7 @@ pub struct OperatesSummaryEvent { pub struct ChannelsEvent { /// Worker-unique identifier for the channel pub id: usize, - /// Sequence of nested scope identifiers indicating the path from the root to this instance. + /// Sequence of nested scope identifiers indicating the path from the root to the scope containing this channel. pub scope_addr: Vec, /// Source descriptor, indicating operator index and output port. 
pub source: (usize, usize), From 6ef68c3576d49d4bab5745c7d42c0ee9ed9e3b21 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 20 Mar 2026 15:38:33 -0400 Subject: [PATCH 3/3] Further noodling --- timely/src/progress/subgraph.rs | 6 +- timely/src/visualizer.rs | 2 +- timely/src/worker.rs | 2 +- visualizer/index.html | 718 +++++++++++++++++++------------- 4 files changed, 427 insertions(+), 301 deletions(-) diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 6bd303da0..e11011368 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -136,9 +136,13 @@ where pub fn add_child(&mut self, child: Box>, index: usize, identifier: usize) { let child = PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging); if let Some(l) = &mut self.logging { + let mut child_path = Vec::with_capacity(self.path.len() + 1); + child_path.extend_from_slice(&self.path[..]); + child_path.push(index); + l.log(crate::logging::OperatesEvent { id: identifier, - addr: self.path.to_vec(), + addr: child_path, name: child.name.to_owned(), }); } diff --git a/timely/src/visualizer.rs b/timely/src/visualizer.rs index 1979d740a..a0b6d8391 100644 --- a/timely/src/visualizer.rs +++ b/timely/src/visualizer.rs @@ -139,7 +139,7 @@ fn event_to_json(worker: usize, duration_ns: u64, event: &TimelyEvent) -> Option Some(format!("[{worker}, {duration_ns}, {event_json}]")) } -const FLUSH_INTERVAL: Duration = Duration::from_millis(250); +const FLUSH_INTERVAL: Duration = Duration::from_millis(100); /// Run the WebSocket server. Batches events and replays structural events to /// late-connecting clients. diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 235cae026..70051214a 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -673,7 +673,7 @@ impl Worker
{ if let Some(l) = logging.as_mut() { l.log(crate::logging::OperatesEvent { id: identifier, - addr: vec![], + addr: operator.path().to_vec(), name: operator.name().to_string(), }); l.flush(); diff --git a/visualizer/index.html b/visualizer/index.html index ba9cb7f4c..cd5eb41f5 100644 --- a/visualizer/index.html +++ b/visualizer/index.html @@ -4,6 +4,7 @@ Timely Dataflow Visualizer +