diff --git a/encodings/alp/public-api.lock b/encodings/alp/public-api.lock index 95c4f3ad885..f20a7154a25 100644 --- a/encodings/alp/public-api.lock +++ b/encodings/alp/public-api.lock @@ -556,6 +556,6 @@ pub fn f64::to_u16(bits: Self::UINT) -> u16 pub fn vortex_alp::alp_encode(parray: &vortex_array::arrays::primitive::array::PrimitiveArray, exponents: core::option::Option) -> vortex_error::VortexResult -pub fn vortex_alp::alp_rd_decode(left_parts: vortex_buffer::buffer::Buffer, left_parts_dict: &[u16], right_bit_width: u8, right_parts: vortex_buffer::buffer_mut::BufferMut<::UINT>, left_parts_patches: core::option::Option<&vortex_array::patches::Patches>, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> +pub fn vortex_alp::alp_rd_decode(left_parts: vortex_buffer::buffer::Buffer, left_parts_dict: &[u16], right_bit_width: u8, right_parts: vortex_buffer::buffer::Buffer<::UINT>, left_parts_patches: core::option::Option<&vortex_array::patches::Patches>, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> pub fn vortex_alp::decompress_into_array(array: vortex_alp::ALPArray, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult diff --git a/encodings/alp/src/alp_rd/array.rs b/encodings/alp/src/alp_rd/array.rs index d98b596eaa8..b63ae99f327 100644 --- a/encodings/alp/src/alp_rd/array.rs +++ b/encodings/alp/src/alp_rd/array.rs @@ -17,12 +17,14 @@ use vortex_array::Precision; use vortex_array::ProstMetadata; use vortex_array::SerializeMetadata; use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::PrimitiveVTable; use vortex_array::buffer::BufferHandle; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; use vortex_array::dtype::PType; use vortex_array::patches::Patches; use vortex_array::patches::PatchesMetadata; +use vortex_array::require_child; use vortex_array::serde::ArrayChildren; use vortex_array::stats::ArrayStats; use vortex_array::stats::StatsSetRef; @@ -41,7 +43,6 @@ use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use vortex_error::vortex_err; use vortex_error::vortex_panic; -use vortex_mask::Mask; use vortex_session::VortexSession; use crate::alp_rd::kernel::PARENT_KERNELS; @@ -297,17 +298,12 @@ impl VTable for ALPRDVTable { } fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult { - let left_parts = array.left_parts().clone().execute::(ctx)?; - let right_parts = array.right_parts().clone().execute::(ctx)?; + let left_parts = require_child!(array.left_parts(), 0 => PrimitiveVTable).clone(); + let right_parts = require_child!(array.right_parts(), 1 => PrimitiveVTable).clone(); // Decode the left_parts using our builtin dictionary. let left_parts_dict = array.left_parts_dictionary(); - - let validity = array - .left_parts() - .validity()? - .to_array(array.len()) - .execute::(ctx)?; + let validity = left_parts.validity_mask()?; let decoded_array = if array.is_f32() { PrimitiveArray::new( @@ -315,7 +311,7 @@ impl VTable for ALPRDVTable { left_parts.into_buffer::(), left_parts_dict, array.right_bit_width, - right_parts.into_buffer_mut::(), + right_parts.into_buffer::(), array.left_parts_patches(), ctx, )?, @@ -327,7 +323,7 @@ impl VTable for ALPRDVTable { left_parts.into_buffer::(), left_parts_dict, array.right_bit_width, - right_parts.into_buffer_mut::(), + right_parts.into_buffer::(), array.left_parts_patches(), ctx, )?, diff --git a/encodings/alp/src/alp_rd/mod.rs b/encodings/alp/src/alp_rd/mod.rs index 58188f7ab9f..6ab3b7a3f35 100644 --- a/encodings/alp/src/alp_rd/mod.rs +++ b/encodings/alp/src/alp_rd/mod.rs @@ -294,7 +294,7 @@ pub fn alp_rd_decode( left_parts: Buffer, left_parts_dict: &[u16], right_bit_width: u8, - right_parts: BufferMut, + right_parts: Buffer, left_parts_patches: Option<&Patches>, ctx: &mut ExecutionCtx, ) -> VortexResult> { @@ -347,7 +347,7 @@ fn alp_rd_apply_patches( fn alp_rd_decode_core( _left_parts_dict: &[u16], right_bit_width: u8, - right_parts: BufferMut, + right_parts: Buffer, values: BufferMut, ) -> Buffer { // Shift the left-parts and add in the right-parts. @@ -361,7 +361,6 @@ fn alp_rd_decode_core( }) .freeze() } - /// Find the best "cut point" for a set of floating point values such that we can /// cast them all to the relevant value instead. fn find_best_dictionary(samples: &[T]) -> ALPRDDictionary { diff --git a/encodings/datetime-parts/src/canonical.rs b/encodings/datetime-parts/src/canonical.rs index 51e2d0447ba..87abfc61f6a 100644 --- a/encodings/datetime-parts/src/canonical.rs +++ b/encodings/datetime-parts/src/canonical.rs @@ -54,7 +54,7 @@ pub fn decode_to_temporal( // We split this into separate passes because often the seconds and/org subseconds components // are constant. let mut values: BufferMut = days_buf - .into_buffer_mut::() + .into_buffer::() .map_each_in_place(|d| d * 86_400 * divisor); if let Some(seconds) = array.seconds().as_constant() { diff --git a/encodings/fastlanes/src/for/array/for_decompress.rs b/encodings/fastlanes/src/for/array/for_decompress.rs index f7292a5a06e..e833e1974c8 100644 --- a/encodings/fastlanes/src/for/array/for_decompress.rs +++ b/encodings/fastlanes/src/for/array/for_decompress.rs @@ -14,7 +14,6 @@ use vortex_array::match_each_integer_ptype; use vortex_array::match_each_unsigned_integer_ptype; use vortex_array::vtable::ValidityHelper; use vortex_buffer::Buffer; -use vortex_buffer::BufferMut; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -71,7 +70,7 @@ pub fn decompress(array: &FoRArray, ctx: &mut ExecutionCtx) -> VortexResult(), min), + decompress_primitive(encoded.into_buffer::(), min), validity, ) } @@ -137,7 +136,7 @@ pub(crate) fn fused_decompress< } fn decompress_primitive( - values: BufferMut, + values: Buffer, min: T, ) -> Buffer { values diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 575a41459b9..f3e4f6723cd 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -111,6 +111,10 @@ harness = false name = "take_patches" harness = false +[[bench]] +name = "apply_patches" +harness = false + [[bench]] name = "chunk_array_builder" harness = false diff --git a/vortex-array/benches/apply_patches.rs b/vortex-array/benches/apply_patches.rs new file mode 100644 index 00000000000..734099ec149 --- /dev/null +++ b/vortex-array/benches/apply_patches.rs @@ -0,0 +1,109 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#![allow(clippy::unwrap_used)] +#![allow(clippy::cast_possible_truncation)] + +use divan::Bencher; +use rand::Rng; +use rand::SeedableRng; +use rand::rngs::StdRng; +use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::patches::Patches; +use vortex_array::validity::Validity; +use vortex_buffer::Buffer; + +fn main() { + divan::main(); +} + +const ARRAY_LEN: usize = 1 << 16; // 65536 + +#[derive(Clone, Copy, Debug)] +enum Distribution { + /// Patch indices are uniformly random across the array. + Random, + /// Patch indices are clustered together in a contiguous region. + Clustered, +} + +/// Combined benchmark arguments: (density, distribution). +const BENCH_ARGS: &[(f64, Distribution)] = &[ + (0.01, Distribution::Random), + (0.01, Distribution::Clustered), + (0.1, Distribution::Random), + (0.1, Distribution::Clustered), +]; + +fn make_base_array(len: usize) -> PrimitiveArray { + let buffer = Buffer::from_iter(0..len as u32); + PrimitiveArray::new(buffer, Validity::NonNullable) +} + +fn make_patches(array_len: usize, density: f64, dist: Distribution, rng: &mut StdRng) -> Patches { + let num_patches = (array_len as f64 * density) as usize; + + let indices: Vec = match dist { + Distribution::Random => { + let mut raw: Vec = (0..num_patches) + .map(|_| rng.random_range(0..array_len as u64)) + .collect(); + raw.sort_unstable(); + raw.dedup(); + raw + } + Distribution::Clustered => { + // Place patches in a contiguous cluster starting at a random offset. + let max_start = array_len.saturating_sub(num_patches); + let start = rng.random_range(0..=max_start as u64); + (start..start + num_patches as u64).collect() + } + }; + + let n = indices.len(); + let values = Buffer::from_iter((0..n).map(|i| i as u32)).into_array(); + Patches::new( + array_len, + 0, + Buffer::from(indices).into_array(), + values, + None, + ) + .unwrap() +} + +#[divan::bench(args = BENCH_ARGS)] +fn patch_inplace(bencher: Bencher, &(density, dist): &(f64, Distribution)) { + let mut rng = StdRng::seed_from_u64(42); + let patches = make_patches(ARRAY_LEN, density, dist, &mut rng); + + bencher + .with_inputs(|| { + ( + make_base_array(ARRAY_LEN), + LEGACY_SESSION.create_execution_ctx(), + ) + }) + .bench_values(|(array, mut ctx)| array.patch(&patches, &mut ctx).unwrap()); +} + +#[divan::bench(args = BENCH_ARGS)] +fn patch_copy_to_buffer(bencher: Bencher, &(density, dist): &(f64, Distribution)) { + let mut rng = StdRng::seed_from_u64(42); + let patches = make_patches(ARRAY_LEN, density, dist, &mut rng); + + bencher + .with_inputs(|| { + ( + make_base_array(ARRAY_LEN), + LEGACY_SESSION.create_execution_ctx(), + ) + }) + .bench_values(|(array, mut ctx)| { + let arr_ref = array.clone(); + (arr_ref.patch(&patches, &mut ctx).unwrap(), array) + }); +} diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index ccadf82286a..44569b6d3c1 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -21642,6 +21642,8 @@ pub macro vortex_array::match_smallest_offset_type! pub macro vortex_array::register_kernel! +pub macro vortex_array::require_child! + pub macro vortex_array::vtable! pub enum vortex_array::Canonical diff --git a/vortex-array/src/arrays/dict/vtable/mod.rs b/vortex-array/src/arrays/dict/vtable/mod.rs index ffde0e4223b..215335e10fd 100644 --- a/vortex-array/src/arrays/dict/vtable/mod.rs +++ b/vortex-array/src/arrays/dict/vtable/mod.rs @@ -14,6 +14,7 @@ use vortex_session::VortexSession; use super::DictArray; use super::DictMetadata; use super::take_canonical; +use crate::AnyCanonical; use crate::ArrayRef; use crate::Canonical; use crate::DeserializeMetadata; @@ -23,6 +24,7 @@ use crate::Precision; use crate::ProstMetadata; use crate::SerializeMetadata; use crate::arrays::ConstantArray; +use crate::arrays::PrimitiveVTable; use crate::arrays::dict::compute::rules::PARENT_RULES; use crate::buffer::BufferHandle; use crate::dtype::DType; @@ -32,6 +34,7 @@ use crate::executor::ExecutionCtx; use crate::executor::ExecutionStep; use crate::hash::ArrayEq; use crate::hash::ArrayHash; +use crate::require_child; use crate::scalar::Scalar; use crate::serde::ArrayChildren; use crate::stats::StatsSetRef; @@ -192,25 +195,29 @@ impl VTable for DictVTable { } fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult { - if let Some(canonical) = execute_fast_path(array, ctx)? { - return Ok(ExecutionStep::Done(canonical)); + if array.is_empty() { + let result_dtype = array + .dtype() + .union_nullability(array.codes().dtype().nullability()); + return Ok(ExecutionStep::Done( + Canonical::empty(&result_dtype).into_array(), + )); } - // TODO(joe): if the values are constant return a constant - let values = array.values().clone().execute::(ctx)?; - let codes = array - .codes() - .clone() - .execute::(ctx)? - .into_primitive(); + let codes = require_child!(array.codes(), 0 => PrimitiveVTable); - // TODO(ngates): if indices are sorted and unique (strict-sorted), then we should delegate to - // the filter function since they're typically optimised for this case. - // TODO(ngates): if indices min is quite high, we could slice self and offset the indices - // such that canonicalize does less work. + if codes.all_invalid()? { + return Ok(ExecutionStep::Done( + ConstantArray::new(Scalar::null(array.dtype().as_nullable()), array.codes.len()) + .into_array(), + )); + } + + let values = require_child!(array.values(), 1 => AnyCanonical); + let values = Canonical::from(values); Ok(ExecutionStep::Done( - take_canonical(values, &codes, ctx)?.into_array(), + take_canonical(values, codes, ctx)?.into_array(), )) } @@ -231,27 +238,3 @@ impl VTable for DictVTable { PARENT_KERNELS.execute(array, parent, child_idx, ctx) } } - -/// Check for fast-path execution conditions. -pub(super) fn execute_fast_path( - array: &DictArray, - _ctx: &mut ExecutionCtx, -) -> VortexResult> { - // Empty array - nothing to do - if array.is_empty() { - let result_dtype = array - .dtype() - .union_nullability(array.codes().dtype().nullability()); - return Ok(Some(Canonical::empty(&result_dtype).into_array())); - } - - // All codes are null - result is all nulls - if array.codes.all_invalid()? { - return Ok(Some( - ConstantArray::new(Scalar::null(array.dtype().as_nullable()), array.codes.len()) - .into_array(), - )); - } - - Ok(None) -} diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index da05450f8de..1e6ae528090 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -387,6 +387,23 @@ impl fmt::Debug for ExecutionStep { } } +/// Require that a child array matches `$M`. If it does, evaluates to the matched value. +/// Otherwise, early-returns `Ok(ExecutionStep::execute_child::<$M>($idx))`. +/// +/// ```ignore +/// let codes = require_child!(array.codes(), 0 => PrimitiveVTable); +/// let values = require_child!(array.values(), 1 => AnyCanonical); +/// ``` +#[macro_export] +macro_rules! require_child { + ($child:expr, $idx:expr => $M:ty) => { + match $child.as_opt::<$M>() { + Some(c) => c, + None => return Ok($crate::ExecutionStep::execute_child::<$M>($idx)), + } + }; +} + /// Extension trait for creating an execution context from a session. pub trait VortexSessionExecute { /// Create a new execution context from this session. diff --git a/vortex-buffer/benches/vortex_buffer.rs b/vortex-buffer/benches/vortex_buffer.rs index 712890a5a4a..1fc28023c75 100644 --- a/vortex-buffer/benches/vortex_buffer.rs +++ b/vortex-buffer/benches/vortex_buffer.rs @@ -73,6 +73,17 @@ impl MapEach for Arrow MapEach for Buffer { + type Output = BufferMut; + + fn map_each(self, f: F) -> Self::Output + where + F: FnMut(T) -> R, + { + Buffer::::map_each_in_place(self, f) + } +} + impl MapEach for BufferMut { type Output = BufferMut; @@ -85,7 +96,7 @@ impl MapEach for BufferMut { } #[divan::bench( - types = [Arrow>, BufferMut], + types = [Arrow>, Buffer, BufferMut], args = INPUT_SIZE, )] fn map_each + FromIterator>(bencher: Bencher, n: i32) { diff --git a/vortex-buffer/public-api.lock b/vortex-buffer/public-api.lock index b695c9cf14d..c1f3ed13a62 100644 --- a/vortex-buffer/public-api.lock +++ b/vortex-buffer/public-api.lock @@ -600,6 +600,8 @@ pub fn vortex_buffer::Buffer::iter(&self) -> vortex_buffer::Iter<'_, T> pub fn vortex_buffer::Buffer::len(&self) -> usize +pub fn vortex_buffer::Buffer::map_each_in_place(self, f: F) -> vortex_buffer::BufferMut where T: core::marker::Copy, F: core::ops::function::FnMut(T) -> R + pub fn vortex_buffer::Buffer::slice(&self, range: impl core::ops::range::RangeBounds) -> Self pub fn vortex_buffer::Buffer::slice_ref(&self, subset: &[T]) -> Self @@ -704,7 +706,9 @@ pub type vortex_buffer::Buffer::Target = [T] pub fn vortex_buffer::Buffer::deref(&self) -> &Self::Target -pub struct vortex_buffer::BufferIterator +pub struct vortex_buffer::BufferIterator + +impl core::iter::traits::exact_size::ExactSizeIterator for vortex_buffer::BufferIterator impl core::iter::traits::iterator::Iterator for vortex_buffer::BufferIterator diff --git a/vortex-buffer/src/buffer.rs b/vortex-buffer/src/buffer.rs index f941db6dbec..053cb5baee5 100644 --- a/vortex-buffer/src/buffer.rs +++ b/vortex-buffer/src/buffer.rs @@ -208,6 +208,31 @@ impl Buffer { buffer.freeze() } + /// Map each element of the buffer with a closure. + pub fn map_each_in_place(self, mut f: F) -> BufferMut + where + T: Copy, + F: FnMut(T) -> R, + { + match self.try_into_mut() { + Ok(mut_buf) => mut_buf.map_each_in_place(f), + Err(buf) => { + let len = buf.len(); + let mut out_buf = BufferMut::with_capacity(len); + out_buf + .spare_capacity_mut() + .iter_mut() + .zip(buf) + .for_each(|(out, in_)| { + out.write(f(in_)); + }); + // Safety: just assigned to each value + unsafe { out_buf.set_len(len) } + out_buf + } + } + } + /// Clear the buffer, preserving existing capacity. pub fn clear(&mut self) { self.bytes.clear(); @@ -645,9 +670,11 @@ impl Buf for ByteBuffer { } /// Owned iterator over a [`Buffer`]. -pub struct BufferIterator { - buffer: Buffer, - index: usize, +pub struct BufferIterator { + // Keep the buffer alive for the duration of the iteration. + _buffer: Buffer, + ptr: *const T, + end: *const T, } impl Iterator for BufferIterator { @@ -655,29 +682,37 @@ impl Iterator for BufferIterator { #[inline] fn next(&mut self) -> Option { - (self.index < self.buffer.len()).then(move || { - let value = self.buffer[self.index]; - self.index += 1; - value - }) + if self.ptr == self.end { + None + } else { + // SAFETY: ptr is within the buffer and has not reached end. + let value = unsafe { self.ptr.read() }; + self.ptr = unsafe { self.ptr.add(1) }; + Some(value) + } } #[inline] fn size_hint(&self) -> (usize, Option) { - let remaining = self.buffer.len() - self.index; + let remaining = unsafe { self.end.offset_from(self.ptr) } as usize; (remaining, Some(remaining)) } } +impl ExactSizeIterator for BufferIterator {} + impl IntoIterator for Buffer { type Item = T; type IntoIter = BufferIterator; #[inline] fn into_iter(self) -> Self::IntoIter { + let ptr = self.as_slice().as_ptr(); + let end = unsafe { ptr.add(self.len()) }; BufferIterator { - buffer: self, - index: 0, + _buffer: self, + ptr, + end, } } }