diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs index 8404e7aa4..85b573157 100644 --- a/differential-dataflow/examples/columnar/main.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -168,8 +168,16 @@ mod reachability { let result = combined_arr.reduce_abelian::<_, ValBuilder, ValSpine, - >("Distinct", |_node, _input, output| { - output.push(((), 1)); + _, + >("Distinct", |_node, _input, output| { output.push(((), 1)); }, + |col, key, upds| { + use columnar::{Clear, Push}; + col.keys.clear(); + col.vals.clear(); + col.times.clear(); + col.diffs.clear(); + for (val, time, diff) in upds.drain(..) { col.push((key, &val, &time, &diff)); } + *col = std::mem::take(col).consolidate(); }); // Extract RecordedUpdates from the Arranged's batch stream. diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index c13227f2c..6981cc3da 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -752,7 +752,11 @@ pub mod vec { use crate::trace::implementations::{ValBuilder, ValSpine}; self.arrange_by_key_named(&format!("Arrange: {}", name)) - .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine>(name, logic) + .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine,_>( + name, + logic, + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ) .as_collection(|k,v| (k.clone(), v.clone())) } @@ -782,7 +786,7 @@ pub mod vec { /// ``` pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> where - T2: for<'a> Trace= &'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static, + T2: for<'a> Trace= &'a K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static, Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { @@ -801,12 +805,16 @@ pub mod vec { pub fn reduce_core(self, name: &str, logic: L) -> Arranged> where V: Clone+'static, - 
T2: for<'a> Trace=&'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp>+'static, + T2: for<'a> Trace=&'a K, ValOwn = V, Time=G::Timestamp>+'static, Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) - .reduce_core::<_,Bu,_>(name, logic) + .reduce_core::<_,Bu,_,_>( + name, + logic, + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ) } } @@ -871,7 +879,11 @@ pub mod vec { use crate::trace::implementations::{KeyBuilder, KeySpine}; self.arrange_by_self_named(&format!("Arrange: {}", name)) - .reduce_abelian::<_,KeyBuilder,KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) + .reduce_abelian::<_,KeyBuilder,KeySpine,_>( + name, + move |k,s,t| t.push(((), thresh(k, &s[0].1))), + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ) .as_collection(|k,_| k.clone()) } @@ -908,7 +920,11 @@ pub mod vec { pub fn count_core + 'static>(self) -> Collection { use crate::trace::implementations::{ValBuilder, ValSpine}; self.arrange_by_self_named("Arrange: Count") - .reduce_abelian::<_,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) + .reduce_abelian::<_,ValBuilder,ValSpine,_>( + "Count", + |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))), + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ) .as_collection(|k,c| (k.clone(), c.clone())) } } diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index e1c2fde5f..715542f55 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -205,7 +205,11 @@ impl TraceAgent { /// // create a second dataflow /// worker.dataflow(move |scope| { /// trace.import(scope) - /// 
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Reduce", |_key, src, dst| dst.push((*src[0].0, 1))) + /// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>,_>( + /// "Reduce", + /// |_key, src, dst| dst.push((*src[0].0, 1)), + /// |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + /// ) /// .as_collection(|k,v| (k.clone(), v.clone())); /// }); /// diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 69965e65b..cd1bd0894 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -30,7 +30,6 @@ use crate::{Data, VecCollection, AsCollection}; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; -use crate::trace::implementations::merge_batcher::container::InternalMerge; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; @@ -75,7 +74,6 @@ where use ::timely::dataflow::scopes::Child; use ::timely::progress::timestamp::Refines; use timely::Container; -use timely::container::PushInto; impl Arranged where @@ -169,12 +167,17 @@ where /// The underlying `Stream>>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than /// supplied as arguments to an operator using the same key-value structure. - pub fn as_vecs(self) -> VecCollection + /// + /// The method takes `K` and `V` as generic arguments, in order to constrain the reference types to support + /// cloning into owned types. If this bound does not work, the `as_collection` method allows arbitrary logic + /// on the reference types. 
+ pub fn as_vecs(self) -> VecCollection where - Tr::KeyOwn: crate::ExchangeData, - Tr::ValOwn: crate::ExchangeData, + K: crate::ExchangeData, + V: crate::ExchangeData, + Tr: for<'a> TraceReader = &'a K, Val<'a> = &'a V>, { - self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))]) + self.flat_map_ref(move |key, val| [(key.clone(), val.clone())]) } /// Extracts elements from an arrangement as a `VecCollection`. @@ -271,43 +274,43 @@ where T1: TraceReader + Clone + 'static, { /// A direct implementation of `ReduceCore::reduce_abelian`. - pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(self, name: &str, mut logic: L, push: P) -> Arranged> where - T1: TraceReader, + T1: TraceReader, T2: for<'a> Trace< Key<'a>= T1::Key<'a>, - KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time, Diff: Abelian, >+'static, - Bu: Builder>, + Bu: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, + P: FnMut(&mut Bu::Input, T1::Key<'_>, &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>) + 'static, { - self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| { + self.reduce_core::<_,Bu,T2,_>(name, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) })); crate::consolidation::consolidate(change); - }) + }, push) } /// A direct implementation of `ReduceCore::reduce_core`. 
- pub fn reduce_core(self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(self, name: &str, logic: L, push: P) -> Arranged> where - T1: TraceReader, + T1: TraceReader, T2: for<'a> Trace< Key<'a>=T1::Key<'a>, - KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time, >+'static, - Bu: Builder>, + Bu: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, + P: FnMut(&mut Bu::Input, T1::Key<'_>, &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>) + 'static, { use crate::operators::reduce::reduce_trace; - reduce_trace::<_,_,Bu,_,_>(self, name, logic) + reduce_trace::<_,_,Bu,_,_,_>(self, name, logic, push) } } diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index e9dbe9cdb..a8322eaa3 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -59,7 +59,7 @@ //! use differential_dataflow::operators::arrange::upsert; //! //! let stream = scope.input_from(&mut input); -//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder, ValSpine>(stream, &"test"); +//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder, ValSpine,String,String>(stream, &"test"); //! //! arranged //! .as_collection(|k,v| (k.clone(), v.clone())) @@ -127,19 +127,21 @@ use super::TraceAgent; /// This method is only implemented for totally ordered times, as we do not yet /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. 
-pub fn arrange_from_upsert( - stream: Stream, G::Timestamp)>>, +pub fn arrange_from_upsert( + stream: Stream, G::Timestamp)>>, name: &str, ) -> Arranged> where G: Scope, + K: ExchangeData+Hashable+std::hash::Hash, + V: ExchangeData, Tr: for<'a> Trace< - KeyOwn: ExchangeData+Hashable+std::hash::Hash, - ValOwn: ExchangeData, + Key<'a> = &'a K, + Val<'a> = &'a V, Time: TotalOrder+ExchangeData, Diff=isize, >+'static, - Bu: Builder, Output = Tr::Batch>, + Bu: Builder, Output = Tr::Batch>, { let mut reader: Option> = None; @@ -148,7 +150,7 @@ where let reader = &mut reader; - let exchange = Exchange::new(move |update: &(Tr::KeyOwn,Option,G::Timestamp)| (update.0).hashed().into()); + let exchange = Exchange::new(move |update: &(K,Option,G::Timestamp)| (update.0).hashed().into()); let scope = stream.scope(); stream.unary_frontier(exchange, name, move |_capability, info| { @@ -174,7 +176,7 @@ where let mut prev_frontier = Antichain::from_elem(::minimum()); // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap). - let mut priority_queue = BinaryHeap::)>>::new(); + let mut priority_queue = BinaryHeap::)>>::new(); let mut updates = Vec::new(); move |(input, frontier), output| { @@ -237,10 +239,10 @@ where let mut key_con = Tr::KeyContainer::with_capacity(1); for (key, mut list) in to_process { - key_con.clear(); key_con.push_own(&key); + key_con.clear(); key_con.push_ref(&key); // The prior value associated with the key. - let mut prev_value: Option = None; + let mut prev_value: Option = None; // Attempt to find the key in the trace. 
trace_cursor.seek_key(&trace_storage, key_con.index(0)); @@ -252,7 +254,7 @@ where assert!(count == 0 || count == 1); if count == 1 { assert!(prev_value.is_none()); - prev_value = Some(Tr::owned_val(val)); + prev_value = Some(val.clone()); } trace_cursor.step_val(&trace_storage); } diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index b5b2aac4c..909199295 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -5,7 +5,6 @@ //! to the key and the list of values. //! The function is expected to populate a list of output values. -use timely::container::PushInto; use crate::Data; use timely::progress::frontier::Antichain; @@ -18,21 +17,29 @@ use crate::operators::arrange::{Arranged, TraceAgent}; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; use crate::trace::implementations::containers::BatchContainer; -use crate::trace::implementations::merge_batcher::container::InternalMerge; use crate::trace::TraceReader; -// TODO: Remove the InternalMerge constraint on Bu::Input. It only needs Clear. - /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. -pub fn reduce_trace(trace: Arranged, name: &str, mut logic: L) -> Arranged> +/// +/// The `logic` closure is expected to take a key, accumulated input, and tentative accumulated output, +/// and populate its final argument with whatever it feels to be appropriate updates. The behavior and
+/// +/// The `push` closure is expected to clear its first argument, then populate it with the key and drain +/// the value updates, as appropriate for the container. It is critical that it clear the container as +/// the operator has no ability to do this otherwise, and failing to do so represents a leak from one +/// key's computation to another, and will likely introduce non-determinism. +pub fn reduce_trace(trace: Arranged, name: &str, mut logic: L, mut push: P) -> Arranged> where G: Scope, - T1: TraceReader + Clone + 'static, - T2: for<'a> Trace=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static, - Bu: Builder>, + T1: TraceReader + Clone + 'static, + T2: for<'a> Trace=T1::Key<'a>, ValOwn: Data, Time=T1::Time> + 'static, + Bu: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, + P: FnMut(&mut Bu::Input, T1::Key<'_>, &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>) + 'static, { let mut result_trace = None; @@ -53,7 +60,6 @@ where empty.set_exert_logic(exert_logic); } - let mut source_trace = trace.trace.clone(); let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); @@ -63,8 +69,11 @@ where let mut new_interesting_times = Vec::::new(); // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times, - // as well as capabilities for these times (or their lower envelope, at least). - let mut interesting = Vec::<(T1::KeyOwn, G::Timestamp)>::new(); + // sorted by (key, time), as well as capabilities for the lower envelope of the times. 
+ let mut pending_keys = T1::KeyContainer::with_capacity(0); + let mut pending_time = T1::TimeContainer::with_capacity(0); + let mut next_pending_keys = T1::KeyContainer::with_capacity(0); + let mut next_pending_time = T1::TimeContainer::with_capacity(0); let mut capabilities = timely::dataflow::operators::CapabilitySet::::default(); // buffers and logic for computing per-key interesting times "efficiently". @@ -115,25 +124,15 @@ where // We plan to retire the interval [lower_limit, upper_limit), which should be non-empty to proceed. if upper_limit != lower_limit { - // If we hold no capabilitys in the interval [lower_limit, upper_limit) then we have no compute needs, + // If we hold no capabilities in the interval [lower_limit, upper_limit) then we have no compute needs, // and could not transmit the outputs even if they were (incorrectly) non-zero. // We do have maintenance work after this logic, and should not fuse this test with the above test. if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) { - // `interesting` contains "todos" about key and time pairs that should be re-considered. - // We first extract those times from this list that lie in the interval we will process. - sort_dedup(&mut interesting); - // `exposed` contains interesting (key, time)s now below `upper_limit` - let mut exposed_keys = T1::KeyContainer::with_capacity(0); - let mut exposed_time = T1::TimeContainer::with_capacity(0); - // Keep pairs greater or equal to `upper_limit`, and "expose" other pairs. - interesting.retain(|(key, time)| { - if upper_limit.less_equal(time) { true } else { - exposed_keys.push_own(key); - exposed_time.push_own(time); - false - } - }); + // cursors for navigating input and output traces. 
+ let (mut source_cursor, ref source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); + let (mut output_cursor, ref output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor"); + let (mut batch_cursor, ref batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage); // Prepare an output buffer and builder for each capability. // TODO: It would be better if all updates went into one batch, but timely dataflow prevents @@ -144,22 +143,21 @@ where buffers.push((cap.time().clone(), Vec::new())); builders.push(Bu::new()); } - + // Temporary staging for output building. let mut buffer = Bu::Input::default(); - // cursors for navigating input and output traces. - let (mut source_cursor, ref source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); - let (mut output_cursor, ref output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor"); - let (mut batch_cursor, ref batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage); - + // Reusable state for performing the computation. let mut thinker = history_replay::HistoryReplayer::new(); + // Merge the received batch cursor with our list of interesting (key, time) moments. + // The interesting moments need to be in the interval to prompt work. + // March through the keys we must work on, merging `batch_cursors` and `exposed`. - let mut exposed_position = 0; - while batch_cursor.key_valid(batch_storage) || exposed_position < exposed_keys.len() { + let mut pending_pos = 0; + while batch_cursor.key_valid(batch_storage) || pending_pos < pending_keys.len() { // Determine the next key we will work on; could be synthetic, could be from a batch.
- let key1 = exposed_keys.get(exposed_position); + let key1 = pending_keys.get(pending_pos); let key2 = batch_cursor.get_key(batch_storage); let key = match (key1, key2) { (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2), @@ -169,49 +167,71 @@ where }; // Populate `interesting_times` with interesting times not beyond `upper_limit`. - // TODO: This could just be `exposed_time` and `lower .. upper` bounds. + // TODO: This could just be `pending_time` and indexes within `lower .. upper`. + let prior_pos = pending_pos; interesting_times.clear(); - while exposed_keys.get(exposed_position) == Some(key) { - interesting_times.push(T1::owned_time(exposed_time.index(exposed_position))); - exposed_position += 1; + while pending_keys.get(pending_pos) == Some(key) { + let owned_time = T1::owned_time(pending_time.index(pending_pos)); + if !upper_limit.less_equal(&owned_time) { interesting_times.push(owned_time); } + pending_pos += 1; } // tidy up times, removing redundancy. sort_dedup(&mut interesting_times); - // do the per-key computation. - thinker.compute( - key, - (&mut source_cursor, source_storage), - (&mut output_cursor, output_storage), - (&mut batch_cursor, batch_storage), - &interesting_times, - &mut logic, - &upper_limit, - &mut buffers[..], - &mut new_interesting_times, - ); - - // Advance the cursor if this key, so that the loop's validity check registers the work as done. - if batch_cursor.get_key(batch_storage) == Some(key) { batch_cursor.step_key(batch_storage); } - - // Record future warnings about interesting times (and assert they should be "future"). - debug_assert!(new_interesting_times.iter().all(|t| upper_limit.less_equal(t))); - interesting.extend(new_interesting_times.drain(..).map(|t| (T1::owned_key(key), t))); - - // Sort each buffer by value and move into the corresponding builder. 
- // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`, - // (ii) that the buffers are time-ordered, and (iii) that the builders accept - // arbitrarily ordered times. - for index in 0 .. buffers.len() { - buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); - for (val, time, diff) in buffers[index].1.drain(..) { - buffer.push_into(((T1::owned_key(key), val), time, diff)); + // If there are new updates, or pending times, we must investigate! + if batch_cursor.get_key(batch_storage) == Some(key) || !interesting_times.is_empty() { + + // do the per-key computation. + thinker.compute( + key, + (&mut source_cursor, source_storage), + (&mut output_cursor, output_storage), + (&mut batch_cursor, batch_storage), + &interesting_times, + &mut logic, + &upper_limit, + &mut buffers[..], + &mut new_interesting_times, + ); + + // Advance the cursor if this key, so that the loop's validity check registers the work as done. + if batch_cursor.get_key(batch_storage) == Some(key) { batch_cursor.step_key(batch_storage); } + + // Merge novel pending times with any prior pending times we did not process. + // TODO: This could be a merge, not a sort_dedup, because both lists should be sorted. + for pos in prior_pos .. pending_pos { + let owned_time = T1::owned_time(pending_time.index(pos)); + if upper_limit.less_equal(&owned_time) { new_interesting_times.push(owned_time); } + } + sort_dedup(&mut new_interesting_times); + for time in new_interesting_times.drain(..) { + next_pending_keys.push_ref(key); + next_pending_time.push_own(&time); + } + + // Sort each buffer by value and move into the corresponding builder. + // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`, + // (ii) that the buffers are time-ordered, and (iii) that the builders accept + // arbitrarily ordered times. + for index in 0 .. 
buffers.len() { + buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); + push(&mut buffer, key, &mut buffers[index].1); + buffers[index].1.clear(); builders[index].push(&mut buffer); - buffer.clear(); + + } + } + else { + // copy over the pending key and times. + for pos in prior_pos .. pending_pos { + next_pending_keys.push_ref(pending_keys.index(pos)); + next_pending_time.push_ref(pending_time.index(pos)); } } } + // Drop to avoid lifetime issues that would lock `pending_{keys, time}`. + drop(thinker); // We start sealing output batches from the lower limit (previous upper limit). // In principle, we could update `lower_limit` itself, and it should arrive at @@ -243,14 +263,21 @@ where output_lower.extend(output_upper.borrow().iter().cloned()); } } - // This should be true, as the final iteration introduces no capabilities, and // uses exactly `upper_limit` to determine the upper bound. Good to check though. assert!(output_upper.borrow() == upper_limit.borrow()); - // Update `capabilities` to reflect interesting times. + // Refresh pending keys and times, then downgrade capabilities to the frontier of times. + pending_keys.clear(); std::mem::swap(&mut next_pending_keys, &mut pending_keys); + pending_time.clear(); std::mem::swap(&mut next_pending_time, &mut pending_time); + + // Update `capabilities` to reflect pending times. let mut frontier = Antichain::::new(); - for (_, time) in &interesting { frontier.insert_ref(time); } + let mut owned_time = T1::Time::minimum(); + for pos in 0 .. 
pending_time.len() { + T1::clone_time_onto(pending_time.index(pos), &mut owned_time); + frontier.insert_ref(&owned_time); + } capabilities.downgrade(frontier); } diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index d73eb71f4..082af6812 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -113,8 +113,6 @@ pub trait WithLayout { /// Automatically implemented trait for types with layouts. pub trait LayoutExt : WithLayout> { - /// Alias for an owned key of a layout. - type KeyOwn; /// Alias for an borrowed key of a layout. type Key<'a>: Copy + Ord; /// Alias for an owned val of a layout. @@ -131,7 +129,7 @@ pub trait LayoutExt : WithLayout: Copy + Ord; /// Container for update keys. - type KeyContainer: for<'a> BatchContainer = Self::Key<'a>, Owned = Self::KeyOwn>; + type KeyContainer: for<'a> BatchContainer = Self::Key<'a>>; /// Container for update vals. type ValContainer: for<'a> BatchContainer = Self::Val<'a>, Owned = Self::ValOwn>; /// Container for times. @@ -139,8 +137,6 @@ pub trait LayoutExt : WithLayout BatchContainer = Self::DiffGat<'a>, Owned = Self::Diff>; - /// Construct an owned key from a reference. - fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn; /// Construct an owned val from a reference. fn owned_val(val: Self::Val<'_>) -> Self::ValOwn; /// Construct an owned time from a reference. 
@@ -153,7 +149,6 @@ pub trait LayoutExt : WithLayout LayoutExt for L { - type KeyOwn = <::KeyContainer as BatchContainer>::Owned; type Key<'a> = <::KeyContainer as BatchContainer>::ReadItem<'a>; type ValOwn = <::ValContainer as BatchContainer>::Owned; type Val<'a> = <::ValContainer as BatchContainer>::ReadItem<'a>; @@ -167,7 +162,6 @@ impl LayoutExt for L { type TimeContainer = ::TimeContainer; type DiffContainer = ::DiffContainer; - #[inline(always)] fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn { ::KeyContainer::into_owned(key) } #[inline(always)] fn owned_val(val: Self::Val<'_>) -> Self::ValOwn { ::ValContainer::into_owned(val) } #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { ::TimeContainer::into_owned(time) } #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { ::DiffContainer::into_owned(diff) } diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 0197e09c0..582a6da62 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -54,7 +54,6 @@ pub trait TraceReader : LayoutExt { WithLayout + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, - KeyOwn = Self::KeyOwn, Val<'a> = Self::Val<'a>, ValOwn = Self::ValOwn, Time = Self::Time, @@ -77,7 +76,6 @@ pub trait TraceReader : LayoutExt { WithLayout + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, - KeyOwn = Self::KeyOwn, Val<'a> = Self::Val<'a>, ValOwn = Self::ValOwn, Time = Self::Time, @@ -253,7 +251,6 @@ pub trait BatchReader : LayoutExt + Sized { WithLayout + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, - KeyOwn = Self::KeyOwn, Val<'a> = Self::Val<'a>, ValOwn = Self::ValOwn, Time = Self::Time, diff --git a/differential-dataflow/tests/import.rs b/differential-dataflow/tests/import.rs index 747e45b72..1bb2c9cc1 100644 --- a/differential-dataflow/tests/import.rs +++ b/differential-dataflow/tests/import.rs @@ -60,7 +60,11 @@ fn test_import_vanilla() { ::std::mem::drop(trace); let captured = 
imported - .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine>("Reduce", |_k, s, t| t.push((s.iter().map(|&(_, w)| w).sum(), 1i64))) + .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine,_>( + "Reduce", + |_k, s, t| t.push((s.iter().map(|&(_, w)| w).sum(), 1i64)), + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ) .as_collection(|k,v| (k.clone(), v.clone())) .inner .exchange(|_| 0) @@ -135,7 +139,11 @@ fn test_import_completed_dataflow() { ::std::mem::drop(trace); let stream = imported - .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine>("Reduce", |_k, s, t| t.push((s.iter().map(|&(_, w)| w).sum(), 1i64))) + .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine,_>( + "Reduce", + |_k, s, t| t.push((s.iter().map(|&(_, w)| w).sum(), 1i64)), + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); + }) .as_collection(|k,v| (k.clone(), v.clone())) .inner .exchange(|_| 0); diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index 0cd363a76..42367a2e1 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -19,7 +19,8 @@ pub fn count( ) -> VecCollection where G: Scope, - Tr: TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, + Tr::KeyContainer: differential_dataflow::trace::implementations::BatchContainer, for<'a> Tr::Diff : Semigroup>, K: Hashable + Ord + Default + 'static, R: Monoid+Multiply+ExchangeData, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index ddb2a5979..2a18a9aa1 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -85,7 +85,8 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, + Tr::KeyContainer: BatchContainer, R: Mul, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, 
&G::Timestamp) -> bool + 'static, @@ -151,7 +152,8 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: for<'a> TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, + Tr::KeyContainer: BatchContainer, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, Y: Fn(std::time::Instant, usize) -> bool + 'static, @@ -312,7 +314,8 @@ fn process_proposals( ) -> bool where G: Scope, - Tr: for<'a> TraceReader, + Tr: TraceReader, + Tr::KeyContainer: BatchContainer, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, Y: Fn(Instant, usize) -> bool + 'static, S: FnMut(&mut SessionFor, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static, diff --git a/dogsdogsdogs/src/operators/half_join2.rs b/dogsdogsdogs/src/operators/half_join2.rs index 650dc433d..08029f96e 100644 --- a/dogsdogsdogs/src/operators/half_join2.rs +++ b/dogsdogsdogs/src/operators/half_join2.rs @@ -73,7 +73,8 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, + Tr::KeyContainer: BatchContainer, R: Mul, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static, @@ -130,7 +131,8 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: for<'a> TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, + Tr::KeyContainer: BatchContainer, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, Y: Fn(std::time::Instant, usize) -> bool + 'static, diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 7908ce858..65108ed59 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -29,10 +29,10 @@ pub fn lookup_map( where G: Scope, Tr: for<'a> TraceReader< - KeyOwn = K, Time: std::hash::Hash, Diff 
: Semigroup>+Monoid+ExchangeData, >+Clone+'static, + Tr::KeyContainer: BatchContainer, K: Hashable + Ord + 'static, F: FnMut(&D, &mut K)+Clone+'static, D: ExchangeData, diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index 9f2e69f78..6cdcb130d 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -21,11 +21,11 @@ pub fn propose( where G: Scope, Tr: for<'a> TraceReader< - KeyOwn = K, ValOwn = V, Time: std::hash::Hash, Diff: Monoid+Multiply+ExchangeData+Semigroup>, >+Clone+'static, + Tr::KeyContainer: differential_dataflow::trace::implementations::BatchContainer, K: Hashable + Default + Ord + 'static, F: Fn(&P)->K+Clone+'static, P: ExchangeData, @@ -55,11 +55,11 @@ pub fn propose_distinct( where G: Scope, Tr: for<'a> TraceReader< - KeyOwn = K, ValOwn = V, Time: std::hash::Hash, Diff : Semigroup>+Monoid+Multiply+ExchangeData, >+Clone+'static, + Tr::KeyContainer: differential_dataflow::trace::implementations::BatchContainer, K: Hashable + Default + Ord + 'static, F: Fn(&P)->K+Clone+'static, P: ExchangeData, diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index f610d55be..0ab7297bb 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -20,10 +20,10 @@ pub fn validate( where G: Scope, Tr: for<'a> TraceReader< - KeyOwn = (K, V), Time: std::hash::Hash, Diff : Semigroup>+Monoid+Multiply+ExchangeData, >+Clone+'static, + Tr::KeyContainer: differential_dataflow::trace::implementations::BatchContainer, K: Ord+Hash+Clone+Default + 'static, V: ExchangeData+Hash+Default, F: Fn(&P)->K+Clone+'static, diff --git a/interactive/src/plan/mod.rs b/interactive/src/plan/mod.rs index deff18a4a..31d99041d 100644 --- a/interactive/src/plan/mod.rs +++ b/interactive/src/plan/mod.rs @@ -169,7 +169,11 @@ impl Render for Plan { input_arrangement }; - let output = 
input.reduce_abelian::<_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1))); + let output = input.reduce_abelian::<_,KeyBuilder<_,_,_>,KeySpine<_,_,_>,_>( + "Distinct", + move |_,_,t| t.push(((), 1)), + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ); arrangements.set_unkeyed(&self, &output.trace); output.as_collection(|k,&()| k.clone())