Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions datasketches/src/bloom/sketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ use std::hash::Hasher;

use crate::codec::SketchBytes;
use crate::codec::SketchSlice;
use crate::codec::family::Family;
use crate::error::Error;
use crate::hash::XxHash64;

// Serialization constants
const PREAMBLE_LONGS_EMPTY: u8 = 3;
const PREAMBLE_LONGS_STANDARD: u8 = 4;
const BLOOM_FAMILY_ID: u8 = 21; // Bloom filter family ID
const SERIAL_VERSION: u8 = 1;
const EMPTY_FLAG_MASK: u8 = 1 << 2;

Expand Down Expand Up @@ -353,9 +351,9 @@ impl BloomFilter {
pub fn serialize(&self) -> Vec<u8> {
let is_empty = self.is_empty();
let preamble_longs = if is_empty {
PREAMBLE_LONGS_EMPTY
Family::BLOOMFILTER.min_pre_longs
} else {
PREAMBLE_LONGS_STANDARD
Family::BLOOMFILTER.max_pre_longs
};

let capacity = 8 * preamble_longs as usize
Expand All @@ -369,7 +367,7 @@ impl BloomFilter {
// Preamble
bytes.write_u8(preamble_longs); // Byte 0
bytes.write_u8(SERIAL_VERSION); // Byte 1
bytes.write_u8(BLOOM_FAMILY_ID); // Byte 2
bytes.write_u8(Family::BLOOMFILTER.id); // Byte 2
bytes.write_u8(if is_empty { EMPTY_FLAG_MASK } else { 0 }); // Byte 3: flags
bytes.write_u16_le(self.num_hashes); // Bytes 4-5
bytes.write_u16_le(0); // Bytes 6-7: unused
Expand Down Expand Up @@ -432,24 +430,22 @@ impl BloomFilter {
.map_err(|_| Error::insufficient_data("flags"))?;

// Validate
if family_id != BLOOM_FAMILY_ID {
return Err(Error::invalid_family(
BLOOM_FAMILY_ID,
family_id,
"BloomFilter",
));
}
Family::BLOOMFILTER.validate_id(family_id)?;
if serial_version != SERIAL_VERSION {
return Err(Error::unsupported_serial_version(
SERIAL_VERSION,
serial_version,
));
}
if preamble_longs != PREAMBLE_LONGS_EMPTY && preamble_longs != PREAMBLE_LONGS_STANDARD {
return Err(Error::invalid_preamble_longs(
PREAMBLE_LONGS_STANDARD,
preamble_longs,
));
if !(Family::BLOOMFILTER.min_pre_longs..=Family::BLOOMFILTER.max_pre_longs)
.contains(&preamble_longs)
{
return Err(Error::deserial(format!(
"invalid preamble longs: expected [{}, {}], got {}",
Family::BLOOMFILTER.min_pre_longs,
Family::BLOOMFILTER.max_pre_longs,
preamble_longs
)));
}

let is_empty = (flags & EMPTY_FLAG_MASK) != 0;
Expand Down
118 changes: 23 additions & 95 deletions datasketches/src/codec.rs → datasketches/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,226 +15,154 @@
// specific language governing permissions and limitations
// under the License.

#![allow(dead_code)]

use std::io;
use std::io::Cursor;
use std::io::Read;

/// Append-only byte buffer used to assemble a serialized sketch.
pub(crate) struct SketchBytes {
    bytes: Vec<u8>,
}

impl SketchBytes {
    /// Creates an empty buffer with room for at least `capacity` bytes.
    pub fn with_capacity(capacity: usize) -> Self {
        let bytes = Vec::with_capacity(capacity);
        Self { bytes }
    }

    /// Consumes the buffer and returns every byte written so far.
    pub fn into_bytes(self) -> Vec<u8> {
        let Self { bytes } = self;
        bytes
    }

    /// Appends `buf` verbatim.
    pub fn write(&mut self, buf: &[u8]) {
        self.bytes.extend_from_slice(buf);
    }

    // Appends an already-encoded fixed-size value; shared by the typed writers below.
    fn put<const N: usize>(&mut self, encoded: [u8; N]) {
        self.bytes.extend_from_slice(&encoded);
    }

    /// Appends a single unsigned byte.
    pub fn write_u8(&mut self, n: u8) {
        self.bytes.push(n);
    }

    /// Appends a single signed byte (its two's-complement bit pattern).
    pub fn write_i8(&mut self, n: i8) {
        self.write_u8(n as u8);
    }

    /// Appends a `u16` in little-endian byte order.
    pub fn write_u16_le(&mut self, n: u16) {
        self.put(n.to_le_bytes());
    }

    /// Appends a `u16` in big-endian byte order.
    pub fn write_u16_be(&mut self, n: u16) {
        self.put(n.to_be_bytes());
    }

    /// Appends an `i16` in little-endian byte order.
    pub fn write_i16_le(&mut self, n: i16) {
        self.put(n.to_le_bytes());
    }

    /// Appends an `i16` in big-endian byte order.
    pub fn write_i16_be(&mut self, n: i16) {
        self.put(n.to_be_bytes());
    }

    /// Appends a `u32` in little-endian byte order.
    pub fn write_u32_le(&mut self, n: u32) {
        self.put(n.to_le_bytes());
    }

    /// Appends a `u32` in big-endian byte order.
    pub fn write_u32_be(&mut self, n: u32) {
        self.put(n.to_be_bytes());
    }

    /// Appends an `i32` in little-endian byte order.
    pub fn write_i32_le(&mut self, n: i32) {
        self.put(n.to_le_bytes());
    }

    /// Appends an `i32` in big-endian byte order.
    pub fn write_i32_be(&mut self, n: i32) {
        self.put(n.to_be_bytes());
    }

    /// Appends a `u64` in little-endian byte order.
    pub fn write_u64_le(&mut self, n: u64) {
        self.put(n.to_le_bytes());
    }

    /// Appends a `u64` in big-endian byte order.
    pub fn write_u64_be(&mut self, n: u64) {
        self.put(n.to_be_bytes());
    }

    /// Appends an `i64` in little-endian byte order.
    pub fn write_i64_le(&mut self, n: i64) {
        self.put(n.to_le_bytes());
    }

    /// Appends an `i64` in big-endian byte order.
    pub fn write_i64_be(&mut self, n: i64) {
        self.put(n.to_be_bytes());
    }

    /// Appends an `f32` in little-endian byte order.
    pub fn write_f32_le(&mut self, n: f32) {
        self.put(n.to_le_bytes());
    }

    /// Appends an `f32` in big-endian byte order.
    pub fn write_f32_be(&mut self, n: f32) {
        self.put(n.to_be_bytes());
    }

    /// Appends an `f64` in little-endian byte order.
    pub fn write_f64_le(&mut self, n: f64) {
        self.put(n.to_le_bytes());
    }

    /// Appends an `f64` in big-endian byte order.
    pub fn write_f64_be(&mut self, n: f64) {
        self.put(n.to_be_bytes());
    }
}

pub(crate) struct SketchSlice<'a> {
/// A wrapper around a byte slice that provides methods for reading various types of data from it.
///
/// The slice is held inside a [`Cursor`], so the wrapper tracks a current position in the
/// borrowed bytes alongside the bytes themselves.
pub struct SketchSlice<'a> {
// Cursor over the borrowed input bytes; its position is the current read offset.
slice: Cursor<&'a [u8]>,
}

impl SketchSlice<'_> {
/// Wraps `slice` in a new `SketchSlice`.
///
/// The bytes are borrowed, not copied, and are handed to a freshly created cursor.
pub fn new(slice: &[u8]) -> SketchSlice<'_> {
    let cursor = Cursor::new(slice);
    SketchSlice { slice: cursor }
}

/// Advances the position of the slice by `n` bytes.
///
/// The new position is not checked against the length of the underlying slice, so it may
/// land past the end of the data.
///
/// The addition saturates at `u64::MAX` instead of overflowing: an overflowing `pos + n`
/// would panic in debug builds and, in release builds, wrap around and move the position
/// backwards.
pub fn advance(&mut self, n: u64) {
    let target = self.slice.position().saturating_add(n);
    self.slice.set_position(target);
}

/// Fills `buf` completely with the next `buf.len()` bytes of the slice.
///
/// # Errors
///
/// Propagates the error returned by the underlying cursor's `read_exact`.
pub fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
    Read::read_exact(&mut self.slice, buf)
}

/// Reads the next byte of the slice as a `u8`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if the byte cannot be read.
pub fn read_u8(&mut self) -> io::Result<u8> {
    let mut byte = [0u8; 1];
    self.read_exact(&mut byte).map(|()| byte[0])
}

/// Reads the next byte of the slice and reinterprets it as an `i8`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if the byte cannot be read.
pub fn read_i8(&mut self) -> io::Result<i8> {
    let mut byte = [0u8; 1];
    // For a single byte, endianness is irrelevant; this is the same bit pattern as `as i8`.
    self.read_exact(&mut byte).map(|()| i8::from_le_bytes(byte))
}

/// Reads the next two bytes of the slice as a little-endian `u16`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if two bytes cannot be read.
pub fn read_u16_le(&mut self) -> io::Result<u16> {
    let mut raw = [0u8; std::mem::size_of::<u16>()];
    self.read_exact(&mut raw).map(|()| u16::from_le_bytes(raw))
}

/// Reads the next two bytes of the slice as a big-endian `u16`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if two bytes cannot be read.
pub fn read_u16_be(&mut self) -> io::Result<u16> {
    let mut raw = [0u8; std::mem::size_of::<u16>()];
    self.read_exact(&mut raw).map(|()| u16::from_be_bytes(raw))
}

/// Reads the next two bytes of the slice as a little-endian `i16`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if two bytes cannot be read.
pub fn read_i16_le(&mut self) -> io::Result<i16> {
    let mut raw = [0u8; std::mem::size_of::<i16>()];
    self.read_exact(&mut raw).map(|()| i16::from_le_bytes(raw))
}

/// Reads the next two bytes of the slice as a big-endian `i16`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if two bytes cannot be read.
pub fn read_i16_be(&mut self) -> io::Result<i16> {
    let mut raw = [0u8; std::mem::size_of::<i16>()];
    self.read_exact(&mut raw).map(|()| i16::from_be_bytes(raw))
}

/// Reads the next four bytes of the slice as a little-endian `u32`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if four bytes cannot be read.
pub fn read_u32_le(&mut self) -> io::Result<u32> {
    let mut raw = [0u8; std::mem::size_of::<u32>()];
    self.read_exact(&mut raw).map(|()| u32::from_le_bytes(raw))
}

/// Reads the next four bytes of the slice as a big-endian `u32`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if four bytes cannot be read.
pub fn read_u32_be(&mut self) -> io::Result<u32> {
    let mut raw = [0u8; std::mem::size_of::<u32>()];
    self.read_exact(&mut raw).map(|()| u32::from_be_bytes(raw))
}

/// Reads the next four bytes of the slice as a little-endian `i32`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if four bytes cannot be read.
pub fn read_i32_le(&mut self) -> io::Result<i32> {
    let mut raw = [0u8; std::mem::size_of::<i32>()];
    self.read_exact(&mut raw).map(|()| i32::from_le_bytes(raw))
}

/// Reads the next four bytes of the slice as a big-endian `i32`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if four bytes cannot be read.
pub fn read_i32_be(&mut self) -> io::Result<i32> {
    let mut raw = [0u8; std::mem::size_of::<i32>()];
    self.read_exact(&mut raw).map(|()| i32::from_be_bytes(raw))
}

/// Reads a 64-bit unsigned integer from the slice in little-endian byte order.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if eight bytes cannot be read.
pub fn read_u64_le(&mut self) -> io::Result<u64> {
    let mut buf = [0u8; 8];
    self.read_exact(&mut buf)?;
    Ok(u64::from_le_bytes(buf))
}

/// Reads a 64-bit unsigned integer from the slice in big-endian byte order.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if eight bytes cannot be read.
pub fn read_u64_be(&mut self) -> io::Result<u64> {
    let mut buf = [0u8; 8];
    self.read_exact(&mut buf)?;
    Ok(u64::from_be_bytes(buf))
}

/// Reads a 64-bit signed integer from the slice in little-endian byte order.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if eight bytes cannot be read.
pub fn read_i64_le(&mut self) -> io::Result<i64> {
    let mut buf = [0u8; 8];
    self.read_exact(&mut buf)?;
    Ok(i64::from_le_bytes(buf))
}

/// Reads a 64-bit signed integer from the slice in big-endian byte order.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if eight bytes cannot be read.
pub fn read_i64_be(&mut self) -> io::Result<i64> {
    let mut buf = [0u8; 8];
    self.read_exact(&mut buf)?;
    Ok(i64::from_be_bytes(buf))
}

/// Reads the next four bytes of the slice as a little-endian `f32`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if four bytes cannot be read.
pub fn read_f32_le(&mut self) -> io::Result<f32> {
    let mut raw = [0u8; std::mem::size_of::<f32>()];
    self.read_exact(&mut raw).map(|()| f32::from_le_bytes(raw))
}

/// Reads the next four bytes of the slice as a big-endian `f32`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if four bytes cannot be read.
pub fn read_f32_be(&mut self) -> io::Result<f32> {
    let mut raw = [0u8; std::mem::size_of::<f32>()];
    self.read_exact(&mut raw).map(|()| f32::from_be_bytes(raw))
}

/// Reads the next eight bytes of the slice as a little-endian `f64`.
///
/// # Errors
///
/// Returns the error from [`Self::read_exact`] if eight bytes cannot be read.
pub fn read_f64_le(&mut self) -> io::Result<f64> {
    let mut raw = [0u8; std::mem::size_of::<f64>()];
    self.read_exact(&mut raw).map(|()| f64::from_le_bytes(raw))
}

/// Reads a 64-bit floating-point number from the slice in big-endian byte order.
pub fn read_f64_be(&mut self) -> io::Result<f64> {
let mut buf = [0u8; 8];
self.read_exact(&mut buf)?;
Expand Down
Loading