Skip to content

Commit 87a45a9

Browse files
Documentation on execution idioms (#770)
1 parent b7f9dd8 commit 87a45a9

1 file changed

Lines changed: 50 additions & 0 deletions

File tree

mdbook/src/chapter_3/chapter_3_3.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,53 @@ The `worker.step()` call is the heart of data processing in timely dataflow. The
1818
Importantly, this is also where we start moving data around. Until we call `worker.step()` all data are just sitting in queues. The parts of our computation that do clever things like filtering down the data, or projecting out just a few small fields, or pre-aggregating the data before we act on it, these all happen here and don't happen until we call this.
1919

2020
Make sure to call `worker.step()` now and again, like you would your parents.
21+
22+
## Stepping until caught up
23+
24+
The `worker.step()` method returns `true` if any dataflow operator remains incomplete — meaning it could still receive data or produce output. This includes input operators whose handles have not been dropped. A common mistake is to write:
25+
26+
```rust,ignore
27+
while worker.step() {
28+
// wait for the dataflow to finish
29+
}
30+
```
31+
32+
This loop will **never terminate** as long as any input handle is still open, because the input operator is always incomplete while it could still receive data. The system has no way to know you are done sending; from its perspective, another `input.send()` could arrive at any moment.
33+
34+
Instead, use a probe to step until the dataflow has caught up to the input:
35+
36+
```rust
37+
# extern crate timely;
38+
# use timely::dataflow::InputHandle;
39+
# use timely::dataflow::operators::{Input, Inspect, Probe};
40+
# fn main() {
41+
# timely::execute_from_args(std::env::args().take(1), |worker| {
42+
# let mut input = InputHandle::new();
43+
# let probe = worker.dataflow(|scope|
44+
# scope.input_from(&mut input)
45+
# .container::<Vec<_>>()
46+
# .inspect(|x| println!("seen: {:?}", x))
47+
# .probe()
48+
# .0
49+
# );
50+
for round in 0..10 {
51+
input.send(round);
52+
input.advance_to(round + 1);
53+
while probe.less_than(input.time()) {
54+
worker.step();
55+
}
56+
}
57+
# }).unwrap();
58+
# }
59+
```
60+
61+
The probe reports whether there are still timestamps less than the argument that could appear at the probed point in the dataflow. By stepping until `probe.less_than(input.time())` is false, you ensure the dataflow has processed everything up through the current round before moving on.
62+
63+
The `while worker.step()` pattern is only appropriate after all inputs have been closed (by dropping their handles), at which point it correctly runs the dataflow to completion:
64+
65+
```rust,ignore
66+
drop(input);
67+
while worker.step() {
68+
// drain remaining work after closing input
69+
}
70+
```

0 commit comments

Comments
 (0)