diff --git a/Cargo.lock b/Cargo.lock index f3d1d80a2da..772cf25e47c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10191,6 +10191,7 @@ version = "0.1.0" dependencies = [ "codspeed-divan-compat", "fsst-rs", + "num-traits", "prost 0.14.3", "rand 0.9.2", "rstest", diff --git a/encodings/fsst/Cargo.toml b/encodings/fsst/Cargo.toml index eb08bbda959..819107c909e 100644 --- a/encodings/fsst/Cargo.toml +++ b/encodings/fsst/Cargo.toml @@ -18,6 +18,7 @@ workspace = true [dependencies] fsst-rs = { workspace = true } +num-traits.workspace = true prost = { workspace = true } rand = { workspace = true, optional = true } vortex-array = { workspace = true } @@ -55,5 +56,10 @@ name = "chunked_dict_fsst_builder" harness = false required-features = ["_test-harness"] +[[bench]] +name = "fsst_decompress" +harness = false +required-features = ["_test-harness"] + [package.metadata.cargo-machete] ignored = ["fsst-rs"] diff --git a/encodings/fsst/benches/fsst_decompress.rs b/encodings/fsst/benches/fsst_decompress.rs new file mode 100644 index 00000000000..add57912c72 --- /dev/null +++ b/encodings/fsst/benches/fsst_decompress.rs @@ -0,0 +1,476 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#![allow(clippy::unwrap_used)] + +use std::sync::LazyLock; + +use divan::Bencher; +use rand::Rng; +use rand::SeedableRng; +use rand::rngs::StdRng; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::VarBinArray; +use vortex_array::arrays::varbinview::build_views::MAX_BUFFER_LEN; +use vortex_array::arrays::varbinview::build_views::build_views; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::match_each_integer_ptype; +use vortex_array::session::ArraySession; +use vortex_buffer::ByteBufferMut; +use vortex_fsst::FSSTArray; +use vortex_fsst::canonical::VIEW_BUILD_PADDING; +use vortex_fsst::canonical::build_views_fast; +use 
vortex_fsst::decompressor::OptimizedDecompressor; +use vortex_fsst::fsst_compress; +use vortex_fsst::fsst_train_compressor; +use vortex_fsst::test_utils; +use vortex_session::VortexSession; + +fn main() { + divan::main(); +} + +// --------------------------------------------------------------------------- +// Session for executing lazy expressions +// --------------------------------------------------------------------------- + +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + +// --------------------------------------------------------------------------- +// Data generators +// --------------------------------------------------------------------------- + +/// Short strings (3-12 bytes), all ≤ BinaryView::MAX_INLINED_SIZE. +/// Exercises the inlined-view path exclusively. +fn generate_short_strings(count: usize) -> VarBinArray { + let mut rng = StdRng::seed_from_u64(42); + let words: &[&[u8]] = &[ + b"the", b"and", b"for", b"are", b"but", b"not", b"you", b"all", b"can", b"had", b"her", + b"was", b"one", b"our", b"out", b"day", b"get", b"has", b"him", b"his", b"how", b"its", + b"may", b"new", b"now", b"old", b"see", b"way", b"who", b"did", b"oil", b"sit", b"cat", + b"dog", b"red", b"big", b"top", b"sun", b"run", b"hot", b"yes", b"far", b"ask", b"own", + b"say", b"low", b"key", b"few", + ]; + let strings: Vec>> = (0..count) + .map(|_| { + // 1-3 words concatenated, always ≤ 12 bytes + let nwords = rng.random_range(1..=3usize); + let mut buf = Vec::with_capacity(12); + for idx in 0..nwords { + if idx > 0 { + buf.push(b'-'); + } + let word = words[rng.random_range(0..words.len())]; + if buf.len() + word.len() + usize::from(idx > 0) > 12 { + break; + } + buf.extend_from_slice(word); + } + Some(buf.into_boxed_slice()) + }) + .collect(); + VarBinArray::from_iter(strings, DType::Binary(Nullability::NonNullable)) +} + +/// Medium strings (8-20 bytes), mix of inlined and reference views. 
+/// Straddles the 12-byte BinaryView inlining threshold. +fn generate_medium_strings(count: usize) -> VarBinArray { + let mut rng = StdRng::seed_from_u64(42); + let prefixes: &[&[u8]] = &[ + b"usr_", b"grp_", b"tok_", b"ses_", b"evt_", b"req_", b"txn_", b"msg_", + ]; + let strings: Vec>> = (0..count) + .map(|_| { + let prefix = prefixes[rng.random_range(0..prefixes.len())]; + let suffix_len = rng.random_range(4..=16usize); + let mut buf = Vec::with_capacity(prefix.len() + suffix_len); + buf.extend_from_slice(prefix); + for _ in 0..suffix_len { + buf.push(rng.random_range(b'a'..=b'z')); + } + Some(buf.into_boxed_slice()) + }) + .collect(); + VarBinArray::from_iter(strings, DType::Binary(Nullability::NonNullable)) +} + +fn make_fsst(data: VarBinArray) -> FSSTArray { + let compressor = fsst_train_compressor(&data); + fsst_compress(data, &compressor) +} + +// --------------------------------------------------------------------------- +// Lazy-initialized datasets: real-world from test_utils + custom short/medium +// --------------------------------------------------------------------------- + +const N: usize = 100_000; + +static SHORT_STRINGS: LazyLock = LazyLock::new(|| make_fsst(generate_short_strings(N))); +static MEDIUM_STRINGS: LazyLock = + LazyLock::new(|| make_fsst(generate_medium_strings(N))); +static EMAILS: LazyLock = LazyLock::new(|| test_utils::make_fsst_emails(N)); +static SHORT_URLS: LazyLock = LazyLock::new(|| test_utils::make_fsst_short_urls(N)); +static CLICKBENCH_URLS: LazyLock = + LazyLock::new(|| test_utils::make_fsst_clickbench_urls(N)); +static LOG_LINES: LazyLock = LazyLock::new(|| test_utils::make_fsst_log_lines(N)); +static JSON_STRINGS: LazyLock = LazyLock::new(|| test_utils::make_fsst_json_strings(N)); +static FILE_PATHS: LazyLock = LazyLock::new(|| test_utils::make_fsst_file_paths(N)); + +// --------------------------------------------------------------------------- +// Pre-decompressed data for isolated view-building benchmarks +// 
--------------------------------------------------------------------------- + +struct DecompressedData { + bytes: Vec, + lens: Vec, +} + +/// Create a padded `ByteBufferMut` from a byte slice, with extra capacity for safe +/// 16-byte unaligned reads in `build_views_fast`. +fn padded_buffer(data: &[u8]) -> ByteBufferMut { + let mut buf = ByteBufferMut::with_capacity(data.len() + VIEW_BUILD_PADDING); + buf.extend_from_slice(data); + buf +} + +fn pre_decompress(encoded: &FSSTArray) -> DecompressedData { + let compressed = encoded.codes().sliced_bytes(); + let decompressor = OptimizedDecompressor::new( + encoded.symbols().as_slice(), + encoded.symbol_lengths().as_slice(), + ); + let max_cap = encoded + .decompressor() + .max_decompression_capacity(compressed.as_slice()) + + 7; + let mut out = Vec::with_capacity(max_cap); + let len = decompressor.decompress_into(compressed.as_slice(), out.spare_capacity_mut()); + unsafe { out.set_len(len) }; + + let mut ctx = SESSION.create_execution_ctx(); + let uncompressed_lens_array = encoded + .uncompressed_lengths() + .clone() + .execute::(&mut ctx) + .unwrap(); + + #[allow(clippy::cast_possible_truncation, clippy::unnecessary_cast)] + let lens: Vec = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| { + uncompressed_lens_array + .as_slice::

() + .iter() + .map(|x| *x as u64) + .collect() + }); + + DecompressedData { bytes: out, lens } +} + +static SHORT_STRINGS_DEC: LazyLock = + LazyLock::new(|| pre_decompress(&SHORT_STRINGS)); +static MEDIUM_STRINGS_DEC: LazyLock = + LazyLock::new(|| pre_decompress(&MEDIUM_STRINGS)); +static EMAILS_DEC: LazyLock = LazyLock::new(|| pre_decompress(&EMAILS)); +static SHORT_URLS_DEC: LazyLock = LazyLock::new(|| pre_decompress(&SHORT_URLS)); +static CLICKBENCH_URLS_DEC: LazyLock = + LazyLock::new(|| pre_decompress(&CLICKBENCH_URLS)); +static LOG_LINES_DEC: LazyLock = LazyLock::new(|| pre_decompress(&LOG_LINES)); +static JSON_STRINGS_DEC: LazyLock = + LazyLock::new(|| pre_decompress(&JSON_STRINGS)); +static FILE_PATHS_DEC: LazyLock = LazyLock::new(|| pre_decompress(&FILE_PATHS)); + +// ============================================================================ +// End-to-end decompress (to_canonical): measures full pipeline +// ============================================================================ + +#[divan::bench] +fn e2e_short_strings(bencher: Bencher) { + let arr = &*SHORT_STRINGS; + bencher.bench(|| arr.to_canonical()); +} + +#[divan::bench] +fn e2e_medium_strings(bencher: Bencher) { + let arr = &*MEDIUM_STRINGS; + bencher.bench(|| arr.to_canonical()); +} + +#[divan::bench] +fn e2e_emails(bencher: Bencher) { + let arr = &*EMAILS; + bencher.bench(|| arr.to_canonical()); +} + +#[divan::bench] +fn e2e_short_urls(bencher: Bencher) { + let arr = &*SHORT_URLS; + bencher.bench(|| arr.to_canonical()); +} + +#[divan::bench] +fn e2e_clickbench_urls(bencher: Bencher) { + let arr = &*CLICKBENCH_URLS; + bencher.bench(|| arr.to_canonical()); +} + +#[divan::bench] +fn e2e_log_lines(bencher: Bencher) { + let arr = &*LOG_LINES; + bencher.bench(|| arr.to_canonical()); +} + +#[divan::bench] +fn e2e_json_strings(bencher: Bencher) { + let arr = &*JSON_STRINGS; + bencher.bench(|| arr.to_canonical()); +} + +#[divan::bench] +fn e2e_file_paths(bencher: Bencher) { + let arr = 
&*FILE_PATHS; + bencher.bench(|| arr.to_canonical()); +} + +// ============================================================================ +// Isolated view building: old (general build_views) vs new (build_views_fast) +// ============================================================================ + +// --- Short strings (≤12 bytes, all inlined) --- + +#[divan::bench] +fn views_old_short_strings(bencher: Bencher) { + let d = &*SHORT_STRINGS_DEC; + bencher.bench(|| { + let bytes = ByteBufferMut::copy_from(&d.bytes); + build_views(0, MAX_BUFFER_LEN, bytes, &d.lens) + }); +} + +#[divan::bench] +fn views_new_short_strings(bencher: Bencher) { + let d = &*SHORT_STRINGS_DEC; + bencher.bench(|| { + let bytes = padded_buffer(&d.bytes); + build_views_fast(0, bytes, &d.lens) + }); +} + +// --- Medium strings (8-20 bytes, mix of inlined and reference) --- + +#[divan::bench] +fn views_old_medium_strings(bencher: Bencher) { + let d = &*MEDIUM_STRINGS_DEC; + bencher.bench(|| { + let bytes = ByteBufferMut::copy_from(&d.bytes); + build_views(0, MAX_BUFFER_LEN, bytes, &d.lens) + }); +} + +#[divan::bench] +fn views_new_medium_strings(bencher: Bencher) { + let d = &*MEDIUM_STRINGS_DEC; + bencher.bench(|| { + let bytes = padded_buffer(&d.bytes); + build_views_fast(0, bytes, &d.lens) + }); +} + +// --- Emails (~20 bytes, all reference) --- + +#[divan::bench] +fn views_old_emails(bencher: Bencher) { + let d = &*EMAILS_DEC; + bencher.bench(|| { + let bytes = ByteBufferMut::copy_from(&d.bytes); + build_views(0, MAX_BUFFER_LEN, bytes, &d.lens) + }); +} + +#[divan::bench] +fn views_new_emails(bencher: Bencher) { + let d = &*EMAILS_DEC; + bencher.bench(|| { + let bytes = padded_buffer(&d.bytes); + build_views_fast(0, bytes, &d.lens) + }); +} + +// --- Short URLs (~35 bytes) --- + +#[divan::bench] +fn views_old_short_urls(bencher: Bencher) { + let d = &*SHORT_URLS_DEC; + bencher.bench(|| { + let bytes = ByteBufferMut::copy_from(&d.bytes); + build_views(0, MAX_BUFFER_LEN, bytes, &d.lens) + 
}); +} + +#[divan::bench] +fn views_new_short_urls(bencher: Bencher) { + let d = &*SHORT_URLS_DEC; + bencher.bench(|| { + let bytes = padded_buffer(&d.bytes); + build_views_fast(0, bytes, &d.lens) + }); +} + +// --- ClickBench URLs (~80-120 bytes) --- + +#[divan::bench] +fn views_old_clickbench_urls(bencher: Bencher) { + let d = &*CLICKBENCH_URLS_DEC; + bencher.bench(|| { + let bytes = ByteBufferMut::copy_from(&d.bytes); + build_views(0, MAX_BUFFER_LEN, bytes, &d.lens) + }); +} + +#[divan::bench] +fn views_new_clickbench_urls(bencher: Bencher) { + let d = &*CLICKBENCH_URLS_DEC; + bencher.bench(|| { + let bytes = padded_buffer(&d.bytes); + build_views_fast(0, bytes, &d.lens) + }); +} + +// --- Log lines (~120+ bytes) --- + +#[divan::bench] +fn views_old_log_lines(bencher: Bencher) { + let d = &*LOG_LINES_DEC; + bencher.bench(|| { + let bytes = ByteBufferMut::copy_from(&d.bytes); + build_views(0, MAX_BUFFER_LEN, bytes, &d.lens) + }); +} + +#[divan::bench] +fn views_new_log_lines(bencher: Bencher) { + let d = &*LOG_LINES_DEC; + bencher.bench(|| { + let bytes = padded_buffer(&d.bytes); + build_views_fast(0, bytes, &d.lens) + }); +} + +// --- JSON strings (~80+ bytes) --- + +#[divan::bench] +fn views_old_json_strings(bencher: Bencher) { + let d = &*JSON_STRINGS_DEC; + bencher.bench(|| { + let bytes = ByteBufferMut::copy_from(&d.bytes); + build_views(0, MAX_BUFFER_LEN, bytes, &d.lens) + }); +} + +#[divan::bench] +fn views_new_json_strings(bencher: Bencher) { + let d = &*JSON_STRINGS_DEC; + bencher.bench(|| { + let bytes = padded_buffer(&d.bytes); + build_views_fast(0, bytes, &d.lens) + }); +} + +// --- File paths (~30-60 bytes) --- + +#[divan::bench] +fn views_old_file_paths(bencher: Bencher) { + let d = &*FILE_PATHS_DEC; + bencher.bench(|| { + let bytes = ByteBufferMut::copy_from(&d.bytes); + build_views(0, MAX_BUFFER_LEN, bytes, &d.lens) + }); +} + +#[divan::bench] +fn views_new_file_paths(bencher: Bencher) { + let d = &*FILE_PATHS_DEC; + bencher.bench(|| { + let bytes 
= padded_buffer(&d.bytes); + build_views_fast(0, bytes, &d.lens) + }); +} + +// ============================================================================ +// Raw decompress_into: baseline (fsst-rs Decompressor) vs OptimizedDecompressor +// ============================================================================ + +macro_rules! raw_bench_pair { + ($baseline_name:ident, $optimized_name:ident, $data:expr) => { + #[divan::bench] + fn $baseline_name(bencher: Bencher) { + let encoded = &*$data; + let decompressor = encoded.decompressor(); + let bytes = encoded.codes().sliced_bytes(); + let max_cap = decompressor.max_decompression_capacity(bytes.as_slice()) + 7; + + bencher.bench(|| { + let mut out = Vec::with_capacity(max_cap); + let len = decompressor.decompress_into(bytes.as_slice(), out.spare_capacity_mut()); + unsafe { out.set_len(len) }; + out + }); + } + + #[divan::bench] + fn $optimized_name(bencher: Bencher) { + let encoded = &*$data; + let decompressor = OptimizedDecompressor::new( + encoded.symbols().as_slice(), + encoded.symbol_lengths().as_slice(), + ); + let bytes = encoded.codes().sliced_bytes(); + let max_cap = encoded + .decompressor() + .max_decompression_capacity(bytes.as_slice()) + + 7; + + bencher.bench(|| { + let mut out = Vec::with_capacity(max_cap); + let len = decompressor.decompress_into(bytes.as_slice(), out.spare_capacity_mut()); + unsafe { out.set_len(len) }; + out + }); + } + }; +} + +raw_bench_pair!( + raw_baseline_short_strings, + raw_optimized_short_strings, + SHORT_STRINGS +); +raw_bench_pair!( + raw_baseline_medium_strings, + raw_optimized_medium_strings, + MEDIUM_STRINGS +); +raw_bench_pair!(raw_baseline_emails, raw_optimized_emails, EMAILS); +raw_bench_pair!( + raw_baseline_short_urls, + raw_optimized_short_urls, + SHORT_URLS +); +raw_bench_pair!( + raw_baseline_clickbench_urls, + raw_optimized_clickbench_urls, + CLICKBENCH_URLS +); +raw_bench_pair!(raw_baseline_log_lines, raw_optimized_log_lines, LOG_LINES); +raw_bench_pair!( + 
raw_baseline_json_strings, + raw_optimized_json_strings, + JSON_STRINGS +); +raw_bench_pair!( + raw_baseline_file_paths, + raw_optimized_file_paths, + FILE_PATHS +); diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs index b5474d1d923..f60472cfbd2 100644 --- a/encodings/fsst/src/canonical.rs +++ b/encodings/fsst/src/canonical.rs @@ -3,22 +3,24 @@ use std::sync::Arc; +use num_traits::AsPrimitive; use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::varbinview::build_views::BinaryView; -use vortex_array::arrays::varbinview::build_views::MAX_BUFFER_LEN; -use vortex_array::arrays::varbinview::build_views::build_views; +use vortex_array::dtype::NativePType; use vortex_array::match_each_integer_ptype; use vortex_array::vtable::ValidityHelper; use vortex_buffer::Buffer; +use vortex_buffer::BufferMut; use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; use vortex_error::VortexResult; use crate::FSSTArray; +use crate::decompressor::OptimizedDecompressor; pub(super) fn canonicalize_fsst( array: &FSSTArray, @@ -43,13 +45,6 @@ pub(crate) fn fsst_decode_views( start_buf_index: u32, ctx: &mut ExecutionCtx, ) -> VortexResult<(Vec, Buffer)> { - // FSSTArray has two child arrays: - // 1. A VarBinArray, which holds the string heap of the compressed codes. - // 2. An uncompressed_lengths primitive array, storing the length of each original - // string element. - // To speed up canonicalization, we can decompress the entire string-heap in a single - // call. We then turn our uncompressed_lengths into an offsets buffer - // necessary for a VarBinViewArray and construct the canonical array. 
let bytes = fsst_array.codes().sliced_bytes(); let uncompressed_lens_array = fsst_array @@ -57,6 +52,7 @@ pub(crate) fn fsst_decode_views( .clone() .execute::(ctx)?; + // Compute exact total size for the decompression buffer. #[allow(clippy::cast_possible_truncation)] let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| { uncompressed_lens_array @@ -66,24 +62,125 @@ pub(crate) fn fsst_decode_views( .sum() }); - // Bulk-decompress the entire array. - let decompressor = fsst_array.decompressor(); - let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7); + // Bulk-decompress the entire string heap in one call. + let decompressor = OptimizedDecompressor::new( + fsst_array.symbols().as_slice(), + fsst_array.symbol_lengths().as_slice(), + ); + let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + VIEW_BUILD_PADDING); let len = decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut()); unsafe { uncompressed_bytes.set_len(len) }; - // Directly create the binary views. + // Build views directly from the typed lengths slice — no intermediate Vec allocation. match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| { - Ok(build_views( + Ok(build_views_fast( start_buf_index, - MAX_BUFFER_LEN, uncompressed_bytes, uncompressed_lens_array.as_slice::

(), )) }) } +/// Minimum padding (in bytes) required after the logical end of the source buffer +/// for safe 16-byte unaligned reads in `make_view_inline`. +pub const VIEW_BUILD_PADDING: usize = 16; + +/// Optimized view builder for FSST decompression. +/// +/// Unlike the general-purpose `build_views`, this version: +/// - Inlines the view construction (avoids `#[inline(never)]` `make_view` call per string) +/// - Skips buffer splitting (FSST data fits in one buffer) +/// - Uses raw pointer writes to construct views directly +/// - Generic over the length type to avoid an intermediate `Vec` allocation +/// +/// # Safety requirement +/// +/// `bytes` must have at least [`VIEW_BUILD_PADDING`] bytes of allocated capacity +/// beyond the logical length, to allow safe 16-byte unaligned reads at any offset. +#[allow(clippy::cast_possible_truncation)] +pub fn build_views_fast>( + buf_index: u32, + bytes: ByteBufferMut, + lens: &[P], +) -> (Vec, Buffer) { + let mut views = BufferMut::::with_capacity(lens.len()); + let src = bytes.as_slice().as_ptr(); + let mut offset: usize = 0; + + for &raw_len in lens { + let len: usize = raw_len.as_(); + // SAFETY: we reserved the right capacity in `with_capacity` above, + // and the source buffer has VIEW_BUILD_PADDING bytes of padding. + unsafe { + let view = make_view_inline(src, offset, len, buf_index); + views.push_unchecked(view); + } + offset += len; + } + + let buffers = if bytes.is_empty() { + Vec::new() + } else { + vec![bytes.freeze()] + }; + + (buffers, views.freeze()) +} + +/// Byte masks for zeroing out trailing bytes when constructing inlined views. +/// `INLINE_MASKS[n]` keeps the lowest `n` bytes of a `u128`. 
+#[allow(clippy::cast_possible_truncation)] +const INLINE_MASKS: [u128; 13] = { + let mut table = [0u128; 13]; + let mut i = 1usize; + while i <= 12 { + table[i] = (1u128 << (i as u32 * 8)) - 1; + i += 1; + } + table +}; + +/// Inline view construction — avoids the `#[inline(never)]` overhead of `BinaryView::make_view`. +/// +/// For inlined views (len <= 12): performs a single 16-byte unaligned read from the source, +/// masks to `len` bytes, shifts into position, and ORs in the length — no zero-init or +/// variable-length copy needed. +/// +/// For reference views (len > 12): reads a 4-byte prefix and constructs the view directly +/// via arithmetic. +/// +/// # Safety +/// +/// The source buffer must have at least 16 bytes of readable memory from `offset` +/// (i.e., padding after the logical end). The caller must ensure `offset + len <= src.len()`. +#[inline(always)] +#[allow(clippy::cast_possible_truncation)] +unsafe fn make_view_inline( + src: *const u8, + offset: usize, + len: usize, + buf_index: u32, +) -> BinaryView { + if len <= BinaryView::MAX_INLINED_SIZE { + // Read 16 bytes from source (buffer has >=16 bytes padding, so this is safe). + // Mask to keep only `len` bytes, shift into data position (bytes 4-15), + // and OR in the length at bytes 0-3. 
+ let raw = unsafe { src.add(offset).cast::().read_unaligned() }; + let masked = raw & INLINE_MASKS[len]; + BinaryView::from((len as u128) | (masked << 32)) + } else { + // Reference view: [size:u32][prefix:4 bytes][buf_index:u32][offset:u32] + let prefix = unsafe { src.add(offset).cast::().read_unaligned() }; + BinaryView::from( + (len as u128) + | ((prefix as u128) << 32) + | ((buf_index as u128) << 64) + | ((offset as u128) << 96), + ) + } +} + #[cfg(test)] mod tests { use std::sync::LazyLock; diff --git a/encodings/fsst/src/decompressor.rs b/encodings/fsst/src/decompressor.rs new file mode 100644 index 00000000000..1e03a510e10 --- /dev/null +++ b/encodings/fsst/src/decompressor.rs @@ -0,0 +1,472 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Optimized FSST decompressor that replaces the default fsst-rs decompressor +//! with a version tuned for throughput. +//! +//! Key optimizations over the baseline fsst-rs implementation: +//! 1. Symbols stored as `u64` directly, avoiding `Symbol::to_u64()` conversion per lookup. +//! 2. Multi-level block processing: 32-code and 8-code fast paths that process +//! compressed data in large chunks when no escape codes are present. +//! 3. Unified loop that re-enters the 32-code fast path after handling each escape, +//! instead of permanently dropping to the slower 8-code path. +//! 4. Fully unrolled escape handling via match statement for optimal branch prediction. +//! 5. Runtime CPU feature detection for BMI1/BMI2/POPCNT-optimized codegen on x86-64. + +use std::mem::MaybeUninit; + +use fsst::ESCAPE_CODE; +use fsst::Symbol; + +/// Hint that the calling branch is cold (unlikely). Placing a `#[cold]` +/// `#[inline(never)]` call at the top of a branch causes LLVM to treat +/// the entire branch as unlikely, improving code layout for the hot path. +#[cold] +#[inline(never)] +fn cold() {} + +/// Optimized FSST decompressor using separate symbol/length tables. 
+/// +/// The symbol table stores pre-converted `u64` values to avoid per-lookup +/// conversion overhead. Separate arrays keep the cache footprint small: +/// symbols (2KB) + lengths (256B) ≈ 2.3KB, fitting entirely in L1 cache. +pub struct OptimizedDecompressor { + symbols: Box<[u64; 256]>, + lengths: Box<[u8; 256]>, +} + +impl OptimizedDecompressor { + /// Build from symbol table slices (same inputs as `fsst::Decompressor::new`). + pub fn new(symbols: &[Symbol], lengths: &[u8]) -> Self { + assert!( + symbols.len() <= 255, + "symbol table cannot exceed 255 entries" + ); + assert_eq!(symbols.len(), lengths.len()); + + let mut sym_table = Box::new([0u64; 256]); + let mut len_table = Box::new([1u8; 256]); + for (i, (sym, &len)) in symbols.iter().zip(lengths.iter()).enumerate() { + sym_table[i] = sym.to_u64(); + len_table[i] = len; + } + Self { + symbols: sym_table, + lengths: len_table, + } + } + + /// Decompress `compressed` codes into `decoded` buffer. + /// + /// Returns the number of bytes written to `decoded`. + /// + /// # Panics + /// + /// Panics if `decoded` is smaller than `compressed.len() / 2`. + pub fn decompress_into(&self, compressed: &[u8], decoded: &mut [MaybeUninit]) -> usize { + assert!( + decoded.len() >= compressed.len() / 2, + "decoded buffer too small" + ); + + // Use target-feature-optimized path on x86-64 for better tzcnt codegen. + #[cfg(target_arch = "x86_64")] + { + if std::arch::is_x86_feature_detected!("bmi1") { + return unsafe { self.decompress_inner_bmi(compressed, decoded) }; + } + } + unsafe { self.decompress_inner(compressed, decoded) } + } + + #[cfg(target_arch = "x86_64")] + #[target_feature(enable = "bmi1,bmi2,popcnt")] + #[allow(unsafe_op_in_unsafe_fn)] + unsafe fn decompress_inner_bmi( + &self, + compressed: &[u8], + decoded: &mut [MaybeUninit], + ) -> usize { + self.decompress_inner(compressed, decoded) + } + + /// SWAR escape detection: returns a mask with the high bit set in each byte + /// that equals 0xFF. 
+ #[inline(always)] + const fn escape_mask(block: u64) -> u64 { + let hi = block & 0x8080_8080_8080_8080; + let lo_inv = !block & 0x7F7F_7F7F_7F7F_7F7F; + hi & (lo_inv.wrapping_add(0x7F7F_7F7F_7F7F_7F7F) ^ 0x8080_8080_8080_8080) + } + + /// Safe end-pointer for block processing. Returns null when the buffer is + /// too small, which makes `ptr <= null` immediately false. + #[inline(always)] + fn block_end(end: *const u8, margin: usize, len: usize) -> *const u8 { + if len >= margin { + unsafe { end.sub(margin) } + } else { + core::ptr::null() + } + } + + #[inline(always)] + #[allow(unsafe_op_in_unsafe_fn, clippy::cast_possible_truncation)] + unsafe fn decompress_inner(&self, compressed: &[u8], decoded: &mut [MaybeUninit]) -> usize { + let mut in_ptr = compressed.as_ptr(); + let in_end = in_ptr.add(compressed.len()); + let mut out_ptr: *mut u8 = decoded.as_mut_ptr().cast(); + let out_begin = out_ptr.cast_const(); + let out_end = decoded.as_ptr().add(decoded.len()).cast::(); + + let symbols = self.symbols.as_ptr(); + let lengths = self.lengths.as_ptr(); + + // Emit one symbol: write 8 bytes (may overshoot), advance by actual length. + macro_rules! emit_symbol { + ($code:expr) => {{ + let c = $code as usize; + out_ptr.cast::().write_unaligned(*symbols.add(c)); + out_ptr = out_ptr.add(*lengths.add(c) as usize); + }}; + } + + // Emit all 8 symbols from a u64 block (no escapes). + macro_rules! emit_block { + ($block:expr) => {{ + let b = $block; + emit_symbol!((b) & 0xFF); + emit_symbol!((b >> 8) & 0xFF); + emit_symbol!((b >> 16) & 0xFF); + emit_symbol!((b >> 24) & 0xFF); + emit_symbol!((b >> 32) & 0xFF); + emit_symbol!((b >> 40) & 0xFF); + emit_symbol!((b >> 48) & 0xFF); + emit_symbol!((b >> 56) & 0xFF); + }}; + } + + // Emit symbols before the first escape, write the escaped literal, + // and advance `in_ptr`. The loop body is small enough for LLVM to + // unroll when `pos` is a known constant from `trailing_zeros`. + macro_rules! 
emit_before_escape { + ($b:expr, $esc_pos:expr) => {{ + let b = $b; + let pos = $esc_pos; + // Emit each non-escape symbol before the escape byte. + let mut i = 0usize; + while i < pos { + emit_symbol!((b >> (i as u32 * 8)) & 0xFF); + i += 1; + } + if pos < 7 { + // Literal byte follows the escape within this block. + let literal_shift = (pos as u32 + 1) * 8; + out_ptr.write(((b >> literal_shift) & 0xFF) as u8); + out_ptr = out_ptr.add(1); + in_ptr = in_ptr.add(pos + 2); + } else { + // Escape is at byte 7 — literal is in the next block. + // Just consume the 7 symbols; the outer loop will + // re-read starting at the escape byte. + in_ptr = in_ptr.add(7); + } + }}; + } + + let out_end32 = Self::block_end(out_end, 256, decoded.len()); + let in_end32 = Self::block_end(in_end, 32, compressed.len()); + let out_end8 = Self::block_end(out_end, 64, decoded.len()); + let in_end8 = Self::block_end(in_end, 8, compressed.len()); + + // Main loop: 32-code escape-free fast path, falling back to single + // 8-code blocks for escapes, then immediately re-entering the fast path. + 'outer: while out_ptr.cast_const() <= out_end8 && in_ptr < in_end8 { + // 32-code escape-free inner loop. + while out_ptr.cast_const() <= out_end32 && in_ptr < in_end32 { + let b0 = in_ptr.cast::().read_unaligned(); + let b1 = in_ptr.add(8).cast::().read_unaligned(); + let b2 = in_ptr.add(16).cast::().read_unaligned(); + let b3 = in_ptr.add(24).cast::().read_unaligned(); + + let m0 = Self::escape_mask(b0); + let m1 = Self::escape_mask(b1); + let m2 = Self::escape_mask(b2); + let m3 = Self::escape_mask(b3); + + if (m0 | m1 | m2 | m3) != 0 { + cold(); + // Process escape-free blocks before the first escape, + // then handle the escape and break to re-check bounds. 
+ if m0 != 0 { + let first_esc = (m0.trailing_zeros() >> 3) as usize; + emit_before_escape!(b0, first_esc); + break; + } + emit_block!(b0); + in_ptr = in_ptr.add(8); + + if m1 != 0 { + let first_esc = (m1.trailing_zeros() >> 3) as usize; + emit_before_escape!(b1, first_esc); + break; + } + emit_block!(b1); + in_ptr = in_ptr.add(8); + + if m2 != 0 { + let first_esc = (m2.trailing_zeros() >> 3) as usize; + emit_before_escape!(b2, first_esc); + break; + } + emit_block!(b2); + in_ptr = in_ptr.add(8); + + let first_esc = (m3.trailing_zeros() >> 3) as usize; + emit_before_escape!(b3, first_esc); + break; + } + + emit_block!(b0); + emit_block!(b1); + emit_block!(b2); + emit_block!(b3); + in_ptr = in_ptr.add(32); + } + + // Single 8-code block with escape handling, then re-enter fast path. + if out_ptr.cast_const() > out_end8 || in_ptr >= in_end8 { + break 'outer; + } + let block = in_ptr.cast::().read_unaligned(); + let esc = Self::escape_mask(block); + + if esc == 0 { + emit_block!(block); + in_ptr = in_ptr.add(8); + } else { + cold(); + let first_esc = (esc.trailing_zeros() >> 3) as usize; + emit_before_escape!(block, first_esc); + } + } + + // Scalar tail. 
#[cfg(test)]
mod tests {
    use fsst::CompressorBuilder;
    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_error::VortexResult;

    use super::*;

    /// Decompress `compressed` into a fresh buffer of `capacity` bytes and
    /// return the initialized prefix.
    ///
    /// Callers over-allocate (typically `max_decompression_capacity + 7`)
    /// because the optimized kernel only runs its fast loop while more than
    /// 8 output bytes remain, so it may touch bytes past the logical end of
    /// the output — TODO confirm against `decompress_into`'s contract.
    fn decompress_opt(
        decompressor: &OptimizedDecompressor,
        compressed: &[u8],
        capacity: usize,
    ) -> Vec<u8> {
        let mut output = Vec::with_capacity(capacity);
        let len = decompressor.decompress_into(compressed, output.spare_capacity_mut());
        // SAFETY: `decompress_into` initialized the first `len` bytes of the
        // spare capacity, and `len` never exceeds the requested capacity.
        unsafe { output.set_len(len) };
        output
    }

    /// Generate `count` deterministic random byte strings whose lengths are
    /// drawn from `lens` and whose bytes are drawn from `bytes`.
    fn random_lines(
        seed: u64,
        count: usize,
        lens: std::ops::Range<usize>,
        bytes: std::ops::RangeInclusive<u8>,
    ) -> Vec<Vec<u8>> {
        let mut rng = StdRng::seed_from_u64(seed);
        (0..count)
            .map(|_| {
                let len = rng.random_range(lens.clone());
                (0..len).map(|_| rng.random_range(bytes.clone())).collect()
            })
            .collect()
    }

    /// Train a compressor on `owned`, then check that the optimized
    /// decompressor agrees with the fsst baseline on every line individually.
    fn assert_matches_baseline_per_line(owned: &[Vec<u8>]) {
        let lines: Vec<&[u8]> = owned.iter().map(|s| s.as_slice()).collect();

        let compressor = fsst::Compressor::train(&lines);
        let baseline = compressor.decompressor();
        let optimized =
            OptimizedDecompressor::new(compressor.symbol_table(), compressor.symbol_lengths());

        for line in &lines {
            let compressed = compressor.compress(line);
            let expected = baseline.decompress(&compressed);
            // +7 slack for the kernel's over-write window; see `decompress_opt`.
            let actual = decompress_opt(
                &optimized,
                &compressed,
                baseline.max_decompression_capacity(&compressed) + 7,
            );
            assert_eq!(expected, actual, "Mismatch for input: {:?}", line);
        }
    }

    /// Like `assert_matches_baseline_per_line`, but decompresses the
    /// concatenation of all compressed lines in a single call and also checks
    /// the result against the original (uncompressed) bytes.
    fn assert_matches_baseline_concat(owned: &[Vec<u8>]) {
        let lines: Vec<&[u8]> = owned.iter().map(|s| s.as_slice()).collect();

        let compressor = fsst::Compressor::train(&lines);
        let baseline = compressor.decompressor();
        let optimized =
            OptimizedDecompressor::new(compressor.symbol_table(), compressor.symbol_lengths());

        let mut all_compressed = Vec::new();
        let mut all_expected = Vec::new();
        for line in &lines {
            all_compressed.extend_from_slice(&compressor.compress(line));
            all_expected.extend_from_slice(line);
        }

        let expected = baseline.decompress(&all_compressed);
        let actual = decompress_opt(
            &optimized,
            &all_compressed,
            baseline.max_decompression_capacity(&all_compressed) + 7,
        );
        assert_eq!(expected, actual);
        assert_eq!(all_expected, actual);
    }

    /// A single known symbol round-trips through the optimized decompressor.
    #[test]
    fn test_basic_decompress() -> VortexResult<()> {
        let mut builder = CompressorBuilder::new();
        builder.insert(Symbol::from_slice(b"hello\0\0\0"), 5);
        let compressor = builder.build();

        let compressed = compressor.compress(b"hello");
        let decompressor =
            OptimizedDecompressor::new(compressor.symbol_table(), compressor.symbol_lengths());

        let output = decompress_opt(&decompressor, &compressed, 64);
        assert_eq!(&output, b"hello");
        Ok(())
    }

    /// An untrained (default) table should force input bytes through the
    /// escape-code path — presumably every byte is escaped; verify round-trip.
    #[test]
    fn test_escape_codes() -> VortexResult<()> {
        let compressor = CompressorBuilder::default().build();
        let compressed = compressor.compress(b"abc");
        let decompressor =
            OptimizedDecompressor::new(compressor.symbol_table(), compressor.symbol_lengths());

        let output = decompress_opt(&decompressor, &compressed, 64);
        assert_eq!(&output, b"abc");
        Ok(())
    }

    /// Lowercase-ASCII corpus: mostly symbol hits after training.
    #[test]
    fn test_matches_baseline() -> VortexResult<()> {
        assert_matches_baseline_per_line(&random_lines(12345, 100, 5..50, b'a'..=b'z'));
        Ok(())
    }

    /// Full byte range, which likely forces escape codes into the stream.
    #[test]
    fn test_matches_baseline_with_escapes() -> VortexResult<()> {
        assert_matches_baseline_per_line(&random_lines(99, 100, 5..100, 0..=255u8));
        Ok(())
    }

    /// Zero-length input must produce zero output bytes.
    #[test]
    fn test_empty_input() -> VortexResult<()> {
        let compressor = CompressorBuilder::default().build();
        let decompressor =
            OptimizedDecompressor::new(compressor.symbol_table(), compressor.symbol_lengths());

        assert!(decompress_opt(&decompressor, &[], 64).is_empty());
        Ok(())
    }

    /// Large concatenated corpus of trained-alphabet lines decompressed in
    /// one shot.
    #[test]
    fn test_large_corpus() -> VortexResult<()> {
        assert_matches_baseline_concat(&random_lines(42, 1000, 1..500, b'a'..=b'z'));
        Ok(())
    }

    /// Large concatenated corpus over the full byte range (escape-heavy).
    #[test]
    fn test_large_corpus_with_escapes() -> VortexResult<()> {
        assert_matches_baseline_concat(&random_lines(42, 1000, 1..500, 0..=255u8));
        Ok(())
    }
}