Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 169 additions & 22 deletions differential-dataflow/src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,160 @@ pub mod container {
);
}

/// A `Merger` using internal iteration for `Vec` containers.
pub type VecInternalMerger<D, T, R> = InternalMerger<Vec<(D, T, R)>>;
/// A `Merger` for `Vec` containers, which contain owned data and need special treatment.
pub type VecInternalMerger<D, T, R> = VecMerger<D, T, R>;
/// A `Merger` using internal iteration for `TimelyStack` containers.
pub type ColInternalMerger<D, T, R> = InternalMerger<crate::containers::TimelyStack<(D, T, R)>>;

/// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs.
///
/// The type carries no runtime state: its only field is a zero-sized marker
/// that ties the merger to the update type `(D, T, R)`.
pub struct VecMerger<D, T, R> {
// Marker only; lets the struct name `D`, `T` and `R` without storing any of them.
_marker: PhantomData<(D, T, R)>,
}

impl<D, T, R> Default for VecMerger<D, T, R> {
fn default() -> Self { Self { _marker: PhantomData } }
}

impl<D, T, R> VecMerger<D, T, R> {
    /// The target capacity for output buffers, as a power of two.
    ///
    /// This amount is used to size vectors, where vectors not exactly this capacity are dropped.
    /// If this is mis-set, there is the potential for more memory churn than anticipated.
    fn target_capacity() -> usize {
        timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
    }

    /// Acquire an empty buffer with the target capacity.
    ///
    /// A buffer popped from `stash` is reused only if its capacity is exactly
    /// [`Self::target_capacity`]; any other buffer is dropped and a fresh one
    /// is allocated in its place.
    fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
        let target = Self::target_capacity();
        match stash.pop() {
            Some(mut buffer) if buffer.capacity() == target => {
                buffer.clear();
                buffer
            }
            // Nothing stashed, or the stashed buffer has the wrong size.
            _ => Vec::with_capacity(target),
        }
    }

    /// Refill `queue` from `iter` if empty. Recycles drained queues into `stash`.
    ///
    /// Does nothing when `queue` still holds updates. Otherwise:
    ///
    /// * the drained allocation behind `queue` is offered to `stash`, but only
    ///   while `stash` holds fewer than two buffers and only if the allocation
    ///   has exactly the target capacity;
    /// * `queue` is replaced by the next *non-empty* chunk of `iter`, if any.
    ///
    /// Empty chunks are skipped rather than loaded: callers read an empty
    /// queue after a refill as "the chain is exhausted", so loading an empty
    /// chunk would make them stop merging while `iter` still has data.
    fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
        if !queue.is_empty() {
            return;
        }
        if stash.len() < 2 {
            // Converting the empty deque back into a `Vec` keeps its allocation.
            let mut recycled = Vec::from(std::mem::take(queue));
            recycled.clear();
            if recycled.capacity() == Self::target_capacity() {
                stash.push(recycled);
            }
        }
        if let Some(chunk) = iter.find(|chunk| !chunk.is_empty()) {
            *queue = std::collections::VecDeque::from(chunk);
        }
    }
}

impl<D, T, R> Merger for VecMerger<D, T, R>
where
    D: Ord + Clone + 'static,
    T: Ord + Clone + PartialOrder + 'static,
    R: crate::difference::Semigroup + Clone + 'static,
{
    type Chunk = Vec<(D, T, R)>;
    type Time = T;

    /// Merges the chains `list1` and `list2` into `output`.
    ///
    /// Updates are compared by `(data, time)`. Updates that compare equal have
    /// their diffs added together, and the result is dropped if the sum is zero.
    /// Inputs are consumed by value, so updates are moved rather than cloned.
    /// Output buffers are obtained from `stash` where possible and are pushed
    /// to `output` as soon as they reach capacity.
    ///
    /// Once one chain runs out, whatever remains of the other is appended to
    /// `output` without further copying.
    fn merge(
        &mut self,
        list1: Vec<Vec<(D, T, R)>>,
        list2: Vec<Vec<(D, T, R)>>,
        output: &mut Vec<Vec<(D, T, R)>>,
        stash: &mut Vec<Vec<(D, T, R)>>,
    ) {
        use std::cmp::Ordering;
        use std::collections::VecDeque;

        let mut iter1 = list1.into_iter();
        let mut iter2 = list2.into_iter();
        // Start from the first non-empty chunk of each chain: an empty head
        // must not be mistaken for an exhausted chain.
        let mut q1 = VecDeque::<(D, T, R)>::from(iter1.by_ref().find(|chunk| !chunk.is_empty()).unwrap_or_default());
        let mut q2 = VecDeque::<(D, T, R)>::from(iter2.by_ref().find(|chunk| !chunk.is_empty()).unwrap_or_default());

        let mut result = self.empty(stash);

        // Merge while both queues are non-empty.
        while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
            match (d1, t1).cmp(&(d2, t2)) {
                Ordering::Less => {
                    result.push(q1.pop_front().unwrap());
                }
                Ordering::Greater => {
                    result.push(q2.pop_front().unwrap());
                }
                Ordering::Equal => {
                    // Same `(data, time)`: consolidate the two diffs.
                    let (d, t, mut r1) = q1.pop_front().unwrap();
                    let (_, _, r2) = q2.pop_front().unwrap();
                    r1.plus_equals(&r2);
                    if !r1.is_zero() {
                        result.push((d, t, r1));
                    }
                }
            }

            if result.at_capacity() {
                output.push(std::mem::take(&mut result));
                result = self.empty(stash);
            }

            // Refill emptied queues from their chains. `refill` always runs once
            // for a drained queue (which also lets it recycle the allocation),
            // and is retried while the chain still has chunks but handed back
            // nothing, so an empty chunk cannot end the merge early.
            while q1.is_empty() {
                Self::refill(&mut q1, &mut iter1, stash);
                if iter1.as_slice().is_empty() { break; }
            }
            while q2.is_empty() {
                Self::refill(&mut q2, &mut iter2, stash);
                if iter2.as_slice().is_empty() { break; }
            }
        }

        // Push partial result and remaining data from both sides.
        if !result.is_empty() { output.push(result); }
        for q in [q1, q2] {
            if !q.is_empty() { output.push(Vec::from(q)); }
        }
        output.extend(iter1);
        output.extend(iter2);
    }

    /// Splits `merged` into updates to keep and updates to ship.
    ///
    /// An update goes to `kept` when `upper.less_equal(&time)` holds, and its
    /// time is then also inserted into `frontier`; every other update goes to
    /// `ship`. Relative order within each output is the order in `merged`.
    ///
    /// Capacity is checked after every single push, so each buffer is flushed
    /// the moment it fills up. Checking only once per input chunk would let a
    /// buffer grow past the target capacity and reallocate, producing
    /// oversized chunks that can no longer be recycled through `stash`.
    fn extract(
        &mut self,
        merged: Vec<Vec<(D, T, R)>>,
        upper: AntichainRef<T>,
        frontier: &mut Antichain<T>,
        ship: &mut Vec<Vec<(D, T, R)>>,
        kept: &mut Vec<Vec<(D, T, R)>>,
        stash: &mut Vec<Vec<(D, T, R)>>,
    ) {
        let mut keep = self.empty(stash);
        let mut ready = self.empty(stash);

        for chunk in merged {
            for (data, time, diff) in chunk {
                if upper.less_equal(&time) {
                    frontier.insert_with(&time, |time| time.clone());
                    keep.push((data, time, diff));
                    if keep.at_capacity() {
                        kept.push(std::mem::take(&mut keep));
                        keep = self.empty(stash);
                    }
                } else {
                    ready.push((data, time, diff));
                    if ready.at_capacity() {
                        ship.push(std::mem::take(&mut ready));
                        ready = self.empty(stash);
                    }
                }
            }
        }
        if !keep.is_empty() { kept.push(keep); }
        if !ready.is_empty() { ship.push(ready); }
    }

    /// Accounting for a chunk: the number of updates, followed by three zeros.
    ///
    /// NOTE(review): the meaning of the three trailing components is defined
    /// by the `Merger` trait, not here; this implementation reports none of them.
    fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
        (chunk.len(), 0, 0, 0)
    }
}

/// A merger that uses internal iteration via [`InternalMerge`].
pub struct InternalMerger<MC> {
_marker: PhantomData<MC>,
Expand All @@ -307,6 +456,9 @@ pub mod container {
stash.push(chunk);
}
/// Drain remaining items from one side into `result`/`output`.
///
/// Copies the partially-consumed head into `result`, then appends
/// remaining full chunks directly to `output` without copying.
fn drain_side(
&self,
head: &mut MC,
Expand All @@ -316,21 +468,20 @@ pub mod container {
output: &mut Vec<MC>,
stash: &mut Vec<MC>,
) {
while *pos < head.len() {
// Copy the partially-consumed head into result.
if *pos < head.len() {
result.merge_from(
std::slice::from_mut(head),
std::slice::from_mut(pos),
);
if *pos >= head.len() {
let old = std::mem::replace(head, list.next().unwrap_or_default());
self.recycle(old, stash);
*pos = 0;
}
if result.at_capacity() {
output.push(std::mem::take(result));
*result = self.empty(stash);
}
}
// Flush result before appending full chunks.
if !result.is_empty() {
output.push(std::mem::take(result));
*result = self.empty(stash);
}
// Remaining full chunks go directly to output.
output.extend(list);
}
}

Expand Down Expand Up @@ -370,20 +521,12 @@ pub mod container {
}
}

// Drain remaining from side 0.
// Drain remaining from each side: copy partial head, then append full chunks.
self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
if !result.is_empty() {
output.push(std::mem::take(&mut result));
result = self.empty(stash);
}
output.extend(list1);

// Drain remaining from side 1.
self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
if !result.is_empty() {
output.push(std::mem::take(&mut result));
output.push(result);
}
output.extend(list2);
}

fn extract(
Expand Down Expand Up @@ -424,6 +567,10 @@ pub mod container {
}

/// Implementation of `InternalMerge` for `Vec<(D, T, R)>`.
///
/// Note: The `VecMerger` type implements `Merger` directly and avoids
/// cloning by draining inputs. This `InternalMerge` impl is retained
/// because `reduce` requires `Builder::Input: InternalMerge`.
pub mod vec_internal {
use std::cmp::Ordering;
use timely::PartialOrder;
Expand Down
Loading