Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 1 addition & 122 deletions differential-dataflow/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,127 +146,6 @@ pub mod layout {

}

pub use container::Column;
mod container {

/// A container based on a columnar store, encoded in aligned bytes.
pub enum Column<C> {
/// The typed variant of the container.
Typed(C),
/// The binary variant of the container.
Bytes(timely::bytes::arc::Bytes),
/// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
///
/// Reasons could include misalignment, cloning of data, or wanting
/// to release the `Bytes` as a scarce resource.
Align(std::sync::Arc<[u64]>),
}

impl<C: Default> Default for Column<C> {
fn default() -> Self { Self::Typed(Default::default()) }
}

impl<C> Column<C> {
pub fn as_mut(&mut self) -> &mut C { if let Column::Typed(c) = self { c } else { panic!() }}
}

// The clone implementation moves out of the `Bytes` variant into `Align`.
// This is optional and non-optimal, as the bytes clone is relatively free.
// But, we don't want to leak the uses of `Bytes`, is why we do this I think.
impl<C: columnar::Container> Clone for Column<C> where C: Clone {
fn clone(&self) -> Self {
match self {
Column::Typed(t) => Column::Typed(t.clone()),
Column::Bytes(b) => {
assert!(b.len() % 8 == 0);
let mut alloc: Vec<u64> = vec![0; b.len() / 8];
bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&b[..]);
Self::Align(alloc.into())
},
Column::Align(a) => Column::Align(std::sync::Arc::clone(&a.clone())),
}
}
fn clone_from(&mut self, other: &Self) {
match (self, other) {
(Column::Typed(t0), Column::Typed(t1)) => {
// Derived `Clone` implementations for e.g. tuples cannot be relied on to call `clone_from`.
let t1 = t1.borrow();
t0.clear();
t0.extend_from_self(t1, 0..t1.len());
}
(Column::Align(a0), Column::Align(a1)) => { a0.clone_from(a1); }
(x, y) => { *x = y.clone(); }
}
}
}

use columnar::{Len, FromBytes};
use columnar::bytes::{EncodeDecode, Indexed};

impl<C: columnar::ContainerBytes> Column<C> {
/// Borrows the contents no matter their representation.
///
/// This function is meant to be efficient, but it cannot be relied on to be zero-cost.
/// Ideal uses would borrow a container infrequently, and access the borrowed form repeatedly.
#[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> {
match self {
Column::Typed(t) => t.borrow(),
Column::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
Column::Align(a) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)),
}
}

pub fn into_typed(self) -> C where C: Default {
if let Column::Typed(c) = self { c }
else {
let mut result = C::default();
let borrow = self.borrow();
result.extend_from_self(borrow, 0 .. borrow.len());
result
}
}
}

impl<C: columnar::Container, T> timely::container::PushInto<T> for Column<C> where C: columnar::Push<T> {
#[inline]
fn push_into(&mut self, item: T) {
match self {
Column::Typed(t) => t.push(item),
Column::Align(_) | Column::Bytes(_) => {
// We really oughtn't be calling this in this case.
// We could convert to owned, but need more constraints on `C`.
unimplemented!("Pushing into Column::Bytes without first clearing");
}
}
}
}

impl<C: columnar::ContainerBytes> timely::dataflow::channels::ContainerBytes for Column<C> {
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self {
// Our expectation / hope is that `bytes` is `u64` aligned and sized.
// If the alignment is borked, we can relocate. IF the size is borked,
// not sure what we do in that case.
assert!(bytes.len() % 8 == 0);
if bytemuck::try_cast_slice::<_, u64>(&bytes).is_ok() {
Self::Bytes(bytes)
}
else {
println!("Re-locating bytes for alignment reasons");
let mut alloc: Vec<u64> = vec![0; bytes.len() / 8];
bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]);
Self::Align(alloc.into())
}
}

// Borrow rather than trust the sizes of the bytes themselves.
fn length_in_bytes(&self) -> usize { 8 * Indexed::length_in_words(&self.borrow()) }

// Borrow rather than trust the sizes of the bytes themselves.
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) { Indexed::write(writer, &self.borrow()).unwrap() }
}
}


pub use updates::Updates;

/// A thin wrapper around `Updates` that tracks the pre-consolidation record count
Expand Down Expand Up @@ -488,7 +367,7 @@ pub mod arrangement {
/// A builder for columnar storage.
pub type ValBuilder<K, V, T, R> = RcBuilder<ValMirror<(K,V,T,R)>>;

/// A batch container implementation for Column<C>.
/// A batch container implementation for Coltainer<C>.
pub use batch_container::Coltainer;
pub mod batch_container {

Expand Down
Loading