Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions encodings/fsst/benches/fsst_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,18 @@ const BENCH_ARGS: &[(usize, usize, u8)] = &[
fn compress_fsst(bencher: Bencher, (string_count, avg_len, unique_chars): (usize, usize, u8)) {
let array = generate_test_data(string_count, avg_len, unique_chars);
let compressor = fsst_train_compressor(&array);
let total_uncompressed = array.bytes().len();
bencher
.with_inputs(|| (&array, &compressor, LEGACY_SESSION.create_execution_ctx()))
.bench_refs(|(array, compressor, ctx)| {
fsst_compress(*array, array.len(), array.dtype(), compressor, ctx)
fsst_compress(
*array,
array.len(),
total_uncompressed,
array.dtype(),
compressor,
ctx,
)
})
}

Expand All @@ -68,10 +76,12 @@ fn decompress_fsst(bencher: Bencher, (string_count, avg_len, unique_chars): (usi
let array = generate_test_data(string_count, avg_len, unique_chars);
let compressor = fsst_train_compressor(&array);
let len = array.len();
let total_uncompressed = array.bytes().len();
let dtype = array.dtype().clone();
let encoded = fsst_compress(
array,
len,
total_uncompressed,
&dtype,
&compressor,
&mut LEGACY_SESSION.create_execution_ctx(),
Expand All @@ -97,6 +107,7 @@ fn pushdown_compare(bencher: Bencher, (string_count, avg_len, unique_chars): (us
let fsst_array = fsst_compress(
&array,
array.len(),
array.bytes().len(),
array.dtype(),
&compressor,
&mut LEGACY_SESSION.create_execution_ctx(),
Expand Down Expand Up @@ -132,6 +143,7 @@ fn canonicalize_compare(
let fsst_array = fsst_compress(
&array,
array.len(),
array.bytes().len(),
array.dtype(),
&compressor,
&mut LEGACY_SESSION.create_execution_ctx(),
Expand Down Expand Up @@ -241,8 +253,17 @@ fn generate_chunked_test_data(
let array = generate_test_data(string_count, avg_len, unique_chars);
let compressor = fsst_train_compressor(&array);
let len = array.len();
let total_uncompressed = array.bytes().len();
let dtype = array.dtype().clone();
fsst_compress(array, len, &dtype, &compressor, &mut ctx).into_array()
fsst_compress(
array,
len,
total_uncompressed,
&dtype,
&compressor,
&mut ctx,
)
.into_array()
})
.collect::<ChunkedArray>()
}
6 changes: 6 additions & 0 deletions encodings/fsst/benches/fsst_url_compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ fn eq_pushdown_high_match(bencher: Bencher) {
let fsst_array = fsst_compress(
data,
data.len(),
data.bytes().len(),
data.dtype(),
&compressor,
&mut SESSION.create_execution_ctx(),
Expand Down Expand Up @@ -87,6 +88,7 @@ fn eq_pushdown_low_match(bencher: Bencher) {
let fsst_array = fsst_compress(
data,
data.len(),
data.bytes().len(),
data.dtype(),
&compressor,
&mut SESSION.create_execution_ctx(),
Expand Down Expand Up @@ -114,6 +116,7 @@ fn eq_canonicalize_high_match(bencher: Bencher) {
let fsst_array = fsst_compress(
data,
data.len(),
data.bytes().len(),
data.dtype(),
&compressor,
&mut SESSION.create_execution_ctx(),
Expand Down Expand Up @@ -144,6 +147,7 @@ fn eq_canonicalize_low_match(bencher: Bencher) {
let fsst_array = fsst_compress(
data,
data.len(),
data.bytes().len(),
data.dtype(),
&compressor,
&mut SESSION.create_execution_ctx(),
Expand Down Expand Up @@ -178,6 +182,7 @@ fn like_substr_high_match(bencher: Bencher) {
let fsst_array = fsst_compress(
data,
data.len(),
data.bytes().len(),
data.dtype(),
&compressor,
&mut SESSION.create_execution_ctx(),
Expand Down Expand Up @@ -205,6 +210,7 @@ fn like_substr_low_match(bencher: Bencher) {
let fsst_array = fsst_compress(
data,
data.len(),
data.bytes().len(),
data.dtype(),
&compressor,
&mut SESSION.create_execution_ctx(),
Expand Down
4 changes: 2 additions & 2 deletions encodings/fsst/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ pub fn T::uncompressed_lengths(&self) -> &vortex_array::array::erased::ArrayRef

pub fn T::uncompressed_lengths_dtype(&self) -> &vortex_array::dtype::DType

pub fn vortex_fsst::fsst_compress<A: vortex_array::accessor::ArrayAccessor<[u8]>>(A, usize, &vortex_array::dtype::DType, &fsst::Compressor, &mut vortex_array::executor::ExecutionCtx) -> vortex_fsst::FSSTArray
pub fn vortex_fsst::fsst_compress<A: vortex_array::accessor::ArrayAccessor<[u8]>>(A, usize, usize, &vortex_array::dtype::DType, &fsst::Compressor, &mut vortex_array::executor::ExecutionCtx) -> vortex_fsst::FSSTArray

pub fn vortex_fsst::fsst_compress_iter<'a, I>(I, usize, vortex_array::dtype::DType, &fsst::Compressor, &mut vortex_array::executor::ExecutionCtx) -> vortex_fsst::FSSTArray where I: core::iter::traits::iterator::Iterator<Item = core::option::Option<&'a [u8]>>
pub fn vortex_fsst::fsst_compress_iter<'a, I>(I, usize, usize, vortex_array::dtype::DType, &fsst::Compressor, &mut vortex_array::executor::ExecutionCtx) -> vortex_fsst::FSSTArray where I: core::iter::traits::iterator::Iterator<Item = core::option::Option<&'a [u8]>>

pub fn vortex_fsst::fsst_train_compressor<A: vortex_array::accessor::ArrayAccessor<[u8]>>(&A) -> fsst::Compressor

Expand Down
1 change: 1 addition & 0 deletions encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ mod test {
let fsst_array = fsst_compress_iter(
[Some(b"abcabcab".as_ref()), Some(b"defghijk".as_ref())].into_iter(),
2,
16,
DType::Utf8(Nullability::NonNullable),
&compressor,
&mut ctx,
Expand Down
12 changes: 10 additions & 2 deletions encodings/fsst/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,15 @@ mod tests {
let (array, data) = make_data();
let compressor = fsst_train_compressor(&array);
(
fsst_compress(&array, array.len(), array.dtype(), &compressor, &mut ctx)
.into_array(),
fsst_compress(
&array,
array.len(),
array.bytes().len(),
array.dtype(),
&compressor,
&mut ctx,
)
.into_array(),
data,
)
})
Expand Down Expand Up @@ -213,6 +220,7 @@ mod tests {
let fsst_array = fsst_compress(
&varbin,
varbin.len(),
varbin.bytes().len(),
varbin.dtype(),
&fsst_train_compressor(&varbin),
&mut ctx,
Expand Down
121 changes: 119 additions & 2 deletions encodings/fsst/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use vortex_array::IntoArray;
use vortex_array::accessor::ArrayAccessor;
use vortex_array::arrays::varbin::builder::VarBinBuilder;
use vortex_array::dtype::DType;
use vortex_array::dtype::IntegerPType;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_error::VortexExpect;
Expand All @@ -20,11 +21,25 @@ use crate::FSSTArray;
pub fn fsst_compress<A: ArrayAccessor<[u8]>>(
strings: A,
len: usize,
total_uncompressed: usize,
dtype: &DType,
compressor: &Compressor,
ctx: &mut ExecutionCtx,
) -> FSSTArray {
strings.with_iterator(|iter| fsst_compress_iter(iter, len, dtype.clone(), compressor, ctx))
// Pick the narrowest sufficient codes-offsets type. The FSST contract
// bounds the compressed size at `2 * uncompressed + 7` per string, so
// if the upper bound fits in `i32::MAX` the actual offsets are
// guaranteed to fit; otherwise we widen to `i64` to avoid the overflow
// tracked in #7833.
if upper_bound_fits_i32(total_uncompressed, len) {
strings.with_iterator(|iter| {
fsst_compress_iter_with::<i32, _>(iter, len, dtype.clone(), compressor, ctx)
})
} else {
strings.with_iterator(|iter| {
fsst_compress_iter_with::<i64, _>(iter, len, dtype.clone(), compressor, ctx)
})
}
}

/// Train a compressor from an array.
Expand Down Expand Up @@ -57,19 +72,53 @@ where
/// the buffer to hold enough capacity for the worst-case compressed value.
const DEFAULT_BUFFER_LEN: usize = 1024 * 1024;

/// Reports whether `2 * total_uncompressed + 7 * len` is at most `i32::MAX`.
///
/// That expression is the worst-case compressed size assumed for `len`
/// strings totalling `total_uncompressed` bytes, so a `true` result means
/// every offset into the compressed bytes can be stored as an `i32`.
fn upper_bound_fits_i32(total_uncompressed: usize, len: usize) -> bool {
    const I32_OFFSET_LIMIT: u64 = i32::MAX as u64;

    // All arithmetic is done in u64 and saturates, so an absurdly large input
    // clamps to u64::MAX (which is rejected) instead of wrapping around.
    let doubled_payload = (total_uncompressed as u64).saturating_mul(2);
    let per_string_slack = (len as u64).saturating_mul(7);
    let worst_case = doubled_payload.saturating_add(per_string_slack);

    worst_case <= I32_OFFSET_LIMIT
}

/// Compress an iterator of optional bytestrings with FSST.
///
/// `total_uncompressed` is the combined byte length of every string the
/// iterator will yield; callers usually have it at hand already (for example
/// `VarBinArray::bytes().len()`). Together with `len` it decides the width of
/// the codes offsets: `i32` when the FSST upper bound
/// (`2 * total_uncompressed + 7 * len`) fits in `i32::MAX`, `i64` otherwise.
///
/// The value is taken on trust: this function does not compare it against
/// the bytes the iterator actually produces.
pub fn fsst_compress_iter<'a, I>(
    iter: I,
    len: usize,
    total_uncompressed: usize,
    dtype: DType,
    compressor: &Compressor,
    ctx: &mut ExecutionCtx,
) -> FSSTArray
where
    I: Iterator<Item = Option<&'a [u8]>>,
{
    let offsets_fit_i32 = upper_bound_fits_i32(total_uncompressed, len);

    // Wide path first: the worst case could exceed what an i32 offset can address.
    if !offsets_fit_i32 {
        return fsst_compress_iter_with::<i64, _>(iter, len, dtype, compressor, ctx);
    }

    fsst_compress_iter_with::<i32, _>(iter, len, dtype, compressor, ctx)
}

fn fsst_compress_iter_with<'a, O, I>(
iter: I,
len: usize,
dtype: DType,
compressor: &Compressor,
ctx: &mut ExecutionCtx,
) -> FSSTArray
where
O: IntegerPType,
I: Iterator<Item = Option<&'a [u8]>>,
{
let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN);
let mut builder = VarBinBuilder::<i32>::with_capacity(len);
let mut builder = VarBinBuilder::<O>::with_capacity(len);
let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
for string in iter {
match string {
Expand Down Expand Up @@ -121,12 +170,19 @@ mod tests {
use fsst::CompressorBuilder;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::VarBinArray;
use vortex_array::arrays::varbin::VarBinArrayExt;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::dtype::PType;
use vortex_array::scalar::Scalar;

use crate::FSSTArrayExt;
use crate::compress::DEFAULT_BUFFER_LEN;
use crate::compress::upper_bound_fits_i32;
use crate::fsst_compress;
use crate::fsst_compress_iter;
use crate::fsst_train_compressor;

#[test]
fn test_large_string() {
Expand All @@ -142,6 +198,7 @@ mod tests {
let compressed = fsst_compress_iter(
[Some(big_string.as_bytes())].into_iter(),
1,
big_string.len(),
DType::Utf8(Nullability::NonNullable),
&compressor,
&mut ctx,
Expand All @@ -153,4 +210,64 @@ mod tests {

assert_eq!(decoded, expected);
}

#[test]
fn upper_bound_fits_i32_handles_zero() {
    // No strings and no bytes: the worst-case bound is zero.
    let fits = upper_bound_fits_i32(0, 0);
    assert!(fits);
}

#[test]
fn upper_bound_fits_i32_handles_small_inputs() {
    // (total_uncompressed, len) pairs whose worst case sits far below i32::MAX.
    for (total_uncompressed, len) in [(1024, 100), (1 << 20, 1024)] {
        assert!(upper_bound_fits_i32(total_uncompressed, len));
    }
}

#[test]
fn upper_bound_fits_i32_at_boundary() {
    let len = 1;

    // Largest total for which 2 * total + 7 * len lands exactly on i32::MAX.
    let largest_fitting_total = (i32::MAX as usize - 7) / 2;
    assert!(upper_bound_fits_i32(largest_fitting_total, len));

    // A single extra byte pushes the bound past i32::MAX.
    let smallest_overflowing_total = largest_fitting_total + 1;
    assert!(!upper_bound_fits_i32(smallest_overflowing_total, len));
}

#[test]
fn upper_bound_fits_i32_rejects_huge() {
    // A quarter of the address space is far beyond anything an i32 offset can reach.
    let enormous_total = usize::MAX / 4;
    let fits = upper_bound_fits_i32(enormous_total, 1000);
    assert!(!fits);
}

/// Regression for #7833: a small input must still produce `i32` codes
/// offsets, so the FSST output keeps its compact layout.
///
/// The `i64` path only triggers when the worst-case compressed size exceeds
/// `i32::MAX`, which is too expensive to build in a test; the dispatch itself
/// is covered by the `upper_bound_fits_i32` boundary tests.
#[test]
fn fsst_compress_keeps_i32_offsets_for_small_inputs() {
    let sentences = [
        Some("The Greeks never said that the limit could not be overstepped"),
        Some("They said it existed and that whoever dared to exceed it was struck down"),
        Some("Nothing in present history can contradict them"),
    ];
    let array = VarBinArray::from_iter(sentences, DType::Utf8(Nullability::NonNullable));
    let compressor = fsst_train_compressor(&array);

    // Capture the sizes and dtype up front; the array itself is only borrowed below.
    let (len, total_uncompressed) = (array.len(), array.bytes().len());
    let dtype = array.dtype().clone();
    let mut ctx = LEGACY_SESSION.create_execution_ctx();

    let fsst = fsst_compress(
        &array,
        len,
        total_uncompressed,
        &dtype,
        &compressor,
        &mut ctx,
    );

    let observed = PType::try_from(fsst.codes().offsets().dtype()).unwrap();
    assert_eq!(observed, PType::I32);
}
}
19 changes: 17 additions & 2 deletions encodings/fsst/src/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,16 @@ mod tests {

let compressor = fsst_train_compressor(&strings);
let len = strings.len();
let total_uncompressed = strings.bytes().len();
let dtype = strings.dtype().clone();
let fsst = fsst_compress(strings, len, &dtype, &compressor, &mut ctx);
let fsst = fsst_compress(
strings,
len,
total_uncompressed,
&dtype,
&compressor,
&mut ctx,
);

// Cast to nullable
let casted = fsst
Expand All @@ -145,7 +153,14 @@ mod tests {
fn test_cast_fsst_conformance(#[case] array: VarBinArray) {
let mut ctx = SESSION.create_execution_ctx();
let compressor = fsst_train_compressor(&array);
let fsst = fsst_compress(&array, array.len(), array.dtype(), &compressor, &mut ctx);
let fsst = fsst_compress(
&array,
array.len(),
array.bytes().len(),
array.dtype(),
&compressor,
&mut ctx,
);
test_cast_conformance(&fsst.into_array());
}
}
3 changes: 2 additions & 1 deletion encodings/fsst/src/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ mod tests {
);
let compressor = fsst_train_compressor(&lhs);
let len = lhs.len();
let total_uncompressed = lhs.bytes().len();
let dtype = lhs.dtype().clone();
let lhs = fsst_compress(lhs, len, &dtype, &compressor, &mut ctx);
let lhs = fsst_compress(lhs, len, total_uncompressed, &dtype, &compressor, &mut ctx);

let rhs = ConstantArray::new("world", lhs.len());

Expand Down
Loading
Loading