Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion encodings/alp/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,6 @@ pub fn f64::to_u16(bits: Self::UINT) -> u16

pub fn vortex_alp::alp_encode(parray: &vortex_array::arrays::primitive::array::PrimitiveArray, exponents: core::option::Option<vortex_alp::Exponents>) -> vortex_error::VortexResult<vortex_alp::ALPArray>

pub fn vortex_alp::alp_rd_decode<T: vortex_alp::ALPRDFloat>(left_parts: vortex_buffer::buffer::Buffer<u16>, left_parts_dict: &[u16], right_bit_width: u8, right_parts: vortex_buffer::buffer_mut::BufferMut<<T as vortex_alp::ALPRDFloat>::UINT>, left_parts_patches: core::option::Option<&vortex_array::patches::Patches>, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_buffer::buffer::Buffer<T>>
pub fn vortex_alp::alp_rd_decode<T: vortex_alp::ALPRDFloat>(left_parts: vortex_buffer::buffer::Buffer<u16>, left_parts_dict: &[u16], right_bit_width: u8, right_parts: vortex_buffer::buffer::Buffer<<T as vortex_alp::ALPRDFloat>::UINT>, left_parts_patches: core::option::Option<&vortex_array::patches::Patches>, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_buffer::buffer::Buffer<T>>

pub fn vortex_alp::decompress_into_array(array: vortex_alp::ALPArray, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::primitive::array::PrimitiveArray>
18 changes: 7 additions & 11 deletions encodings/alp/src/alp_rd/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::SerializeMetadata;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::PrimitiveVTable;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::dtype::PType;
use vortex_array::patches::Patches;
use vortex_array::patches::PatchesMetadata;
use vortex_array::require_child;
use vortex_array::serde::ArrayChildren;
use vortex_array::stats::ArrayStats;
use vortex_array::stats::StatsSetRef;
Expand All @@ -41,7 +43,6 @@ use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_mask::Mask;
use vortex_session::VortexSession;

use crate::alp_rd::kernel::PARENT_KERNELS;
Expand Down Expand Up @@ -297,25 +298,20 @@ impl VTable for ALPRDVTable {
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
let left_parts = array.left_parts().clone().execute::<PrimitiveArray>(ctx)?;
let right_parts = array.right_parts().clone().execute::<PrimitiveArray>(ctx)?;
let left_parts = require_child!(array.left_parts(), 0 => PrimitiveVTable).clone();
let right_parts = require_child!(array.right_parts(), 1 => PrimitiveVTable).clone();

// Decode the left_parts using our builtin dictionary.
let left_parts_dict = array.left_parts_dictionary();

let validity = array
.left_parts()
.validity()?
.to_array(array.len())
.execute::<Mask>(ctx)?;
let validity = left_parts.validity_mask()?;

let decoded_array = if array.is_f32() {
PrimitiveArray::new(
alp_rd_decode::<f32>(
left_parts.into_buffer::<u16>(),
left_parts_dict,
array.right_bit_width,
right_parts.into_buffer_mut::<u32>(),
right_parts.into_buffer::<u32>(),
array.left_parts_patches(),
ctx,
)?,
Expand All @@ -327,7 +323,7 @@ impl VTable for ALPRDVTable {
left_parts.into_buffer::<u16>(),
left_parts_dict,
array.right_bit_width,
right_parts.into_buffer_mut::<u64>(),
right_parts.into_buffer::<u64>(),
array.left_parts_patches(),
ctx,
)?,
Expand Down
5 changes: 2 additions & 3 deletions encodings/alp/src/alp_rd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ pub fn alp_rd_decode<T: ALPRDFloat>(
left_parts: Buffer<u16>,
left_parts_dict: &[u16],
right_bit_width: u8,
right_parts: BufferMut<T::UINT>,
right_parts: Buffer<T::UINT>,
left_parts_patches: Option<&Patches>,
ctx: &mut ExecutionCtx,
) -> VortexResult<Buffer<T>> {
Expand Down Expand Up @@ -347,7 +347,7 @@ fn alp_rd_apply_patches(
fn alp_rd_decode_core<T: ALPRDFloat>(
_left_parts_dict: &[u16],
right_bit_width: u8,
right_parts: BufferMut<T::UINT>,
right_parts: Buffer<T::UINT>,
values: BufferMut<u16>,
) -> Buffer<T> {
// Shift the left-parts and add in the right-parts.
Expand All @@ -361,7 +361,6 @@ fn alp_rd_decode_core<T: ALPRDFloat>(
})
.freeze()
}

/// Find the best "cut point" for a set of floating point values such that we can
/// cast them all to the relevant value instead.
fn find_best_dictionary<T: ALPRDFloat>(samples: &[T]) -> ALPRDDictionary {
Expand Down
2 changes: 1 addition & 1 deletion encodings/datetime-parts/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn decode_to_temporal(
// We split this into separate passes because often the seconds and/or subseconds components
// are constant.
let mut values: BufferMut<i64> = days_buf
.into_buffer_mut::<i64>()
.into_buffer::<i64>()
.map_each_in_place(|d| d * 86_400 * divisor);

if let Some(seconds) = array.seconds().as_constant() {
Expand Down
5 changes: 2 additions & 3 deletions encodings/fastlanes/src/for/array/for_decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use vortex_array::match_each_integer_ptype;
use vortex_array::match_each_unsigned_integer_ptype;
use vortex_array::vtable::ValidityHelper;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;

Expand Down Expand Up @@ -71,7 +70,7 @@ pub fn decompress(array: &FoRArray, ctx: &mut ExecutionCtx) -> VortexResult<Prim
encoded
} else {
PrimitiveArray::new(
decompress_primitive(encoded.into_buffer_mut::<T>(), min),
decompress_primitive(encoded.into_buffer::<T>(), min),
validity,
)
}
Expand Down Expand Up @@ -137,7 +136,7 @@ pub(crate) fn fused_decompress<
}

fn decompress_primitive<T: NativePType + WrappingAdd + PrimInt>(
values: BufferMut<T>,
values: Buffer<T>,
min: T,
) -> Buffer<T> {
values
Expand Down
4 changes: 4 additions & 0 deletions vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ harness = false
name = "take_patches"
harness = false

[[bench]]
name = "apply_patches"
harness = false

[[bench]]
name = "chunk_array_builder"
harness = false
Expand Down
109 changes: 109 additions & 0 deletions vortex-array/benches/apply_patches.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

#![allow(clippy::unwrap_used)]
#![allow(clippy::cast_possible_truncation)]

use divan::Bencher;
use rand::Rng;
use rand::SeedableRng;
use rand::rngs::StdRng;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::patches::Patches;
use vortex_array::validity::Validity;
use vortex_buffer::Buffer;

/// Entry point: hand control to the divan benchmark runner, which discovers and
/// runs every `#[divan::bench]` function in this file.
fn main() {
    divan::main();
}

/// Length of the base array used by every benchmark case.
const ARRAY_LEN: usize = 1 << 16; // 65536

/// How patch indices are laid out across the base array.
#[derive(Clone, Copy, Debug)]
enum Distribution {
    /// Patch indices are uniformly random across the array.
    Random,
    /// Patch indices are clustered together in a contiguous region.
    Clustered,
}

/// Combined benchmark arguments: (density, distribution).
///
/// Density is the fraction of array positions that receive a patch.
const BENCH_ARGS: &[(f64, Distribution)] = &[
    (0.01, Distribution::Random),
    (0.01, Distribution::Clustered),
    (0.1, Distribution::Random),
    (0.1, Distribution::Clustered),
];

fn make_base_array(len: usize) -> PrimitiveArray {
let buffer = Buffer::from_iter(0..len as u32);
PrimitiveArray::new(buffer, Validity::NonNullable)
}

/// Build a `Patches` set covering roughly `array_len * density` positions of an
/// array of length `array_len`, laid out according to `dist`.
///
/// The `Random` layout sorts and dedups its indices, so the actual patch count
/// may be slightly below the requested density. Patch values are arbitrary
/// ascending u32s — the benchmarks only measure the apply path.
fn make_patches(array_len: usize, density: f64, dist: Distribution, rng: &mut StdRng) -> Patches {
    // Clamp to the array length so the clustered branch can never emit indices
    // past the end of the array if a caller ever passes a density >= 1.0
    // (`saturating_sub` below only protects the start offset, not the run length).
    let num_patches = ((array_len as f64 * density) as usize).min(array_len);

    let indices: Vec<u64> = match dist {
        Distribution::Random => {
            let mut raw: Vec<u64> = (0..num_patches)
                .map(|_| rng.random_range(0..array_len as u64))
                .collect();
            // Patch indices must be sorted and unique.
            raw.sort_unstable();
            raw.dedup();
            raw
        }
        Distribution::Clustered => {
            // Place patches in a contiguous cluster starting at a random offset.
            let max_start = array_len.saturating_sub(num_patches);
            let start = rng.random_range(0..=max_start as u64);
            (start..start + num_patches as u64).collect()
        }
    };

    let n = indices.len();
    let values = Buffer::from_iter((0..n).map(|i| i as u32)).into_array();
    Patches::new(
        array_len,
        0,
        Buffer::from(indices).into_array(),
        values,
        None,
    )
    .unwrap()
}

/// Benchmark patching when the target array is uniquely owned, allowing the
/// patch to mutate the buffer in place.
#[divan::bench(args = BENCH_ARGS)]
fn patch_inplace(bencher: Bencher, &(density, dist): &(f64, Distribution)) {
    // Fixed seed keeps the patch layout identical across runs.
    let mut rng = StdRng::seed_from_u64(42);
    let patches = make_patches(ARRAY_LEN, density, dist, &mut rng);

    bencher
        .with_inputs(|| {
            let base = make_base_array(ARRAY_LEN);
            let ctx = LEGACY_SESSION.create_execution_ctx();
            (base, ctx)
        })
        .bench_values(|(base, mut ctx)| base.patch(&patches, &mut ctx).unwrap());
}

/// Benchmark patching when the target array is shared, forcing a copy of the
/// underlying buffer instead of an in-place mutation.
#[divan::bench(args = BENCH_ARGS)]
fn patch_copy_to_buffer(bencher: Bencher, &(density, dist): &(f64, Distribution)) {
    // Fixed seed keeps the patch layout identical across runs.
    let mut rng = StdRng::seed_from_u64(42);
    let patches = make_patches(ARRAY_LEN, density, dist, &mut rng);

    bencher
        .with_inputs(|| {
            let base = make_base_array(ARRAY_LEN);
            let ctx = LEGACY_SESSION.create_execution_ctx();
            (base, ctx)
        })
        .bench_values(|(base, mut ctx)| {
            // Clone first so `base` stays alive, which prevents the in-place
            // fast path; return it alongside the result to keep it live.
            let target = base.clone();
            (target.patch(&patches, &mut ctx).unwrap(), base)
        });
}
2 changes: 2 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -21642,6 +21642,8 @@ pub macro vortex_array::match_smallest_offset_type!

pub macro vortex_array::register_kernel!

pub macro vortex_array::require_child!

pub macro vortex_array::vtable!

pub enum vortex_array::Canonical
Expand Down
59 changes: 21 additions & 38 deletions vortex-array/src/arrays/dict/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use vortex_session::VortexSession;
use super::DictArray;
use super::DictMetadata;
use super::take_canonical;
use crate::AnyCanonical;
use crate::ArrayRef;
use crate::Canonical;
use crate::DeserializeMetadata;
Expand All @@ -23,6 +24,7 @@ use crate::Precision;
use crate::ProstMetadata;
use crate::SerializeMetadata;
use crate::arrays::ConstantArray;
use crate::arrays::PrimitiveVTable;
use crate::arrays::dict::compute::rules::PARENT_RULES;
use crate::buffer::BufferHandle;
use crate::dtype::DType;
Expand All @@ -32,6 +34,7 @@ use crate::executor::ExecutionCtx;
use crate::executor::ExecutionStep;
use crate::hash::ArrayEq;
use crate::hash::ArrayHash;
use crate::require_child;
use crate::scalar::Scalar;
use crate::serde::ArrayChildren;
use crate::stats::StatsSetRef;
Expand Down Expand Up @@ -192,25 +195,29 @@ impl VTable for DictVTable {
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
if let Some(canonical) = execute_fast_path(array, ctx)? {
return Ok(ExecutionStep::Done(canonical));
if array.is_empty() {
let result_dtype = array
.dtype()
.union_nullability(array.codes().dtype().nullability());
return Ok(ExecutionStep::Done(
Canonical::empty(&result_dtype).into_array(),
));
}

// TODO(joe): if the values are constant return a constant
let values = array.values().clone().execute::<Canonical>(ctx)?;
let codes = array
.codes()
.clone()
.execute::<Canonical>(ctx)?
.into_primitive();
let codes = require_child!(array.codes(), 0 => PrimitiveVTable);

// TODO(ngates): if indices are sorted and unique (strict-sorted), then we should delegate to
// the filter function since they're typically optimised for this case.
// TODO(ngates): if indices min is quite high, we could slice self and offset the indices
// such that canonicalize does less work.
if codes.all_invalid()? {
return Ok(ExecutionStep::Done(
ConstantArray::new(Scalar::null(array.dtype().as_nullable()), array.codes.len())
.into_array(),
));
}

let values = require_child!(array.values(), 1 => AnyCanonical);
let values = Canonical::from(values);

Ok(ExecutionStep::Done(
take_canonical(values, &codes, ctx)?.into_array(),
take_canonical(values, codes, ctx)?.into_array(),
))
}

Expand All @@ -231,27 +238,3 @@ impl VTable for DictVTable {
PARENT_KERNELS.execute(array, parent, child_idx, ctx)
}
}

/// Check for fast-path execution conditions.
///
/// Returns `Some(array)` when the dictionary can be decoded without touching its
/// children: an empty array yields an empty canonical array (with nullability
/// widened to cover the codes' nullability), and codes that are entirely invalid
/// yield a constant null array of the same length. Returns `None` when the
/// caller must run the general decode path.
pub(super) fn execute_fast_path(
    array: &DictArray,
    _ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
    // Empty array - nothing to do
    if array.is_empty() {
        let result_dtype = array
            .dtype()
            .union_nullability(array.codes().dtype().nullability());
        return Ok(Some(Canonical::empty(&result_dtype).into_array()));
    }

    // All codes are null - result is all nulls
    if array.codes.all_invalid()? {
        return Ok(Some(
            ConstantArray::new(Scalar::null(array.dtype().as_nullable()), array.codes.len())
                .into_array(),
        ));
    }

    Ok(None)
}
17 changes: 17 additions & 0 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,23 @@ impl fmt::Debug for ExecutionStep {
}
}

/// Require that a child array matches `$M`. If it does, evaluates to the matched value.
/// Otherwise, early-returns `Ok(ExecutionStep::execute_child::<$M>($idx))`.
///
/// ```ignore
/// let codes = require_child!(array.codes(), 0 => PrimitiveVTable);
/// let values = require_child!(array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($child:expr, $idx:expr => $M:ty) => {
        if let Some(matched) = $child.as_opt::<$M>() {
            matched
        } else {
            // Child is not yet in the required form: ask the executor to
            // canonicalize it first, then re-enter this function.
            return Ok($crate::ExecutionStep::execute_child::<$M>($idx));
        }
    };
}

/// Extension trait for creating an execution context from a session.
pub trait VortexSessionExecute {
/// Create a new execution context from this session.
Expand Down
Loading
Loading