Skip to content

Commit 004336e

Browse files
committed
Replace VecChunker with ContainerChunker
1 parent e65d33e commit 004336e

3 files changed

Lines changed: 5 additions & 119 deletions

File tree

differential-dataflow/src/trace/implementations/chunker.rs

Lines changed: 0 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -10,120 +10,6 @@ use crate::containers::TimelyStack;
1010
use crate::consolidation::{consolidate_updates, Consolidate};
1111
use crate::difference::Semigroup;
1212

13-
/// Chunk a stream of vectors into chains of vectors.
14-
pub struct VecChunker<T> {
15-
pending: Vec<T>,
16-
ready: VecDeque<Vec<T>>,
17-
empty: Option<Vec<T>>,
18-
}
19-
20-
impl<T> Default for VecChunker<T> {
21-
fn default() -> Self {
22-
Self {
23-
pending: Vec::default(),
24-
ready: VecDeque::default(),
25-
empty: None,
26-
}
27-
}
28-
}
29-
30-
impl<K, V, T, R> VecChunker<((K, V), T, R)>
31-
where
32-
K: Ord,
33-
V: Ord,
34-
T: Ord,
35-
R: Semigroup,
36-
{
37-
const BUFFER_SIZE_BYTES: usize = 8 << 10;
38-
fn chunk_capacity() -> usize {
39-
let size = ::std::mem::size_of::<((K, V), T, R)>();
40-
if size == 0 {
41-
Self::BUFFER_SIZE_BYTES
42-
} else if size <= Self::BUFFER_SIZE_BYTES {
43-
Self::BUFFER_SIZE_BYTES / size
44-
} else {
45-
1
46-
}
47-
}
48-
49-
/// Form chunks out of pending data, if needed. This function is meant to be applied to
50-
/// potentially full buffers, and ensures that if the buffer was full when called it is at most
51-
/// half full when the function returns.
52-
///
53-
/// `form_chunk` does the following:
54-
/// * If pending is full, consolidate.
55-
/// * If after consolidation it's more than half full, peel off chunks,
56-
/// leaving behind any partial chunk in pending.
57-
fn form_chunk(&mut self) {
58-
consolidate_updates(&mut self.pending);
59-
if self.pending.len() >= Self::chunk_capacity() {
60-
while self.pending.len() > Self::chunk_capacity() {
61-
let mut chunk = Vec::with_capacity(Self::chunk_capacity());
62-
chunk.extend(self.pending.drain(..chunk.capacity()));
63-
self.ready.push_back(chunk);
64-
}
65-
}
66-
}
67-
}
68-
69-
impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
70-
where
71-
K: Ord + Clone,
72-
V: Ord + Clone,
73-
T: Ord + Clone,
74-
R: Semigroup + Clone,
75-
{
76-
fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
77-
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
78-
// because we don't write more than capacity elements into the buffer.
79-
// Important: Consolidation requires `pending` to have twice the chunk capacity to
80-
// amortize its cost. Otherwise, it risks doing quadratic work.
81-
if self.pending.capacity() < Self::chunk_capacity() * 2 {
82-
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
83-
}
84-
85-
let mut drain = container.drain(..).peekable();
86-
while drain.peek().is_some() {
87-
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
88-
if self.pending.len() == self.pending.capacity() {
89-
self.form_chunk();
90-
}
91-
}
92-
}
93-
}
94-
95-
impl<K, V, T, R> ContainerBuilder for VecChunker<((K, V), T, R)>
96-
where
97-
K: Ord + Clone + 'static,
98-
V: Ord + Clone + 'static,
99-
T: Ord + Clone + 'static,
100-
R: Semigroup + Clone + 'static,
101-
{
102-
type Container = Vec<((K, V), T, R)>;
103-
104-
fn extract(&mut self) -> Option<&mut Self::Container> {
105-
if let Some(ready) = self.ready.pop_front() {
106-
self.empty = Some(ready);
107-
self.empty.as_mut()
108-
} else {
109-
None
110-
}
111-
}
112-
113-
fn finish(&mut self) -> Option<&mut Self::Container> {
114-
if !self.pending.is_empty() {
115-
consolidate_updates(&mut self.pending);
116-
while !self.pending.is_empty() {
117-
let mut chunk = Vec::with_capacity(Self::chunk_capacity());
118-
chunk.extend(self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())));
119-
self.ready.push_back(chunk);
120-
}
121-
}
122-
self.empty = self.ready.pop_front();
123-
self.empty.as_mut()
124-
}
125-
}
126-
12713
/// Chunk a stream of vectors into chains of vectors.
12814
pub struct ColumnationChunker<T: Columnation> {
12915
pending: Vec<T>,

differential-dataflow/src/trace/implementations/ord_neu.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
use std::rc::Rc;
1212

1313
use crate::containers::TimelyStack;
14-
use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
14+
use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker};
1515
use crate::trace::implementations::spine_fueled::Spine;
1616
use crate::trace::implementations::merge_batcher::MergeBatcher;
1717
use crate::trace::implementations::merge_batcher::container::{VecInternalMerger, ColInternalMerger};
@@ -25,7 +25,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
2525
/// A trace implementation using a spine of ordered lists.
2626
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
2727
/// A batcher using ordered lists.
28-
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecInternalMerger<(K, V), T, R>>;
28+
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecInternalMerger<(K, V), T, R>>;
2929
/// A builder using ordered lists.
3030
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
3131

@@ -42,7 +42,7 @@ pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>
4242
/// A trace implementation using a spine of ordered lists.
4343
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
4444
/// A batcher for ordered lists.
45-
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecInternalMerger<(K, ()), T, R>>;
45+
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ContainerChunker<Vec<((K,()),T,R)>>, VecInternalMerger<(K, ()), T, R>>;
4646
/// A builder for ordered lists.
4747
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
4848

differential-dataflow/src/trace/implementations/rhh.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
1212

1313
use crate::Hashable;
1414
use crate::containers::TimelyStack;
15-
use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15+
use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker};
1616
use crate::trace::implementations::merge_batcher::MergeBatcher;
1717
use crate::trace::implementations::merge_batcher::container::{VecInternalMerger, ColInternalMerger};
1818
use crate::trace::implementations::spine_fueled::Spine;
@@ -25,7 +25,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder};
2525
/// A trace implementation using a spine of ordered lists.
2626
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
2727
/// A batcher for ordered lists.
28-
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecInternalMerger<(K, V), T, R>>;
28+
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecInternalMerger<(K, V), T, R>>;
2929
/// A builder for ordered lists.
3030
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
3131

0 commit comments

Comments
 (0)