From c4f74fd1989c52c2d3c907d607f5d968a1b599df Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 17 Mar 2026 07:48:36 -0400 Subject: [PATCH 1/7] Update to track timely master --- Cargo.toml | 4 ++-- differential-dataflow/src/input.rs | 6 ++---- differential-dataflow/src/lib.rs | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8995ab151..cbabb3087 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,9 +23,9 @@ rust-version = "1.86" [workspace.dependencies] differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.20.0" } -timely = { version = "0.27", default-features = false } +#timely = { version = "0.27", default-features = false } columnar = { version = "0.11", default-features = false } -#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } #timely = { path = "../timely-dataflow/timely/", default-features = false } [workspace.lints.clippy] diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index 3fd612107..71b396636 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -258,7 +258,7 @@ impl InputSession { /// called, all buffers are flushed and timely dataflow is advised that some logical times are no longer possible. pub fn flush(&mut self) { self.handle.send_batch(&mut self.buffer); - if self.handle.epoch().less_than(&self.time) { + if self.handle.time().less_than(&self.time) { self.handle.advance_to(self.time.clone()); } } @@ -269,13 +269,11 @@ impl InputSession { /// the session is dropped or flushed. It is not correct to use this time as a basis for a computation's `step_while` /// method unless the session has just been flushed. pub fn advance_to(&mut self, time: T) { - assert!(self.handle.epoch().less_equal(&time)); + assert!(self.handle.time().less_equal(&time)); assert!(&self.time.less_equal(&time)); self.time = time; } - /// Reveals the current time of the session. - pub fn epoch(&self) -> &T { &self.time } /// Reveals the current time of the session. pub fn time(&self) -> &T { &self.time } diff --git a/differential-dataflow/src/lib.rs b/differential-dataflow/src/lib.rs index 615aa5ded..b5561cfb2 100644 --- a/differential-dataflow/src/lib.rs +++ b/differential-dataflow/src/lib.rs @@ -53,7 +53,7 @@ //! //! ```ignore //! loop { -//! let time = input.epoch(); +//! let time = input.time(); //! for round in time .. time + 100 { //! input.advance_to(round); //! input.insert((round % 13, round % 7)); From c0c1fad96fe5b1b0d1c1da62171532786aac3a2a Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 18 Mar 2026 13:50:33 -0400 Subject: [PATCH 2/7] Rework arrange_core and demonstrate --- Cargo.toml | 4 +- .../examples/event_driven.rs | 60 ++++++ .../src/operators/arrange/agent.rs | 185 +++++++++++++----- .../src/operators/arrange/arrangement.rs | 165 ++++++++++++++-- .../src/operators/arrange/writer.rs | 106 +++++++--- 5 files changed, 419 insertions(+), 101 deletions(-) create mode 100644 differential-dataflow/examples/event_driven.rs diff --git a/Cargo.toml b/Cargo.toml index cbabb3087..78c9ab1a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,8 +25,8 @@ rust-version = "1.86" differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.20.0" } #timely = { version = "0.27", default-features = false } columnar = { version = "0.11", default-features = false } -timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } -#timely = { path = "../timely-dataflow/timely/", default-features = false } +#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +timely = { path = "../timely-dataflow/timely/", default-features = false } [workspace.lints.clippy] type_complexity = "allow" diff --git a/differential-dataflow/examples/event_driven.rs b/differential-dataflow/examples/event_driven.rs new file mode 100644 index 000000000..4325ddee3 --- /dev/null +++ b/differential-dataflow/examples/event_driven.rs @@ -0,0 +1,60 @@ +use timely::dataflow::Scope; +use timely::dataflow::operators::{Input, Probe, Enter, Leave}; +use timely::dataflow::operators::core::Filter; + +fn main() { + // initializes and runs a timely dataflow. + timely::execute_from_args(std::env::args(), |worker| { + + let timer = std::time::Instant::now(); + + let mut args = std::env::args(); + args.next(); + + let dataflows = args.next().unwrap().parse::().unwrap(); + let length = args.next().unwrap().parse::().unwrap(); + let local = args.next() == Some("local".to_string()); + println!("Local: {:?}", local); + + let mut inputs = Vec::new(); + let mut probes = Vec::new(); + + // create a new input, exchange data, and inspect its output + for _dataflow in 0 .. dataflows { + worker.dataflow(|scope| { + let (input, stream) = scope.new_input(); + let stream = scope.region(|inner| { + let mut stream = stream.enter(inner); + use differential_dataflow::operators::arrange::arrangement::{arrange_private, arrange_core}; + use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine}; + stream = if local { arrange_private::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(stream, timely::dataflow::channels::pact::Pipeline, "test").as_collection(|k: &i32,v: &i32| (*k, *v)).inner } + else { arrange_core::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(stream, timely::dataflow::channels::pact::Pipeline, "test").as_collection(|k: &i32,v: &i32| (*k, *v)).inner }; + for _step in 0 .. length { + stream = stream.filter(|_| false); + } + stream.leave() + }); + let (probe, _stream) = stream.probe(); + inputs.push(input); + probes.push(probe); + }); + } + + println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length); + + // Repeatedly, insert a record in one dataflow, tick all dataflow inputs. + for round in 0 .. { + let dataflow = round % dataflows; + inputs[dataflow].send(((0i32, 0i32), round, 1)); + for d in 0 .. dataflows { inputs[d].advance_to(round); } + let mut steps = 0; + while probes[dataflow].less_than(&round) { + worker.step(); + steps += 1; + } + + println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); + } + + }).unwrap(); +} diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index e1c2fde5f..caa103631 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -17,17 +17,17 @@ use timely::scheduling::Activator; use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged}; use super::TraceReplayInstruction; +use super::writer::TraceWriterInner; use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier}; -/// A `TraceReader` wrapper which can be imported into other dataflows. +/// Inner agent that maintains the shared trace and compaction state. /// -/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted -/// from the dataflow in which it was defined, and imported into other dataflows. -pub struct TraceAgent { +/// A `TraceAgentInner` holds the shared trace and tracks logical and physical +/// compaction frontiers, without any queue management. +pub struct TraceAgentInner { trace: Rc>>, - queues: Weak>>>, logical_compaction: Antichain, physical_compaction: Antichain, temp_antichain: Antichain, @@ -37,11 +37,11 @@ pub struct TraceAgent { } use crate::trace::implementations::WithLayout; -impl WithLayout for TraceAgent { +impl WithLayout for TraceAgentInner { type Layout = Tr::Layout; } -impl TraceReader for TraceAgent { +impl TraceReader for TraceAgentInner { type Batch = Tr::Batch; type Storage = Tr::Storage; @@ -75,14 +75,13 @@ impl TraceReader for TraceAgent { fn map_batches(&self, f: F) { self.trace.borrow().trace.map_batches(f) } } -impl TraceAgent { - /// Creates a new agent from a trace reader. - pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriter) +impl TraceAgentInner { + /// Creates a new inner agent from a trace reader, returning the agent and a matching writer. + pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriterInner) where Tr: Trace, { let trace = Rc::new(RefCell::new(TraceBox::new(trace))); - let queues = Rc::new(RefCell::new(Vec::new())); if let Some(logging) = &logging { logging.log( @@ -90,25 +89,138 @@ impl TraceAgent { ); } - let reader = TraceAgent { - trace: trace.clone(), - queues: Rc::downgrade(&queues), + let reader = TraceAgentInner { logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(), physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(), + trace: trace.clone(), temp_antichain: Antichain::new(), operator, logging, }; - let writer = TraceWriter::new( + let writer = TraceWriterInner::new( vec![::minimum()], Rc::downgrade(&trace), - queues, ); (reader, writer) } + /// The [OperatorInfo] of the underlying Timely operator + pub fn operator(&self) -> &OperatorInfo { + &self.operator + } + + /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain + /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior + /// to mutate the trace box. Keeping strong references can prevent resource reclamation. + /// + /// This method is subject to changes and removal and should not be considered part of a stable + /// interface. + pub fn trace_box_unstable(&self) -> Rc>> { + Rc::clone(&self.trace) + } +} + +impl Clone for TraceAgentInner { + fn clone(&self) -> Self { + + if let Some(logging) = &self.logging { + logging.log( + crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 } + ); + } + + // increase counts for wrapped `TraceBox`. + let empty_frontier = Antichain::new(); + self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow()); + self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow()); + + TraceAgentInner { + trace: self.trace.clone(), + logical_compaction: self.logical_compaction.clone(), + physical_compaction: self.physical_compaction.clone(), + operator: self.operator.clone(), + logging: self.logging.clone(), + temp_antichain: Antichain::new(), + } + } +} + +impl Drop for TraceAgentInner { + fn drop(&mut self) { + + if let Some(logging) = &self.logging { + logging.log( + crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 } + ); + } + + // decrement borrow counts to remove all holds + let empty_frontier = Antichain::new(); + self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow()); + self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow()); + } +} + +/// A `TraceReader` wrapper which can be imported into other dataflows. +/// +/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted +/// from the dataflow in which it was defined, and imported into other dataflows. +pub struct TraceAgent { + /// Inner agent maintaining the shared trace and compaction state. + inner: TraceAgentInner, + /// A sequence of private queues into which batches are written. + queues: Weak>>>, +} + +impl WithLayout for TraceAgent { + type Layout = Tr::Layout; +} + +impl TraceReader for TraceAgent { + + type Batch = Tr::Batch; + type Storage = Tr::Storage; + type Cursor = Tr::Cursor; + + fn set_logical_compaction(&mut self, frontier: AntichainRef) { + self.inner.set_logical_compaction(frontier); + } + fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { + self.inner.get_logical_compaction() + } + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { + self.inner.set_physical_compaction(frontier); + } + fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { + self.inner.get_physical_compaction() + } + fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> { + self.inner.cursor_through(frontier) + } + fn map_batches(&self, f: F) { self.inner.map_batches(f) } +} + +impl TraceAgent { + /// Creates a new agent from a trace reader. + pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriter) + where + Tr: Trace, + { + let queues = Rc::new(RefCell::new(Vec::new())); + let (inner, writer_inner) = TraceAgentInner::new(trace, operator, logging); + + let reader = TraceAgent { + inner, + queues: Rc::downgrade(&queues), + }; + + let writer = TraceWriter::from_inner(writer_inner, queues); + + (reader, writer) + } + /// Attaches a new shared queue to the trace. /// /// The queue is first populated with existing batches from the trace, @@ -121,7 +233,7 @@ impl TraceAgent { // add the existing batches from the trace let mut upper = None; - self.trace + self.inner.trace .borrow_mut() .trace .map_batches(|batch| { @@ -145,7 +257,7 @@ impl TraceAgent { /// The [OperatorInfo] of the underlying Timely operator pub fn operator(&self) -> &OperatorInfo { - &self.operator + self.inner.operator() } /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain @@ -155,7 +267,7 @@ impl TraceAgent { /// This method is subject to changes and removal and should not be considered part of a stable /// interface. pub fn trace_box_unstable(&self) -> Rc>> { - Rc::clone(&self.trace) + self.inner.trace_box_unstable() } } @@ -512,42 +624,11 @@ impl Drop for ShutdownDeadmans { impl Clone for TraceAgent { fn clone(&self) -> Self { - - if let Some(logging) = &self.logging { - logging.log( - crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 } - ); - } - - // increase counts for wrapped `TraceBox`. - let empty_frontier = Antichain::new(); - self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow()); - self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow()); - TraceAgent { - trace: self.trace.clone(), + inner: self.inner.clone(), queues: self.queues.clone(), - logical_compaction: self.logical_compaction.clone(), - physical_compaction: self.physical_compaction.clone(), - operator: self.operator.clone(), - logging: self.logging.clone(), - temp_antichain: Antichain::new(), } } } -impl Drop for TraceAgent { - fn drop(&mut self) { - - if let Some(logging) = &self.logging { - logging.log( - crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 } - ); - } - - // decrement borrow counts to remove all holds - let empty_frontier = Antichain::new(); - self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow()); - self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow()); - } -} +// Drop is handled by TraceAgentInner's Drop impl. diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index a894d5486..436853f91 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -37,6 +37,7 @@ use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; use trace::wrappers::enter_at::BatchEnter as BatchEnterAt; use super::TraceAgent; +use super::agent::TraceAgentInner; /// An arranged collection of `(K,V)` values. /// @@ -360,38 +361,44 @@ where // held by the batcher, which may prevents the operator from sending an // empty batch. - let mut reader: Option> = None; + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; - // fabricate a data-parallel operator using the `unary_notify` pattern. - let reader_ref = &mut reader; let scope = stream.scope(); - let stream = stream.unary_frontier(pact, name, move |_capability, info| { + let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); + let operator_info = builder.operator_info(); - // Acquire a logger for arrange events. - let logger = scope.logger_for::("differential/arrange").map(Into::into); + let mut input = builder.new_input(stream, pact); + builder.set_notify_for(0, timely::progress::operate::FrontierInterest::Always); + let (mut output, stream) = builder.new_output(); - // Where we will deposit received updates, and from which we extract batches. - let mut batcher = Ba::new(logger.clone(), info.global_id); + // Acquire a logger for arrange events. + let logger = scope.logger_for::("differential/arrange").map(Into::into); - // Capabilities for the lower envelope of updates in `batcher`. - let mut capabilities = Antichain::>::new(); + // Where we will deposit received updates, and from which we extract batches. + let mut batcher = Ba::new(logger.clone(), operator_info.global_id); - let activator = Some(scope.activator_for(info.address.clone())); - let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - // If there is default exertion logic set, install it. - if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { - empty_trace.set_exert_logic(exert_logic); - } + let activator = Some(scope.activator_for(operator_info.address.clone())); + let mut empty_trace = Tr::new(operator_info.clone(), logger.clone(), activator); + // If there is default exertion logic set, install it. + if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { + empty_trace.set_exert_logic(exert_logic); + } + + let (trace, mut writer) = TraceAgent::new(empty_trace, operator_info, logger); - let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); + builder.build(move |_capabilities| { - *reader_ref = Some(reader_local); + // Capabilities for the lower envelope of updates in `batcher`. + let mut capabilities = Antichain::>::new(); // Initialize to the minimal input frontier. let mut prev_frontier = Antichain::from_elem(::minimum()); - move |(input, frontier), output| { + move |frontiers| { + + let frontier = &frontiers[0]; + let mut output = output.activate(); // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities. // We don't have to keep all capabilities, but we need to be able to form output messages @@ -453,7 +460,7 @@ where writer.insert(batch.clone(), Some(capability.time().clone())); // send the batch to downstream consumers, empty or not. - output.session(&capabilities.elements()[index]).give(batch); + output.give(&capabilities.elements()[index], &mut vec![batch]); } } @@ -488,5 +495,121 @@ where } }); - Arranged { stream, trace: reader.unwrap() } + Arranged { stream, trace } +} + +/// Arranges a stream of updates into a private trace, without shared queue distribution. +/// +/// This variant uses `TraceAgentInner` and `TraceWriterInner` directly, avoiding the +/// overhead of queue management for listeners. The operator uses `FrontierInterest::IfCapability`, +/// meaning it will only be notified of frontier changes while it holds capabilities. +pub fn arrange_private(stream: Stream, pact: P, name: &str) -> Arranged> +where + G: Scope, + P: ParallelizationContract, + Ba: Batcher + 'static, + Bu: Builder, + Tr: Trace+'static, +{ + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; + + let scope = stream.scope(); + + let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); + let operator_info = builder.operator_info(); + + let mut input = builder.new_input(stream, pact); + builder.set_notify_for(0, timely::progress::operate::FrontierInterest::IfCapability); + let (mut output, stream) = builder.new_output(); + + // Acquire a logger for arrange events. + let logger = scope.logger_for::("differential/arrange").map(Into::into); + + // Where we will deposit received updates, and from which we extract batches. + let mut batcher = Ba::new(logger.clone(), operator_info.global_id); + + let activator = Some(scope.activator_for(operator_info.address.clone())); + let mut empty_trace = Tr::new(operator_info.clone(), logger.clone(), activator); + // If there is default exertion logic set, install it. + if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { + empty_trace.set_exert_logic(exert_logic); + } + + let (trace, mut writer) = TraceAgentInner::new(empty_trace, operator_info, logger); + + builder.build(move |_capabilities| { + + // Capabilities for the lower envelope of updates in `batcher`. + let mut capabilities = Antichain::>::new(); + + // Initialize to the minimal input frontier. + let mut prev_frontier = Antichain::from_elem(::minimum()); + + move |frontiers| { + + let frontier = &frontiers[0]; + let mut output = output.activate(); + + input.for_each(|cap, data| { + capabilities.insert(cap.retain(0)); + batcher.push_container(data); + }); + + // Assert that the frontier never regresses. + assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier())); + + if prev_frontier.borrow() != frontier.frontier() { + + if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) { + + let mut upper = Antichain::new(); + + for (index, capability) in capabilities.elements().iter().enumerate() { + + if !frontier.less_equal(capability.time()) { + + upper.clear(); + for time in frontier.frontier().iter() { + upper.insert(time.clone()); + } + for other_capability in &capabilities.elements()[(index + 1) .. ] { + upper.insert(other_capability.time().clone()); + } + + let batch = batcher.seal::(upper.clone()); + + writer.insert(batch.clone()); + + // send the batch to downstream consumers, empty or not. + output.give(&capabilities.elements()[index], &mut vec![batch]); + } + } + + let mut new_capabilities = Antichain::new(); + for time in batcher.frontier().iter() { + if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) { + new_capabilities.insert(capability.delayed(time)); + } + else { + panic!("failed to find capability"); + } + } + + capabilities = new_capabilities; + } + else { + // Announce progress updates, even without data. + let _batch = batcher.seal::(frontier.frontier().to_owned()); + writer.seal(frontier.frontier().to_owned()); + } + + prev_frontier.clear(); + prev_frontier.extend(frontier.frontier().iter().cloned()); + } + + writer.exert(); + } + }); + + Arranged { stream, trace } } diff --git a/differential-dataflow/src/operators/arrange/writer.rs b/differential-dataflow/src/operators/arrange/writer.rs index 8df11690c..6b3be7c6b 100644 --- a/differential-dataflow/src/operators/arrange/writer.rs +++ b/differential-dataflow/src/operators/arrange/writer.rs @@ -15,15 +15,76 @@ use crate::trace::wrappers::rc::TraceBox; use super::TraceAgentQueueWriter; use super::TraceReplayInstruction; +/// Inner write endpoint that maintains the frontier and shared trace. +pub struct TraceWriterInner { + /// Current upper limit. + upper: Antichain, + /// Shared trace, possibly absent (due to weakness). + trace: Weak>>, +} + +impl TraceWriterInner { + /// Creates a new `TraceWriterInner`. + pub fn new(upper: Vec, trace: Weak>>) -> Self { + let mut temp = Antichain::new(); + temp.extend(upper); + Self { upper: temp, trace } + } + + /// The current upper frontier. + pub fn upper(&self) -> &Antichain { + &self.upper + } + + /// Exerts merge effort, even without additional updates. + pub fn exert(&mut self) { + if let Some(trace) = self.trace.upgrade() { + trace.borrow_mut().trace.exert(); + } + } + + /// Advances the trace by `batch`. + /// + /// Asserts the batch is a valid continuation of the current frontier, + /// updates the upper frontier, and pushes the batch into the shared trace. + pub fn insert(&mut self, batch: Tr::Batch) { + + // Something is wrong if not a sequence. + if !(&self.upper == batch.lower()) { + println!("{:?} vs {:?}", self.upper, batch.lower()); + } + assert!(&self.upper == batch.lower()); + assert!(batch.lower() != batch.upper()); + + self.upper.clone_from(batch.upper()); + + // push data to the trace, if it still exists. + if let Some(trace) = self.trace.upgrade() { + trace.borrow_mut().trace.insert(batch); + } + } + + /// Inserts an empty batch up to `upper`. + pub fn seal(&mut self, upper: Antichain) { + if self.upper != upper { + self.insert(Tr::Batch::empty(self.upper.clone(), upper)); + } + } +} + +impl Drop for TraceWriterInner { + fn drop(&mut self) { + self.seal(Antichain::new()) + } +} + /// Write endpoint for a sequence of batches. /// /// A `TraceWriter` accepts a sequence of batches and distributes them /// to both a shared trace and to a sequence of private queues. pub struct TraceWriter { - /// Current upper limit. - upper: Antichain, - /// Shared trace, possibly absent (due to weakness). - trace: Weak>>, + /// Inner writer maintaining the frontier and shared trace. + inner: TraceWriterInner, /// A sequence of private queues into which batches are written. queues: Rc>>>, } @@ -36,16 +97,21 @@ impl TraceWriter { queues: Rc>>> ) -> Self { - let mut temp = Antichain::new(); - temp.extend(upper); - Self { upper: temp, trace, queues } + Self { inner: TraceWriterInner::new(upper, trace), queues } + } + + /// Creates a new `TraceWriter` from an existing `TraceWriterInner` and queues. + pub fn from_inner( + inner: TraceWriterInner, + queues: Rc>>> + ) -> Self + { + Self { inner, queues } } /// Exerts merge effort, even without additional updates. pub fn exert(&mut self) { - if let Some(trace) = self.trace.upgrade() { - trace.borrow_mut().trace.exert(); - } + self.inner.exert(); } /// Advances the trace by `batch`. @@ -55,15 +121,6 @@ impl TraceWriter { /// batch and which is suitable for use as a capability. pub fn insert(&mut self, batch: Tr::Batch, hint: Option) { - // Something is wrong if not a sequence. - if !(&self.upper == batch.lower()) { - println!("{:?} vs {:?}", self.upper, batch.lower()); - } - assert!(&self.upper == batch.lower()); - assert!(batch.lower() != batch.upper()); - - self.upper.clone_from(batch.upper()); - // push information to each listener that still exists. let mut borrow = self.queues.borrow_mut(); for queue in borrow.iter_mut() { @@ -75,17 +132,14 @@ impl TraceWriter { } borrow.retain(|w| w.upgrade().is_some()); - // push data to the trace, if it still exists. - if let Some(trace) = self.trace.upgrade() { - trace.borrow_mut().trace.insert(batch); - } - + // push data to the trace and update the frontier. + self.inner.insert(batch); } /// Inserts an empty batch up to `upper`. pub fn seal(&mut self, upper: Antichain) { - if self.upper != upper { - self.insert(Tr::Batch::empty(self.upper.clone(), upper), None); + if *self.inner.upper() != upper { + self.insert(Tr::Batch::empty(self.inner.upper().clone(), upper), None); } } } From accfd46fc0d8f679bc1f6c1d119d3f40c4b3f95d Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 18 Mar 2026 14:08:30 -0400 Subject: [PATCH 3/7] Correct timely reference --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 78c9ab1a7..cbabb3087 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,8 +25,8 @@ rust-version = "1.86" differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.20.0" } #timely = { version = "0.27", default-features = false } columnar = { version = "0.11", default-features = false } -#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } -timely = { path = "../timely-dataflow/timely/", default-features = false } +timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +#timely = { path = "../timely-dataflow/timely/", default-features = false } [workspace.lints.clippy] type_complexity = "allow" From 272deb7247f54fee33679b01307f369607e9a01e Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 18 Mar 2026 14:23:25 -0400 Subject: [PATCH 4/7] Make Rc::clone intent explicit --- differential-dataflow/src/operators/arrange/agent.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index caa103631..1b250e469 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -92,7 +92,7 @@ impl TraceAgentInner { let reader = TraceAgentInner { logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(), physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(), - trace: trace.clone(), + trace: Rc::clone(&trace), temp_antichain: Antichain::new(), operator, logging, @@ -137,7 +137,7 @@ impl Clone for TraceAgentInner { self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow()); TraceAgentInner { - trace: self.trace.clone(), + trace: Rc::clone(&self.trace), logical_compaction: self.logical_compaction.clone(), physical_compaction: self.physical_compaction.clone(), operator: self.operator.clone(), From cf30c2c256e1f472332adfed58c3138983500bd3 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 21 Mar 2026 13:20:16 -0400 Subject: [PATCH 5/7] Refactor complex logic --- .../src/operators/arrange/agent.rs | 5 + .../src/operators/arrange/arrangement.rs | 127 ++++-------------- 2 files changed, 28 insertions(+), 104 deletions(-) diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 1b250e469..102c3cbe7 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -269,6 +269,11 @@ impl TraceAgent { pub fn trace_box_unstable(&self) -> Rc>> { self.inner.trace_box_unstable() } + + /// Extracts the inner `TraceAgentInner`, discarding queue management. + pub fn into_inner(self) -> TraceAgentInner { + self.inner + } } impl TraceAgent { diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 436853f91..43f7b2ec6 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -339,6 +339,22 @@ where /// It uses the supplied parallelization contract to distribute the data, which does not need to /// be consistently by key (though this is the most common). pub fn arrange_core(stream: Stream, pact: P, name: &str) -> Arranged> +where + G: Scope, + P: ParallelizationContract, + Ba: Batcher + 'static, + Bu: Builder, + Tr: Trace+'static, +{ + arrange_core_with_interest::(stream, pact, name, timely::progress::operate::FrontierInterest::Always) +} + +/// Arranges a stream of updates by a key, configured with a name, a parallelization contract, +/// and a frontier interest policy. +/// +/// This is the general form of `arrange_core`, which additionally accepts a `FrontierInterest` +/// parameter controlling when the operator is notified of frontier changes. +pub fn arrange_core_with_interest(stream: Stream, pact: P, name: &str, interest: timely::progress::operate::FrontierInterest) -> Arranged> where G: Scope, P: ParallelizationContract, @@ -369,7 +385,7 @@ where let operator_info = builder.operator_info(); let mut input = builder.new_input(stream, pact); - builder.set_notify_for(0, timely::progress::operate::FrontierInterest::Always); + builder.set_notify_for(0, interest); let (mut output, stream) = builder.new_output(); // Acquire a logger for arrange events. @@ -500,9 +516,8 @@ where /// Arranges a stream of updates into a private trace, without shared queue distribution. /// -/// This variant uses `TraceAgentInner` and `TraceWriterInner` directly, avoiding the -/// overhead of queue management for listeners. The operator uses `FrontierInterest::IfCapability`, -/// meaning it will only be notified of frontier changes while it holds capabilities. +/// This variant delegates to `arrange_core_with_interest` using `FrontierInterest::IfCapability`, +/// then extracts the inner `TraceAgentInner`, discarding the queue management layer. pub fn arrange_private(stream: Stream, pact: P, name: &str) -> Arranged> where G: Scope, @@ -511,105 +526,9 @@ where Bu: Builder, Tr: Trace+'static, { - use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; - - let scope = stream.scope(); - - let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); - let operator_info = builder.operator_info(); - - let mut input = builder.new_input(stream, pact); - builder.set_notify_for(0, timely::progress::operate::FrontierInterest::IfCapability); - let (mut output, stream) = builder.new_output(); - - // Acquire a logger for arrange events. - let logger = scope.logger_for::("differential/arrange").map(Into::into); - - // Where we will deposit received updates, and from which we extract batches. - let mut batcher = Ba::new(logger.clone(), operator_info.global_id); - - let activator = Some(scope.activator_for(operator_info.address.clone())); - let mut empty_trace = Tr::new(operator_info.clone(), logger.clone(), activator); - // If there is default exertion logic set, install it. - if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { - empty_trace.set_exert_logic(exert_logic); + let arranged = arrange_core_with_interest::(stream, pact, name, timely::progress::operate::FrontierInterest::IfCapability); + Arranged { + stream: arranged.stream, + trace: arranged.trace.into_inner(), } - - let (trace, mut writer) = TraceAgentInner::new(empty_trace, operator_info, logger); - - builder.build(move |_capabilities| { - - // Capabilities for the lower envelope of updates in `batcher`. - let mut capabilities = Antichain::>::new(); - - // Initialize to the minimal input frontier. - let mut prev_frontier = Antichain::from_elem(::minimum()); - - move |frontiers| { - - let frontier = &frontiers[0]; - let mut output = output.activate(); - - input.for_each(|cap, data| { - capabilities.insert(cap.retain(0)); - batcher.push_container(data); - }); - - // Assert that the frontier never regresses. - assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier())); - - if prev_frontier.borrow() != frontier.frontier() { - - if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) { - - let mut upper = Antichain::new(); - - for (index, capability) in capabilities.elements().iter().enumerate() { - - if !frontier.less_equal(capability.time()) { - - upper.clear(); - for time in frontier.frontier().iter() { - upper.insert(time.clone()); - } - for other_capability in &capabilities.elements()[(index + 1) .. ] { - upper.insert(other_capability.time().clone()); - } - - let batch = batcher.seal::(upper.clone()); - - writer.insert(batch.clone()); - - // send the batch to downstream consumers, empty or not. - output.give(&capabilities.elements()[index], &mut vec![batch]); - } - } - - let mut new_capabilities = Antichain::new(); - for time in batcher.frontier().iter() { - if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) { - new_capabilities.insert(capability.delayed(time)); - } - else { - panic!("failed to find capability"); - } - } - - capabilities = new_capabilities; - } - else { - // Announce progress updates, even without data. - let _batch = batcher.seal::(frontier.frontier().to_owned()); - writer.seal(frontier.frontier().to_owned()); - } - - prev_frontier.clear(); - prev_frontier.extend(frontier.frontier().iter().cloned()); - } - - writer.exert(); - } - }); - - Arranged { stream, trace } } From d8265f4274a0f411c413d74d6b77affe375db696 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 21 Mar 2026 13:40:05 -0400 Subject: [PATCH 6/7] Rename to _inter and _intra --- differential-dataflow/examples/columnar.rs | 6 +- .../examples/event_driven.rs | 6 +- differential-dataflow/src/collection.rs | 4 +- .../src/operators/arrange/arrangement.rs | 58 +++++++++++-------- 4 files changed, 42 insertions(+), 32 deletions(-) diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 9a97922f1..6254802d7 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -4,7 +4,7 @@ use timely::container::{ContainerBuilder, PushInto}; use timely::dataflow::InputHandle; use timely::dataflow::ProbeHandle; -use differential_dataflow::operators::arrange::arrangement::arrange_core; +use differential_dataflow::operators::arrange::arrangement::arrange_inter; use mimalloc::MiMalloc; @@ -39,8 +39,8 @@ fn main() { let data_pact = KeyPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; let keys_pact = KeyPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; - let data = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(data, data_pact, "Data"); - let keys = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(keys, keys_pact, "Keys"); + let data = arrange_inter::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(data, data_pact, "Data"); + let keys = arrange_inter::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(keys, keys_pact, "Keys"); keys.join_core(data, |_k, (), ()| { Option::<()>::None }) .probe_with(&mut probe); diff --git a/differential-dataflow/examples/event_driven.rs b/differential-dataflow/examples/event_driven.rs index 4325ddee3..e2ca3fa32 100644 --- a/differential-dataflow/examples/event_driven.rs +++ b/differential-dataflow/examples/event_driven.rs @@ -25,10 +25,10 @@ fn main() { let (input, stream) = scope.new_input(); let stream = scope.region(|inner| { let mut stream = stream.enter(inner); - use differential_dataflow::operators::arrange::arrangement::{arrange_private, arrange_core}; + use differential_dataflow::operators::arrange::arrangement::{arrange_intra, arrange_inter}; use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine}; - stream = if local { arrange_private::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(stream, timely::dataflow::channels::pact::Pipeline, "test").as_collection(|k: &i32,v: &i32| (*k, *v)).inner } - else { arrange_core::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(stream, timely::dataflow::channels::pact::Pipeline, "test").as_collection(|k: &i32,v: &i32| (*k, *v)).inner }; + stream = if local { arrange_intra::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(stream, timely::dataflow::channels::pact::Pipeline, "test").as_collection(|k: &i32,v: &i32| (*k, *v)).inner } + else { arrange_inter::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(stream, timely::dataflow::channels::pact::Pipeline, "test").as_collection(|k: &i32,v: &i32| (*k, *v)).inner }; for _step in 0 .. length { stream = stream.filter(|_| false); } diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index c13227f2c..492457bfa 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -1023,7 +1023,7 @@ pub mod vec { Tr: crate::trace::Trace + 'static, { let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - crate::operators::arrange::arrangement::arrange_core::<_, _, Ba, Bu, _>(self.inner, exchange, name) + crate::operators::arrange::arrangement::arrange_inter::<_, _, Ba, Bu, _>(self.inner, exchange, name) } } @@ -1038,7 +1038,7 @@ pub mod vec { Tr: crate::trace::Trace + 'static, { let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); - crate::operators::arrange::arrangement::arrange_core::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name) + crate::operators::arrange::arrangement::arrange_inter::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name) } } diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 43f7b2ec6..2064cdfc8 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -338,7 +338,11 @@ where /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// It uses the supplied parallelization contract to distribute the data, which does not need to /// be consistently by key (though this is the most common). -pub fn arrange_core(stream: Stream, pact: P, name: &str) -> Arranged> +/// +/// Unlike `arrange_intra`, this trace can be imported into other dataflows. To provide this ability +/// it must be continually scheduled even in the absence of input updates, in order to communicate +/// its input frontier to the other dataflow. This can result in more scheduling overhead. +pub fn arrange_inter(stream: Stream, pact: P, name: &str) -> Arranged> where G: Scope, P: ParallelizationContract, @@ -346,15 +350,40 @@ where Bu: Builder, Tr: Trace+'static, { - arrange_core_with_interest::(stream, pact, name, timely::progress::operate::FrontierInterest::Always) + arrange_core::(stream, pact, name, timely::progress::operate::FrontierInterest::Always) +} + +/// Arranges a stream of updates into a private trace, without shared queue distribution. +/// +/// This operator arranges a stream of values into a shared trace, whose contents it maintains. +/// It uses the supplied parallelization contract to distribute the data, which does not need to +/// be consistently by key (though this is the most common). +/// +/// Unlike `arrange_inter`, this trace cannot be shared across dataflows, only within dataflows. +/// It lacks the `import` method that would allow this. By so doing, the operator does not need +/// to be continually rescheduled in the absence of input updates, which can reduce the scheduling +/// load. +pub fn arrange_intra(stream: Stream, pact: P, name: &str) -> Arranged> +where + G: Scope, + P: ParallelizationContract, + Ba: Batcher + 'static, + Bu: Builder, + Tr: Trace+'static, +{ + let arranged = arrange_core::(stream, pact, name, timely::progress::operate::FrontierInterest::IfCapability); + Arranged { + stream: arranged.stream, + trace: arranged.trace.into_inner(), + } } /// Arranges a stream of updates by a key, configured with a name, a parallelization contract, /// and a frontier interest policy. /// -/// This is the general form of `arrange_core`, which additionally accepts a `FrontierInterest` -/// parameter controlling when the operator is notified of frontier changes. -pub fn arrange_core_with_interest(stream: Stream, pact: P, name: &str, interest: timely::progress::operate::FrontierInterest) -> Arranged> +/// This is the general form that both `arrange_inter` and `arrange_intra` delegate to. +/// The `FrontierInterest` parameter controls when the operator is notified of frontier changes. +pub fn arrange_core(stream: Stream, pact: P, name: &str, interest: timely::progress::operate::FrontierInterest) -> Arranged> where G: Scope, P: ParallelizationContract, @@ -513,22 +542,3 @@ where Arranged { stream, trace } } - -/// Arranges a stream of updates into a private trace, without shared queue distribution. -/// -/// This variant delegates to `arrange_core_with_interest` using `FrontierInterest::IfCapability`, -/// then extracts the inner `TraceAgentInner`, discarding the queue management layer. -pub fn arrange_private(stream: Stream, pact: P, name: &str) -> Arranged> -where - G: Scope, - P: ParallelizationContract, - Ba: Batcher + 'static, - Bu: Builder, - Tr: Trace+'static, -{ - let arranged = arrange_core_with_interest::(stream, pact, name, timely::progress::operate::FrontierInterest::IfCapability); - Arranged { - stream: arranged.stream, - trace: arranged.trace.into_inner(), - } -} From 836747385ec9f1c0478de642efc5526a6758f1bf Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 21 Mar 2026 14:46:44 -0400 Subject: [PATCH 7/7] Rename to TraceInter and TraceIntra --- diagnostics/src/logging.rs | 6 +- differential-dataflow/examples/arrange.rs | 2 +- differential-dataflow/examples/graspan.rs | 14 ++-- differential-dataflow/src/collection.rs | 44 +++++++++--- .../src/operators/arrange/agent.rs | 72 ++++++++++--------- .../src/operators/arrange/arrangement.rs | 24 +++---- .../src/operators/arrange/mod.rs | 6 +- .../src/operators/arrange/upsert.rs | 8 +-- .../src/operators/arrange/writer.rs | 8 +-- differential-dataflow/src/operators/reduce.rs | 6 +- differential-dataflow/tests/import.rs | 8 +-- dogsdogsdogs/src/lib.rs | 12 ++-- experiments/src/bin/deals-interactive.rs | 4 +- experiments/src/bin/deals.rs | 4 +- experiments/src/bin/graphs-interactive-alt.rs | 4 +- .../src/bin/graphs-interactive-neu-zwei.rs | 4 +- experiments/src/bin/graphs-interactive-neu.rs | 4 +- experiments/src/bin/graphs-interactive.rs | 4 +- experiments/src/bin/graphs-static.rs | 8 +-- experiments/src/bin/graphs.rs | 8 +-- interactive/src/command.rs | 4 +- interactive/src/logging.rs | 22 +++--- interactive/src/manager.rs | 6 +- interactive/src/plan/join.rs | 4 +- interactive/src/plan/mod.rs | 2 +- interactive/src/plan/sfw.rs | 4 +- server/dataflows/random_graph/src/lib.rs | 2 +- server/src/lib.rs | 4 +- tpchlike/src/lib.rs | 32 ++++----- 29 files changed, 177 insertions(+), 153 deletions(-) diff --git a/diagnostics/src/logging.rs b/diagnostics/src/logging.rs index b47721e62..39671f62b 100644 --- a/diagnostics/src/logging.rs +++ b/diagnostics/src/logging.rs @@ -30,7 +30,7 @@ use std::time::{Duration, Instant}; use differential_dataflow::collection::concatenate; use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder}; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::trace::implementations::{KeySpine, ValSpine}; use differential_dataflow::{AsCollection, VecCollection}; @@ -166,9 +166,9 @@ pub type DiagnosticEvent = Event; // ============================================================================ /// A key-value trace: key K, value V, time Duration, diff i64. -type ValTrace = TraceAgent>; +type ValTrace = TraceIntra>; /// A key-only trace: key K, time Duration, diff i64. -type KeyTrace = TraceAgent>; +type KeyTrace = TraceIntra>; /// Trace handles for timely logging arrangements. pub struct TimelyTraces { diff --git a/differential-dataflow/examples/arrange.rs b/differential-dataflow/examples/arrange.rs index d3f5abcec..87f73e648 100644 --- a/differential-dataflow/examples/arrange.rs +++ b/differential-dataflow/examples/arrange.rs @@ -87,7 +87,7 @@ fn main() { }) .probe_with(&mut probe) .as_collection() - .arrange_by_key() + .arrange_by_key_inter() // .arrange::() .trace }); diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index 79c7fd184..98c5d308b 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -65,10 +65,10 @@ pub struct Query { } use differential_dataflow::trace::implementations::{ValSpine, KeySpine}; -use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; +use differential_dataflow::operators::arrange::{Arranged, TraceInter}; -type TraceKeyHandle = TraceAgent>; -type TraceValHandle = TraceAgent>; +type TraceKeyHandle = TraceInter>; +type TraceValHandle = TraceInter>; type Arrange = Arranged::Timestamp, R>>; /// An evolving set of edges. @@ -115,14 +115,14 @@ impl> EdgeVariable { /// The collection arranged in the forward direction. pub fn forward(&mut self) -> &Arrange { if self.forward.is_none() { - self.forward = Some(self.collection.clone().arrange_by_key()); + self.forward = Some(self.collection.clone().arrange_by_key_inter()); } self.forward.as_ref().unwrap() } /// The collection arranged in the reverse direction. pub fn reverse(&mut self) -> &Arrange { if self.reverse.is_none() { - self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key()); + self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key_inter()); } self.reverse.as_ref().unwrap() } @@ -171,7 +171,7 @@ impl Query { // create variables and result handles for each named relation. for (name, (input, collection)) in input_map { let edge_variable = EdgeVariable::from(collection.enter(subscope), Product::new(Default::default(), 1)); - let trace = edge_variable.collection.clone().leave().arrange_by_self().trace; + let trace = edge_variable.collection.clone().leave().arrange_by_self_inter().trace; result_map.insert(name.clone(), RelationHandles { input, trace }); variable_map.insert(name.clone(), edge_variable); } @@ -200,7 +200,7 @@ impl Query { transposed = transposed .join_core(to_join, |_k,&x,&y| Some((y,x))) - .arrange_by_key(); + .arrange_by_key_inter(); } // Reverse the direction before adding it as a production. diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 492457bfa..b81a890c2 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -705,7 +705,7 @@ pub mod vec { } use crate::trace::{Trace, Builder}; - use crate::operators::arrange::{Arranged, TraceAgent}; + use crate::operators::arrange::{Arranged, TraceInter, TraceIntra}; impl Collection where @@ -780,7 +780,7 @@ pub mod vec { /// .trace; /// }); /// ``` - pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> where T2: for<'a> Trace= &'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static, Bu: Builder, Output = T2::Batch>, @@ -798,7 +798,7 @@ pub mod vec { /// Unlike `reduce_arranged`, this method may be called with an empty `input`, /// and it may not be safe to index into the first element. /// At least one of the two collections will be non-empty. - pub fn reduce_core(self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(self, name: &str, logic: L) -> Arranged> where V: Clone+'static, T2: for<'a> Trace=&'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp>+'static, @@ -1016,14 +1016,14 @@ pub mod vec { V: crate::ExchangeData, R: crate::ExchangeData + Semigroup, { - fn arrange_named(self, name: &str) -> Arranged> + fn arrange_named(self, name: &str) -> Arranged> where Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, Bu: crate::trace::Builder, Tr: crate::trace::Trace + 'static, { let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - crate::operators::arrange::arrangement::arrange_inter::<_, _, Ba, Bu, _>(self.inner, exchange, name) + crate::operators::arrange::arrangement::arrange_intra::<_, _, Ba, Bu, _>(self.inner, exchange, name) } } @@ -1031,14 +1031,14 @@ pub mod vec { where G: Scope, { - fn arrange_named(self, name: &str) -> Arranged> + fn arrange_named(self, name: &str) -> Arranged> where Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, Bu: crate::trace::Builder, Tr: crate::trace::Trace + 'static, { let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); - crate::operators::arrange::arrangement::arrange_inter::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name) + crate::operators::arrange::arrangement::arrange_intra::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name) } } @@ -1052,14 +1052,25 @@ pub mod vec { /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// This trace is current for all times completed by the output stream, which can be used to /// safely identify the stable times and values in the trace. - pub fn arrange_by_key(self) -> Arranged>> { + pub fn arrange_by_key(self) -> Arranged>> { self.arrange_by_key_named("ArrangeByKey") } /// As `arrange_by_key` but with the ability to name the arrangement. - pub fn arrange_by_key_named(self, name: &str) -> Arranged>> { + pub fn arrange_by_key_named(self, name: &str) -> Arranged>> { self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) } + + /// As `arrange_by_key` but producing a `TraceInter` that can be imported into other dataflows. + pub fn arrange_by_key_inter(self) -> Arranged>> { + self.arrange_by_key_inter_named("ArrangeByKey") + } + + /// As `arrange_by_key_inter` but with the ability to name the arrangement. + pub fn arrange_by_key_inter_named(self, name: &str) -> Arranged>> { + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); + crate::operators::arrange::arrangement::arrange_inter::<_,_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(self.inner, exchange, name) + } } impl Collection @@ -1071,15 +1082,26 @@ pub mod vec { /// This operator arranges a collection of records into a shared trace, whose contents it maintains. /// This trace is current for all times complete in the output stream, which can be used to safely /// identify the stable times and values in the trace. - pub fn arrange_by_self(self) -> Arranged>> { + pub fn arrange_by_self(self) -> Arranged>> { self.arrange_by_self_named("ArrangeBySelf") } /// As `arrange_by_self` but with the ability to name the arrangement. - pub fn arrange_by_self_named(self, name: &str) -> Arranged>> { + pub fn arrange_by_self_named(self, name: &str) -> Arranged>> { self.map(|k| (k, ())) .arrange_named::,KeyBuilder<_,_,_>,_>(name) } + + /// As `arrange_by_self` but producing a `TraceInter` that can be imported into other dataflows. + pub fn arrange_by_self_inter(self) -> Arranged>> { + self.arrange_by_self_inter_named("ArrangeBySelf") + } + + /// As `arrange_by_self_inter` but with the ability to name the arrangement. + pub fn arrange_by_self_inter_named(self, name: &str) -> Arranged>> { + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); + crate::operators::arrange::arrangement::arrange_inter::<_,_,KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(self.map(|k| (k, ())).inner, exchange, name) + } } impl Collection diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 102c3cbe7..540b814d4 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -15,18 +15,19 @@ use crate::trace::wrappers::rc::TraceBox; use timely::scheduling::Activator; -use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged}; +use super::{TraceWriter, TraceInterQueueWriter, TraceInterQueueReader, Arranged}; use super::TraceReplayInstruction; use super::writer::TraceWriterInner; use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier}; -/// Inner agent that maintains the shared trace and compaction state. +/// Trace reader that can share a trace within a dataflow. /// -/// A `TraceAgentInner` holds the shared trace and tracks logical and physical -/// compaction frontiers, without any queue management. -pub struct TraceAgentInner { +/// Unlike `TraceInter`, this trace reader cannot be shared across +/// dataflows, but in exchange doesn't need to be scheduled as its +/// frontiers advance, in the absence of updates. +pub struct TraceIntra { trace: Rc>>, logical_compaction: Antichain, physical_compaction: Antichain, @@ -37,11 +38,11 @@ pub struct TraceAgentInner { } use crate::trace::implementations::WithLayout; -impl WithLayout for TraceAgentInner { +impl WithLayout for TraceIntra { type Layout = Tr::Layout; } -impl TraceReader for TraceAgentInner { +impl TraceReader for TraceIntra { type Batch = Tr::Batch; type Storage = Tr::Storage; @@ -75,7 +76,7 @@ impl TraceReader for TraceAgentInner { fn map_batches(&self, f: F) { self.trace.borrow().trace.map_batches(f) } } -impl TraceAgentInner { +impl TraceIntra { /// Creates a new inner agent from a trace reader, returning the agent and a matching writer. pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriterInner) where @@ -89,7 +90,7 @@ impl TraceAgentInner { ); } - let reader = TraceAgentInner { + let reader = TraceIntra { logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(), physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(), trace: Rc::clone(&trace), @@ -122,7 +123,7 @@ impl TraceAgentInner { } } -impl Clone for TraceAgentInner { +impl Clone for TraceIntra { fn clone(&self) -> Self { if let Some(logging) = &self.logging { @@ -136,7 +137,7 @@ impl Clone for TraceAgentInner { self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow()); self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow()); - TraceAgentInner { + TraceIntra { trace: Rc::clone(&self.trace), logical_compaction: self.logical_compaction.clone(), physical_compaction: self.physical_compaction.clone(), @@ -147,7 +148,7 @@ impl Clone for TraceAgentInner { } } -impl Drop for TraceAgentInner { +impl Drop for TraceIntra { fn drop(&mut self) { if let Some(logging) = &self.logging { @@ -163,22 +164,23 @@ impl Drop for TraceAgentInner { } } -/// A `TraceReader` wrapper which can be imported into other dataflows. +/// Trace reader that can both share a trace within a dataflow and be imported into other dataflows. /// -/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted -/// from the dataflow in which it was defined, and imported into other dataflows. -pub struct TraceAgent { +/// Unlike `TraceIntra`, this trace reader can be shared across dataflows, +/// but in exchange it must be scheduled whenever its frontiers advance. +/// This can increase the scheduling load. +pub struct TraceInter { /// Inner agent maintaining the shared trace and compaction state. - inner: TraceAgentInner, + inner: TraceIntra, /// A sequence of private queues into which batches are written. - queues: Weak>>>, + queues: Weak>>>, } -impl WithLayout for TraceAgent { +impl WithLayout for TraceInter { type Layout = Tr::Layout; } -impl TraceReader for TraceAgent { +impl TraceReader for TraceInter { type Batch = Tr::Batch; type Storage = Tr::Storage; @@ -202,16 +204,16 @@ impl TraceReader for TraceAgent { fn map_batches(&self, f: F) { self.inner.map_batches(f) } } -impl TraceAgent { +impl TraceInter { /// Creates a new agent from a trace reader. pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriter) where Tr: Trace, { let queues = Rc::new(RefCell::new(Vec::new())); - let (inner, writer_inner) = TraceAgentInner::new(trace, operator, logging); + let (inner, writer_inner) = TraceIntra::new(trace, operator, logging); - let reader = TraceAgent { + let reader = TraceInter { inner, queues: Rc::downgrade(&queues), }; @@ -226,7 +228,7 @@ impl TraceAgent { /// The queue is first populated with existing batches from the trace, /// The queue will be immediately populated with existing historical batches from the trace, and until the reference /// is dropped the queue will receive new batches as produced by the source `arrange` operator. - pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader + pub fn new_listener(&mut self, activator: Activator) -> TraceInterQueueReader { // create a new queue for progress and batch information. let mut new_queue = VecDeque::new(); @@ -270,13 +272,13 @@ impl TraceAgent { self.inner.trace_box_unstable() } - /// Extracts the inner `TraceAgentInner`, discarding queue management. - pub fn into_inner(self) -> TraceAgentInner { + /// Extracts the inner `TraceIntra`, discarding queue management. + pub fn into_inner(self) -> TraceIntra { self.inner } } -impl TraceAgent { +impl TraceInter { /// Copies an existing collection into the supplied scope. /// /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange` @@ -328,7 +330,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import(&mut self, scope: &G) -> Arranged> + pub fn import(&mut self, scope: &G) -> Arranged> where G: Scope, { @@ -336,7 +338,7 @@ impl TraceAgent { } /// Same as `import`, but allows to name the source. - pub fn import_named(&mut self, scope: &G, name: &str) -> Arranged> + pub fn import_named(&mut self, scope: &G, name: &str) -> Arranged> where G: Scope, { @@ -391,7 +393,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_core(&mut self, scope: &G, name: &str) -> (Arranged>, ShutdownButton>) + pub fn import_core(&mut self, scope: &G, name: &str) -> (Arranged>, ShutdownButton>) where G: Scope, { @@ -506,7 +508,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_frontier(&mut self, scope: &G, name: &str) -> (Arranged>>, ShutdownButton>) + pub fn import_frontier(&mut self, scope: &G, name: &str) -> (Arranged>>, ShutdownButton>) where G: Scope, Tr: TraceReader, @@ -524,7 +526,7 @@ impl TraceAgent { /// /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty /// frontier indicates the end of times. - pub fn import_frontier_core(&mut self, scope: &G, name: &str, since: Antichain, until: Antichain) -> (Arranged>>, ShutdownButton>) + pub fn import_frontier_core(&mut self, scope: &G, name: &str, since: Antichain, until: Antichain) -> (Arranged>>, ShutdownButton>) where G: Scope, Tr: TraceReader, @@ -627,13 +629,13 @@ impl Drop for ShutdownDeadmans { } } -impl Clone for TraceAgent { +impl Clone for TraceInter { fn clone(&self) -> Self { - TraceAgent { + TraceInter { inner: self.inner.clone(), queues: self.queues.clone(), } } } -// Drop is handled by TraceAgentInner's Drop impl. +// Drop is handled by TraceIntra's Drop impl. diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 2064cdfc8..8af8793ca 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -36,8 +36,8 @@ use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; use trace::wrappers::enter_at::BatchEnter as BatchEnterAt; -use super::TraceAgent; -use super::agent::TraceAgentInner; +use super::TraceInter; +use super::agent::TraceIntra; /// An arranged collection of `(K,V)` values. /// @@ -250,7 +250,7 @@ where T1: TraceReader + Clone + 'static, { /// A direct implementation of `ReduceCore::reduce_abelian`. - pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> where T1: TraceReader, T2: for<'a> Trace< @@ -273,7 +273,7 @@ where } /// A direct implementation of `ReduceCore::reduce_core`. - pub fn reduce_core(self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(self, name: &str, logic: L) -> Arranged> where T1: TraceReader, T2: for<'a> Trace< @@ -314,8 +314,8 @@ pub trait Arrange : Sized where G: Scope, { - /// Arranges updates into a shared trace. - fn arrange(self) -> Arranged> + /// Arranges updates into a trace local to this dataflow. + fn arrange(self) -> Arranged> where Ba: Batcher + 'static, Bu: Builder, @@ -324,8 +324,8 @@ where self.arrange_named::("Arrange") } - /// Arranges updates into a shared trace, with a supplied name. - fn arrange_named(self, name: &str) -> Arranged> + /// Arranges updates into a trace local to this dataflow, with a supplied name. + fn arrange_named(self, name: &str) -> Arranged> where Ba: Batcher + 'static, Bu: Builder, @@ -342,7 +342,7 @@ where /// Unlike `arrange_intra`, this trace can be imported into other dataflows. To provide this ability /// it must be continually scheduled even in the absence of input updates, in order to communicate /// its input frontier to the other dataflow. This can result in more scheduling overhead. -pub fn arrange_inter(stream: Stream, pact: P, name: &str) -> Arranged> +pub fn arrange_inter(stream: Stream, pact: P, name: &str) -> Arranged> where G: Scope, P: ParallelizationContract, @@ -363,7 +363,7 @@ where /// It lacks the `import` method that would allow this. By so doing, the operator does not need /// to be continually rescheduled in the absence of input updates, which can reduce the scheduling /// load. -pub fn arrange_intra(stream: Stream, pact: P, name: &str) -> Arranged> +pub fn arrange_intra(stream: Stream, pact: P, name: &str) -> Arranged> where G: Scope, P: ParallelizationContract, @@ -383,7 +383,7 @@ where /// /// This is the general form that both `arrange_inter` and `arrange_intra` delegate to. /// The `FrontierInterest` parameter controls when the operator is notified of frontier changes. -pub fn arrange_core(stream: Stream, pact: P, name: &str, interest: timely::progress::operate::FrontierInterest) -> Arranged> +pub fn arrange_core(stream: Stream, pact: P, name: &str, interest: timely::progress::operate::FrontierInterest) -> Arranged> where G: Scope, P: ParallelizationContract, @@ -430,7 +430,7 @@ where empty_trace.set_exert_logic(exert_logic); } - let (trace, mut writer) = TraceAgent::new(empty_trace, operator_info, logger); + let (trace, mut writer) = TraceInter::new(empty_trace, operator_info, logger); builder.build(move |_capabilities| { diff --git a/differential-dataflow/src/operators/arrange/mod.rs b/differential-dataflow/src/operators/arrange/mod.rs index 70f9fb866..2a6bbad53 100644 --- a/differential-dataflow/src/operators/arrange/mod.rs +++ b/differential-dataflow/src/operators/arrange/mod.rs @@ -57,8 +57,8 @@ pub enum TraceReplayInstruction { // Short names for strongly and weakly owned activators and shared queues. type BatchQueue = VecDeque>; -type TraceAgentQueueReader = Rc<(Activator, RefCell>)>; -type TraceAgentQueueWriter = Weak<(Activator, RefCell>)>; +type TraceInterQueueReader = Rc<(Activator, RefCell>)>; +type TraceInterQueueWriter = Weak<(Activator, RefCell>)>; pub mod writer; pub mod agent; @@ -67,6 +67,6 @@ pub mod arrangement; pub mod upsert; pub use self::writer::TraceWriter; -pub use self::agent::{TraceAgent, ShutdownButton}; +pub use self::agent::{TraceInter, TraceIntra, ShutdownButton}; pub use self::arrangement::{Arranged, Arrange}; \ No newline at end of file diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index e9dbe9cdb..4e6eaa3f9 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -115,7 +115,7 @@ use crate::{ExchangeData, Hashable}; use crate::trace::implementations::containers::BatchContainer; -use super::TraceAgent; +use super::TraceInter; /// Arrange data from a stream of keyed upserts. /// @@ -130,7 +130,7 @@ use super::TraceAgent; pub fn arrange_from_upsert( stream: Stream, G::Timestamp)>>, name: &str, -) -> Arranged> +) -> Arranged> where G: Scope, Tr: for<'a> Trace< @@ -141,7 +141,7 @@ where >+'static, Bu: Builder, Output = Tr::Batch>, { - let mut reader: Option> = None; + let mut reader: Option> = None; // fabricate a data-parallel operator using the `unary_notify` pattern. let stream = { @@ -166,7 +166,7 @@ where empty_trace.set_exert_logic(exert_logic); } - let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); + let (mut reader_local, mut writer) = TraceInter::new(empty_trace, info, logger); // Capture the reader outside the builder scope. *reader = Some(reader_local.clone()); diff --git a/differential-dataflow/src/operators/arrange/writer.rs b/differential-dataflow/src/operators/arrange/writer.rs index 6b3be7c6b..ba4820ec6 100644 --- a/differential-dataflow/src/operators/arrange/writer.rs +++ b/differential-dataflow/src/operators/arrange/writer.rs @@ -12,7 +12,7 @@ use crate::trace::{Trace, Batch, BatchReader}; use crate::trace::wrappers::rc::TraceBox; -use super::TraceAgentQueueWriter; +use super::TraceInterQueueWriter; use super::TraceReplayInstruction; /// Inner write endpoint that maintains the frontier and shared trace. @@ -86,7 +86,7 @@ pub struct TraceWriter { /// Inner writer maintaining the frontier and shared trace. inner: TraceWriterInner, /// A sequence of private queues into which batches are written. - queues: Rc>>>, + queues: Rc>>>, } impl TraceWriter { @@ -94,7 +94,7 @@ impl TraceWriter { pub fn new( upper: Vec, trace: Weak>>, - queues: Rc>>> + queues: Rc>>> ) -> Self { Self { inner: TraceWriterInner::new(upper, trace), queues } @@ -103,7 +103,7 @@ impl TraceWriter { /// Creates a new `TraceWriter` from an existing `TraceWriterInner` and queues. pub fn from_inner( inner: TraceWriterInner, - queues: Rc>>> + queues: Rc>>> ) -> Self { Self { inner, queues } diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index fe3622d74..f27750b4f 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -16,7 +16,7 @@ use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; -use crate::operators::arrange::{Arranged, TraceAgent}; +use crate::operators::arrange::{Arranged, TraceInter}; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; use crate::trace::implementations::containers::BatchContainer; @@ -26,7 +26,7 @@ use crate::trace::TraceReader; /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. -pub fn reduce_trace(trace: Arranged, name: &str, mut logic: L) -> Arranged> +pub fn reduce_trace(trace: Arranged, name: &str, mut logic: L) -> Arranged> where G: Scope, T1: TraceReader + Clone + 'static, @@ -56,7 +56,7 @@ where let mut source_trace = trace.trace.clone(); - let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); + let (mut output_reader, mut output_writer) = TraceInter::new(empty, operator_info, logger); // let mut output_trace = TraceRc::make_from(agent).0; *result_trace = Some(output_reader.clone()); diff --git a/differential-dataflow/tests/import.rs b/differential-dataflow/tests/import.rs index 747e45b72..7c73741a7 100644 --- a/differential-dataflow/tests/import.rs +++ b/differential-dataflow/tests/import.rs @@ -52,7 +52,7 @@ fn test_import_vanilla() { let (mut input, mut trace) = worker.dataflow(|scope| { let (input, edges) = scope.new_input(); let arranged = edges.as_collection() - .arrange_by_key(); + .arrange_by_key_inter(); (input, arranged.trace.clone()) }); let (captured,) = worker.dataflow(move |scope| { @@ -112,7 +112,7 @@ fn test_import_completed_dataflow() { let (mut input, mut trace, probe) = worker.dataflow(|scope| { let (input, edges) = scope.new_input(); let arranged = edges.as_collection() - .arrange_by_key(); + .arrange_by_key_inter(); let (probe, _) = arranged.stream.probe(); (input, arranged.trace.clone(), probe) }); @@ -174,7 +174,7 @@ fn test_import_stalled_dataflow() { let arranged = input .to_collection(scope) - .arrange_by_self(); + .arrange_by_self_inter(); let (probe, _) = arranged.stream.probe(); (arranged.trace, probe) @@ -221,7 +221,7 @@ fn import_skewed() { let (mut input, mut trace) = worker.dataflow(|scope| { let (input, edges) = scope.new_input(); let arranged = edges.as_collection() - .arrange_by_key(); + .arrange_by_key_inter(); (input, arranged.trace.clone()) }); diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index f704d5988..7a5f82bf7 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -8,7 +8,7 @@ use timely::dataflow::operators::Concatenate; use differential_dataflow::{ExchangeData, VecCollection, AsCollection}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; pub mod altneu; pub mod calculus; @@ -93,8 +93,8 @@ impl, P, E> ValidateExtensionMethod = TraceAgent>; -type TraceKeyHandle = TraceAgent>; +type TraceValHandle = TraceInter>; +type TraceKeyHandle = TraceInter>; pub struct CollectionIndex where @@ -140,16 +140,16 @@ where pub fn index>(collection: VecCollection) -> Self { // We need to count the number of (k, v) pairs and not rely on the given Monoid R and its binary addition operation. // counts and validate can share the base arrangement - let arranged = collection.clone().arrange_by_self(); + let arranged = collection.clone().arrange_by_self_inter(); // TODO: This could/should be arrangement to arrangement, via `reduce_abelian`, but the types are a mouthful at the moment. let counts = arranged .clone() .as_collection(|k,_v| k.clone()) .distinct() .map(|(k, _v)| k) - .arrange_by_self() + .arrange_by_self_inter() .trace; - let propose = collection.arrange_by_key().trace; + let propose = collection.arrange_by_key_inter().trace; let validate = arranged.trace; CollectionIndex { diff --git a/experiments/src/bin/deals-interactive.rs b/experiments/src/bin/deals-interactive.rs index 3ea53cd14..973dd10ca 100644 --- a/experiments/src/bin/deals-interactive.rs +++ b/experiments/src/bin/deals-interactive.rs @@ -10,10 +10,10 @@ use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; type Node = u32; diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index 782941e15..a4a87e99e 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -7,13 +7,13 @@ use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder}; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::Arrange; use differential_dataflow::operators::iterate::Variable; use differential_dataflow::difference::Present; -type EdgeArranged = Arranged::Timestamp, R>>>; +type EdgeArranged = Arranged::Timestamp, R>>>; type Node = u32; type Edge = (Node, Node); diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 6e5b0ffaa..853c0c744 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -255,10 +255,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop( diff --git a/experiments/src/bin/graphs-interactive-neu-zwei.rs b/experiments/src/bin/graphs-interactive-neu-zwei.rs index 35a118cd9..d8b20eec8 100644 --- a/experiments/src/bin/graphs-interactive-neu-zwei.rs +++ b/experiments/src/bin/graphs-interactive-neu-zwei.rs @@ -223,10 +223,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop( diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index 7e27e7ed8..c3a9a6a6b 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -288,10 +288,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop( diff --git a/experiments/src/bin/graphs-interactive.rs b/experiments/src/bin/graphs-interactive.rs index 2ec74e93f..39a6638de 100644 --- a/experiments/src/bin/graphs-interactive.rs +++ b/experiments/src/bin/graphs-interactive.rs @@ -191,10 +191,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. diff --git a/experiments/src/bin/graphs-static.rs b/experiments/src/bin/graphs-static.rs index 16cbbd6dd..742ac08ba 100644 --- a/experiments/src/bin/graphs-static.rs +++ b/experiments/src/bin/graphs-static.rs @@ -48,7 +48,7 @@ fn main() { .to_stream(scope) .as_collection(); - edges.arrange_by_key().trace + edges.arrange_by_key_inter().trace }); while worker.step() { } @@ -86,7 +86,7 @@ fn main() { forward .import(scope) .as_collection(|&k,&v| (v,k)) - .arrange_by_key() + .arrange_by_key_inter() .trace }); while worker.step() { } @@ -102,9 +102,9 @@ fn main() { }).unwrap(); } -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; -type TraceHandle = TraceAgent; +type TraceHandle = TraceInter; fn reach> ( graph: &mut TraceHandle, diff --git a/experiments/src/bin/graphs.rs b/experiments/src/bin/graphs.rs index ea7420120..0b99d0a13 100644 --- a/experiments/src/bin/graphs.rs +++ b/experiments/src/bin/graphs.rs @@ -36,8 +36,8 @@ fn main() { let (mut graph, mut trace) = worker.dataflow(|scope| { let (graph_input, graph) = scope.new_collection(); - let graph_indexed = graph.arrange_by_key(); - // let graph_indexed = graph.arrange_by_key(); + let graph_indexed = graph.arrange_by_key_inter(); + // let graph_indexed = graph.arrange_by_key_inter(); (graph_input, graph_indexed.trace) }); @@ -84,9 +84,9 @@ fn main() { } // use differential_dataflow::trace::implementations::ord::OrdValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; -type TraceHandle = TraceAgent; +type TraceHandle = TraceInter; fn reach> ( graph: &mut TraceHandle, diff --git a/interactive/src/command.rs b/interactive/src/command.rs index 5d110ea70..343827c30 100644 --- a/interactive/src/command.rs +++ b/interactive/src/command.rs @@ -78,7 +78,7 @@ where for Rule { name, plan } in query.rules.into_iter() { let collection = plan.render(scope, &mut collections, &mut manager.traces) - .arrange_by_self(); + .arrange_by_self_inter(); collection.stream.probe_with(&mut manager.probe); let trace = collection.trace; @@ -104,7 +104,7 @@ where let (input, trace) = worker.dataflow(|scope| { let (input, collection) = scope.new_collection_from(updates.into_iter()); - let trace = collection.arrange_by_self().trace; + let trace = collection.arrange_by_self_inter().trace; (input, trace) }); diff --git a/interactive/src/logging.rs b/interactive/src/logging.rs index 322dd16d2..15139e740 100644 --- a/interactive/src/logging.rs +++ b/interactive/src/logging.rs @@ -159,13 +159,13 @@ where // }); use differential_dataflow::collection::AsCollection; - let operates = operates.as_collection().arrange_by_self().trace; - let channels = channels.as_collection().arrange_by_self().trace; - let schedule = schedule.as_collection().arrange_by_self().trace; - let messages = messages.as_collection().arrange_by_self().trace; - let shutdown = shutdown.as_collection().arrange_by_self().trace; - let park = park.as_collection().arrange_by_self().trace; - let text = text.as_collection().arrange_by_self().trace; + let operates = operates.as_collection().arrange_by_self_inter().trace; + let channels = channels.as_collection().arrange_by_self_inter().trace; + let schedule = schedule.as_collection().arrange_by_self_inter().trace; + let messages = messages.as_collection().arrange_by_self_inter().trace; + let shutdown = shutdown.as_collection().arrange_by_self_inter().trace; + let park = park.as_collection().arrange_by_self_inter().trace; + let text = text.as_collection().arrange_by_self_inter().trace; // let elapsed = // duration @@ -177,7 +177,7 @@ where // (k,time,r) // }) // .as_collection() - // .arrange_by_self() + // .arrange_by_self_inter() // .trace; // let histogram = @@ -190,7 +190,7 @@ where // (k,time,r) // }) // .as_collection() - // .arrange_by_self() + // .arrange_by_self_inter() // .trace; (operates, channels, schedule, messages, shutdown, park, text) @@ -273,8 +273,8 @@ where }); use differential_dataflow::collection::AsCollection; - let batch = batch.as_collection().arrange_by_self().trace; - let merge = merge.as_collection().arrange_by_self().trace; + let batch = batch.as_collection().arrange_by_self_inter().trace; + let merge = merge.as_collection().arrange_by_self_inter().trace; (merge,batch) }); diff --git a/interactive/src/manager.rs b/interactive/src/manager.rs index 8579a538e..936fc6a15 100644 --- a/interactive/src/manager.rs +++ b/interactive/src/manager.rs @@ -13,7 +13,7 @@ use timely::logging::TimelyEventBuilder; use differential_dataflow::ExchangeData; use differential_dataflow::trace::implementations::{KeySpine, ValSpine}; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; use differential_dataflow::input::InputSession; use differential_dataflow::logging::DifferentialEventBuilder; @@ -21,9 +21,9 @@ use differential_dataflow::logging::DifferentialEventBuilder; use crate::{Time, Diff, Plan, Datum}; /// A trace handle for key-only data. -pub type TraceKeyHandle = TraceAgent>; +pub type TraceKeyHandle = TraceInter>; /// A trace handle for key-value data. -pub type TraceValHandle = TraceAgent>; +pub type TraceValHandle = TraceInter>; /// A key-only trace handle binding `Time` and `Diff` using `Vec` as data. pub type KeysOnlyHandle = TraceKeyHandle, Time, Diff>; /// A key-value trace handle binding `Time` and `Diff` using `Vec` as data. diff --git a/interactive/src/plan/join.rs b/interactive/src/plan/join.rs index 956fab419..e4f59d41c 100644 --- a/interactive/src/plan/join.rs +++ b/interactive/src/plan/join.rs @@ -56,7 +56,7 @@ impl Render for Join { .collect::>(), ) ) - .arrange_by_key(); + .arrange_by_key_inter(); arrangements.set_keyed(&self.plan1, &keys1[..], &arrangement.trace); arrangement.trace @@ -85,7 +85,7 @@ impl Render for Join { .collect::>(), ) ) - .arrange_by_key(); + .arrange_by_key_inter(); arrangements.set_keyed(&self.plan2, &keys2[..], &arrangement.trace); arrangement.trace diff --git a/interactive/src/plan/mod.rs b/interactive/src/plan/mod.rs index deff18a4a..f37113ad4 100644 --- a/interactive/src/plan/mod.rs +++ b/interactive/src/plan/mod.rs @@ -164,7 +164,7 @@ impl Render for Plan { trace.import(scope) } else { - let input_arrangement = distinct.render(scope, collections, arrangements).arrange_by_self(); + let input_arrangement = distinct.render(scope, collections, arrangements).arrange_by_self_inter(); arrangements.set_unkeyed(&distinct, &input_arrangement.trace); input_arrangement }; diff --git a/interactive/src/plan/sfw.rs b/interactive/src/plan/sfw.rs index 6a5c82fc8..20aaa87ba 100644 --- a/interactive/src/plan/sfw.rs +++ b/interactive/src/plan/sfw.rs @@ -131,7 +131,7 @@ impl Render for MultiwayJoin { if arrangements.get_unkeyed(&plan).is_none() { // println!("\tbuilding/caching source plan"); let collection = plan.render(scope, collections, arrangements); - arrangements.set_unkeyed(plan, &collection.arrange_by_self().trace); + arrangements.set_unkeyed(plan, &collection.arrange_by_self_inter().trace); } else { // println!("\tsource plan found"); @@ -203,7 +203,7 @@ impl Render for MultiwayJoin { let arrangement = plan.render(scope, collections, arrangements) .map(move |tuple| (keys_clone.iter().map(|&i| tuple[i].clone()).collect::>(), tuple)) - .arrange_by_key(); + .arrange_by_key_inter(); arrangements.set_keyed(&plan, &keys[..], &arrangement.trace); } diff --git a/server/dataflows/random_graph/src/lib.rs b/server/dataflows/random_graph/src/lib.rs index cb527b25d..b9ebfc5c6 100644 --- a/server/dataflows/random_graph/src/lib.rs +++ b/server/dataflows/random_graph/src/lib.rs @@ -180,7 +180,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(), }) .probe_with(probe) .as_collection() - .arrange_by_key() + .arrange_by_key_inter() .trace; // release all blocks on merging. diff --git a/server/src/lib.rs b/server/src/lib.rs index 745de0bb9..8a3f707f8 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -11,13 +11,13 @@ use timely::dataflow::scopes::Child; use timely::dataflow::operators::probe::Handle as ProbeHandle; // stuff for talking about shared trace types ... -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; use differential_dataflow::trace::implementations::ValSpine; // These are all defined here so that users can be assured a common layout. pub type RootTime = usize; type TraceSpine = ValSpine; -pub type TraceHandle = TraceAgent; +pub type TraceHandle = TraceInter; /// Arguments provided to each shared library to help build their dataflows and register their results. pub type Environment<'a, 'b> = ( diff --git a/tpchlike/src/lib.rs b/tpchlike/src/lib.rs index 053efd9d0..e8d82e332 100644 --- a/tpchlike/src/lib.rs +++ b/tpchlike/src/lib.rs @@ -115,10 +115,10 @@ impl Collections { use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; +use differential_dataflow::operators::arrange::{Arranged, TraceInter}; type ArrangedScope = Arranged>; -type ArrangedIndex = TraceAgent>; +type ArrangedIndex = TraceInter>; pub struct ArrangementsInScope> { customer: ArrangedScope, @@ -154,37 +154,37 @@ impl Arrangements { let empty_frontier = timely::progress::Antichain::new(); let empty_frontier = empty_frontier.borrow(); - let mut arranged = scope.input_from(&mut inputs.customer).as_collection().map(|x| (x.cust_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.customer).as_collection().map(|x| (x.cust_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let customer = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.nation).as_collection().map(|x| (x.nation_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.nation).as_collection().map(|x| (x.nation_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let nation = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.order).as_collection().map(|x| (x.order_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.order).as_collection().map(|x| (x.order_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let order = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.part).as_collection().map(|x| (x.part_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.part).as_collection().map(|x| (x.part_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let part = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.partsupp).as_collection().map(|x| ((x.part_key, x.supp_key), x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.partsupp).as_collection().map(|x| ((x.part_key, x.supp_key), x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let partsupp = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.region).as_collection().map(|x| (x.region_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.region).as_collection().map(|x| (x.region_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let region = arranged.trace; - let mut arranged = scope.input_from(&mut inputs.supplier).as_collection().map(|x| (x.supp_key, x)).arrange_by_key(); + let mut arranged = scope.input_from(&mut inputs.supplier).as_collection().map(|x| (x.supp_key, x)).arrange_by_key_inter(); arranged.stream.probe_with(probe); arranged.trace.set_physical_compaction(empty_frontier); let supplier = arranged.trace; @@ -203,25 +203,25 @@ impl Arrangements { pub fn in_scope>(&mut self, scope: &mut G, experiment: &mut Experiment) -> ArrangementsInScope { let (mut customer, button) = self.customer.import_core(scope, "customer"); - if !self.arrange { customer = customer.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { customer = customer.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut nation, button) = self.nation.import_core(scope, "nation"); - if !self.arrange { nation = nation.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { nation = nation.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut order, button) = self.order.import_core(scope, "order"); - if !self.arrange { order = order.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { order = order.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut part, button) = self.part.import_core(scope, "part"); - if !self.arrange { part = part.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { part = part.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut partsupp, button) = self.partsupp.import_core(scope, "partsupp"); - if !self.arrange { partsupp = partsupp.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { partsupp = partsupp.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut region, button) = self.region.import_core(scope, "region"); - if !self.arrange { region = region.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { region = region.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); let (mut supplier, button) = self.supplier.import_core(scope, "supplier"); - if !self.arrange { supplier = supplier.as_collection(|&k,v| (k,v.clone())).arrange_by_key(); } + if !self.arrange { supplier = supplier.as_collection(|&k,v| (k,v.clone())).arrange_by_key_inter(); } experiment.buttons.push(button); ArrangementsInScope {