diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs
index a1b818c6c..252a36336 100644
--- a/differential-dataflow/src/trace/implementations/merge_batcher.rs
+++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs
@@ -278,11 +278,160 @@ pub mod container {
     );
     }
 
-    /// A `Merger` using internal iteration for `Vec` containers.
-    pub type VecInternalMerger<D, T, R> = InternalMerger<Vec<(D, T, R)>>;
+    /// A `Merger` for `Vec` containers, which contain owned data and need special treatment.
+    pub type VecInternalMerger<D, T, R> = VecMerger<D, T, R>;
 
     /// A `Merger` using internal iteration for `TimelyStack` containers.
     pub type ColInternalMerger<D, T, R> = InternalMerger<TimelyStack<(D, T, R)>>;
+
+    /// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs.
+    pub struct VecMerger<D, T, R> {
+        _marker: PhantomData<(D, T, R)>,
+    }
+
+    impl<D, T, R> Default for VecMerger<D, T, R> {
+        fn default() -> Self { Self { _marker: PhantomData } }
+    }
+
+    impl<D, T, R> VecMerger<D, T, R> {
+        /// The target capacity for output buffers, as a power of two.
+        ///
+        /// This amount is used to size vectors, where vectors not exactly this capacity are dropped.
+        /// If this is mis-set, there is the potential for more memory churn than anticipated.
+        fn target_capacity() -> usize {
+            timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
+        }
+        /// Acquire a buffer with the target capacity.
+        fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
+            let target = Self::target_capacity();
+            let mut container = stash.pop().unwrap_or_default();
+            container.clear();
+            // Reuse if at target; otherwise allocate fresh.
+            if container.capacity() != target {
+                container = Vec::with_capacity(target);
+            }
+            container
+        }
+        /// Refill `queue` from `iter` if empty. Recycles drained queues into `stash`.
+        fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
+            if queue.is_empty() {
+                let target = Self::target_capacity();
+                if stash.len() < 2 {
+                    let mut recycled = Vec::from(std::mem::take(queue));
+                    recycled.clear();
+                    if recycled.capacity() == target {
+                        stash.push(recycled);
+                    }
+                }
+                if let Some(chunk) = iter.next() {
+                    *queue = std::collections::VecDeque::from(chunk);
+                }
+            }
+        }
+    }
+
+    impl<D, T, R> Merger for VecMerger<D, T, R>
+    where
+        D: Ord + Clone + 'static,
+        T: Ord + Clone + PartialOrder + 'static,
+        R: crate::difference::Semigroup + Clone + 'static,
+    {
+        type Chunk = Vec<(D, T, R)>;
+        type Time = T;
+
+        fn merge(
+            &mut self,
+            list1: Vec<Self::Chunk>,
+            list2: Vec<Self::Chunk>,
+            output: &mut Vec<Self::Chunk>,
+            stash: &mut Vec<Self::Chunk>,
+        ) {
+            use std::cmp::Ordering;
+            use std::collections::VecDeque;
+
+            let mut iter1 = list1.into_iter();
+            let mut iter2 = list2.into_iter();
+            let mut q1 = VecDeque::<(D,T,R)>::from(iter1.next().unwrap_or_default());
+            let mut q2 = VecDeque::<(D,T,R)>::from(iter2.next().unwrap_or_default());
+
+            let mut result = self.empty(stash);
+
+            // Merge while both queues are non-empty.
+            while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
+                match (d1, t1).cmp(&(d2, t2)) {
+                    Ordering::Less => {
+                        result.push(q1.pop_front().unwrap());
+                    }
+                    Ordering::Greater => {
+                        result.push(q2.pop_front().unwrap());
+                    }
+                    Ordering::Equal => {
+                        let (d, t, mut r1) = q1.pop_front().unwrap();
+                        let (_, _, r2) = q2.pop_front().unwrap();
+                        r1.plus_equals(&r2);
+                        if !r1.is_zero() {
+                            result.push((d, t, r1));
+                        }
+                    }
+                }
+
+                if result.at_capacity() {
+                    output.push(std::mem::take(&mut result));
+                    result = self.empty(stash);
+                }
+
+                // Refill emptied queues from their chains.
+                if q1.is_empty() { Self::refill(&mut q1, &mut iter1, stash); }
+                if q2.is_empty() { Self::refill(&mut q2, &mut iter2, stash); }
+            }
+
+            // Push partial result and remaining data from both sides.
+            if !result.is_empty() { output.push(result); }
+            for q in [q1, q2] {
+                if !q.is_empty() { output.push(Vec::from(q)); }
+            }
+            output.extend(iter1);
+            output.extend(iter2);
+        }
+
+        fn extract(
+            &mut self,
+            merged: Vec<Self::Chunk>,
+            upper: AntichainRef<Self::Time>,
+            frontier: &mut Antichain<Self::Time>,
+            ship: &mut Vec<Self::Chunk>,
+            kept: &mut Vec<Self::Chunk>,
+            stash: &mut Vec<Self::Chunk>,
+        ) {
+            let mut keep = self.empty(stash);
+            let mut ready = self.empty(stash);
+
+            for chunk in merged {
+                for (data, time, diff) in chunk {
+                    if upper.less_equal(&time) {
+                        frontier.insert_with(&time, |time| time.clone());
+                        keep.push((data, time, diff));
+                    } else {
+                        ready.push((data, time, diff));
+                    }
+                }
+                if keep.at_capacity() {
+                    kept.push(std::mem::take(&mut keep));
+                    keep = self.empty(stash);
+                }
+                if ready.at_capacity() {
+                    ship.push(std::mem::take(&mut ready));
+                    ready = self.empty(stash);
+                }
+            }
+            if !keep.is_empty() { kept.push(keep); }
+            if !ready.is_empty() { ship.push(ready); }
+        }
+
+        fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
+            (chunk.len(), 0, 0, 0)
+        }
+    }
+
     /// A merger that uses internal iteration via [`InternalMerge`].
     pub struct InternalMerger<MC> {
         _marker: PhantomData<MC>,
     }
@@ -307,6 +456,9 @@ pub mod container {
             stash.push(chunk);
         }
         /// Drain remaining items from one side into `result`/`output`.
+        ///
+        /// Copies the partially-consumed head into `result`, then appends
+        /// remaining full chunks directly to `output` without copying.
         fn drain_side(
             &self,
             head: &mut MC,
@@ -316,21 +468,20 @@ pub mod container {
             output: &mut Vec<MC>,
             stash: &mut Vec<MC>,
         ) {
-            while *pos < head.len() {
+            // Copy the partially-consumed head into result.
+            if *pos < head.len() {
                 result.merge_from(
                     std::slice::from_mut(head),
                     std::slice::from_mut(pos),
                 );
-                if *pos >= head.len() {
-                    let old = std::mem::replace(head, list.next().unwrap_or_default());
-                    self.recycle(old, stash);
-                    *pos = 0;
-                }
-                if result.at_capacity() {
-                    output.push(std::mem::take(result));
-                    *result = self.empty(stash);
-                }
             }
+            // Flush result before appending full chunks.
+            if !result.is_empty() {
+                output.push(std::mem::take(result));
+                *result = self.empty(stash);
+            }
+            // Remaining full chunks go directly to output.
+            output.extend(list);
         }
     }
@@ -370,20 +521,12 @@ pub mod container {
                 }
             }
 
-            // Drain remaining from side 0.
+            // Drain remaining from each side: copy partial head, then append full chunks.
             self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
-            if !result.is_empty() {
-                output.push(std::mem::take(&mut result));
-                result = self.empty(stash);
-            }
-            output.extend(list1);
-
-            // Drain remaining from side 1.
             self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
             if !result.is_empty() {
-                output.push(std::mem::take(&mut result));
+                output.push(result);
             }
-            output.extend(list2);
         }
 
         fn extract(
@@ -424,6 +567,10 @@ pub mod container {
     }
 
     /// Implementation of `InternalMerge` for `Vec<(D, T, R)>`.
+    ///
+    /// Note: The `VecMerger` type implements `Merger` directly and avoids
+    /// cloning by draining inputs. This `InternalMerge` impl is retained
+    /// because `reduce` requires `Builder::Input: InternalMerge`.
     pub mod vec_internal {
         use std::cmp::Ordering;
         use timely::PartialOrder;