Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ rust-version = "1.86"

[workspace.dependencies]
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.20.0" }
timely = { version = "0.27", default-features = false }
#timely = { version = "0.27", default-features = false }
columnar = { version = "0.11", default-features = false }
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[workspace.lints.clippy]
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::InputHandle;
use timely::dataflow::ProbeHandle;

use differential_dataflow::operators::arrange::arrangement::arrange_core;
use differential_dataflow::operators::arrange::arrangement::arrange_inter;

use mimalloc::MiMalloc;

Expand Down Expand Up @@ -39,8 +39,8 @@ fn main() {
let data_pact = KeyPact { hashfunc: |k: columnar::Ref<'_, Vec<u8>>| k.hashed() };
let keys_pact = KeyPact { hashfunc: |k: columnar::Ref<'_, Vec<u8>>| k.hashed() };

let data = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(data, data_pact, "Data");
let keys = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(keys, keys_pact, "Keys");
let data = arrange_inter::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(data, data_pact, "Data");
let keys = arrange_inter::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(keys, keys_pact, "Keys");

keys.join_core(data, |_k, (), ()| { Option::<()>::None })
.probe_with(&mut probe);
Expand Down
60 changes: 60 additions & 0 deletions differential-dataflow/examples/event_driven.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use timely::dataflow::Scope;
use timely::dataflow::operators::{Input, Probe, Enter, Leave};
use timely::dataflow::operators::core::Filter;

fn main() {
// initializes and runs a timely dataflow.
timely::execute_from_args(std::env::args(), |worker| {

let timer = std::time::Instant::now();

let mut args = std::env::args();
args.next();

let dataflows = args.next().unwrap().parse::<usize>().unwrap();
let length = args.next().unwrap().parse::<usize>().unwrap();
let local = args.next() == Some("local".to_string());
println!("Local: {:?}", local);

let mut inputs = Vec::new();
let mut probes = Vec::new();

// create a new input, exchange data, and inspect its output
for _dataflow in 0 .. dataflows {
worker.dataflow(|scope| {
let (input, stream) = scope.new_input();
let stream = scope.region(|inner| {
let mut stream = stream.enter(inner);
use differential_dataflow::operators::arrange::arrangement::{arrange_intra, arrange_inter};
use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine};
stream = if local { arrange_intra::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(stream, timely::dataflow::channels::pact::Pipeline, "test").as_collection(|k: &i32,v: &i32| (*k, *v)).inner }
else { arrange_inter::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(stream, timely::dataflow::channels::pact::Pipeline, "test").as_collection(|k: &i32,v: &i32| (*k, *v)).inner };
for _step in 0 .. length {
stream = stream.filter(|_| false);
}
stream.leave()
});
let (probe, _stream) = stream.probe();
inputs.push(input);
probes.push(probe);
});
}

println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);

// Repeatedly, insert a record in one dataflow, tick all dataflow inputs.
for round in 0 .. {
let dataflow = round % dataflows;
inputs[dataflow].send(((0i32, 0i32), round, 1));
for d in 0 .. dataflows { inputs[d].advance_to(round); }
let mut steps = 0;
while probes[dataflow].less_than(&round) {
worker.step();
steps += 1;
}

println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
}

}).unwrap();
}
4 changes: 2 additions & 2 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ pub mod vec {
Tr: crate::trace::Trace<Time=G::Timestamp> + 'static,
{
let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
crate::operators::arrange::arrangement::arrange_core::<_, _, Ba, Bu, _>(self.inner, exchange, name)
crate::operators::arrange::arrangement::arrange_inter::<_, _, Ba, Bu, _>(self.inner, exchange, name)
}
}

Expand All @@ -1038,7 +1038,7 @@ pub mod vec {
Tr: crate::trace::Trace<Time=G::Timestamp> + 'static,
{
let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
crate::operators::arrange::arrangement::arrange_core::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name)
crate::operators::arrange::arrangement::arrange_inter::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name)
}
}

Expand Down
6 changes: 2 additions & 4 deletions differential-dataflow/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
impl<T: Timestamp+Clone, D: Data, R: Semigroup+'static> InputSession<T, D, R> {

/// Introduces a handle as collection.
pub fn to_collection<G: TimelyInput>(&mut self, scope: &mut G) -> VecCollection<G, D, R>

Check warning on line 201 in differential-dataflow/src/input.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

bound is defined in more than one place
where
G: ScopeParent<Timestamp=T>,
{
Expand Down Expand Up @@ -258,7 +258,7 @@
/// called, all buffers are flushed and timely dataflow is advised that some logical times are no longer possible.
pub fn flush(&mut self) {
self.handle.send_batch(&mut self.buffer);
if self.handle.epoch().less_than(&self.time) {
if self.handle.time().less_than(&self.time) {
self.handle.advance_to(self.time.clone());
}
}
Expand All @@ -269,13 +269,11 @@
/// the session is dropped or flushed. It is not correct to use this time as a basis for a computation's `step_while`
/// method unless the session has just been flushed.
pub fn advance_to(&mut self, time: T) {
assert!(self.handle.epoch().less_equal(&time));
assert!(self.handle.time().less_equal(&time));
assert!(&self.time.less_equal(&time));
self.time = time;
}

/// Reveals the current time of the session.
pub fn epoch(&self) -> &T { &self.time }
/// Reveals the current time of the session.
pub fn time(&self) -> &T { &self.time }

Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
//!
//! ```ignore
//! loop {
//! let time = input.epoch();
//! let time = input.time();
//! for round in time .. time + 100 {
//! input.advance_to(round);
//! input.insert((round % 13, round % 7));
Expand Down
Loading
Loading