From 7fa93097da52cd9e1f185bf2769108e5db22bfe0 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 23 Apr 2026 19:18:51 -0400 Subject: [PATCH] feat: use cardinality estimator for distinct count stats Replace the exact `HashMap`/`HashSet` previously used to compute distinct-value counts during compression stats generation with Cloudflare's `cardinality-estimator` crate. The estimator gives us a bounded-memory approximation (exact up to ~128 distinct values, then HyperLogLog++) so high-cardinality arrays no longer require an O(n) auxiliary hash table to answer the single question "how many unique values does this have?". - Integer stats swap the hash map for a `CardinalityEstimator` and track the most frequent value via a Boyer-Moore majority candidate plus a second-pass exact count. Sparse/dict schemes only care about the heavy hitter (>= 90% threshold) or a rough distinct ratio, so this is behaviourally equivalent for the decisions they make. - Float and string stats likewise drop their hash sets in favor of the estimator. - The integer and float dictionary encoders now rebuild the exact set of distinct values from the source array at compress time, since they need the values themselves and the stats layer no longer retains them. - `SequenceScheme`'s fast-path check for "all values are distinct" now tolerates the estimator's small approximation error; the deferred callback still validates sequences exactly. 
Signed-off-by: Robert Kruszewski --- Cargo.lock | 32 ++ Cargo.toml | 1 + encodings/fastlanes/src/rle/array/mod.rs | 2 +- encodings/runend/src/compress.rs | 4 +- vortex-array/benches/dict_compare.rs | 50 ++- vortex-array/benches/dict_compress.rs | 48 +-- vortex-array/public-api.lock | 10 +- vortex-array/src/arrays/dict/array.rs | 22 +- vortex-array/src/arrays/dict/compute/cast.rs | 27 +- .../src/arrays/dict/compute/min_max.rs | 19 +- vortex-array/src/arrays/dict/compute/mod.rs | 98 +++--- .../src/arrays/primitive/array/cast.rs | 28 +- .../src/arrays/primitive/array/mod.rs | 52 ++- vortex-array/src/arrays/varbin/vtable/mod.rs | 4 +- vortex-array/src/builders/dict/bytes.rs | 314 +++++++++++++----- vortex-array/src/builders/dict/mod.rs | 15 +- vortex-array/src/builders/dict/primitive.rs | 102 ++++-- vortex-array/src/scalar/arrow.rs | 4 +- vortex-btrblocks/src/schemes/float.rs | 2 +- vortex-btrblocks/src/schemes/integer.rs | 23 +- vortex-btrblocks/src/schemes/patches.rs | 4 +- vortex-btrblocks/src/schemes/string.rs | 6 +- vortex-btrblocks/src/schemes/temporal.rs | 10 +- vortex-buffer/public-api.lock | 2 +- vortex-buffer/src/trusted_len.rs | 6 +- vortex-compressor/Cargo.toml | 1 + vortex-compressor/benches/dict_encode.rs | 24 +- vortex-compressor/public-api.lock | 12 - vortex-compressor/src/builtins/dict/float.rs | 133 +------- .../src/builtins/dict/integer.rs | 146 +------- vortex-compressor/src/builtins/dict/mod.rs | 3 - vortex-compressor/src/builtins/dict/string.rs | 4 +- vortex-compressor/src/builtins/mod.rs | 2 - vortex-compressor/src/compressor.rs | 6 +- vortex-compressor/src/stats/bool.rs | 16 +- vortex-compressor/src/stats/float.rs | 55 ++- vortex-compressor/src/stats/integer.rs | 172 +++++++--- vortex-compressor/src/stats/string.rs | 24 +- vortex-ffi/src/data_source.rs | 2 +- vortex-layout/src/layouts/dict/writer.rs | 6 +- .../arrays/synthetic/encodings/dict.rs | 30 +- vortex/benches/single_encoding_throughput.rs | 22 +- 42 files changed, 840 insertions(+), 703 
deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f7a33f3d14..af4afb92b5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1238,6 +1238,16 @@ dependencies = [ "serde_core", ] +[[package]] +name = "cardinality-estimator" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a7bc4e9a7ab9239a4a46df35d75101cee57defc8e4255a1b2b6f8eab8de87e" +dependencies = [ + "enum_dispatch", + "wyhash", +] + [[package]] name = "cargo-platform" version = "0.3.3" @@ -3760,6 +3770,18 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "enum_dispatch" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "env_filter" version = "1.0.1" @@ -10279,6 +10301,7 @@ dependencies = [ name = "vortex-compressor" version = "0.1.0" dependencies = [ + "cardinality-estimator", "codspeed-divan-compat", "itertools 0.14.0", "num-traits", @@ -11732,6 +11755,15 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" +[[package]] +name = "wyhash" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf6e163c25e3fac820b4b453185ea2dea3b6a3e0a721d4d23d75bd33734c295" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 35179561e14..77114026ab4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,6 +112,7 @@ bit-vec = "0.9.0" bitvec = "1.0.1" bytes = "1.11.1" bzip2 = "0.6.0" +cardinality-estimator = "1.0.3" cargo_metadata = "0.23.1" cbindgen = "0.29.0" cc = "1.2" diff --git a/encodings/fastlanes/src/rle/array/mod.rs b/encodings/fastlanes/src/rle/array/mod.rs index aeaba0ef42e..8793179c7d3 100644 --- 
a/encodings/fastlanes/src/rle/array/mod.rs +++ b/encodings/fastlanes/src/rle/array/mod.rs @@ -473,7 +473,7 @@ mod tests { .indices() .clone() .execute::(&mut ctx)? - .narrow()?; + .narrow(&mut ctx)?; let re_encoded = RLEData::encode(indices_prim.as_view(), &mut ctx)?; // Reconstruct the outer RLE with re-encoded indices. diff --git a/encodings/runend/src/compress.rs b/encodings/runend/src/compress.rs index b490076c355..ba46588d88b 100644 --- a/encodings/runend/src/compress.rs +++ b/encodings/runend/src/compress.rs @@ -84,7 +84,9 @@ pub fn runend_encode( } }; - let ends = ends.narrow().vortex_expect("Ends must succeed downcasting"); + let ends = ends + .narrow(ctx) + .vortex_expect("Ends must succeed downcasting"); ends.statistics() .set(Stat::IsStrictSorted, Precision::Exact(true.into())); diff --git a/vortex-array/benches/dict_compare.rs b/vortex-array/benches/dict_compare.rs index 602ad8638f7..b37974f6323 100644 --- a/vortex-array/benches/dict_compare.rs +++ b/vortex-array/benches/dict_compare.rs @@ -4,6 +4,7 @@ #![expect(clippy::unwrap_used)] use std::str::from_utf8; +use std::sync::LazyLock; use vortex_array::Canonical; use vortex_array::IntoArray; @@ -21,6 +22,7 @@ use vortex_array::expr::eq; use vortex_array::expr::lit; use vortex_array::expr::root; use vortex_array::scalar_fn::fns::operators::Operator; +use vortex_array::session::ArraySession; use vortex_session::VortexSession; fn main() { @@ -45,15 +47,21 @@ const LENGTH_AND_UNIQUE_VALUES: &[(usize, usize)] = &[ (100_000, 2048), ]; +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_primitive(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { let primitive_arr = gen_primitive_for_dict::(len, uniqueness); - let dict = dict_encode(&primitive_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &primitive_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let 
value = primitive_arr.as_slice::()[0]; - let session = VortexSession::empty(); bencher - .with_inputs(|| (&dict, session.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| { dict.clone() .into_array() @@ -67,13 +75,16 @@ fn bench_compare_primitive(bencher: divan::Bencher, (len, uniqueness): (usize, u #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_varbin(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { let varbin_arr = VarBinArray::from(gen_varbin_words(len, uniqueness)); - let dict = dict_encode(&varbin_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &varbin_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let bytes = varbin_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); let value = from_utf8(bytes.as_slice()).unwrap(); - let session = VortexSession::empty(); bencher - .with_inputs(|| (&dict, session.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| { dict.clone() .into_array() @@ -87,13 +98,16 @@ fn bench_compare_varbin(bencher: divan::Bencher, (len, uniqueness): (usize, usiz #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_varbinview(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, uniqueness)); - let dict = dict_encode(&varbinview_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &varbinview_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let bytes = varbinview_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); let value = from_utf8(bytes.as_slice()).unwrap(); - let session = VortexSession::empty(); bencher - .with_inputs(|| (&dict, session.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| { dict.clone() .into_array() @@ -122,13 +136,16 
@@ fn bench_compare_sliced_dict_primitive( (codes_len, values_len): (usize, usize), ) { let primitive_arr = gen_primitive_for_dict::(codes_len.max(values_len), values_len); - let dict = dict_encode(&primitive_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &primitive_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let dict = dict.into_array().slice(0..codes_len).unwrap(); let value = primitive_arr.as_slice::()[0]; - let session = VortexSession::empty(); bencher - .with_inputs(|| (&dict, session.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| { dict.clone() .apply(&eq(root(), lit(value))) @@ -144,14 +161,17 @@ fn bench_compare_sliced_dict_varbinview( (codes_len, values_len): (usize, usize), ) { let varbin_arr = VarBinArray::from(gen_varbin_words(codes_len.max(values_len), values_len)); - let dict = dict_encode(&varbin_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &varbin_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let dict = dict.into_array().slice(0..codes_len).unwrap(); let bytes = varbin_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); let value = from_utf8(bytes.as_slice()).unwrap(); - let session = VortexSession::empty(); bencher - .with_inputs(|| (&dict, session.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| { dict.clone() .apply(&eq(root(), lit(value))) diff --git a/vortex-array/benches/dict_compress.rs b/vortex-array/benches/dict_compress.rs index 4f220ccaa23..26cedd00a25 100644 --- a/vortex-array/benches/dict_compress.rs +++ b/vortex-array/benches/dict_compress.rs @@ -3,12 +3,13 @@ #![expect(clippy::unwrap_used)] +use std::sync::LazyLock; + use divan::Bencher; use rand::distr::Distribution; use rand::distr::StandardUniform; use vortex_array::Canonical; use vortex_array::IntoArray; -use vortex_array::LEGACY_SESSION; use 
vortex_array::VortexSessionExecute; use vortex_array::arrays::VarBinArray; use vortex_array::arrays::VarBinViewArray; @@ -16,6 +17,8 @@ use vortex_array::arrays::dict_test::gen_primitive_for_dict; use vortex_array::arrays::dict_test::gen_varbin_words; use vortex_array::builders::dict::dict_encode; use vortex_array::dtype::NativePType; +use vortex_array::session::ArraySession; +use vortex_session::VortexSession; fn main() { divan::main(); @@ -35,35 +38,39 @@ const BENCH_ARGS: &[(usize, usize)] = &[ (10_000, 512), ]; +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + #[divan::bench(types = [u8, f32, i64], args = BENCH_ARGS)] fn encode_primitives(bencher: Bencher, (len, unique_values): (usize, usize)) where T: NativePType, StandardUniform: Distribution, { - let primitive_arr = gen_primitive_for_dict::(len, unique_values); + let primitive_arr = gen_primitive_for_dict::(len, unique_values).into_array(); bencher - .with_inputs(|| &primitive_arr) - .bench_refs(|arr| dict_encode(&arr.clone().into_array())); + .with_inputs(|| (&primitive_arr, SESSION.create_execution_ctx())) + .bench_refs(|(arr, ctx)| dict_encode(arr, ctx)); } #[divan::bench(args = BENCH_ARGS)] fn encode_varbin(bencher: Bencher, (len, unique_values): (usize, usize)) { - let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values)); + let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values)).into_array(); bencher - .with_inputs(|| &varbin_arr) - .bench_refs(|arr| dict_encode(&arr.clone().into_array())); + .with_inputs(|| (&varbin_arr, SESSION.create_execution_ctx())) + .bench_refs(|(arr, ctx)| dict_encode(arr, ctx)); } #[divan::bench(args = BENCH_ARGS)] fn encode_varbinview(bencher: Bencher, (len, unique_values): (usize, usize)) { - let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values)); + let varbinview_arr = + VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values)).into_array(); bencher - 
.with_inputs(|| &varbinview_arr) - .bench_refs(|arr| dict_encode(&arr.clone().into_array())); + .with_inputs(|| (&varbinview_arr, SESSION.create_execution_ctx())) + .bench_refs(|(arr, ctx)| dict_encode(arr, ctx)); } #[divan::bench(types = [u8, f32, i64], args = BENCH_ARGS)] @@ -72,34 +79,37 @@ where T: NativePType, StandardUniform: Distribution, { - let primitive_arr = gen_primitive_for_dict::(len, unique_values); - let dict = dict_encode(&primitive_arr.into_array()) + let primitive_arr = gen_primitive_for_dict::(len, unique_values).into_array(); + let dict = dict_encode(&primitive_arr, &mut SESSION.create_execution_ctx()) .unwrap() .into_array(); bencher - .with_inputs(|| (&dict, LEGACY_SESSION.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| (**dict).clone().execute::(ctx)); } #[divan::bench(args = BENCH_ARGS)] fn decode_varbin(bencher: Bencher, (len, unique_values): (usize, usize)) { - let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values)); - let dict = dict_encode(&varbin_arr.into_array()).unwrap().into_array(); + let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values)).into_array(); + let dict = dict_encode(&varbin_arr, &mut SESSION.create_execution_ctx()) + .unwrap() + .into_array(); bencher - .with_inputs(|| (&dict, LEGACY_SESSION.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| (**dict).clone().execute::(ctx)); } #[divan::bench(args = BENCH_ARGS)] fn decode_varbinview(bencher: Bencher, (len, unique_values): (usize, usize)) { - let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values)); - let dict = dict_encode(&varbinview_arr.into_array()) + let varbinview_arr = + VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values)).into_array(); + let dict = dict_encode(&varbinview_arr, &mut SESSION.create_execution_ctx()) .unwrap() .into_array(); bencher - .with_inputs(|| 
(&dict, LEGACY_SESSION.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| (**dict).clone().execute::(ctx)); } diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index b6de5284907..d7252f6f7b5 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -4038,7 +4038,7 @@ pub trait vortex_array::arrays::primitive::PrimitiveArrayExt: vortex_array::Type pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::buffer_handle(&self) -> &vortex_array::buffer::BufferHandle -pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::narrow(&self) -> vortex_error::VortexResult +pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::narrow(&self, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::nullability(&self) -> vortex_array::dtype::Nullability @@ -4054,7 +4054,7 @@ impl> vortex_arr pub fn T::buffer_handle(&self) -> &vortex_array::buffer::BufferHandle -pub fn T::narrow(&self) -> vortex_error::VortexResult +pub fn T::narrow(&self, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn T::nullability(&self) -> vortex_array::dtype::Nullability @@ -7440,13 +7440,13 @@ pub trait vortex_array::builders::dict::DictEncoder: core::marker::Send pub fn vortex_array::builders::dict::DictEncoder::codes_ptype(&self) -> vortex_array::dtype::PType -pub fn vortex_array::builders::dict::DictEncoder::encode(&mut self, &vortex_array::ArrayRef) -> vortex_array::ArrayRef +pub fn vortex_array::builders::dict::DictEncoder::encode(&mut self, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_array::builders::dict::DictEncoder::reset(&mut self) -> vortex_array::ArrayRef -pub fn vortex_array::builders::dict::dict_encode(&vortex_array::ArrayRef) -> vortex_error::VortexResult +pub fn vortex_array::builders::dict::dict_encode(&vortex_array::ArrayRef, &mut 
vortex_array::ExecutionCtx) -> vortex_error::VortexResult -pub fn vortex_array::builders::dict::dict_encode_with_constraints(&vortex_array::ArrayRef, &vortex_array::builders::dict::DictConstraints) -> vortex_error::VortexResult +pub fn vortex_array::builders::dict::dict_encode_with_constraints(&vortex_array::ArrayRef, &vortex_array::builders::dict::DictConstraints, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_array::builders::dict::dict_encoder(&vortex_array::ArrayRef, &vortex_array::builders::dict::DictConstraints) -> alloc::boxed::Box diff --git a/vortex-array/src/arrays/dict/array.rs b/vortex-array/src/arrays/dict/array.rs index eafbc9f6642..b3af2e47f34 100644 --- a/vortex-array/src/arrays/dict/array.rs +++ b/vortex-array/src/arrays/dict/array.rs @@ -4,6 +4,7 @@ use std::fmt::Display; use std::fmt::Formatter; +use num_traits::AsPrimitive; use vortex_buffer::BitBuffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -133,7 +134,7 @@ pub trait DictArrayExt: TypedArrayRef + DictArraySlotsExt { } let referenced_mask = self.compute_referenced_values_mask(true)?; - let all_referenced = referenced_mask.iter().all(|v| v); + let all_referenced = referenced_mask.true_count() == referenced_mask.len(); vortex_ensure!(all_referenced, "value in dict not referenced"); } @@ -157,13 +158,9 @@ pub trait DictArrayExt: TypedArrayRef + DictArraySlotsExt { match codes_validity.bit_buffer() { AllOr::All => { match_each_integer_ptype!(codes_primitive.ptype(), |P| { - #[allow( - clippy::cast_possible_truncation, - clippy::cast_sign_loss, - reason = "codes are non-negative indices; a negative signed code would wrap to a large usize and panic on the bounds-checked array index" - )] - for &idx in codes_primitive.as_slice::

() { - values_vec[idx as usize] = referenced_value; + for idx in codes_primitive.as_slice::

() { + let idxu: usize = idx.as_(); + values_vec[idxu] = referenced_value; } }); } @@ -171,14 +168,9 @@ pub trait DictArrayExt: TypedArrayRef + DictArraySlotsExt { AllOr::Some(mask) => { match_each_integer_ptype!(codes_primitive.ptype(), |P| { let codes = codes_primitive.as_slice::

(); - - #[allow( - clippy::cast_possible_truncation, - clippy::cast_sign_loss, - reason = "codes are non-negative indices; a negative signed code would wrap to a large usize and panic on the bounds-checked array index" - )] mask.set_indices().for_each(|idx| { - values_vec[codes[idx] as usize] = referenced_value; + let idxu: usize = codes[idx].as_(); + values_vec[idxu] = referenced_value; }); }); } diff --git a/vortex-array/src/arrays/dict/compute/cast.rs b/vortex-array/src/arrays/dict/compute/cast.rs index ee80a1140b4..da4b04f94b8 100644 --- a/vortex-array/src/arrays/dict/compute/cast.rs +++ b/vortex-array/src/arrays/dict/compute/cast.rs @@ -46,13 +46,18 @@ impl CastReduce for Dict { #[cfg(test)] mod tests { + use std::sync::LazyLock; + use rstest::rstest; use vortex_buffer::buffer; + use vortex_session::VortexSession; use crate::IntoArray; #[expect(deprecated)] use crate::ToCanonical as _; + use crate::VortexSessionExecute; use crate::arrays::Dict; + use crate::arrays::DictArray; use crate::arrays::PrimitiveArray; use crate::arrays::dict::DictArraySlotsExt; use crate::assert_arrays_eq; @@ -62,11 +67,19 @@ mod tests { use crate::dtype::DType; use crate::dtype::Nullability; use crate::dtype::PType; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn encode_dict(array: &crate::ArrayRef) -> DictArray { + dict_encode(array, &mut SESSION.create_execution_ctx()).unwrap() + } #[test] fn test_cast_dict_to_wider_type() { let values = buffer![1i32, 2, 3, 2, 1].into_array(); - let dict = dict_encode(&values).unwrap(); + let dict = encode_dict(&values); let casted = dict .into_array() @@ -86,7 +99,7 @@ mod tests { fn test_cast_dict_nullable() { let values = PrimitiveArray::from_option_iter([Some(10i32), None, Some(20), Some(10), None]); - let dict = dict_encode(&values.into_array()).unwrap(); + let dict = encode_dict(&values.into_array()); let casted = dict .into_array() @@ -102,7 +115,7 @@ mod tests { 
fn test_cast_dict_allvalid_to_nonnullable_and_back() { // Create an AllValid dict array (no nulls) let values = buffer![10i32, 20, 30, 40].into_array(); - let dict = dict_encode(&values).unwrap(); + let dict = encode_dict(&values); // Verify initial state - codes should be NonNullable, values should be NonNullable assert_eq!(dict.codes().dtype().nullability(), Nullability::NonNullable); @@ -171,10 +184,10 @@ mod tests { } #[rstest] - #[case(dict_encode(&buffer![1i32, 2, 3, 2, 1, 3].into_array()).unwrap().into_array())] - #[case(dict_encode(&buffer![100u32, 200, 100, 300, 200].into_array()).unwrap().into_array())] - #[case(dict_encode(&PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).into_array()).unwrap().into_array())] - #[case(dict_encode(&buffer![1.5f32, 2.5, 1.5, 3.5].into_array()).unwrap().into_array())] + #[case(encode_dict(&buffer![1i32, 2, 3, 2, 1, 3].into_array()).into_array())] + #[case(encode_dict(&buffer![100u32, 200, 100, 300, 200].into_array()).into_array())] + #[case(encode_dict(&PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).into_array()).into_array())] + #[case(encode_dict(&buffer![1.5f32, 2.5, 1.5, 3.5].into_array()).into_array())] fn test_cast_dict_conformance(#[case] array: crate::ArrayRef) { test_cast_conformance(&array); } diff --git a/vortex-array/src/arrays/dict/compute/min_max.rs b/vortex-array/src/arrays/dict/compute/min_max.rs index b390005adcf..47a1f0eb8ee 100644 --- a/vortex-array/src/arrays/dict/compute/min_max.rs +++ b/vortex-array/src/arrays/dict/compute/min_max.rs @@ -60,21 +60,27 @@ impl DynAggregateKernel for DictMinMaxKernel { #[cfg(test)] mod tests { + use std::sync::LazyLock; + use rstest::rstest; use vortex_buffer::buffer; use vortex_error::VortexResult; + use vortex_session::VortexSession; use crate::ArrayRef; use crate::IntoArray; - use crate::LEGACY_SESSION; use crate::VortexSessionExecute; use crate::aggregate_fn::fns::min_max::min_max; use crate::arrays::DictArray; 
use crate::arrays::PrimitiveArray; use crate::builders::dict::dict_encode; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); fn assert_min_max(array: &ArrayRef, expected: Option<(i32, i32)>) -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); match (min_max(array, &mut ctx)?, expected) { (Some(result), Some((expected_min, expected_max))) => { assert_eq!(i32::try_from(&result.min)?, expected_min); @@ -117,7 +123,11 @@ mod tests { } fn dict_single() -> DictArray { - dict_encode(&buffer![42i32].into_array()).expect("valid single-value dictionary") + dict_encode( + &buffer![42i32].into_array(), + &mut SESSION.create_execution_ctx(), + ) + .expect("valid single-value dictionary") } fn dict_nullable_codes() -> DictArray { @@ -132,6 +142,7 @@ mod tests { dict_encode( &PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]) .into_array(), + &mut SESSION.create_execution_ctx(), ) .expect("valid nullable-value dictionary") } @@ -166,7 +177,7 @@ mod tests { #[test] fn test_sliced_dict() -> VortexResult<()> { let reference = PrimitiveArray::from_iter([1, 5, 10, 50, 100]); - let dict = dict_encode(&reference.into_array())?; + let dict = dict_encode(&reference.into_array(), &mut SESSION.create_execution_ctx())?; let sliced = dict.slice(1..3)?; assert_min_max(&sliced, Some((5, 10))) } diff --git a/vortex-array/src/arrays/dict/compute/mod.rs b/vortex-array/src/arrays/dict/compute/mod.rs index c56cc8ef367..492cf92fbc5 100644 --- a/vortex-array/src/arrays/dict/compute/mod.rs +++ b/vortex-array/src/arrays/dict/compute/mod.rs @@ -54,16 +54,21 @@ impl FilterReduce for Dict { #[cfg(test)] mod test { + use std::sync::LazyLock; + #[expect(unused_imports)] use itertools::Itertools; use vortex_buffer::buffer; + use vortex_session::VortexSession; use crate::ArrayRef; use crate::IntoArray; #[expect(deprecated)] use 
crate::ToCanonical as _; + use crate::VortexSessionExecute; use crate::accessor::ArrayAccessor; use crate::arrays::ConstantArray; + use crate::arrays::DictArray; use crate::arrays::PrimitiveArray; use crate::arrays::VarBinArray; use crate::arrays::VarBinViewArray; @@ -77,6 +82,15 @@ mod test { use crate::dtype::Nullability; use crate::dtype::PType::I32; use crate::scalar_fn::fns::operators::Operator; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn encode_dict(array: &ArrayRef) -> DictArray { + dict_encode(array, &mut SESSION.create_execution_ctx()).unwrap() + } + #[test] fn canonicalise_nullable_primitive() { let values: Vec> = (0..65) @@ -88,8 +102,7 @@ mod test { }) .collect(); - let dict = - dict_encode(&PrimitiveArray::from_option_iter(values.clone()).into_array()).unwrap(); + let dict = encode_dict(&PrimitiveArray::from_option_iter(values.clone()).into_array()); #[expect(deprecated)] let actual = dict.as_array().to_primitive(); @@ -103,7 +116,7 @@ mod test { let unique_values: Vec = (0..32).collect(); let expected = PrimitiveArray::from_iter((0..1000).map(|i| unique_values[i % 32])); - let dict = dict_encode(&expected.clone().into_array()).unwrap(); + let dict = encode_dict(&expected.clone().into_array()); #[expect(deprecated)] let actual = dict.as_array().to_primitive(); @@ -115,7 +128,7 @@ mod test { let unique_values: Vec = (0..100).collect(); let expected = PrimitiveArray::from_iter((0..1000).map(|i| unique_values[i % 100])); - let dict = dict_encode(&expected.clone().into_array()).unwrap(); + let dict = encode_dict(&expected.clone().into_array()); #[expect(deprecated)] let actual = dict.as_array().to_primitive(); @@ -129,7 +142,7 @@ mod test { DType::Utf8(Nullability::Nullable), ); assert_eq!(reference.len(), 6); - let dict = dict_encode(&reference.clone().into_array()).unwrap(); + let dict = encode_dict(&reference.clone().into_array()); #[expect(deprecated)] let flattened_dict 
= dict.as_array().to_varbinview(); assert_eq!( @@ -151,7 +164,7 @@ mod test { Some(1), Some(5), ]); - let dict = dict_encode(&reference.into_array()).unwrap(); + let dict = encode_dict(&reference.into_array()); dict.slice(1..4).unwrap() } @@ -169,17 +182,16 @@ mod test { #[test] fn test_mask_dict_array() { - let array = dict_encode(&buffer![2, 0, 2, 0, 10].into_array()).unwrap(); + let array = encode_dict(&buffer![2, 0, 2, 0, 10].into_array()); test_mask_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &PrimitiveArray::from_option_iter([Some(2), None, Some(2), Some(0), Some(10)]) .into_array(), - ) - .unwrap(); + ); test_mask_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &VarBinArray::from_iter( [ Some("hello"), @@ -191,24 +203,22 @@ mod test { DType::Utf8(Nullability::Nullable), ) .into_array(), - ) - .unwrap(); + ); test_mask_conformance(&array.into_array()); } #[test] fn test_filter_dict_array() { - let array = dict_encode(&buffer![2, 0, 2, 0, 10].into_array()).unwrap(); + let array = encode_dict(&buffer![2, 0, 2, 0, 10].into_array()); test_filter_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &PrimitiveArray::from_option_iter([Some(2), None, Some(2), Some(0), Some(10)]) .into_array(), - ) - .unwrap(); + ); test_filter_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &VarBinArray::from_iter( [ Some("hello"), @@ -220,14 +230,13 @@ mod test { DType::Utf8(Nullability::Nullable), ) .into_array(), - ) - .unwrap(); + ); test_filter_conformance(&array.into_array()); } #[test] fn test_take_dict() { - let array = dict_encode(&buffer![1, 2].into_array()).unwrap(); + let array = encode_dict(&buffer![1, 2].into_array()); assert_eq!( array @@ -240,17 +249,16 @@ mod test { #[test] fn test_take_dict_conformance() { - let array = dict_encode(&buffer![2, 0, 2, 0, 10].into_array()).unwrap(); + let array = 
encode_dict(&buffer![2, 0, 2, 0, 10].into_array()); test_take_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &PrimitiveArray::from_option_iter([Some(2), None, Some(2), Some(0), Some(10)]) .into_array(), - ) - .unwrap(); + ); test_take_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &VarBinArray::from_iter( [ Some("hello"), @@ -262,18 +270,22 @@ mod test { DType::Utf8(Nullability::Nullable), ) .into_array(), - ) - .unwrap(); + ); test_take_conformance(&array.into_array()); } } #[cfg(test)] mod tests { + use std::sync::LazyLock; + use rstest::rstest; use vortex_buffer::buffer; + use vortex_session::VortexSession; + use crate::ArrayRef; use crate::IntoArray; + use crate::VortexSessionExecute; use crate::arrays::DictArray; use crate::arrays::PrimitiveArray; use crate::arrays::VarBinArray; @@ -281,35 +293,43 @@ mod tests { use crate::compute::conformance::consistency::test_array_consistency; use crate::dtype::DType; use crate::dtype::Nullability; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn encode_dict(array: &ArrayRef) -> DictArray { + dict_encode(array, &mut SESSION.create_execution_ctx()).unwrap() + } #[rstest] // Primitive arrays - #[case::dict_i32(dict_encode(&buffer![1i32, 2, 3, 2, 1].into_array()).unwrap())] + #[case::dict_i32(encode_dict(&buffer![1i32, 2, 3, 2, 1].into_array()))] #[case::dict_nullable_codes(DictArray::try_new( buffer![0u32, 1, 2, 2, 0].into_array(), PrimitiveArray::from_option_iter([Some(10), Some(20), None]).into_array(), ).unwrap())] - #[case::dict_nullable_values(dict_encode( + #[case::dict_nullable_values(encode_dict( &PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).into_array() - ).unwrap())] - #[case::dict_u64(dict_encode(&buffer![100u64, 200, 100, 300, 200].into_array()).unwrap())] + ))] + #[case::dict_u64(encode_dict(&buffer![100u64, 200, 100, 300, 
200].into_array()))] // String arrays - #[case::dict_str(dict_encode( + #[case::dict_str(encode_dict( &VarBinArray::from_iter( ["hello", "world", "hello", "test", "world"].map(Some), DType::Utf8(Nullability::NonNullable), ).into_array() - ).unwrap())] - #[case::dict_nullable_str(dict_encode( + ))] + #[case::dict_nullable_str(encode_dict( &VarBinArray::from_iter( [Some("hello"), None, Some("world"), Some("hello"), None], DType::Utf8(Nullability::Nullable), ).into_array() - ).unwrap())] + ))] // Edge cases - #[case::dict_single(dict_encode(&buffer![42i32].into_array()).unwrap())] - #[case::dict_all_same(dict_encode(&buffer![5i32, 5, 5, 5, 5].into_array()).unwrap())] - #[case::dict_large(dict_encode(&PrimitiveArray::from_iter((0..1000).map(|i| i % 10)).into_array()).unwrap())] + #[case::dict_single(encode_dict(&buffer![42i32].into_array()))] + #[case::dict_all_same(encode_dict(&buffer![5i32, 5, 5, 5, 5].into_array()))] + #[case::dict_large(encode_dict(&PrimitiveArray::from_iter((0..1000).map(|i| i % 10)).into_array()))] fn test_dict_consistency(#[case] array: DictArray) { test_array_consistency(&array.into_array()); } diff --git a/vortex-array/src/arrays/primitive/array/cast.rs b/vortex-array/src/arrays/primitive/array/cast.rs index 0023ea71fb7..0d9db991d26 100644 --- a/vortex-array/src/arrays/primitive/array/cast.rs +++ b/vortex-array/src/arrays/primitive/array/cast.rs @@ -37,17 +37,25 @@ impl PrimitiveData { #[cfg(test)] mod tests { + use std::sync::LazyLock; + use rstest::rstest; use vortex_buffer::Buffer; use vortex_buffer::buffer; + use vortex_session::VortexSession; + use crate::VortexSessionExecute; use crate::arrays::PrimitiveArray; use crate::arrays::primitive::PrimitiveArrayExt; use crate::dtype::DType; use crate::dtype::Nullability; use crate::dtype::PType; + use crate::session::ArraySession; use crate::validity::Validity; + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + #[test] fn test_downcast_all_invalid() { let array = 
PrimitiveArray::new( @@ -55,7 +63,7 @@ mod tests { Validity::AllInvalid, ); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!( result.dtype(), &DType::Primitive(PType::U8, Nullability::Nullable) @@ -74,7 +82,7 @@ mod tests { #[case(vec![i32::MIN as i64, i32::MAX as i64], PType::I32)] fn test_downcast_signed(#[case] values: Vec, #[case] expected_ptype: PType) { let array = PrimitiveArray::from_iter(values); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!(result.ptype(), expected_ptype); } @@ -86,21 +94,21 @@ mod tests { #[case(vec![0_u64, u32::MAX as u64], PType::U32)] fn test_downcast_unsigned(#[case] values: Vec, #[case] expected_ptype: PType) { let array = PrimitiveArray::from_iter(values); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!(result.ptype(), expected_ptype); } #[test] fn test_downcast_keeps_original_if_too_large() { let array = PrimitiveArray::from_iter(vec![0_u64, u64::MAX]); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!(result.ptype(), PType::U64); } #[test] fn test_downcast_preserves_nullability() { let array = PrimitiveArray::from_option_iter([Some(0_i32), None, Some(127)]); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!( result.dtype(), &DType::Primitive(PType::U8, Nullability::Nullable) @@ -113,7 +121,7 @@ mod tests { fn test_downcast_preserves_values() { let values = vec![-100_i16, 0, 100]; let array = PrimitiveArray::from_iter(values); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!(result.ptype(), PType::I8); // Check that the values were properly downscaled @@ -124,14 +132,14 @@ 
mod tests { #[test] fn test_downcast_with_mixed_signs_chooses_signed() { let array = PrimitiveArray::from_iter(vec![-1_i32, 200]); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!(result.ptype(), PType::I16); } #[test] fn test_downcast_floats() { let array = PrimitiveArray::from_iter(vec![1.0_f32, 2.0, 3.0]); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); // Floats should remain unchanged since they can't be downscaled to integers assert_eq!(result.ptype(), PType::F32); } @@ -139,9 +147,9 @@ mod tests { #[test] fn test_downcast_empty_array() { let array = PrimitiveArray::new(Buffer::::empty(), Validity::AllInvalid); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); let array2 = PrimitiveArray::new(Buffer::::empty(), Validity::NonNullable); - let result2 = array2.narrow().unwrap(); + let result2 = array2.narrow(&mut SESSION.create_execution_ctx()).unwrap(); // Empty arrays should not have their validity changed assert!(matches!(result.validity(), Ok(Validity::AllInvalid))); assert!(matches!(result2.validity(), Ok(Validity::NonNullable))); diff --git a/vortex-array/src/arrays/primitive/array/mod.rs b/vortex-array/src/arrays/primitive/array/mod.rs index 254c4b0baee..d0225ab532e 100644 --- a/vortex-array/src/arrays/primitive/array/mod.rs +++ b/vortex-array/src/arrays/primitive/array/mod.rs @@ -15,10 +15,9 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_error::vortex_panic; -use crate::LEGACY_SESSION; +use crate::ExecutionCtx; #[expect(deprecated)] use crate::ToCanonical as _; -use crate::VortexSessionExecute; use crate::array::Array; use crate::array::ArrayParts; use crate::array::TypedArrayRef; @@ -148,13 +147,12 @@ pub trait PrimitiveArrayExt: TypedArrayRef { } /// Narrow the array to the smallest possible integer type that can 
represent all values. - fn narrow(&self) -> VortexResult { + fn narrow(&self, ctx: &mut ExecutionCtx) -> VortexResult { if !self.ptype().is_int() { return Ok(self.to_owned()); } - let mut ctx = LEGACY_SESSION.create_execution_ctx(); - let Some(min_max) = min_max(self.as_ref(), &mut ctx)? else { + let Some(min_max) = min_max(self.as_ref(), ctx)? else { return Ok(PrimitiveArray::new( Buffer::::zeroed(self.len()), self.validity(), @@ -183,58 +181,46 @@ pub trait PrimitiveArrayExt: TypedArrayRef { if min < 0 || max < 0 { // Signed if min >= i8::MIN as i64 && max <= i8::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::I8, nullability))? - .to_primitive(); - return Ok(result); + .execute::(ctx); } if min >= i16::MIN as i64 && max <= i16::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::I16, nullability))? - .to_primitive(); - return Ok(result); + .execute::(ctx); } if min >= i32::MIN as i64 && max <= i32::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::I32, nullability))? - .to_primitive(); - return Ok(result); + .execute::(ctx); } } else { // Unsigned if max <= u8::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::U8, nullability))? - .to_primitive(); - return Ok(result); + .execute::(ctx); } if max <= u16::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::U16, nullability))? - .to_primitive(); - return Ok(result); + .execute::(ctx); } if max <= u32::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::U32, nullability))? 
- .to_primitive(); - return Ok(result); + .execute::(ctx); } } @@ -493,18 +479,18 @@ impl Array { let buffer = match &validity { Validity::NonNullable | Validity::AllValid => { - BufferMut::::from_iter(buf_iter.zip(iter::repeat(true)).map(f)) + Buffer::::from_trusted_len_iter(buf_iter.zip(iter::repeat(true)).map(f)) } Validity::AllInvalid => { - BufferMut::::from_iter(buf_iter.zip(iter::repeat(false)).map(f)) + Buffer::::from_trusted_len_iter(buf_iter.zip(iter::repeat(false)).map(f)) } Validity::Array(val) => { #[expect(deprecated)] let val = val.to_bool().into_bit_buffer(); - BufferMut::::from_iter(buf_iter.zip(val.iter()).map(f)) + Buffer::::from_trusted_len_iter(buf_iter.zip(val.iter()).map(f)) } }; - Ok(PrimitiveArray::new(buffer.freeze(), validity)) + Ok(PrimitiveArray::new(buffer, validity)) } } diff --git a/vortex-array/src/arrays/varbin/vtable/mod.rs b/vortex-array/src/arrays/varbin/vtable/mod.rs index b72ac5eaa48..2def49ecb3b 100644 --- a/vortex-array/src/arrays/varbin/vtable/mod.rs +++ b/vortex-array/src/arrays/varbin/vtable/mod.rs @@ -22,6 +22,7 @@ use crate::array::VTable; use crate::arrays::varbin::VarBinArrayExt; use crate::arrays::varbin::VarBinData; use crate::arrays::varbin::array::NUM_SLOTS; +use crate::arrays::varbin::array::OFFSETS_SLOT; use crate::arrays::varbin::array::SLOT_NAMES; use crate::buffer::BufferHandle; use crate::dtype::DType; @@ -90,7 +91,7 @@ impl VTable for VarBin { "VarBinArray expected {NUM_SLOTS} slots, found {}", slots.len() ); - let offsets = slots[crate::arrays::varbin::array::OFFSETS_SLOT] + let offsets = slots[OFFSETS_SLOT] .as_ref() .vortex_expect("VarBinArray offsets slot"); vortex_ensure!( @@ -138,7 +139,6 @@ impl VTable for VarBin { dtype: &DType, len: usize, metadata: &[u8], - buffers: &[BufferHandle], children: &dyn ArrayChildren, _session: &VortexSession, diff --git a/vortex-array/src/builders/dict/bytes.rs b/vortex-array/src/builders/dict/bytes.rs index d66870d6ba5..2aa81e5b82b 100644 --- 
a/vortex-array/src/builders/dict/bytes.rs +++ b/vortex-array/src/builders/dict/bytes.rs @@ -1,15 +1,21 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::cell::OnceCell; use std::hash::BuildHasher; use std::mem; use std::sync::Arc; +use itertools::Itertools; +use num_traits::AsPrimitive; +use vortex_array::ExecutionCtx; use vortex_buffer::BitBufferMut; use vortex_buffer::BufferMut; use vortex_buffer::ByteBufferMut; use vortex_error::VortexExpect; +use vortex_error::VortexResult; use vortex_error::vortex_panic; +use vortex_mask::AllOr; use vortex_utils::aliases::hash_map::DefaultHashBuilder; use vortex_utils::aliases::hash_map::HashTable; use vortex_utils::aliases::hash_map::HashTableEntry; @@ -18,23 +24,24 @@ use vortex_utils::aliases::hash_map::RandomState; use super::DictConstraints; use super::DictEncoder; use crate::ArrayRef; +use crate::ArrayView; use crate::IntoArray; -use crate::accessor::ArrayAccessor; use crate::arrays::PrimitiveArray; use crate::arrays::VarBin; use crate::arrays::VarBinView; use crate::arrays::VarBinViewArray; +use crate::arrays::varbin::VarBinArrayExt; use crate::arrays::varbinview::build_views::BinaryView; -#[expect(deprecated)] -use crate::canonical::ToCanonical as _; use crate::dtype::DType; use crate::dtype::PType; use crate::dtype::UnsignedPType; +use crate::match_each_integer_ptype; use crate::validity::Validity; /// Dictionary encode varbin array. 
Specializes for primitive byte arrays to avoid double copying -pub struct BytesDictBuilder { - lookup: Option>, +pub struct BytesDictBuilder { + lookup: Option>, + null_code: OnceCell, views: BufferMut, values: ByteBufferMut, values_nulls: BitBufferMut, @@ -58,6 +65,7 @@ impl BytesDictBuilder { Self { lookup: Some(HashTable::new()), views: BufferMut::::empty(), + null_code: OnceCell::new(), values: BufferMut::empty(), values_nulls: BitBufferMut::empty(), hasher: DefaultHashBuilder::default(), @@ -71,18 +79,16 @@ impl BytesDictBuilder { self.views.len() * size_of::() + self.values.len() } - fn lookup_bytes(&self, idx: usize) -> Option<&[u8]> { - self.values_nulls.value(idx).then(|| { - let bin_view = &self.views[idx]; - if bin_view.is_inlined() { - bin_view.as_inlined().value() - } else { - &self.values[bin_view.as_view().as_range()] - } - }) + fn lookup_bytes(&self, idx: usize) -> &[u8] { + let bin_view = &self.views[idx]; + if bin_view.is_inlined() { + bin_view.as_inlined().value() + } else { + &self.values[bin_view.as_view().as_range()] + } } - fn encode_value(&mut self, lookup: &mut HashTable, val: Option<&[u8]>) -> Option { + fn encode_value(&mut self, lookup: &mut HashTable, val: &[u8]) -> Option { match lookup.entry( self.hasher.hash_one(val), |idx| val == self.lookup_bytes(idx.as_()), @@ -95,35 +101,25 @@ impl BytesDictBuilder { } let next_code = self.views.len(); - match val { - None => { - // Null value - self.views.push(BinaryView::default()); - self.values_nulls.append_false(); - } - Some(val) => { - let view = BinaryView::make_view( - val, - 0, - u32::try_from(self.values.len()) - .vortex_expect("values length must fit in u32"), - ); - let additional_bytes = if view.is_inlined() { - size_of::() - } else { - size_of::() + val.len() - }; + let view = BinaryView::make_view( + val, + 0, + u32::try_from(self.values.len()).vortex_expect("values length must fit in u32"), + ); + let additional_bytes = if view.is_inlined() { + size_of::() + } else { + 
size_of::() + val.len() + }; - if self.dict_bytes() + additional_bytes > self.max_dict_bytes { - return None; - } + if self.dict_bytes() + additional_bytes > self.max_dict_bytes { + return None; + } - self.views.push(view); - self.values_nulls.append_true(); - if !view.is_inlined() { - self.values.extend_from_slice(val); - } - } + self.views.push(view); + self.values_nulls.append_true(); + if !view.is_inlined() { + self.values.extend_from_slice(val); } let next_code = Code::from_usize(next_code).unwrap_or_else(|| { @@ -134,29 +130,162 @@ impl BytesDictBuilder { } } - fn encode_bytes>(&mut self, accessor: &A, len: usize) -> ArrayRef { + #[expect(clippy::cognitive_complexity)] + fn encode_varbin( + &mut self, + var_bin: ArrayView, + ctx: &mut ExecutionCtx, + ) -> VortexResult { let mut local_lookup = self.lookup.take().vortex_expect("Must have a lookup dict"); - let mut codes: BufferMut = BufferMut::with_capacity(len); + let mut codes: BufferMut = BufferMut::with_capacity(var_bin.len()); - accessor.with_iterator(|it| { - for value in it { - let Some(code) = self.encode_value(&mut local_lookup, value) else { - break; - }; - // SAFETY: we reserved capacity in the buffer for `len` elements - unsafe { codes.push_unchecked(code) } + let offsets = var_bin.offsets().clone().execute::(ctx)?; + let bytes = var_bin.bytes(); + let validity_mask = var_bin.validity()?.execute_mask(var_bin.len(), ctx)?; + + match validity_mask.bit_buffer() { + AllOr::All => { + match_each_integer_ptype!(offsets.ptype(), |P| { + let slice_offsets = offsets.as_slice::

(); + for w in slice_offsets.windows(2) { + let start = w[0].as_(); + let end = w[1].as_(); + let Some(code) = self.encode_value(&mut local_lookup, &bytes[start..end]) + else { + break; + }; + // SAFETY: we reserved capacity in the buffer for `len` elements + unsafe { codes.push_unchecked(code) } + } + }) } - }); + AllOr::None => { + self.views.push(BinaryView::default()); + self.values_nulls.append_false(); + unsafe { + codes.push_n_unchecked( + Code::from_usize(0).vortex_expect("must fit 0"), + var_bin.len(), + ) + } + } + AllOr::Some(b) => { + match_each_integer_ptype!(offsets.ptype(), |P| { + let slice_offsets = offsets.as_slice::

(); + for (w, valid) in slice_offsets.windows(2).zip_eq(b.iter()) { + if !valid { + let code = self.null_code.get_or_init(|| { + let code = self.views.len(); + self.views.push(BinaryView::default()); + self.values_nulls.append_false(); + Code::from_usize(code).unwrap_or_else(|| { + vortex_panic!("{} has to fit into {}", code, Code::PTYPE) + }) + }); + unsafe { codes.push_unchecked(*code) } + } else { + let start = w[0].as_(); + let end = w[1].as_(); + let Some(code) = + self.encode_value(&mut local_lookup, &bytes[start..end]) + else { + break; + }; + // SAFETY: we reserved capacity in the buffer for `len` elements + unsafe { codes.push_unchecked(code) } + } + } + }) + } + } // Restore lookup dictionary back into the struct self.lookup = Some(local_lookup); - PrimitiveArray::new(codes, Validity::NonNullable).into_array() + Ok(PrimitiveArray::new(codes, Validity::NonNullable)) + } + + fn encode_varbinview( + &mut self, + var_bin_view: ArrayView, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let mut local_lookup = self.lookup.take().vortex_expect("Must have a lookup dict"); + let mut codes: BufferMut = BufferMut::with_capacity(var_bin_view.len()); + + let views = var_bin_view.views(); + let buffers = var_bin_view + .data_buffers() + .iter() + .map(|b| b.as_host()) + .collect::>(); + let validity_mask = var_bin_view + .validity()? 
+ .execute_mask(var_bin_view.len(), ctx)?; + + match validity_mask.bit_buffer() { + AllOr::All => { + for view in views { + let value = if view.is_inlined() { + view.as_inlined().value() + } else { + &buffers[view.as_view().buffer_index as usize][view.as_view().as_range()] + }; + let Some(code) = self.encode_value(&mut local_lookup, value) else { + break; + }; + // SAFETY: we reserved capacity in the buffer for `len` elements + unsafe { codes.push_unchecked(code) } + } + } + AllOr::None => { + self.views.push(BinaryView::default()); + self.values_nulls.append_false(); + unsafe { + codes.push_n_unchecked( + Code::from_usize(0).vortex_expect("must fit 0"), + var_bin_view.len(), + ) + } + } + AllOr::Some(b) => { + for (view, valid) in views.iter().zip_eq(b.iter()) { + if !valid { + let code = self.null_code.get_or_init(|| { + let code = self.views.len(); + self.views.push(BinaryView::default()); + self.values_nulls.append_false(); + Code::from_usize(code).unwrap_or_else(|| { + vortex_panic!("{} has to fit into {}", code, Code::PTYPE) + }) + }); + unsafe { codes.push_unchecked(*code) } + } else { + let value = if view.is_inlined() { + view.as_inlined().value() + } else { + &buffers[view.as_view().buffer_index as usize] + [view.as_view().as_range()] + }; + let Some(code) = self.encode_value(&mut local_lookup, value) else { + break; + }; + // SAFETY: we reserved capacity in the buffer for `len` elements + unsafe { codes.push_unchecked(code) } + } + } + } + } + + // Restore lookup dictionary back into the struct + self.lookup = Some(local_lookup); + + Ok(PrimitiveArray::new(codes, Validity::NonNullable)) } } impl DictEncoder for BytesDictBuilder { - fn encode(&mut self, array: &ArrayRef) -> ArrayRef { + fn encode(&mut self, array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { debug_assert_eq!( &self.dtype, array.dtype(), @@ -165,17 +294,18 @@ impl DictEncoder for BytesDictBuilder { self.dtype ); - let len = array.len(); if let Some(varbinview) = array.as_opt::() { 
- self.encode_bytes(&varbinview.into_owned(), len) + self.encode_varbinview(varbinview, ctx) } else if let Some(varbin) = array.as_opt::() { - self.encode_bytes(&varbin.into_owned(), len) + self.encode_varbin(varbin, ctx) } else { // NOTE(aduffy): it is very rare that this path would be taken, only e.g. // if we're performing dictionary encoding downstream of some other compression. - #[expect(deprecated)] - let varbinview = array.to_varbinview(); - self.encode_bytes(&varbinview, len) + let vbv_array = array.clone().execute::(ctx)?.into_array(); + let varbinview = vbv_array + .as_opt::() + .vortex_expect("Must be a VarBinView"); + self.encode_varbinview(varbinview, ctx) } } @@ -205,24 +335,38 @@ impl DictEncoder for BytesDictBuilder { #[cfg(test)] mod test { use std::str; + use std::sync::LazyLock; + + use vortex_array::arrays::PrimitiveArray; + use vortex_session::VortexSession; use crate::IntoArray; - #[expect(deprecated)] - use crate::ToCanonical as _; + use crate::VortexSessionExecute; use crate::accessor::ArrayAccessor; use crate::arrays::VarBinArray; + use crate::arrays::VarBinViewArray; use crate::arrays::dict::DictArraySlotsExt; use crate::builders::dict::dict_encode; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); #[test] fn encode_varbin() { - let arr = VarBinArray::from(vec!["hello", "world", "hello", "again", "world"]); - let dict = dict_encode(&arr.into_array()).unwrap(); - #[expect(deprecated)] - let codes = dict.codes().to_primitive(); + let arr = VarBinViewArray::from_iter_str(vec!["hello", "world", "hello", "again", "world"]); + let dict = dict_encode(&arr.into_array(), &mut SESSION.create_execution_ctx()).unwrap(); + let codes = dict + .codes() + .clone() + .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); assert_eq!(codes.as_slice::(), &[0, 1, 0, 2, 1]); - #[expect(deprecated)] - let values = dict.values().to_varbinview(); + let values = dict + .values() + .clone() 
+ .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); values.with_iterator(|iter| { assert_eq!( iter.flatten() @@ -235,7 +379,7 @@ mod test { #[test] fn encode_varbin_nulls() { - let arr: VarBinArray = vec![ + let arr: VarBinViewArray = vec![ Some("hello"), None, Some("world"), @@ -247,12 +391,18 @@ mod test { ] .into_iter() .collect(); - let dict = dict_encode(&arr.into_array()).unwrap(); - #[expect(deprecated)] - let codes = dict.codes().to_primitive(); + let dict = dict_encode(&arr.into_array(), &mut SESSION.create_execution_ctx()).unwrap(); + let codes = dict + .codes() + .clone() + .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); assert_eq!(codes.as_slice::(), &[0, 1, 2, 0, 1, 3, 2, 1]); - #[expect(deprecated)] - let values = dict.values().to_varbinview(); + let values = dict + .values() + .clone() + .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); values.with_iterator(|iter| { assert_eq!( iter.map(|b| b.map(|v| unsafe { str::from_utf8_unchecked(v) })) @@ -265,9 +415,12 @@ mod test { #[test] fn repeated_values() { let arr = VarBinArray::from(vec!["a", "a", "b", "b", "a", "b", "a", "b"]); - let dict = dict_encode(&arr.into_array()).unwrap(); - #[expect(deprecated)] - let values = dict.values().to_varbinview(); + let dict = dict_encode(&arr.into_array(), &mut SESSION.create_execution_ctx()).unwrap(); + let values = dict + .values() + .clone() + .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); values.with_iterator(|iter| { assert_eq!( iter.flatten() @@ -276,8 +429,11 @@ mod test { vec!["a", "b"] ); }); - #[expect(deprecated)] - let codes = dict.codes().to_primitive(); + let codes = dict + .codes() + .clone() + .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); assert_eq!(codes.as_slice::(), &[0, 0, 1, 1, 0, 1, 0, 1]); } } diff --git a/vortex-array/src/builders/dict/mod.rs b/vortex-array/src/builders/dict/mod.rs index 7cfcfbbfb8a..4bfb89bb582 100644 --- a/vortex-array/src/builders/dict/mod.rs +++ 
b/vortex-array/src/builders/dict/mod.rs @@ -3,16 +3,16 @@ use bytes::bytes_dict_builder; use primitive::primitive_dict_builder; +use vortex_array::ExecutionCtx; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_panic; use crate::ArrayRef; use crate::IntoArray; -#[expect(deprecated)] -use crate::ToCanonical as _; use crate::arrays::DictArray; use crate::arrays::Primitive; +use crate::arrays::PrimitiveArray; use crate::arrays::VarBin; use crate::arrays::VarBinView; use crate::arrays::primitive::PrimitiveArrayExt; @@ -35,7 +35,7 @@ pub const UNCONSTRAINED: DictConstraints = DictConstraints { pub trait DictEncoder: Send { /// Assign dictionary codes to the given input array. - fn encode(&mut self, array: &ArrayRef) -> ArrayRef; + fn encode(&mut self, array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult; /// Clear the encoder state to make it ready for a new round of decoding. fn reset(&mut self) -> ArrayRef; @@ -65,11 +65,10 @@ pub fn dict_encoder(array: &ArrayRef, constraints: &DictConstraints) -> Box VortexResult { let mut encoder = dict_encoder(array, constraints); - let encoded = encoder.encode(array); - #[expect(deprecated)] - let codes = encoded.to_primitive().narrow()?; + let codes = encoder.encode(array, ctx)?.narrow(ctx)?; // SAFETY: The encoding process will produce a value set of codes and values // All values in the dictionary are guaranteed to be referenced by at least one code // since we build the dictionary from the codes we observe during encoding @@ -81,8 +80,8 @@ pub fn dict_encode_with_constraints( } } -pub fn dict_encode(array: &ArrayRef) -> VortexResult { - let dict_array = dict_encode_with_constraints(array, &UNCONSTRAINED)?; +pub fn dict_encode(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + let dict_array = dict_encode_with_constraints(array, &UNCONSTRAINED, ctx)?; if dict_array.len() != array.len() { vortex_bail!( "must have encoded all {} elements, but only encoded {}", diff --git 
a/vortex-array/src/builders/dict/primitive.rs b/vortex-array/src/builders/dict/primitive.rs index 03d930dbef1..ac1af105ec6 100644 --- a/vortex-array/src/builders/dict/primitive.rs +++ b/vortex-array/src/builders/dict/primitive.rs @@ -1,23 +1,25 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::cell::OnceCell; use std::hash::Hash; use std::mem; use rustc_hash::FxBuildHasher; use vortex_buffer::BitBufferMut; use vortex_buffer::BufferMut; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; use vortex_error::vortex_panic; +use vortex_mask::Mask; use vortex_utils::aliases::hash_map::Entry; use vortex_utils::aliases::hash_map::HashMap; use super::DictConstraints; use super::DictEncoder; use crate::ArrayRef; +use crate::ExecutionCtx; use crate::IntoArray; -#[expect(deprecated)] -use crate::ToCanonical as _; -use crate::accessor::ArrayAccessor; use crate::arrays::PrimitiveArray; use crate::arrays::primitive::NativeValue; use crate::dtype::NativePType; @@ -72,6 +74,7 @@ where .min(constraints.max_bytes / T::PTYPE.byte_width()); Self { lookup: HashMap::with_hasher(FxBuildHasher), + null_code: OnceCell::new(), values: BufferMut::::empty(), values_nulls: BitBufferMut::empty(), nullability, @@ -79,8 +82,8 @@ where } } - fn encode_value(&mut self, v: Option) -> Option { - match self.lookup.entry(v.map(NativeValue)) { + fn encode_value(&mut self, v: T) -> Option { + match self.lookup.entry(NativeValue(v)) { Entry::Occupied(o) => Some(*o.get()), Entry::Vacant(vac) => { if self.values.len() >= self.max_dict_len { @@ -89,18 +92,9 @@ where let next_code = Code::from_usize(self.values.len()).unwrap_or_else(|| { vortex_panic!("{} has to fit into {}", self.values.len(), Code::PTYPE) }); - vac.insert(next_code); - match v { - None => { - self.values.push(T::default()); - self.values_nulls.append_false(); - } - Some(v) => { - self.values.push(v); - self.values_nulls.append_true(); - } - } - Some(next_code) + 
self.values.push(v); + self.values_nulls.append_true(); + Some(*vac.insert(next_code)) } } } @@ -110,7 +104,8 @@ where /// /// Null values are stored in the values of the dictionary such that codes are always non-null. pub struct PrimitiveDictBuilder { - lookup: HashMap>, Code, FxBuildHasher>, + lookup: HashMap, Code, FxBuildHasher>, + null_code: OnceCell, values: BufferMut, values_nulls: BitBufferMut, nullability: Nullability, @@ -123,21 +118,53 @@ where NativeValue: Hash + Eq, Code: UnsignedPType, { - fn encode(&mut self, array: &ArrayRef) -> ArrayRef { + fn encode(&mut self, array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { let mut codes = BufferMut::::with_capacity(array.len()); - #[expect(deprecated)] - let prim = array.to_primitive(); - prim.with_iterator(|it| { - for value in it { - let Some(code) = self.encode_value(value.copied()) else { - break; - }; - unsafe { codes.push_unchecked(code) } + let prim = array.clone().execute::(ctx)?; + match prim.validity()?.execute_mask(array.len(), ctx)? 
{ + Mask::AllTrue(_) => { + for &value in prim.as_slice::() { + let Some(code) = self.encode_value(value) else { + break; + }; + unsafe { codes.push_unchecked(code) } + } + } + Mask::AllFalse(_) => { + self.values.push(T::default()); + self.values_nulls.append_false(); + unsafe { + codes.push_n_unchecked( + Code::from_usize(0).vortex_expect("must fit 0"), + array.len(), + ) + } } - }); + Mask::Values(v) => { + let bit_buff = v.bit_buffer(); + for (&value, valid) in prim.as_slice::().iter().zip(bit_buff) { + if !valid { + let code = self.null_code.get_or_init(|| { + let code = self.values.len(); + self.values.push(T::default()); + self.values_nulls.append_false(); + Code::from_usize(code).unwrap_or_else(|| { + vortex_panic!("{} has to fit into {}", code, Code::PTYPE) + }) + }); + unsafe { codes.push_unchecked(*code) } + } else { + let Some(code) = self.encode_value(value) else { + break; + }; + unsafe { codes.push_unchecked(code) } + } + } + } + } - PrimitiveArray::new(codes, Validity::NonNullable).into_array() + Ok(PrimitiveArray::new(codes, Validity::NonNullable)) } fn reset(&mut self) -> ArrayRef { @@ -155,20 +182,26 @@ where #[cfg(test)] mod test { - #[expect(unused_imports)] - use itertools::Itertools; + use std::sync::LazyLock; + use vortex_buffer::buffer; + use vortex_session::VortexSession; use crate::IntoArray as _; + use crate::VortexSessionExecute; use crate::arrays::dict::DictArraySlotsExt; use crate::assert_arrays_eq; use crate::builders::dict::dict_encode; use crate::builders::dict::primitive::PrimitiveArray; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); #[test] fn encode_primitive() { let arr = buffer![1, 1, 3, 3, 3].into_array(); - let dict = dict_encode(&arr).unwrap(); + let dict = dict_encode(&arr, &mut SESSION.create_execution_ctx()).unwrap(); let expected_codes = buffer![0u8, 0, 1, 1, 1].into_array(); assert_arrays_eq!(dict.codes(), expected_codes); @@ -188,8 +221,9 @@ mod 
test { None, Some(3), None, - ]); - let dict = dict_encode(&arr.into_array()).unwrap(); + ]) + .into_array(); + let dict = dict_encode(&arr, &mut SESSION.create_execution_ctx()).unwrap(); let expected_codes = buffer![0u8, 0, 1, 2, 2, 1, 2, 1].into_array(); assert_arrays_eq!(dict.codes(), expected_codes); diff --git a/vortex-array/src/scalar/arrow.rs b/vortex-array/src/scalar/arrow.rs index 811417144f3..f8277ecf9a6 100644 --- a/vortex-array/src/scalar/arrow.rs +++ b/vortex-array/src/scalar/arrow.rs @@ -556,10 +556,10 @@ mod tests { #[rstest] #[case(TimeUnit::Nanoseconds, "UTC", 1234567890000000000i64)] - #[case(TimeUnit::Microseconds, "EST", 1234567890000000i64)] + #[case(TimeUnit::Microseconds, "America/New_York", 1234567890000000i64)] #[case(TimeUnit::Microseconds, "Asia/Qatar", 1234567890000000i64)] #[case(TimeUnit::Microseconds, "Australia/Sydney", 1234567890000000i64)] - #[case(TimeUnit::Milliseconds, "HST", 1234567890000i64)] + #[case(TimeUnit::Milliseconds, "Pacific/Honolulu", 1234567890000i64)] #[case(TimeUnit::Seconds, "GMT", 1234567890i64)] fn test_temporal_timestamp_tz_to_arrow( #[case] time_unit: TimeUnit, diff --git a/vortex-btrblocks/src/schemes/float.rs b/vortex-btrblocks/src/schemes/float.rs index 8c55b38d345..687d79444ac 100644 --- a/vortex-btrblocks/src/schemes/float.rs +++ b/vortex-btrblocks/src/schemes/float.rs @@ -267,7 +267,7 @@ impl Scheme for NullDominatedSparseScheme { .indices() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_indices = compressor.compress_child( &indices.into_array(), &compress_ctx, diff --git a/vortex-btrblocks/src/schemes/integer.rs b/vortex-btrblocks/src/schemes/integer.rs index dfd61deb80b..25dfb9fe444 100644 --- a/vortex-btrblocks/src/schemes/integer.rs +++ b/vortex-btrblocks/src/schemes/integer.rs @@ -553,7 +553,7 @@ impl Scheme for SparseScheme { .indices() .clone() .execute::(exec_ctx)? 
- .narrow()?; + .narrow(exec_ctx)?; let compressed_indices = compressor.compress_child( &indices.into_array(), @@ -723,11 +723,16 @@ impl Scheme for SequenceScheme { return CompressionEstimate::Verdict(EstimateVerdict::Skip); } - // If the distinct_values_count was computed, and not all values are unique, then this - // cannot be encoded as a sequence array. - if stats - .distinct_count() - .is_some_and(|count| count as usize != data.array_len()) + // If the distinct_values_count was computed, and the array has clearly fewer distinct + // values than its length, then this cannot be encoded as a sequence array. The distinct + // count is now sourced from a cardinality estimator, so we allow a small tolerance to + // account for its approximation error (~1-2% for typical inputs). Arrays that are truly + // sequences will fall through to the deferred callback which validates them exactly. + let distinct_count = stats.distinct_count(); + let array_len = data.array_len(); + let tolerance = array_len.div_ceil(16); + if distinct_count + .is_some_and(|count| (count as usize).saturating_add(tolerance) < array_len) { return CompressionEstimate::Verdict(EstimateVerdict::Skip); } @@ -848,7 +853,7 @@ pub(crate) fn rle_compress( .indices() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; try_compress_delta( compressor, &rle_indices_primitive.into_array(), @@ -865,7 +870,7 @@ pub(crate) fn rle_compress( .indices() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; compressor.compress_child( &rle_indices_primitive.into_array(), &compress_ctx, @@ -879,7 +884,7 @@ pub(crate) fn rle_compress( .values_idx_offsets() .clone() .execute::(exec_ctx)? 
- .narrow()?; + .narrow(exec_ctx)?; let compressed_offsets = compressor.compress_child( &rle_offsets_primitive.into_array(), &compress_ctx, diff --git a/vortex-btrblocks/src/schemes/patches.rs b/vortex-btrblocks/src/schemes/patches.rs index 0d9854c3b23..69ca8450f12 100644 --- a/vortex-btrblocks/src/schemes/patches.rs +++ b/vortex-btrblocks/src/schemes/patches.rs @@ -18,7 +18,7 @@ pub fn compress_patches(patches: Patches, ctx: &mut ExecutionCtx) -> VortexResul .indices() .clone() .execute::(ctx)? - .narrow()? + .narrow(ctx)? .into_array(); // Check if the values are constant. @@ -39,7 +39,7 @@ pub fn compress_patches(patches: Patches, ctx: &mut ExecutionCtx) -> VortexResul let offsets_primitive = offsets .clone() .execute::(ctx)? - .narrow()? + .narrow(ctx)? .into_array(); Ok::(offsets_primitive) }) diff --git a/vortex-btrblocks/src/schemes/string.rs b/vortex-btrblocks/src/schemes/string.rs index 0df5a268157..ef7886e8584 100644 --- a/vortex-btrblocks/src/schemes/string.rs +++ b/vortex-btrblocks/src/schemes/string.rs @@ -95,7 +95,7 @@ impl Scheme for FSSTScheme { .uncompressed_lengths() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_original_lengths = compressor.compress_child( &uncompressed_lengths_primitive.into_array(), &compress_ctx, @@ -109,7 +109,7 @@ impl Scheme for FSSTScheme { .offsets() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_codes_offsets = compressor.compress_child( &codes_offsets_primitive.into_array(), &compress_ctx, @@ -206,7 +206,7 @@ impl Scheme for NullDominatedSparseScheme { .indices() .clone() .execute::(exec_ctx)? 
- .narrow()?; + .narrow(exec_ctx)?; let compressed_indices = compressor.compress_child( &indices.into_array(), &compress_ctx, diff --git a/vortex-btrblocks/src/schemes/temporal.rs b/vortex-btrblocks/src/schemes/temporal.rs index 73aa9eedfb4..c5c0abc46aa 100644 --- a/vortex-btrblocks/src/schemes/temporal.rs +++ b/vortex-btrblocks/src/schemes/temporal.rs @@ -98,7 +98,7 @@ impl Scheme for TemporalScheme { subseconds, } = split_temporal(temporal_array, exec_ctx)?; - let days_primitive = days.execute::(exec_ctx)?.narrow()?; + let days_primitive = days.execute::(exec_ctx)?.narrow(exec_ctx)?; let days = compressor.compress_child( &days_primitive.into_array(), &compress_ctx, @@ -106,7 +106,9 @@ impl Scheme for TemporalScheme { 0, exec_ctx, )?; - let seconds_primitive = seconds.execute::(exec_ctx)?.narrow()?; + let seconds_primitive = seconds + .execute::(exec_ctx)? + .narrow(exec_ctx)?; let seconds = compressor.compress_child( &seconds_primitive.into_array(), &compress_ctx, @@ -114,7 +116,9 @@ impl Scheme for TemporalScheme { 1, exec_ctx, )?; - let subseconds_primitive = subseconds.execute::(exec_ctx)?.narrow()?; + let subseconds_primitive = subseconds + .execute::(exec_ctx)? 
+ .narrow(exec_ctx)?; let subseconds = compressor.compress_child( &subseconds_primitive.into_array(), &compress_ctx, diff --git a/vortex-buffer/public-api.lock b/vortex-buffer/public-api.lock index f6c52f9df89..d3443a5d035 100644 --- a/vortex-buffer/public-api.lock +++ b/vortex-buffer/public-api.lock @@ -122,7 +122,7 @@ impl vortex_buffer::trusted_len::Trus impl vortex_buffer::trusted_len::TrustedLen for core::iter::adapters::skip::Skip where I: vortex_buffer::trusted_len::TrustedLen -impl vortex_buffer::trusted_len::TrustedLen for core::iter::adapters::zip::Zip where T: vortex_buffer::trusted_len::TrustedLen, U: vortex_buffer::trusted_len::TrustedLen +impl vortex_buffer::trusted_len::TrustedLen for core::iter::adapters::zip::Zip where T: vortex_buffer::trusted_len::TrustedLen, U: core::iter::traits::iterator::Iterator impl vortex_buffer::trusted_len::TrustedLen for core::array::iter::IntoIter diff --git a/vortex-buffer/src/trusted_len.rs b/vortex-buffer/src/trusted_len.rs index 13cf25d0546..7ef6f682be4 100644 --- a/vortex-buffer/src/trusted_len.rs +++ b/vortex-buffer/src/trusted_len.rs @@ -1,8 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use itertools::ProcessResults; - /// Trait for all types which have a known upper-bound. 
/// /// Functions that receive a `TrustedLen` iterator can assume that it's `size_hint` is exact, @@ -146,7 +144,7 @@ unsafe impl TrustedLen for crate::Iter<'_, T> {} unsafe impl TrustedLen for crate::BufferIterator {} // ProcessResults -unsafe impl<'a, I, T: 'a, E: 'a> TrustedLen for ProcessResults<'a, I, E> where +unsafe impl<'a, I, T: 'a, E: 'a> TrustedLen for itertools::ProcessResults<'a, I, E> where I: TrustedLen> { } @@ -158,7 +156,7 @@ unsafe impl TrustedLen for std::iter::Enumerate where I: TrustedLen TrustedLen for std::iter::Zip where T: TrustedLen, - U: TrustedLen, + U: Iterator, { } diff --git a/vortex-compressor/Cargo.toml b/vortex-compressor/Cargo.toml index 2d28d86a5e6..49870bea026 100644 --- a/vortex-compressor/Cargo.toml +++ b/vortex-compressor/Cargo.toml @@ -14,6 +14,7 @@ rust-version = { workspace = true } version = { workspace = true } [dependencies] +cardinality-estimator = { workspace = true } itertools = { workspace = true } num-traits = { workspace = true } parking_lot = { workspace = true } diff --git a/vortex-compressor/benches/dict_encode.rs b/vortex-compressor/benches/dict_encode.rs index 02eaa906cb1..64612e3b664 100644 --- a/vortex-compressor/benches/dict_encode.rs +++ b/vortex-compressor/benches/dict_encode.rs @@ -3,17 +3,21 @@ #![expect(clippy::unwrap_used)] +use std::sync::LazyLock; + use divan::Bencher; use vortex_array::IntoArray; -use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; use vortex_array::arrays::BoolArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::builders::dict::dict_encode; +use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::BufferMut; -use vortex_compressor::builtins::integer_dictionary_encode; -use vortex_compressor::stats::IntegerStats; +use vortex_session::VortexSession; + +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); fn make_array() -> PrimitiveArray { let values: BufferMut = 
(0..50).cycle().take(64_000).collect(); @@ -34,18 +38,8 @@ fn make_array() -> PrimitiveArray { fn encode_generic(bencher: Bencher) { let array = make_array().into_array(); bencher - .with_inputs(|| &array) - .bench_refs(|array| dict_encode(array).unwrap()); -} - -#[cfg(not(codspeed))] -#[divan::bench] -fn encode_specialized(bencher: Bencher) { - let array = make_array(); - let stats = IntegerStats::generate(&array, &mut LEGACY_SESSION.create_execution_ctx()); - bencher - .with_inputs(|| &stats) - .bench_refs(|stats| integer_dictionary_encode(array.as_view(), stats)); + .with_inputs(|| (&array, SESSION.create_execution_ctx())) + .bench_refs(|(array, ctx)| dict_encode(array, ctx).unwrap()); } fn main() { diff --git a/vortex-compressor/public-api.lock b/vortex-compressor/public-api.lock index cbdc65377f0..cd822932baf 100644 --- a/vortex-compressor/public-api.lock +++ b/vortex-compressor/public-api.lock @@ -268,10 +268,6 @@ pub fn vortex_compressor::builtins::StringDictScheme::scheme_name(&self) -> &'st pub fn vortex_compressor::builtins::StringDictScheme::stats_options(&self) -> vortex_compressor::stats::GenerateStatsOptions -pub fn vortex_compressor::builtins::float_dictionary_encode(vortex_array::array::view::ArrayView<'_, vortex_array::arrays::primitive::vtable::Primitive>, &vortex_compressor::stats::FloatStats) -> vortex_error::VortexResult - -pub fn vortex_compressor::builtins::integer_dictionary_encode(vortex_array::array::view::ArrayView<'_, vortex_array::arrays::primitive::vtable::Primitive>, &vortex_compressor::stats::IntegerStats) -> vortex_error::VortexResult - pub fn vortex_compressor::builtins::is_float_primitive(&vortex_array::canonical::Canonical) -> bool pub fn vortex_compressor::builtins::is_integer_primitive(&vortex_array::canonical::Canonical) -> bool @@ -754,10 +750,6 @@ pub fn vortex_compressor::stats::BoolStats::fmt(&self, &mut core::fmt::Formatter pub struct vortex_compressor::stats::FloatDistinctInfo -impl 
vortex_compressor::stats::FloatDistinctInfo - -pub fn vortex_compressor::stats::FloatDistinctInfo::distinct_values(&self) -> &vortex_utils::aliases::hash_set::HashSet, rustc_hash::FxBuildHasher> - impl core::clone::Clone for vortex_compressor::stats::FloatDistinctInfo pub fn vortex_compressor::stats::FloatDistinctInfo::clone(&self) -> vortex_compressor::stats::FloatDistinctInfo @@ -844,10 +836,6 @@ impl core::marker::Copy for vortex_compressor::stats::GenerateStatsOptions pub struct vortex_compressor::stats::IntegerDistinctInfo -impl vortex_compressor::stats::IntegerDistinctInfo - -pub fn vortex_compressor::stats::IntegerDistinctInfo::distinct_values(&self) -> &vortex_utils::aliases::hash_map::HashMap, u32, rustc_hash::FxBuildHasher> - impl core::clone::Clone for vortex_compressor::stats::IntegerDistinctInfo pub fn vortex_compressor::stats::IntegerDistinctInfo::clone(&self) -> vortex_compressor::stats::IntegerDistinctInfo diff --git a/vortex-compressor/src/builtins/dict/float.rs b/vortex-compressor/src/builtins/dict/float.rs index 51d553b591f..90a93cd5568 100644 --- a/vortex-compressor/src/builtins/dict/float.rs +++ b/vortex-compressor/src/builtins/dict/float.rs @@ -7,19 +7,15 @@ //! external compatibility. 
use vortex_array::ArrayRef; -use vortex_array::ArrayView; use vortex_array::Canonical; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::arrays::DictArray; -use vortex_array::arrays::Primitive; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::dict::DictArrayExt; use vortex_array::arrays::dict::DictArraySlotsExt; use vortex_array::arrays::primitive::PrimitiveArrayExt; -use vortex_array::dtype::half::f16; -use vortex_array::validity::Validity; -use vortex_buffer::Buffer; +use vortex_array::builders::dict::dict_encode; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -36,8 +32,6 @@ use crate::scheme::DescendantExclusion; use crate::scheme::Scheme; use crate::scheme::SchemeExt; use crate::stats::ArrayAndStats; -use crate::stats::FloatErasedStats; -use crate::stats::FloatStats; use crate::stats::GenerateStatsOptions; impl Scheme for FloatDictScheme { @@ -113,8 +107,7 @@ impl Scheme for FloatDictScheme { compress_ctx: CompressorContext, exec_ctx: &mut ExecutionCtx, ) -> VortexResult { - let stats = data.float_stats(exec_ctx); - let dict = dictionary_encode(data.array_as_primitive(), &stats)?; + let dict = dict_encode(data.array(), exec_ctx)?; let has_all_values_referenced = dict.has_all_values_referenced(); @@ -127,7 +120,7 @@ impl Scheme for FloatDictScheme { .codes() .clone() .execute::(exec_ctx)? - .narrow()? + .narrow(exec_ctx)? .into_array(); let compressed_codes = compressor.compress_child(&narrowed_codes, &compress_ctx, self.id(), 1, exec_ctx)?; @@ -143,106 +136,6 @@ impl Scheme for FloatDictScheme { } } -/// Encodes a typed float array into a [`DictArray`] using the pre-computed distinct values. -macro_rules! typed_encode { - ($source_array:ident, $stats:ident, $typed:ident, $typ:ty) => {{ - let distinct = $typed.distinct().vortex_expect( - "this must be present since `DictScheme` declared that we need distinct values", - ); - - let values_validity = match $source_array.validity()? 
{ - Validity::NonNullable => Validity::NonNullable, - _ => Validity::AllValid, - }; - let codes_validity = $source_array.validity()?; - - let values: Buffer<$typ> = distinct.distinct_values().iter().map(|x| x.0).collect(); - - let max_code = values.len(); - let codes = if max_code <= u8::MAX as usize { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - } else if max_code <= u16::MAX as usize { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - } else { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - }; - - let values = PrimitiveArray::new(values, values_validity).into_array(); - // SAFETY: enforced by the DictEncoder. - Ok(unsafe { DictArray::new_unchecked(codes, values).set_all_values_referenced(true) }) - }}; -} - -/// Compresses a floating-point array into a dictionary array according to attached stats. -/// -/// # Errors -/// -/// Returns an error if unable to compute validity. -pub fn dictionary_encode( - array: ArrayView<'_, Primitive>, - stats: &FloatStats, -) -> VortexResult { - match stats.erased() { - FloatErasedStats::F16(typed) => typed_encode!(array, stats, typed, f16), - FloatErasedStats::F32(typed) => typed_encode!(array, stats, typed, f32), - FloatErasedStats::F64(typed) => typed_encode!(array, stats, typed, f64), - } -} - -/// Stateless encoder that maps values to dictionary codes via a `HashMap`. -struct DictEncoder; - -/// Trait for encoding values of type `T` into codes of type `I`. -trait Encode { - /// Using the distinct value set, turn the values into a set of codes. - fn encode(distinct: &[T], values: &[T]) -> Buffer; -} - -/// Implements [`Encode`] for a float type using its bit representation as the hash key. -macro_rules! 
impl_encode { - ($typ:ty, $utyp:ty) => { impl_encode!($typ, $utyp, u8, u16, u32); }; - ($typ:ty, $utyp:ty, $($ityp:ty),+) => { - $( - impl Encode<$typ, $ityp> for DictEncoder { - #[expect(clippy::cast_possible_truncation)] - fn encode(distinct: &[$typ], values: &[$typ]) -> Buffer<$ityp> { - let mut codes = - vortex_utils::aliases::hash_map::HashMap::<$utyp, $ityp>::with_capacity( - distinct.len(), - ); - for (code, &value) in distinct.iter().enumerate() { - codes.insert(value.to_bits(), code as $ityp); - } - - let mut output = vortex_buffer::BufferMut::with_capacity(values.len()); - for value in values { - // Any code lookups which fail are for nulls, so their value does not matter. - output.push(codes.get(&value.to_bits()).copied().unwrap_or_default()); - } - - output.freeze() - } - } - )* - }; -} - -impl_encode!(f16, u16); -impl_encode!(f32, u32); -impl_encode!(f64, u64); - #[cfg(test)] mod tests { use vortex_array::IntoArray; @@ -251,16 +144,13 @@ mod tests { use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::dict::DictArraySlotsExt; use vortex_array::assert_arrays_eq; + use vortex_array::builders::dict::dict_encode; use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::buffer; use vortex_error::VortexResult; use vortex_session::VortexSession; - use super::dictionary_encode; - use crate::stats::FloatStats; - use crate::stats::GenerateStatsOptions; - #[test] fn test_float_dict_encode() -> VortexResult<()> { let mut ctx = VortexSession::empty() @@ -269,21 +159,14 @@ mod tests { let values = buffer![1f32, 2f32, 2f32, 0f32, 1f32]; let validity = Validity::Array(BoolArray::from_iter([true, true, true, false, true]).into_array()); - let array = PrimitiveArray::new(values, validity); + let array = PrimitiveArray::new(values, validity).into_array(); - let stats = FloatStats::generate_opts( - &array, - GenerateStatsOptions { - count_distinct_values: true, - }, - &mut ctx, - ); - let dict_array = 
dictionary_encode(array.as_view(), &stats)?; - assert_eq!(dict_array.values().len(), 2); + let dict_array = dict_encode(&array, &mut ctx)?; + assert_eq!(dict_array.values().len(), 3); assert_eq!(dict_array.codes().len(), 5); let expected = PrimitiveArray::new( - buffer![1f32, 2f32, 2f32, 1f32, 1f32], + buffer![1f32, 2f32, 2f32, 0f32, 1f32], Validity::Array(BoolArray::from_iter([true, true, true, false, true]).into_array()), ) .into_array(); diff --git a/vortex-compressor/src/builtins/dict/integer.rs b/vortex-compressor/src/builtins/dict/integer.rs index 140afdcebf1..a5cab56e97d 100644 --- a/vortex-compressor/src/builtins/dict/integer.rs +++ b/vortex-compressor/src/builtins/dict/integer.rs @@ -7,18 +7,15 @@ //! for external compatibility. use vortex_array::ArrayRef; -use vortex_array::ArrayView; use vortex_array::Canonical; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::arrays::DictArray; -use vortex_array::arrays::Primitive; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::dict::DictArrayExt; use vortex_array::arrays::dict::DictArraySlotsExt; use vortex_array::arrays::primitive::PrimitiveArrayExt; -use vortex_array::validity::Validity; -use vortex_buffer::Buffer; +use vortex_array::builders::dict::dict_encode; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -32,8 +29,6 @@ use crate::scheme::Scheme; use crate::scheme::SchemeExt; use crate::stats::ArrayAndStats; use crate::stats::GenerateStatsOptions; -use crate::stats::IntegerErasedStats; -use crate::stats::IntegerStats; impl Scheme for IntDictScheme { fn scheme_name(&self) -> &'static str { @@ -107,8 +102,7 @@ impl Scheme for IntDictScheme { compress_ctx: CompressorContext, exec_ctx: &mut ExecutionCtx, ) -> VortexResult { - let stats = data.integer_stats(exec_ctx); - let dict = dictionary_encode(data.array_as_primitive(), &stats)?; + let dict = dict_encode(data.array(), exec_ctx)?; // Values = child 0. 
let compressed_values = @@ -119,7 +113,7 @@ impl Scheme for IntDictScheme { .codes() .clone() .execute::(exec_ctx)? - .narrow()? + .narrow(exec_ctx)? .into_array(); let compressed_codes = compressor.compress_child(&narrowed_codes, &compress_ctx, self.id(), 1, exec_ctx)?; @@ -135,121 +129,6 @@ impl Scheme for IntDictScheme { } } -/// Encodes a typed integer array into a [`DictArray`] using the pre-computed distinct values. -macro_rules! typed_encode { - ($source_array:ident, $stats:ident, $typed:ident, $typ:ty) => {{ - let distinct = $typed.distinct().vortex_expect( - "this must be present since `DictScheme` declared that we need distinct values", - ); - - let values_validity = match $source_array.validity()? { - Validity::NonNullable => Validity::NonNullable, - _ => Validity::AllValid, - }; - let codes_validity = $source_array.validity()?; - - let values: Buffer<$typ> = distinct.distinct_values().keys().map(|x| x.0).collect(); - - let max_code = values.len(); - let codes = if max_code <= u8::MAX as usize { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - } else if max_code <= u16::MAX as usize { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - } else { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - }; - - let values = PrimitiveArray::new(values, values_validity).into_array(); - // SAFETY: invariants enforced in DictEncoder. - Ok(unsafe { DictArray::new_unchecked(codes, values).set_all_values_referenced(true) }) - }}; -} - -/// Compresses an integer array into a dictionary array according to attached stats. -/// -/// # Errors -/// -/// Returns an error if unable to compute validity. 
-#[expect( - clippy::cognitive_complexity, - reason = "complexity from match on all integer types" -)] -pub fn dictionary_encode( - array: ArrayView<'_, Primitive>, - stats: &IntegerStats, -) -> VortexResult { - match stats.erased() { - IntegerErasedStats::U8(typed) => typed_encode!(array, stats, typed, u8), - IntegerErasedStats::U16(typed) => typed_encode!(array, stats, typed, u16), - IntegerErasedStats::U32(typed) => typed_encode!(array, stats, typed, u32), - IntegerErasedStats::U64(typed) => typed_encode!(array, stats, typed, u64), - IntegerErasedStats::I8(typed) => typed_encode!(array, stats, typed, i8), - IntegerErasedStats::I16(typed) => typed_encode!(array, stats, typed, i16), - IntegerErasedStats::I32(typed) => typed_encode!(array, stats, typed, i32), - IntegerErasedStats::I64(typed) => typed_encode!(array, stats, typed, i64), - } -} - -/// Stateless encoder that maps values to dictionary codes via a `HashMap`. -struct DictEncoder; - -/// Trait for encoding values of type `T` into codes of type `I`. -trait Encode { - /// Using the distinct value set, turn the values into a set of codes. - fn encode(distinct: &[T], values: &[T]) -> Buffer; -} - -/// Implements [`Encode`] for an integer type with all code width variants (u8, u16, u32). -macro_rules! impl_encode { - ($typ:ty) => { impl_encode!($typ, u8, u16, u32); }; - ($typ:ty, $($ityp:ty),+) => { - $( - impl Encode<$typ, $ityp> for DictEncoder { - #[expect(clippy::cast_possible_truncation)] - fn encode(distinct: &[$typ], values: &[$typ]) -> Buffer<$ityp> { - let mut codes = - vortex_utils::aliases::hash_map::HashMap::<$typ, $ityp>::with_capacity( - distinct.len(), - ); - for (code, &value) in distinct.iter().enumerate() { - codes.insert(value, code as $ityp); - } - - let mut output = vortex_buffer::BufferMut::with_capacity(values.len()); - for value in values { - // Any code lookups which fail are for nulls, so their value does not matter. - // SAFETY: we have exactly sized output to be as large as values. 
- unsafe { output.push_unchecked(codes.get(value).copied().unwrap_or_default()) }; - } - - output.freeze() - } - } - )* - }; -} - -impl_encode!(u8); -impl_encode!(u16); -impl_encode!(u32); -impl_encode!(u64); -impl_encode!(i8); -impl_encode!(i16); -impl_encode!(i32); -impl_encode!(i64); - #[cfg(test)] mod tests { use vortex_array::IntoArray; @@ -258,15 +137,13 @@ mod tests { use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::dict::DictArraySlotsExt; use vortex_array::assert_arrays_eq; + use vortex_array::builders::dict::dict_encode; use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::buffer; use vortex_error::VortexResult; use vortex_session::VortexSession; - use super::dictionary_encode; - use crate::stats::IntegerStats; - #[test] fn test_dict_encode_integer_stats() -> VortexResult<()> { let mut ctx = VortexSession::empty() @@ -275,21 +152,14 @@ mod tests { let data = buffer![100i32, 200, 100, 0, 100]; let validity = Validity::Array(BoolArray::from_iter([true, true, true, false, true]).into_array()); - let array = PrimitiveArray::new(data, validity); + let array = PrimitiveArray::new(data, validity).into_array(); - let stats = IntegerStats::generate_opts( - &array, - crate::stats::GenerateStatsOptions { - count_distinct_values: true, - }, - &mut ctx, - ); - let dict_array = dictionary_encode(array.as_view(), &stats)?; - assert_eq!(dict_array.values().len(), 2); + let dict_array = dict_encode(&array, &mut ctx)?; + assert_eq!(dict_array.values().len(), 3); assert_eq!(dict_array.codes().len(), 5); let expected = PrimitiveArray::new( - buffer![100i32, 200, 100, 100, 100], + buffer![100i32, 200, 100, 0, 100], Validity::Array(BoolArray::from_iter([true, true, true, false, true]).into_array()), ) .into_array(); diff --git a/vortex-compressor/src/builtins/dict/mod.rs b/vortex-compressor/src/builtins/dict/mod.rs index c8e573b4fbc..77d6a020196 100644 --- a/vortex-compressor/src/builtins/dict/mod.rs +++ 
b/vortex-compressor/src/builtins/dict/mod.rs @@ -18,6 +18,3 @@ pub struct StringDictScheme; mod float; mod integer; mod string; - -pub use float::dictionary_encode as float_dictionary_encode; -pub use integer::dictionary_encode as integer_dictionary_encode; diff --git a/vortex-compressor/src/builtins/dict/string.rs b/vortex-compressor/src/builtins/dict/string.rs index ac6affdf854..94be23fbd64 100644 --- a/vortex-compressor/src/builtins/dict/string.rs +++ b/vortex-compressor/src/builtins/dict/string.rs @@ -98,7 +98,7 @@ impl Scheme for StringDictScheme { compress_ctx: CompressorContext, exec_ctx: &mut ExecutionCtx, ) -> VortexResult { - let dict = dict_encode(data.array())?; + let dict = dict_encode(data.array(), exec_ctx)?; // Values = child 0. let compressed_values = @@ -109,7 +109,7 @@ impl Scheme for StringDictScheme { .codes() .clone() .execute::(exec_ctx)? - .narrow()? + .narrow(exec_ctx)? .into_array(); let compressed_codes = compressor.compress_child(&narrowed_codes, &compress_ctx, self.id(), 1, exec_ctx)?; diff --git a/vortex-compressor/src/builtins/mod.rs b/vortex-compressor/src/builtins/mod.rs index c5bd9f343f5..3dff9fe0f87 100644 --- a/vortex-compressor/src/builtins/mod.rs +++ b/vortex-compressor/src/builtins/mod.rs @@ -37,8 +37,6 @@ mod dict; pub use dict::FloatDictScheme; pub use dict::IntDictScheme; pub use dict::StringDictScheme; -pub use dict::float_dictionary_encode; -pub use dict::integer_dictionary_encode; mod constant; diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index d3ad0d9d5c3..4f10b9d540d 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -500,7 +500,7 @@ impl CascadingCompressor { .offsets() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_offsets = self.compress_canonical( Canonical::Primitive(list_offsets_primitive), offset_ctx, @@ -530,7 +530,7 @@ impl CascadingCompressor { .offsets() .clone() .execute::(exec_ctx)? 
- .narrow()?; + .narrow(exec_ctx)?; let compressed_offsets = self.compress_canonical( Canonical::Primitive(list_view_offsets_primitive), offset_ctx, @@ -542,7 +542,7 @@ impl CascadingCompressor { .sizes() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_sizes = self.compress_canonical( Canonical::Primitive(list_view_sizes_primitive), sizes_ctx, diff --git a/vortex-compressor/src/stats/bool.rs b/vortex-compressor/src/stats/bool.rs index 8825ec8a7f6..8ea9f318536 100644 --- a/vortex-compressor/src/stats/bool.rs +++ b/vortex-compressor/src/stats/bool.rs @@ -92,18 +92,24 @@ impl BoolStats { #[cfg(test)] mod tests { - use vortex_array::LEGACY_SESSION; + use std::sync::LazyLock; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::BoolArray; + use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::BitBuffer; use vortex_error::VortexResult; + use vortex_session::VortexSession; use super::BoolStats; + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + #[test] fn test_all_true() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = BoolArray::new( BitBuffer::from(vec![true, true, true]), Validity::NonNullable, @@ -118,7 +124,7 @@ mod tests { #[test] fn test_all_false() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = BoolArray::new( BitBuffer::from(vec![false, false, false]), Validity::NonNullable, @@ -133,7 +139,7 @@ mod tests { #[test] fn test_mixed() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = BoolArray::new( BitBuffer::from(vec![true, false, true]), Validity::NonNullable, @@ -148,7 +154,7 @@ mod tests { #[test] fn test_with_nulls() -> VortexResult<()> { - let mut ctx = 
LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = BoolArray::new( BitBuffer::from(vec![true, false, true]), Validity::from_iter([true, false, true]), diff --git a/vortex-compressor/src/stats/float.rs b/vortex-compressor/src/stats/float.rs index d968e8d368f..c103d252580 100644 --- a/vortex-compressor/src/stats/float.rs +++ b/vortex-compressor/src/stats/float.rs @@ -4,10 +4,11 @@ //! Float compression statistics. use std::hash::Hash; +use std::marker::PhantomData; +use cardinality_estimator::CardinalityEstimator; use itertools::Itertools; use num_traits::Float; -use rustc_hash::FxBuildHasher; use vortex_array::ExecutionCtx; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::primitive::NativeValue; @@ -19,24 +20,19 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_error::vortex_panic; use vortex_mask::AllOr; -use vortex_utils::aliases::hash_set::HashSet; use super::GenerateStatsOptions; /// Information about the distinct values in a float array. +/// +/// The distinct count is an estimate produced by Cloudflare's cardinality estimator, which is +/// exact for small cardinalities and approximate beyond that. #[derive(Debug, Clone)] pub struct DistinctInfo { - /// The set of distinct float values. - distinct_values: HashSet, FxBuildHasher>, - /// The count of unique values. This _must_ be non-zero. + /// The estimated count of unique values. This _must_ be non-zero. distinct_count: u32, -} - -impl DistinctInfo { - /// Returns a reference to the distinct values set. - pub fn distinct_values(&self) -> &HashSet, FxBuildHasher> { - &self.distinct_values - } + /// Phantom marker for the float element type. + _marker: PhantomData, } /// Typed statistics for a specific float type. 
@@ -188,8 +184,8 @@ where average_run_length: 0, erased: TypedStats { distinct: Some(DistinctInfo { - distinct_values: HashSet::with_capacity_and_hasher(0, FxBuildHasher), distinct_count: 0, + _marker: PhantomData, }), } .into(), @@ -202,13 +198,9 @@ where .ok_or_else(|| vortex_err!("Failed to compute null_count"))?; let value_count = array.len() - null_count; - // Keep a HashMap of T, then convert the keys into PValue afterward since value is - // so much more efficient to hash and search for. - let mut distinct_values = if count_distinct_values { - HashSet::with_capacity_and_hasher(array.len() / 2, FxBuildHasher) - } else { - HashSet::with_hasher(FxBuildHasher) - }; + // Cloudflare's cardinality estimator gives us a bounded-memory approximation of the + // number of distinct values, replacing the previous exact `HashSet`. + let mut estimator: CardinalityEstimator> = CardinalityEstimator::new(); let validity = array .as_ref() @@ -227,7 +219,7 @@ where AllOr::All => { for value in first_valid_buff { if count_distinct_values { - distinct_values.insert(NativeValue(value)); + estimator.insert(&NativeValue(value)); } if value != prev { @@ -244,7 +236,7 @@ where { if valid { if count_distinct_values { - distinct_values.insert(NativeValue(value)); + estimator.insert(&NativeValue(value)); } if value != prev { @@ -260,9 +252,10 @@ where let value_count = u32::try_from(value_count)?; let distinct = count_distinct_values.then(|| DistinctInfo { - distinct_count: u32::try_from(distinct_values.len()) - .vortex_expect("more than u32::MAX distinct values"), - distinct_values, + distinct_count: u32::try_from(estimator.estimate()) + .vortex_expect("more than u32::MAX distinct values") + .max(1), + _marker: PhantomData, }); Ok(FloatStats { @@ -275,19 +268,25 @@ where #[cfg(test)] mod tests { + use std::sync::LazyLock; + use vortex_array::IntoArray; - use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; + use 
vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::buffer; use vortex_error::VortexResult; + use vortex_session::VortexSession; use super::FloatStats; + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + #[test] fn test_float_stats() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let floats = buffer![0.0f32, 1.0f32, 2.0f32].into_array(); let floats = floats.execute::(&mut ctx)?; @@ -308,7 +307,7 @@ mod tests { #[test] fn test_float_stats_leading_nulls() { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let floats = PrimitiveArray::new( buffer![0.0f32, 1.0f32, 2.0f32], Validity::from_iter([false, true, true]), diff --git a/vortex-compressor/src/stats/integer.rs b/vortex-compressor/src/stats/integer.rs index 64345ac5f06..ee8020e3b53 100644 --- a/vortex-compressor/src/stats/integer.rs +++ b/vortex-compressor/src/stats/integer.rs @@ -5,8 +5,8 @@ use std::hash::Hash; +use cardinality_estimator::CardinalityEstimator; use num_traits::PrimInt; -use rustc_hash::FxBuildHasher; use vortex_array::ExecutionCtx; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::primitive::NativeValue; @@ -20,30 +20,27 @@ use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_mask::AllOr; -use vortex_utils::aliases::hash_map::HashMap; use super::GenerateStatsOptions; /// Information about the distinct values in an integer array. +/// +/// The `distinct_count` is an estimate computed using Cloudflare's cardinality estimator, which +/// yields exact counts for small cardinalities (<= 128 for the default parameters) and a +/// HyperLogLog++ approximation for larger cardinalities. 
The most frequent value is tracked using +/// the Boyer-Moore majority candidate algorithm, so `most_frequent_value` and `top_frequency` are +/// only guaranteed to reflect the true majority element when some value accounts for more than +/// half of the non-null entries; otherwise they are treated as a best-effort estimate. #[derive(Debug, Clone)] pub struct DistinctInfo { - /// The unique values and their occurrences. - distinct_values: HashMap, u32, FxBuildHasher>, - /// The count of unique values. This _must_ be non-zero. + /// The estimated count of unique values. This _must_ be non-zero. distinct_count: u32, - /// The most frequent value. + /// The most frequent value (Boyer-Moore majority candidate). most_frequent_value: T, - /// The number of times the most frequent value occurs. + /// The exact number of times `most_frequent_value` occurs in the array. top_frequency: u32, } -impl DistinctInfo { - /// Returns a reference to the distinct values map. - pub fn distinct_values(&self) -> &HashMap, u32, FxBuildHasher> { - &self.distinct_values - } -} - /// Typed statistics for a specific integer type. 
#[derive(Debug, Clone)] pub struct TypedStats { @@ -346,7 +343,6 @@ where min: T::max_value(), max: T::min_value(), distinct: Some(DistinctInfo { - distinct_values: HashMap::with_capacity_and_hasher(0, FxBuildHasher), distinct_count: 0, most_frequent_value: T::zero(), top_frequency: 0, @@ -370,12 +366,10 @@ where let buffer = array.to_buffer::(); let head = buffer[head_idx]; - let mut loop_state = LoopState { - distinct_values: if count_distinct_values { - HashMap::with_capacity_and_hasher(array.len() / 2, FxBuildHasher) - } else { - HashMap::with_hasher(FxBuildHasher) - }, + let mut loop_state = LoopState:: { + estimator: CardinalityEstimator::new(), + bm_candidate: head, + bm_votes: 0, prev: head, runs: 1, }; @@ -450,18 +444,25 @@ where .vortex_expect("max should be computed"); let distinct = count_distinct_values.then(|| { - let (&top_value, &top_count) = loop_state - .distinct_values - .iter() - .max_by_key(|&(_, &count)| count) - .vortex_expect("we know this is non-empty"); + // The cardinality estimator is exact for small cardinalities and approximate beyond. + // We clamp to at least 1 because we are inside the non-empty/non-all-null branch. + let distinct_count = u32::try_from(loop_state.estimator.estimate()) + .vortex_expect("there are more than `u32::MAX` distinct values") + .max(1); + + // Count the Boyer-Moore majority candidate exactly via a second pass. If any value + // accounts for more than half of the non-null entries, this counts that value; otherwise + // the returned count is a best-effort estimate for whichever candidate survived. 
+ let top_frequency = count_occurrences::( + buffer.as_slice(), + validity.bit_buffer(), + loop_state.bm_candidate, + ); DistinctInfo { - distinct_count: u32::try_from(loop_state.distinct_values.len()) - .vortex_expect("there are more than `u32::MAX` distinct values"), - most_frequent_value: top_value.0, - top_frequency: top_count, - distinct_values: loop_state.distinct_values, + distinct_count, + most_frequent_value: loop_state.bm_candidate, + top_frequency, } }); @@ -479,13 +480,54 @@ where } /// Internal loop state for integer stats computation. -struct LoopState { +struct LoopState +where + T: IntegerPType, + NativeValue: Eq + Hash, +{ /// The previous value seen. prev: T, /// The run count. runs: u32, - /// The distinct values map. - distinct_values: HashMap, u32, FxBuildHasher>, + /// Cloudflare's cardinality estimator, used to approximate the number of distinct values + /// without materializing an exact hash map. + estimator: CardinalityEstimator>, + /// Boyer-Moore majority candidate; holds the current candidate for the most frequent value. + bm_candidate: T, + /// Boyer-Moore vote counter for `bm_candidate`. + bm_votes: u32, +} + +/// Updates the Boyer-Moore majority-vote state for a single value. +#[inline(always)] +fn boyer_moore_observe(state: &mut LoopState, value: T) +where + T: IntegerPType, + NativeValue: Eq + Hash, +{ + if state.bm_votes == 0 { + state.bm_candidate = value; + state.bm_votes = 1; + } else if value == state.bm_candidate { + state.bm_votes += 1; + } else { + state.bm_votes -= 1; + } +} + +/// Counts exact occurrences of `needle` in `buffer`, restricted to valid positions according to +/// `validity`. 
+fn count_occurrences<T: IntegerPType>(buffer: &[T], validity: AllOr<&BitBuffer>, needle: T) -> u32 { + let count = match validity { + AllOr::All => buffer.iter().filter(|&&v| v == needle).count(), + AllOr::None => 0, + AllOr::Some(mask) => buffer + .iter() + .enumerate() + .filter(|&(idx, &v)| mask.value(idx) && v == needle) + .count(), + }; + u32::try_from(count).vortex_expect("occurrences cannot exceed `u32::MAX`") } /// Inner loop for non-null chunks of 64 values. @@ -499,7 +541,8 @@ fn inner_loop_nonnull( { for &value in values { if count_distinct_values { - *state.distinct_values.entry(NativeValue(value)).or_insert(0) += 1; + state.estimator.insert(&NativeValue(value)); + boyer_moore_observe(state, value); } if value != state.prev { @@ -522,7 +565,8 @@ fn inner_loop_nullable( for (idx, &value) in values.iter().enumerate() { if is_valid.value(idx) { if count_distinct_values { - *state.distinct_values.entry(NativeValue(value)).or_insert(0) += 1; + state.estimator.insert(&NativeValue(value)); + boyer_moore_observe(state, value); } if value != state.prev { @@ -546,7 +590,8 @@ fn inner_loop_naive( for (idx, &value) in values.iter().enumerate() { if is_valid.value(idx) { if count_distinct_values { - *state.distinct_values.entry(NativeValue(value)).or_insert(0) += 1; + state.estimator.insert(&NativeValue(value)); + boyer_moore_observe(state, value); } if value != state.prev { @@ -560,22 +605,27 @@ #[cfg(test)] mod tests { use std::iter; + use std::sync::LazyLock; - use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; + use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::BitBuffer; use vortex_buffer::Buffer; use vortex_buffer::buffer; use vortex_error::VortexResult; + use vortex_session::VortexSession; use super::IntegerStats; use super::typed_int_stats; + static SESSION: LazyLock<VortexSession> = + LazyLock::new(|| VortexSession::empty().with::<ArraySession>()); + #[test] + fn
test_naive_count_distinct_values() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = PrimitiveArray::new(buffer![217u8, 0], Validity::NonNullable); let stats = typed_int_stats::<u8>(&array, true, &mut ctx)?; assert_eq!(stats.distinct_count().unwrap(), 2); @@ -584,7 +634,7 @@ mod tests { #[test] fn test_naive_count_distinct_values_nullable() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = PrimitiveArray::new( buffer![217u8, 0], Validity::from(BitBuffer::from(vec![true, false])), @@ -596,7 +646,7 @@ mod tests { #[test] fn test_count_distinct_values() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = PrimitiveArray::new((0..128u8).collect::<Buffer<u8>>(), Validity::NonNullable); let stats = typed_int_stats::<u8>(&array, true, &mut ctx)?; assert_eq!(stats.distinct_count().unwrap(), 128); @@ -605,7 +655,7 @@ mod tests { #[test] fn test_count_distinct_values_nullable() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = PrimitiveArray::new( (0..128u8).collect::<Buffer<u8>>(), Validity::from(BitBuffer::from_iter( @@ -619,7 +669,7 @@ mod tests { #[test] fn test_integer_stats_leading_nulls() { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let ints = PrimitiveArray::new(buffer![0, 1, 2], Validity::from_iter([false, true, true])); let stats = IntegerStats::generate_opts( @@ -635,4 +685,40 @@ mod tests { assert_eq!(stats.average_run_length, 1); assert_eq!(stats.distinct_count().unwrap(), 2); } + + #[test] + fn test_most_frequent_value_dominates() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + // A value that appears in 95% of the array must be recovered exactly by the + // Boyer-Moore
tracking plus second-pass count. + let top = -1i32; + let mut data: Vec<i32> = vec![top; 950]; + data.extend(0..50i32); + let array = PrimitiveArray::new(Buffer::copy_from(&data), Validity::NonNullable); + let stats = typed_int_stats::<i32>(&array, true, &mut ctx)?; + let (top_value, top_count) = stats + .erased() + .most_frequent_value_and_count() + .expect("distinct info must be present"); + assert_eq!(top_value, top.into()); + assert_eq!(top_count, 950); + Ok(()) + } + + #[test] + fn test_cardinality_estimate_large_unique() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + // For 1024 distinct values the estimator falls back to HyperLogLog++; verify the + // estimate is within the expected error bound (~1.6% for the default P/W). + let array = + PrimitiveArray::new((0..1024u32).collect::<Buffer<u32>>(), Validity::NonNullable); + let stats = typed_int_stats::<u32>(&array, true, &mut ctx)?; + let estimated = stats.distinct_count().unwrap(); + let error_ratio = (estimated as f64 - 1024.0).abs() / 1024.0; + assert!( + error_ratio < 0.05, + "estimator error {error_ratio} exceeds 5% for 1024 distinct values" + ); + Ok(()) + } } diff --git a/vortex-compressor/src/stats/string.rs b/vortex-compressor/src/stats/string.rs index 8613aa5cc37..fdc4aa08cc5 100644 --- a/vortex-compressor/src/stats/string.rs +++ b/vortex-compressor/src/stats/string.rs @@ -3,12 +3,12 @@ //! String compression statistics. +use cardinality_estimator::CardinalityEstimator; use vortex_array::ExecutionCtx; use vortex_array::arrays::VarBinViewArray; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; -use vortex_utils::aliases::hash_set::HashSet; use super::GenerateStatsOptions; @@ -24,23 +24,27 @@ pub struct StringStats { null_count: u32, } -/// Estimate the number of distinct strings in the var bin view array. +/// Estimate the number of distinct strings in the var bin view array using Cloudflare's +/// cardinality estimator.
+/// +/// The signal used for each string is the 64-bit combination of its length and first 4-byte +/// prefix: two strings that are equal must agree on both. This remains an approximation for +/// strings that share a 4-byte prefix and length, but is exact for distinct prefixes/lengths. +/// The cardinality estimator itself is exact for small cardinalities and falls back to +/// HyperLogLog++ for larger ones. fn estimate_distinct_count(strings: &VarBinViewArray) -> VortexResult<u32> { let views = strings.views(); - // Iterate the views. Two strings which are equal must have the same first 8-bytes. - // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all - // share a 4-byte prefix and have the same length. - let mut distinct = HashSet::with_capacity(views.len() / 2); - views.iter().for_each(|&view| { + let mut estimator: CardinalityEstimator<u64> = CardinalityEstimator::new(); + for &view in views.iter() { #[expect( clippy::cast_possible_truncation, reason = "approximate uniqueness with view prefix" )] let len_and_prefix = view.as_u128() as u64; - distinct.insert(len_and_prefix); - }); + estimator.insert(&len_and_prefix); + } - Ok(u32::try_from(distinct.len())?) + Ok(u32::try_from(estimator.estimate())?)
} impl StringStats { diff --git a/vortex-ffi/src/data_source.rs b/vortex-ffi/src/data_source.rs index 33ebe937b3a..a4b6e60293d 100644 --- a/vortex-ffi/src/data_source.rs +++ b/vortex-ffi/src/data_source.rs @@ -179,7 +179,7 @@ mod tests { assert_error(error); assert!(ds.is_null()); - opts.paths = c"*.vortex".as_ptr(); + opts.paths = c"definitely-missing-dir/*.vortex".as_ptr(); let ds = vx_data_source_new(session, &raw const opts, &raw mut error); assert_error(error); assert!(ds.is_null()); diff --git a/vortex-layout/src/layouts/dict/writer.rs b/vortex-layout/src/layouts/dict/writer.rs index 7013becd4df..f83c426b55b 100644 --- a/vortex-layout/src/layouts/dict/writer.rs +++ b/vortex-layout/src/layouts/dict/writer.rs @@ -20,6 +20,8 @@ use futures::stream::once; use futures::try_join; use vortex_array::ArrayContext; use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; use vortex_array::arrays::Dict; use vortex_array::builders::dict::DictConstraints; @@ -557,7 +559,9 @@ fn encode_chunk( mut encoder: Box<dyn DictEncoder>, chunk: &ArrayRef, ) -> VortexResult<EncodingState> { - let encoded = encoder.encode(chunk); + let encoded = encoder + .encode(chunk, &mut LEGACY_SESSION.create_execution_ctx())? + .into_array(); match remainder(chunk, encoded.len())?
{ None => Ok(EncodingState::Continue((encoder, encoded))), Some(unencoded) => Ok(EncodingState::Done((encoder.reset(), encoded, unencoded))), diff --git a/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/dict.rs b/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/dict.rs index add08b5f8e3..0e3a9d5a089 100644 --- a/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/dict.rs +++ b/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/dict.rs @@ -5,14 +5,17 @@ use vortex::array::ArrayId; use vortex::array::ArrayRef; use vortex::array::ArrayVTable; use vortex::array::IntoArray; +use vortex::array::VortexSessionExecute; use vortex::array::arrays::Dict; use vortex::array::arrays::PrimitiveArray; use vortex::array::arrays::StructArray; use vortex::array::arrays::VarBinArray; use vortex::array::builders::dict::dict_encode; use vortex::array::dtype::FieldNames; +use vortex::array::session::ArraySession; use vortex::array::validity::Validity; use vortex::error::VortexResult; +use vortex::session::VortexSession; use super::N; use crate::fixtures::FlatLayoutFixture; @@ -83,6 +86,9 @@ impl FlatLayoutFixture for DictFixture { .map(|i| insertion_values[(i * 7 + 3) % insertion_values.len()]) .collect(); let insertion_ordered_col = VarBinArray::from_strs(insertion_ordered); + let mut ctx = VortexSession::empty() + .with::<ArraySession>() + .create_execution_ctx(); let arr = StructArray::try_new( FieldNames::from([ @@ -100,18 +106,18 @@ impl FlatLayoutFixture for DictFixture { "insertion_ordered", ]), vec![ - dict_encode(&str_col.into_array())?.into_array(), - dict_encode(&int_col.into_array())?.into_array(), - dict_encode(&nullable_col.into_array())?.into_array(), - dict_encode(&single_col.into_array())?.into_array(), - dict_encode(&bool_cat_col.into_array())?.into_array(), - dict_encode(&all_null_col.into_array())?.into_array(), - dict_encode(&single_non_null_col.into_array())?.into_array(), -
dict_encode(&threshold_255_col.into_array())?.into_array(), - dict_encode(&threshold_256_col.into_array())?.into_array(), - dict_encode(&threshold_257_col.into_array())?.into_array(), - dict_encode(&long_col.into_array())?.into_array(), - dict_encode(&insertion_ordered_col.into_array())?.into_array(), + dict_encode(&str_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&int_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&nullable_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&single_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&bool_cat_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&all_null_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&single_non_null_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&threshold_255_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&threshold_256_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&threshold_257_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&long_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&insertion_ordered_col.into_array(), &mut ctx)?.into_array(), ], N, Validity::NonNullable, diff --git a/vortex/benches/single_encoding_throughput.rs b/vortex/benches/single_encoding_throughput.rs index be253187956..f3b1d39911f 100644 --- a/vortex/benches/single_encoding_throughput.rs +++ b/vortex/benches/single_encoding_throughput.rs @@ -213,16 +213,21 @@ fn bench_for_decompress_i32(bencher: Bencher) { #[divan::bench(name = "dict_compress_u32")] fn bench_dict_compress_u32(bencher: Bencher) { let (uint_array, ..) 
= setup_primitive_arrays(); + let array = uint_array.into_array(); with_byte_counter(bencher, NUM_VALUES * 4) - .with_inputs(|| &uint_array) - .bench_refs(|a| dict_encode(&a.clone().into_array()).unwrap()); + .with_inputs(|| (&array, SESSION.create_execution_ctx())) + .bench_refs(|(a, ctx)| dict_encode(a, ctx).unwrap()); } #[divan::bench(name = "dict_decompress_u32")] fn bench_dict_decompress_u32(bencher: Bencher) { let (uint_array, ..) = setup_primitive_arrays(); - let compressed = dict_encode(&uint_array.into_array()).unwrap(); + let compressed = dict_encode( + &uint_array.into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); with_byte_counter(bencher, NUM_VALUES * 4) .with_inputs(|| (&compressed, SESSION.create_execution_ctx())) @@ -393,17 +398,22 @@ fn bench_dict_compress_string(bencher: Bencher) { let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(NUM_VALUES as usize, 0.00005)); let nbytes = varbinview_arr.nbytes() as u64; + let array = varbinview_arr.into_array(); with_byte_counter(bencher, nbytes) - .with_inputs(|| &varbinview_arr) - .bench_refs(|a| dict_encode(&a.clone().into_array()).unwrap()); + .with_inputs(|| (&array, SESSION.create_execution_ctx())) + .bench_refs(|(a, ctx)| dict_encode(a, ctx).unwrap()); } #[divan::bench(name = "dict_decompress_string")] fn bench_dict_decompress_string(bencher: Bencher) { let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(NUM_VALUES as usize, 0.00005)); - let dict = dict_encode(&varbinview_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &varbinview_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let nbytes = varbinview_arr.into_array().nbytes() as u64; with_byte_counter(bencher, nbytes)