|
1 | 1 | //! A columnar container based on the columnation library. |
2 | 2 |
|
3 | 3 | use std::iter::FromIterator; |
4 | | - |
5 | | -pub use columnation::*; |
| 4 | +use std::rc::Rc; |
| 5 | + |
| 6 | +use columnation::{Columnation, Region}; |
| 7 | +use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; |
| 8 | +use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder, OrdValBatch, OrdValBuilder}; |
| 9 | +use differential_dataflow::trace::implementations::spine_fueled::Spine; |
| 10 | +use differential_dataflow::trace::implementations::{BatchContainer, Layout, OffsetList, Update}; |
| 11 | +use differential_dataflow::trace::rc_blanket_impls::RcBuilder; |
6 | 12 | use timely::container::PushInto; |
7 | 13 |
|
| 14 | +mod merge_batcher; |
| 15 | +mod chunker; |
| 16 | + |
| 17 | +pub use merge_batcher::ColMerger; |
| 18 | +pub use chunker::ColumnationChunker; |
| 19 | + |
| 20 | +/// A layout based on timely stacks |
| 21 | +pub struct TStack<U: Update> { |
| 22 | + phantom: std::marker::PhantomData<U>, |
| 23 | +} |
| 24 | + |
| 25 | +impl<U: Update> Layout for TStack<U> |
| 26 | +where |
| 27 | + U::Key: Columnation, |
| 28 | + U::Val: Columnation, |
| 29 | + U::Time: Columnation, |
| 30 | + U::Diff: Columnation + Ord, |
| 31 | +{ |
| 32 | + type Target = U; |
| 33 | + type KeyContainer = TimelyStack<U::Key>; |
| 34 | + type ValContainer = TimelyStack<U::Val>; |
| 35 | + type TimeContainer = TimelyStack<U::Time>; |
| 36 | + type DiffContainer = TimelyStack<U::Diff>; |
| 37 | + type OffsetContainer = OffsetList; |
| 38 | +} |
| 39 | + |
| 40 | + |
| 41 | +/// A trace implementation backed by columnar storage. |
| 42 | +pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>; |
| 43 | +/// A batcher for columnar storage. |
| 44 | +pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; |
| 45 | +/// A builder for columnar storage. |
| 46 | +pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>; |
| 47 | + |
| 48 | + |
| 49 | +/// A trace implementation backed by columnar storage. |
| 50 | +pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>; |
| 51 | +/// A batcher for columnar storage |
| 52 | +pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>; |
| 53 | +/// A builder for columnar storage |
| 54 | +pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>; |
| 55 | + |
| 56 | +// The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now |
| 57 | +// be presented with the actual contained type, rather than a type that borrows into it. |
| 58 | +impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> { |
| 59 | + type Owned = T; |
| 60 | + type ReadItem<'a> = &'a T; |
| 61 | + |
| 62 | + fn with_capacity(size: usize) -> Self { |
| 63 | + Self::with_capacity(size) |
| 64 | + } |
| 65 | + |
| 66 | + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { |
| 67 | + let mut new = Self::default(); |
| 68 | + new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2))); |
| 69 | + new |
| 70 | + } |
| 71 | + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } |
| 72 | + fn index(&self, index: usize) -> Self::ReadItem<'_> { |
| 73 | + &self[index] |
| 74 | + } |
| 75 | + fn len(&self) -> usize { |
| 76 | + self[..].len() |
| 77 | + } |
| 78 | +} |
| 79 | + |
| 80 | + |
8 | 81 | /// An append-only vector that store records as columns. |
9 | 82 | /// |
10 | 83 | /// This container maintains elements that might conventionally own |
@@ -274,7 +347,7 @@ mod container { |
274 | 347 | use timely::Container; |
275 | 348 | use timely::container::SizableContainer; |
276 | 349 |
|
277 | | - use crate::containers::TimelyStack; |
| 350 | + use crate::columnation::TimelyStack; |
278 | 351 |
|
279 | 352 | impl<T: Columnation> Container for TimelyStack<T> { |
280 | 353 | type ItemRef<'a> = &'a T where Self: 'a; |
|
0 commit comments