-
Notifications
You must be signed in to change notification settings - Fork 295
Expand file tree
/
Copy pathevent_driven_diamond.rs
More file actions
67 lines (58 loc) · 2.51 KB
/
event_driven_diamond.rs
File metadata and controls
67 lines (58 loc) · 2.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use timely::dataflow::operators::{Input, Concat, Probe};
use timely::dataflow::operators::vec::{Map, Filter};
/// Benchmark settings taken from the positional command-line arguments.
#[derive(Clone, Copy, Debug)]
struct BenchConfig {
    /// Number of independent dataflows each worker builds.
    dataflows: usize,
    /// Number of diamonds chained one after another in each dataflow.
    diamonds: usize,
    /// Whether a `()` record is sent into the selected input each round.
    record: bool,
    /// Number of rounds to run; `usize::MAX` when the argument is absent.
    rounds: usize,
}

/// Usage line printed alongside any argument error.
const USAGE: &str =
    "usage: <program> [timely flags] <dataflows> <diamonds> [record|<other>] [rounds]";

/// Collects positional arguments from `args` (binary name already removed).
///
/// Any argument starting with `-` is treated as a flag, and the argument
/// following it is dropped as that flag's value.
///
/// NOTE(review): this assumes every flag takes a separate value (as for
/// `-w`, `-n`, `-p`, `-h`); a value-less flag would swallow the positional
/// that follows it — confirm against timely's option list.
fn positional_args(mut args: impl Iterator<Item = String>) -> Vec<String> {
    let mut positional = Vec::new();
    // `while let` rather than `for`: the body advances the iterator itself
    // in order to discard the flag's value.
    while let Some(arg) = args.next() {
        if arg.starts_with('-') {
            args.next(); // skip flag value
        } else {
            positional.push(arg);
        }
    }
    positional
}

/// Interprets the positional arguments as a [`BenchConfig`].
///
/// # Errors
///
/// Returns a human-readable message when `<dataflows>` or `<diamonds>` is
/// missing, when a numeric argument does not parse as `usize`, or when
/// `<dataflows>` is zero while at least one round is requested (the round
/// loop selects a dataflow with `round % dataflows`).
fn parse_config(positional: &[String]) -> Result<BenchConfig, String> {
    // Fetches and parses the mandatory-or-present count at `index`.
    let count = |index: usize, name: &str| -> Result<usize, String> {
        let raw = positional
            .get(index)
            .ok_or_else(|| format!("missing <{}> argument", name))?;
        raw.parse::<usize>()
            .map_err(|err| format!("invalid <{}> value {:?}: {}", name, raw, err))
    };

    let dataflows = count(0, "dataflows")?;
    let diamonds = count(1, "diamonds")?;
    // Only the exact word "record" enables sending; any other value disables it.
    let record = positional.get(2).map(String::as_str) == Some("record");
    let rounds = match positional.get(3) {
        Some(_) => count(3, "rounds")?,
        None => usize::MAX,
    };

    if dataflows == 0 && rounds > 0 {
        return Err(String::from(
            "<dataflows> must be at least 1 when any rounds are run",
        ));
    }

    Ok(BenchConfig { dataflows, diamonds, record, rounds })
}

/// Builds `dataflows` chains of `diamonds` diamonds on every worker, then
/// drives them round-robin, one dataflow per round, printing how many worker
/// steps each round needed.
///
/// Arguments are validated once, before any worker starts; on a bad command
/// line the error and usage are printed to stderr and the process exits with
/// status 2.
fn main() {
    let config = match parse_config(&positional_args(std::env::args().skip(1))) {
        Ok(config) => config,
        Err(message) => {
            eprintln!("error: {}", message);
            eprintln!("{}", USAGE);
            std::process::exit(2);
        }
    };
    // All fields are `Copy`, so the worker closure can capture them by value.
    let BenchConfig { dataflows, diamonds, record, rounds } = config;

    timely::execute_from_args(std::env::args(), move |worker| {
        let timer = std::time::Instant::now();

        let mut inputs = Vec::new();
        let mut probes = Vec::new();

        // Each dataflow is a chain of diamonds ending in a probe:
        //   input -> (left: map) + (right: filter -> map) -> concat -> ... -> probe
        // The left branch forwards every record unchanged; the right branch
        // filters every record out before its identity map.
        for _dataflow in 0..dataflows {
            worker.dataflow(|scope| {
                let (input, mut stream) = scope.new_input();
                for _diamond in 0..diamonds {
                    let left = stream.clone().map(|x: ()| x);
                    let right = stream.filter(|_| false).map(|x: ()| x);
                    stream = left.concat(right).container::<Vec<_>>();
                }
                let (probe, _stream) = stream.probe();
                inputs.push(input);
                probes.push(probe);
            });
        }

        println!("{:?}\tdataflows built ({} x {} diamonds)", timer.elapsed(), dataflows, diamonds);

        for round in 0..rounds {
            // Only one dataflow is touched per round; `dataflows` is non-zero
            // here because `parse_config` rejects zero when rounds are run.
            let dataflow = round % dataflows;
            if record {
                inputs[dataflow].send(());
            }
            inputs[dataflow].advance_to(round);

            // Step the worker until the probe is no longer behind `round`.
            let mut steps = 0;
            while probes[dataflow].less_than(&round) {
                worker.step();
                steps += 1;
            }
            println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
        }
    }).unwrap();
}