From 3be43b8df0fbfbd6b3f5cc8fe5de218371e64bf7 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 6 May 2026 08:30:39 -0400 Subject: [PATCH 1/3] Remove InternalMerge implementations --- .../src/columnar/arrangement/mod.rs | 39 -------- .../trace/implementations/merge_batcher.rs | 95 ------------------- 2 files changed, 134 deletions(-) diff --git a/differential-dataflow/src/columnar/arrangement/mod.rs b/differential-dataflow/src/columnar/arrangement/mod.rs index d998d37a6..be980fcdd 100644 --- a/differential-dataflow/src/columnar/arrangement/mod.rs +++ b/differential-dataflow/src/columnar/arrangement/mod.rs @@ -252,45 +252,6 @@ impl timely::container::ContainerBuilder for T } } -pub mod batcher { - //! Batcher trait stubs required to plug `UpdatesTyped` into DD's merge batcher. - - use columnar::Len; - use timely::progress::frontier::{Antichain, AntichainRef}; - use crate::trace::implementations::merge_batcher::container::InternalMerge; - - use super::super::layout::ColumnarUpdate as Update; - use super::super::updates::UpdatesTyped; - - impl timely::container::SizableContainer for UpdatesTyped { - fn at_capacity(&self) -> bool { self.view().diffs.values.len() >= crate::columnar::LINK_TARGET } - fn ensure_capacity(&mut self, _stash: &mut Option) { } - } - - /// Required by `reduce_abelian`'s bound `Builder::Input: InternalMerge`. - /// Not called at runtime — our batcher uses `TrieMerger` instead. - /// TODO: Relax the bound in DD's reduce to remove this requirement. - impl InternalMerge for UpdatesTyped { - type TimeOwned = U::Time; - fn len(&self) -> usize { unimplemented!() } - fn clear(&mut self) { - use columnar::Clear; - self.keys.clear(); - self.vals.clear(); - self.times.clear(); - self.diffs.clear(); - } - fn merge_from(&mut self, _others: &mut [Self], _positions: &mut [usize]) { unimplemented!() } - fn extract(&mut self, - _position: &mut usize, - _upper: AntichainRef, - _frontier: &mut Antichain, - _keep: &mut Self, - _ship: &mut Self, - ) { unimplemented!() } - } -} - pub mod builder { //! [`ValMirror`] trace builder that seals melded chunks into [`OrdValBatch`]. diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index aa41300fb..fd21ec8a6 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -585,99 +585,4 @@ pub mod container { chunk.account() } } - - /// Implementation of `InternalMerge` for `Vec<(D, T, R)>`. - /// - /// Note: The `VecMerger` type implements `Merger` directly and avoids - /// cloning by draining inputs. This `InternalMerge` impl is retained - /// because `reduce` requires `Builder::Input: InternalMerge`. - pub mod vec_internal { - use std::cmp::Ordering; - use timely::PartialOrder; - use timely::container::SizableContainer; - use timely::progress::frontier::{Antichain, AntichainRef}; - use crate::difference::Semigroup; - use super::InternalMerge; - - impl InternalMerge for Vec<(D, T, R)> { - type TimeOwned = T; - - fn len(&self) -> usize { Vec::len(self) } - fn clear(&mut self) { Vec::clear(self) } - - fn merge_from( - &mut self, - others: &mut [Self], - positions: &mut [usize], - ) { - match others.len() { - 0 => {}, - 1 => { - let other = &mut others[0]; - let pos = &mut positions[0]; - if self.is_empty() && *pos == 0 { - std::mem::swap(self, other); - return; - } - self.extend_from_slice(&other[*pos ..]); - *pos = other.len(); - }, - 2 => { - let (left, right) = others.split_at_mut(1); - let other1 = &mut left[0]; - let other2 = &mut right[0]; - - while positions[0] < other1.len() && positions[1] < other2.len() && !self.at_capacity() { - let (d1, t1, _) = &other1[positions[0]]; - let (d2, t2, _) = &other2[positions[1]]; - // NOTE: The .clone() calls here are not great, but this dead code to be removed in the next release. - match (d1, t1).cmp(&(d2, t2)) { - Ordering::Less => { - self.push(other1[positions[0]].clone()); - positions[0] += 1; - } - Ordering::Greater => { - self.push(other2[positions[1]].clone()); - positions[1] += 1; - } - Ordering::Equal => { - let (d, t, mut r1) = other1[positions[0]].clone(); - let (_, _, ref r2) = other2[positions[1]]; - r1.plus_equals(r2); - if !r1.is_zero() { - self.push((d, t, r1)); - } - positions[0] += 1; - positions[1] += 1; - } - } - } - }, - n => unimplemented!("{n}-way merge not yet supported"), - } - } - - fn extract( - &mut self, - position: &mut usize, - upper: AntichainRef, - frontier: &mut Antichain, - keep: &mut Self, - ship: &mut Self, - ) { - let len = self.len(); - while *position < len && !keep.at_capacity() && !ship.at_capacity() { - let (data, time, diff) = self[*position].clone(); - if upper.less_equal(&time) { - frontier.insert_with(&time, |time| time.clone()); - keep.push((data, time, diff)); - } else { - ship.push((data, time, diff)); - } - *position += 1; - } - } - } - } - } From 71e04857db474dd9b73214a79edbbce165b0d48f Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 6 May 2026 08:43:00 -0400 Subject: [PATCH 2/3] Move VecMerger to its own independent module --- .../trace/implementations/merge_batcher.rs | 146 +++++++++--------- .../src/trace/implementations/ord_neu.rs | 6 +- 2 files changed, 80 insertions(+), 72 deletions(-) diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index fd21ec8a6..c3283f79d 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -219,81 +219,15 @@ pub trait Merger: Default { fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize); } -pub use container::InternalMerger; - -pub mod container { - - //! Merger implementations for the merge batcher. - //! - //! The `InternalMerge` trait allows containers to merge sorted, consolidated - //! data using internal iteration. The `InternalMerger` type implements the - //! `Merger` trait using `InternalMerge`, and is the standard merger for all - //! container types. +/// A `Merger` implementation for vector update containers. +pub mod vec { use std::marker::PhantomData; use timely::container::SizableContainer; use timely::progress::frontier::{Antichain, AntichainRef}; - use timely::{Accountable, PartialOrder}; + use timely::PartialOrder; use crate::trace::implementations::merge_batcher::Merger; - /// A container that supports the operations needed by the merge batcher: - /// merging sorted chains and extracting updates by time. - pub trait InternalMerge: Accountable + SizableContainer + Default { - /// The owned time type, for maintaining antichains. - type TimeOwned; - - /// The number of items in this container. - fn len(&self) -> usize; - - /// Clear the container for reuse. - fn clear(&mut self); - - /// Account the allocations behind the chunk. - fn account(&self) -> (usize, usize, usize, usize) { - let (size, capacity, allocations) = (0, 0, 0); - (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations) - } - - /// Merge items from sorted inputs into `self`, advancing positions. - /// Merges until `self` is at capacity or all inputs are exhausted. - /// - /// Dispatches based on the number of inputs: - /// - **0**: no-op - /// - **1**: bulk copy (may swap the input into `self`) - /// - **2**: merge two sorted streams - fn merge_from( - &mut self, - others: &mut [Self], - positions: &mut [usize], - ); - - /// Extract updates from `self` into `ship` (times not beyond `upper`) - /// and `keep` (times beyond `upper`), updating `frontier` with kept times. - /// - /// Iteration starts at `*position` and advances `*position` as updates - /// are consumed. The implementation must yield (return early) when - /// either `keep.at_capacity()` or `ship.at_capacity()` becomes true, - /// so the caller can swap out a full output buffer and resume by - /// calling `extract` again. The caller invokes `extract` repeatedly - /// until `*position >= self.len()`. - /// - /// This shape exists because `at_capacity()` for `Vec` is - /// `len() == capacity()`, which silently becomes false again the - /// moment a push past capacity grows the backing allocation. - /// Without per-element yielding, a single `extract` call can - /// quietly produce oversized output chunks. - fn extract( - &mut self, - position: &mut usize, - upper: AntichainRef, - frontier: &mut Antichain, - keep: &mut Self, - ship: &mut Self, - ); - } - - /// A `Merger` for `Vec` containers, which contain owned data and need special treatment. - pub type VecInternalMerger = VecMerger; /// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs. pub struct VecMerger { _marker: PhantomData<(D, T, R)>, @@ -447,6 +381,80 @@ pub mod container { (chunk.len(), 0, 0, 0) } } +} + +pub use container::InternalMerger; + +pub mod container { + + //! Merger implementations for the merge batcher. + //! + //! The `InternalMerge` trait allows containers to merge sorted, consolidated + //! data using internal iteration. The `InternalMerger` type implements the + //! `Merger` trait using `InternalMerge`, and is the standard merger for all + //! container types. + + use std::marker::PhantomData; + use timely::container::SizableContainer; + use timely::progress::frontier::{Antichain, AntichainRef}; + use timely::{Accountable, PartialOrder}; + use crate::trace::implementations::merge_batcher::Merger; + + /// A container that supports the operations needed by the merge batcher: + /// merging sorted chains and extracting updates by time. + pub trait InternalMerge: Accountable + SizableContainer + Default { + /// The owned time type, for maintaining antichains. + type TimeOwned; + + /// The number of items in this container. + fn len(&self) -> usize; + + /// Clear the container for reuse. + fn clear(&mut self); + + /// Account the allocations behind the chunk. + fn account(&self) -> (usize, usize, usize, usize) { + let (size, capacity, allocations) = (0, 0, 0); + (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations) + } + + /// Merge items from sorted inputs into `self`, advancing positions. + /// Merges until `self` is at capacity or all inputs are exhausted. + /// + /// Dispatches based on the number of inputs: + /// - **0**: no-op + /// - **1**: bulk copy (may swap the input into `self`) + /// - **2**: merge two sorted streams + fn merge_from( + &mut self, + others: &mut [Self], + positions: &mut [usize], + ); + + /// Extract updates from `self` into `ship` (times not beyond `upper`) + /// and `keep` (times beyond `upper`), updating `frontier` with kept times. + /// + /// Iteration starts at `*position` and advances `*position` as updates + /// are consumed. The implementation must yield (return early) when + /// either `keep.at_capacity()` or `ship.at_capacity()` becomes true, + /// so the caller can swap out a full output buffer and resume by + /// calling `extract` again. The caller invokes `extract` repeatedly + /// until `*position >= self.len()`. + /// + /// This shape exists because `at_capacity()` for `Vec` is + /// `len() == capacity()`, which silently becomes false again the + /// moment a push past capacity grows the backing allocation. + /// Without per-element yielding, a single `extract` call can + /// quietly produce oversized output chunks. + fn extract( + &mut self, + position: &mut usize, + upper: AntichainRef, + frontier: &mut Antichain, + keep: &mut Self, + ship: &mut Self, + ); + } /// A merger that uses internal iteration via [`InternalMerge`]. pub struct InternalMerger { diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 00ac3009f..37a863811 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -13,7 +13,7 @@ use std::rc::Rc; use crate::trace::implementations::chunker::ContainerChunker; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::MergeBatcher; -use crate::trace::implementations::merge_batcher::container::VecInternalMerger; +use crate::trace::implementations::merge_batcher::vec::VecMerger; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Layout, Vector}; @@ -24,14 +24,14 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine>>>; /// A batcher using ordered lists. -pub type OrdValBatcher = MergeBatcher, ContainerChunker>, VecInternalMerger<(K, V), T, R>>; +pub type OrdValBatcher = MergeBatcher, ContainerChunker>, VecMerger<(K, V), T, R>>; /// A builder using ordered lists. pub type RcOrdValBuilder = RcBuilder, Vec<((K,V),T,R)>>>; /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine>>>; /// A batcher for ordered lists. -pub type OrdKeyBatcher = MergeBatcher, ContainerChunker>, VecInternalMerger<(K, ()), T, R>>; +pub type OrdKeyBatcher = MergeBatcher, ContainerChunker>, VecMerger<(K, ()), T, R>>; /// A builder for ordered lists. pub type RcOrdKeyBuilder = RcBuilder, Vec<((K,()),T,R)>>>; From e22741ecc9e77e71086e68e3843507631ac61bf0 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 6 May 2026 08:47:21 -0400 Subject: [PATCH 3/3] Remove InternalMerge* --- .../trace/implementations/merge_batcher.rs | 212 ------------------ 1 file changed, 212 deletions(-) diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index c3283f79d..ccb2ac943 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -382,215 +382,3 @@ pub mod vec { } } } - -pub use container::InternalMerger; - -pub mod container { - - //! Merger implementations for the merge batcher. - //! - //! The `InternalMerge` trait allows containers to merge sorted, consolidated - //! data using internal iteration. The `InternalMerger` type implements the - //! `Merger` trait using `InternalMerge`, and is the standard merger for all - //! container types. - - use std::marker::PhantomData; - use timely::container::SizableContainer; - use timely::progress::frontier::{Antichain, AntichainRef}; - use timely::{Accountable, PartialOrder}; - use crate::trace::implementations::merge_batcher::Merger; - - /// A container that supports the operations needed by the merge batcher: - /// merging sorted chains and extracting updates by time. - pub trait InternalMerge: Accountable + SizableContainer + Default { - /// The owned time type, for maintaining antichains. - type TimeOwned; - - /// The number of items in this container. - fn len(&self) -> usize; - - /// Clear the container for reuse. - fn clear(&mut self); - - /// Account the allocations behind the chunk. - fn account(&self) -> (usize, usize, usize, usize) { - let (size, capacity, allocations) = (0, 0, 0); - (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations) - } - - /// Merge items from sorted inputs into `self`, advancing positions. - /// Merges until `self` is at capacity or all inputs are exhausted. - /// - /// Dispatches based on the number of inputs: - /// - **0**: no-op - /// - **1**: bulk copy (may swap the input into `self`) - /// - **2**: merge two sorted streams - fn merge_from( - &mut self, - others: &mut [Self], - positions: &mut [usize], - ); - - /// Extract updates from `self` into `ship` (times not beyond `upper`) - /// and `keep` (times beyond `upper`), updating `frontier` with kept times. - /// - /// Iteration starts at `*position` and advances `*position` as updates - /// are consumed. The implementation must yield (return early) when - /// either `keep.at_capacity()` or `ship.at_capacity()` becomes true, - /// so the caller can swap out a full output buffer and resume by - /// calling `extract` again. The caller invokes `extract` repeatedly - /// until `*position >= self.len()`. - /// - /// This shape exists because `at_capacity()` for `Vec` is - /// `len() == capacity()`, which silently becomes false again the - /// moment a push past capacity grows the backing allocation. - /// Without per-element yielding, a single `extract` call can - /// quietly produce oversized output chunks. - fn extract( - &mut self, - position: &mut usize, - upper: AntichainRef, - frontier: &mut Antichain, - keep: &mut Self, - ship: &mut Self, - ); - } - - /// A merger that uses internal iteration via [`InternalMerge`]. - pub struct InternalMerger { - _marker: PhantomData, - } - - impl Default for InternalMerger { - fn default() -> Self { Self { _marker: PhantomData } } - } - - impl InternalMerger where MC: InternalMerge { - #[inline] - fn empty(&self, stash: &mut Vec) -> MC { - stash.pop().unwrap_or_else(|| { - let mut container = MC::default(); - container.ensure_capacity(&mut None); - container - }) - } - #[inline] - fn recycle(&self, mut chunk: MC, stash: &mut Vec) { - chunk.clear(); - stash.push(chunk); - } - /// Drain remaining items from one side into `result`/`output`. - /// - /// Copies the partially-consumed head into `result`, then appends - /// remaining full chunks directly to `output` without copying. - fn drain_side( - &self, - head: &mut MC, - pos: &mut usize, - list: &mut std::vec::IntoIter, - result: &mut MC, - output: &mut Vec, - stash: &mut Vec, - ) { - // Copy the partially-consumed head into result. - if *pos < head.len() { - result.merge_from( - std::slice::from_mut(head), - std::slice::from_mut(pos), - ); - } - // Flush result before appending full chunks. - if !result.is_empty() { - output.push(std::mem::take(result)); - *result = self.empty(stash); - } - // Remaining full chunks go directly to output. - output.extend(list); - } - } - - impl Merger for InternalMerger - where - MC: InternalMerge + 'static, - { - type Time = MC::TimeOwned; - type Chunk = MC; - - fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { - let mut list1 = list1.into_iter(); - let mut list2 = list2.into_iter(); - - let mut heads = [list1.next().unwrap_or_default(), list2.next().unwrap_or_default()]; - let mut positions = [0usize, 0usize]; - - let mut result = self.empty(stash); - - // Main merge loop: both sides have data. - while positions[0] < heads[0].len() && positions[1] < heads[1].len() { - result.merge_from(&mut heads, &mut positions); - - if positions[0] >= heads[0].len() { - let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default()); - self.recycle(old, stash); - positions[0] = 0; - } - if positions[1] >= heads[1].len() { - let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default()); - self.recycle(old, stash); - positions[1] = 0; - } - if result.at_capacity() { - output.push(std::mem::take(&mut result)); - result = self.empty(stash); - } - } - - // Drain remaining from each side: copy partial head, then append full chunks. - self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash); - self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash); - if !result.is_empty() { - output.push(result); - } - } - - fn extract( - &mut self, - merged: Vec, - upper: AntichainRef, - frontier: &mut Antichain, - ship: &mut Vec, - kept: &mut Vec, - stash: &mut Vec, - ) { - let mut keep = self.empty(stash); - let mut ready = self.empty(stash); - - for mut buffer in merged { - let mut position = 0; - let len = buffer.len(); - while position < len { - buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready); - if keep.at_capacity() { - kept.push(std::mem::take(&mut keep)); - keep = self.empty(stash); - } - if ready.at_capacity() { - ship.push(std::mem::take(&mut ready)); - ready = self.empty(stash); - } - } - self.recycle(buffer, stash); - } - if !keep.is_empty() { - kept.push(keep); - } - if !ready.is_empty() { - ship.push(ready); - } - } - - fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { - chunk.account() - } - } -}