From 9e2bcdc7b887148352d3b1cd78e10694a9b69849 Mon Sep 17 00:00:00 2001 From: mprammer Date: Fri, 8 May 2026 16:44:05 -0400 Subject: [PATCH] =?UTF-8?q?fix(fsst):=20adaptive=20i32=E2=86=92i64=20codes?= =?UTF-8?q?=5Foffsets=20promotion=20(#7833)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Single-pass; common case keeps the compact i32 layout, pathological inputs transparently get i64. Co-Authored-By: Claude Signed-off-by: mprammer --- Cargo.lock | 1 + encodings/fsst/Cargo.toml | 1 + encodings/fsst/src/compress.rs | 159 +++++++++++++----- .../src/arrays/varbin/compute/compare.rs | 25 ++- 4 files changed, 142 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf8c74d7058..09fd1904d35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10844,6 +10844,7 @@ dependencies = [ "prost 0.14.3", "rand 0.10.1", "rstest", + "test-with", "vortex-array", "vortex-buffer", "vortex-error", diff --git a/encodings/fsst/Cargo.toml b/encodings/fsst/Cargo.toml index b95eeb1f444..0d722a131d3 100644 --- a/encodings/fsst/Cargo.toml +++ b/encodings/fsst/Cargo.toml @@ -33,6 +33,7 @@ _test-harness = ["dep:rand", "vortex-array/_test-harness"] divan = { workspace = true } rand = { workspace = true } rstest = { workspace = true } +test-with = { workspace = true } vortex-array = { workspace = true, features = ["_test-harness"] } [[bench]] diff --git a/encodings/fsst/src/compress.rs b/encodings/fsst/src/compress.rs index 872dcbc494d..2054f163d9c 100644 --- a/encodings/fsst/src/compress.rs +++ b/encodings/fsst/src/compress.rs @@ -1,22 +1,25 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -// Compress a set of values into an Array. - use fsst::Compressor; -use fsst::Symbol; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::accessor::ArrayAccessor; -use vortex_array::arrays::varbin::builder::VarBinBuilder; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::VarBinArray; use vortex_array::dtype::DType; +use vortex_array::expr::stats::Precision; +use vortex_array::expr::stats::Stat; +use vortex_array::validity::Validity; +use vortex_buffer::BitBufferMut; use vortex_buffer::Buffer; use vortex_buffer::BufferMut; use vortex_error::VortexExpect; -/// Compress a string array using FSST. use crate::FSST; use crate::FSSTArray; + +/// Compress a string array using FSST. pub fn fsst_compress>( strings: A, len: usize, @@ -42,14 +45,7 @@ where I: Iterator>, { let mut lines = Vec::with_capacity(8_192); - - for string in iter { - match string { - None => {} - Some(b) => lines.push(b), - } - } - + lines.extend(iter.flatten()); Compressor::train(&lines) } @@ -57,7 +53,9 @@ where /// the buffer to hold enough capacity for the worst-case compressed value. const DEFAULT_BUFFER_LEN: usize = 1024 * 1024; -/// Compress from an iterator of bytestrings using FSST. +/// Compress an iterator of bytestrings into an FSST array. Codes-offsets are +/// `i32` for small (typical) inputs, promoted to `i64` once cumulative +/// compressed bytes would exceed `i32::MAX`. pub fn fsst_compress_iter<'a, I>( iter: I, len: usize, @@ -69,48 +67,67 @@ where I: Iterator>, { let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN); - let mut builder = VarBinBuilder::::with_capacity(len); + let mut data: BufferMut = BufferMut::empty(); + let mut validity = BitBufferMut::with_capacity(len); let mut uncompressed_lengths: BufferMut = BufferMut::with_capacity(len); + let mut offsets_i32: BufferMut = BufferMut::with_capacity(len + 1); + offsets_i32.push(0); + let mut offsets_i64: Option> = None; + for string in iter { match string { None => { - builder.append_null(); + validity.append_false(); uncompressed_lengths.push(0); } Some(s) => { - uncompressed_lengths.push( - s.len() - .try_into() - .vortex_expect("string length must fit in i32"), - ); - - // make sure the buffer is 2x+7 larger than the input - let target_size = 2 * s.len() + 7; - if target_size > buffer.len() { - let additional_capacity = target_size - buffer.len(); - buffer.reserve(additional_capacity); + validity.append_true(); + uncompressed_lengths + .push(s.len().try_into().vortex_expect("string length fits i32")); + let target = 2 * s.len() + 7; + if target > buffer.len() { + buffer.reserve(target - buffer.len()); } - - // SAFETY: buffer is always sized to be large enough + // SAFETY: buffer holds at least 2*s.len()+7 bytes per the FSST contract. unsafe { compressor.compress_into(s, &mut buffer) }; + data.extend_from_slice(&buffer); + } + } - builder.append_value(&buffer); + let off = data.len(); + if offsets_i64.is_none() && off > i32::MAX as usize { + let mut o64 = BufferMut::with_capacity(len + 1); + for i in 0..offsets_i32.len() { + o64.push(i64::from(offsets_i32[i])); } + offsets_i64 = Some(o64); + } + match &mut offsets_i64 { + Some(o64) => o64.push(off as i64), + None => offsets_i32.push(i32::try_from(off).vortex_expect("offset fits i32")), } } - let codes = builder.finish(DType::Binary(dtype.nullability())); - let symbols: Buffer = Buffer::copy_from(compressor.symbol_table()); - let symbol_lengths: Buffer = Buffer::::copy_from(compressor.symbol_lengths()); - - let uncompressed_lengths = uncompressed_lengths.into_array(); + let offsets = match offsets_i64 { + Some(o64) => PrimitiveArray::new(o64.freeze(), Validity::NonNullable), + None => PrimitiveArray::new(offsets_i32.freeze(), Validity::NonNullable), + }; + offsets + .statistics() + .set(Stat::IsSorted, Precision::Exact(true.into())); + let codes = VarBinArray::new( + offsets.into_array(), + data.freeze(), + DType::Binary(dtype.nullability()), + Validity::from_bit_buffer(validity.freeze(), dtype.nullability()), + ); FSST::try_new( dtype, - symbols, - symbol_lengths, + Buffer::copy_from(compressor.symbol_table()), + Buffer::::copy_from(compressor.symbol_lengths()), codes, - uncompressed_lengths, + uncompressed_lengths.into_array(), ctx, ) .vortex_expect("FSST parts must be valid") @@ -119,14 +136,23 @@ where #[cfg(test)] mod tests { use fsst::CompressorBuilder; + use rand::SeedableRng; + use rand::rngs::StdRng; + use rand::seq::IndexedRandom; use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; + use vortex_array::arrays::VarBinArray; + use vortex_array::arrays::varbin::VarBinArrayExt; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; + use vortex_array::dtype::PType; use vortex_array::scalar::Scalar; + use crate::FSSTArrayExt; use crate::compress::DEFAULT_BUFFER_LEN; + use crate::fsst_compress; use crate::fsst_compress_iter; + use crate::fsst_train_compressor; #[test] fn test_large_string() { @@ -148,9 +174,64 @@ mod tests { ); let decoded = compressed.execute_scalar(0, &mut ctx).unwrap(); - let expected = Scalar::utf8(big_string, Nullability::NonNullable); assert_eq!(decoded, expected); } + + fn assert_codes_offsets_ptype( + array: &VarBinArray, + compressor: &fsst::Compressor, + expected: PType, + ) { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let fsst = fsst_compress(array, array.len(), array.dtype(), compressor, &mut ctx); + assert_eq!(fsst.len(), array.len()); + let actual = PType::try_from(fsst.codes().offsets().dtype()).unwrap(); + assert_eq!(actual, expected); + } + + /// Regression for #7833: typical inputs keep `i32` codes-offsets. + #[test] + fn fsst_compress_keeps_i32_offsets_for_small_inputs() { + let array = VarBinArray::from_iter( + [Some("hello world"), Some("hello rust"), Some("hello vortex")], + DType::Utf8(Nullability::NonNullable), + ); + let compressor = fsst_train_compressor(&array); + assert_codes_offsets_ptype(&array, &compressor, PType::I32); + } + + /// Regression for #7833: in-loop promotion when cumulative compressed bytes + /// cross `i32::MAX`. Gated to CI runs (skipped when `CI` is unset; opt-out + /// with `VORTEX_SKIP_SLOW_TESTS=1`); peak ~4.5 GiB. + #[test_with::env(CI)] + #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)] + fn fsst_compress_promotes_in_loop_via_data_size() { + // High-entropy ASCII: pseudo-random data resists FSST symbol-table + // compression, so output stays close to input size and crosses i32::MAX. + const STRING_LEN: usize = 64 * 1024; + const TOTAL_BYTES: usize = (1usize << 31) + (256 << 20); // ~2.25 GiB + const N: usize = TOTAL_BYTES / STRING_LEN; + const POOL_LEN: usize = 64 * 1024 * 1024; + + // Printable ASCII so the result is valid UTF-8. + const ALPHABET: &[u8; 95] = + b" !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~"; + + let mut rng = StdRng::seed_from_u64(0xC0DE_C011_B711); + let pool: Vec = (0..POOL_LEN) + .map(|_| *ALPHABET.choose(&mut rng).unwrap()) + .collect(); + let array = VarBinArray::from_iter( + (0..N).map(|i| { + let off = i.wrapping_mul(31337) % (POOL_LEN - STRING_LEN); + Some(&pool[off..off + STRING_LEN]) + }), + DType::Utf8(Nullability::NonNullable), + ); + + let compressor = fsst_train_compressor(&array); + assert_codes_offsets_ptype(&array, &compressor, PType::I64); + } } diff --git a/vortex-array/src/arrays/varbin/compute/compare.rs b/vortex-array/src/arrays/varbin/compute/compare.rs index 511f930d3d0..f07a1ec0380 100644 --- a/vortex-array/src/arrays/varbin/compute/compare.rs +++ b/vortex-array/src/arrays/varbin/compute/compare.rs @@ -2,8 +2,11 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use arrow_array::BinaryArray; +use arrow_array::LargeBinaryArray; +use arrow_array::LargeStringArray; use arrow_array::StringArray; use arrow_ord::cmp; +use arrow_schema::DataType; use vortex_buffer::BitBuffer; use vortex_error::VortexExpect as _; use vortex_error::VortexResult; @@ -82,15 +85,27 @@ impl CompareKernel for VarBin { let lhs = Datum::try_new(lhs.array(), ctx)?; - // Use StringViewArray/BinaryViewArray to match the Utf8View/BinaryView types - // produced by Datum::try_new (which uses execute_arrow(None, ctx)) - let arrow_rhs: &dyn arrow_array::Datum = match rhs_const.dtype() { - DType::Utf8(_) => &rhs_const + // The RHS scalar must match the LHS Arrow data type. VarBin with i64 + // offsets is converted to LargeBinary/LargeUtf8 (see + // `preferred_arrow_type`), and Arrow refuses to compare LargeBinary + // with Binary (or LargeUtf8 with Utf8). + let arrow_rhs: &dyn arrow_array::Datum = match (rhs_const.dtype(), lhs.data_type()) { + (DType::Utf8(_), DataType::LargeUtf8) => &rhs_const + .as_utf8() + .value() + .map(LargeStringArray::new_scalar) + .unwrap_or_else(|| arrow_array::Scalar::new(LargeStringArray::new_null(1))), + (DType::Utf8(_), _) => &rhs_const .as_utf8() .value() .map(StringArray::new_scalar) .unwrap_or_else(|| arrow_array::Scalar::new(StringArray::new_null(1))), - DType::Binary(_) => &rhs_const + (DType::Binary(_), DataType::LargeBinary) => &rhs_const + .as_binary() + .value() + .map(LargeBinaryArray::new_scalar) + .unwrap_or_else(|| arrow_array::Scalar::new(LargeBinaryArray::new_null(1))), + (DType::Binary(_), _) => &rhs_const .as_binary() .value() .map(BinaryArray::new_scalar)