diff --git a/Cargo.lock b/Cargo.lock index bf8c74d7058..e8dbad977b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3579,6 +3579,12 @@ dependencies = [ "url", ] +[[package]] +name = "datasketches" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c286de4e81ea2590afc24d754e0f83810c566f50a1388fa75ebd57928c0d9745" + [[package]] name = "deepsize" version = "0.2.0" @@ -10663,6 +10669,19 @@ dependencies = [ "vortex-utils", ] +[[package]] +name = "vortex-datasketches" +version = "0.1.0" +dependencies = [ + "datasketches", + "num-traits", + "vortex-array", + "vortex-buffer", + "vortex-error", + "vortex-mask", + "vortex-session", +] + [[package]] name = "vortex-datetime-parts" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 58149ae7cb5..94f850af01f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "vortex-array", "vortex-tensor", "vortex-turboquant", + "vortex-datasketches", "vortex-compressor", "vortex-btrblocks", "vortex-layout", @@ -146,6 +147,7 @@ datafusion-physical-expr-common = { version = "53" } datafusion-physical-plan = { version = "53" } datafusion-pruning = { version = "53" } datafusion-sqllogictest = { version = "53" } +datasketches = "0.2.0" dirs = "6.0.0" divan = { package = "codspeed-divan-compat", version = "4.0.4" } enum-iterator = "2.0.0" @@ -277,6 +279,7 @@ vortex-buffer = { version = "0.1.0", path = "./vortex-buffer", default-features vortex-bytebool = { version = "0.1.0", path = "./encodings/bytebool", default-features = false } vortex-compressor = { version = "0.1.0", path = "./vortex-compressor", default-features = false } vortex-datafusion = { version = "0.1.0", path = "./vortex-datafusion", default-features = false } +vortex-datasketches = { version = "0.1.0", path = "./vortex-datasketches", default-features = false } vortex-datetime-parts = { version = "0.1.0", path = "./encodings/datetime-parts", default-features = false } vortex-decimal-byte-parts = { version = "0.1.0", path = "encodings/decimal-byte-parts", default-features = false } vortex-error = { version = "0.1.0", path = "./vortex-error", default-features = false } diff --git a/vortex-datasketches/Cargo.toml b/vortex-datasketches/Cargo.toml new file mode 100644 index 00000000000..356c8d8993e --- /dev/null +++ b/vortex-datasketches/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "vortex-datasketches" +authors = { workspace = true } +categories = { workspace = true } +description = "Apache DataSketches aggregate functions for Vortex" +edition = { workspace = true } +homepage = { workspace = true } +include = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +readme = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +datasketches = { workspace = true } +num-traits = { workspace = true } +vortex-array = { workspace = true } +vortex-buffer = { workspace = true } +vortex-error = { workspace = true } +vortex-mask = { workspace = true } +vortex-session = { workspace = true } diff --git a/vortex-datasketches/public-api.lock b/vortex-datasketches/public-api.lock new file mode 100644 index 00000000000..7e47be6b270 --- /dev/null +++ b/vortex-datasketches/public-api.lock @@ -0,0 +1,411 @@ +pub mod vortex_datasketches + +pub mod vortex_datasketches::hll + +pub enum vortex_datasketches::hll::HllTarget + +pub vortex_datasketches::hll::HllTarget::Hll4 + +pub vortex_datasketches::hll::HllTarget::Hll6 + +pub vortex_datasketches::hll::HllTarget::Hll8 + +impl core::clone::Clone for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::clone(&self) -> vortex_datasketches::hll::HllTarget + +impl core::cmp::Eq for vortex_datasketches::hll::HllTarget + +impl core::cmp::PartialEq for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::eq(&self, &vortex_datasketches::hll::HllTarget) -> bool + +impl core::convert::From for datasketches::hll::HllType + +pub fn datasketches::hll::HllType::from(vortex_datasketches::hll::HllTarget) -> Self + +impl core::default::Default for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::default() -> vortex_datasketches::hll::HllTarget + +impl core::fmt::Debug for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::fmt::Display for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::hash<__H: core::hash::Hasher>(&self, &mut __H) + +impl core::marker::Copy for vortex_datasketches::hll::HllTarget + +impl core::marker::StructuralPartialEq for vortex_datasketches::hll::HllTarget + +pub struct vortex_datasketches::hll::Hll + +impl core::clone::Clone for vortex_datasketches::hll::Hll + +pub fn vortex_datasketches::hll::Hll::clone(&self) -> vortex_datasketches::hll::Hll + +impl core::fmt::Debug for vortex_datasketches::hll::Hll + +pub fn vortex_datasketches::hll::Hll::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::aggregate_fn::vtable::AggregateFnVTable for vortex_datasketches::hll::Hll + +pub type vortex_datasketches::hll::Hll::Options = vortex_datasketches::hll::HllOptions + +pub type vortex_datasketches::hll::Hll::Partial = vortex_datasketches::hll::HllPartial + +pub fn vortex_datasketches::hll::Hll::accumulate(&self, &mut Self::Partial, &vortex_array::columnar::Columnar, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()> + +pub fn vortex_datasketches::hll::Hll::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()> + +pub fn vortex_datasketches::hll::Hll::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_datasketches::hll::Hll::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_datasketches::hll::Hll::finalize(&self, vortex_array::array::erased::ArrayRef) -> vortex_error::VortexResult + +pub fn vortex_datasketches::hll::Hll::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult + +pub fn vortex_datasketches::hll::Hll::id(&self) -> vortex_array::aggregate_fn::AggregateFnId + +pub fn vortex_datasketches::hll::Hll::is_saturated(&self, &Self::Partial) -> bool + +pub fn vortex_datasketches::hll::Hll::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_datasketches::hll::Hll::reset(&self, &mut Self::Partial) + +pub fn vortex_datasketches::hll::Hll::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_datasketches::hll::Hll::serialize(&self, &Self::Options) -> vortex_error::VortexResult>> + +pub fn vortex_datasketches::hll::Hll::to_scalar(&self, &Self::Partial) -> vortex_error::VortexResult + +pub struct vortex_datasketches::hll::HllOptions + +pub vortex_datasketches::hll::HllOptions::lg_k: u8 + +pub vortex_datasketches::hll::HllOptions::target: vortex_datasketches::hll::HllTarget + +impl core::clone::Clone for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::clone(&self) -> vortex_datasketches::hll::HllOptions + +impl core::cmp::Eq for vortex_datasketches::hll::HllOptions + +impl core::cmp::PartialEq for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::eq(&self, &vortex_datasketches::hll::HllOptions) -> bool + +impl core::default::Default for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::default() -> Self + +impl core::fmt::Debug for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::fmt::Display for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::hash<__H: core::hash::Hasher>(&self, &mut __H) + +impl core::marker::StructuralPartialEq for vortex_datasketches::hll::HllOptions + +pub struct vortex_datasketches::hll::HllPartial + +pub const vortex_datasketches::hll::DEFAULT_LG_K: u8 + +pub fn vortex_datasketches::hll::estimate(&[u8]) -> vortex_error::VortexResult + +pub fn vortex_datasketches::hll::hll(&vortex_array::array::erased::ArrayRef, vortex_datasketches::hll::HllOptions, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + +pub mod vortex_datasketches::tdigest + +pub struct vortex_datasketches::tdigest::TDigest + +impl core::clone::Clone for vortex_datasketches::tdigest::TDigest + +pub fn vortex_datasketches::tdigest::TDigest::clone(&self) -> vortex_datasketches::tdigest::TDigest + +impl core::fmt::Debug for vortex_datasketches::tdigest::TDigest + +pub fn vortex_datasketches::tdigest::TDigest::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::aggregate_fn::vtable::AggregateFnVTable for vortex_datasketches::tdigest::TDigest + +pub type vortex_datasketches::tdigest::TDigest::Options = vortex_datasketches::tdigest::TDigestOptions + +pub type vortex_datasketches::tdigest::TDigest::Partial = vortex_datasketches::tdigest::TDigestPartial + +pub fn vortex_datasketches::tdigest::TDigest::accumulate(&self, &mut Self::Partial, &vortex_array::columnar::Columnar, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()> + +pub fn vortex_datasketches::tdigest::TDigest::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()> + +pub fn vortex_datasketches::tdigest::TDigest::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_datasketches::tdigest::TDigest::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_datasketches::tdigest::TDigest::finalize(&self, vortex_array::array::erased::ArrayRef) -> vortex_error::VortexResult + +pub fn vortex_datasketches::tdigest::TDigest::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult + +pub fn vortex_datasketches::tdigest::TDigest::id(&self) -> vortex_array::aggregate_fn::AggregateFnId + +pub fn vortex_datasketches::tdigest::TDigest::is_saturated(&self, &Self::Partial) -> bool + +pub fn vortex_datasketches::tdigest::TDigest::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_datasketches::tdigest::TDigest::reset(&self, &mut Self::Partial) + +pub fn vortex_datasketches::tdigest::TDigest::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_datasketches::tdigest::TDigest::serialize(&self, &Self::Options) -> vortex_error::VortexResult>> + +pub fn vortex_datasketches::tdigest::TDigest::to_scalar(&self, &Self::Partial) -> vortex_error::VortexResult + +pub struct vortex_datasketches::tdigest::TDigestOptions + +pub vortex_datasketches::tdigest::TDigestOptions::k: u16 + +impl core::clone::Clone for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::clone(&self) -> vortex_datasketches::tdigest::TDigestOptions + +impl core::cmp::Eq for vortex_datasketches::tdigest::TDigestOptions + +impl core::cmp::PartialEq for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::eq(&self, &vortex_datasketches::tdigest::TDigestOptions) -> bool + +impl core::default::Default for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::default() -> Self + +impl core::fmt::Debug for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::fmt::Display for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::hash<__H: core::hash::Hasher>(&self, &mut __H) + +impl core::marker::StructuralPartialEq for vortex_datasketches::tdigest::TDigestOptions + +pub struct vortex_datasketches::tdigest::TDigestPartial + +pub const vortex_datasketches::tdigest::DEFAULT_K: u16 + +pub fn vortex_datasketches::tdigest::quantile(&[u8], f64) -> vortex_error::VortexResult> + +pub fn vortex_datasketches::tdigest::tdigest(&vortex_array::array::erased::ArrayRef, vortex_datasketches::tdigest::TDigestOptions, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + +pub enum vortex_datasketches::HllTarget + +pub vortex_datasketches::HllTarget::Hll4 + +pub vortex_datasketches::HllTarget::Hll6 + +pub vortex_datasketches::HllTarget::Hll8 + +impl core::clone::Clone for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::clone(&self) -> vortex_datasketches::hll::HllTarget + +impl core::cmp::Eq for vortex_datasketches::hll::HllTarget + +impl core::cmp::PartialEq for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::eq(&self, &vortex_datasketches::hll::HllTarget) -> bool + +impl core::convert::From for datasketches::hll::HllType + +pub fn datasketches::hll::HllType::from(vortex_datasketches::hll::HllTarget) -> Self + +impl core::default::Default for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::default() -> vortex_datasketches::hll::HllTarget + +impl core::fmt::Debug for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::fmt::Display for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_datasketches::hll::HllTarget + +pub fn vortex_datasketches::hll::HllTarget::hash<__H: core::hash::Hasher>(&self, &mut __H) + +impl core::marker::Copy for vortex_datasketches::hll::HllTarget + +impl core::marker::StructuralPartialEq for vortex_datasketches::hll::HllTarget + +pub struct vortex_datasketches::Hll + +impl core::clone::Clone for vortex_datasketches::hll::Hll + +pub fn vortex_datasketches::hll::Hll::clone(&self) -> vortex_datasketches::hll::Hll + +impl core::fmt::Debug for vortex_datasketches::hll::Hll + +pub fn vortex_datasketches::hll::Hll::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::aggregate_fn::vtable::AggregateFnVTable for vortex_datasketches::hll::Hll + +pub type vortex_datasketches::hll::Hll::Options = vortex_datasketches::hll::HllOptions + +pub type vortex_datasketches::hll::Hll::Partial = vortex_datasketches::hll::HllPartial + +pub fn vortex_datasketches::hll::Hll::accumulate(&self, &mut Self::Partial, &vortex_array::columnar::Columnar, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()> + +pub fn vortex_datasketches::hll::Hll::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()> + +pub fn vortex_datasketches::hll::Hll::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_datasketches::hll::Hll::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_datasketches::hll::Hll::finalize(&self, vortex_array::array::erased::ArrayRef) -> vortex_error::VortexResult + +pub fn vortex_datasketches::hll::Hll::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult + +pub fn vortex_datasketches::hll::Hll::id(&self) -> vortex_array::aggregate_fn::AggregateFnId + +pub fn vortex_datasketches::hll::Hll::is_saturated(&self, &Self::Partial) -> bool + +pub fn vortex_datasketches::hll::Hll::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_datasketches::hll::Hll::reset(&self, &mut Self::Partial) + +pub fn vortex_datasketches::hll::Hll::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_datasketches::hll::Hll::serialize(&self, &Self::Options) -> vortex_error::VortexResult>> + +pub fn vortex_datasketches::hll::Hll::to_scalar(&self, &Self::Partial) -> vortex_error::VortexResult + +pub struct vortex_datasketches::HllOptions + +pub vortex_datasketches::HllOptions::lg_k: u8 + +pub vortex_datasketches::HllOptions::target: vortex_datasketches::hll::HllTarget + +impl core::clone::Clone for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::clone(&self) -> vortex_datasketches::hll::HllOptions + +impl core::cmp::Eq for vortex_datasketches::hll::HllOptions + +impl core::cmp::PartialEq for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::eq(&self, &vortex_datasketches::hll::HllOptions) -> bool + +impl core::default::Default for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::default() -> Self + +impl core::fmt::Debug for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::fmt::Display for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_datasketches::hll::HllOptions + +pub fn vortex_datasketches::hll::HllOptions::hash<__H: core::hash::Hasher>(&self, &mut __H) + +impl core::marker::StructuralPartialEq for vortex_datasketches::hll::HllOptions + +pub struct vortex_datasketches::TDigest + +impl core::clone::Clone for vortex_datasketches::tdigest::TDigest + +pub fn vortex_datasketches::tdigest::TDigest::clone(&self) -> vortex_datasketches::tdigest::TDigest + +impl core::fmt::Debug for vortex_datasketches::tdigest::TDigest + +pub fn vortex_datasketches::tdigest::TDigest::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::aggregate_fn::vtable::AggregateFnVTable for vortex_datasketches::tdigest::TDigest + +pub type vortex_datasketches::tdigest::TDigest::Options = vortex_datasketches::tdigest::TDigestOptions + +pub type vortex_datasketches::tdigest::TDigest::Partial = vortex_datasketches::tdigest::TDigestPartial + +pub fn vortex_datasketches::tdigest::TDigest::accumulate(&self, &mut Self::Partial, &vortex_array::columnar::Columnar, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()> + +pub fn vortex_datasketches::tdigest::TDigest::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()> + +pub fn vortex_datasketches::tdigest::TDigest::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_datasketches::tdigest::TDigest::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_datasketches::tdigest::TDigest::finalize(&self, vortex_array::array::erased::ArrayRef) -> vortex_error::VortexResult + +pub fn vortex_datasketches::tdigest::TDigest::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult + +pub fn vortex_datasketches::tdigest::TDigest::id(&self) -> vortex_array::aggregate_fn::AggregateFnId + +pub fn vortex_datasketches::tdigest::TDigest::is_saturated(&self, &Self::Partial) -> bool + +pub fn vortex_datasketches::tdigest::TDigest::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_datasketches::tdigest::TDigest::reset(&self, &mut Self::Partial) + +pub fn vortex_datasketches::tdigest::TDigest::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option + +pub fn vortex_datasketches::tdigest::TDigest::serialize(&self, &Self::Options) -> vortex_error::VortexResult>> + +pub fn vortex_datasketches::tdigest::TDigest::to_scalar(&self, &Self::Partial) -> vortex_error::VortexResult + +pub struct vortex_datasketches::TDigestOptions + +pub vortex_datasketches::TDigestOptions::k: u16 + +impl core::clone::Clone for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::clone(&self) -> vortex_datasketches::tdigest::TDigestOptions + +impl core::cmp::Eq for vortex_datasketches::tdigest::TDigestOptions + +impl core::cmp::PartialEq for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::eq(&self, &vortex_datasketches::tdigest::TDigestOptions) -> bool + +impl core::default::Default for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::default() -> Self + +impl core::fmt::Debug for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::fmt::Display for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::tdigest::TDigestOptions::hash<__H: core::hash::Hasher>(&self, &mut __H) + +impl core::marker::StructuralPartialEq for vortex_datasketches::tdigest::TDigestOptions + +pub fn vortex_datasketches::hll(&vortex_array::array::erased::ArrayRef, vortex_datasketches::hll::HllOptions, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_datasketches::initialize(&vortex_session::VortexSession) + +pub fn vortex_datasketches::tdigest(&vortex_array::array::erased::ArrayRef, vortex_datasketches::tdigest::TDigestOptions, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult diff --git a/vortex-datasketches/src/hll/bool.rs b/vortex-datasketches/src/hll/bool.rs new file mode 100644 index 00000000000..aaa3d11e15a --- /dev/null +++ b/vortex-datasketches/src/hll/bool.rs @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ExecutionCtx; +use vortex_array::arrays::BoolArray; +use vortex_array::arrays::bool::BoolArrayExt; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use super::HllPartial; + +pub(super) fn accumulate_bool( + partial: &mut HllPartial, + array: &BoolArray, + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + let values = array.to_bit_buffer(); + match array.validity()?.execute_mask(array.as_ref().len(), ctx)? { + Mask::AllTrue(_) => { + for value in values.iter() { + partial.update_value(value); + } + } + Mask::AllFalse(_) => {} + Mask::Values(validity) => { + for (value, valid) in values.iter().zip(validity.bit_buffer().iter()) { + if valid { + partial.update_value(value); + } + } + } + } + Ok(()) +} diff --git a/vortex-datasketches/src/hll/mod.rs b/vortex-datasketches/src/hll/mod.rs new file mode 100644 index 00000000000..465af2f5e69 --- /dev/null +++ b/vortex-datasketches/src/hll/mod.rs @@ -0,0 +1,440 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod bool; +mod primitive; +mod varbin; + +use std::fmt; +use std::fmt::Display; +use std::fmt::Formatter; + +use datasketches::hll::HllSketch as DatasketchesHllSketch; +use datasketches::hll::HllType; +use datasketches::hll::HllUnion; +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::Columnar; +use vortex_array::ExecutionCtx; +use vortex_array::aggregate_fn::Accumulator; +use vortex_array::aggregate_fn::AggregateFnId; +use vortex_array::aggregate_fn::AggregateFnVTable; +use vortex_array::aggregate_fn::DynAccumulator; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_session::VortexSession; + +use self::bool::accumulate_bool; +use self::primitive::accumulate_primitive; +use self::primitive::update_primitive; +use self::varbin::accumulate_varbinview; + +/// Default HLL precision (`lg_k`) used by [`HllOptions`]. +pub const DEFAULT_LG_K: u8 = 12; + +const OPTIONS_VERSION: u8 = 1; + +/// Apache DataSketches HLL target representation. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)] +pub enum HllTarget { + /// HLL_4, the most compact representation. + Hll4, + /// HLL_6, a balanced representation. + Hll6, + /// HLL_8, the highest precision-byte representation. + #[default] + Hll8, +} + +impl HllTarget { + fn as_byte(self) -> u8 { + match self { + Self::Hll4 => 4, + Self::Hll6 => 6, + Self::Hll8 => 8, + } + } + + fn from_byte(byte: u8) -> VortexResult { + Ok(match byte { + 4 => Self::Hll4, + 6 => Self::Hll6, + 8 => Self::Hll8, + _ => vortex_bail!("Invalid HLL target type byte: {byte}"), + }) + } +} + +impl Display for HllTarget { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + Self::Hll4 => write!(f, "hll4"), + Self::Hll6 => write!(f, "hll6"), + Self::Hll8 => write!(f, "hll8"), + } + } +} + +impl From for HllType { + fn from(value: HllTarget) -> Self { + match value { + HllTarget::Hll4 => Self::Hll4, + HllTarget::Hll6 => Self::Hll6, + HllTarget::Hll8 => Self::Hll8, + } + } +} + +/// Options for the `datasketches.hll` aggregate function. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct HllOptions { + /// Log2 of the configured number of HLL buckets. Apache DataSketches supports `[4, 21]`. + pub lg_k: u8, + /// HLL target representation used for serialized aggregate results. + pub target: HllTarget, +} + +impl Default for HllOptions { + fn default() -> Self { + Self { + lg_k: DEFAULT_LG_K, + target: HllTarget::default(), + } + } +} + +impl Display for HllOptions { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "lg_k={},target={}", self.lg_k, self.target) + } +} + +impl HllOptions { + fn validate(&self) -> VortexResult<()> { + vortex_ensure!( + (4..=21).contains(&self.lg_k), + "HLL lg_k must be in [4, 21], got {}", + self.lg_k + ); + Ok(()) + } +} + +/// Aggregate function vtable for Apache DataSketches HLL sketches. +#[derive(Clone, Debug)] +pub struct Hll; + +/// Partial accumulator state for [`Hll`]. +pub struct HllPartial { + options: HllOptions, + union: HllUnion, +} + +impl HllPartial { + fn update_value(&mut self, value: T) { + self.union.update_value(value); + } +} + +/// Build a serialized Apache DataSketches HLL sketch from an array. +pub fn hll(array: &ArrayRef, options: HllOptions, ctx: &mut ExecutionCtx) -> VortexResult { + let mut acc = Accumulator::try_new(Hll, options, array.dtype().clone())?; + acc.accumulate(array, ctx)?; + acc.finish() +} + +/// Return the cardinality estimate for a serialized HLL sketch. +pub fn estimate(bytes: &[u8]) -> VortexResult { + let sketch = DatasketchesHllSketch::deserialize(bytes) + .map_err(|err| vortex_err!("Failed to deserialize HLL sketch: {err}"))?; + Ok(sketch.estimate()) +} + +impl AggregateFnVTable for Hll { + type Options = HllOptions; + type Partial = HllPartial; + + fn id(&self) -> AggregateFnId { + AggregateFnId::new("datasketches.hll") + } + + fn serialize(&self, options: &Self::Options) -> VortexResult>> { + options.validate()?; + Ok(Some(vec![ + OPTIONS_VERSION, + options.lg_k, + options.target.as_byte(), + ])) + } + + fn deserialize( + &self, + metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + vortex_ensure!( + metadata.len() == 3, + "Invalid HLL options metadata length: expected 3, got {}", + metadata.len() + ); + vortex_ensure!( + metadata[0] == OPTIONS_VERSION, + "Unsupported HLL options metadata version: {}", + metadata[0] + ); + + let options = HllOptions { + lg_k: metadata[1], + target: HllTarget::from_byte(metadata[2])?, + }; + options.validate()?; + Ok(options) + } + + fn return_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option { + if options.validate().is_err() { + return None; + } + match input_dtype { + DType::Null + | DType::Bool(_) + | DType::Primitive(..) + | DType::Utf8(_) + | DType::Binary(_) => Some(DType::Binary(Nullability::NonNullable)), + _ => None, + } + } + + fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option { + self.return_dtype(options, input_dtype) + } + + fn empty_partial( + &self, + options: &Self::Options, + _input_dtype: &DType, + ) -> VortexResult { + options.validate()?; + Ok(HllPartial { + options: options.clone(), + union: HllUnion::new(options.lg_k), + }) + } + + fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> { + if other.is_null() { + return Ok(()); + } + + let bytes = other + .as_binary() + .value() + .ok_or_else(|| vortex_err!("HLL partial scalar must be non-null"))?; + let sketch = DatasketchesHllSketch::deserialize(bytes.as_slice()) + .map_err(|err| vortex_err!("Failed to deserialize HLL partial: {err}"))?; + partial.union.update(&sketch); + Ok(()) + } + + fn to_scalar(&self, partial: &Self::Partial) -> VortexResult { + let sketch = partial.union.get_result(partial.options.target.into()); + Ok(Scalar::binary(sketch.serialize(), Nullability::NonNullable)) + } + + fn reset(&self, partial: &mut Self::Partial) { + partial.union.reset(); + } + + fn is_saturated(&self, _state: &Self::Partial) -> bool { + false + } + + fn accumulate( + &self, + partial: &mut Self::Partial, + batch: &Columnar, + ctx: &mut ExecutionCtx, + ) -> VortexResult<()> { + match batch { + Columnar::Constant(c) => { + if c.is_empty() || c.scalar().is_null() { + return Ok(()); + } + match c.scalar().dtype() { + DType::Bool(_) => { + let value = c + .scalar() + .as_bool() + .value() + .ok_or_else(|| vortex_err!("checked non-null bool scalar"))?; + partial.update_value(value); + } + DType::Primitive(..) => update_primitive(partial, c.scalar().as_primitive())?, + DType::Utf8(_) => { + let value = c + .scalar() + .as_utf8() + .value() + .ok_or_else(|| vortex_err!("checked non-null UTF-8 scalar"))?; + partial.update_value(value.as_bytes()); + } + DType::Binary(_) => { + let value = c + .scalar() + .as_binary() + .value() + .ok_or_else(|| vortex_err!("checked non-null binary scalar"))?; + partial.update_value(value.as_slice()); + } + DType::Null => {} + _ => vortex_bail!("Unsupported constant dtype for HLL: {}", c.scalar().dtype()), + } + Ok(()) + } + Columnar::Canonical(c) => match c { + Canonical::Null(_) => Ok(()), + Canonical::Bool(array) => accumulate_bool(partial, array, ctx), + Canonical::Primitive(array) => accumulate_primitive(partial, array, ctx), + Canonical::VarBinView(array) => accumulate_varbinview(partial, array, ctx), + Canonical::Decimal(_) + | Canonical::Extension(_) + | Canonical::Struct(_) + | Canonical::List(_) + | Canonical::FixedSizeList(_) + | Canonical::Variant(_) => { + vortex_bail!("Unsupported canonical type for HLL: {}", c.dtype()) + } + }, + } + } + + fn finalize(&self, partials: ArrayRef) -> VortexResult { + Ok(partials) + } + + fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult { + self.to_scalar(partial) + } +} + +#[cfg(test)] +mod tests { + use datasketches::hll::HllSketch as DatasketchesHllSketch; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::BoolArray; + use vortex_array::arrays::ConstantArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::dtype::PType; + use vortex_array::session::ArraySession; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use super::*; + + fn estimate_scalar(scalar: Scalar) -> VortexResult { + let bytes = scalar + .as_binary() + .value() + .ok_or_else(|| vortex_err!("HLL result must be non-null"))?; + estimate(bytes.as_slice()) + } + + fn assert_estimate_near(actual: f64, expected: f64) { + assert!( + (actual - expected).abs() < 0.000001, + "expected estimate near {expected}, got {actual}" + ); + } + + #[test] + fn hll_skips_nulls() -> VortexResult<()> { + let session = VortexSession::empty().with::(); + let mut ctx = session.create_execution_ctx(); + let array = PrimitiveArray::from_option_iter([Some(1i32), None, Some(1), Some(2), None]) + .into_array(); + + let result = hll(&array, HllOptions::default(), &mut ctx)?; + + assert_estimate_near(estimate_scalar(result)?, 2.0); + Ok(()) + } + + #[test] + fn hll_supports_bool_and_varbin() -> VortexResult<()> { + let session = VortexSession::empty().with::(); + let mut ctx = session.create_execution_ctx(); + + let bools = BoolArray::from_iter([Some(true), None, Some(false), Some(true)]).into_array(); + assert_estimate_near( + estimate_scalar(hll(&bools, HllOptions::default(), &mut ctx)?)?, + 2.0, + ); + + let strings = VarBinViewArray::from_iter_nullable_str([ + Some("alpha"), + None, + Some("beta"), + Some("alpha"), + ]) + .into_array(); + assert_estimate_near( + estimate_scalar(hll(&strings, HllOptions::default(), &mut ctx)?)?, + 2.0, + ); + Ok(()) + } + + #[test] + fn hll_combines_partial_sketches() -> VortexResult<()> { + let dtype = DType::Primitive(PType::I32, Nullability::Nullable); + let mut partial = Hll.empty_partial(&HllOptions::default(), &dtype)?; + + let mut left = DatasketchesHllSketch::new(DEFAULT_LG_K, HllType::Hll8); + left.update(1i32); + left.update(2i32); + Hll.combine_partials( + &mut partial, + Scalar::binary(left.serialize(), Nullability::NonNullable), + )?; + + let mut right = DatasketchesHllSketch::new(DEFAULT_LG_K, HllType::Hll8); + right.update(2i32); + right.update(3i32); + Hll.combine_partials( + &mut partial, + Scalar::binary(right.serialize(), Nullability::NonNullable), + )?; + + assert_estimate_near(estimate_scalar(Hll.finalize_scalar(&partial)?)?, 3.0); + Ok(()) + } + + #[test] + fn hll_hashes_constant_utf8_like_canonical_utf8() -> VortexResult<()> { + let session = VortexSession::empty().with::(); + let mut ctx = session.create_execution_ctx(); + let dtype = DType::Utf8(Nullability::NonNullable); + let mut partial = Hll.empty_partial(&HllOptions::default(), &dtype)?; + + let constant = + ConstantArray::new(Scalar::utf8("alpha", Nullability::NonNullable), 5).into_array(); + let constant_sketch = hll(&constant, HllOptions::default(), &mut ctx)?; + Hll.combine_partials(&mut partial, constant_sketch)?; + + let canonical = VarBinViewArray::from_iter_str(["alpha"]).into_array(); + let canonical_sketch = hll(&canonical, HllOptions::default(), &mut ctx)?; + Hll.combine_partials(&mut partial, canonical_sketch)?; + + assert_estimate_near(estimate_scalar(Hll.finalize_scalar(&partial)?)?, 1.0); + Ok(()) + } +} diff --git a/vortex-datasketches/src/hll/primitive.rs b/vortex-datasketches/src/hll/primitive.rs new file mode 100644 index 00000000000..eab21a27c44 --- /dev/null +++ b/vortex-datasketches/src/hll/primitive.rs @@ -0,0 +1,65 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ExecutionCtx; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::dtype::NativePType; +use vortex_array::match_each_native_ptype; +use vortex_array::scalar::PValue; +use vortex_array::scalar::PrimitiveScalar; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use super::HllPartial; + +pub(super) fn update_primitive( + partial: &mut HllPartial, + scalar: PrimitiveScalar<'_>, +) -> VortexResult<()> { + if let Some(value) = scalar.pvalue() { + partial.update_value(value); + } + Ok(()) +} + +pub(super) fn accumulate_primitive( + partial: &mut HllPartial, + array: &PrimitiveArray, + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + match_each_native_ptype!(array.ptype(), |T| { + accumulate_typed::(partial, array, ctx) + }) +} + +fn accumulate_typed( + partial: &mut HllPartial, + array: &PrimitiveArray, + ctx: &mut ExecutionCtx, +) -> VortexResult<()> +where + T: NativePType, + PValue: From, +{ + let values = array.as_slice::(); + match array + .as_ref() + .validity()? + .execute_mask(array.as_ref().len(), ctx)? + { + Mask::AllTrue(_) => { + for &value in values { + partial.update_value(PValue::from(value)); + } + } + Mask::AllFalse(_) => {} + Mask::Values(validity) => { + for (&value, valid) in values.iter().zip(validity.bit_buffer().iter()) { + if valid { + partial.update_value(PValue::from(value)); + } + } + } + } + Ok(()) +} diff --git a/vortex-datasketches/src/hll/varbin.rs b/vortex-datasketches/src/hll/varbin.rs new file mode 100644 index 00000000000..50da73542f4 --- /dev/null +++ b/vortex-datasketches/src/hll/varbin.rs @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ExecutionCtx; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::varbinview::VarBinViewArrayExt; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use super::HllPartial; + +pub(super) fn accumulate_varbinview( + partial: &mut HllPartial, + array: &VarBinViewArray, + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + match array + .varbinview_validity() + .execute_mask(array.as_ref().len(), ctx)? + { + Mask::AllTrue(_) => { + for idx in 0..array.len() { + let value = array.bytes_at(idx); + partial.update_value(value.as_slice()); + } + } + Mask::AllFalse(_) => {} + Mask::Values(validity) => { + for (idx, valid) in validity.bit_buffer().iter().enumerate() { + if valid { + let value = array.bytes_at(idx); + partial.update_value(value.as_slice()); + } + } + } + } + Ok(()) +} diff --git a/vortex-datasketches/src/lib.rs b/vortex-datasketches/src/lib.rs new file mode 100644 index 00000000000..d992a350329 --- /dev/null +++ b/vortex-datasketches/src/lib.rs @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Apache DataSketches aggregate functions for Vortex. +//! +//! Aggregates in this crate return serialized Apache DataSketches sketches as non-null +//! `Binary` scalars. The serialized bytes are also the partial aggregate format, so partial +//! aggregate states can be merged by the same Vortex aggregate function. + +pub mod hll; +pub mod tdigest; + +pub use hll::Hll; +pub use hll::HllOptions; +pub use hll::HllTarget; +pub use hll::hll; +pub use tdigest::TDigest; +pub use tdigest::TDigestOptions; +pub use tdigest::tdigest; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; +use vortex_session::VortexSession; + +/// Initialize Apache DataSketches aggregate functions in the given Vortex session. +pub fn initialize(session: &VortexSession) { + session.aggregate_fns().register(Hll); + session.aggregate_fns().register(TDigest); +} + +#[cfg(test)] +mod tests { + use vortex_array::aggregate_fn::AggregateFnVTable; + use vortex_array::aggregate_fn::session::AggregateFnSessionExt; + use vortex_session::VortexSession; + + use crate::hll::Hll; + use crate::tdigest::TDigest; + + #[test] + fn initialize_registers_aggregate_functions() { + let session = VortexSession::empty(); + crate::initialize(&session); + + let registry = session.aggregate_fns().registry().clone(); + assert!(registry.find(&Hll.id()).is_some()); + assert!(registry.find(&TDigest.id()).is_some()); + } +} diff --git a/vortex-datasketches/src/tdigest/mod.rs b/vortex-datasketches/src/tdigest/mod.rs new file mode 100644 index 00000000000..452b04e3f68 --- /dev/null +++ b/vortex-datasketches/src/tdigest/mod.rs @@ -0,0 +1,305 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod primitive; + +use std::fmt; +use std::fmt::Display; +use std::fmt::Formatter; + +use datasketches::tdigest::TDigestMut; +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::Columnar; +use vortex_array::ExecutionCtx; +use vortex_array::aggregate_fn::Accumulator; +use vortex_array::aggregate_fn::AggregateFnId; +use vortex_array::aggregate_fn::AggregateFnVTable; +use vortex_array::aggregate_fn::DynAccumulator; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_session::VortexSession; + +use self::primitive::accumulate_primitive; +use self::primitive::update_primitive; + +/// Default t-digest compression parameter used by [`TDigestOptions`]. +pub const DEFAULT_K: u16 = 200; + +const OPTIONS_VERSION: u8 = 1; + +/// Options for the `datasketches.tdigest` aggregate function. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct TDigestOptions { + /// Apache DataSketches t-digest compression parameter. Must be at least `10`. + pub k: u16, +} + +impl Default for TDigestOptions { + fn default() -> Self { + Self { k: DEFAULT_K } + } +} + +impl Display for TDigestOptions { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "k={}", self.k) + } +} + +impl TDigestOptions { + fn validate(&self) -> VortexResult<()> { + vortex_ensure!( + self.k >= 10, + "t-digest k must be at least 10, got {}", + self.k + ); + Ok(()) + } +} + +/// Aggregate function vtable for Apache DataSketches t-digest sketches. +#[derive(Clone, Debug)] +pub struct TDigest; + +/// Partial accumulator state for [`TDigest`]. +pub struct TDigestPartial { + k: u16, + digest: TDigestMut, +} + +impl TDigestPartial { + fn update(&mut self, value: f64) { + self.digest.update(value); + } +} + +/// Build a serialized Apache DataSketches t-digest sketch from a numeric array. +pub fn tdigest( + array: &ArrayRef, + options: TDigestOptions, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let mut acc = Accumulator::try_new(TDigest, options, array.dtype().clone())?; + acc.accumulate(array, ctx)?; + acc.finish() +} + +/// Return the approximate quantile from a serialized t-digest sketch. +pub fn quantile(bytes: &[u8], rank: f64) -> VortexResult> { + let digest = TDigestMut::deserialize(bytes, false) + .map_err(|err| vortex_err!("Failed to deserialize t-digest sketch: {err}"))?; + Ok(digest.freeze().quantile(rank)) +} + +impl AggregateFnVTable for TDigest { + type Options = TDigestOptions; + type Partial = TDigestPartial; + + fn id(&self) -> AggregateFnId { + AggregateFnId::new("datasketches.tdigest") + } + + fn serialize(&self, options: &Self::Options) -> VortexResult>> { + options.validate()?; + let mut bytes = Vec::with_capacity(3); + bytes.push(OPTIONS_VERSION); + bytes.extend_from_slice(&options.k.to_le_bytes()); + Ok(Some(bytes)) + } + + fn deserialize( + &self, + metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + vortex_ensure!( + metadata.len() == 3, + "Invalid t-digest options metadata length: expected 3, got {}", + metadata.len() + ); + vortex_ensure!( + metadata[0] == OPTIONS_VERSION, + "Unsupported t-digest options metadata version: {}", + metadata[0] + ); + let options = TDigestOptions { + k: u16::from_le_bytes([metadata[1], metadata[2]]), + }; + options.validate()?; + Ok(options) + } + + fn return_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option { + if options.validate().is_err() { + return None; + } + match input_dtype { + DType::Null | DType::Primitive(..) => Some(DType::Binary(Nullability::NonNullable)), + _ => None, + } + } + + fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option { + self.return_dtype(options, input_dtype) + } + + fn empty_partial( + &self, + options: &Self::Options, + _input_dtype: &DType, + ) -> VortexResult { + options.validate()?; + Ok(TDigestPartial { + k: options.k, + digest: TDigestMut::try_new(options.k) + .map_err(|err| vortex_err!("Failed to create t-digest: {err}"))?, + }) + } + + fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> { + if other.is_null() { + return Ok(()); + } + + let bytes = other + .as_binary() + .value() + .ok_or_else(|| vortex_err!("t-digest partial scalar must be non-null"))?; + let other = TDigestMut::deserialize(bytes.as_slice(), false) + .map_err(|err| vortex_err!("Failed to deserialize t-digest partial: {err}"))?; + partial.digest.merge(&other); + Ok(()) + } + + fn to_scalar(&self, partial: &Self::Partial) -> VortexResult { + let mut digest = partial.digest.clone(); + Ok(Scalar::binary(digest.serialize(), Nullability::NonNullable)) + } + + fn reset(&self, partial: &mut Self::Partial) { + partial.digest = TDigestMut::new(partial.k); + } + + fn is_saturated(&self, _state: &Self::Partial) -> bool { + false + } + + fn accumulate( + &self, + partial: &mut Self::Partial, + batch: &Columnar, + ctx: &mut ExecutionCtx, + ) -> VortexResult<()> { + match batch { + Columnar::Constant(c) => { + if c.scalar().is_null() { + return Ok(()); + } + match c.scalar().dtype() { + DType::Primitive(..) => { + for _ in 0..c.len() { + update_primitive(partial, c.scalar().as_primitive())?; + } + Ok(()) + } + DType::Null => Ok(()), + _ => vortex_bail!( + "Unsupported constant dtype for t-digest: {}", + c.scalar().dtype() + ), + } + } + Columnar::Canonical(c) => match c { + Canonical::Null(_) => Ok(()), + Canonical::Primitive(array) => accumulate_primitive(partial, array, ctx), + Canonical::Bool(_) + | Canonical::Decimal(_) + | Canonical::Extension(_) + | Canonical::VarBinView(_) + | Canonical::Struct(_) + | Canonical::List(_) + | Canonical::FixedSizeList(_) + | Canonical::Variant(_) => { + vortex_bail!("Unsupported canonical type for t-digest: {}", c.dtype()) + } + }, + } + } + + fn finalize(&self, partials: ArrayRef) -> VortexResult { + Ok(partials) + } + + fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult { + self.to_scalar(partial) + } +} + +#[cfg(test)] +mod tests { + use datasketches::tdigest::TDigestMut; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::dtype::PType; + use vortex_array::session::ArraySession; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use super::*; + + fn quantile_scalar(scalar: Scalar, rank: f64) -> VortexResult> { + let bytes = scalar + .as_binary() + .value() + .ok_or_else(|| vortex_err!("t-digest result must be non-null"))?; + quantile(bytes.as_slice(), rank) + } + + #[test] + fn tdigest_skips_nulls() -> VortexResult<()> { + let session = VortexSession::empty().with::(); + let mut ctx = session.create_execution_ctx(); + let array = + PrimitiveArray::from_option_iter([Some(1.0f64), None, Some(2.0), Some(3.0), None]) + .into_array(); + + let result = tdigest(&array, TDigestOptions::default(), &mut ctx)?; + + assert_eq!(quantile_scalar(result, 0.0)?, Some(1.0)); + Ok(()) + } + + #[test] + fn tdigest_combines_partial_sketches() -> VortexResult<()> { + let dtype = DType::Primitive(PType::F64, Nullability::Nullable); + let mut partial = TDigest.empty_partial(&TDigestOptions::default(), &dtype)?; + + let mut left = TDigestMut::new(DEFAULT_K); + left.update(1.0); + left.update(2.0); + TDigest.combine_partials( + &mut partial, + Scalar::binary(left.serialize(), Nullability::NonNullable), + )?; + + let mut right = TDigestMut::new(DEFAULT_K); + right.update(3.0); + TDigest.combine_partials( + &mut partial, + Scalar::binary(right.serialize(), Nullability::NonNullable), + )?; + + let result = TDigest.finalize_scalar(&partial)?; + assert_eq!(quantile_scalar(result, 0.0)?, Some(1.0)); + Ok(()) + } +} diff --git a/vortex-datasketches/src/tdigest/primitive.rs b/vortex-datasketches/src/tdigest/primitive.rs new file mode 100644 index 00000000000..bde159f2634 --- /dev/null +++ b/vortex-datasketches/src/tdigest/primitive.rs @@ -0,0 +1,65 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use num_traits::ToPrimitive; +use vortex_array::ExecutionCtx; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::dtype::NativePType; +use vortex_array::match_each_native_ptype; +use vortex_array::scalar::PrimitiveScalar; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use super::TDigestPartial; + +pub(super) fn update_primitive( + partial: &mut TDigestPartial, + scalar: PrimitiveScalar<'_>, +) -> VortexResult<()> { + if let Some(value) = scalar.pvalue() { + partial.update(value.cast::()?); + } + Ok(()) +} + +pub(super) fn accumulate_primitive( + partial: &mut TDigestPartial, + array: &PrimitiveArray, + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + match_each_native_ptype!(array.ptype(), |T| { + accumulate_typed::(partial, array, ctx) + }) +} + +fn accumulate_typed( + partial: &mut TDigestPartial, + array: &PrimitiveArray, + ctx: &mut ExecutionCtx, +) -> VortexResult<()> +where + T: NativePType + ToPrimitive, +{ + let values = array.as_slice::(); + match array + .as_ref() + .validity()? + .execute_mask(array.as_ref().len(), ctx)? + { + Mask::AllTrue(_) => { + for &value in values { + partial.update(value.to_f64().vortex_expect("primitive converts to f64")); + } + } + Mask::AllFalse(_) => {} + Mask::Values(validity) => { + for (&value, valid) in values.iter().zip(validity.bit_buffer().iter()) { + if valid { + partial.update(value.to_f64().vortex_expect("primitive converts to f64")); + } + } + } + } + Ok(()) +}