Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion encodings/fsst/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ where
I: Iterator<Item = Option<&'a [u8]>>,
{
let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN);
let mut builder = VarBinBuilder::<i32>::with_capacity(len);

// Offsets are widened to i64 because the cumulative compressed bytes can exceed i32::MAX for
// large inputs (see issue #7833). Per-string sizes still fit in i32.
let mut builder = VarBinBuilder::<i64>::with_capacity(len);
let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this match?


for string in iter {
match string {
None => {
Expand Down
59 changes: 59 additions & 0 deletions encodings/fsst/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use rand::SeedableRng;
use rand::rngs::StdRng;
use rand::seq::IndexedRandom;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
Expand Down Expand Up @@ -107,3 +110,59 @@ fn test_fsst_array_ops() {

assert_arrays_eq!(fsst_array, canonical_array);
}

/// Regression test for #7833: [`fsst_compress`] must not panic when the total compressed
/// output exceeds [`i32::MAX`] bytes. The old implementation of [`fsst_compress_iter`]
/// built its output with a hardcoded [`VarBinBuilder<i32>`], so the offset accumulator in
/// [`VarBinBuilder::append_value`] overflowed once cumulative compressed bytes passed the
/// i32 boundary.
///
/// The *input* array is constructed with [`VarBinBuilder<i64>`] on purpose — that way any
/// overflow panic can only come from the FSST output path. With the fix in place, the call
/// completes and the row count is preserved.
///
/// Ignored by default: the input and the FSST output each take roughly 2.5 GiB
/// (~5 GiB peak). Run it explicitly with:
///
/// ```text
/// cargo test --release -p vortex-fsst -- --ignored fsst_compress_offsets
/// ```
///
/// [`fsst_compress_iter`]: crate::compress::fsst_compress_iter
#[test]
#[ignore = "allocates ~5 GiB; run with --ignored"]
fn fsst_compress_offsets_overflow_i32() {
    // FSST is a symbol-table compressor, so pseudo-random printable ASCII with no
    // repeating byte sequences barely compresses; the output therefore stays near the
    // input size and is guaranteed to cross the i32 offset boundary.
    const STRING_LEN: usize = 64 * 1024;
    const TOTAL_BYTES: usize = (1usize << 31) + (512 << 20); // ~2.5 GiB
    const N: usize = TOTAL_BYTES / STRING_LEN;
    const POOL_LEN: usize = 64 * 1024 * 1024;

    // All 95 printable ASCII characters, keeping every string valid UTF-8.
    const ALPHABET: &[u8; 95] =
        b" !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~";

    // Deterministic pool of random bytes; the fixed seed keeps the test reproducible.
    let mut rng = StdRng::seed_from_u64(0xC0DE_C011_B711);
    let mut pool = Vec::with_capacity(POOL_LEN);
    for _ in 0..POOL_LEN {
        pool.push(*ALPHABET.choose(&mut rng).unwrap());
    }

    println!("building large VarBinArray");
    let mut builder = VarBinBuilder::<i64>::with_capacity(N);
    for i in 0..N {
        // Stride through the pool with a fixed multiplier so consecutive strings
        // come from scattered, non-overlapping-looking regions.
        let start = i.wrapping_mul(31337) % (POOL_LEN - STRING_LEN);
        let end = start + STRING_LEN;
        builder.append_value(&pool[start..end]);
    }
    let array = builder.finish(DType::Utf8(Nullability::NonNullable));

    println!("training FSST compressor");
    let compressor = fsst_train_compressor(&array);
    let len = array.len();
    let dtype = array.dtype().clone();
    let mut ctx = LEGACY_SESSION.create_execution_ctx();

    println!("compressing to FSST");
    let compressed = fsst_compress(array, len, &dtype, &compressor, &mut ctx);
    assert_eq!(compressed.len(), len);
}
80 changes: 75 additions & 5 deletions vortex-array/src/arrays/varbin/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use arrow_array::BinaryArray;
use arrow_array::LargeBinaryArray;
use arrow_array::LargeStringArray;
use arrow_array::StringArray;
use arrow_ord::cmp;
use arrow_schema::DataType;
use vortex_buffer::BitBuffer;
use vortex_error::VortexExpect as _;
use vortex_error::VortexResult;
Expand Down Expand Up @@ -82,15 +85,26 @@ impl CompareKernel for VarBin {

let lhs = Datum::try_new(lhs.array(), ctx)?;

// Use StringViewArray/BinaryViewArray to match the Utf8View/BinaryView types
// produced by Datum::try_new (which uses execute_arrow(None, ctx))
let arrow_rhs: &dyn arrow_array::Datum = match rhs_const.dtype() {
DType::Utf8(_) => &rhs_const
// The RHS scalar must match the LHS Arrow data type. VarBin with i64 offsets is
// converted to LargeBinary/LargeUtf8 (see `preferred_arrow_type`), and Arrow refuses to
// compare LargeBinary with Binary (or LargeUtf8 with Utf8).
let arrow_rhs: &dyn arrow_array::Datum = match (rhs_const.dtype(), lhs.data_type()) {
(DType::Utf8(_), DataType::LargeUtf8) => &rhs_const
.as_utf8()
.value()
.map(LargeStringArray::new_scalar)
.unwrap_or_else(|| arrow_array::Scalar::new(LargeStringArray::new_null(1))),
(DType::Utf8(_), _) => &rhs_const
.as_utf8()
.value()
.map(StringArray::new_scalar)
.unwrap_or_else(|| arrow_array::Scalar::new(StringArray::new_null(1))),
DType::Binary(_) => &rhs_const
(DType::Binary(_), DataType::LargeBinary) => &rhs_const
.as_binary()
.value()
.map(LargeBinaryArray::new_scalar)
.unwrap_or_else(|| arrow_array::Scalar::new(LargeBinaryArray::new_null(1))),
(DType::Binary(_), _) => &rhs_const
.as_binary()
.value()
.map(BinaryArray::new_scalar)
Expand Down Expand Up @@ -237,9 +251,14 @@ mod test {

#[cfg(test)]
mod tests {
use vortex_buffer::ByteBuffer;

use crate::IntoArray;
use crate::arrays::BoolArray;
use crate::arrays::ConstantArray;
use crate::arrays::VarBinArray;
use crate::arrays::varbin::builder::VarBinBuilder;
use crate::assert_arrays_eq;
use crate::builtins::ArrayBuiltins;
use crate::dtype::DType;
use crate::dtype::Nullability;
Expand All @@ -260,4 +279,55 @@ mod tests {
&DType::Bool(Nullability::Nullable)
);
}

/// Regression test: a [`VarBinArray`] with `i64` offsets canonicalises to Arrow
/// `LargeUtf8` / `LargeBinary` (via `preferred_arrow_type`). The [`CompareKernel`]
/// previously always wrapped the constant RHS as `StringArray` / `BinaryArray`, and
/// Arrow rejects comparing `LargeUtf8` against `Utf8`. Only the offset width matters
/// here — the data itself can be tiny.
///
/// [`CompareKernel`]: super::CompareKernel
#[test]
fn varbin_i64_offsets_compare_constant() {
    // Build a 3-element Utf8 array with i64 offsets.
    let mut builder = VarBinBuilder::<i64>::with_capacity(3);
    for value in [b"abc" as &[u8], b"xyz", b"abc"] {
        builder.append_value(value);
    }
    let array = builder.finish(DType::Utf8(Nullability::NonNullable));

    // Compare against a constant "abc" of matching length.
    let rhs =
        ConstantArray::new(Scalar::utf8("abc", Nullability::NonNullable), 3).into_array();
    let result = array.into_array().binary(rhs, Operator::Eq).unwrap();

    let expected = BoolArray::from_iter([true, false, true]);
    assert_arrays_eq!(result, expected);
}

/// Same regression as the Utf8 case, exercised for the Binary dtype: i64-offset
/// VarBin canonicalises to Arrow `LargeBinary`, so the constant RHS must be wrapped
/// in a `LargeBinaryArray` rather than a `BinaryArray`.
#[test]
fn varbin_i64_offsets_compare_constant_binary() {
    // Build a 3-element Binary array with i64 offsets.
    let mut builder = VarBinBuilder::<i64>::with_capacity(3);
    for value in [b"abc" as &[u8], b"xyz", b"abc"] {
        builder.append_value(value);
    }
    let array = builder.finish(DType::Binary(Nullability::NonNullable));

    // Compare against a constant binary scalar "abc" of matching length.
    let scalar = Scalar::binary(ByteBuffer::copy_from(b"abc"), Nullability::NonNullable);
    let rhs = ConstantArray::new(scalar, 3).into_array();
    let result = array.into_array().binary(rhs, Operator::Eq).unwrap();

    let expected = BoolArray::from_iter([true, false, true]);
    assert_arrays_eq!(result, expected);
}
}
Loading