-
Notifications
You must be signed in to change notification settings - Fork 295
Expand file tree
/
Copy pathevent_driven.rs
More file actions
52 lines (42 loc) · 1.64 KB
/
event_driven.rs
File metadata and controls
52 lines (42 loc) · 1.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
extern crate timely;
// use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Map, Probe};
/// Builds several independent dataflows and drives them one at a time.
///
/// Command-line arguments, read positionally after the program name:
///
/// 1. `dataflows` - number of dataflows to build (must be at least 1).
/// 2. `length` - number of chained identity `map` operators in each dataflow.
/// 3. `record` (optional) - when the third argument is exactly `record`, a `()`
///    record is sent into the selected input each round before advancing it.
///
/// After construction, ten rounds are run. Round `r` picks dataflow
/// `r % dataflows`, advances its input to `r`, and steps the worker until that
/// dataflow's probe no longer reports `less_than(&r)`. Elapsed time and the
/// number of steps taken are printed for each round.
///
/// # Panics
///
/// Panics with a descriptive message if `dataflows` or `length` is missing or
/// is not a valid `usize`, or if `dataflows` is zero.
fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {
        let timer = std::time::Instant::now();

        const USAGE: &str = "usage: event_driven <dataflows> <length> [record]";

        // Skip the program name, then read the positional arguments.
        let mut args = std::env::args().skip(1);
        let dataflows = args
            .next()
            .expect(USAGE)
            .parse::<usize>()
            .expect("<dataflows> must be an unsigned integer");
        let length = args
            .next()
            .expect(USAGE)
            .parse::<usize>()
            .expect("<length> must be an unsigned integer");
        let record = args.next().map_or(false, |flag| flag == "record");

        // Each round selects a dataflow with `round % dataflows`; reject zero
        // up front instead of failing later with a remainder-by-zero panic.
        assert!(dataflows > 0, "<dataflows> must be at least 1");

        let mut inputs = Vec::new();
        let mut probes = Vec::new();

        // Build each dataflow: a new input, `length` identity maps, and a probe.
        for _dataflow in 0..dataflows {
            worker.dataflow(|scope| {
                let (input, mut stream) = scope.new_input();
                for _step in 0..length {
                    stream = stream.map(|x: ()| x);
                }
                let probe = stream.probe();
                inputs.push(input);
                probes.push(probe);
            });
        }

        println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);

        for round in 0..10 {
            // Only one dataflow is touched per round.
            let dataflow = round % dataflows;
            if record {
                inputs[dataflow].send(());
            }
            inputs[dataflow].advance_to(round);

            // Step the worker until the selected probe has caught up to `round`,
            // counting how many steps that took.
            let mut steps = 0;
            while probes[dataflow].less_than(&round) {
                worker.step();
                steps += 1;
            }

            println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
        }
    }).unwrap();
}