diff --git a/datasketches/src/theta/compact.rs b/datasketches/src/theta/compact.rs new file mode 100644 index 0000000..67e2d5e --- /dev/null +++ b/datasketches/src/theta/compact.rs @@ -0,0 +1,497 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Compact Theta sketch implementation +//! +//! A CompactThetaSketch is an immutable, serialized form of a Theta sketch. +//! It stores only the essential data needed for estimation and set operations: +//! - Theta value (sampling threshold) +//! - Sorted hash values +//! - Seed hash for validation +//! +//! This format is compatible with the Apache DataSketches "compact" format +//! used by Java, C++, and Python implementations. + +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; +use crate::common::NumStdDev; +use crate::common::binomial_bounds; +use crate::error::Error; +use crate::hash::DEFAULT_UPDATE_SEED; +use crate::hash::compute_seed_hash; +use crate::theta::hash_table::MAX_THETA; +use crate::theta::serialization::*; + +/// A compact, immutable Theta sketch. +/// +/// This is the serialized form of a Theta sketch, optimized for storage and +/// transmission. It contains sorted hash values and can be used for: +/// - Cardinality estimation +/// - Set operations (union, intersection, difference) +/// - Serialization to/from bytes +/// +/// Unlike [`ThetaSketch`](super::ThetaSketch), this sketch cannot be updated +/// with new values. +/// +/// # Example +/// +/// ``` +/// use datasketches::theta::CompactThetaSketch; +/// use datasketches::theta::ThetaSketch; +/// +/// let mut sketch = ThetaSketch::builder().build(); +/// sketch.update("apple"); +/// sketch.update("banana"); +/// +/// // Convert to compact form for serialization +/// let compact = sketch.compact(); +/// let bytes = compact.serialize(); +/// +/// // Deserialize +/// let restored = CompactThetaSketch::deserialize(&bytes).unwrap(); +/// assert_eq!(compact.estimate(), restored.estimate()); +/// ``` +#[derive(Debug, Clone)] +pub struct CompactThetaSketch { + theta: u64, + entries: Vec, + seed_hash: u16, + is_empty: bool, +} + +impl CompactThetaSketch { + /// Create a new compact sketch from components + pub(crate) fn new(theta: u64, entries: Vec, seed_hash: u16, is_empty: bool) -> Self { + Self { + theta, + entries, + seed_hash, + is_empty, + } + } + + /// Check if the sketch is empty (no values have been added) + pub fn is_empty(&self) -> bool { + self.is_empty + } + + /// Get the cardinality estimate + /// + /// Returns the estimated number of distinct values that were inserted + /// into the original sketch. + pub fn estimate(&self) -> f64 { + if self.is_empty { + return 0.0; + } + let num_retained = self.entries.len() as f64; + let theta_fraction = self.theta as f64 / MAX_THETA as f64; + num_retained / theta_fraction + } + + /// Return theta as a fraction (0.0 to 1.0) + pub fn theta(&self) -> f64 { + self.theta as f64 / MAX_THETA as f64 + } + + /// Return theta as u64 + pub fn theta64(&self) -> u64 { + self.theta + } + + /// Check if sketch is in estimation mode + pub fn is_estimation_mode(&self) -> bool { + self.theta < MAX_THETA + } + + /// Return number of retained entries + pub fn num_retained(&self) -> usize { + self.entries.len() + } + + /// Return iterator over hash values + pub fn iter(&self) -> impl Iterator + '_ { + self.entries.iter().copied() + } + + /// Get the seed hash + pub fn seed_hash(&self) -> u16 { + self.seed_hash + } + + /// Returns the approximate lower error bound given the specified number of Standard Deviations. + pub fn lower_bound(&self, num_std_dev: NumStdDev) -> f64 { + if self.is_empty { + return 0.0; + } + if !self.is_estimation_mode() { + return self.num_retained() as f64; + } + binomial_bounds::lower_bound(self.num_retained() as u64, self.theta(), num_std_dev) + .expect("theta should always be valid") + } + + /// Returns the approximate upper error bound given the specified number of Standard Deviations. + pub fn upper_bound(&self, num_std_dev: NumStdDev) -> f64 { + if self.is_empty { + return 0.0; + } + if !self.is_estimation_mode() { + return self.num_retained() as f64; + } + binomial_bounds::upper_bound( + self.num_retained() as u64, + self.theta(), + num_std_dev, + self.is_empty, + ) + .expect("theta should always be valid") + } + + /// Serialize the compact sketch to bytes + /// + /// The format is compatible with the Apache DataSketches compact sketch format. + /// + /// # Example + /// + /// ``` + /// use datasketches::theta::CompactThetaSketch; + /// use datasketches::theta::ThetaSketch; + /// + /// let mut sketch = ThetaSketch::builder().build(); + /// sketch.update("test"); + /// let compact = sketch.compact(); + /// let bytes = compact.serialize(); + /// assert!(!bytes.is_empty()); + /// ``` + pub fn serialize(&self) -> Vec { + let is_estimation_mode = self.is_estimation_mode(); + let num_entries = self.entries.len(); + + let preamble_longs = if self.is_empty { + PREAMBLE_LONGS_EMPTY + } else if is_estimation_mode { + PREAMBLE_LONGS_ESTIMATION + } else { + PREAMBLE_LONGS_EXACT + }; + + let preamble_bytes = (preamble_longs as usize) * 8; + let total_size = preamble_bytes + num_entries * HASH_SIZE_BYTES; + let mut bytes = SketchBytes::with_capacity(total_size); + + bytes.write_u8(preamble_longs); + bytes.write_u8(SERIAL_VERSION); + bytes.write_u8(THETA_FAMILY_ID); + bytes.write_u8(0); + bytes.write_u8(0); + + let mut flags = FLAG_READ_ONLY | FLAG_COMPACT | FLAG_ORDERED; + if self.is_empty { + flags |= FLAG_EMPTY; + } + bytes.write_u8(flags); + bytes.write_u16_le(self.seed_hash); + + if preamble_longs >= PREAMBLE_LONGS_EXACT { + bytes.write_u32_le(num_entries as u32); + bytes.write_u32_le(DEFAULT_P_FLOAT_BITS); + } + + if preamble_longs >= PREAMBLE_LONGS_ESTIMATION { + bytes.write_u64_le(self.theta); + } + + for hash in &self.entries { + bytes.write_u64_le(*hash); + } + + bytes.into_bytes() + } + + /// Deserialize a compact sketch from bytes + /// + /// Uses the default seed for validation. + /// + /// # Example + /// + /// ``` + /// use datasketches::theta::CompactThetaSketch; + /// use datasketches::theta::ThetaSketch; + /// + /// let mut sketch = ThetaSketch::builder().build(); + /// sketch.update("test"); + /// let compact = sketch.compact(); + /// let bytes = compact.serialize(); + /// + /// let restored = CompactThetaSketch::deserialize(&bytes).unwrap(); + /// assert_eq!(compact.estimate(), restored.estimate()); + /// ``` + pub fn deserialize(bytes: &[u8]) -> Result { + Self::deserialize_with_seed(bytes, DEFAULT_UPDATE_SEED) + } + + /// Deserialize a compact sketch from bytes with a specific seed + /// + /// # Errors + /// + /// Returns an error if: + /// - The data is too short + /// - The family ID doesn't match + /// - The serial version is unsupported + /// - The seed hash doesn't match + pub fn deserialize_with_seed(bytes: &[u8], seed: u64) -> Result { + fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error { + move |_| Error::insufficient_data(tag) + } + + let mut cursor = SketchSlice::new(bytes); + + let preamble_longs = cursor.read_u8().map_err(make_error("preamble_longs"))?; + let serial_version = cursor.read_u8().map_err(make_error("serial_version"))?; + let family_id = cursor.read_u8().map_err(make_error("family_id"))?; + let _lg_k = cursor.read_u8().map_err(make_error("lg_k"))?; + let _lg_resize = cursor.read_u8().map_err(make_error("lg_resize"))?; + let flags = cursor.read_u8().map_err(make_error("flags"))?; + let seed_hash = cursor.read_u16_le().map_err(make_error("seed_hash"))?; + + if family_id != THETA_FAMILY_ID { + return Err(Error::invalid_family(THETA_FAMILY_ID, family_id, "Theta")); + } + if serial_version != SERIAL_VERSION { + return Err(Error::unsupported_serial_version( + SERIAL_VERSION, + serial_version, + )); + } + + // Validate seed hash (seed_hash = 0 means legacy format, skip validation) + let expected_seed_hash = compute_seed_hash(seed); + if seed_hash != 0 && seed_hash != expected_seed_hash { + return Err(Error::deserial(format!( + "seed hash mismatch: expected {expected_seed_hash}, got {seed_hash}" + ))); + } + let seed_hash = if seed_hash == 0 { + expected_seed_hash + } else { + seed_hash + }; + + let is_empty = (flags & FLAG_EMPTY) != 0; + let is_compact = (flags & FLAG_COMPACT) != 0; + let is_single_item = (flags & FLAG_SINGLE_ITEM) != 0; + + if !is_compact { + return Err(Error::deserial( + "only compact sketches are supported".to_string(), + )); + } + + if is_empty { + return Ok(Self { + theta: MAX_THETA, + entries: Vec::new(), + seed_hash, + is_empty: true, + }); + } + + // Handle single-item format: preamble_longs = 1 with exactly one hash entry + if preamble_longs == PREAMBLE_LONGS_EMPTY && is_single_item { + let hash = cursor + .read_u64_le() + .map_err(make_error("single_item_hash"))?; + return Ok(Self { + theta: MAX_THETA, + entries: vec![hash], + seed_hash, + is_empty: false, + }); + } + + if preamble_longs < PREAMBLE_LONGS_EXACT { + return Err(Error::deserial(format!( + "non-empty sketch requires at least {PREAMBLE_LONGS_EXACT} preamble longs, got {preamble_longs}" + ))); + } + + let num_entries = cursor.read_u32_le().map_err(make_error("num_entries"))? as usize; + let _p = cursor.read_u32_le().map_err(make_error("p"))?; + + let theta = if preamble_longs >= PREAMBLE_LONGS_ESTIMATION { + cursor.read_u64_le().map_err(make_error("theta"))? + } else { + MAX_THETA + }; + + let mut entries = Vec::with_capacity(num_entries); + for i in 0..num_entries { + let hash = cursor.read_u64_le().map_err(|_| { + Error::insufficient_data(format!( + "expected {num_entries} entries, failed at index {i}" + )) + })?; + entries.push(hash); + } + + Ok(Self { + theta, + entries, + seed_hash, + is_empty: false, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_empty_compact_sketch() { + let sketch = CompactThetaSketch::new( + MAX_THETA, + Vec::new(), + compute_seed_hash(DEFAULT_UPDATE_SEED), + true, + ); + assert!(sketch.is_empty()); + assert_eq!(sketch.estimate(), 0.0); + assert_eq!(sketch.num_retained(), 0); + assert!(!sketch.is_estimation_mode()); + } + + #[test] + fn test_compact_sketch_with_entries() { + let entries = vec![100, 200, 300]; + let sketch = CompactThetaSketch::new( + MAX_THETA, + entries.clone(), + compute_seed_hash(DEFAULT_UPDATE_SEED), + false, + ); + assert!(!sketch.is_empty()); + assert_eq!(sketch.num_retained(), 3); + assert_eq!(sketch.estimate(), 3.0); + assert!(!sketch.is_estimation_mode()); + } + + #[test] + fn test_compact_sketch_estimation_mode() { + let entries = vec![100, 200, 300]; + let theta = MAX_THETA / 2; // Half of max theta + let sketch = CompactThetaSketch::new( + theta, + entries, + compute_seed_hash(DEFAULT_UPDATE_SEED), + false, + ); + assert!(sketch.is_estimation_mode()); + assert!(sketch.estimate() > 3.0); // Should be approximately 6.0 + } + + #[test] + fn test_serialize_deserialize_empty() { + let sketch = CompactThetaSketch::new( + MAX_THETA, + Vec::new(), + compute_seed_hash(DEFAULT_UPDATE_SEED), + true, + ); + let bytes = sketch.serialize(); + let restored = CompactThetaSketch::deserialize(&bytes).unwrap(); + + assert!(restored.is_empty()); + assert_eq!(sketch.theta64(), restored.theta64()); + assert_eq!(sketch.seed_hash(), restored.seed_hash()); + } + + #[test] + fn test_serialize_deserialize_exact_mode() { + let entries = vec![100, 200, 300, 400, 500]; + let sketch = CompactThetaSketch::new( + MAX_THETA, + entries.clone(), + compute_seed_hash(DEFAULT_UPDATE_SEED), + false, + ); + let bytes = sketch.serialize(); + let restored = CompactThetaSketch::deserialize(&bytes).unwrap(); + + assert!(!restored.is_empty()); + assert!(!restored.is_estimation_mode()); + assert_eq!(sketch.num_retained(), restored.num_retained()); + assert_eq!(sketch.estimate(), restored.estimate()); + assert_eq!(sketch.theta64(), restored.theta64()); + + // Verify all entries match + let restored_entries: Vec = restored.iter().collect(); + assert_eq!(entries, restored_entries); + } + + #[test] + fn test_serialize_deserialize_estimation_mode() { + let entries = vec![100, 200, 300]; + let theta = MAX_THETA / 2; + let sketch = CompactThetaSketch::new( + theta, + entries.clone(), + compute_seed_hash(DEFAULT_UPDATE_SEED), + false, + ); + let bytes = sketch.serialize(); + let restored = CompactThetaSketch::deserialize(&bytes).unwrap(); + + assert!(!restored.is_empty()); + assert!(restored.is_estimation_mode()); + assert_eq!(sketch.num_retained(), restored.num_retained()); + assert_eq!(sketch.estimate(), restored.estimate()); + assert_eq!(sketch.theta64(), restored.theta64()); + } + + #[test] + fn test_deserialize_invalid_family() { + let mut bytes = vec![ + 1, + SERIAL_VERSION, + 99, + 0, + 0, + FLAG_EMPTY | FLAG_COMPACT | FLAG_ORDERED, + ]; + bytes.extend_from_slice(&compute_seed_hash(DEFAULT_UPDATE_SEED).to_le_bytes()); + + let result = CompactThetaSketch::deserialize(&bytes); + assert!(result.is_err()); + } + + #[test] + fn test_deserialize_invalid_seed() { + let mut bytes = vec![ + 1, + SERIAL_VERSION, + THETA_FAMILY_ID, + 0, + 0, + FLAG_EMPTY | FLAG_COMPACT | FLAG_ORDERED, + ]; + bytes.extend_from_slice(&9999u16.to_le_bytes()); // Wrong seed hash + + let result = CompactThetaSketch::deserialize(&bytes); + assert!(result.is_err()); + } +} diff --git a/datasketches/src/theta/hash_table.rs b/datasketches/src/theta/hash_table.rs index ecba0fa..3d4a961 100644 --- a/datasketches/src/theta/hash_table.rs +++ b/datasketches/src/theta/hash_table.rs @@ -19,6 +19,7 @@ use std::hash::Hash; use crate::common::ResizeFactor; use crate::hash::MurmurHash3X64128; +use crate::hash::compute_seed_hash; /// Maximum theta value (signed max for compatibility with Java) pub const MAX_THETA: u64 = i64::MAX as u64; @@ -59,6 +60,7 @@ pub(crate) struct ThetaHashTable { resize_factor: ResizeFactor, sampling_probability: f32, hash_seed: u64, + seed_hash: u16, theta: u64, @@ -87,6 +89,7 @@ impl ThetaHashTable { sampling_probability, theta: starting_theta_from_sampling_probability(sampling_probability), hash_seed, + seed_hash: compute_seed_hash(hash_seed), entries, num_entries: 0, } @@ -296,6 +299,11 @@ impl ThetaHashTable { self.lg_nom_size } + /// Get seed hash + pub fn seed_hash(&self) -> u16 { + self.seed_hash + } + /// Get stride for hash table probing fn get_stride(key: u64, lg_size: u8) -> usize { (2 * ((key >> (lg_size)) & STRIDE_MASK) + 1) as usize diff --git a/datasketches/src/theta/mod.rs b/datasketches/src/theta/mod.rs index ccaac52..71c6a52 100644 --- a/datasketches/src/theta/mod.rs +++ b/datasketches/src/theta/mod.rs @@ -28,6 +28,7 @@ //! configurable accuracy and memory usage. The implementation supports: //! //! - **ThetaSketch**: Mutable sketch for building from input data +//! - **CompactThetaSketch**: Immutable sketch for serialization and set operations //! //! # Usage //! @@ -37,9 +38,30 @@ //! sketch.update("apple"); //! assert!(sketch.estimate() >= 1.0); //! ``` +//! +//! # Serialization +//! +//! Theta sketches can be serialized to a compact binary format that is compatible +//! with the Java and C++ DataSketches implementations: +//! +//! ```rust +//! # use datasketches::theta::{ThetaSketch, CompactThetaSketch}; +//! let mut sketch = ThetaSketch::builder().build(); +//! sketch.update("apple"); +//! +//! // Serialize to bytes +//! let bytes = sketch.serialize(); +//! +//! // Deserialize +//! let restored = CompactThetaSketch::deserialize(&bytes).unwrap(); +//! assert_eq!(sketch.estimate(), restored.estimate()); +//! ``` +mod compact; mod hash_table; +mod serialization; mod sketch; +pub use self::compact::CompactThetaSketch; pub use self::sketch::ThetaSketch; pub use self::sketch::ThetaSketchBuilder; diff --git a/datasketches/src/theta/serialization.rs b/datasketches/src/theta/serialization.rs new file mode 100644 index 0000000..1f33709 --- /dev/null +++ b/datasketches/src/theta/serialization.rs @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary serialization format constants for Theta sketches +//! +//! This module contains all constants related to the Apache DataSketches +//! binary serialization format for Theta sketches. The format is compatible +//! with the Java and C++ implementations. +//! +//! # Compact Sketch Binary Format +//! +//! The compact theta sketch format stores sorted hash values with a minimal preamble. +//! +//! ## Preamble Layout (Little Endian) +//! +//! | Byte | Field | Description | +//! |------|-------|-------------| +//! | 0 | preamble_longs | Number of 8-byte longs in preamble (1, 2, or 3) | +//! | 1 | serial_version | Serialization version (currently 3) | +//! | 2 | family_id | Family ID (3 for Theta) | +//! | 3 | lg_k | Log2 of nominal entries | +//! | 4 | lg_resize | Unused in compact format (0) | +//! | 5 | flags | Bit flags (see below) | +//! | 6-7 | seed_hash | 16-bit hash of the seed | +//! +//! If preamble_longs >= 2: +//! | Byte 8-11 | retained_entries | Number of hash values stored | +//! | Byte 12-15 | p | Sampling probability as float (unused in compact, set to 1.0) | +//! +//! If preamble_longs >= 3: +//! | Byte 16-23 | theta | Theta value as 64-bit integer | +//! +//! ## Flags (Byte 5) +//! +//! | Bit | Name | Description | +//! |-----|------|-------------| +//! | 0 | BIG_ENDIAN | Not used (always 0 for little endian) | +//! | 1 | READ_ONLY | Sketch is read-only (always 1 for compact) | +//! | 2 | EMPTY | Sketch is empty | +//! | 3 | COMPACT | Sketch is in compact form (always 1) | +//! | 4 | ORDERED | Hash values are sorted (always 1 for compact) | + +pub const THETA_FAMILY_ID: u8 = 3; +pub const SERIAL_VERSION: u8 = 3; + +pub const FLAG_READ_ONLY: u8 = 1 << 1; +pub const FLAG_EMPTY: u8 = 1 << 2; +pub const FLAG_COMPACT: u8 = 1 << 3; +pub const FLAG_ORDERED: u8 = 1 << 4; +pub const FLAG_SINGLE_ITEM: u8 = 1 << 5; + +pub const PREAMBLE_LONGS_EMPTY: u8 = 1; +pub const PREAMBLE_LONGS_EXACT: u8 = 2; +pub const PREAMBLE_LONGS_ESTIMATION: u8 = 3; + +pub const HASH_SIZE_BYTES: usize = 8; +pub const DEFAULT_P_FLOAT_BITS: u32 = 0x3F80_0000; diff --git a/datasketches/src/theta/sketch.rs b/datasketches/src/theta/sketch.rs index 07110d4..715dc66 100644 --- a/datasketches/src/theta/sketch.rs +++ b/datasketches/src/theta/sketch.rs @@ -26,7 +26,9 @@ use crate::common::NumStdDev; use crate::common::ResizeFactor; use crate::common::binomial_bounds; use crate::common::canonical_double; +use crate::error::Error; use crate::hash::DEFAULT_UPDATE_SEED; +use crate::theta::compact::CompactThetaSketch; use crate::theta::hash_table::DEFAULT_LG_K; use crate::theta::hash_table::MAX_LG_K; use crate::theta::hash_table::MAX_THETA; @@ -248,6 +250,92 @@ impl ThetaSketch { ) .expect("theta should always be valid") } + + /// Convert to a compact, immutable form suitable for serialization. + /// + /// The compact form stores only the sorted hash values and theta, + /// making it smaller and suitable for storage or transmission. + /// + /// # Examples + /// + /// ``` + /// use datasketches::theta::ThetaSketch; + /// + /// let mut sketch = ThetaSketch::builder().build(); + /// sketch.update("apple"); + /// sketch.update("banana"); + /// + /// let compact = sketch.compact(); + /// assert_eq!(compact.num_retained(), sketch.num_retained()); + /// assert_eq!(compact.estimate(), sketch.estimate()); + /// ``` + pub fn compact(&self) -> CompactThetaSketch { + let mut entries: Vec = self.table.iter().collect(); + entries.sort_unstable(); + + CompactThetaSketch::new( + self.table.theta(), + entries, + self.table.seed_hash(), + self.is_empty(), + ) + } + + /// Serialize the sketch to bytes in compact format. + /// + /// This is equivalent to calling `self.compact().serialize()`. + /// + /// # Examples + /// + /// ``` + /// use datasketches::theta::ThetaSketch; + /// + /// let mut sketch = ThetaSketch::builder().build(); + /// sketch.update("test"); + /// let bytes = sketch.serialize(); + /// assert!(!bytes.is_empty()); + /// ``` + pub fn serialize(&self) -> Vec { + self.compact().serialize() + } + + /// Deserialize a compact sketch from bytes. + /// + /// Returns a [`CompactThetaSketch`] since the serialized form is immutable. + /// + /// # Examples + /// + /// ``` + /// use datasketches::theta::ThetaSketch; + /// + /// let mut sketch = ThetaSketch::builder().build(); + /// sketch.update("test"); + /// let bytes = sketch.serialize(); + /// + /// let restored = ThetaSketch::deserialize(&bytes).unwrap(); + /// assert_eq!(sketch.estimate(), restored.estimate()); + /// ``` + pub fn deserialize(bytes: &[u8]) -> Result { + CompactThetaSketch::deserialize(bytes) + } + + /// Deserialize a compact sketch from bytes with a specific seed. + /// + /// # Examples + /// + /// ``` + /// use datasketches::theta::ThetaSketch; + /// + /// let mut sketch = ThetaSketch::builder().seed(12345).build(); + /// sketch.update("test"); + /// let bytes = sketch.serialize(); + /// + /// let restored = ThetaSketch::deserialize_with_seed(&bytes, 12345).unwrap(); + /// assert_eq!(sketch.estimate(), restored.estimate()); + /// ``` + pub fn deserialize_with_seed(bytes: &[u8], seed: u64) -> Result { + CompactThetaSketch::deserialize_with_seed(bytes, seed) + } } /// Builder for ThetaSketch diff --git a/datasketches/tests/theta_serialization_test.rs b/datasketches/tests/theta_serialization_test.rs new file mode 100644 index 0000000..51e9962 --- /dev/null +++ b/datasketches/tests/theta_serialization_test.rs @@ -0,0 +1,321 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Theta Sketch Serialization Tests +//! +//! These tests verify binary serialization/deserialization for Theta sketches, +//! including cross-language compatibility with Java and C++ implementations. + +mod common; + +use std::fs; + +use common::serialization_test_data; +use datasketches::common::NumStdDev; +use datasketches::theta::CompactThetaSketch; +use datasketches::theta::ThetaSketch; + +#[test] +fn test_serialize_deserialize_empty_sketch() { + let sketch = ThetaSketch::builder().lg_k(12).build(); + assert!(sketch.is_empty()); + + let bytes = sketch.serialize(); + let restored = ThetaSketch::deserialize(&bytes).unwrap(); + + assert!(restored.is_empty()); + assert_eq!(sketch.estimate(), restored.estimate()); + assert_eq!(sketch.theta64(), restored.theta64()); +} + +#[test] +fn test_serialize_deserialize_single_value() { + let mut sketch = ThetaSketch::builder().lg_k(12).build(); + sketch.update("single_value"); + + let bytes = sketch.serialize(); + let restored = ThetaSketch::deserialize(&bytes).unwrap(); + + assert!(!restored.is_empty()); + assert_eq!(sketch.estimate(), restored.estimate()); + assert_eq!(sketch.num_retained(), restored.num_retained()); +} + +#[test] +fn test_serialize_deserialize_multiple_values() { + let mut sketch = ThetaSketch::builder().lg_k(12).build(); + for i in 0..100 { + sketch.update(format!("value_{}", i)); + } + + let bytes = sketch.serialize(); + let restored = ThetaSketch::deserialize(&bytes).unwrap(); + + assert_eq!(sketch.num_retained(), restored.num_retained()); + assert_eq!(sketch.estimate(), restored.estimate()); + assert!(!restored.is_estimation_mode()); // 100 values shouldn't trigger estimation mode with lg_k=12 +} + +#[test] +fn test_serialize_deserialize_estimation_mode() { + let mut sketch = ThetaSketch::builder().lg_k(5).build(); // Small k to trigger estimation mode + + // Insert enough values to trigger estimation mode + for i in 0..1000 { + sketch.update(format!("value_{}", i)); + } + + assert!(sketch.is_estimation_mode()); + + let bytes = sketch.serialize(); + let restored = ThetaSketch::deserialize(&bytes).unwrap(); + + assert!(restored.is_estimation_mode()); + assert_eq!(sketch.theta64(), restored.theta64()); + assert_eq!(sketch.num_retained(), restored.num_retained()); + + // Estimates should be equal + let sketch_estimate = sketch.estimate(); + let restored_estimate = restored.estimate(); + assert!( + (sketch_estimate - restored_estimate).abs() < 0.001, + "Estimates differ: {} vs {}", + sketch_estimate, + restored_estimate + ); +} + +#[test] +fn test_serialize_deserialize_with_custom_seed() { + let custom_seed = 12345u64; + let mut sketch = ThetaSketch::builder().lg_k(12).seed(custom_seed).build(); + + for i in 0..50 { + sketch.update(i); + } + + let bytes = sketch.serialize(); + + // Should fail with wrong seed + let result = ThetaSketch::deserialize(&bytes); + assert!(result.is_err(), "Should fail with default seed"); + + // Should succeed with correct seed + let restored = ThetaSketch::deserialize_with_seed(&bytes, custom_seed).unwrap(); + assert_eq!(sketch.estimate(), restored.estimate()); +} + +#[test] +fn test_round_trip_preserves_entries() { + let mut sketch = ThetaSketch::builder().lg_k(12).build(); + for i in 0..50 { + sketch.update(format!("value_{}", i)); + } + + let original_entries: Vec = sketch.iter().collect(); + + let bytes = sketch.serialize(); + let restored = ThetaSketch::deserialize(&bytes).unwrap(); + + let restored_entries: Vec = restored.iter().collect(); + + // Since compact sketches store sorted entries, compare sorted + let mut original_sorted = original_entries.clone(); + original_sorted.sort_unstable(); + + assert_eq!(original_sorted, restored_entries); +} + +#[test] +fn test_compact_preserves_functionality() { + let mut sketch = ThetaSketch::builder().lg_k(10).build(); + for i in 0..500 { + sketch.update(i); + } + + let compact = sketch.compact(); + + // All functionality should work on compact sketch + assert_eq!(sketch.estimate(), compact.estimate()); + assert_eq!(sketch.theta(), compact.theta()); + assert_eq!(sketch.theta64(), compact.theta64()); + assert_eq!(sketch.is_empty(), compact.is_empty()); + assert_eq!(sketch.is_estimation_mode(), compact.is_estimation_mode()); + assert_eq!(sketch.num_retained(), compact.num_retained()); + + // Bounds should also match + assert_eq!( + sketch.lower_bound(NumStdDev::One), + compact.lower_bound(NumStdDev::One) + ); + assert_eq!( + sketch.upper_bound(NumStdDev::One), + compact.upper_bound(NumStdDev::One) + ); + assert_eq!( + sketch.lower_bound(NumStdDev::Two), + compact.lower_bound(NumStdDev::Two) + ); + assert_eq!( + sketch.upper_bound(NumStdDev::Two), + compact.upper_bound(NumStdDev::Two) + ); +} + +#[test] +fn test_serialization_size() { + // Empty sketch should be minimal + let empty_sketch = ThetaSketch::builder().build(); + let empty_bytes = empty_sketch.serialize(); + assert_eq!(empty_bytes.len(), 8); // 1 preamble long + + // Non-empty sketch in exact mode + let mut exact_sketch = ThetaSketch::builder().lg_k(12).build(); + for i in 0..10 { + exact_sketch.update(i); + } + let exact_bytes = exact_sketch.serialize(); + // 2 preamble longs (16 bytes) + 10 hash values (80 bytes) = 96 bytes + assert_eq!(exact_bytes.len(), 16 + 10 * 8); + + // Sketch in estimation mode + let mut estimation_sketch = ThetaSketch::builder().lg_k(5).build(); + for i in 0..1000 { + estimation_sketch.update(i); + } + let estimation_bytes = estimation_sketch.serialize(); + // 3 preamble longs (24 bytes) + entries * 8 bytes + let expected_size = 24 + estimation_sketch.num_retained() * 8; + assert_eq!(estimation_bytes.len(), expected_size); +} + +#[test] +fn test_deserialize_truncated_data() { + let mut sketch = ThetaSketch::builder().build(); + sketch.update("test"); + let bytes = sketch.serialize(); + + // Try to deserialize truncated data + for len in 0..bytes.len() - 1 { + let truncated = &bytes[..len]; + let result = CompactThetaSketch::deserialize(truncated); + assert!(result.is_err(), "Should fail with {} bytes", len); + } +} + +#[test] +fn test_multiple_serialization_round_trips() { + let mut sketch = ThetaSketch::builder().lg_k(10).build(); + for i in 0..100 { + sketch.update(i); + } + + let original_estimate = sketch.estimate(); + + // Multiple round trips should preserve data + let mut bytes = sketch.serialize(); + for _ in 0..5 { + let restored = CompactThetaSketch::deserialize(&bytes).unwrap(); + assert_eq!(original_estimate, restored.estimate()); + bytes = restored.serialize(); + } + + let final_sketch = CompactThetaSketch::deserialize(&bytes).unwrap(); + assert_eq!(original_estimate, final_sketch.estimate()); +} + +#[test] +fn test_different_lg_k_values() { + for lg_k in [5, 8, 10, 12, 16, 20] { + let mut sketch = ThetaSketch::builder().lg_k(lg_k).build(); + for i in 0..100 { + sketch.update(i); + } + + let bytes = sketch.serialize(); + let restored = ThetaSketch::deserialize(&bytes).unwrap(); + + assert_eq!( + sketch.estimate(), + restored.estimate(), + "Failed for lg_k={}", + lg_k + ); + } +} + +#[test] +fn test_sampling_probability_serialization() { + let mut sketch = ThetaSketch::builder() + .lg_k(12) + .sampling_probability(0.5) + .build(); + + for i in 0..1000 { + sketch.update(i); + } + + // Should be in estimation mode due to sampling + assert!(sketch.is_estimation_mode()); + + let bytes = sketch.serialize(); + let restored = ThetaSketch::deserialize(&bytes).unwrap(); + + assert!(restored.is_estimation_mode()); + assert_eq!(sketch.theta64(), restored.theta64()); +} + +// ============================================================================= +// Cross-language compatibility tests (Java) +// ============================================================================= + +#[test] +fn test_java_theta_compatibility() { + let test_cases = [0, 1, 10, 100, 1000, 10000, 100000, 1000000]; + for n in test_cases { + let filename = format!("theta_n{}_java.sk", n); + let path = serialization_test_data("java_generated_files", &filename); + let bytes = fs::read(&path).unwrap(); + let sketch = CompactThetaSketch::deserialize(&bytes).unwrap(); + + if n == 0 { + assert!(sketch.is_empty(), "Sketch should be empty for n=0"); + } else { + assert!(!sketch.is_empty(), "Sketch should not be empty for n={}", n); + let estimate = sketch.estimate(); + let error = (estimate - n as f64).abs() / n as f64; + assert!( + error <= 0.03, + "Estimate {} too far from expected {} (error: {:.2}%)", + estimate, + n, + error * 100.0 + ); + } + } +} + +#[test] +fn test_java_theta_non_empty_no_entries() { + let path = + serialization_test_data("java_generated_files", "theta_non_empty_no_entries_java.sk"); + let bytes = fs::read(&path).unwrap(); + let sketch = CompactThetaSketch::deserialize(&bytes).unwrap(); + + assert!(!sketch.is_empty()); + assert_eq!(sketch.num_retained(), 0); +}