Skip to content

Commit 8ada9f4

Browse files
Streaming columnar.rs example (#732)
1 parent a64db10 commit 8ada9f4

1 file changed

Lines changed: 16 additions & 103 deletions

File tree

timely/examples/columnar.rs

Lines changed: 16 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -129,66 +129,18 @@ fn main() {
129129
pub use container::Column;
130130
mod container {
131131

132-
/// A container based on a columnar store, encoded in aligned bytes.
133-
pub enum Column<C> {
134-
/// The typed variant of the container.
135-
Typed(C),
136-
/// The binary variant of the container.
137-
Bytes(timely_bytes::arc::Bytes),
138-
/// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
139-
///
140-
/// Reasons could include misalignment, cloning of data, or wanting
141-
/// to release the `Bytes` as a scarce resource.
142-
Align(Box<[u64]>),
143-
}
132+
use columnar::bytes::stash::Stash;
144133

145-
impl<C: Default> Default for Column<C> {
146-
fn default() -> Self { Self::Typed(Default::default()) }
147-
}
134+
#[derive(Clone, Default)]
135+
pub struct Column<C> { pub stash: Stash<C, timely_bytes::arc::Bytes> }
148136

149-
// The clone implementation moves out of the `Bytes` variant into `Align`.
150-
// This is optional and non-optimal, as the bytes clone is relatively free.
151-
// But we don't want to leak the uses of `Bytes`, which is why we do this, I think.
152-
impl<C: columnar::Container> Clone for Column<C> where C: Clone {
153-
fn clone(&self) -> Self {
154-
match self {
155-
Column::Typed(t) => Column::Typed(t.clone()),
156-
Column::Bytes(b) => {
157-
assert!(b.len() % 8 == 0);
158-
let mut alloc: Vec<u64> = vec![0; b.len() / 8];
159-
bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&b[..]);
160-
Self::Align(alloc.into())
161-
},
162-
Column::Align(a) => Column::Align(a.clone()),
163-
}
164-
}
165-
fn clone_from(&mut self, other: &Self) {
166-
match (self, other) {
167-
(Column::Typed(t0), Column::Typed(t1)) => {
168-
// Derived `Clone` implementations for e.g. tuples cannot be relied on to call `clone_from`.
169-
let t1 = t1.borrow();
170-
t0.clear();
171-
t0.extend_from_self(t1, 0..t1.len());
172-
}
173-
(Column::Align(a0), Column::Align(a1)) => { a0.clone_from(a1); }
174-
(x, y) => { *x = y.clone(); }
175-
}
176-
}
177-
}
178-
179-
use columnar::{Len, Index, FromBytes};
137+
use columnar::{Len, Index};
180138
use columnar::bytes::{EncodeDecode, Indexed};
181139
use columnar::common::IterOwn;
182140

183141
impl<C: columnar::ContainerBytes> Column<C> {
184142
/// Borrows the contents no matter their representation.
185-
#[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> {
186-
match self {
187-
Column::Typed(t) => t.borrow(),
188-
Column::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
189-
Column::Align(a) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)),
190-
}
191-
}
143+
#[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> { self.stash.borrow() }
192144
}
193145

194146
impl<C: columnar::ContainerBytes> timely::Accountable for Column<C> {
@@ -203,65 +155,26 @@ mod container {
203155

204156
impl<C: columnar::ContainerBytes> timely::container::SizableContainer for Column<C> {
205157
fn at_capacity(&self) -> bool {
206-
match self {
207-
Self::Typed(t) => {
158+
match &self.stash {
159+
Stash::Typed(t) => {
208160
let length_in_bytes = 8 * Indexed::length_in_words(&t.borrow());
209161
length_in_bytes >= (1 << 20)
210162
},
211-
Self::Bytes(_) => true,
212-
Self::Align(_) => true,
163+
Stash::Bytes(_) => true,
164+
Stash::Align(_) => true,
213165
}
214166
}
215167
fn ensure_capacity(&mut self, _stash: &mut Option<Self>) { }
216168
}
217169

218170
impl<C: columnar::Container, T> timely::container::PushInto<T> for Column<C> where C: columnar::Push<T> {
219-
#[inline]
220-
fn push_into(&mut self, item: T) {
221-
match self {
222-
Column::Typed(t) => t.push(item),
223-
Column::Align(_) | Column::Bytes(_) => {
224-
// We really oughtn't be calling this in this case.
225-
// We could convert to owned, but need more constraints on `C`.
226-
unimplemented!("Pushing into Column::Bytes without first clearing");
227-
}
228-
}
229-
}
171+
#[inline] fn push_into(&mut self, item: T) { use columnar::Push; self.stash.push(item) }
230172
}
231173

232174
impl<C: columnar::ContainerBytes> timely::dataflow::channels::ContainerBytes for Column<C> {
233-
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self {
234-
// Our expectation / hope is that `bytes` is `u64` aligned and sized.
235-
// If the alignment is borked, we can relocate. If the size is borked,
236-
// not sure what we do in that case.
237-
assert!(bytes.len() % 8 == 0);
238-
if bytemuck::try_cast_slice::<_, u64>(&bytes).is_ok() {
239-
Self::Bytes(bytes)
240-
}
241-
else {
242-
println!("Re-locating bytes for alignment reasons");
243-
let mut alloc: Vec<u64> = vec![0; bytes.len() / 8];
244-
bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]);
245-
Self::Align(alloc.into())
246-
}
247-
}
248-
249-
fn length_in_bytes(&self) -> usize {
250-
match self {
251-
// We'll need one u64 for the length, then the length rounded up to a multiple of 8.
252-
Column::Typed(t) => 8 * Indexed::length_in_words(&t.borrow()),
253-
Column::Bytes(b) => b.len(),
254-
Column::Align(a) => 8 * a.len(),
255-
}
256-
}
257-
258-
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
259-
match self {
260-
Column::Typed(t) => { Indexed::write(writer, &t.borrow()).unwrap() },
261-
Column::Bytes(b) => writer.write_all(b).unwrap(),
262-
Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
263-
}
264-
}
175+
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { Self { stash: bytes.into() } }
176+
fn length_in_bytes(&self) -> usize { self.stash.length_in_bytes() }
177+
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) { self.stash.into_bytes(writer) }
265178
}
266179
}
267180

@@ -270,7 +183,7 @@ use builder::ColumnBuilder;
270183
mod builder {
271184

272185
use std::collections::VecDeque;
273-
use columnar::bytes::{EncodeDecode, Indexed};
186+
use columnar::bytes::{EncodeDecode, Indexed, stash::Stash};
274187
use super::Column;
275188

276189
/// A container builder for `Column<C>`.
@@ -294,7 +207,7 @@ mod builder {
294207
if round - words < round / 10 {
295208
let mut alloc = Vec::with_capacity(round);
296209
Indexed::encode(&mut alloc, &self.current.borrow());
297-
self.pending.push_back(Column::Align(alloc.into_boxed_slice()));
210+
self.pending.push_back(Column { stash: Stash::Align(alloc.into_boxed_slice()) });
298211
self.current.clear();
299212
}
300213
}
@@ -317,7 +230,7 @@ mod builder {
317230
#[inline]
318231
fn finish(&mut self) -> Option<&mut Self::Container> {
319232
if !self.current.is_empty() {
320-
self.pending.push_back(Column::Typed(std::mem::take(&mut self.current)));
233+
self.pending.push_back(Column { stash: Stash::Typed(std::mem::take(&mut self.current)) });
321234
}
322235
self.empty = self.pending.pop_front();
323236
self.empty.as_mut()

0 commit comments

Comments
 (0)