diff --git a/Cargo.toml b/Cargo.toml index 8995ab151..cbabb3087 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,9 +23,9 @@ rust-version = "1.86" [workspace.dependencies] differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.20.0" } -timely = { version = "0.27", default-features = false } +#timely = { version = "0.27", default-features = false } columnar = { version = "0.11", default-features = false } -#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } #timely = { path = "../timely-dataflow/timely/", default-features = false } [workspace.lints.clippy] diff --git a/diagnostics/src/logging.rs b/diagnostics/src/logging.rs index b47721e62..39671f62b 100644 --- a/diagnostics/src/logging.rs +++ b/diagnostics/src/logging.rs @@ -30,7 +30,7 @@ use std::time::{Duration, Instant}; use differential_dataflow::collection::concatenate; use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder}; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::trace::implementations::{KeySpine, ValSpine}; use differential_dataflow::{AsCollection, VecCollection}; @@ -166,9 +166,9 @@ pub type DiagnosticEvent = Event; // ============================================================================ /// A key-value trace: key K, value V, time Duration, diff i64. -type ValTrace = TraceAgent>; +type ValTrace = TraceIntra>; /// A key-only trace: key K, time Duration, diff i64. -type KeyTrace = TraceAgent>; +type KeyTrace = TraceIntra>; /// Trace handles for timely logging arrangements. pub struct TimelyTraces { diff --git a/differential-dataflow/examples/arrange.rs b/differential-dataflow/examples/arrange.rs index d3f5abcec..87f73e648 100644 --- a/differential-dataflow/examples/arrange.rs +++ b/differential-dataflow/examples/arrange.rs @@ -87,7 +87,7 @@ fn main() { }) .probe_with(&mut probe) .as_collection() - .arrange_by_key() + .arrange_by_key_inter() // .arrange::() .trace }); diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 9a97922f1..6254802d7 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -4,7 +4,7 @@ use timely::container::{ContainerBuilder, PushInto}; use timely::dataflow::InputHandle; use timely::dataflow::ProbeHandle; -use differential_dataflow::operators::arrange::arrangement::arrange_core; +use differential_dataflow::operators::arrange::arrangement::arrange_inter; use mimalloc::MiMalloc; @@ -39,8 +39,8 @@ fn main() { let data_pact = KeyPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; let keys_pact = KeyPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; - let data = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(data, data_pact, "Data"); - let keys = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(keys, keys_pact, "Keys"); + let data = arrange_inter::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(data, data_pact, "Data"); + let keys = arrange_inter::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(keys, keys_pact, "Keys"); keys.join_core(data, |_k, (), ()| { Option::<()>::None }) .probe_with(&mut probe); diff --git a/differential-dataflow/examples/event_driven.rs b/differential-dataflow/examples/event_driven.rs new file mode 100644 index 000000000..e2ca3fa32 --- /dev/null +++ b/differential-dataflow/examples/event_driven.rs @@ -0,0 +1,60 @@ +use timely::dataflow::Scope; +use timely::dataflow::operators::{Input, Probe, Enter, Leave}; +use timely::dataflow::operators::core::Filter; + +fn main() { + // initializes and runs a timely dataflow. + timely::execute_from_args(std::env::args(), |worker| { + + let timer = std::time::Instant::now(); + + let mut args = std::env::args(); + args.next(); + + let dataflows = args.next().unwrap().parse::().unwrap(); + let length = args.next().unwrap().parse::().unwrap(); + let local = args.next() == Some("local".to_string()); + println!("Local: {:?}", local); + + let mut inputs = Vec::new(); + let mut probes = Vec::new(); + + // create a new input, exchange data, and inspect its output + for _dataflow in 0 .. dataflows { + worker.dataflow(|scope| { + let (input, stream) = scope.new_input(); + let stream = scope.region(|inner| { + let mut stream = stream.enter(inner); + use differential_dataflow::operators::arrange::arrangement::{arrange_intra, arrange_inter}; + use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine}; + stream = if local { arrange_intra::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(stream, timely::dataflow::channels::pact::Pipeline, "test").as_collection(|k: &i32,v: &i32| (*k, *v)).inner } + else { arrange_inter::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(stream, timely::dataflow::channels::pact::Pipeline, "test").as_collection(|k: &i32,v: &i32| (*k, *v)).inner }; + for _step in 0 .. length { + stream = stream.filter(|_| false); + } + stream.leave() + }); + let (probe, _stream) = stream.probe(); + inputs.push(input); + probes.push(probe); + }); + } + + println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length); + + // Repeatedly, insert a record in one dataflow, tick all dataflow inputs. + for round in 0 .. { + let dataflow = round % dataflows; + inputs[dataflow].send(((0i32, 0i32), round, 1)); + for d in 0 .. dataflows { inputs[d].advance_to(round); } + let mut steps = 0; + while probes[dataflow].less_than(&round) { + worker.step(); + steps += 1; + } + + println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); + } + + }).unwrap(); +} diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index 79c7fd184..98c5d308b 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -65,10 +65,10 @@ pub struct Query { } use differential_dataflow::trace::implementations::{ValSpine, KeySpine}; -use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; +use differential_dataflow::operators::arrange::{Arranged, TraceInter}; -type TraceKeyHandle = TraceAgent>; -type TraceValHandle = TraceAgent>; +type TraceKeyHandle = TraceInter>; +type TraceValHandle = TraceInter>; type Arrange = Arranged::Timestamp, R>>; /// An evolving set of edges. @@ -115,14 +115,14 @@ impl> EdgeVariable { /// The collection arranged in the forward direction. pub fn forward(&mut self) -> &Arrange { if self.forward.is_none() { - self.forward = Some(self.collection.clone().arrange_by_key()); + self.forward = Some(self.collection.clone().arrange_by_key_inter()); } self.forward.as_ref().unwrap() } /// The collection arranged in the reverse direction. pub fn reverse(&mut self) -> &Arrange { if self.reverse.is_none() { - self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key()); + self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key_inter()); } self.reverse.as_ref().unwrap() } @@ -171,7 +171,7 @@ impl Query { // create variables and result handles for each named relation. for (name, (input, collection)) in input_map { let edge_variable = EdgeVariable::from(collection.enter(subscope), Product::new(Default::default(), 1)); - let trace = edge_variable.collection.clone().leave().arrange_by_self().trace; + let trace = edge_variable.collection.clone().leave().arrange_by_self_inter().trace; result_map.insert(name.clone(), RelationHandles { input, trace }); variable_map.insert(name.clone(), edge_variable); } @@ -200,7 +200,7 @@ impl Query { transposed = transposed .join_core(to_join, |_k,&x,&y| Some((y,x))) - .arrange_by_key(); + .arrange_by_key_inter(); } // Reverse the direction before adding it as a production. diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index c13227f2c..b81a890c2 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -705,7 +705,7 @@ pub mod vec { } use crate::trace::{Trace, Builder}; - use crate::operators::arrange::{Arranged, TraceAgent}; + use crate::operators::arrange::{Arranged, TraceInter, TraceIntra}; impl Collection where @@ -780,7 +780,7 @@ pub mod vec { /// .trace; /// }); /// ``` - pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> where T2: for<'a> Trace= &'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static, Bu: Builder, Output = T2::Batch>, @@ -798,7 +798,7 @@ pub mod vec { /// Unlike `reduce_arranged`, this method may be called with an empty `input`, /// and it may not be safe to index into the first element. /// At least one of the two collections will be non-empty. - pub fn reduce_core(self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(self, name: &str, logic: L) -> Arranged> where V: Clone+'static, T2: for<'a> Trace=&'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp>+'static, @@ -1016,14 +1016,14 @@ pub mod vec { V: crate::ExchangeData, R: crate::ExchangeData + Semigroup, { - fn arrange_named(self, name: &str) -> Arranged> + fn arrange_named(self, name: &str) -> Arranged> where Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, Bu: crate::trace::Builder, Tr: crate::trace::Trace + 'static, { let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - crate::operators::arrange::arrangement::arrange_core::<_, _, Ba, Bu, _>(self.inner, exchange, name) + crate::operators::arrange::arrangement::arrange_intra::<_, _, Ba, Bu, _>(self.inner, exchange, name) } } @@ -1031,14 +1031,14 @@ pub mod vec { where G: Scope, { - fn arrange_named(self, name: &str) -> Arranged> + fn arrange_named(self, name: &str) -> Arranged> where Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, Bu: crate::trace::Builder, Tr: crate::trace::Trace + 'static, { let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); - crate::operators::arrange::arrangement::arrange_core::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name) + crate::operators::arrange::arrangement::arrange_intra::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name) } } @@ -1052,14 +1052,25 @@ pub mod vec { /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// This trace is current for all times completed by the output stream, which can be used to /// safely identify the stable times and values in the trace. - pub fn arrange_by_key(self) -> Arranged>> { + pub fn arrange_by_key(self) -> Arranged>> { self.arrange_by_key_named("ArrangeByKey") } /// As `arrange_by_key` but with the ability to name the arrangement. - pub fn arrange_by_key_named(self, name: &str) -> Arranged>> { + pub fn arrange_by_key_named(self, name: &str) -> Arranged>> { self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) } + + /// As `arrange_by_key` but producing a `TraceInter` that can be imported into other dataflows. + pub fn arrange_by_key_inter(self) -> Arranged>> { + self.arrange_by_key_inter_named("ArrangeByKey") + } + + /// As `arrange_by_key_inter` but with the ability to name the arrangement. + pub fn arrange_by_key_inter_named(self, name: &str) -> Arranged>> { + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); + crate::operators::arrange::arrangement::arrange_inter::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(self.inner, exchange, name) + } } impl Collection @@ -1071,15 +1082,26 @@ pub mod vec { /// This operator arranges a collection of records into a shared trace, whose contents it maintains. /// This trace is current for all times complete in the output stream, which can be used to safely /// identify the stable times and values in the trace. - pub fn arrange_by_self(self) -> Arranged>> { + pub fn arrange_by_self(self) -> Arranged>> { self.arrange_by_self_named("ArrangeBySelf") } /// As `arrange_by_self` but with the ability to name the arrangement. - pub fn arrange_by_self_named(self, name: &str) -> Arranged>> { + pub fn arrange_by_self_named(self, name: &str) -> Arranged>> { self.map(|k| (k, ())) .arrange_named::,KeyBuilder<_,_,_>,_>(name) } + + /// As `arrange_by_self` but producing a `TraceInter` that can be imported into other dataflows. + pub fn arrange_by_self_inter(self) -> Arranged>> { + self.arrange_by_self_inter_named("ArrangeBySelf") + } + + /// As `arrange_by_self_inter` but with the ability to name the arrangement. + pub fn arrange_by_self_inter_named(self, name: &str) -> Arranged>> { + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); + crate::operators::arrange::arrangement::arrange_inter::<_,_,KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(self.map(|k| (k, ())).inner, exchange, name) + } } impl Collection diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index 3fd612107..71b396636 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -258,7 +258,7 @@ impl InputSession { /// called, all buffers are flushed and timely dataflow is advised that some logical times are no longer possible. pub fn flush(&mut self) { self.handle.send_batch(&mut self.buffer); - if self.handle.epoch().less_than(&self.time) { + if self.handle.time().less_than(&self.time) { self.handle.advance_to(self.time.clone()); } } @@ -269,13 +269,11 @@ impl InputSession { /// the session is dropped or flushed. It is not correct to use this time as a basis for a computation's `step_while` /// method unless the session has just been flushed. pub fn advance_to(&mut self, time: T) { - assert!(self.handle.epoch().less_equal(&time)); + assert!(self.handle.time().less_equal(&time)); assert!(&self.time.less_equal(&time)); self.time = time; } - /// Reveals the current time of the session. - pub fn epoch(&self) -> &T { &self.time } /// Reveals the current time of the session. pub fn time(&self) -> &T { &self.time } diff --git a/differential-dataflow/src/lib.rs b/differential-dataflow/src/lib.rs index 615aa5ded..b5561cfb2 100644 --- a/differential-dataflow/src/lib.rs +++ b/differential-dataflow/src/lib.rs @@ -53,7 +53,7 @@ //! //! ```ignore //! loop { -//! let time = input.epoch(); +//! let time = input.time(); //! for round in time .. time + 100 { //! input.advance_to(round); //! input.insert((round % 13, round % 7)); diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index e1c2fde5f..540b814d4 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -15,19 +15,20 @@ use crate::trace::wrappers::rc::TraceBox; use timely::scheduling::Activator; -use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged}; +use super::{TraceWriter, TraceInterQueueWriter, TraceInterQueueReader, Arranged}; use super::TraceReplayInstruction; +use super::writer::TraceWriterInner; use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier}; -/// A `TraceReader` wrapper which can be imported into other dataflows. +/// Trace reader that can share a trace within a dataflow. /// -/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted -/// from the dataflow in which it was defined, and imported into other dataflows. -pub struct TraceAgent { +/// Unlike `TraceInter`, this trace reader cannot be shared across +/// dataflows, but in exchange doesn't need to be scheduled as its +/// frontiers advance, in the absence of updates. +pub struct TraceIntra { trace: Rc>>, - queues: Weak>>>, logical_compaction: Antichain, physical_compaction: Antichain, temp_antichain: Antichain, @@ -37,11 +38,11 @@ pub struct TraceAgent { } use crate::trace::implementations::WithLayout; -impl WithLayout for TraceAgent { +impl WithLayout for TraceIntra { type Layout = Tr::Layout; } -impl TraceReader for TraceAgent { +impl TraceReader for TraceIntra { type Batch = Tr::Batch; type Storage = Tr::Storage; @@ -75,14 +76,13 @@ impl TraceReader for TraceAgent { fn map_batches(&self, f: F) { self.trace.borrow().trace.map_batches(f) } } -impl TraceAgent { - /// Creates a new agent from a trace reader. - pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriter) +impl TraceIntra { + /// Creates a new inner agent from a trace reader, returning the agent and a matching writer. + pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriterInner) where Tr: Trace, { let trace = Rc::new(RefCell::new(TraceBox::new(trace))); - let queues = Rc::new(RefCell::new(Vec::new())); if let Some(logging) = &logging { logging.log( @@ -90,38 +90,152 @@ impl TraceAgent { ); } - let reader = TraceAgent { - trace: trace.clone(), - queues: Rc::downgrade(&queues), + let reader = TraceIntra { logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(), physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(), + trace: Rc::clone(&trace), temp_antichain: Antichain::new(), operator, logging, }; - let writer = TraceWriter::new( + let writer = TraceWriterInner::new( vec![::minimum()], Rc::downgrade(&trace), - queues, ); (reader, writer) } + /// The [OperatorInfo] of the underlying Timely operator + pub fn operator(&self) -> &OperatorInfo { + &self.operator + } + + /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain + /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior + /// to mutate the trace box. Keeping strong references can prevent resource reclamation. + /// + /// This method is subject to changes and removal and should not be considered part of a stable + /// interface. + pub fn trace_box_unstable(&self) -> Rc>> { + Rc::clone(&self.trace) + } +} + +impl Clone for TraceIntra { + fn clone(&self) -> Self { + + if let Some(logging) = &self.logging { + logging.log( + crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 } + ); + } + + // increase counts for wrapped `TraceBox`. + let empty_frontier = Antichain::new(); + self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow()); + self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow()); + + TraceIntra { + trace: Rc::clone(&self.trace), + logical_compaction: self.logical_compaction.clone(), + physical_compaction: self.physical_compaction.clone(), + operator: self.operator.clone(), + logging: self.logging.clone(), + temp_antichain: Antichain::new(), + } + } +} + +impl Drop for TraceIntra { + fn drop(&mut self) { + + if let Some(logging) = &self.logging { + logging.log( + crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 } + ); + } + + // decrement borrow counts to remove all holds + let empty_frontier = Antichain::new(); + self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow()); + self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow()); + } +} + +/// Trace reader that can both share a trace within a dataflow and be imported into other dataflows. +/// +/// Unlike `TraceIntra`, this trace reader can be shared across dataflows, +/// but in exchange it must be scheduled whenever its frontiers advance. +/// This can increase the scheduling load. +pub struct TraceInter { + /// Inner agent maintaining the shared trace and compaction state. + inner: TraceIntra, + /// A sequence of private queues into which batches are written. + queues: Weak>>>, +} + +impl WithLayout for TraceInter { + type Layout = Tr::Layout; +} + +impl TraceReader for TraceInter { + + type Batch = Tr::Batch; + type Storage = Tr::Storage; + type Cursor = Tr::Cursor; + + fn set_logical_compaction(&mut self, frontier: AntichainRef) { + self.inner.set_logical_compaction(frontier); + } + fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { + self.inner.get_logical_compaction() + } + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { + self.inner.set_physical_compaction(frontier); + } + fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { + self.inner.get_physical_compaction() + } + fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> { + self.inner.cursor_through(frontier) + } + fn map_batches(&self, f: F) { self.inner.map_batches(f) } +} + +impl TraceInter { + /// Creates a new agent from a trace reader. + pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriter) + where + Tr: Trace, + { + let queues = Rc::new(RefCell::new(Vec::new())); + let (inner, writer_inner) = TraceIntra::new(trace, operator, logging); + + let reader = TraceInter { + inner, + queues: Rc::downgrade(&queues), + }; + + let writer = TraceWriter::from_inner(writer_inner, queues); + + (reader, writer) + } + /// Attaches a new shared queue to the trace. /// /// The queue is first populated with existing batches from the trace, /// The queue will be immediately populated with existing historical batches from the trace, and until the reference /// is dropped the queue will receive new batches as produced by the source `arrange` operator. - pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader + pub fn new_listener(&mut self, activator: Activator) -> TraceInterQueueReader { // create a new queue for progress and batch information. let mut new_queue = VecDeque::new(); // add the existing batches from the trace let mut upper = None; - self.trace + self.inner.trace .borrow_mut() .trace .map_batches(|batch| { @@ -145,7 +259,7 @@ impl TraceAgent { /// The [OperatorInfo] of the underlying Timely operator pub fn operator(&self) -> &OperatorInfo { - &self.operator + self.inner.operator() } /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain @@ -155,11 +269,16 @@ impl TraceAgent { /// This method is subject to changes and removal and should not be considered part of a stable /// interface. pub fn trace_box_unstable(&self) -> Rc>> { - Rc::clone(&self.trace) + self.inner.trace_box_unstable() + } + + /// Extracts the inner `TraceIntra`, discarding queue management. + pub fn into_inner(self) -> TraceIntra { + self.inner } } -impl TraceAgent { +impl TraceInter { /// Copies an existing collection into the supplied scope. /// /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange` @@ -211,7 +330,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import(&mut self, scope: &G) -> Arranged> + pub fn import(&mut self, scope: &G) -> Arranged> where G: Scope, { @@ -219,7 +338,7 @@ impl TraceAgent { } /// Same as `import`, but allows to name the source. - pub fn import_named(&mut self, scope: &G, name: &str) -> Arranged> + pub fn import_named(&mut self, scope: &G, name: &str) -> Arranged> where G: Scope, { @@ -274,7 +393,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_core(&mut self, scope: &G, name: &str) -> (Arranged>, ShutdownButton>) + pub fn import_core(&mut self, scope: &G, name: &str) -> (Arranged>, ShutdownButton>) where G: Scope, { @@ -389,7 +508,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_frontier(&mut self, scope: &G, name: &str) -> (Arranged>>, ShutdownButton>) + pub fn import_frontier(&mut self, scope: &G, name: &str) -> (Arranged>>, ShutdownButton>) where G: Scope, Tr: TraceReader, @@ -407,7 +526,7 @@ impl TraceAgent { /// /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty /// frontier indicates the end of times. - pub fn import_frontier_core(&mut self, scope: &G, name: &str, since: Antichain, until: Antichain) -> (Arranged>>, ShutdownButton>) + pub fn import_frontier_core(&mut self, scope: &G, name: &str, since: Antichain, until: Antichain) -> (Arranged>>, ShutdownButton>) where G: Scope, Tr: TraceReader, @@ -510,44 +629,13 @@ impl Drop for ShutdownDeadmans { } } -impl Clone for TraceAgent { +impl Clone for TraceInter { fn clone(&self) -> Self { - - if let Some(logging) = &self.logging { - logging.log( - crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 } - ); - } - - // increase counts for wrapped `TraceBox`. - let empty_frontier = Antichain::new(); - self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow()); - self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow()); - - TraceAgent { - trace: self.trace.clone(), + TraceInter { + inner: self.inner.clone(), queues: self.queues.clone(), - logical_compaction: self.logical_compaction.clone(), - physical_compaction: self.physical_compaction.clone(), - operator: self.operator.clone(), - logging: self.logging.clone(), - temp_antichain: Antichain::new(), } } } -impl Drop for TraceAgent { - fn drop(&mut self) { - - if let Some(logging) = &self.logging { - logging.log( - crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 } - ); - } - - // decrement borrow counts to remove all holds - let empty_frontier = Antichain::new(); - self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow()); - self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow()); - } -} +// Drop is handled by TraceIntra's Drop impl. diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index a894d5486..8af8793ca 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -36,7 +36,8 @@ use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; use trace::wrappers::enter_at::BatchEnter as BatchEnterAt; -use super::TraceAgent; +use super::TraceInter; +use super::agent::TraceIntra; /// An arranged collection of `(K,V)` values. /// @@ -249,7 +250,7 @@ where T1: TraceReader + Clone + 'static, { /// A direct implementation of `ReduceCore::reduce_abelian`. - pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> where T1: TraceReader, T2: for<'a> Trace< @@ -272,7 +273,7 @@ where } /// A direct implementation of `ReduceCore::reduce_core`. - pub fn reduce_core(self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(self, name: &str, logic: L) -> Arranged> where T1: TraceReader, T2: for<'a> Trace< @@ -313,8 +314,8 @@ pub trait Arrange : Sized where G: Scope, { - /// Arranges updates into a shared trace. - fn arrange(self) -> Arranged> + /// Arranges updates into a trace local to this dataflow. + fn arrange(self) -> Arranged> where Ba: Batcher + 'static, Bu: Builder, @@ -323,8 +324,8 @@ where self.arrange_named::("Arrange") } - /// Arranges updates into a shared trace, with a supplied name. - fn arrange_named(self, name: &str) -> Arranged> + /// Arranges updates into a trace local to this dataflow, with a supplied name. + fn arrange_named(self, name: &str) -> Arranged> where Ba: Batcher + 'static, Bu: Builder, @@ -337,7 +338,52 @@ where /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// It uses the supplied parallelization contract to distribute the data, which does not need to /// be consistently by key (though this is the most common). -pub fn arrange_core(stream: Stream, pact: P, name: &str) -> Arranged> +/// +/// Unlike `arrange_intra`, this trace can be imported into other dataflows. To provide this ability +/// it must be continually scheduled even in the absence of input updates, in order to communicate +/// its input frontier to the other dataflow. This can result in more scheduling overhead. +pub fn arrange_inter(stream: Stream, pact: P, name: &str) -> Arranged> +where + G: Scope, + P: ParallelizationContract, + Ba: Batcher + 'static, + Bu: Builder, + Tr: Trace+'static, +{ + arrange_core::(stream, pact, name, timely::progress::operate::FrontierInterest::Always) +} + +/// Arranges a stream of updates into a private trace, without shared queue distribution. +/// +/// This operator arranges a stream of values into a shared trace, whose contents it maintains. +/// It uses the supplied parallelization contract to distribute the data, which does not need to +/// be consistently by key (though this is the most common). +/// +/// Unlike `arrange_inter`, this trace cannot be shared across dataflows, only within dataflows. +/// It lacks the `import` method that would allow this. By so doing, the operator does not need +/// to be continually rescheduled in the absence of input updates, which can reduce the scheduling +/// load. +pub fn arrange_intra(stream: Stream, pact: P, name: &str) -> Arranged> +where + G: Scope, + P: ParallelizationContract, + Ba: Batcher + 'static, + Bu: Builder, + Tr: Trace+'static, +{ + let arranged = arrange_core::(stream, pact, name, timely::progress::operate::FrontierInterest::IfCapability); + Arranged { + stream: arranged.stream, + trace: arranged.trace.into_inner(), + } +} + +/// Arranges a stream of updates by a key, configured with a name, a parallelization contract, +/// and a frontier interest policy. +/// +/// This is the general form that both `arrange_inter` and `arrange_intra` delegate to. +/// The `FrontierInterest` parameter controls when the operator is notified of frontier changes. +pub fn arrange_core(stream: Stream, pact: P, name: &str, interest: timely::progress::operate::FrontierInterest) -> Arranged> where G: Scope, P: ParallelizationContract, @@ -360,38 +406,44 @@ where // held by the batcher, which may prevents the operator from sending an // empty batch. - let mut reader: Option> = None; + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; - // fabricate a data-parallel operator using the `unary_notify` pattern. - let reader_ref = &mut reader; let scope = stream.scope(); - let stream = stream.unary_frontier(pact, name, move |_capability, info| { + let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); + let operator_info = builder.operator_info(); - // Acquire a logger for arrange events. - let logger = scope.logger_for::("differential/arrange").map(Into::into); + let mut input = builder.new_input(stream, pact); + builder.set_notify_for(0, interest); + let (mut output, stream) = builder.new_output(); - // Where we will deposit received updates, and from which we extract batches. - let mut batcher = Ba::new(logger.clone(), info.global_id); + // Acquire a logger for arrange events. + let logger = scope.logger_for::("differential/arrange").map(Into::into); - // Capabilities for the lower envelope of updates in `batcher`. - let mut capabilities = Antichain::>::new(); + // Where we will deposit received updates, and from which we extract batches. + let mut batcher = Ba::new(logger.clone(), operator_info.global_id); - let activator = Some(scope.activator_for(info.address.clone())); - let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - // If there is default exertion logic set, install it. - if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { - empty_trace.set_exert_logic(exert_logic); - } + let activator = Some(scope.activator_for(operator_info.address.clone())); + let mut empty_trace = Tr::new(operator_info.clone(), logger.clone(), activator); + // If there is default exertion logic set, install it. + if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { + empty_trace.set_exert_logic(exert_logic); + } - let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); + let (trace, mut writer) = TraceInter::new(empty_trace, operator_info, logger); - *reader_ref = Some(reader_local); + builder.build(move |_capabilities| { + + // Capabilities for the lower envelope of updates in `batcher`. + let mut capabilities = Antichain::>::new(); // Initialize to the minimal input frontier. let mut prev_frontier = Antichain::from_elem(::minimum()); - move |(input, frontier), output| { + move |frontiers| { + + let frontier = &frontiers[0]; + let mut output = output.activate(); // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities. // We don't have to keep all capabilities, but we need to be able to form output messages @@ -453,7 +505,7 @@ where writer.insert(batch.clone(), Some(capability.time().clone())); // send the batch to downstream consumers, empty or not. - output.session(&capabilities.elements()[index]).give(batch); + output.give(&capabilities.elements()[index], &mut vec![batch]); } } @@ -488,5 +540,5 @@ where } }); - Arranged { stream, trace: reader.unwrap() } + Arranged { stream, trace } } diff --git a/differential-dataflow/src/operators/arrange/mod.rs b/differential-dataflow/src/operators/arrange/mod.rs index 70f9fb866..2a6bbad53 100644 --- a/differential-dataflow/src/operators/arrange/mod.rs +++ b/differential-dataflow/src/operators/arrange/mod.rs @@ -57,8 +57,8 @@ pub enum TraceReplayInstruction { // Short names for strongly and weakly owned activators and shared queues. type BatchQueue = VecDeque>; -type TraceAgentQueueReader = Rc<(Activator, RefCell>)>; -type TraceAgentQueueWriter = Weak<(Activator, RefCell>)>; +type TraceInterQueueReader = Rc<(Activator, RefCell>)>; +type TraceInterQueueWriter = Weak<(Activator, RefCell>)>; pub mod writer; pub mod agent; @@ -67,6 +67,6 @@ pub mod arrangement; pub mod upsert; pub use self::writer::TraceWriter; -pub use self::agent::{TraceAgent, ShutdownButton}; +pub use self::agent::{TraceInter, TraceIntra, ShutdownButton}; pub use self::arrangement::{Arranged, Arrange}; \ No newline at end of file diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index e9dbe9cdb..4e6eaa3f9 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -115,7 +115,7 @@ use crate::{ExchangeData, Hashable}; use crate::trace::implementations::containers::BatchContainer; -use super::TraceAgent; +use super::TraceInter; /// Arrange data from a stream of keyed upserts. /// @@ -130,7 +130,7 @@ use super::TraceAgent; pub fn arrange_from_upsert( stream: Stream, G::Timestamp)>>, name: &str, -) -> Arranged> +) -> Arranged> where G: Scope, Tr: for<'a> Trace< @@ -141,7 +141,7 @@ where >+'static, Bu: Builder, Output = Tr::Batch>, { - let mut reader: Option> = None; + let mut reader: Option> = None; // fabricate a data-parallel operator using the `unary_notify` pattern. let stream = { @@ -166,7 +166,7 @@ where empty_trace.set_exert_logic(exert_logic); } - let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); + let (mut reader_local, mut writer) = TraceInter::new(empty_trace, info, logger); // Capture the reader outside the builder scope. *reader = Some(reader_local.clone()); diff --git a/differential-dataflow/src/operators/arrange/writer.rs b/differential-dataflow/src/operators/arrange/writer.rs index 8df11690c..ba4820ec6 100644 --- a/differential-dataflow/src/operators/arrange/writer.rs +++ b/differential-dataflow/src/operators/arrange/writer.rs @@ -12,33 +12,28 @@ use crate::trace::{Trace, Batch, BatchReader}; use crate::trace::wrappers::rc::TraceBox; -use super::TraceAgentQueueWriter; +use super::TraceInterQueueWriter; use super::TraceReplayInstruction; -/// Write endpoint for a sequence of batches. -/// -/// A `TraceWriter` accepts a sequence of batches and distributes them -/// to both a shared trace and to a sequence of private queues. -pub struct TraceWriter { +/// Inner write endpoint that maintains the frontier and shared trace. +pub struct TraceWriterInner { /// Current upper limit. upper: Antichain, /// Shared trace, possibly absent (due to weakness). trace: Weak>>, - /// A sequence of private queues into which batches are written. - queues: Rc>>>, } -impl TraceWriter { - /// Creates a new `TraceWriter`. - pub fn new( - upper: Vec, - trace: Weak>>, - queues: Rc>>> - ) -> Self - { +impl TraceWriterInner { + /// Creates a new `TraceWriterInner`. + pub fn new(upper: Vec, trace: Weak>>) -> Self { let mut temp = Antichain::new(); temp.extend(upper); - Self { upper: temp, trace, queues } + Self { upper: temp, trace } + } + + /// The current upper frontier. + pub fn upper(&self) -> &Antichain { + &self.upper } /// Exerts merge effort, even without additional updates. @@ -50,10 +45,9 @@ impl TraceWriter { /// Advances the trace by `batch`. /// - /// The `hint` argument is either `None` in the case of an empty batch, - /// or is `Some(time)` for a time less or equal to all updates in the - /// batch and which is suitable for use as a capability. - pub fn insert(&mut self, batch: Tr::Batch, hint: Option) { + /// Asserts the batch is a valid continuation of the current frontier, + /// updates the upper frontier, and pushes the batch into the shared trace. + pub fn insert(&mut self, batch: Tr::Batch) { // Something is wrong if not a sequence. if !(&self.upper == batch.lower()) { @@ -64,6 +58,69 @@ impl TraceWriter { self.upper.clone_from(batch.upper()); + // push data to the trace, if it still exists. + if let Some(trace) = self.trace.upgrade() { + trace.borrow_mut().trace.insert(batch); + } + } + + /// Inserts an empty batch up to `upper`. + pub fn seal(&mut self, upper: Antichain) { + if self.upper != upper { + self.insert(Tr::Batch::empty(self.upper.clone(), upper)); + } + } +} + +impl Drop for TraceWriterInner { + fn drop(&mut self) { + self.seal(Antichain::new()) + } +} + +/// Write endpoint for a sequence of batches. +/// +/// A `TraceWriter` accepts a sequence of batches and distributes them +/// to both a shared trace and to a sequence of private queues. +pub struct TraceWriter { + /// Inner writer maintaining the frontier and shared trace. + inner: TraceWriterInner, + /// A sequence of private queues into which batches are written. + queues: Rc>>>, +} + +impl TraceWriter { + /// Creates a new `TraceWriter`. + pub fn new( + upper: Vec, + trace: Weak>>, + queues: Rc>>> + ) -> Self + { + Self { inner: TraceWriterInner::new(upper, trace), queues } + } + + /// Creates a new `TraceWriter` from an existing `TraceWriterInner` and queues. + pub fn from_inner( + inner: TraceWriterInner, + queues: Rc>>> + ) -> Self + { + Self { inner, queues } + } + + /// Exerts merge effort, even without additional updates. + pub fn exert(&mut self) { + self.inner.exert(); + } + + /// Advances the trace by `batch`. + /// + /// The `hint` argument is either `None` in the case of an empty batch, + /// or is `Some(time)` for a time less or equal to all updates in the + /// batch and which is suitable for use as a capability. + pub fn insert(&mut self, batch: Tr::Batch, hint: Option) { + // push information to each listener that still exists. let mut borrow = self.queues.borrow_mut(); for queue in borrow.iter_mut() { @@ -75,17 +132,14 @@ impl TraceWriter { } borrow.retain(|w| w.upgrade().is_some()); - // push data to the trace, if it still exists. - if let Some(trace) = self.trace.upgrade() { - trace.borrow_mut().trace.insert(batch); - } - + // push data to the trace and update the frontier. + self.inner.insert(batch); } /// Inserts an empty batch up to `upper`. pub fn seal(&mut self, upper: Antichain) { - if self.upper != upper { - self.insert(Tr::Batch::empty(self.upper.clone(), upper), None); + if *self.inner.upper() != upper { + self.insert(Tr::Batch::empty(self.inner.upper().clone(), upper), None); } } } diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index fe3622d74..f27750b4f 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -16,7 +16,7 @@ use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; -use crate::operators::arrange::{Arranged, TraceAgent}; +use crate::operators::arrange::{Arranged, TraceInter}; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; use crate::trace::implementations::containers::BatchContainer; @@ -26,7 +26,7 @@ use crate::trace::TraceReader; /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. -pub fn reduce_trace(trace: Arranged, name: &str, mut logic: L) -> Arranged> +pub fn reduce_trace(trace: Arranged, name: &str, mut logic: L) -> Arranged> where G: Scope, T1: TraceReader + Clone + 'static, @@ -56,7 +56,7 @@ where let mut source_trace = trace.trace.clone(); - let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); + let (mut output_reader, mut output_writer) = TraceInter::new(empty, operator_info, logger); // let mut output_trace = TraceRc::make_from(agent).0; *result_trace = Some(output_reader.clone()); diff --git a/differential-dataflow/tests/import.rs b/differential-dataflow/tests/import.rs index 747e45b72..7c73741a7 100644 --- a/differential-dataflow/tests/import.rs +++ b/differential-dataflow/tests/import.rs @@ -52,7 +52,7 @@ fn test_import_vanilla() { let (mut input, mut trace) = worker.dataflow(|scope| { let (input, edges) = scope.new_input(); let arranged = edges.as_collection() - .arrange_by_key(); + .arrange_by_key_inter(); (input, arranged.trace.clone()) }); let (captured,) = worker.dataflow(move |scope| { @@ -112,7 +112,7 @@ fn test_import_completed_dataflow() { let (mut input, mut trace, probe) = worker.dataflow(|scope| { let (input, edges) = scope.new_input(); let arranged = edges.as_collection() - .arrange_by_key(); + .arrange_by_key_inter(); let (probe, _) = arranged.stream.probe(); (input, arranged.trace.clone(), probe) }); @@ -174,7 +174,7 @@ fn test_import_stalled_dataflow() { let arranged = input .to_collection(scope) - .arrange_by_self(); + .arrange_by_self_inter(); let (probe, _) = arranged.stream.probe(); (arranged.trace, probe) @@ -221,7 +221,7 @@ fn import_skewed() { let (mut input, mut trace) = worker.dataflow(|scope| { let (input, edges) = scope.new_input(); let arranged = edges.as_collection() - .arrange_by_key(); + .arrange_by_key_inter(); (input, arranged.trace.clone()) }); diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index f704d5988..7a5f82bf7 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -8,7 +8,7 @@ use timely::dataflow::operators::Concatenate; use differential_dataflow::{ExchangeData, VecCollection, AsCollection}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; pub mod altneu; pub mod calculus; @@ -93,8 +93,8 @@ impl, P, E> ValidateExtensionMethod = TraceAgent>; -type TraceKeyHandle = TraceAgent>; +type TraceValHandle = TraceInter>; +type TraceKeyHandle = TraceInter>; pub struct CollectionIndex where @@ -140,16 +140,16 @@ where pub fn index>(collection: VecCollection) -> Self { // We need to count the number of (k, v) pairs and not rely on the given Monoid R and its binary addition operation. // counts and validate can share the base arrangement - let arranged = collection.clone().arrange_by_self(); + let arranged = collection.clone().arrange_by_self_inter(); // TODO: This could/should be arrangement to arrangement, via `reduce_abelian`, but the types are a mouthful at the moment. let counts = arranged .clone() .as_collection(|k,_v| k.clone()) .distinct() .map(|(k, _v)| k) - .arrange_by_self() + .arrange_by_self_inter() .trace; - let propose = collection.arrange_by_key().trace; + let propose = collection.arrange_by_key_inter().trace; let validate = arranged.trace; CollectionIndex { diff --git a/experiments/src/bin/deals-interactive.rs b/experiments/src/bin/deals-interactive.rs index 3ea53cd14..973dd10ca 100644 --- a/experiments/src/bin/deals-interactive.rs +++ b/experiments/src/bin/deals-interactive.rs @@ -10,10 +10,10 @@ use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; type Node = u32; diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index 782941e15..a4a87e99e 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -7,13 +7,13 @@ use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder}; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::Arrange; use differential_dataflow::operators::iterate::Variable; use differential_dataflow::difference::Present; -type EdgeArranged = Arranged::Timestamp, R>>>; +type EdgeArranged = Arranged::Timestamp, R>>>; type Node = u32; type Edge = (Node, Node); diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 6e5b0ffaa..853c0c744 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -255,10 +255,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop( diff --git a/experiments/src/bin/graphs-interactive-neu-zwei.rs b/experiments/src/bin/graphs-interactive-neu-zwei.rs index 35a118cd9..d8b20eec8 100644 --- a/experiments/src/bin/graphs-interactive-neu-zwei.rs +++ b/experiments/src/bin/graphs-interactive-neu-zwei.rs @@ -223,10 +223,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop( diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index 7e27e7ed8..c3a9a6a6b 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -288,10 +288,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop( diff --git a/experiments/src/bin/graphs-interactive.rs b/experiments/src/bin/graphs-interactive.rs index 2ec74e93f..39a6638de 100644 --- a/experiments/src/bin/graphs-interactive.rs +++ b/experiments/src/bin/graphs-interactive.rs @@ -191,10 +191,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. diff --git a/experiments/src/bin/graphs-static.rs b/experiments/src/bin/graphs-static.rs index 16cbbd6dd..742ac08ba 100644 --- a/experiments/src/bin/graphs-static.rs +++ b/experiments/src/bin/graphs-static.rs @@ -48,7 +48,7 @@ fn main() { .to_stream(scope) .as_collection(); - edges.arrange_by_key().trace + edges.arrange_by_key_inter().trace }); while worker.step() { } @@ -86,7 +86,7 @@ fn main() { forward .import(scope) .as_collection(|&k,&v| (v,k)) - .arrange_by_key() + .arrange_by_key_inter() .trace }); while worker.step() { } @@ -102,9 +102,9 @@ fn main() { }).unwrap(); } -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; -type TraceHandle = TraceAgent; +type TraceHandle = TraceInter; fn reach> ( graph: &mut TraceHandle, diff --git a/experiments/src/bin/graphs.rs b/experiments/src/bin/graphs.rs index ea7420120..0b99d0a13 100644 --- a/experiments/src/bin/graphs.rs +++ b/experiments/src/bin/graphs.rs @@ -36,8 +36,8 @@ fn main() { let (mut graph, mut trace) = worker.dataflow(|scope| { let (graph_input, graph) = scope.new_collection(); - let graph_indexed = graph.arrange_by_key(); - // let graph_indexed = graph.arrange_by_key(); + let graph_indexed = graph.arrange_by_key_inter(); + // let graph_indexed = graph.arrange_by_key_inter(); (graph_input, graph_indexed.trace) }); @@ -84,9 +84,9 @@ fn main() { } // use differential_dataflow::trace::implementations::ord::OrdValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; -type TraceHandle = TraceAgent; +type TraceHandle = TraceInter; fn reach> ( graph: &mut TraceHandle, diff --git a/interactive/src/command.rs b/interactive/src/command.rs index 5d110ea70..343827c30 100644 --- a/interactive/src/command.rs +++ b/interactive/src/command.rs @@ -78,7 +78,7 @@ where for Rule { name, plan } in query.rules.into_iter() { let collection = plan.render(scope, &mut collections, &mut manager.traces) - .arrange_by_self(); + .arrange_by_self_inter(); collection.stream.probe_with(&mut manager.probe); let trace = collection.trace; @@ -104,7 +104,7 @@ where let (input, trace) = worker.dataflow(|scope| { let (input, collection) = scope.new_collection_from(updates.into_iter()); - let trace = collection.arrange_by_self().trace; + let trace = collection.arrange_by_self_inter().trace; (input, trace) }); diff --git a/interactive/src/logging.rs b/interactive/src/logging.rs index 322dd16d2..15139e740 100644 --- a/interactive/src/logging.rs +++ b/interactive/src/logging.rs @@ -159,13 +159,13 @@ where // }); use differential_dataflow::collection::AsCollection; - let operates = operates.as_collection().arrange_by_self().trace; - let channels = channels.as_collection().arrange_by_self().trace; - let schedule = schedule.as_collection().arrange_by_self().trace; - let messages = messages.as_collection().arrange_by_self().trace; - let shutdown = shutdown.as_collection().arrange_by_self().trace; - let park = park.as_collection().arrange_by_self().trace; - let text = text.as_collection().arrange_by_self().trace; + let operates = operates.as_collection().arrange_by_self_inter().trace; + let channels = channels.as_collection().arrange_by_self_inter().trace; + let schedule = schedule.as_collection().arrange_by_self_inter().trace; + let messages = messages.as_collection().arrange_by_self_inter().trace; + let shutdown = shutdown.as_collection().arrange_by_self_inter().trace; + let park = park.as_collection().arrange_by_self_inter().trace; + let text = text.as_collection().arrange_by_self_inter().trace; // let elapsed = // duration @@ -177,7 +177,7 @@ where // (k,time,r) // }) // .as_collection() - // .arrange_by_self() + // .arrange_by_self_inter() // .trace; // let histogram = @@ -190,7 +190,7 @@ where // (k,time,r) // }) // .as_collection() - // .arrange_by_self() + // .arrange_by_self_inter() // .trace; (operates, channels, schedule, messages, shutdown, park, text) @@ -273,8 +273,8 @@ where }); use differential_dataflow::collection::AsCollection; - let batch = batch.as_collection().arrange_by_self().trace; - let merge = merge.as_collection().arrange_by_self().trace; + let batch = batch.as_collection().arrange_by_self_inter().trace; + let merge = merge.as_collection().arrange_by_self_inter().trace; (merge,batch) }); diff --git a/interactive/src/manager.rs b/interactive/src/manager.rs index 8579a538e..936fc6a15 100644 --- a/interactive/src/manager.rs +++ b/interactive/src/manager.rs @@ -13,7 +13,7 @@ use timely::logging::TimelyEventBuilder; use differential_dataflow::ExchangeData; use differential_dataflow::trace::implementations::{KeySpine, ValSpine}; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; use differential_dataflow::input::InputSession; use differential_dataflow::logging::DifferentialEventBuilder; @@ -21,9 +21,9 @@ use differential_dataflow::logging::DifferentialEventBuilder; use crate::{Time, Diff, Plan, Datum}; /// A trace handle for key-only data. -pub type TraceKeyHandle = TraceAgent>; +pub type TraceKeyHandle = TraceInter>; /// A trace handle for key-value data. -pub type TraceValHandle = TraceAgent>; +pub type TraceValHandle = TraceInter>; /// A key-only trace handle binding `Time` and `Diff` using `Vec` as data. pub type KeysOnlyHandle = TraceKeyHandle, Time, Diff>; /// A key-value trace handle binding `Time` and `Diff` using `Vec` as data. diff --git a/interactive/src/plan/join.rs b/interactive/src/plan/join.rs index 956fab419..e4f59d41c 100644 --- a/interactive/src/plan/join.rs +++ b/interactive/src/plan/join.rs @@ -56,7 +56,7 @@ impl Render for Join { .collect::>(), ) ) - .arrange_by_key(); + .arrange_by_key_inter(); arrangements.set_keyed(&self.plan1, &keys1[..], &arrangement.trace); arrangement.trace @@ -85,7 +85,7 @@ impl Render for Join { .collect::>(), ) ) - .arrange_by_key(); + .arrange_by_key_inter(); arrangements.set_keyed(&self.plan2, &keys2[..], &arrangement.trace); arrangement.trace diff --git a/interactive/src/plan/mod.rs b/interactive/src/plan/mod.rs index deff18a4a..f37113ad4 100644 --- a/interactive/src/plan/mod.rs +++ b/interactive/src/plan/mod.rs @@ -164,7 +164,7 @@ impl Render for Plan { trace.import(scope) } else { - let input_arrangement = distinct.render(scope, collections, arrangements).arrange_by_self(); + let input_arrangement = distinct.render(scope, collections, arrangements).arrange_by_self_inter(); arrangements.set_unkeyed(&distinct, &input_arrangement.trace); input_arrangement }; diff --git a/interactive/src/plan/sfw.rs b/interactive/src/plan/sfw.rs index 6a5c82fc8..20aaa87ba 100644 --- a/interactive/src/plan/sfw.rs +++ b/interactive/src/plan/sfw.rs @@ -131,7 +131,7 @@ impl Render for MultiwayJoin { if arrangements.get_unkeyed(&plan).is_none() { // println!("\tbuilding/caching source plan"); let collection = plan.render(scope, collections, arrangements); - arrangements.set_unkeyed(plan, &collection.arrange_by_self().trace); + arrangements.set_unkeyed(plan, &collection.arrange_by_self_inter().trace); } else { // println!("\tsource plan found"); @@ -203,7 +203,7 @@ impl Render for MultiwayJoin { let arrangement = plan.render(scope, collections, arrangements) .map(move |tuple| (keys_clone.iter().map(|&i| tuple[i].clone()).collect::>(), tuple)) - .arrange_by_key(); + .arrange_by_key_inter(); arrangements.set_keyed(&plan, &keys[..], &arrangement.trace); } diff --git a/server/dataflows/random_graph/src/lib.rs b/server/dataflows/random_graph/src/lib.rs index cb527b25d..b9ebfc5c6 100644 --- a/server/dataflows/random_graph/src/lib.rs +++ b/server/dataflows/random_graph/src/lib.rs @@ -180,7 +180,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(), }) .probe_with(probe) .as_collection() - .arrange_by_key() + .arrange_by_key_inter() .trace; // release all blocks on merging. diff --git a/server/src/lib.rs b/server/src/lib.rs index 745de0bb9..8a3f707f8 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -11,13 +11,13 @@ use timely::dataflow::scopes::Child; use timely::dataflow::operators::probe::Handle as ProbeHandle; // stuff for talking about shared trace types ... -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; use differential_dataflow::trace::implementations::ValSpine; // These are all defined here so that users can be assured a common layout. pub type RootTime = usize; type TraceSpine = ValSpine; -pub type TraceHandle = TraceAgent; +pub type TraceHandle = TraceInter; /// Arguments provided to each shared library to help build their dataflows and register their results. pub type Environment<'a, 'b> = ( diff --git a/tpchlike/src/lib.rs b/tpchlike/src/lib.rs index 053efd9d0..e8d82e332 100644 --- a/tpchlike/src/lib.rs +++ b/tpchlike/src/lib.rs @@ -115,10 +115,10 @@ impl Collections { use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; +use differential_dataflow::operators::arrange::{Arranged, TraceInter}; type ArrangedScope = Arranged>; -type ArrangedIndex = TraceAgent>; +type ArrangedIndex = TraceInter>; pub struct ArrangementsInScope> { customer: ArrangedScope, @@ -154,37 +154,37 @@ impl Arrangements { let empty_frontier = timely::progress::Antichain::new(); let empty_frontier = empty_frontier.borrow(); - let mut arranged = scope.input_from(&mut inputs.customer).as_collection().map(|x| (x.cust_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.customer).as_collection().map(|x| (x.cust_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let customer = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.nation).as_collection().map(|x| (x.nation_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.nation).as_collection().map(|x| (x.nation_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let nation = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.order).as_collection().map(|x| (x.order_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.order).as_collection().map(|x| (x.order_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let order = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.part).as_collection().map(|x| (x.part_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.part).as_collection().map(|x| (x.part_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let part = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.partsupp).as_collection().map(|x| ((x.part_key, x.supp_key), x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.partsupp).as_collection().map(|x| ((x.part_key, x.supp_key), x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let partsupp = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.region).as_collection().map(|x| (x.region_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.region).as_collection().map(|x| (x.region_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let region = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.supplier).as_collection().map(|x| (x.supp_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.supplier).as_collection().map(|x| (x.supp_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let supplier = arranged.trace; @@ -203,25 +203,25 @@ impl Arrangements { pub fn in_scope>(&mut self, scope: &mut G, experiment: &mut Experiment) -> ArrangementsInScope { let (mut customer, button) = self.customer.import_core(scope, "customer"); - if !self.arrange { customer = customer.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { customer = customer.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut nation, button) = self.nation.import_core(scope, "nation"); - if !self.arrange { nation = nation.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { nation = nation.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut order, button) = self.order.import_core(scope, "order"); - if !self.arrange { order = order.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { order = order.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut part, button) = self.part.import_core(scope, "part"); - if !self.arrange { part = part.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { part = part.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut partsupp, button) = self.partsupp.import_core(scope, "partsupp"); - if !self.arrange { partsupp = partsupp.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { partsupp = partsupp.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut region, button) = self.region.import_core(scope, "region"); - if !self.arrange { region = region.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { region = region.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut supplier, button) = self.supplier.import_core(scope, "supplier"); - if !self.arrange { supplier = supplier.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { supplier = supplier.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); ArrangementsInScope {