diff --git a/encodings/fsst/src/compress.rs b/encodings/fsst/src/compress.rs
index 872dcbc494d..058e0b7e0e1 100644
--- a/encodings/fsst/src/compress.rs
+++ b/encodings/fsst/src/compress.rs
@@ -69,8 +69,12 @@ where
     I: Iterator<Item = Option<&'a [u8]>>,
 {
     let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN);
-    let mut builder = VarBinBuilder::<i32>::with_capacity(len);
+
+    // Offsets are widened to i64 because the cumulative compressed bytes can exceed i32::MAX for
+    // large inputs (see issue #7833). Per-string sizes still fit in i32.
+    let mut builder = VarBinBuilder::<i64>::with_capacity(len);
     let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
+
     for string in iter {
         match string {
             None => {
diff --git a/encodings/fsst/src/tests.rs b/encodings/fsst/src/tests.rs
index fb4cb8bdc5c..19277b52b49 100644
--- a/encodings/fsst/src/tests.rs
+++ b/encodings/fsst/src/tests.rs
@@ -1,6 +1,9 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
+use rand::SeedableRng;
+use rand::rngs::StdRng;
+use rand::seq::IndexedRandom;
 use vortex_array::ArrayRef;
 use vortex_array::IntoArray;
 use vortex_array::LEGACY_SESSION;
@@ -107,3 +110,59 @@ fn test_fsst_array_ops() {
 
     assert_arrays_eq!(fsst_array, canonical_array);
 }
+
+/// Regression for #7833: [`fsst_compress`] must accept inputs whose cumulative compressed
+/// bytes exceed [`i32::MAX`]. Before the fix, [`fsst_compress_iter`] hardcoded
+/// [`VarBinBuilder<i32>`] for the FSST output and panicked in
+/// [`VarBinBuilder::append_value`] once cumulative compressed bytes crossed the boundary.
+///
+/// The input is built with [`VarBinBuilder<i64>`] so the input itself does not panic, which
+/// confirms the overflow is on the FSST output side. After the fix the test must succeed
+/// with the row count preserved.
+///
+/// Marked `#[ignore]` because it allocates ~2.5 GiB for the input and ~2.5 GiB for the FSST
+/// output (~5 GiB total). To run it explicitly, use:
+///
+/// ```text
+/// cargo test --release -p vortex-fsst -- --ignored fsst_compress_offsets
+/// ```
+///
+/// [`fsst_compress_iter`]: crate::compress::fsst_compress_iter
+#[test]
+#[ignore = "allocates ~5 GiB; run with --ignored"]
+fn fsst_compress_offsets_overflow_i32() {
+    // High-entropy ASCII strings sliced from a random pool. FSST is a symbol-table
+    // compressor; pseudo-random data with no recurring byte sequences resists compression,
+    // so the compressed output stays close to input size and crosses the i32 boundary.
+    const STRING_LEN: usize = 64 * 1024;
+    const TOTAL_BYTES: usize = (1usize << 31) + (512 << 20); // ~2.5 GiB
+    const N: usize = TOTAL_BYTES / STRING_LEN;
+    const POOL_LEN: usize = 64 * 1024 * 1024;
+
+    // Printable ASCII alphabet so the result is valid UTF-8.
+    const ALPHABET: &[u8; 95] =
+        b" !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~";
+
+    let mut rng = StdRng::seed_from_u64(0xC0DE_C011_B711);
+    let pool: Vec<u8> = (0..POOL_LEN)
+        .map(|_| *ALPHABET.choose(&mut rng).unwrap())
+        .collect();
+
+    println!("building large VarBinArray");
+    let mut builder = VarBinBuilder::<i64>::with_capacity(N);
+    for i in 0..N {
+        let off = i.wrapping_mul(31337) % (POOL_LEN - STRING_LEN);
+        builder.append_value(&pool[off..off + STRING_LEN]);
+    }
+    let array = builder.finish(DType::Utf8(Nullability::NonNullable));
+
+    println!("training FSST compressor");
+    let compressor = fsst_train_compressor(&array);
+    let len = array.len();
+    let dtype = array.dtype().clone();
+    let mut ctx = LEGACY_SESSION.create_execution_ctx();
+
+    println!("compressing to FSST");
+    let compressed = fsst_compress(array, len, &dtype, &compressor, &mut ctx);
+    assert_eq!(compressed.len(), len);
+}
diff --git a/vortex-array/src/arrays/varbin/compute/compare.rs b/vortex-array/src/arrays/varbin/compute/compare.rs
index 511f930d3d0..37ec64e0802 100644
--- a/vortex-array/src/arrays/varbin/compute/compare.rs
+++ b/vortex-array/src/arrays/varbin/compute/compare.rs
@@ -2,8 +2,11 @@
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
 use arrow_array::BinaryArray;
+use arrow_array::LargeBinaryArray;
+use arrow_array::LargeStringArray;
 use arrow_array::StringArray;
 use arrow_ord::cmp;
+use arrow_schema::DataType;
 use vortex_buffer::BitBuffer;
 use vortex_error::VortexExpect as _;
 use vortex_error::VortexResult;
@@ -82,15 +85,26 @@
 
         let lhs = Datum::try_new(lhs.array(), ctx)?;
 
-        // Use StringViewArray/BinaryViewArray to match the Utf8View/BinaryView types
-        // produced by Datum::try_new (which uses execute_arrow(None, ctx))
-        let arrow_rhs: &dyn arrow_array::Datum = match rhs_const.dtype() {
-            DType::Utf8(_) => &rhs_const
+        // The RHS scalar must match the LHS Arrow data type. VarBin with i64 offsets is
+        // converted to LargeBinary/LargeUtf8 (see `preferred_arrow_type`), and Arrow refuses to
+        // compare LargeBinary with Binary (or LargeUtf8 with Utf8).
+        let arrow_rhs: &dyn arrow_array::Datum = match (rhs_const.dtype(), lhs.data_type()) {
+            (DType::Utf8(_), DataType::LargeUtf8) => &rhs_const
+                .as_utf8()
+                .value()
+                .map(LargeStringArray::new_scalar)
+                .unwrap_or_else(|| arrow_array::Scalar::new(LargeStringArray::new_null(1))),
+            (DType::Utf8(_), _) => &rhs_const
                 .as_utf8()
                 .value()
                 .map(StringArray::new_scalar)
                 .unwrap_or_else(|| arrow_array::Scalar::new(StringArray::new_null(1))),
-            DType::Binary(_) => &rhs_const
+            (DType::Binary(_), DataType::LargeBinary) => &rhs_const
+                .as_binary()
+                .value()
+                .map(LargeBinaryArray::new_scalar)
+                .unwrap_or_else(|| arrow_array::Scalar::new(LargeBinaryArray::new_null(1))),
+            (DType::Binary(_), _) => &rhs_const
                 .as_binary()
                 .value()
                 .map(BinaryArray::new_scalar)
@@ -237,9 +251,14 @@ mod test {
 
 #[cfg(test)]
 mod tests {
+    use vortex_buffer::ByteBuffer;
+
     use crate::IntoArray;
+    use crate::arrays::BoolArray;
     use crate::arrays::ConstantArray;
     use crate::arrays::VarBinArray;
+    use crate::arrays::varbin::builder::VarBinBuilder;
+    use crate::assert_arrays_eq;
     use crate::builtins::ArrayBuiltins;
     use crate::dtype::DType;
     use crate::dtype::Nullability;
@@ -260,4 +279,55 @@ mod tests {
             &DType::Bool(Nullability::Nullable)
         );
     }
+
+    /// Regression: a [`VarBinArray`] built with `i64` offsets is canonicalised to
+    /// Arrow `LargeUtf8` / `LargeBinary` by `preferred_arrow_type`. Without an explicit
+    /// branch in [`CompareKernel`], the constant RHS is wrapped in a `StringArray` /
+    /// `BinaryArray` and Arrow rejects the `LargeUtf8 == Utf8` mismatch. Triggering
+    /// this only requires `i64` offsets, not large data.
+    ///
+    /// [`CompareKernel`]: super::CompareKernel
+    #[test]
+    fn varbin_i64_offsets_compare_constant() {
+        let mut builder = VarBinBuilder::<i64>::with_capacity(3);
+        builder.append_value(b"abc");
+        builder.append_value(b"xyz");
+        builder.append_value(b"abc");
+        let array = builder.finish(DType::Utf8(Nullability::NonNullable));
+
+        let result = array
+            .into_array()
+            .binary(
+                ConstantArray::new(Scalar::utf8("abc", Nullability::NonNullable), 3).into_array(),
+                Operator::Eq,
+            )
+            .unwrap();
+
+        let expected = BoolArray::from_iter([true, false, true]);
+        assert_arrays_eq!(result, expected);
+    }
+
+    #[test]
+    fn varbin_i64_offsets_compare_constant_binary() {
+        let mut builder = VarBinBuilder::<i64>::with_capacity(3);
+        builder.append_value(b"abc");
+        builder.append_value(b"xyz");
+        builder.append_value(b"abc");
+        let array = builder.finish(DType::Binary(Nullability::NonNullable));
+
+        let result = array
+            .into_array()
+            .binary(
+                ConstantArray::new(
+                    Scalar::binary(ByteBuffer::copy_from(b"abc"), Nullability::NonNullable),
+                    3,
+                )
+                .into_array(),
+                Operator::Eq,
+            )
+            .unwrap();
+
+        let expected = BoolArray::from_iter([true, false, true]);
+        assert_arrays_eq!(result, expected);
+    }
 }