diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 9a97922f1..cebc43fd8 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -808,145 +808,221 @@ pub mod arrangement { } } - use crate::{ColumnarUpdate, ValStorage, KeyStorage}; - use differential_dataflow::trace::implementations::chainless_batcher as chainless; - type ValBatcher2 = chainless::Batcher<::Time, ValStorage>; - type KeyBatcher2 = chainless::Batcher<::Time, KeyStorage>; + use crate::{ValStorage, KeyStorage}; + use differential_dataflow::trace::implementations::merge_batcher::{MergeBatcher, InternalMerger}; + type ValBatcher2 = MergeBatcher, TrieChunker>, InternalMerger>>; + type KeyBatcher2 = MergeBatcher, TrieChunker>, InternalMerger>>; + /// A pass-through chunker for already-sorted trie containers. + /// + /// Since `ValStorage`/`KeyStorage` are produced sorted by `ValColBuilder`/`KeyColBuilder`, + /// the chunker just queues them — no sorting or consolidation needed. + pub struct TrieChunker { + ready: std::collections::VecDeque, + empty: Option, + } + + impl Default for TrieChunker { + fn default() -> Self { Self { ready: Default::default(), empty: None } } + } + + impl<'a, C: Default> timely::container::PushInto<&'a mut C> for TrieChunker { + fn push_into(&mut self, container: &'a mut C) { + self.ready.push_back(std::mem::take(container)); + } + } + + impl timely::container::ContainerBuilder for TrieChunker { + type Container = C; + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(ready) = self.ready.pop_front() { + self.empty = Some(ready); + self.empty.as_mut() + } else { + None + } + } + fn finish(&mut self) -> Option<&mut Self::Container> { + self.empty = self.ready.pop_front(); + self.empty.as_mut() + } + } + + impl timely::container::LengthPreservingContainerBuilder for TrieChunker { } + pub mod batcher { use std::ops::Range; use columnar::{Borrow, Columnar, Container, Index, Len, Push}; - use differential_dataflow::trace::implementations::chainless_batcher as chainless; use differential_dataflow::difference::{Semigroup, IsZero}; use timely::progress::frontier::{Antichain, AntichainRef}; + use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use crate::ColumnarUpdate as Update; use crate::{ValStorage, KeyStorage}; - impl chainless::BatcherStorage for ValStorage { + // --- SizableContainer impls --- + + impl timely::container::SizableContainer for ValStorage { + fn at_capacity(&self) -> bool { + use columnar::Len; + // Capacity based on update count; 64K is a reasonable threshold. + self.upds.values.len() >= 64 * 1024 + } + fn ensure_capacity(&mut self, _stash: &mut Option) { } + } + + impl timely::container::SizableContainer for KeyStorage { + fn at_capacity(&self) -> bool { + // Capacity based on update count. + self.upds.borrow().values.len() >= 64 * 1024 + } + fn ensure_capacity(&mut self, _stash: &mut Option) { } + } + + // --- InternalMerge for ValStorage --- + + impl InternalMerge for ValStorage { + + type TimeOwned = U::Time; fn len(&self) -> usize { self.upds.values.len() } + fn clear(&mut self) { *self = Self::default(); } #[inline(never)] - fn merge(self, other: Self) -> Self { - - let mut this_sum = U::Diff::default(); - let mut that_sum = U::Diff::default(); - - let mut merged = Self::default(); - let this = self; - let that = other; - let this_keys = this.keys.borrow(); - let that_keys = that.keys.borrow(); - let mut this_key_range = 0 .. this_keys.len(); - let mut that_key_range = 0 .. that_keys.len(); - while !this_key_range.is_empty() && !that_key_range.is_empty() { - let this_key = this_keys.get(this_key_range.start); - let that_key = that_keys.get(that_key_range.start); - match this_key.cmp(&that_key) { - std::cmp::Ordering::Less => { - let lower = this_key_range.start; - gallop(this_keys, &mut this_key_range, |x| x < that_key); - merged.extend_from_keys(&this, lower .. this_key_range.start); - }, - std::cmp::Ordering::Equal => { - // keys are equal; must make a bespoke vals list. - // only push the key if merged.vals.values.len() advances. - let values_len = merged.vals.values.len(); - let mut this_val_range = this.vals_bounds(this_key_range.start .. this_key_range.start+1); - let mut that_val_range = that.vals_bounds(that_key_range.start .. that_key_range.start+1); - while !this_val_range.is_empty() && !that_val_range.is_empty() { - let this_val = this.vals.values.borrow().get(this_val_range.start); - let that_val = that.vals.values.borrow().get(that_val_range.start); - match this_val.cmp(&that_val) { - std::cmp::Ordering::Less => { - let lower = this_val_range.start; - gallop(this.vals.values.borrow(), &mut this_val_range, |x| x < that_val); - merged.extend_from_vals(&this, lower .. this_val_range.start); - }, - std::cmp::Ordering::Equal => { - // vals are equal; must make a bespoke upds list. - // only push the val if merged.upds.values.len() advances. - let updates_len = merged.upds.values.len(); - let mut this_upd_range = this.upds_bounds(this_val_range.start .. this_val_range.start+1); - let mut that_upd_range = that.upds_bounds(that_val_range.start .. that_val_range.start+1); - - while !this_upd_range.is_empty() && !that_upd_range.is_empty() { - let (this_time, this_diff) = this.upds.values.borrow().get(this_upd_range.start); - let (that_time, that_diff) = that.upds.values.borrow().get(that_upd_range.start); - match this_time.cmp(&that_time) { - std::cmp::Ordering::Less => { - let lower = this_upd_range.start; - gallop(this.upds.values.0.borrow(), &mut this_upd_range, |x| x < that_time); - merged.upds.values.extend_from_self(this.upds.values.borrow(), lower .. this_upd_range.start); - }, - std::cmp::Ordering::Equal => { - // times are equal; must add diffs. - this_sum.copy_from(this_diff); - that_sum.copy_from(that_diff); - this_sum.plus_equals(&that_sum); - if !this_sum.is_zero() { merged.upds.values.push((this_time, &this_sum)); } - // Advance the update ranges by one. - this_upd_range.start += 1; - that_upd_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_upd_range.start; - gallop(that.upds.values.0.borrow(), &mut that_upd_range, |x| x < this_time); - merged.upds.values.extend_from_self(that.upds.values.borrow(), lower .. that_upd_range.start); - }, - } - } - // Extend with the remaining this and that updates. - merged.upds.values.extend_from_self(this.upds.values.borrow(), this_upd_range); - merged.upds.values.extend_from_self(that.upds.values.borrow(), that_upd_range); - // Seal the updates and push the val. - if merged.upds.values.len() > updates_len { - merged.upds.bounds.push(merged.upds.values.len() as u64); - merged.vals.values.push(this_val); + fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) { + match others.len() { + 0 => {}, + 1 => { + // Bulk copy: take remaining keys from position onward. + let other = &mut others[0]; + let pos = &mut positions[0]; + if self.keys.len() == 0 && *pos == 0 { + std::mem::swap(self, other); + return; + } + let other_len = other.keys.len(); + self.extend_from_keys(other, *pos .. other_len); + *pos = other_len; + }, + 2 => { + let mut this_sum = U::Diff::default(); + let mut that_sum = U::Diff::default(); + + let (left, right) = others.split_at_mut(1); + let this = &left[0]; + let that = &right[0]; + let this_keys = this.keys.borrow(); + let that_keys = that.keys.borrow(); + let mut this_key_range = positions[0] .. this_keys.len(); + let mut that_key_range = positions[1] .. that_keys.len(); + + while !this_key_range.is_empty() && !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { + let this_key = this_keys.get(this_key_range.start); + let that_key = that_keys.get(that_key_range.start); + match this_key.cmp(&that_key) { + std::cmp::Ordering::Less => { + let lower = this_key_range.start; + gallop(this_keys, &mut this_key_range, |x| x < that_key); + self.extend_from_keys(this, lower .. this_key_range.start); + }, + std::cmp::Ordering::Equal => { + let values_len = self.vals.values.len(); + let mut this_val_range = this.vals_bounds(this_key_range.start .. this_key_range.start+1); + let mut that_val_range = that.vals_bounds(that_key_range.start .. that_key_range.start+1); + while !this_val_range.is_empty() && !that_val_range.is_empty() { + let this_val = this.vals.values.borrow().get(this_val_range.start); + let that_val = that.vals.values.borrow().get(that_val_range.start); + match this_val.cmp(&that_val) { + std::cmp::Ordering::Less => { + let lower = this_val_range.start; + gallop(this.vals.values.borrow(), &mut this_val_range, |x| x < that_val); + self.extend_from_vals(this, lower .. this_val_range.start); + }, + std::cmp::Ordering::Equal => { + let updates_len = self.upds.values.len(); + let mut this_upd_range = this.upds_bounds(this_val_range.start .. this_val_range.start+1); + let mut that_upd_range = that.upds_bounds(that_val_range.start .. that_val_range.start+1); + while !this_upd_range.is_empty() && !that_upd_range.is_empty() { + let (this_time, this_diff) = this.upds.values.borrow().get(this_upd_range.start); + let (that_time, that_diff) = that.upds.values.borrow().get(that_upd_range.start); + match this_time.cmp(&that_time) { + std::cmp::Ordering::Less => { + let lower = this_upd_range.start; + gallop(this.upds.values.0.borrow(), &mut this_upd_range, |x| x < that_time); + self.upds.values.extend_from_self(this.upds.values.borrow(), lower .. this_upd_range.start); + }, + std::cmp::Ordering::Equal => { + this_sum.copy_from(this_diff); + that_sum.copy_from(that_diff); + this_sum.plus_equals(&that_sum); + if !this_sum.is_zero() { self.upds.values.push((this_time, &this_sum)); } + this_upd_range.start += 1; + that_upd_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_upd_range.start; + gallop(that.upds.values.0.borrow(), &mut that_upd_range, |x| x < this_time); + self.upds.values.extend_from_self(that.upds.values.borrow(), lower .. that_upd_range.start); + }, + } + } + self.upds.values.extend_from_self(this.upds.values.borrow(), this_upd_range); + self.upds.values.extend_from_self(that.upds.values.borrow(), that_upd_range); + if self.upds.values.len() > updates_len { + self.upds.bounds.push(self.upds.values.len() as u64); + self.vals.values.push(this_val); + } + this_val_range.start += 1; + that_val_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_val_range.start; + gallop(that.vals.values.borrow(), &mut that_val_range, |x| x < this_val); + self.extend_from_vals(that, lower .. that_val_range.start); + }, } - // Advance the val ranges by one. - this_val_range.start += 1; - that_val_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_val_range.start; - gallop(that.vals.values.borrow(), &mut that_val_range, |x| x < this_val); - merged.extend_from_vals(&that, lower .. that_val_range.start); - }, - } - } - // Extend with the remaining this and that values. - merged.extend_from_vals(&this, this_val_range); - merged.extend_from_vals(&that, that_val_range); - // Seal the values and push the key. - if merged.vals.values.len() > values_len { - merged.vals.bounds.push(merged.vals.values.len() as u64); - merged.keys.push(this_key); + } + self.extend_from_vals(this, this_val_range); + self.extend_from_vals(that, that_val_range); + if self.vals.values.len() > values_len { + self.vals.bounds.push(self.vals.values.len() as u64); + self.keys.push(this_key); + } + this_key_range.start += 1; + that_key_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_key_range.start; + gallop(that_keys, &mut that_key_range, |x| x < this_key); + self.extend_from_keys(that, lower .. that_key_range.start); + }, } - // Advance the key ranges by one. - this_key_range.start += 1; - that_key_range.start += 1; - }, - std::cmp::Ordering::Greater => { + } + // Copy remaining from whichever side has data, up to capacity. + while !this_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { + let lower = this_key_range.start; + this_key_range.start = this_key_range.end; // take all remaining + self.extend_from_keys(this, lower .. this_key_range.start); + } + while !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { let lower = that_key_range.start; - gallop(that_keys, &mut that_key_range, |x| x < this_key); - merged.extend_from_keys(&that, lower .. that_key_range.start); - }, - } + that_key_range.start = that_key_range.end; + self.extend_from_keys(that, lower .. that_key_range.start); + } + positions[0] = this_key_range.start; + positions[1] = that_key_range.start; + }, + n => unimplemented!("{n}-way merge not supported"), } - // Extend with the remaining this and that keys. - merged.extend_from_keys(&this, this_key_range); - merged.extend_from_keys(&that, that_key_range); - - merged } - #[inline(never)] - fn split(&mut self, frontier: AntichainRef) -> Self { - // Unfortunately the times are at the leaves, so there can be no bulk copying. - let mut ship = Self::default(); - let mut keep = Self::default(); + fn extract( + &mut self, + upper: AntichainRef, + frontier: &mut Antichain, + keep: &mut Self, + ship: &mut Self, + ) { let mut time = U::Time::default(); for key_idx in 0 .. self.keys.len() { let key = self.keys.borrow().get(key_idx); @@ -959,7 +1035,8 @@ pub mod arrangement { for upd_idx in self.upds_bounds(val_idx..val_idx+1) { let (t, diff) = self.upds.values.borrow().get(upd_idx); time.copy_from(t); - if frontier.less_equal(&time) { + if upper.less_equal(&time) { + frontier.insert_ref(&time); keep.upds.values.push((t, diff)); } else { @@ -984,119 +1061,125 @@ pub mod arrangement { ship.keys.push(key); } } - - *self = keep; - ship - } - - fn lower(&self, frontier: &mut Antichain) { - use columnar::Columnar; - let mut times = self.upds.values.0.borrow().into_index_iter(); - if let Some(time_ref) = times.next() { - let mut time = ::into_owned(time_ref); - frontier.insert_ref(&time); - for time_ref in times { - ::copy_from(&mut time, time_ref); - frontier.insert_ref(&time); - } - } } } - impl chainless::BatcherStorage for KeyStorage { + // --- InternalMerge for KeyStorage --- + + impl InternalMerge for KeyStorage { + + type TimeOwned = U::Time; fn len(&self) -> usize { self.upds.borrow().values.len() } + fn clear(&mut self) { *self = Self::default(); } #[inline(never)] - fn merge(self, other: Self) -> Self { - - let mut this_sum = U::Diff::default(); - let mut that_sum = U::Diff::default(); - - let mut merged = Self::default(); - - let this = self; - let that = other; - let this_keys = this.keys.borrow(); - let that_keys = that.keys.borrow(); - let mut this_key_range = 0 .. this_keys.len(); - let mut that_key_range = 0 .. that_keys.len(); - let this_upds = this.upds.borrow(); - let that_upds = that.upds.borrow(); - - while !this_key_range.is_empty() && !that_key_range.is_empty() { - let this_key = this_keys.get(this_key_range.start); - let that_key = that_keys.get(that_key_range.start); - match this_key.cmp(&that_key) { - std::cmp::Ordering::Less => { - let lower = this_key_range.start; - gallop(this_keys, &mut this_key_range, |x| x < that_key); - merged.extend_from_keys(&this, lower .. this_key_range.start); - }, - std::cmp::Ordering::Equal => { - // keys are equal; must make a bespoke vals list. - // only push the key if merged.vals.values.len() advances. - let updates_len = merged.upds.borrow().values.len(); - let mut this_upd_range = this.upds_bounds(this_key_range.start .. this_key_range.start+1); - let mut that_upd_range = that.upds_bounds(that_key_range.start .. that_key_range.start+1); - - while !this_upd_range.is_empty() && !that_upd_range.is_empty() { - let (this_time, this_diff) = this_upds.values.get(this_upd_range.start); - let (that_time, that_diff) = that_upds.values.get(that_upd_range.start); - match this_time.cmp(&that_time) { - std::cmp::Ordering::Less => { - let lower = this_upd_range.start; - gallop(this_upds.values.0, &mut this_upd_range, |x| x < that_time); - merged.upds.as_mut().values.extend_from_self(this_upds.values, lower .. this_upd_range.start); - }, - std::cmp::Ordering::Equal => { - // times are equal; must add diffs. - this_sum.copy_from(this_diff); - that_sum.copy_from(that_diff); - this_sum.plus_equals(&that_sum); - if !this_sum.is_zero() { merged.upds.as_mut().values.push((this_time, &this_sum)); } - // Advance the update ranges by one. - this_upd_range.start += 1; - that_upd_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_upd_range.start; - gallop(that_upds.values.0, &mut that_upd_range, |x| x < this_time); - merged.upds.as_mut().values.extend_from_self(that_upds.values, lower .. that_upd_range.start); - }, - } - } - // Extend with the remaining this and that updates. - merged.upds.as_mut().values.extend_from_self(this_upds.values, this_upd_range); - merged.upds.as_mut().values.extend_from_self(that_upds.values, that_upd_range); - // Seal the values and push the key. - if merged.upds.borrow().values.len() > updates_len { - let temp_len = merged.upds.borrow().values.len() as u64; - merged.upds.as_mut().bounds.push(temp_len); - merged.keys.as_mut().push(this_key); + fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) { + match others.len() { + 0 => {}, + 1 => { + let other = &mut others[0]; + let pos = &mut positions[0]; + if self.upds.borrow().values.len() == 0 && *pos == 0 { + std::mem::swap(self, other); + return; + } + let other_len = other.keys.borrow().len(); + self.extend_from_keys(other, *pos .. other_len); + *pos = other_len; + }, + 2 => { + let mut this_sum = U::Diff::default(); + let mut that_sum = U::Diff::default(); + + let (left, right) = others.split_at_mut(1); + let this = &left[0]; + let that = &right[0]; + let this_keys = this.keys.borrow(); + let that_keys = that.keys.borrow(); + let this_upds = this.upds.borrow(); + let that_upds = that.upds.borrow(); + let mut this_key_range = positions[0] .. this_keys.len(); + let mut that_key_range = positions[1] .. that_keys.len(); + + while !this_key_range.is_empty() && !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { + let this_key = this_keys.get(this_key_range.start); + let that_key = that_keys.get(that_key_range.start); + match this_key.cmp(&that_key) { + std::cmp::Ordering::Less => { + let lower = this_key_range.start; + gallop(this_keys, &mut this_key_range, |x| x < that_key); + self.extend_from_keys(this, lower .. this_key_range.start); + }, + std::cmp::Ordering::Equal => { + let updates_len = self.upds.borrow().values.len(); + let mut this_upd_range = this.upds_bounds(this_key_range.start .. this_key_range.start+1); + let mut that_upd_range = that.upds_bounds(that_key_range.start .. that_key_range.start+1); + while !this_upd_range.is_empty() && !that_upd_range.is_empty() { + let (this_time, this_diff) = this_upds.values.get(this_upd_range.start); + let (that_time, that_diff) = that_upds.values.get(that_upd_range.start); + match this_time.cmp(&that_time) { + std::cmp::Ordering::Less => { + let lower = this_upd_range.start; + gallop(this_upds.values.0, &mut this_upd_range, |x| x < that_time); + self.upds.as_mut().values.extend_from_self(this_upds.values, lower .. this_upd_range.start); + }, + std::cmp::Ordering::Equal => { + this_sum.copy_from(this_diff); + that_sum.copy_from(that_diff); + this_sum.plus_equals(&that_sum); + if !this_sum.is_zero() { self.upds.as_mut().values.push((this_time, &this_sum)); } + this_upd_range.start += 1; + that_upd_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_upd_range.start; + gallop(that_upds.values.0, &mut that_upd_range, |x| x < this_time); + self.upds.as_mut().values.extend_from_self(that_upds.values, lower .. that_upd_range.start); + }, + } + } + self.upds.as_mut().values.extend_from_self(this_upds.values, this_upd_range); + self.upds.as_mut().values.extend_from_self(that_upds.values, that_upd_range); + if self.upds.borrow().values.len() > updates_len { + let temp_len = self.upds.borrow().values.len() as u64; + self.upds.as_mut().bounds.push(temp_len); + self.keys.as_mut().push(this_key); + } + this_key_range.start += 1; + that_key_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_key_range.start; + gallop(that_keys, &mut that_key_range, |x| x < this_key); + self.extend_from_keys(that, lower .. that_key_range.start); + }, } - // Advance the key ranges by one. - this_key_range.start += 1; - that_key_range.start += 1; - }, - std::cmp::Ordering::Greater => { + } + while !this_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { + let lower = this_key_range.start; + this_key_range.start = this_key_range.end; + self.extend_from_keys(this, lower .. this_key_range.start); + } + while !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { let lower = that_key_range.start; - gallop(that_keys, &mut that_key_range, |x| x < this_key); - merged.extend_from_keys(&that, lower .. that_key_range.start); - }, - } + that_key_range.start = that_key_range.end; + self.extend_from_keys(that, lower .. that_key_range.start); + } + positions[0] = this_key_range.start; + positions[1] = that_key_range.start; + }, + n => unimplemented!("{n}-way merge not supported"), } - // Extend with the remaining this and that keys. - merged.extend_from_keys(&this, this_key_range); - merged.extend_from_keys(&that, that_key_range); - - merged } - #[inline(never)] - fn split(&mut self, frontier: AntichainRef) -> Self { - // Unfortunately the times are at the leaves, so there can be no bulk copying. - + fn extract( + &mut self, + upper: AntichainRef, + frontier: &mut Antichain, + keep: &mut Self, + ship: &mut Self, + ) { use crate::Column; use columnar::{ContainerOf, Vecs}; @@ -1113,7 +1196,8 @@ pub mod arrangement { for upd_idx in self.upds_bounds(key_idx..key_idx+1) { let (t, diff) = self.upds.borrow().values.get(upd_idx); time.copy_from(t); - if frontier.less_equal(&time) { + if upper.less_equal(&time) { + frontier.insert_ref(&time); keep_upds.values.push((t, diff)); } else { @@ -1130,28 +1214,10 @@ pub mod arrangement { } } - self.keys = Column::Typed(keep_keys); - self.upds = Column::Typed(keep_upds); - - // *self = keep; - // ship - Self { - keys: Column::Typed(ship_keys), - upds: Column::Typed(ship_upds), - } - } - - fn lower(&self, frontier: &mut Antichain) { - use columnar::Columnar; - let mut times = self.upds.borrow().values.0.into_index_iter(); - if let Some(time_ref) = times.next() { - let mut time = ::into_owned(time_ref); - frontier.insert_ref(&time); - for time_ref in times { - ::copy_from(&mut time, time_ref); - frontier.insert_ref(&time); - } - } + keep.keys = Column::Typed(keep_keys); + keep.upds = Column::Typed(keep_upds); + ship.keys = Column::Typed(ship_keys); + ship.upds = Column::Typed(ship_upds); } }