Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 0 additions & 39 deletions differential-dataflow/src/columnar/arrangement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,45 +252,6 @@ impl<U: super::layout::ColumnarUpdate> timely::container::ContainerBuilder for T
}
}

pub mod batcher {
//! Batcher trait stubs required to plug `UpdatesTyped` into DD's merge batcher.

use columnar::Len;
use timely::progress::frontier::{Antichain, AntichainRef};
use crate::trace::implementations::merge_batcher::container::InternalMerge;

use super::super::layout::ColumnarUpdate as Update;
use super::super::updates::UpdatesTyped;

impl<U: Update> timely::container::SizableContainer for UpdatesTyped<U> {
    /// Reports the container as full once the number of diff values visible
    /// through `view()` reaches `crate::columnar::LINK_TARGET`.
    fn at_capacity(&self) -> bool {
        let diff_count = self.view().diffs.values.len();
        diff_count >= crate::columnar::LINK_TARGET
    }

    /// Intentionally a no-op: nothing is reserved and the stash is ignored.
    fn ensure_capacity(&mut self, _stash: &mut Option<Self>) {}
}

/// Required by `reduce_abelian`'s bound `Builder::Input: InternalMerge`.
/// Not called at runtime — our batcher uses `TrieMerger` instead.
/// TODO: Relax the bound in DD's reduce to remove this requirement.
///
/// Only `clear` has a real body; `len`, `merge_from`, and `extract` are
/// stubs that panic through `unimplemented!()` if they are ever invoked.
impl<U: Update> InternalMerge for UpdatesTyped<U> {
// Times are tracked using the update's own time type.
type TimeOwned = U::Time;
// Stub: panics if called.
fn len(&self) -> usize { unimplemented!() }
// Clears each of the four fields (`keys`, `vals`, `times`, `diffs`) in turn.
fn clear(&mut self) {
// Imported locally, presumably so the `clear()` calls on the fields below resolve.
use columnar::Clear;
self.keys.clear();
self.vals.clear();
self.times.clear();
self.diffs.clear();
}
// Stub: panics if called; the arguments are ignored.
fn merge_from(&mut self, _others: &mut [Self], _positions: &mut [usize]) { unimplemented!() }
// Stub: panics if called; the arguments are ignored.
fn extract(&mut self,
_position: &mut usize,
_upper: AntichainRef<U::Time>,
_frontier: &mut Antichain<U::Time>,
_keep: &mut Self,
_ship: &mut Self,
) { unimplemented!() }
}
}

pub mod builder {
//! [`ValMirror`] trace builder that seals melded chunks into [`OrdValBatch`].

Expand Down
305 changes: 3 additions & 302 deletions differential-dataflow/src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,81 +219,15 @@ pub trait Merger: Default {
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
}

pub use container::InternalMerger;

pub mod container {

//! Merger implementations for the merge batcher.
//!
//! The `InternalMerge` trait allows containers to merge sorted, consolidated
//! data using internal iteration. The `InternalMerger` type implements the
//! `Merger` trait using `InternalMerge`, and is the standard merger for all
//! container types.
/// A `Merger` implementation for vector update containers.
pub mod vec {

use std::marker::PhantomData;
use timely::container::SizableContainer;
use timely::progress::frontier::{Antichain, AntichainRef};
use timely::{Accountable, PartialOrder};
use timely::PartialOrder;
use crate::trace::implementations::merge_batcher::Merger;

/// The operations the merge batcher needs from a container: combining sorted
/// chains, and splitting updates apart by time.
pub trait InternalMerge: Accountable + SizableContainer + Default {
    /// Owned form of the time type, used when maintaining antichains.
    type TimeOwned;

    /// Returns how many items the container currently holds.
    fn len(&self) -> usize;

    /// Empties the container so that it can be reused.
    fn clear(&mut self);

    /// Reports accounting information for the chunk as
    /// `(records, size, capacity, allocations)`.
    ///
    /// The default implementation only knows the record count; the size,
    /// capacity, and allocation figures are all reported as zero.
    ///
    /// # Panics
    ///
    /// Panics if `record_count()` cannot be represented as a `usize`.
    fn account(&self) -> (usize, usize, usize, usize) {
        let records = usize::try_from(self.record_count()).unwrap();
        (records, 0, 0, 0)
    }

    /// Merges items from the sorted `others` into `self`, advancing the
    /// matching entries of `positions` as items are consumed.
    ///
    /// Merging continues until `self` is at capacity or every input has been
    /// exhausted. The behavior depends on how many inputs are supplied:
    /// - **0**: nothing happens
    /// - **1**: bulk copy (the input may instead be swapped into `self`)
    /// - **2**: the two sorted streams are merged
    fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]);

    /// Splits updates from `self` between `ship` (times not beyond `upper`)
    /// and `keep` (times beyond `upper`), recording kept times in `frontier`.
    ///
    /// Work begins at `*position`, and `*position` advances as updates are
    /// consumed. Implementations must return early as soon as either
    /// `keep.at_capacity()` or `ship.at_capacity()` holds, which lets the
    /// caller replace the full output buffer and call `extract` again to
    /// resume. The caller keeps calling until `*position >= self.len()`.
    ///
    /// The reason for this shape: for `Vec`, `at_capacity()` means
    /// `len() == capacity()`, and that stops being true as soon as a push
    /// beyond capacity grows the allocation. If `extract` did not yield per
    /// element, one call could silently emit oversized output chunks.
    fn extract(
        &mut self,
        position: &mut usize,
        upper: AntichainRef<Self::TimeOwned>,
        frontier: &mut Antichain<Self::TimeOwned>,
        keep: &mut Self,
        ship: &mut Self,
    );
}

/// A `Merger` for `Vec` containers, which contain owned data and need special treatment.
///
/// This is a plain type alias: `VecInternalMerger<D, T, R>` and
/// `VecMerger<D, T, R>` name the same type.
pub type VecInternalMerger<D, T, R> = VecMerger<D, T, R>;
/// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs.
pub struct VecMerger<D, T, R> {
_marker: PhantomData<(D, T, R)>,
Expand Down Expand Up @@ -447,237 +381,4 @@ pub mod container {
(chunk.len(), 0, 0, 0)
}
}

/// A merger driven by the internal iteration that [`InternalMerge`] provides.
///
/// The type carries no data of its own; `MC` only records which container
/// type the merger operates on.
pub struct InternalMerger<MC> {
    _marker: PhantomData<MC>,
}

impl<MC> Default for InternalMerger<MC> {
    /// Builds the (stateless) merger; no bound on `MC` is required.
    fn default() -> Self {
        InternalMerger { _marker: PhantomData }
    }
}

impl<MC> InternalMerger<MC>
where
    MC: InternalMerge,
{
    /// Hands out an empty container: a stashed one when available, otherwise
    /// a default container on which `ensure_capacity` has been called.
    #[inline]
    fn empty(&self, stash: &mut Vec<MC>) -> MC {
        match stash.pop() {
            Some(reused) => reused,
            None => {
                let mut fresh = MC::default();
                fresh.ensure_capacity(&mut None);
                fresh
            }
        }
    }

    /// Clears `chunk` and returns it to `stash` for later reuse.
    #[inline]
    fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
        chunk.clear();
        stash.push(chunk);
    }

    /// Moves whatever is left on one input side into `result` / `output`.
    ///
    /// The unconsumed tail of `head` is merged into `result`; the chunks still
    /// waiting in `list` are then appended to `output` as they are, without
    /// being copied.
    fn drain_side(
        &self,
        head: &mut MC,
        pos: &mut usize,
        list: &mut std::vec::IntoIter<MC>,
        result: &mut MC,
        output: &mut Vec<MC>,
        stash: &mut Vec<MC>,
    ) {
        // Step 1: the partially-consumed head goes through `result`.
        let head_has_remainder = head.len() > *pos;
        if head_has_remainder {
            result.merge_from(std::slice::from_mut(head), std::slice::from_mut(pos));
        }

        // Step 2: `result` must be emitted before the untouched chunks so that
        // the output order is preserved.
        if !result.is_empty() {
            output.push(std::mem::take(result));
            *result = self.empty(stash);
        }

        // Step 3: everything still queued is forwarded wholesale.
        output.extend(list);
    }
}

impl<MC> Merger for InternalMerger<MC>
where
    MC: InternalMerge<TimeOwned: Ord + PartialOrder + Clone + 'static> + 'static,
{
    type Time = MC::TimeOwned;
    type Chunk = MC;

    /// Merges the chunk lists `list1` and `list2` into `output`, drawing empty
    /// chunks from `stash` and returning exhausted input chunks to it.
    fn merge(&mut self, list1: Vec<MC>, list2: Vec<MC>, output: &mut Vec<MC>, stash: &mut Vec<MC>) {
        // Index 0 always refers to `list1` and index 1 to `list2`.
        let mut pending = [list1.into_iter(), list2.into_iter()];
        let mut heads = [
            pending[0].next().unwrap_or_default(),
            pending[1].next().unwrap_or_default(),
        ];
        let mut positions = [0usize; 2];

        let mut result = self.empty(stash);

        // Keep merging for as long as both sides still have a live head.
        while positions[0] < heads[0].len() && positions[1] < heads[1].len() {
            result.merge_from(&mut heads, &mut positions);

            // Replace any head that has been fully consumed with the next
            // chunk of its list (or a default chunk once the list is empty).
            for side in 0..2 {
                if positions[side] >= heads[side].len() {
                    let refill = pending[side].next().unwrap_or_default();
                    let spent = std::mem::replace(&mut heads[side], refill);
                    self.recycle(spent, stash);
                    positions[side] = 0;
                }
            }

            if result.at_capacity() {
                output.push(std::mem::take(&mut result));
                result = self.empty(stash);
            }
        }

        // One side (possibly both) is exhausted: flush the leftovers of each
        // side in order — partial head first, then its untouched chunks.
        for side in 0..2 {
            self.drain_side(
                &mut heads[side],
                &mut positions[side],
                &mut pending[side],
                &mut result,
                output,
                stash,
            );
        }
        if !result.is_empty() {
            output.push(result);
        }
    }

    /// Splits the chunks of `merged` into `ship` and `kept` relative to
    /// `upper`, delegating the per-chunk work to [`InternalMerge::extract`]
    /// and updating `frontier` through it.
    fn extract(
        &mut self,
        merged: Vec<Self::Chunk>,
        upper: AntichainRef<Self::Time>,
        frontier: &mut Antichain<Self::Time>,
        ship: &mut Vec<Self::Chunk>,
        kept: &mut Vec<Self::Chunk>,
        stash: &mut Vec<Self::Chunk>,
    ) {
        // Output buffers currently being filled, one per destination.
        let mut retained = self.empty(stash);
        let mut shippable = self.empty(stash);

        for mut chunk in merged {
            let mut cursor = 0usize;
            let total = chunk.len();
            // `extract` yields whenever an output buffer fills up, so it is
            // called repeatedly until the whole chunk has been consumed.
            while cursor < total {
                chunk.extract(&mut cursor, upper, frontier, &mut retained, &mut shippable);
                if retained.at_capacity() {
                    kept.push(std::mem::take(&mut retained));
                    retained = self.empty(stash);
                }
                if shippable.at_capacity() {
                    ship.push(std::mem::take(&mut shippable));
                    shippable = self.empty(stash);
                }
            }
            self.recycle(chunk, stash);
        }

        // Emit the partially filled buffers, skipping empty ones.
        if !retained.is_empty() {
            kept.push(retained);
        }
        if !shippable.is_empty() {
            ship.push(shippable);
        }
    }

    /// Forwards to the chunk's own accounting.
    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
        chunk.account()
    }
}

/// `InternalMerge` support for `Vec<(D, T, R)>`.
///
/// Note: `VecMerger` implements `Merger` directly and avoids cloning by
/// draining its inputs. This `InternalMerge` impl is kept only because
/// `reduce` requires `Builder::Input: InternalMerge`.
pub mod vec_internal {
    use std::cmp::Ordering;
    use timely::PartialOrder;
    use timely::container::SizableContainer;
    use timely::progress::frontier::{Antichain, AntichainRef};
    use crate::difference::Semigroup;
    use super::InternalMerge;

    impl<D, T, R> InternalMerge for Vec<(D, T, R)>
    where
        D: Ord + Clone + 'static,
        T: Ord + Clone + PartialOrder + 'static,
        R: Semigroup + 'static,
    {
        type TimeOwned = T;

        fn len(&self) -> usize {
            Vec::len(self)
        }

        fn clear(&mut self) {
            Vec::clear(self)
        }

        /// Merges up to two sorted inputs into `self`; see the trait docs for
        /// the contract. Three or more inputs are not supported and panic.
        fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) {
            match others {
                // No inputs: nothing to do.
                [] => {}
                // One input: take it wholesale when possible, else copy its tail.
                [only] => {
                    let cursor = &mut positions[0];
                    if self.is_empty() && *cursor == 0 {
                        std::mem::swap(self, only);
                        return;
                    }
                    self.extend_from_slice(&only[*cursor..]);
                    *cursor = only.len();
                }
                // Two inputs: ordered merge on `(data, time)`, summing diffs of
                // equal entries and dropping those whose sum is zero.
                [left, right] => {
                    while positions[0] < left.len() && positions[1] < right.len() && !self.at_capacity() {
                        let (i, j) = (positions[0], positions[1]);
                        let left_key = (&left[i].0, &left[i].1);
                        let right_key = (&right[j].0, &right[j].1);
                        // NOTE: The .clone() calls here are not great, but this is dead code to be removed in the next release.
                        match left_key.cmp(&right_key) {
                            Ordering::Less => {
                                self.push(left[i].clone());
                                positions[0] += 1;
                            }
                            Ordering::Greater => {
                                self.push(right[j].clone());
                                positions[1] += 1;
                            }
                            Ordering::Equal => {
                                let (data, time, mut sum) = left[i].clone();
                                sum.plus_equals(&right[j].2);
                                if !sum.is_zero() {
                                    self.push((data, time, sum));
                                }
                                positions[0] += 1;
                                positions[1] += 1;
                            }
                        }
                    }
                }
                more => {
                    let n = more.len();
                    unimplemented!("{n}-way merge not yet supported")
                }
            }
        }

        /// Routes updates starting at `*position` into `keep` (time at or
        /// beyond `upper`, also recorded in `frontier`) or `ship` (otherwise),
        /// stopping as soon as either output reports it is at capacity.
        fn extract(
            &mut self,
            position: &mut usize,
            upper: AntichainRef<T>,
            frontier: &mut Antichain<T>,
            keep: &mut Self,
            ship: &mut Self,
        ) {
            let total = Vec::len(self);
            while *position < total && !keep.at_capacity() && !ship.at_capacity() {
                let update = self[*position].clone();
                let held_back = upper.less_equal(&update.1);
                if held_back {
                    frontier.insert_with(&update.1, |time| time.clone());
                    keep.push(update);
                } else {
                    ship.push(update);
                }
                *position += 1;
            }
        }
    }
}

}
6 changes: 3 additions & 3 deletions differential-dataflow/src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::rc::Rc;
use crate::trace::implementations::chunker::ContainerChunker;
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::MergeBatcher;
use crate::trace::implementations::merge_batcher::container::VecInternalMerger;
use crate::trace::implementations::merge_batcher::vec::VecMerger;
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Layout, Vector};
Expand All @@ -24,14 +24,14 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
/// A trace implementation using a spine of ordered lists.
///
/// The spine holds reference-counted `OrdValBatch` batches laid out with
/// `Vector` over updates of the form `((K, V), T, R)`.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher using ordered lists.
///
/// A `MergeBatcher` over `Vec<((K, V), T, R)>` input, chunked by `ContainerChunker`.
// NOTE(review): `OrdValBatcher` is defined twice below, once with
// `VecInternalMerger` and once with `VecMerger`; only one definition can
// remain in a compiling file — confirm which is intended.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecInternalMerger<(K, V), T, R>>;
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecMerger<(K, V), T, R>>;
/// A builder using ordered lists.
///
/// An `RcBuilder` wrapping an `OrdValBuilder` that takes `Vec<((K, V), T, R)>` input.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

/// A trace implementation using a spine of ordered lists.
///
/// Same shape as the value spine, but built from `OrdKeyBatch` batches with
/// the unit type `()` in the value position: `((K, ()), T, R)`.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A batcher for ordered lists.
///
/// A `MergeBatcher` over `Vec<((K, ()), T, R)>` input, chunked by `ContainerChunker`.
// NOTE(review): `OrdKeyBatcher` is defined twice below, once with
// `VecInternalMerger` and once with `VecMerger`; only one definition can
// remain in a compiling file — confirm which is intended.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ContainerChunker<Vec<((K,()),T,R)>>, VecInternalMerger<(K, ()), T, R>>;
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ContainerChunker<Vec<((K,()),T,R)>>, VecMerger<(K, ()), T, R>>;
/// A builder for ordered lists.
///
/// An `RcBuilder` wrapping an `OrdKeyBuilder` that takes `Vec<((K, ()), T, R)>` input.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;

Expand Down
Loading