Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions encodings/fsst/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ _test-harness = ["dep:rand", "vortex-array/_test-harness"]
divan = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }
test-with = { workspace = true }
vortex-array = { workspace = true, features = ["_test-harness"] }

[[bench]]
Expand Down
159 changes: 120 additions & 39 deletions encodings/fsst/src/compress.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

// Compress a set of values into an Array.

use fsst::Compressor;
use fsst::Symbol;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::accessor::ArrayAccessor;
use vortex_array::arrays::varbin::builder::VarBinBuilder;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinArray;
use vortex_array::dtype::DType;
use vortex_array::expr::stats::Precision;
use vortex_array::expr::stats::Stat;
use vortex_array::validity::Validity;
use vortex_buffer::BitBufferMut;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_error::VortexExpect;

/// Compress a string array using FSST.
use crate::FSST;
use crate::FSSTArray;

/// Compress a string array using FSST.
pub fn fsst_compress<A: ArrayAccessor<[u8]>>(
strings: A,
len: usize,
Expand All @@ -42,22 +45,17 @@ where
I: Iterator<Item = Option<&'a [u8]>>,
{
let mut lines = Vec::with_capacity(8_192);

for string in iter {
match string {
None => {}
Some(b) => lines.push(b),
}
}

lines.extend(iter.flatten());
Compressor::train(&lines)
}

/// Initial capacity, in bytes (1 MiB), of the scratch buffer that each string is
/// compressed into.
///
/// Most strings are small in practice. If we encounter a larger string, we reallocate
/// the buffer to hold enough capacity for the worst-case compressed value
/// (`2 * len + 7` bytes for an input of `len` bytes).
const DEFAULT_BUFFER_LEN: usize = 1024 * 1024;

/// Compress from an iterator of bytestrings using FSST.
/// Compress an iterator of bytestrings into an FSST array. Codes-offsets are
/// `i32` for small (typical) inputs, promoted to `i64` once cumulative
/// compressed bytes would exceed `i32::MAX`.
pub fn fsst_compress_iter<'a, I>(
iter: I,
len: usize,
Expand All @@ -69,48 +67,67 @@ where
I: Iterator<Item = Option<&'a [u8]>>,
{
let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN);
let mut builder = VarBinBuilder::<i32>::with_capacity(len);
let mut data: BufferMut<u8> = BufferMut::empty();
let mut validity = BitBufferMut::with_capacity(len);
let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
let mut offsets_i32: BufferMut<i32> = BufferMut::with_capacity(len + 1);
offsets_i32.push(0);
let mut offsets_i64: Option<BufferMut<i64>> = None;

for string in iter {
match string {
None => {
builder.append_null();
validity.append_false();
uncompressed_lengths.push(0);
}
Some(s) => {
uncompressed_lengths.push(
s.len()
.try_into()
.vortex_expect("string length must fit in i32"),
);

// make sure the buffer is 2x+7 larger than the input
let target_size = 2 * s.len() + 7;
if target_size > buffer.len() {
let additional_capacity = target_size - buffer.len();
buffer.reserve(additional_capacity);
validity.append_true();
uncompressed_lengths
.push(s.len().try_into().vortex_expect("string length fits i32"));
let target = 2 * s.len() + 7;
if target > buffer.len() {
buffer.reserve(target - buffer.len());
}

// SAFETY: buffer is always sized to be large enough
// SAFETY: buffer holds at least 2*s.len()+7 bytes per the FSST contract.
unsafe { compressor.compress_into(s, &mut buffer) };
data.extend_from_slice(&buffer);
}
}

builder.append_value(&buffer);
let off = data.len();
if offsets_i64.is_none() && off > i32::MAX as usize {
let mut o64 = BufferMut::with_capacity(len + 1);
for i in 0..offsets_i32.len() {
o64.push(i64::from(offsets_i32[i]));
}
offsets_i64 = Some(o64);
}
match &mut offsets_i64 {
Some(o64) => o64.push(off as i64),
None => offsets_i32.push(i32::try_from(off).vortex_expect("offset fits i32")),
}
}

let codes = builder.finish(DType::Binary(dtype.nullability()));
let symbols: Buffer<Symbol> = Buffer::copy_from(compressor.symbol_table());
let symbol_lengths: Buffer<u8> = Buffer::<u8>::copy_from(compressor.symbol_lengths());

let uncompressed_lengths = uncompressed_lengths.into_array();
let offsets = match offsets_i64 {
Some(o64) => PrimitiveArray::new(o64.freeze(), Validity::NonNullable),
None => PrimitiveArray::new(offsets_i32.freeze(), Validity::NonNullable),
};
offsets
.statistics()
.set(Stat::IsSorted, Precision::Exact(true.into()));
let codes = VarBinArray::new(
offsets.into_array(),
data.freeze(),
DType::Binary(dtype.nullability()),
Validity::from_bit_buffer(validity.freeze(), dtype.nullability()),
);

FSST::try_new(
dtype,
symbols,
symbol_lengths,
Buffer::copy_from(compressor.symbol_table()),
Buffer::<u8>::copy_from(compressor.symbol_lengths()),
codes,
uncompressed_lengths,
uncompressed_lengths.into_array(),
ctx,
)
.vortex_expect("FSST parts must be valid")
Expand All @@ -119,14 +136,23 @@ where
#[cfg(test)]
mod tests {
use fsst::CompressorBuilder;
use rand::SeedableRng;
use rand::rngs::StdRng;
use rand::seq::IndexedRandom;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::VarBinArray;
use vortex_array::arrays::varbin::VarBinArrayExt;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::dtype::PType;
use vortex_array::scalar::Scalar;

use crate::FSSTArrayExt;
use crate::compress::DEFAULT_BUFFER_LEN;
use crate::fsst_compress;
use crate::fsst_compress_iter;
use crate::fsst_train_compressor;

#[test]
fn test_large_string() {
Expand All @@ -148,9 +174,64 @@ mod tests {
);

let decoded = compressed.execute_scalar(0, &mut ctx).unwrap();

let expected = Scalar::utf8(big_string, Nullability::NonNullable);

assert_eq!(decoded, expected);
}

/// Compresses `array` with `compressor` through `fsst_compress`, then asserts that
/// the compressed array has the same length as the input and that the offsets of
/// its codes child have the `expected` primitive type.
fn assert_codes_offsets_ptype(
    array: &VarBinArray,
    compressor: &fsst::Compressor,
    expected: PType,
) {
    let row_count = array.len();
    let mut exec_ctx = LEGACY_SESSION.create_execution_ctx();
    let compressed = fsst_compress(array, row_count, array.dtype(), compressor, &mut exec_ctx);

    // Compression must neither add nor drop rows.
    assert_eq!(compressed.len(), row_count);

    let offsets_ptype = PType::try_from(compressed.codes().offsets().dtype()).unwrap();
    assert_eq!(offsets_ptype, expected);
}

/// Regression for #7833: a few short strings must leave the codes-offsets as `i32`.
#[test]
fn fsst_compress_keeps_i32_offsets_for_small_inputs() {
    let utf8 = DType::Utf8(Nullability::NonNullable);
    let words = ["hello world", "hello rust", "hello vortex"];

    let array = VarBinArray::from_iter(words.map(Some), utf8);
    let compressor = fsst_train_compressor(&array);

    assert_codes_offsets_ptype(&array, &compressor, PType::I32);
}

/// Regression for #7833: codes-offsets must be promoted to `i64` inside the
/// compression loop once the cumulative compressed bytes cross `i32::MAX`.
///
/// Slow and memory hungry (peak ~4.5 GiB), so it is gated to CI runs: it is
/// skipped when `CI` is unset, and can be opted out of with
/// `VORTEX_SKIP_SLOW_TESTS=1`.
#[test_with::env(CI)]
#[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
fn fsst_compress_promotes_in_loop_via_data_size() {
    // Printable ASCII only, so every generated string is valid UTF-8.
    const ALPHABET: &[u8; 95] =
        b" !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~";

    // Sizing: N strings of STRING_LEN bytes each, sliced out of a shared random pool.
    const STRING_LEN: usize = 64 * 1024;
    const POOL_LEN: usize = 64 * 1024 * 1024;
    const TOTAL_BYTES: usize = (1usize << 31) + (256 << 20); // ~2.25 GiB
    const N: usize = TOTAL_BYTES / STRING_LEN;

    // High-entropy input: pseudo-random data resists FSST symbol-table
    // compression, so the output stays close to the input size and crosses
    // i32::MAX. The fixed seed keeps the data reproducible.
    let mut rng = StdRng::seed_from_u64(0xC0DE_C011_B711);
    let pool: Vec<u8> = std::iter::repeat_with(|| *ALPHABET.choose(&mut rng).unwrap())
        .take(POOL_LEN)
        .collect();

    // Each string is a STRING_LEN-byte window into the pool at a scattered start.
    let start_bound = POOL_LEN - STRING_LEN;
    let windows = (0..N).map(|i| {
        let start = i.wrapping_mul(31337) % start_bound;
        Some(&pool[start..start + STRING_LEN])
    });
    let array = VarBinArray::from_iter(windows, DType::Utf8(Nullability::NonNullable));

    let compressor = fsst_train_compressor(&array);
    assert_codes_offsets_ptype(&array, &compressor, PType::I64);
}
}
25 changes: 20 additions & 5 deletions vortex-array/src/arrays/varbin/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use arrow_array::BinaryArray;
use arrow_array::LargeBinaryArray;
use arrow_array::LargeStringArray;
use arrow_array::StringArray;
use arrow_ord::cmp;
use arrow_schema::DataType;
use vortex_buffer::BitBuffer;
use vortex_error::VortexExpect as _;
use vortex_error::VortexResult;
Expand Down Expand Up @@ -82,15 +85,27 @@ impl CompareKernel for VarBin {

let lhs = Datum::try_new(lhs.array(), ctx)?;

// Use StringViewArray/BinaryViewArray to match the Utf8View/BinaryView types
// produced by Datum::try_new (which uses execute_arrow(None, ctx))
let arrow_rhs: &dyn arrow_array::Datum = match rhs_const.dtype() {
DType::Utf8(_) => &rhs_const
// The RHS scalar must match the LHS Arrow data type. VarBin with i64
// offsets is converted to LargeBinary/LargeUtf8 (see
// `preferred_arrow_type`), and Arrow refuses to compare LargeBinary
// with Binary (or LargeUtf8 with Utf8).
let arrow_rhs: &dyn arrow_array::Datum = match (rhs_const.dtype(), lhs.data_type()) {
(DType::Utf8(_), DataType::LargeUtf8) => &rhs_const
.as_utf8()
.value()
.map(LargeStringArray::new_scalar)
.unwrap_or_else(|| arrow_array::Scalar::new(LargeStringArray::new_null(1))),
(DType::Utf8(_), _) => &rhs_const
.as_utf8()
.value()
.map(StringArray::new_scalar)
.unwrap_or_else(|| arrow_array::Scalar::new(StringArray::new_null(1))),
DType::Binary(_) => &rhs_const
(DType::Binary(_), DataType::LargeBinary) => &rhs_const
.as_binary()
.value()
.map(LargeBinaryArray::new_scalar)
.unwrap_or_else(|| arrow_array::Scalar::new(LargeBinaryArray::new_null(1))),
(DType::Binary(_), _) => &rhs_const
.as_binary()
.value()
.map(BinaryArray::new_scalar)
Expand Down
Loading