diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index cebc43fd8..ecae4aaac 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -13,8 +13,8 @@ static GLOBAL: MiMalloc = MiMalloc; fn main() { - type WordCount = (Vec, u64, i64); - type Builder = KeyColBuilder; + type WordCount = (Vec, (), u64, i64); + type Builder = ValColBuilder; let keys: usize = std::env::args().nth(1).expect("missing argument 1").parse().unwrap(); let size: usize = std::env::args().nth(2).expect("missing argument 2").parse().unwrap(); @@ -22,36 +22,31 @@ fn main() { let timer1 = ::std::time::Instant::now(); let timer2 = timer1.clone(); - // initializes and runs a timely dataflow. timely::execute_from_args(std::env::args(), move |worker| { let mut data_input = >::new_with_builder(); let mut keys_input = >::new_with_builder(); let mut probe = ProbeHandle::new(); - // create a new input, exchange data, and inspect its output worker.dataflow::(|scope| { - let data = data_input.to_stream(scope); let keys = keys_input.to_stream(scope); use differential_dataflow::Hashable; - let data_pact = KeyPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; - let keys_pact = KeyPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; + let data_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; + let keys_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; - let data = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(data, data_pact, "Data"); - let keys = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(keys, keys_pact, "Keys"); + let data = arrange_core::<_,_,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(data, data_pact, "Data"); + let keys = arrange_core::<_,_,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(keys, keys_pact, "Keys"); keys.join_core(data, |_k, (), ()| { Option::<()>::None }) .probe_with(&mut probe); }); - // Resources for placing input data in containers. use std::fmt::Write; let mut buffer = String::default(); - let mut builder = KeyColBuilder::::default(); + let mut builder = Builder::default(); - // Load up data in batches. let mut counter = 0; while counter < 10 * keys { let mut i = worker.index(); @@ -59,7 +54,7 @@ fn main() { while i < size { let val = (counter + i) % keys; write!(buffer, "{:?}", val).unwrap(); - builder.push_into((buffer.as_bytes(), time, 1)); + builder.push_into((buffer.as_bytes(), (), time, 1)); buffer.clear(); i += worker.peers(); } @@ -69,9 +64,7 @@ fn main() { counter += size; data_input.advance_to(data_input.time() + 1); keys_input.advance_to(keys_input.time() + 1); - while probe.less_than(data_input.time()) { - worker.step_or_park(None); - } + while probe.less_than(data_input.time()) { worker.step_or_park(None); } } println!("{:?}\tloading complete", timer1.elapsed()); @@ -82,7 +75,7 @@ fn main() { while i < size { let val = (queries + i) % keys; write!(buffer, "{:?}", val).unwrap(); - builder.push_into((buffer.as_bytes(), time, 1)); + builder.push_into((buffer.as_bytes(), (), time, 1)); buffer.clear(); i += worker.peers(); } @@ -92,15 +85,11 @@ fn main() { queries += size; data_input.advance_to(data_input.time() + 1); keys_input.advance_to(keys_input.time() + 1); - while probe.less_than(data_input.time()) { - worker.step_or_park(None); - } + while probe.less_than(data_input.time()) { worker.step_or_park(None); } } println!("{:?}\tqueries complete", timer1.elapsed()); - }) - .unwrap(); - + }).unwrap(); println!("{:?}\tshut down", timer2.elapsed()); } @@ -132,19 +121,6 @@ pub mod layout { type Diff = R; } - impl ColumnarUpdate for (K, T, R) - where - K: Columnar + Debug + Ord + Clone + 'static, - T: Columnar + Debug + Ord + Default + Clone + Lattice + Timestamp, - R: Columnar + Debug + Ord + Default + Semigroup + 'static, - { - type Key = K; - type Val = (); - type Time = T; - type Diff = R; - } - - use crate::arrangement::Coltainer; impl Layout for ColumnarLayout { type KeyContainer = Coltainer; @@ -291,455 +267,215 @@ mod container { } -pub use storage::val::ValStorage; -pub use storage::key::KeyStorage; -pub mod storage { - - pub mod val { - - use std::fmt::Debug; - use columnar::{Borrow, Container, ContainerOf, Index, Len, Push}; - use columnar::Vecs; - - use crate::layout::ColumnarUpdate as Update; - - /// Trie-shaped update storage. - #[derive(Debug)] - pub struct ValStorage { - /// An ordered list of keys. - pub keys: ContainerOf, - /// For each key in `keys`, a list of values. - pub vals: Vecs>, - /// For each val in `vals`, a list of (time, diff) updates. - pub upds: Vecs<(ContainerOf, ContainerOf)>, - } - - impl Default for ValStorage { fn default() -> Self { Self { keys: Default::default(), vals: Default::default(), upds: Default::default(), } } } - impl Clone for ValStorage { fn clone(&self) -> Self { Self { keys: self.keys.clone(), vals: self.vals.clone(), upds: self.upds.clone(), } } } +pub use updates::Updates; - pub type Tuple = (::Key, ::Val, ::Time, ::Diff); +/// A thin wrapper around `Updates` that tracks the pre-consolidation record count +/// for timely's exchange accounting. This wrapper is the stream container type; +/// the `TrieChunker` strips it, passing bare `Updates` into the merge batcher. +pub struct RecordedUpdates { + pub updates: Updates, + pub records: usize, +} - use std::ops::Range; - impl ValStorage { - - /// Forms `Self` from sorted update tuples. - pub fn form<'a>(mut sorted: impl Iterator>>) -> Self { - - let mut output = Self::default(); - - if let Some((key,val,time,diff)) = sorted.next() { - output.keys.push(key); - output.vals.values.push(val); - output.upds.values.push((time, diff)); - for (key,val,time,diff) in sorted { - let mut differs = false; - // We would now iterate over layers. - // We'll do that manually, as the types are all different. - // Keys first; non-standard logic because they are not (yet) a list of lists. - let keys_len = output.keys.len(); - differs |= ContainerOf::::reborrow_ref(key) != output.keys.borrow().get(keys_len-1); - if differs { output.keys.push(key); } - // Vals next - let vals_len = output.vals.values.len(); - if differs { output.vals.bounds.push(vals_len as u64); } - differs |= ContainerOf::::reborrow_ref(val) != output.vals.values.borrow().get(vals_len-1); - if differs { output.vals.values.push(val); } - // Upds last - let upds_len = output.upds.values.len(); - if differs { output.upds.bounds.push(upds_len as u64); } - // differs |= ContainerOf::<(U::Time,U::Diff)>::reborrow_ref((time,diff)) != output.upds.values.borrow().get(upds_len-1); - differs = true; - if differs { output.upds.values.push((time,diff)); } - } - // output.keys.bounds.push(vals_len as u64); - output.vals.bounds.push(output.vals.values.len() as u64); - output.upds.bounds.push(output.upds.values.len() as u64); - } +impl Default for RecordedUpdates { + fn default() -> Self { Self { updates: Default::default(), records: 0 } } +} - assert_eq!(output.keys.len(), output.vals.len()); - assert_eq!(output.vals.values.len(), output.upds.len()); +impl Clone for RecordedUpdates { + fn clone(&self) -> Self { Self { updates: self.updates.clone(), records: self.records } } +} - output - } +impl timely::Accountable for RecordedUpdates { + #[inline] fn record_count(&self) -> i64 { self.records as i64 } +} - pub fn vals_bounds(&self, range: Range) -> Range { - if !range.is_empty() { - let lower = if range.start == 0 { 0 } else { Index::get(self.vals.bounds.borrow(), range.start-1) as usize }; - let upper = Index::get(self.vals.bounds.borrow(), range.end-1) as usize; - lower .. upper - } else { range } - } +impl timely::dataflow::channels::ContainerBytes for RecordedUpdates { + fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } + fn length_in_bytes(&self) -> usize { unimplemented!() } + fn into_bytes(&self, _writer: &mut W) { unimplemented!() } +} - pub fn upds_bounds(&self, range: Range) -> Range { - if !range.is_empty() { - let lower = if range.start == 0 { 0 } else { Index::get(self.upds.bounds.borrow(), range.start-1) as usize }; - let upper = Index::get(self.upds.bounds.borrow(), range.end-1) as usize; - lower .. upper - } else { range } - } +pub use column_builder::ValBuilder as ValColBuilder; +mod column_builder { - /// Copies `other[range]` into self, keys and all. - pub fn extend_from_keys(&mut self, other: &Self, range: Range) { - self.keys.extend_from_self(other.keys.borrow(), range.clone()); - self.vals.extend_from_self(other.vals.borrow(), range.clone()); - self.upds.extend_from_self(other.upds.borrow(), other.vals_bounds(range)); - } + use std::collections::VecDeque; + use columnar::{Columnar, Clear, Len, Push}; - pub fn extend_from_vals(&mut self, other: &Self, range: Range) { - self.vals.values.extend_from_self(other.vals.values.borrow(), range.clone()); - self.upds.extend_from_self(other.upds.borrow(), range); - } - } + use crate::layout::ColumnarUpdate as Update; + use crate::{Updates, RecordedUpdates}; - impl timely::Accountable for ValStorage { - #[inline] fn record_count(&self) -> i64 { use columnar::Len; self.upds.values.len() as i64 } - } + type TupleContainer = <(::Key, ::Val, ::Time, ::Diff) as Columnar>::Container; - use timely::dataflow::channels::ContainerBytes; - impl ContainerBytes for ValStorage { - fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } - fn length_in_bytes(&self) -> usize { unimplemented!() } - fn into_bytes(&self, _writer: &mut W) { unimplemented!() } - } + /// A container builder that produces `RecordedUpdates` (sorted, consolidated trie + record count). + pub struct ValBuilder { + /// Container that we're writing to. + current: TupleContainer, + /// Empty allocation. + empty: Option>, + /// Completed containers pending to be sent. + pending: VecDeque>, } - pub mod key { - - use columnar::{Borrow, Container, ContainerOf, Index, Len, Push}; - use columnar::Vecs; - - use crate::layout::ColumnarUpdate as Update; - use crate::Column; - - /// Trie-shaped update storage. - pub struct KeyStorage { - /// An ordered list of keys. - pub keys: Column>, - /// For each key in `keys`, a list of (time, diff) updates. - pub upds: Column, ContainerOf)>>, - } - - impl Default for KeyStorage { fn default() -> Self { Self { keys: Default::default(), upds: Default::default(), } } } - impl Clone for KeyStorage { fn clone(&self) -> Self { Self { keys: self.keys.clone(), upds: self.upds.clone(), } } } - - pub type Tuple = (::Key, ::Time, ::Diff); - - use std::ops::Range; - impl KeyStorage { - - /// Forms `Self` from sorted update tuples. - pub fn form<'a>(mut sorted: impl Iterator>>) -> Self { - - let mut keys: ContainerOf = Default::default(); - let mut upds: Vecs<(ContainerOf, ContainerOf)> = Default::default(); - - if let Some((key,time,diff)) = sorted.next() { - keys.push(key); - upds.values.push((time, diff)); - for (key,time,diff) in sorted { - let mut differs = false; - // We would now iterate over layers. - // We'll do that manually, as the types are all different. - // Keys first; non-standard logic because they are not (yet) a list of lists. - let keys_len = keys.borrow().len(); - differs |= ContainerOf::::reborrow_ref(key) != keys.borrow().get(keys_len-1); - if differs { keys.push(key); } - // Upds last - let upds_len = upds.borrow().values.len(); - if differs { upds.bounds.push(upds_len as u64); } - // differs |= ContainerOf::<(U::Time,U::Diff)>::reborrow_ref((time,diff)) != output.upds.values.borrow().get(upds_len-1); - differs = true; - if differs { upds.values.push((time,diff)); } - } - upds.bounds.push(upds.borrow().values.len() as u64); - } - - assert_eq!(keys.borrow().len(), upds.borrow().len()); - - Self { - keys: Column::Typed(keys), - upds: Column::Typed(upds), - } - } - - pub fn upds_bounds(&self, range: Range) -> Range { - if !range.is_empty() { - let lower = if range.start == 0 { 0 } else { Index::get(self.upds.borrow().bounds, range.start-1) as usize }; - let upper = Index::get(self.upds.borrow().bounds, range.end-1) as usize; - lower .. upper - } else { range } - } - - /// Copies `other[range]` into self, keys and all. - pub fn extend_from_keys(&mut self, other: &Self, range: Range) { - self.keys.as_mut().extend_from_self(other.keys.borrow(), range.clone()); - self.upds.as_mut().extend_from_self(other.upds.borrow(), range.clone()); - } - } - - impl timely::Accountable for KeyStorage { - #[inline] fn record_count(&self) -> i64 { use columnar::Len; self.upds.borrow().values.len() as i64 } - } - - use timely::dataflow::channels::ContainerBytes; - impl ContainerBytes for KeyStorage { - fn from_bytes(mut bytes: timely::bytes::arc::Bytes) -> Self { - let keys: Column> = ContainerBytes::from_bytes(bytes.clone()); - let _ = bytes.extract_to(keys.length_in_bytes()); - let upds = ContainerBytes::from_bytes(bytes); - Self { keys, upds } - } - fn length_in_bytes(&self) -> usize { self.keys.length_in_bytes() + self.upds.length_in_bytes() } - fn into_bytes(&self, writer: &mut W) { - self.keys.into_bytes(writer); - self.upds.into_bytes(writer); + use timely::container::PushInto; + impl PushInto for ValBuilder where TupleContainer : Push { + #[inline] + fn push_into(&mut self, item: T) { + self.current.push(item); + if self.current.len() > 1024 * 1024 { + use columnar::{Borrow, Index}; + let records = self.current.len(); + let mut refs = self.current.borrow().into_index_iter().collect::>(); + refs.sort(); + let updates = Updates::form(refs.into_iter()); + self.pending.push_back(RecordedUpdates { updates, records }); + self.current.clear(); } } } -} - -pub use column_builder::{val::ValBuilder as ValColBuilder, key::KeyBuilder as KeyColBuilder}; -mod column_builder { - - pub mod val { - - use std::collections::VecDeque; - use columnar::{Columnar, Clear, Len, Push}; - - use crate::layout::ColumnarUpdate as Update; - use crate::ValStorage; - - type TupleContainer = <(::Key, ::Val, ::Time, ::Diff) as Columnar>::Container; - - /// A container builder for `Column`. - pub struct ValBuilder { - /// Container that we're writing to. - current: TupleContainer, - /// Empty allocation. - empty: Option>, - /// Completed containers pending to be sent. - pending: VecDeque>, - } - use timely::container::PushInto; - impl PushInto for ValBuilder where TupleContainer : Push { - #[inline] - fn push_into(&mut self, item: T) { - self.current.push(item); - if self.current.len() > 1024 * 1024 { - // TODO: Consolidate the batch? - use columnar::{Borrow, Index}; - let mut refs = self.current.borrow().into_index_iter().collect::>(); - refs.sort(); - let storage = ValStorage::form(refs.into_iter()); - self.pending.push_back(storage); - self.current.clear(); - } - } - } - - impl Default for ValBuilder { - fn default() -> Self { - ValBuilder { - current: Default::default(), - empty: None, - pending: Default::default(), - } + impl Default for ValBuilder { + fn default() -> Self { + ValBuilder { + current: Default::default(), + empty: None, + pending: Default::default(), } } + } - use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; - impl ContainerBuilder for ValBuilder { - type Container = ValStorage; - - #[inline] - fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(container) = self.pending.pop_front() { - self.empty = Some(container); - self.empty.as_mut() - } else { - None - } - } + use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; + impl ContainerBuilder for ValBuilder { + type Container = RecordedUpdates; - #[inline] - fn finish(&mut self) -> Option<&mut Self::Container> { - if !self.current.is_empty() { - // TODO: Consolidate the batch? - use columnar::{Borrow, Index}; - let mut refs = self.current.borrow().into_index_iter().collect::>(); - refs.sort(); - let storage = ValStorage::form(refs.into_iter()); - self.pending.push_back(storage); - self.current.clear(); - } - self.empty = self.pending.pop_front(); + #[inline] + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(container) = self.pending.pop_front() { + self.empty = Some(container); self.empty.as_mut() + } else { + None } } - impl LengthPreservingContainerBuilder for ValBuilder { } - } - - pub mod key { - - use std::collections::VecDeque; - use columnar::{Columnar, Clear, Len, Push}; - - use crate::layout::ColumnarUpdate as Update; - use crate::KeyStorage; - - type TupleContainer = <(::Key, ::Time, ::Diff) as Columnar>::Container; - - /// A container builder for `Column`. - pub struct KeyBuilder { - /// Container that we're writing to. - current: TupleContainer, - /// Empty allocation. - empty: Option>, - /// Completed containers pending to be sent. - pending: VecDeque>, - } - - use timely::container::PushInto; - impl PushInto for KeyBuilder where TupleContainer : Push { - #[inline] - fn push_into(&mut self, item: T) { - self.current.push(item); - if self.current.len() > 1024 * 1024 { - // TODO: Consolidate the batch? - use columnar::{Borrow, Index}; - let mut refs = self.current.borrow().into_index_iter().collect::>(); - refs.sort(); - let storage = KeyStorage::form(refs.into_iter()); - self.pending.push_back(storage); - self.current.clear(); - } + #[inline] + fn finish(&mut self) -> Option<&mut Self::Container> { + if !self.current.is_empty() { + use columnar::{Borrow, Index}; + let records = self.current.len(); + let mut refs = self.current.borrow().into_index_iter().collect::>(); + refs.sort(); + let updates = Updates::form(refs.into_iter()); + self.pending.push_back(RecordedUpdates { updates, records }); + self.current.clear(); } + self.empty = self.pending.pop_front(); + self.empty.as_mut() } + } - impl Default for KeyBuilder { fn default() -> Self { KeyBuilder { current: Default::default(), empty: None, pending: Default::default(), } } } - - use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; - impl ContainerBuilder for KeyBuilder { - type Container = KeyStorage; - - #[inline] - fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(container) = self.pending.pop_front() { - self.empty = Some(container); - self.empty.as_mut() - } else { - None - } - } - - #[inline] - fn finish(&mut self) -> Option<&mut Self::Container> { - if !self.current.is_empty() { - // TODO: Consolidate the batch? - use columnar::{Borrow, Index}; - let mut refs = self.current.borrow().into_index_iter().collect::>(); - refs.sort(); - let storage = KeyStorage::form(refs.into_iter()); - self.pending.push_back(storage); - self.current.clear(); - } - self.empty = self.pending.pop_front(); - self.empty.as_mut() - } - } + impl LengthPreservingContainerBuilder for ValBuilder { } - impl LengthPreservingContainerBuilder for KeyBuilder { } - } } -use distributor::key::KeyPact; +use distributor::ValPact; mod distributor { - pub mod key { - - use std::rc::Rc; - - use columnar::{Index, Len}; - use timely::logging::TimelyLogger; - use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor}; - use timely::dataflow::channels::Message; - use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract}; - use timely::progress::Timestamp; - use timely::worker::AsWorker; - - use crate::layout::ColumnarUpdate as Update; - use crate::KeyStorage; - - pub struct KeyDistributor { - marker: std::marker::PhantomData, - hashfunc: H, - } + use std::rc::Rc; - impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor> for KeyDistributor { - fn partition>>>(&mut self, container: &mut KeyStorage, time: &T, pushers: &mut [P]) { + use columnar::{Borrow, Index, Len}; + use timely::logging::TimelyLogger; + use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor}; + use timely::dataflow::channels::Message; + use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract}; + use timely::progress::Timestamp; + use timely::worker::AsWorker; - use columnar::{ContainerOf, Vecs, Container, Push}; - use crate::Column; + use crate::layout::ColumnarUpdate as Update; + use crate::{Updates, RecordedUpdates}; - let in_keys = container.keys.borrow(); - let in_upds = container.upds.borrow(); + pub struct ValDistributor { + marker: std::marker::PhantomData, + hashfunc: H, + pre_lens: Vec, + } - // We build bespoke containers by determining the target for each key using `self.hashfunc`, and then copying in key and associated data. - // We bypass the container builders, which do much work to go from tuples to columnar containers, and we save time by avoiding that round trip. - let mut out_keys = vec![ContainerOf::::default(); pushers.len()]; - let mut out_upds = vec![Vecs::<(ContainerOf::, ContainerOf::)>::default(); pushers.len()]; - for index in 0 .. in_keys.len() { - let key = in_keys.get(index); + impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor> for ValDistributor { + // TODO: For unsorted Updates (stride-1 outer keys), each key is its own outer group, + // so the per-group pre_lens snapshot and seal check costs O(keys × workers). Should + // either batch keys by destination first, or detect stride-1 outer bounds and use a + // simpler single-pass partitioning that seals once at the end. + fn partition>>>(&mut self, container: &mut RecordedUpdates, time: &T, pushers: &mut [P]) { + use crate::updates::child_range; + + let keys_b = container.updates.keys.borrow(); + let mut outputs: Vec> = (0..pushers.len()).map(|_| Updates::default()).collect(); + + // Each outer key group becomes a separate run in the destination. + for outer in 0..Len::len(&keys_b) { + self.pre_lens.clear(); + self.pre_lens.extend(outputs.iter().map(|o| o.keys.values.len())); + for k in child_range(keys_b.bounds, outer) { + let key = keys_b.values.get(k); let idx = ((self.hashfunc)(key) as usize) % pushers.len(); - out_keys[idx].push(key); - out_upds[idx].extend_from_self(in_upds, index..index+1); + outputs[idx].extend_from_keys(&container.updates, k..k+1); } + for (output, &pre) in outputs.iter_mut().zip(self.pre_lens.iter()) { + if output.keys.values.len() > pre { + output.keys.bounds.push(output.keys.values.len() as u64); + } + } + } - for ((pusher, keys), upds) in pushers.iter_mut().zip(out_keys).zip(out_upds) { - let mut container = KeyStorage { keys: Column::Typed(keys), upds: Column::Typed(upds) }; - Message::push_at(&mut container, time.clone(), pusher); + // Distribute the input's record count across non-empty outputs. + let total_records = container.records; + let non_empty: usize = outputs.iter().filter(|o| !o.keys.values.is_empty()).count(); + let mut first_records = total_records.saturating_sub(non_empty.saturating_sub(1)); + for (pusher, output) in pushers.iter_mut().zip(outputs) { + if !output.keys.values.is_empty() { + let recorded = RecordedUpdates { updates: output, records: first_records }; + first_records = 1; + let mut recorded = recorded; + Message::push_at(&mut recorded, time.clone(), pusher); } } - fn flush>>>(&mut self, _time: &T, _pushers: &mut [P]) { } - fn relax(&mut self) { } } + fn flush>>>(&mut self, _time: &T, _pushers: &mut [P]) { } + fn relax(&mut self) { } + } - pub struct KeyPact { pub hashfunc: H } - - // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. - impl ParallelizationContract> for KeyPact - where - T: Timestamp, - U: Update, - H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64 + 'static, - { - type Pusher = Exchange< - T, - LogPusher>>>>, - KeyDistributor - >; - type Puller = LogPuller>>>>; - - fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>>(identifier, address); - let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - let distributor = KeyDistributor { - marker: std::marker::PhantomData, - hashfunc: self.hashfunc, - }; - (Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) - } + pub struct ValPact { pub hashfunc: H } + + impl ParallelizationContract> for ValPact + where + T: Timestamp, + U: Update, + H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64 + 'static, + { + type Pusher = Exchange< + T, + LogPusher>>>>, + ValDistributor + >; + type Puller = LogPuller>>>>; + + fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { + let (senders, receiver) = allocator.allocate::>>(identifier, address); + let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); + let distributor = ValDistributor { + marker: std::marker::PhantomData, + hashfunc: self.hashfunc, + pre_lens: Vec::new(), + }; + (Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) } } } -pub use arrangement::{ValBatcher, ValBuilder, ValSpine, KeyBatcher, KeyBuilder, KeySpine}; +pub use arrangement::{ValBatcher, ValBuilder, ValSpine}; pub mod arrangement { use std::rc::Rc; - use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdKeyBatch}; + use differential_dataflow::trace::implementations::ord_neu::OrdValBatch; use differential_dataflow::trace::rc_blanket_impls::RcBuilder; use differential_dataflow::trace::implementations::spine_fueled::Spine; @@ -752,13 +488,6 @@ pub mod arrangement { /// A builder for columnar storage. pub type ValBuilder = RcBuilder>; - /// A trace implementation backed by columnar storage. - pub type KeySpine = Spine>>>; - /// A batcher for columnar storage - pub type KeyBatcher = KeyBatcher2<(K,T,R)>; - /// A builder for columnar storage - pub type KeyBuilder = RcBuilder>; - /// A batch container implementation for Column. pub use batch_container::Coltainer; pub mod batch_container { @@ -808,31 +537,36 @@ pub mod arrangement { } } - use crate::{ValStorage, KeyStorage}; + use crate::{Updates, RecordedUpdates}; use differential_dataflow::trace::implementations::merge_batcher::{MergeBatcher, InternalMerger}; - type ValBatcher2 = MergeBatcher, TrieChunker>, InternalMerger>>; - type KeyBatcher2 = MergeBatcher, TrieChunker>, InternalMerger>>; - /// A pass-through chunker for already-sorted trie containers. + type ValBatcher2 = MergeBatcher, TrieChunker, InternalMerger>>; + + /// A chunker that unwraps `RecordedUpdates` into bare `Updates` for the merge batcher. + /// The `records` accounting is discarded here — it has served its purpose for exchange. /// - /// Since `ValStorage`/`KeyStorage` are produced sorted by `ValColBuilder`/`KeyColBuilder`, - /// the chunker just queues them — no sorting or consolidation needed. - pub struct TrieChunker { - ready: std::collections::VecDeque, - empty: Option, + /// IMPORTANT: This chunker assumes the input `Updates` are sorted and consolidated + /// (as produced by `ValColBuilder::form`). The downstream `InternalMerge` relies on + /// this invariant. If `RecordedUpdates` could carry unsorted data (e.g. from a `map`), + /// we would need either a sorting chunker for that case, or a type-level distinction + /// (e.g. `RecordedUpdates` vs `RecordedUpdates`) to + /// route to the right chunker. + pub struct TrieChunker { + ready: std::collections::VecDeque>, + empty: Option>, } - impl Default for TrieChunker { + impl Default for TrieChunker { fn default() -> Self { Self { ready: Default::default(), empty: None } } } - impl<'a, C: Default> timely::container::PushInto<&'a mut C> for TrieChunker { - fn push_into(&mut self, container: &'a mut C) { - self.ready.push_back(std::mem::take(container)); + impl<'a, U: crate::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates> for TrieChunker { + fn push_into(&mut self, container: &'a mut RecordedUpdates) { + self.ready.push_back(std::mem::take(&mut container.updates)); } } - impl timely::container::ContainerBuilder for TrieChunker { - type Container = C; + impl timely::container::ContainerBuilder for TrieChunker { + type Container = Updates; fn extract(&mut self) -> Option<&mut Self::Container> { if let Some(ready) = self.ready.pop_front() { self.empty = Some(ready); @@ -847,8 +581,6 @@ pub mod arrangement { } } - impl timely::container::LengthPreservingContainerBuilder for TrieChunker { } - pub mod batcher { use std::ops::Range; @@ -858,34 +590,21 @@ pub mod arrangement { use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use crate::ColumnarUpdate as Update; - use crate::{ValStorage, KeyStorage}; - - // --- SizableContainer impls --- + use crate::Updates; - impl timely::container::SizableContainer for ValStorage { + impl timely::container::SizableContainer for Updates { fn at_capacity(&self) -> bool { use columnar::Len; - // Capacity based on update count; 64K is a reasonable threshold. - self.upds.values.len() >= 64 * 1024 + self.diffs.values.len() >= 64 * 1024 } fn ensure_capacity(&mut self, _stash: &mut Option) { } } - impl timely::container::SizableContainer for KeyStorage { - fn at_capacity(&self) -> bool { - // Capacity based on update count. - self.upds.borrow().values.len() >= 64 * 1024 - } - fn ensure_capacity(&mut self, _stash: &mut Option) { } - } - - // --- InternalMerge for ValStorage --- - - impl InternalMerge for ValStorage { + impl InternalMerge for Updates { type TimeOwned = U::Time; - fn len(&self) -> usize { self.upds.values.len() } + fn len(&self) -> usize { self.diffs.values.len() } fn clear(&mut self) { *self = Self::default(); } #[inline(never)] @@ -896,11 +615,11 @@ pub mod arrangement { // Bulk copy: take remaining keys from position onward. let other = &mut others[0]; let pos = &mut positions[0]; - if self.keys.len() == 0 && *pos == 0 { + if self.keys.values.len() == 0 && *pos == 0 { std::mem::swap(self, other); return; } - let other_len = other.keys.len(); + let other_len = other.keys.values.len(); self.extend_from_keys(other, *pos .. other_len); *pos = other_len; }, @@ -911,8 +630,8 @@ pub mod arrangement { let (left, right) = others.split_at_mut(1); let this = &left[0]; let that = &right[0]; - let this_keys = this.keys.borrow(); - let that_keys = that.keys.borrow(); + let this_keys = this.keys.values.borrow(); + let that_keys = that.keys.values.borrow(); let mut this_key_range = positions[0] .. this_keys.len(); let mut that_key_range = positions[1] .. that_keys.len(); @@ -939,37 +658,53 @@ pub mod arrangement { self.extend_from_vals(this, lower .. this_val_range.start); }, std::cmp::Ordering::Equal => { - let updates_len = self.upds.values.len(); - let mut this_upd_range = this.upds_bounds(this_val_range.start .. this_val_range.start+1); - let mut that_upd_range = that.upds_bounds(that_val_range.start .. that_val_range.start+1); - while !this_upd_range.is_empty() && !that_upd_range.is_empty() { - let (this_time, this_diff) = this.upds.values.borrow().get(this_upd_range.start); - let (that_time, that_diff) = that.upds.values.borrow().get(that_upd_range.start); + let updates_len = self.times.values.len(); + let mut this_time_range = this.times_bounds(this_val_range.start .. this_val_range.start+1); + let mut that_time_range = that.times_bounds(that_val_range.start .. that_val_range.start+1); + while !this_time_range.is_empty() && !that_time_range.is_empty() { + let this_time = this.times.values.borrow().get(this_time_range.start); + let this_diff = this.diffs.values.borrow().get(this_time_range.start); + let that_time = that.times.values.borrow().get(that_time_range.start); + let that_diff = that.diffs.values.borrow().get(that_time_range.start); match this_time.cmp(&that_time) { std::cmp::Ordering::Less => { - let lower = this_upd_range.start; - gallop(this.upds.values.0.borrow(), &mut this_upd_range, |x| x < that_time); - self.upds.values.extend_from_self(this.upds.values.borrow(), lower .. this_upd_range.start); + let lower = this_time_range.start; + gallop(this.times.values.borrow(), &mut this_time_range, |x| x < that_time); + self.times.values.extend_from_self(this.times.values.borrow(), lower .. this_time_range.start); + self.diffs.extend_from_self(this.diffs.borrow(), lower .. this_time_range.start); }, std::cmp::Ordering::Equal => { this_sum.copy_from(this_diff); that_sum.copy_from(that_diff); this_sum.plus_equals(&that_sum); - if !this_sum.is_zero() { self.upds.values.push((this_time, &this_sum)); } - this_upd_range.start += 1; - that_upd_range.start += 1; + if !this_sum.is_zero() { + self.times.values.push(this_time); + self.diffs.values.push(&this_sum); + self.diffs.bounds.push(self.diffs.values.len() as u64); + } + this_time_range.start += 1; + that_time_range.start += 1; }, std::cmp::Ordering::Greater => { - let lower = that_upd_range.start; - gallop(that.upds.values.0.borrow(), &mut that_upd_range, |x| x < this_time); - self.upds.values.extend_from_self(that.upds.values.borrow(), lower .. that_upd_range.start); + let lower = that_time_range.start; + gallop(that.times.values.borrow(), &mut that_time_range, |x| x < this_time); + self.times.values.extend_from_self(that.times.values.borrow(), lower .. that_time_range.start); + self.diffs.extend_from_self(that.diffs.borrow(), lower .. that_time_range.start); }, } } - self.upds.values.extend_from_self(this.upds.values.borrow(), this_upd_range); - self.upds.values.extend_from_self(that.upds.values.borrow(), that_upd_range); - if self.upds.values.len() > updates_len { - self.upds.bounds.push(self.upds.values.len() as u64); + // Remaining from this side + if !this_time_range.is_empty() { + self.times.values.extend_from_self(this.times.values.borrow(), this_time_range.clone()); + self.diffs.extend_from_self(this.diffs.borrow(), this_time_range.clone()); + } + // Remaining from that side + if !that_time_range.is_empty() { + self.times.values.extend_from_self(that.times.values.borrow(), that_time_range.clone()); + self.diffs.extend_from_self(that.diffs.borrow(), that_time_range.clone()); + } + if self.times.values.len() > updates_len { + self.times.bounds.push(self.times.values.len() as u64); self.vals.values.push(this_val); } this_val_range.start += 1; @@ -986,7 +721,7 @@ pub mod arrangement { self.extend_from_vals(that, that_val_range); if self.vals.values.len() > values_len { self.vals.bounds.push(self.vals.values.len() as u64); - self.keys.push(this_key); + self.keys.values.push(this_key); } this_key_range.start += 1; that_key_range.start += 1; @@ -1024,203 +759,51 @@ pub mod arrangement { ship: &mut Self, ) { let mut time = U::Time::default(); - for key_idx in 0 .. self.keys.len() { - let key = self.keys.borrow().get(key_idx); + for key_idx in 0 .. self.keys.values.len() { + let key = self.keys.values.borrow().get(key_idx); let keep_vals_len = keep.vals.values.len(); let ship_vals_len = ship.vals.values.len(); for val_idx in self.vals_bounds(key_idx..key_idx+1) { let val = self.vals.values.borrow().get(val_idx); - let keep_upds_len = keep.upds.values.len(); - let ship_upds_len = ship.upds.values.len(); - for upd_idx in self.upds_bounds(val_idx..val_idx+1) { - let (t, diff) = self.upds.values.borrow().get(upd_idx); + let keep_times_len = keep.times.values.len(); + let ship_times_len = ship.times.values.len(); + for time_idx in self.times_bounds(val_idx..val_idx+1) { + let t = self.times.values.borrow().get(time_idx); + let diff = self.diffs.values.borrow().get(time_idx); time.copy_from(t); if upper.less_equal(&time) { frontier.insert_ref(&time); - keep.upds.values.push((t, diff)); + keep.times.values.push(t); + keep.diffs.values.push(diff); + keep.diffs.bounds.push(keep.diffs.values.len() as u64); } else { - ship.upds.values.push((t, diff)); + ship.times.values.push(t); + ship.diffs.values.push(diff); + ship.diffs.bounds.push(ship.diffs.values.len() as u64); } } - if keep.upds.values.len() > keep_upds_len { - keep.upds.bounds.push(keep.upds.values.len() as u64); + if keep.times.values.len() > keep_times_len { + keep.times.bounds.push(keep.times.values.len() as u64); keep.vals.values.push(val); } - if ship.upds.values.len() > ship_upds_len { - ship.upds.bounds.push(ship.upds.values.len() as u64); + if ship.times.values.len() > ship_times_len { + ship.times.bounds.push(ship.times.values.len() as u64); ship.vals.values.push(val); } } if keep.vals.values.len() > keep_vals_len { keep.vals.bounds.push(keep.vals.values.len() as u64); - keep.keys.push(key); + keep.keys.values.push(key); } if ship.vals.values.len() > ship_vals_len { ship.vals.bounds.push(ship.vals.values.len() as u64); - ship.keys.push(key); + ship.keys.values.push(key); } } } } - // --- InternalMerge for KeyStorage --- - - impl InternalMerge for KeyStorage { - - type TimeOwned = U::Time; - - fn len(&self) -> usize { self.upds.borrow().values.len() } - fn clear(&mut self) { *self = Self::default(); } - - #[inline(never)] - fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) { - match others.len() { - 0 => {}, - 1 => { - let other = &mut others[0]; - let pos = &mut positions[0]; - if self.upds.borrow().values.len() == 0 && *pos == 0 { - std::mem::swap(self, other); - return; - } - let other_len = other.keys.borrow().len(); - self.extend_from_keys(other, *pos .. other_len); - *pos = other_len; - }, - 2 => { - let mut this_sum = U::Diff::default(); - let mut that_sum = U::Diff::default(); - - let (left, right) = others.split_at_mut(1); - let this = &left[0]; - let that = &right[0]; - let this_keys = this.keys.borrow(); - let that_keys = that.keys.borrow(); - let this_upds = this.upds.borrow(); - let that_upds = that.upds.borrow(); - let mut this_key_range = positions[0] .. this_keys.len(); - let mut that_key_range = positions[1] .. that_keys.len(); - - while !this_key_range.is_empty() && !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let this_key = this_keys.get(this_key_range.start); - let that_key = that_keys.get(that_key_range.start); - match this_key.cmp(&that_key) { - std::cmp::Ordering::Less => { - let lower = this_key_range.start; - gallop(this_keys, &mut this_key_range, |x| x < that_key); - self.extend_from_keys(this, lower .. this_key_range.start); - }, - std::cmp::Ordering::Equal => { - let updates_len = self.upds.borrow().values.len(); - let mut this_upd_range = this.upds_bounds(this_key_range.start .. this_key_range.start+1); - let mut that_upd_range = that.upds_bounds(that_key_range.start .. that_key_range.start+1); - while !this_upd_range.is_empty() && !that_upd_range.is_empty() { - let (this_time, this_diff) = this_upds.values.get(this_upd_range.start); - let (that_time, that_diff) = that_upds.values.get(that_upd_range.start); - match this_time.cmp(&that_time) { - std::cmp::Ordering::Less => { - let lower = this_upd_range.start; - gallop(this_upds.values.0, &mut this_upd_range, |x| x < that_time); - self.upds.as_mut().values.extend_from_self(this_upds.values, lower .. this_upd_range.start); - }, - std::cmp::Ordering::Equal => { - this_sum.copy_from(this_diff); - that_sum.copy_from(that_diff); - this_sum.plus_equals(&that_sum); - if !this_sum.is_zero() { self.upds.as_mut().values.push((this_time, &this_sum)); } - this_upd_range.start += 1; - that_upd_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_upd_range.start; - gallop(that_upds.values.0, &mut that_upd_range, |x| x < this_time); - self.upds.as_mut().values.extend_from_self(that_upds.values, lower .. that_upd_range.start); - }, - } - } - self.upds.as_mut().values.extend_from_self(this_upds.values, this_upd_range); - self.upds.as_mut().values.extend_from_self(that_upds.values, that_upd_range); - if self.upds.borrow().values.len() > updates_len { - let temp_len = self.upds.borrow().values.len() as u64; - self.upds.as_mut().bounds.push(temp_len); - self.keys.as_mut().push(this_key); - } - this_key_range.start += 1; - that_key_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_key_range.start; - gallop(that_keys, &mut that_key_range, |x| x < this_key); - self.extend_from_keys(that, lower .. that_key_range.start); - }, - } - } - while !this_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let lower = this_key_range.start; - this_key_range.start = this_key_range.end; - self.extend_from_keys(this, lower .. this_key_range.start); - } - while !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let lower = that_key_range.start; - that_key_range.start = that_key_range.end; - self.extend_from_keys(that, lower .. that_key_range.start); - } - positions[0] = this_key_range.start; - positions[1] = that_key_range.start; - }, - n => unimplemented!("{n}-way merge not supported"), - } - } - - fn extract( - &mut self, - upper: AntichainRef, - frontier: &mut Antichain, - keep: &mut Self, - ship: &mut Self, - ) { - use crate::Column; - use columnar::{ContainerOf, Vecs}; - - let mut ship_keys: ContainerOf = Default::default(); - let mut ship_upds: Vecs<(ContainerOf, ContainerOf)> = Default::default(); - let mut keep_keys: ContainerOf = Default::default(); - let mut keep_upds: Vecs<(ContainerOf, ContainerOf)> = Default::default(); - - let mut time = U::Time::default(); - for key_idx in 0 .. self.keys.borrow().len() { - let key = self.keys.borrow().get(key_idx); - let keep_upds_len = keep_upds.borrow().values.len(); - let ship_upds_len = ship_upds.borrow().values.len(); - for upd_idx in self.upds_bounds(key_idx..key_idx+1) { - let (t, diff) = self.upds.borrow().values.get(upd_idx); - time.copy_from(t); - if upper.less_equal(&time) { - frontier.insert_ref(&time); - keep_upds.values.push((t, diff)); - } - else { - ship_upds.values.push((t, diff)); - } - } - if keep_upds.borrow().values.len() > keep_upds_len { - keep_upds.bounds.push(keep_upds.borrow().values.len() as u64); - keep_keys.push(key); - } - if ship_upds.borrow().values.len() > ship_upds_len { - ship_upds.bounds.push(ship_upds.borrow().values.len() as u64); - ship_keys.push(key); - } - } - - keep.keys = Column::Typed(keep_keys); - keep.upds = Column::Typed(keep_upds); - ship.keys = Column::Typed(ship_keys); - ship.upds = Column::Typed(ship_upds); - } - } - #[inline(always)] pub(crate) fn gallop(input: TC, range: &mut Range, mut cmp: impl FnMut(::Ref) -> bool) { // if empty input, or already >= element, return @@ -1244,127 +827,506 @@ pub mod arrangement { } } - use builder::val::ValMirror; - use builder::key::KeyMirror; + use builder::ValMirror; pub mod builder { - pub mod val { + use differential_dataflow::trace::implementations::ord_neu::{Vals, Upds}; + use differential_dataflow::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage}; + use differential_dataflow::trace::Description; + + use crate::Updates; + use crate::layout::ColumnarUpdate as Update; + use crate::layout::ColumnarLayout as Layout; + use crate::arrangement::Coltainer; + + use columnar::{Borrow, IndexAs}; + use columnar::primitive::offsets::Strides; + use differential_dataflow::trace::implementations::OffsetList; + fn strides_to_offset_list(bounds: &Strides, count: usize) -> OffsetList { + let mut output = OffsetList::with_capacity(count); + output.push(0); + let bounds_b = bounds.borrow(); + for i in 0..count { + output.push(bounds_b.index_as(i) as usize); + } + output + } + + pub struct ValMirror { marker: std::marker::PhantomData } + impl differential_dataflow::trace::Builder for ValMirror { + type Time = U::Time; + type Input = Updates; + type Output = OrdValBatch>; + + fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { Self { marker: std::marker::PhantomData } } + fn push(&mut self, _chunk: &mut Self::Input) { unimplemented!() } + fn done(self, _description: Description) -> Self::Output { unimplemented!() } + fn seal(chain: &mut Vec, description: Description) -> Self::Output { + if chain.len() == 0 { + let storage = OrdValStorage { + keys: Default::default(), + vals: Default::default(), + upds: Default::default(), + }; + OrdValBatch { storage, description, updates: 0 } + } + else if chain.len() == 1 { + use columnar::Len; + let storage = chain.pop().unwrap(); + let updates = storage.diffs.values.len(); + let val_offs = strides_to_offset_list(&storage.vals.bounds, storage.keys.values.len()); + let time_offs = strides_to_offset_list(&storage.times.bounds, storage.vals.values.len()); + let storage = OrdValStorage { + keys: Coltainer { container: storage.keys.values }, + vals: Vals { + offs: val_offs, + vals: Coltainer { container: storage.vals.values }, + }, + upds: Upds { + offs: time_offs, + times: Coltainer { container: storage.times.values }, + diffs: Coltainer { container: storage.diffs.values }, + }, + }; + OrdValBatch { storage, description, updates } + } + else { + println!("chain length: {:?}", chain.len()); + unimplemented!() + } + } + } + + } +} + +pub mod updates { + + use columnar::{Columnar, Container, ContainerOf, Vecs, Borrow, Index, IndexAs, Len, Push}; + use columnar::primitive::offsets::Strides; + use differential_dataflow::difference::{Semigroup, IsZero}; + + use crate::layout::ColumnarUpdate as Update; + + /// A `Vecs` using strided offsets. + pub type Lists = Vecs; - use differential_dataflow::trace::implementations::ord_neu::{Vals, Upds}; - use differential_dataflow::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage}; - use differential_dataflow::trace::Description; + /// Trie-structured update storage using columnar containers. + /// + /// Four nested layers of `Lists`: + /// - `keys`: lists of keys (outer lists are independent groups) + /// - `vals`: per-key, lists of vals + /// - `times`: per-val, lists of times + /// - `diffs`: per-time, lists of diffs (singletons when consolidated) + /// + /// A flat unsorted input has stride 1 at every level (one key per entry, + /// one val per key, one time per val, one diff per time). + /// A fully consolidated trie has a single outer key list, all lists sorted + /// and deduplicated, and singleton diff lists. + pub struct Updates { + pub keys: Lists>, + pub vals: Lists>, + pub times: Lists>, + pub diffs: Lists>, + } - use crate::ValStorage; - use crate::layout::ColumnarUpdate as Update; - use crate::layout::ColumnarLayout as Layout; - use crate::arrangement::Coltainer; + impl Default for Updates { + fn default() -> Self { + Self { + keys: Default::default(), + vals: Default::default(), + times: Default::default(), + diffs: Default::default(), + } + } + } - use differential_dataflow::trace::implementations::OffsetList; - fn vec_u64_to_offset_list(list: Vec) -> OffsetList { - let mut output = OffsetList::with_capacity(list.len()); - output.push(0); - for item in list { output.push(item as usize); } - output + impl std::fmt::Debug for Updates { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Updates").finish() + } + } + + impl Clone for Updates { + fn clone(&self) -> Self { + Self { + keys: self.keys.clone(), + vals: self.vals.clone(), + times: self.times.clone(), + diffs: self.diffs.clone(), } + } + } - pub struct ValMirror { marker: std::marker::PhantomData } - impl differential_dataflow::trace::Builder for ValMirror { - type Time = U::Time; - type Input = ValStorage; - type Output = OrdValBatch>; - - fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { Self { marker: std::marker::PhantomData } } - fn push(&mut self, _chunk: &mut Self::Input) { unimplemented!() } - fn done(self, _description: Description) -> Self::Output { unimplemented!() } - fn seal(chain: &mut Vec, description: Description) -> Self::Output { - if chain.len() == 0 { - let storage = OrdValStorage { - keys: Default::default(), - vals: Default::default(), - upds: Default::default(), - }; - OrdValBatch { storage, description, updates: 0 } - } - else if chain.len() == 1 { - use columnar::Len; - let storage = chain.pop().unwrap(); - let updates = storage.upds.len(); - let storage = OrdValStorage { - keys: Coltainer { container: storage.keys }, - vals: Vals { - offs: vec_u64_to_offset_list(storage.vals.bounds), - vals: Coltainer { container: storage.vals.values }, - }, - upds: Upds { - offs: vec_u64_to_offset_list(storage.upds.bounds), - times: Coltainer { container: storage.upds.values.0 }, - diffs: Coltainer { container: storage.upds.values.1 }, - }, - }; - OrdValBatch { storage, description, updates } - } - else { - println!("chain length: {:?}", chain.len()); - unimplemented!() + pub type Tuple = (::Key, ::Val, ::Time, ::Diff); + + /// Returns the value-index range for list `i` given cumulative bounds. + #[inline] + pub fn child_range>(bounds: B, i: usize) -> std::ops::Range { + let lower = if i == 0 { 0 } else { bounds.index_as(i - 1) as usize }; + let upper = bounds.index_as(i) as usize; + lower..upper + } + + impl Updates { + + pub fn vals_bounds(&self, key_range: std::ops::Range) -> std::ops::Range { + if !key_range.is_empty() { + let bounds = self.vals.bounds.borrow(); + let lower = if key_range.start == 0 { 0 } else { bounds.index_as(key_range.start - 1) as usize }; + let upper = bounds.index_as(key_range.end - 1) as usize; + lower..upper + } else { key_range } + } + pub fn times_bounds(&self, val_range: std::ops::Range) -> std::ops::Range { + if !val_range.is_empty() { + let bounds = self.times.bounds.borrow(); + let lower = if val_range.start == 0 { 0 } else { bounds.index_as(val_range.start - 1) as usize }; + let upper = bounds.index_as(val_range.end - 1) as usize; + lower..upper + } else { val_range } + } + pub fn diffs_bounds(&self, time_range: std::ops::Range) -> std::ops::Range { + if !time_range.is_empty() { + let bounds = self.diffs.bounds.borrow(); + let lower = if time_range.start == 0 { 0 } else { bounds.index_as(time_range.start - 1) as usize }; + let upper = bounds.index_as(time_range.end - 1) as usize; + lower..upper + } else { time_range } + } + + /// Copies `other[key_range]` into self, keys and all. + pub fn extend_from_keys(&mut self, other: &Self, key_range: std::ops::Range) { + self.keys.values.extend_from_self(other.keys.values.borrow(), key_range.clone()); + self.vals.extend_from_self(other.vals.borrow(), key_range.clone()); + let val_range = other.vals_bounds(key_range); + self.times.extend_from_self(other.times.borrow(), val_range.clone()); + let time_range = other.times_bounds(val_range); + self.diffs.extend_from_self(other.diffs.borrow(), time_range); + } + + /// Copies a range of vals (with their times and diffs) from `other` into self. + pub fn extend_from_vals(&mut self, other: &Self, val_range: std::ops::Range) { + self.vals.values.extend_from_self(other.vals.values.borrow(), val_range.clone()); + self.times.extend_from_self(other.times.borrow(), val_range.clone()); + let time_range = other.times_bounds(val_range); + self.diffs.extend_from_self(other.diffs.borrow(), time_range); + } + + /// Forms a consolidated `Updates` from sorted `(key, val, time, diff)` refs. + /// + /// Follows the same dedup pattern at each level: check if the current + /// entry differs from the previous, seal the previous group if so. + /// At the time level, equal `(key, val, time)` triples have diffs accumulated. + /// The `records` field tracks the input count for exchange accounting. + pub fn form<'a>(mut sorted: impl Iterator>>) -> Self { + + let mut output = Self::default(); + let mut diff_stash = U::Diff::default(); + let mut diff_temp = U::Diff::default(); + + if let Some((key, val, time, diff)) = sorted.next() { + output.keys.values.push(key); + output.vals.values.push(val); + output.times.values.push(time); + Columnar::copy_from(&mut diff_stash, diff); + + for (key, val, time, diff) in sorted { + let mut differs = false; + // Keys: seal vals for previous key if key changed. + let keys_len = output.keys.values.len(); + differs |= ContainerOf::::reborrow_ref(key) != output.keys.values.borrow().get(keys_len - 1); + if differs { output.keys.values.push(key); } + // Vals: seal times for previous val if key or val changed. + let vals_len = output.vals.values.len(); + if differs { output.vals.bounds.push(vals_len as u64); } + differs |= ContainerOf::::reborrow_ref(val) != output.vals.values.borrow().get(vals_len - 1); + if differs { output.vals.values.push(val); } + // Times: seal diffs for previous time if key, val, or time changed. + let times_len = output.times.values.len(); + if differs { output.times.bounds.push(times_len as u64); } + differs |= ContainerOf::::reborrow_ref(time) != output.times.values.borrow().get(times_len - 1); + if differs { + // Flush accumulated diff for the previous time. + if !diff_stash.is_zero() { + output.diffs.values.push(&diff_stash); + } + // TODO: Else is complicated, as we may want to pop prior values. + // It is perhaps fine to leave zeros as a thing that won't + // invalidate merging. + output.diffs.bounds.push(output.diffs.values.len() as u64); + // Start new time. + output.times.values.push(time); + Columnar::copy_from(&mut diff_stash, diff); + } else { + // Same (key, val, time): accumulate diff. + Columnar::copy_from(&mut diff_temp, diff); + diff_stash.plus_equals(&diff_temp); } } + // Flush the last accumulated diff and seal all levels. + if !diff_stash.is_zero() { + output.diffs.values.push(&diff_stash); + } + output.diffs.bounds.push(output.diffs.values.len() as u64); + output.times.bounds.push(output.times.values.len() as u64); + output.vals.bounds.push(output.vals.values.len() as u64); + output.keys.bounds.push(output.keys.values.len() as u64); } + + output } - pub mod key { + /// Consolidates into canonical trie form: + /// single outer key list, all lists sorted and deduplicated, + /// diff lists are singletons (or absent if cancelled). + pub fn consolidate(self) -> Self { + + let Self { keys, vals, times, diffs } = self; + + let keys_b = keys.borrow(); + let vals_b = vals.borrow(); + let times_b = times.borrow(); + let diffs_b = diffs.borrow(); + + // Flatten to index tuples: [key_abs, val_abs, time_abs, diff_abs]. + let mut tuples: Vec<[usize; 4]> = Vec::new(); + for outer in 0..Len::len(&keys_b) { + for k in child_range(keys_b.bounds, outer) { + for v in child_range(vals_b.bounds, k) { + for t in child_range(times_b.bounds, v) { + for d in child_range(diffs_b.bounds, t) { + tuples.push([k, v, t, d]); + } + } + } + } + } + + // Sort by (key, val, time). Diff is payload. + tuples.sort_by(|a, b| { + keys_b.values.get(a[0]).cmp(&keys_b.values.get(b[0])) + .then_with(|| vals_b.values.get(a[1]).cmp(&vals_b.values.get(b[1]))) + .then_with(|| times_b.values.get(a[2]).cmp(×_b.values.get(b[2]))) + }); + + // Build consolidated output, bottom-up cancellation. + let mut output = Self::default(); + let mut diff_stash = U::Diff::default(); + let mut diff_temp = U::Diff::default(); + + let mut idx = 0; + while idx < tuples.len() { + let key_ref = keys_b.values.get(tuples[idx][0]); + let key_start_vals = output.vals.values.len(); + + // All entries with this key. + while idx < tuples.len() && keys_b.values.get(tuples[idx][0]) == key_ref { + let val_ref = vals_b.values.get(tuples[idx][1]); + let val_start_times = output.times.values.len(); + + // All entries with this (key, val). + while idx < tuples.len() + && keys_b.values.get(tuples[idx][0]) == key_ref + && vals_b.values.get(tuples[idx][1]) == val_ref + { + let time_ref = times_b.values.get(tuples[idx][2]); + + // Sum all diffs for this (key, val, time). + Columnar::copy_from(&mut diff_stash, diffs_b.values.get(tuples[idx][3])); + idx += 1; + while idx < tuples.len() + && keys_b.values.get(tuples[idx][0]) == key_ref + && vals_b.values.get(tuples[idx][1]) == val_ref + && times_b.values.get(tuples[idx][2]) == time_ref + { + Columnar::copy_from(&mut diff_temp, diffs_b.values.get(tuples[idx][3])); + diff_stash.plus_equals(&diff_temp); + idx += 1; + } - use differential_dataflow::trace::implementations::ord_neu::Upds; - use differential_dataflow::trace::implementations::ord_neu::key_batch::{OrdKeyBatch, OrdKeyStorage}; - use differential_dataflow::trace::Description; + // Emit time + singleton diff if nonzero. + if !diff_stash.is_zero() { + output.times.values.push(time_ref); + output.diffs.values.push(&diff_stash); + output.diffs.bounds.push(output.diffs.values.len() as u64); + } + } - use crate::KeyStorage; - use crate::layout::ColumnarUpdate as Update; - use crate::layout::ColumnarLayout as Layout; - use crate::arrangement::Coltainer; + // Seal time list for this val; emit val if any times survived. + if output.times.values.len() > val_start_times { + output.times.bounds.push(output.times.values.len() as u64); + output.vals.values.push(val_ref); + } + } - use differential_dataflow::trace::implementations::OffsetList; - fn vec_u64_to_offset_list(list: Vec) -> OffsetList { - let mut output = OffsetList::with_capacity(list.len()); - output.push(0); - for item in list { output.push(item as usize); } - output + // Seal val list for this key; emit key if any vals survived. + if output.vals.values.len() > key_start_vals { + output.vals.bounds.push(output.vals.values.len() as u64); + output.keys.values.push(key_ref); + } } - pub struct KeyMirror { marker: std::marker::PhantomData } - impl> differential_dataflow::trace::Builder for KeyMirror { - type Time = U::Time; - type Input = KeyStorage; - type Output = OrdKeyBatch>; - - fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { Self { marker: std::marker::PhantomData } } - fn push(&mut self, _chunk: &mut Self::Input) { unimplemented!() } - fn done(self, _description: Description) -> Self::Output { unimplemented!() } - fn seal(chain: &mut Vec, description: Description) -> Self::Output { - if chain.len() == 0 { - let storage = OrdKeyStorage { - keys: Default::default(), - upds: Default::default(), - }; - OrdKeyBatch { storage, description, updates: 0, value: OrdKeyBatch::>::create_value() } - } - else if chain.len() == 1 { - use columnar::Len; - let storage = chain.pop().unwrap(); - let updates = storage.upds.borrow().len(); - let upds = storage.upds.into_typed(); - let storage = OrdKeyStorage { - keys: Coltainer { container: storage.keys.into_typed() }, - upds: Upds { - offs: vec_u64_to_offset_list(upds.bounds), - times: Coltainer { container: upds.values.0 }, - diffs: Coltainer { container: upds.values.1 }, - }, - }; - OrdKeyBatch { storage, description, updates, value: OrdKeyBatch::>::create_value() } + // Seal the single outer key list. + if !output.keys.values.is_empty() { + output.keys.bounds.push(output.keys.values.len() as u64); + } + + output + } + + /// Push a single flat update `(key, val, time, diff)` as a stride-1 entry. + pub fn push<'a>(&mut self, key: columnar::Ref<'a, U::Key>, val: columnar::Ref<'a, U::Val>, time: columnar::Ref<'a, U::Time>, diff: columnar::Ref<'a, U::Diff>) { + self.keys.values.push(key); + self.keys.bounds.push(self.keys.values.len() as u64); + self.vals.values.push(val); + self.vals.bounds.push(self.vals.values.len() as u64); + self.times.values.push(time); + self.times.bounds.push(self.times.values.len() as u64); + self.diffs.values.push(diff); + self.diffs.bounds.push(self.diffs.values.len() as u64); + } + + /// Push a single flat update from owned values. + pub fn push_owned(&mut self, key: &U::Key, val: &U::Val, time: &U::Time, diff: &U::Diff) { + self.keys.values.push(key); + self.keys.bounds.push(self.keys.values.len() as u64); + self.vals.values.push(val); + self.vals.bounds.push(self.vals.values.len() as u64); + self.times.values.push(time); + self.times.bounds.push(self.times.values.len() as u64); + self.diffs.values.push(diff); + self.diffs.bounds.push(self.diffs.values.len() as u64); + } + + /// The number of leaf-level diff entries (total updates). + pub fn len(&self) -> usize { self.diffs.values.len() } + + /// Iterate all `(key, val, time, diff)` entries as refs. + pub fn iter(&self) -> impl Iterator, + columnar::Ref<'_, U::Val>, + columnar::Ref<'_, U::Time>, + columnar::Ref<'_, U::Diff>, + )> { + let keys_b = self.keys.borrow(); + let vals_b = self.vals.borrow(); + let times_b = self.times.borrow(); + let diffs_b = self.diffs.borrow(); + + let mut result = Vec::new(); + for outer in 0..Len::len(&keys_b) { + for k in child_range(keys_b.bounds, outer) { + let key = keys_b.values.get(k); + for v in child_range(vals_b.bounds, k) { + let val = vals_b.values.get(v); + for t in child_range(times_b.bounds, v) { + let time = times_b.values.get(t); + for d in child_range(diffs_b.bounds, t) { + let diff = diffs_b.values.get(d); + result.push((key, val, time, diff)); + } + } } - else { panic!("chain length: {:?} > 1", chain.len()); } } } + result.into_iter() + } + } + + impl timely::Accountable for Updates { + #[inline] fn record_count(&self) -> i64 { Len::len(&self.diffs.values) as i64 } + } + + impl timely::dataflow::channels::ContainerBytes for Updates { + fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } + fn length_in_bytes(&self) -> usize { unimplemented!() } + fn into_bytes(&self, _writer: &mut W) { unimplemented!() } + } + + #[cfg(test)] + mod tests { + use super::*; + + // (K, V, T, R) = (u64, u64, u64, i64). + type TestUpdate = (u64, u64, u64, i64); + + /// Collect entries as owned tuples for easier assertion. + fn collect(updates: &Updates) -> Vec<(u64, u64, u64, i64)> { + updates.iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect() + } + + #[test] + fn test_push_and_consolidate_basic() { + let mut updates = Updates::::default(); + updates.push_owned(&1, &10, &100, &1); + updates.push_owned(&1, &10, &100, &2); + updates.push_owned(&2, &20, &200, &5); + + assert_eq!(updates.len(), 3); + + let consolidated = updates.consolidate(); + let entries = collect(&consolidated); + assert_eq!(entries, vec![(1, 10, 100, 3), (2, 20, 200, 5)]); + } + + #[test] + fn test_cancellation() { + let mut updates = Updates::::default(); + updates.push_owned(&1, &10, &100, &3); + updates.push_owned(&1, &10, &100, &-3); + updates.push_owned(&2, &20, &200, &1); + + let entries = collect(&updates.consolidate()); + assert_eq!(entries, vec![(2, 20, 200, 1)]); + } + + #[test] + fn test_multiple_vals_and_times() { + let mut updates = Updates::::default(); + updates.push_owned(&1, &10, &100, &1); + updates.push_owned(&1, &10, &200, &2); + updates.push_owned(&1, &20, &100, &3); + updates.push_owned(&1, &20, &100, &4); + + let entries = collect(&updates.consolidate()); + assert_eq!(entries, vec![(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 7)]); + } + + #[test] + fn test_val_cancellation_propagates() { + let mut updates = Updates::::default(); + updates.push_owned(&1, &10, &100, &5); + updates.push_owned(&1, &10, &100, &-5); + updates.push_owned(&1, &20, &100, &1); + + let entries = collect(&updates.consolidate()); + assert_eq!(entries, vec![(1, 20, 100, 1)]); + } + + #[test] + fn test_empty() { + let updates = Updates::::default(); + assert_eq!(collect(&updates.consolidate()), vec![]); + } + + #[test] + fn test_total_cancellation() { + let mut updates = Updates::::default(); + updates.push_owned(&1, &10, &100, &1); + updates.push_owned(&1, &10, &100, &-1); + assert_eq!(collect(&updates.consolidate()), vec![]); + } + + #[test] + fn test_unsorted_input() { + let mut updates = Updates::::default(); + updates.push_owned(&3, &30, &300, &1); + updates.push_owned(&1, &10, &100, &2); + updates.push_owned(&2, &20, &200, &3); + + let entries = collect(&updates.consolidate()); + assert_eq!(entries, vec![(1, 10, 100, 2), (2, 20, 200, 3), (3, 30, 300, 1)]); } } }