diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 8d5cc75c895..5e838a6ffcf 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -2268,6 +2268,12 @@ pub type vortex_array::arrays::decimal::DecimalArray = vortex_array::Array(&vortex_array::arrays::PrimitiveArray, &vortex_mask::Mask) -> core::option::Option + +pub fn vortex_array::arrays::dict::cardinality::has_repeated_code_sample(&vortex_array::arrays::PrimitiveArray, &vortex_mask::Mask) -> bool + pub mod vortex_array::arrays::dict::vtable pub struct vortex_array::arrays::dict::vtable::Dict diff --git a/vortex-array/src/arrays/dict/array.rs b/vortex-array/src/arrays/dict/array.rs index 10108191744..fe47768503b 100644 --- a/vortex-array/src/arrays/dict/array.rs +++ b/vortex-array/src/arrays/dict/array.rs @@ -11,6 +11,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use vortex_mask::AllOr; +use vortex_mask::Mask; use crate::ArrayRef; use crate::ArraySlots; @@ -23,6 +24,7 @@ use crate::array::ArrayParts; use crate::array::TypedArrayRef; use crate::array_slots; use crate::arrays::Dict; +use crate::arrays::PrimitiveArray; use crate::dtype::DType; use crate::dtype::PType; use crate::match_each_integer_ptype; @@ -150,46 +152,63 @@ pub trait DictArrayExt: TypedArrayRef + DictArraySlotsExt { .execute_mask(codes.len(), &mut LEGACY_SESSION.create_execution_ctx())?; #[expect(deprecated)] let codes_primitive = self.codes().to_primitive(); - let values_len = self.values().len(); - - let init_value = !referenced; - let referenced_value = referenced; - - let mut values_vec = vec![init_value; values_len]; - match codes_validity.bit_buffer() { - AllOr::All => { - match_each_integer_ptype!(codes_primitive.ptype(), |P| { - #[allow( - clippy::cast_possible_truncation, - clippy::cast_sign_loss, - reason = "codes are non-negative indices; a negative signed code would wrap to a large usize and panic on the bounds-checked array index" 
- )] - for &idx in codes_primitive.as_slice::
<P>
() { - values_vec[idx as usize] = referenced_value; - } - }); - } - AllOr::None => {} - AllOr::Some(mask) => { - match_each_integer_ptype!(codes_primitive.ptype(), |P| { - let codes = codes_primitive.as_slice::
<P>
(); - - #[allow( - clippy::cast_possible_truncation, - clippy::cast_sign_loss, - reason = "codes are non-negative indices; a negative signed code would wrap to a large usize and panic on the bounds-checked array index" - )] - mask.set_indices().for_each(|idx| { - values_vec[codes[idx] as usize] = referenced_value; - }); + compute_referenced_values_mask_from_codes( + &codes_primitive, + self.values().len(), + &codes_validity, + referenced, + ) + } +} +impl> DictArrayExt for T {} + +/// Build an exact bitmap over dictionary values referenced by valid codes. +/// +/// The sampling-based sparse dictionary estimator only decides whether an exact pass is likely to +/// be worthwhile. This helper is the exact pass: aggregate kernels use it to ignore unreferenced +/// values, and sparse dictionary canonicalization uses it to compact values before remapping codes. +pub(crate) fn compute_referenced_values_mask_from_codes( + codes_primitive: &PrimitiveArray, + values_len: usize, + codes_validity: &Mask, + referenced: bool, +) -> VortexResult { + let init_value = !referenced; + let referenced_value = referenced; + + let mut values_vec = vec![init_value; values_len]; + match codes_validity.bit_buffer() { + AllOr::All => { + match_each_integer_ptype!(codes_primitive.ptype(), |P| { + #[allow( + clippy::cast_possible_truncation, + clippy::cast_sign_loss, + reason = "codes are non-negative indices; a negative signed code would wrap to a large usize and panic on the bounds-checked array index" + )] + for &idx in codes_primitive.as_slice::
<P>
() { + values_vec[idx as usize] = referenced_value; + } + }); + } + AllOr::None => {} + AllOr::Some(mask) => { + match_each_integer_ptype!(codes_primitive.ptype(), |P| { + let codes = codes_primitive.as_slice::
<P>
(); + + #[allow( + clippy::cast_possible_truncation, + clippy::cast_sign_loss, + reason = "codes are non-negative indices; a negative signed code would wrap to a large usize and panic on the bounds-checked array index" + )] + mask.set_indices().for_each(|idx| { + values_vec[codes[idx] as usize] = referenced_value; }); - } + }); } - - Ok(BitBuffer::from(values_vec)) } + + Ok(BitBuffer::from(values_vec)) } -impl> DictArrayExt for T {} /// Concrete parts of a [`DictArray`](super::DictArray) after iterative execution. pub struct DictParts { diff --git a/vortex-array/src/arrays/dict/cardinality.rs b/vortex-array/src/arrays/dict/cardinality.rs new file mode 100644 index 00000000000..fb8ed84da5b --- /dev/null +++ b/vortex-array/src/arrays/dict/cardinality.rs @@ -0,0 +1,135 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Sampling-based cardinality estimation for dictionary codes. +//! +//! This module is used only as a cheap gate before the exact sparse-dictionary remap pass and as a +//! routing hint for downstream exporters. The estimate may be conservative or noisy, but +//! correctness does not depend on it: callers must still collect the exact unique code set and +//! re-check the sparse threshold before compacting. + +use vortex_mask::Mask; + +use crate::arrays::PrimitiveArray; +use crate::dtype::IntegerPType; + +const SAMPLE_SIZE: usize = 128; +const REPEATED_CODE_PROBE_SIZE: usize = 16; + +/// Return whether a small deterministic probe observes a repeated non-null code. +/// +/// Sparse canonicalization always has a cheap worst-case gate before it samples. This probe is the +/// next, cheaper filter for cases that are not sparse by row count alone: dense dictionaries should +/// not pay the full estimator cost unless the code stream first shows evidence of repeated codes. +/// A `true` result only means "run the estimator"; it is not enough to compact by itself. 
+pub fn has_repeated_code_sample( + codes: &PrimitiveArray, + validity_mask: &Mask, +) -> bool { + let sample_count = codes.len().min(REPEATED_CODE_PROBE_SIZE); + let mut observed_codes = Vec::::with_capacity(sample_count); + + for sample_idx in 0..sample_count { + let idx = sample_index(sample_idx, codes.len(), sample_count); + if !validity_mask.value(idx) { + continue; + } + + let code: usize = codes.as_slice::()[idx].as_(); + if observed_codes.contains(&code) { + return true; + } + observed_codes.push(code); + } + + false +} + +/// Estimate the number of distinct non-null dictionary codes. +/// +/// The estimator samples deterministic bucket midpoints so repeated executions make the same +/// compaction decision for the same input. Returning `None` means no valid sampled codes were seen. +/// A returned value should only be used to decide whether an exact pass is worth attempting. +pub fn estimate_code_cardinality( + codes: &PrimitiveArray, + validity_mask: &Mask, +) -> Option { + let sample_count = codes.len().min(SAMPLE_SIZE); + let mut observed_codes = Vec::<(usize, usize)>::new(); + + // Sample deterministic bucket midpoints instead of using randomness. The estimate only gates + // whether to run the exact pass; correctness never depends on the sample. + for sample_idx in 0..sample_count { + let idx = sample_index(sample_idx, codes.len(), sample_count); + if !validity_mask.value(idx) { + continue; + } + + let code: usize = codes.as_slice::()[idx].as_(); + if let Some((_, count)) = observed_codes + .iter_mut() + .find(|(observed, _)| *observed == code) + { + *count += 1; + } else { + observed_codes.push((code, 1)); + } + } + + estimate_cardinality_from_observations(&observed_codes) +} + +/// Estimate total cardinality from `(code, observed_count)` sample observations. +/// +/// The correction is Chao1-style: singleton-heavy samples imply more unseen codes, while repeated +/// observations imply the code stream is likely low-cardinality. 
+fn estimate_cardinality_from_observations(observed_codes: &[(usize, usize)]) -> Option { + if observed_codes.is_empty() { + return None; + } + + let unique_count = observed_codes.len(); + let singleton_count = observed_codes + .iter() + .filter(|(_, count)| *count == 1) + .count(); + let doubleton_count = observed_codes + .iter() + .filter(|(_, count)| *count == 2) + .count(); + + // Chao1-style lower-bias estimate for unseen codes. Repeated samples keep the estimate small + // for low-cardinality code streams; many singleton samples make dense streams look expensive. + let unseen_estimate = if doubleton_count == 0 { + singleton_count.saturating_mul(singleton_count.saturating_sub(1)) / 2 + } else { + div_ceil( + singleton_count.saturating_mul(singleton_count), + 2 * doubleton_count, + ) + }; + + Some(unique_count.saturating_add(unseen_estimate)) +} + +/// Return the midpoint index for one deterministic sampling bucket. +/// +/// Splitting the full code range into buckets avoids clustering all samples near the start while +/// avoiding RNG state in a hot execution path. 
+fn sample_index(sample_idx: usize, len: usize, sample_count: usize) -> usize { + debug_assert!(len > 0); + debug_assert!(sample_count > 0); + + let sample_idx = sample_idx as u128; + let len = len as u128; + let sample_count = sample_count as u128; + let bucket_start = sample_idx * len / sample_count; + let bucket_end = (sample_idx + 1) * len / sample_count; + + ((bucket_start + bucket_end) / 2).min(len - 1) as usize +} + +fn div_ceil(numerator: usize, denominator: usize) -> usize { + debug_assert!(denominator > 0); + numerator / denominator + usize::from(!numerator.is_multiple_of(denominator)) +} diff --git a/vortex-array/src/arrays/dict/compute/slice.rs b/vortex-array/src/arrays/dict/compute/slice.rs index 509fa7535d8..04434bb593a 100644 --- a/vortex-array/src/arrays/dict/compute/slice.rs +++ b/vortex-array/src/arrays/dict/compute/slice.rs @@ -12,25 +12,36 @@ use crate::arrays::Constant; use crate::arrays::ConstantArray; use crate::arrays::Dict; use crate::arrays::DictArray; +use crate::arrays::dict::DictArrayExt; use crate::arrays::dict::DictArraySlotsExt; use crate::arrays::slice::SliceReduce; use crate::scalar::Scalar; impl SliceReduce for Dict { fn slice(array: ArrayView<'_, Self>, range: Range) -> VortexResult> { + let is_full_slice = range.len() == array.len(); let sliced_code = array.codes().slice(range)?; // TODO(joe): if the range is size 1 replace with a constant array if let Some(code) = sliced_code.as_opt::() { let code = code.scalar().as_primitive().as_::(); return if let Some(code) = code { let values = array.values().slice(code..code + 1)?; - Ok(Some( - DictArray::new( + // SAFETY: the only dictionary value is referenced by every non-null code when the + // slice is non-empty. An empty code stream cannot reference a non-empty values + // array. 
+ let sliced = unsafe { + DictArray::new_unchecked( ConstantArray::new(0u8, sliced_code.len()).into_array(), values, ) - .into_array(), - )) + }; + if sliced_code.is_empty() { + Ok(Some(sliced.into_array())) + } else { + Ok(Some(unsafe { + sliced.set_all_values_referenced(true).into_array() + })) + } } else { Ok(Some( ConstantArray::new(Scalar::null(array.dtype().clone()), sliced_code.len()) @@ -39,9 +50,18 @@ impl SliceReduce for Dict { }; } // SAFETY: slicing the codes preserves invariants. - Ok(Some( - unsafe { DictArray::new_unchecked(sliced_code, array.values().clone()) }.into_array(), - )) + let sliced = unsafe { DictArray::new_unchecked(sliced_code, array.values().clone()) }; + if is_full_slice { + // A full-length slice preserves the exact code stream, so the referenced-values + // metadata remains sound. Partial slices may drop the only reference to a value. + Ok(Some(unsafe { + sliced + .set_all_values_referenced(array.has_all_values_referenced()) + .into_array() + })) + } else { + Ok(Some(sliced.into_array())) + } } } @@ -51,8 +71,10 @@ mod tests { use vortex_error::VortexResult; use crate::IntoArray; + use crate::arrays::Dict; use crate::arrays::DictArray; use crate::arrays::PrimitiveArray; + use crate::arrays::dict::DictArrayExt; use crate::arrays::dict::compute::slice::ConstantArray; use crate::assert_arrays_eq; use crate::dtype::DType; @@ -84,4 +106,36 @@ mod tests { assert_arrays_eq!(sliced, expected); Ok(()) } + + #[test] + fn full_slice_preserves_all_values_referenced_metadata() -> VortexResult<()> { + let dict = unsafe { + DictArray::new_unchecked( + buffer![0u8, 1].into_array(), + buffer![10i32, 20].into_array(), + ) + .set_all_values_referenced(true) + }; + + let sliced = dict.slice(0..2)?; + + assert!(sliced.as_::().has_all_values_referenced()); + Ok(()) + } + + #[test] + fn partial_slice_drops_all_values_referenced_metadata() -> VortexResult<()> { + let dict = unsafe { + DictArray::new_unchecked( + buffer![0u8, 1].into_array(), + 
buffer![10i32, 20].into_array(), + ) + .set_all_values_referenced(true) + }; + + let sliced = dict.slice(0..1)?; + + assert!(!sliced.as_::().has_all_values_referenced()); + Ok(()) + } } diff --git a/vortex-array/src/arrays/dict/mod.rs b/vortex-array/src/arrays/dict/mod.rs index 0414eea7def..f0bba784794 100644 --- a/vortex-array/src/arrays/dict/mod.rs +++ b/vortex-array/src/arrays/dict/mod.rs @@ -14,6 +14,7 @@ pub use arbitrary::ArbitraryDictArray; mod array; pub use array::*; +pub mod cardinality; pub(crate) mod compute; mod execute; diff --git a/vortex-array/src/arrays/dict/vtable/mod.rs b/vortex-array/src/arrays/dict/vtable/mod.rs index 33db223de72..a0cde010147 100644 --- a/vortex-array/src/arrays/dict/vtable/mod.rs +++ b/vortex-array/src/arrays/dict/vtable/mod.rs @@ -32,6 +32,7 @@ use crate::array::ArrayParts; use crate::array::ArrayView; use crate::array::VTable; use crate::arrays::ConstantArray; +use crate::arrays::Filter; use crate::arrays::Primitive; use crate::arrays::dict::DictArrayExt; use crate::arrays::dict::DictArraySlotsExt; @@ -50,6 +51,7 @@ use crate::validity::Validity; mod kernel; mod operations; +mod sparse; mod validity; /// A [`Dict`]-encoded Vortex array. @@ -186,6 +188,16 @@ impl VTable for Dict { ))); } + if !array.has_all_values_referenced() + // `take(FilterArray(...))` is also represented as a dictionary. Compacting that shape + // burns time before the lazy filter has a chance to push the row mask into its child. + && !array.values().is::() + && sparse::should_consider_sparse_canonicalize(array.codes().len(), array.values().len()) + && let Some(canonical) = sparse::sparse_canonicalize_dict(&array, ctx)? + { + return Ok(ExecutionResult::done(canonical)); + } + let array = require_child!(array, array.values(), DictSlots::VALUES => AnyCanonical); let DictParts { values, codes, .. 
} = array.into_parts(); diff --git a/vortex-array/src/arrays/dict/vtable/sparse.rs b/vortex-array/src/arrays/dict/vtable/sparse.rs new file mode 100644 index 00000000000..23944dc0b29 --- /dev/null +++ b/vortex-array/src/arrays/dict/vtable/sparse.rs @@ -0,0 +1,321 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_buffer::BitBuffer; +use vortex_buffer::Buffer; +use vortex_error::VortexResult; +use vortex_error::vortex_panic; +use vortex_mask::Mask; + +use super::super::array::DictSlots; +use super::super::array::compute_referenced_values_mask_from_codes; +use super::super::cardinality; +use super::DictArray; +use crate::Canonical; +use crate::IntoArray; +use crate::arrays::Primitive; +use crate::arrays::PrimitiveArray; +use crate::arrays::dict::DictArraySlotsExt; +use crate::builtins::ArrayBuiltins; +use crate::dtype::DType; +use crate::dtype::PType; +use crate::executor::ExecutionCtx; +use crate::validity::Validity; + +// TODO: Replace this fixed sparse-dictionary threshold with a cost model that accounts for values +// encoding, code count, unique-code count, and exporter/canonicalization costs. +const SPARSE_CANONICALIZE_CODES_PER_VALUE_THRESHOLD: usize = 4; +const SPARSE_CANONICALIZE_SAMPLED_CODES_PER_VALUE_THRESHOLD: usize = 2; +const SPARSE_CANONICALIZE_MIN_SAMPLED_VALUES_LEN: usize = 512; + +struct SparseDictCodes { + /// Original dictionary value indices that are actually referenced by the live codes. + unique_codes: PrimitiveArray, + /// Codes rewritten to index into `unique_codes` instead of the original values array. + remapped_codes: PrimitiveArray, +} + +#[cold] +#[inline(never)] +pub(super) fn sparse_canonicalize_dict( + array: &DictArray, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let codes = array.codes().as_::().into_owned(); + let Some(sparse_codes) = collect_sparse_codes(&codes, array.values().len(), ctx)? 
else { + return Ok(None); + }; + + // Build a temporary parent that represents `values.take(unique_codes)`. Calling + // `execute_parent` on the values child lets encodings such as FSST/VarBin sparse-take just + // the referenced dictionary values. If the child has no specialized parent execution, fall + // back to canonicalizing all values and then taking from the canonical array. + let values = array.values(); + let unique_values_parent = DictArray::new( + sparse_codes.unique_codes.clone().into_array(), + values.clone(), + ) + .into_array(); + let unique_values = if let Some(taken_values) = + values.execute_parent(&unique_values_parent, DictSlots::VALUES, ctx)? + { + taken_values.execute::(ctx)?.into_array() + } else { + let canonical_values = values.clone().execute::(ctx)?.into_array(); + DictArray::new(sparse_codes.unique_codes.into_array(), canonical_values) + .into_array() + .execute::(ctx)? + .into_array() + }; + + // Now the dictionary is dense over its compacted values, so normal dictionary execution only + // takes from the small `unique_values` array. This avoids `values.take(codes)` preserving a + // large dictionary with many unused values. + let compact_dict = unsafe { + DictArray::new_unchecked(sparse_codes.remapped_codes.into_array(), unique_values) + .set_all_values_referenced(true) + }; + + compact_dict + .into_array() + .execute::(ctx) + .map(Some) +} + +#[cold] +#[inline(never)] +fn collect_sparse_codes( + codes: &PrimitiveArray, + values_len: usize, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let validity = codes.validity()?; + let validity_mask = validity.execute_mask(codes.len(), ctx)?; + let codes = codes_as_u64(codes, ctx)?; + + // The exact pass below scans every code and allocates a remap table sized to the values array. + // Do it only when a cheap upper bound/sample says the dictionary is likely sparse enough. 
+ if !should_collect_sparse_codes(&codes, values_len, &validity_mask) { + return Ok(None); + } + + let referenced_values = + compute_referenced_values_mask_from_codes(&codes, values_len, &validity_mask, true)?; + let unique_count = referenced_values.true_count(); + if unique_count.saturating_mul(SPARSE_CANONICALIZE_CODES_PER_VALUE_THRESHOLD) >= values_len { + return Ok(None); + } + + collect_sparse_codes_u64(&codes, referenced_values, validity_mask, validity).map(Some) +} + +fn codes_as_u64(codes: &PrimitiveArray, ctx: &mut ExecutionCtx) -> VortexResult { + if codes.ptype() == PType::U64 { + return Ok(codes.clone()); + } + + codes + .clone() + .into_array() + .cast(DType::Primitive(PType::U64, codes.dtype().nullability()))? + .execute::(ctx) +} + +#[cold] +#[inline(never)] +fn should_collect_sparse_codes( + codes: &PrimitiveArray, + values_len: usize, + validity_mask: &Mask, +) -> bool { + if codes.is_empty() || values_len == 0 || validity_mask.true_count() == 0 { + return false; + } + + // If even the worst case "every live code is unique" is sparse, skip sampling and go straight + // to the exact remap pass. + if codes + .len() + .saturating_mul(SPARSE_CANONICALIZE_CODES_PER_VALUE_THRESHOLD) + < values_len + { + return true; + } + + if !should_sample_sparse_canonicalize(codes.len(), values_len) { + return false; + } + + // Otherwise sample first. This catches cases like many live rows all referencing the same + // dictionary value without forcing dense dictionaries through the exact remap scan. 
+ if !cardinality::has_repeated_code_sample::(codes, validity_mask) { + return false; + } + + let Some(estimated_unique_codes) = + cardinality::estimate_code_cardinality::(codes, validity_mask) + else { + return false; + }; + + estimated_unique_codes.saturating_mul(SPARSE_CANONICALIZE_CODES_PER_VALUE_THRESHOLD) + < values_len +} + +#[inline] +pub(super) fn should_consider_sparse_canonicalize(codes_len: usize, values_len: usize) -> bool { + codes_len.saturating_mul(SPARSE_CANONICALIZE_CODES_PER_VALUE_THRESHOLD) < values_len + || should_sample_sparse_canonicalize(codes_len, values_len) +} + +#[inline] +fn should_sample_sparse_canonicalize(codes_len: usize, values_len: usize) -> bool { + // Sampling is only a preflight for cases that are not sparse by row count alone. Keep it away + // from tiny dictionary domains and near-dense slices where the estimator overhead dominates. + values_len >= SPARSE_CANONICALIZE_MIN_SAMPLED_VALUES_LEN + && codes_len.saturating_mul(SPARSE_CANONICALIZE_SAMPLED_CODES_PER_VALUE_THRESHOLD) + < values_len +} + +#[cold] +#[inline(never)] +fn collect_sparse_codes_u64( + codes: &PrimitiveArray, + referenced_values: BitBuffer, + validity_mask: Mask, + validity: Validity, +) -> VortexResult { + let unique_count = referenced_values.true_count(); + let mut value_remap = vec![usize::MAX; referenced_values.len()]; + let mut unique_codes = Vec::with_capacity(unique_count); + + // Reuse the same exact referenced-values bitmap as the dictionary aggregate kernels. Walking + // the bitmap assigns compact codes in original dictionary order, which keeps compaction + // deterministic and independent of the first live row that happened to reference each value. 
+ for old_code in referenced_values.set_indices() { + let new_code = unique_codes.len(); + value_remap[old_code] = new_code; + unique_codes.push(old_code as u64); + } + + let mut remapped_codes = Vec::with_capacity(codes.len()); + for (idx, &code) in codes.as_slice::().iter().enumerate() { + if !validity_mask.value(idx) { + remapped_codes.push(0); + continue; + } + + let old_code = usize::try_from(code) + .unwrap_or_else(|_| vortex_panic!("dictionary code {code} does not fit usize")); + let new_code = value_remap[old_code]; + debug_assert_ne!(new_code, usize::MAX); + + remapped_codes.push(new_code as u64); + } + + Ok(SparseDictCodes { + unique_codes: PrimitiveArray::new(Buffer::from_iter(unique_codes), Validity::NonNullable), + remapped_codes: PrimitiveArray::new(Buffer::from_iter(remapped_codes), validity), + }) +} + +#[cfg(test)] +mod tests { + use vortex_error::VortexResult; + + use super::*; + use crate::LEGACY_SESSION; + use crate::VortexSessionExecute; + use crate::assert_arrays_eq; + + #[test] + fn collect_sparse_codes_remaps_unique_values() -> VortexResult<()> { + let codes = PrimitiveArray::from_option_iter([Some(50u32), None, Some(70), Some(50)]); + let Some(sparse) = + collect_sparse_codes(&codes, 100, &mut LEGACY_SESSION.create_execution_ctx())? + else { + panic!("codes are sparse"); + }; + + assert_arrays_eq!( + sparse.unique_codes.into_array(), + PrimitiveArray::from_iter([50u64, 70]).into_array() + ); + assert_arrays_eq!( + sparse.remapped_codes.into_array(), + PrimitiveArray::from_option_iter([Some(0u64), None, Some(1), Some(0)]).into_array() + ); + + Ok(()) + } + + #[test] + fn sampled_sparse_codes_remaps_repeated_large_codes() -> VortexResult<()> { + let codes = PrimitiveArray::from_iter((0..1024).map(|_| 42u32)); + let Some(sparse) = + collect_sparse_codes(&codes, 3000, &mut LEGACY_SESSION.create_execution_ctx())? 
+ else { + panic!("sampled codes are sparse"); + }; + + assert_arrays_eq!( + sparse.unique_codes.into_array(), + PrimitiveArray::from_iter([42u64]).into_array() + ); + assert_arrays_eq!( + sparse.remapped_codes.into_array(), + PrimitiveArray::from_iter((0..1024).map(|_| 0u64)).into_array() + ); + + Ok(()) + } + + #[test] + fn dense_sample_skips_sparse_code_collection() -> VortexResult<()> { + let codes = PrimitiveArray::from_iter((0..1024).map(|idx| idx as u32)); + + assert!( + collect_sparse_codes(&codes, 3000, &mut LEGACY_SESSION.create_execution_ctx())? + .is_none() + ); + + Ok(()) + } + + #[test] + fn sparse_dict_canonicalizes_correctly() -> VortexResult<()> { + let dict = DictArray::new( + PrimitiveArray::from_option_iter([Some(50u32), None, Some(70), Some(50)]).into_array(), + PrimitiveArray::from_iter(0..100i32).into_array(), + ); + + let actual = dict + .into_array() + .execute::(&mut LEGACY_SESSION.create_execution_ctx())? + .into_array(); + + assert_arrays_eq!( + actual, + PrimitiveArray::from_option_iter([Some(50i32), None, Some(70), Some(50)]) + ); + + Ok(()) + } + + #[test] + fn sampled_sparse_dict_canonicalizes_repeated_codes() -> VortexResult<()> { + let dict = DictArray::new( + PrimitiveArray::from_iter((0..1024).map(|_| 42u32)).into_array(), + PrimitiveArray::from_iter(0..3000i32).into_array(), + ); + + let actual = dict + .into_array() + .execute::(&mut LEGACY_SESSION.create_execution_ctx())? 
+ .into_array(); + + assert_arrays_eq!(actual, PrimitiveArray::from_iter((0..1024).map(|_| 42i32))); + + Ok(()) + } +} diff --git a/vortex-duckdb/src/exporter/cache.rs b/vortex-duckdb/src/exporter/cache.rs index 2f495ba9608..e20efbf7117 100644 --- a/vortex-duckdb/src/exporter/cache.rs +++ b/vortex-duckdb/src/exporter/cache.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use parking_lot::Mutex; use vortex::array::ArrayRef; -use vortex::array::Canonical; use vortex_utils::aliases::dash_map::DashMap; use crate::duckdb::ReusableDict; @@ -20,6 +19,5 @@ use crate::duckdb::Vector; pub struct ConversionCache { pub dict_cache: DashMap, pub values_cache: DashMap>)>, - pub canonical_cache: DashMap, pub file_index: usize, } diff --git a/vortex-duckdb/src/exporter/dict.rs b/vortex-duckdb/src/exporter/dict.rs index cba2f85591f..eec71dc08a6 100644 --- a/vortex-duckdb/src/exporter/dict.rs +++ b/vortex-duckdb/src/exporter/dict.rs @@ -11,7 +11,9 @@ use vortex::array::arrays::Constant; use vortex::array::arrays::ConstantArray; use vortex::array::arrays::DictArray; use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::dict::DictArrayExt; use vortex::array::arrays::dict::DictArraySlotsExt; +use vortex::array::arrays::dict::cardinality::estimate_code_cardinality; use vortex::array::match_each_integer_ptype; use vortex::dtype::IntegerPType; use vortex::error::VortexResult; @@ -33,6 +35,10 @@ struct DictExporter { codes_type: PhantomData, } +// TODO: Replace this fixed sparse-dictionary threshold with a cost model that accounts for values +// encoding, code count, unique-code count, and exporter/canonicalization costs. +const SPARSE_EXPORT_CODES_PER_VALUE_THRESHOLD: usize = 4; + pub(crate) fn new_exporter_with_flatten( array: &DictArray, cache: &ConversionCache, @@ -42,21 +48,21 @@ pub(crate) fn new_exporter_with_flatten( ) -> VortexResult> { // Grab the cache dictionary values. 
let values = array.values(); - let codes = array.codes(); - let codes_len = codes.len(); + let codes_array = array.codes(); + let codes_len = codes_array.len(); if let Some(constant) = values.as_opt::() { return constant::new_exporter_with_mask( ConstantArray::new(constant.scalar().clone(), codes_len), - codes.validity()?.execute_mask(codes_len, ctx)?, + codes_array.validity()?.execute_mask(codes_len, ctx)?, cache, ctx, ); } - let codes_mask = codes.validity()?.execute_mask(codes_len, ctx)?; + let codes_mask = codes_array.validity()?.execute_mask(codes_len, ctx)?; - match codes_mask { + match &codes_mask { Mask::AllTrue(_) => {} Mask::AllFalse(_) => return Ok(all_invalid::new_exporter()), Mask::Values(_) => { @@ -67,25 +73,27 @@ pub(crate) fn new_exporter_with_flatten( } let values_key = values.addr(); - let codes = array.codes().clone().execute::(ctx)?; + let codes = codes_array.clone().execute::(ctx)?; + + if !flatten + && !array.has_all_values_referenced() + && should_export_sparse(&codes, values.len(), &codes_mask) + { + return new_array_exporter( + array + .clone() + .into_array() + .execute::(ctx)? + .into_array(), + cache, + ctx, + ); + } let reusable_dict = if flatten { - let canonical = cache - .canonical_cache - .get(&values_key) - .map(|entry| entry.value().1.clone()); - let canonical = match canonical { - Some(c) => c, - None => { - let canonical = values.clone().execute::(ctx)?; - cache - .canonical_cache - .insert(values_key, (values.clone(), canonical.clone())); - canonical - } - }; return new_array_exporter( - DictArray::new(array.codes().clone(), canonical.into_array()) + array + .clone() .into_array() .execute::(ctx)? 
.into_array(), @@ -129,6 +137,28 @@ pub(crate) fn new_exporter_with_flatten( }) } +fn should_export_sparse(codes: &PrimitiveArray, values_len: usize, codes_mask: &Mask) -> bool { + if codes.is_empty() || values_len == 0 || codes_mask.true_count() == 0 { + return false; + } + + if codes + .len() + .saturating_mul(SPARSE_EXPORT_CODES_PER_VALUE_THRESHOLD) + < values_len + { + return true; + } + + let Some(estimated_unique_codes) = match_each_integer_ptype!(codes.ptype(), |I| { + estimate_code_cardinality::(codes, codes_mask) + }) else { + return false; + }; + + estimated_unique_codes.saturating_mul(SPARSE_EXPORT_CODES_PER_VALUE_THRESHOLD) < values_len +} + impl> ColumnExporter for DictExporter { fn export( &self, @@ -285,6 +315,63 @@ mod tests { Ok(()) } + #[test] + fn test_sparse_dict_exports_flat() -> VortexResult<()> { + let arr = DictArray::new( + PrimitiveArray::from_iter([50u32, 70]).into_array(), + PrimitiveArray::from_iter(0..100i32).into_array(), + ); + + let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_INTEGER)]); + + new_exporter(&arr, &ConversionCache::default())?.export( + 0, + 2, + chunk.get_vector_mut(0), + &mut SESSION.create_execution_ctx(), + )?; + chunk.set_len(2); + + assert_eq!( + format!("{}", String::try_from(&*chunk)?), + r#"Chunk - [1 Columns] +- FLAT INTEGER: 2 = [ 50, 70] +"# + ); + + Ok(()) + } + + #[test] + fn test_sampled_sparse_dict_exports_flat() -> VortexResult<()> { + let arr = DictArray::new( + PrimitiveArray::from_iter((0..32).map(|_| 42u32)).into_array(), + PrimitiveArray::from_iter(0..100i32).into_array(), + ); + + let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_INTEGER)]); + + new_exporter(&arr, &ConversionCache::default())?.export( + 0, + 32, + chunk.get_vector_mut(0), + &mut SESSION.create_execution_ctx(), + )?; + chunk.set_len(32); + + let expected_values = std::iter::repeat_n("42", 32).collect::>().join(", "); + assert_eq!( + format!("{}", 
String::try_from(&*chunk)?), + format!( + r#"Chunk - [1 Columns] +- FLAT INTEGER: 32 = [ {expected_values}] +"# + ) + ); + + Ok(()) + } + #[test] fn test_export_empty_dict() -> VortexResult<()> { let arr = DictArray::new(