diff --git a/encodings/fsst/benches/fsst_compress.rs b/encodings/fsst/benches/fsst_compress.rs index 45aedfa464f..3630062f71d 100644 --- a/encodings/fsst/benches/fsst_compress.rs +++ b/encodings/fsst/benches/fsst_compress.rs @@ -56,10 +56,18 @@ const BENCH_ARGS: &[(usize, usize, u8)] = &[ fn compress_fsst(bencher: Bencher, (string_count, avg_len, unique_chars): (usize, usize, u8)) { let array = generate_test_data(string_count, avg_len, unique_chars); let compressor = fsst_train_compressor(&array); + let total_uncompressed = array.bytes().len(); bencher .with_inputs(|| (&array, &compressor, LEGACY_SESSION.create_execution_ctx())) .bench_refs(|(array, compressor, ctx)| { - fsst_compress(*array, array.len(), array.dtype(), compressor, ctx) + fsst_compress( + *array, + array.len(), + total_uncompressed, + array.dtype(), + compressor, + ctx, + ) }) } @@ -68,10 +76,12 @@ fn decompress_fsst(bencher: Bencher, (string_count, avg_len, unique_chars): (usi let array = generate_test_data(string_count, avg_len, unique_chars); let compressor = fsst_train_compressor(&array); let len = array.len(); + let total_uncompressed = array.bytes().len(); let dtype = array.dtype().clone(); let encoded = fsst_compress( array, len, + total_uncompressed, &dtype, &compressor, &mut LEGACY_SESSION.create_execution_ctx(), @@ -97,6 +107,7 @@ fn pushdown_compare(bencher: Bencher, (string_count, avg_len, unique_chars): (us let fsst_array = fsst_compress( &array, array.len(), + array.bytes().len(), array.dtype(), &compressor, &mut LEGACY_SESSION.create_execution_ctx(), @@ -132,6 +143,7 @@ fn canonicalize_compare( let fsst_array = fsst_compress( &array, array.len(), + array.bytes().len(), array.dtype(), &compressor, &mut LEGACY_SESSION.create_execution_ctx(), @@ -241,8 +253,17 @@ fn generate_chunked_test_data( let array = generate_test_data(string_count, avg_len, unique_chars); let compressor = fsst_train_compressor(&array); let len = array.len(); + let total_uncompressed = array.bytes().len(); let dtype = array.dtype().clone(); - fsst_compress(array, len, &dtype, &compressor, &mut ctx).into_array() + fsst_compress( + array, + len, + total_uncompressed, + &dtype, + &compressor, + &mut ctx, + ) + .into_array() }) .collect::() } diff --git a/encodings/fsst/benches/fsst_url_compare.rs b/encodings/fsst/benches/fsst_url_compare.rs index 656cd9f1866..a625daf1f86 100644 --- a/encodings/fsst/benches/fsst_url_compare.rs +++ b/encodings/fsst/benches/fsst_url_compare.rs @@ -60,6 +60,7 @@ fn eq_pushdown_high_match(bencher: Bencher) { let fsst_array = fsst_compress( data, data.len(), + data.bytes().len(), data.dtype(), &compressor, &mut SESSION.create_execution_ctx(), @@ -87,6 +88,7 @@ fn eq_pushdown_low_match(bencher: Bencher) { let fsst_array = fsst_compress( data, data.len(), + data.bytes().len(), data.dtype(), &compressor, &mut SESSION.create_execution_ctx(), @@ -114,6 +116,7 @@ fn eq_canonicalize_high_match(bencher: Bencher) { let fsst_array = fsst_compress( data, data.len(), + data.bytes().len(), data.dtype(), &compressor, &mut SESSION.create_execution_ctx(), @@ -144,6 +147,7 @@ fn eq_canonicalize_low_match(bencher: Bencher) { let fsst_array = fsst_compress( data, data.len(), + data.bytes().len(), data.dtype(), &compressor, &mut SESSION.create_execution_ctx(), @@ -178,6 +182,7 @@ fn like_substr_high_match(bencher: Bencher) { let fsst_array = fsst_compress( data, data.len(), + data.bytes().len(), data.dtype(), &compressor, &mut SESSION.create_execution_ctx(), @@ -205,6 +210,7 @@ fn like_substr_low_match(bencher: Bencher) { let fsst_array = fsst_compress( data, data.len(), + data.bytes().len(), data.dtype(), &compressor, &mut SESSION.create_execution_ctx(), diff --git a/encodings/fsst/public-api.lock b/encodings/fsst/public-api.lock index 44cf663fa39..0c0c424f8e8 100644 --- a/encodings/fsst/public-api.lock +++ b/encodings/fsst/public-api.lock @@ -180,9 +180,9 @@ pub fn T::uncompressed_lengths(&self) -> &vortex_array::array::erased::ArrayRef pub fn T::uncompressed_lengths_dtype(&self) -> &vortex_array::dtype::DType -pub fn vortex_fsst::fsst_compress>(A, usize, &vortex_array::dtype::DType, &fsst::Compressor, &mut vortex_array::executor::ExecutionCtx) -> vortex_fsst::FSSTArray +pub fn vortex_fsst::fsst_compress>(A, usize, usize, &vortex_array::dtype::DType, &fsst::Compressor, &mut vortex_array::executor::ExecutionCtx) -> vortex_fsst::FSSTArray -pub fn vortex_fsst::fsst_compress_iter<'a, I>(I, usize, vortex_array::dtype::DType, &fsst::Compressor, &mut vortex_array::executor::ExecutionCtx) -> vortex_fsst::FSSTArray where I: core::iter::traits::iterator::Iterator> +pub fn vortex_fsst::fsst_compress_iter<'a, I>(I, usize, usize, vortex_array::dtype::DType, &fsst::Compressor, &mut vortex_array::executor::ExecutionCtx) -> vortex_fsst::FSSTArray where I: core::iter::traits::iterator::Iterator> pub fn vortex_fsst::fsst_train_compressor>(&A) -> fsst::Compressor diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index 93c62eeebf5..1a0d92146ed 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -808,6 +808,7 @@ mod test { let fsst_array = fsst_compress_iter( [Some(b"abcabcab".as_ref()), Some(b"defghijk".as_ref())].into_iter(), 2, + 16, DType::Utf8(Nullability::NonNullable), &compressor, &mut ctx, diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs index a8b8171b043..a33fe66333e 100644 --- a/encodings/fsst/src/canonical.rs +++ b/encodings/fsst/src/canonical.rs @@ -155,8 +155,15 @@ mod tests { let (array, data) = make_data(); let compressor = fsst_train_compressor(&array); ( - fsst_compress(&array, array.len(), array.dtype(), &compressor, &mut ctx) - .into_array(), + fsst_compress( + &array, + array.len(), + array.bytes().len(), + array.dtype(), + &compressor, + &mut ctx, + ) + .into_array(), data, ) }) @@ -213,6 +220,7 @@ mod tests { let fsst_array = fsst_compress( &varbin, varbin.len(), + varbin.bytes().len(), varbin.dtype(), &fsst_train_compressor(&varbin), &mut ctx, diff --git a/encodings/fsst/src/compress.rs b/encodings/fsst/src/compress.rs index 872dcbc494d..0efc4d10e37 100644 --- a/encodings/fsst/src/compress.rs +++ b/encodings/fsst/src/compress.rs @@ -10,6 +10,7 @@ use vortex_array::IntoArray; use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::varbin::builder::VarBinBuilder; use vortex_array::dtype::DType; +use vortex_array::dtype::IntegerPType; use vortex_buffer::Buffer; use vortex_buffer::BufferMut; use vortex_error::VortexExpect; @@ -20,11 +21,25 @@ use crate::FSSTArray; pub fn fsst_compress>( strings: A, len: usize, + total_uncompressed: usize, dtype: &DType, compressor: &Compressor, ctx: &mut ExecutionCtx, ) -> FSSTArray { - strings.with_iterator(|iter| fsst_compress_iter(iter, len, dtype.clone(), compressor, ctx)) + // Pick the narrowest sufficient codes-offsets type. The FSST contract + // bounds the compressed size at `2 * uncompressed + 7` per string, so + // if the upper bound fits in `i32::MAX` the actual offsets are + // guaranteed to fit; otherwise we widen to `i64` to avoid the overflow + // tracked in #7833. + if upper_bound_fits_i32(total_uncompressed, len) { + strings.with_iterator(|iter| { + fsst_compress_iter_with::(iter, len, dtype.clone(), compressor, ctx) + }) + } else { + strings.with_iterator(|iter| { + fsst_compress_iter_with::(iter, len, dtype.clone(), compressor, ctx) + }) + } } /// Train a compressor from an array. @@ -57,8 +72,41 @@ where /// the buffer to hold enough capacity for the worst-case compressed value. const DEFAULT_BUFFER_LEN: usize = 1024 * 1024; +/// Whether the FSST worst-case compressed size for `len` strings totalling +/// `total_uncompressed` bytes fits in an `i32` offset. +fn upper_bound_fits_i32(total_uncompressed: usize, len: usize) -> bool { + // 2 * total + 7 * n — computed in u64 so the arithmetic itself can't overflow. + let max_compressed = 2_u64 + .saturating_mul(total_uncompressed as u64) + .saturating_add(7_u64.saturating_mul(len as u64)); + max_compressed <= i32::MAX as u64 +} + /// Compress from an iterator of bytestrings using FSST. +/// +/// `total_uncompressed` is the total byte length of all strings in the input; +/// callers typically have it cheaply available (e.g. `VarBinArray::bytes().len()`). +/// It selects the narrowest codes-offsets type that the FSST upper bound +/// (`2 * total_uncompressed + 7 * len`) is guaranteed to fit into. pub fn fsst_compress_iter<'a, I>( + iter: I, + len: usize, + total_uncompressed: usize, + dtype: DType, + compressor: &Compressor, + ctx: &mut ExecutionCtx, +) -> FSSTArray +where + I: Iterator>, +{ + if upper_bound_fits_i32(total_uncompressed, len) { + fsst_compress_iter_with::(iter, len, dtype, compressor, ctx) + } else { + fsst_compress_iter_with::(iter, len, dtype, compressor, ctx) + } +} + +fn fsst_compress_iter_with<'a, O, I>( iter: I, len: usize, dtype: DType, @@ -66,10 +114,11 @@ pub fn fsst_compress_iter<'a, I>( ctx: &mut ExecutionCtx, ) -> FSSTArray where + O: IntegerPType, I: Iterator>, { let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN); - let mut builder = VarBinBuilder::::with_capacity(len); + let mut builder = VarBinBuilder::::with_capacity(len); let mut uncompressed_lengths: BufferMut = BufferMut::with_capacity(len); for string in iter { match string { @@ -121,12 +170,19 @@ mod tests { use fsst::CompressorBuilder; use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; + use vortex_array::arrays::VarBinArray; + use vortex_array::arrays::varbin::VarBinArrayExt; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; + use vortex_array::dtype::PType; use vortex_array::scalar::Scalar; + use crate::FSSTArrayExt; use crate::compress::DEFAULT_BUFFER_LEN; + use crate::compress::upper_bound_fits_i32; + use crate::fsst_compress; use crate::fsst_compress_iter; + use crate::fsst_train_compressor; #[test] fn test_large_string() { @@ -142,6 +198,7 @@ mod tests { let compressed = fsst_compress_iter( [Some(big_string.as_bytes())].into_iter(), 1, + big_string.len(), DType::Utf8(Nullability::NonNullable), &compressor, &mut ctx, @@ -153,4 +210,64 @@ mod tests { assert_eq!(decoded, expected); } + + #[test] + fn upper_bound_fits_i32_handles_zero() { + assert!(upper_bound_fits_i32(0, 0)); + } + + #[test] + fn upper_bound_fits_i32_handles_small_inputs() { + assert!(upper_bound_fits_i32(1024, 100)); + assert!(upper_bound_fits_i32(1 << 20, 1024)); + } + + #[test] + fn upper_bound_fits_i32_at_boundary() { + // 2 * total + 7 * n == i32::MAX exactly + let n = 1; + let total = (i32::MAX as usize - 7) / 2; + assert!(upper_bound_fits_i32(total, n)); + // One more byte tips us over + assert!(!upper_bound_fits_i32(total + 1, n)); + } + + #[test] + fn upper_bound_fits_i32_rejects_huge() { + assert!(!upper_bound_fits_i32(usize::MAX / 4, 1000)); + } + + /// Regression for #7833: small inputs keep i32 codes offsets so the FSST + /// output retains its compact layout. The matching i64 path is exercised + /// only for inputs whose worst-case compressed size exceeds `i32::MAX`, + /// which is too expensive to test directly; the boundary unit tests above + /// cover the dispatch. + #[test] + fn fsst_compress_keeps_i32_offsets_for_small_inputs() { + let array = VarBinArray::from_iter( + [ + Some("The Greeks never said that the limit could not be overstepped"), + Some("They said it existed and that whoever dared to exceed it was struck down"), + Some("Nothing in present history can contradict them"), + ], + DType::Utf8(Nullability::NonNullable), + ); + let compressor = fsst_train_compressor(&array); + let len = array.len(); + let dtype = array.dtype().clone(); + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + + let total_uncompressed = array.bytes().len(); + let fsst = fsst_compress( + &array, + len, + total_uncompressed, + &dtype, + &compressor, + &mut ctx, + ); + + let codes_offsets_ptype = PType::try_from(fsst.codes().offsets().dtype()).unwrap(); + assert_eq!(codes_offsets_ptype, PType::I32); + } } diff --git a/encodings/fsst/src/compute/cast.rs b/encodings/fsst/src/compute/cast.rs index a1c96363ba0..a5f748f6301 100644 --- a/encodings/fsst/src/compute/cast.rs +++ b/encodings/fsst/src/compute/cast.rs @@ -118,8 +118,16 @@ mod tests { let compressor = fsst_train_compressor(&strings); let len = strings.len(); + let total_uncompressed = strings.bytes().len(); let dtype = strings.dtype().clone(); - let fsst = fsst_compress(strings, len, &dtype, &compressor, &mut ctx); + let fsst = fsst_compress( + strings, + len, + total_uncompressed, + &dtype, + &compressor, + &mut ctx, + ); // Cast to nullable let casted = fsst @@ -145,7 +153,14 @@ mod tests { fn test_cast_fsst_conformance(#[case] array: VarBinArray) { let mut ctx = SESSION.create_execution_ctx(); let compressor = fsst_train_compressor(&array); - let fsst = fsst_compress(&array, array.len(), array.dtype(), &compressor, &mut ctx); + let fsst = fsst_compress( + &array, + array.len(), + array.bytes().len(), + array.dtype(), + &compressor, + &mut ctx, + ); test_cast_conformance(&fsst.into_array()); } } diff --git a/encodings/fsst/src/compute/compare.rs b/encodings/fsst/src/compute/compare.rs index 553a3608e75..e7d103d8773 100644 --- a/encodings/fsst/src/compute/compare.rs +++ b/encodings/fsst/src/compute/compare.rs @@ -154,8 +154,9 @@ mod tests { ); let compressor = fsst_train_compressor(&lhs); let len = lhs.len(); + let total_uncompressed = lhs.bytes().len(); let dtype = lhs.dtype().clone(); - let lhs = fsst_compress(lhs, len, &dtype, &compressor, &mut ctx); + let lhs = fsst_compress(lhs, len, total_uncompressed, &dtype, &compressor, &mut ctx); let rhs = ConstantArray::new("world", lhs.len()); diff --git a/encodings/fsst/src/compute/like.rs b/encodings/fsst/src/compute/like.rs index a6b8c40d4c2..48a0bead3f3 100644 --- a/encodings/fsst/src/compute/like.rs +++ b/encodings/fsst/src/compute/like.rs @@ -112,10 +112,12 @@ mod tests { let varbin = VarBinArray::from_iter(strings.iter().copied(), DType::Utf8(nullability)); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); fsst_compress( varbin, len, + total_uncompressed, &dtype, &compressor, &mut SESSION.create_execution_ctx(), diff --git a/encodings/fsst/src/compute/mod.rs b/encodings/fsst/src/compute/mod.rs index 02efdf7febc..73f0f19df70 100644 --- a/encodings/fsst/src/compute/mod.rs +++ b/encodings/fsst/src/compute/mod.rs @@ -79,7 +79,14 @@ mod tests { let mut ctx = LEGACY_SESSION.create_execution_ctx(); let arr = VarBinArray::from_iter([Some("h")], DType::Utf8(Nullability::NonNullable)); let compr = fsst_train_compressor(&arr); - let fsst = fsst_compress(&arr, arr.len(), arr.dtype(), &compr, &mut ctx); + let fsst = fsst_compress( + &arr, + arr.len(), + arr.bytes().len(), + arr.dtype(), + &compr, + &mut ctx, + ); let idx1: PrimitiveArray = (0..1).collect(); @@ -112,7 +119,14 @@ mod tests { fn test_take_fsst_conformance(#[case] varbin: VarBinArray) { let mut ctx = LEGACY_SESSION.create_execution_ctx(); let compressor = fsst_train_compressor(&varbin); - let array = fsst_compress(&varbin, varbin.len(), varbin.dtype(), &compressor, &mut ctx); + let array = fsst_compress( + &varbin, + varbin.len(), + varbin.bytes().len(), + varbin.dtype(), + &compressor, + &mut ctx, + ); test_take_conformance(&array.into_array()); } @@ -126,7 +140,7 @@ mod tests { DType::Utf8(Nullability::NonNullable), ); let compressor = fsst_train_compressor(&varbin); - fsst_compress(&varbin, varbin.len(), varbin.dtype(), &compressor, ctx) + fsst_compress(&varbin, varbin.len(), varbin.bytes().len(), varbin.dtype(), &compressor, ctx) })] // Nullable strings #[case::fsst_nullable(|ctx: &mut ExecutionCtx| { @@ -136,8 +150,9 @@ mod tests { ); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx) + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx) })] // Repetitive patterns (good for FSST compression) #[case::fsst_repetitive(|ctx: &mut ExecutionCtx| { @@ -146,7 +161,7 @@ mod tests { DType::Utf8(Nullability::NonNullable), ); let compressor = fsst_train_compressor(&varbin); - fsst_compress(&varbin, varbin.len(), varbin.dtype(), &compressor, ctx) + fsst_compress(&varbin, varbin.len(), varbin.bytes().len(), varbin.dtype(), &compressor, ctx) })] // Edge cases #[case::fsst_single(|ctx: &mut ExecutionCtx| { @@ -155,7 +170,7 @@ mod tests { DType::Utf8(Nullability::NonNullable), ); let compressor = fsst_train_compressor(&varbin); - fsst_compress(&varbin, varbin.len(), varbin.dtype(), &compressor, ctx) + fsst_compress(&varbin, varbin.len(), varbin.bytes().len(), varbin.dtype(), &compressor, ctx) })] #[case::fsst_empty_strings(|ctx: &mut ExecutionCtx| { let varbin = VarBinArray::from_iter( @@ -164,8 +179,9 @@ mod tests { ); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx) + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx) })] // Large arrays #[case::fsst_large(|ctx: &mut ExecutionCtx| { @@ -186,8 +202,9 @@ mod tests { let varbin = VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable)); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx) + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx) })] fn test_fsst_consistency(#[case] build: FsstBuilder) { diff --git a/encodings/fsst/src/dfa/tests.rs b/encodings/fsst/src/dfa/tests.rs index 6ad30ca685d..a1e684edb87 100644 --- a/encodings/fsst/src/dfa/tests.rs +++ b/encodings/fsst/src/dfa/tests.rs @@ -228,10 +228,12 @@ fn make_fsst_str(strings: &[Option<&str>]) -> FSSTArray { ); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); fsst_compress( varbin, len, + total_uncompressed, &dtype, &compressor, &mut SESSION.create_execution_ctx(), diff --git a/encodings/fsst/src/kernel.rs b/encodings/fsst/src/kernel.rs index 7f455a06e16..e473c18516f 100644 --- a/encodings/fsst/src/kernel.rs +++ b/encodings/fsst/src/kernel.rs @@ -59,9 +59,18 @@ mod tests { let compressor = fsst_train_compressor(&input); let len = input.len(); + let total_uncompressed = input.bytes().len(); let dtype = input.dtype().clone(); let mut ctx = SESSION.create_execution_ctx(); - fsst_compress(input, len, &dtype, &compressor, &mut ctx).into_array() + fsst_compress( + input, + len, + total_uncompressed, + &dtype, + &compressor, + &mut ctx, + ) + .into_array() } #[test] @@ -138,6 +147,7 @@ mod tests { let fsst_array: ArrayRef = fsst_compress( input.clone(), input.len(), + input.bytes().len(), input.dtype(), &compressor, &mut ctx, @@ -173,6 +183,7 @@ mod tests { let fsst_array: ArrayRef = fsst_compress( input.clone(), input.len(), + input.bytes().len(), input.dtype(), &compressor, &mut ctx, diff --git a/encodings/fsst/src/test_utils.rs b/encodings/fsst/src/test_utils.rs index eeca412e377..88fae18656c 100644 --- a/encodings/fsst/src/test_utils.rs +++ b/encodings/fsst/src/test_utils.rs @@ -50,8 +50,9 @@ pub fn gen_fsst_test_data( let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx).into_array() + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx).into_array() } pub fn gen_dict_fsst_test_data( @@ -147,8 +148,9 @@ pub fn make_fsst_urls(n: usize, ctx: &mut ExecutionCtx) -> FSSTArray { let varbin = generate_url_data_n(n); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx) + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx) } // --------------------------------------------------------------------------- @@ -243,8 +245,9 @@ pub fn make_fsst_clickbench_urls(n: usize, ctx: &mut ExecutionCtx) -> FSSTArray ); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx) + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx) } // --------------------------------------------------------------------------- @@ -311,8 +314,9 @@ pub fn make_fsst_short_urls(n: usize, ctx: &mut ExecutionCtx) -> FSSTArray { ); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx) + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx) } // --------------------------------------------------------------------------- @@ -383,8 +387,9 @@ pub fn make_fsst_log_lines(n: usize, ctx: &mut ExecutionCtx) -> FSSTArray { ); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx) + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx) } // --------------------------------------------------------------------------- @@ -442,8 +447,9 @@ pub fn make_fsst_json_strings(n: usize, ctx: &mut ExecutionCtx) -> FSSTArray { ); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx) + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx) } // --------------------------------------------------------------------------- @@ -514,8 +520,9 @@ pub fn make_fsst_file_paths(n: usize, ctx: &mut ExecutionCtx) -> FSSTArray { ); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx) + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx) } // --------------------------------------------------------------------------- @@ -567,8 +574,9 @@ pub fn make_fsst_emails(n: usize, ctx: &mut ExecutionCtx) -> FSSTArray { ); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx) + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx) } // --------------------------------------------------------------------------- @@ -606,6 +614,7 @@ pub fn make_fsst_rare_match(n: usize, ctx: &mut ExecutionCtx) -> FSSTArray { ); let compressor = fsst_train_compressor(&varbin); let len = varbin.len(); + let total_uncompressed = varbin.bytes().len(); let dtype = varbin.dtype().clone(); - fsst_compress(varbin, len, &dtype, &compressor, ctx) + fsst_compress(varbin, len, total_uncompressed, &dtype, &compressor, ctx) } diff --git a/encodings/fsst/src/tests.rs b/encodings/fsst/src/tests.rs index fb4cb8bdc5c..2505eda7edb 100644 --- a/encodings/fsst/src/tests.rs +++ b/encodings/fsst/src/tests.rs @@ -30,9 +30,18 @@ pub(crate) fn build_fsst_array() -> ArrayRef { let compressor = fsst_train_compressor(&input_array); let len = input_array.len(); + let total_uncompressed = input_array.bytes().len(); let dtype = input_array.dtype().clone(); let mut ctx = LEGACY_SESSION.create_execution_ctx(); - fsst_compress(input_array, len, &dtype, &compressor, &mut ctx).into_array() + fsst_compress( + input_array, + len, + total_uncompressed, + &dtype, + &compressor, + &mut ctx, + ) + .into_array() } #[test] diff --git a/fuzz/src/fsst_like.rs b/fuzz/src/fsst_like.rs index 5ca10af310a..dceb90ac800 100644 --- a/fuzz/src/fsst_like.rs +++ b/fuzz/src/fsst_like.rs @@ -117,6 +117,7 @@ pub fn run_fsst_like_fuzz(fuzz: FuzzFsstLike) -> VortexFuzzResult { let fsst_array: FSSTArray = fsst_compress( varbin.clone(), varbin.len(), + varbin.bytes().len(), varbin.dtype(), &compressor, &mut ctx, diff --git a/vortex-array/src/arrays/varbin/compute/compare.rs b/vortex-array/src/arrays/varbin/compute/compare.rs index 511f930d3d0..f07a1ec0380 100644 --- a/vortex-array/src/arrays/varbin/compute/compare.rs +++ b/vortex-array/src/arrays/varbin/compute/compare.rs @@ -2,8 +2,11 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use arrow_array::BinaryArray; +use arrow_array::LargeBinaryArray; +use arrow_array::LargeStringArray; use arrow_array::StringArray; use arrow_ord::cmp; +use arrow_schema::DataType; use vortex_buffer::BitBuffer; use vortex_error::VortexExpect as _; use vortex_error::VortexResult; @@ -82,15 +85,27 @@ impl CompareKernel for VarBin { let lhs = Datum::try_new(lhs.array(), ctx)?; - // Use StringViewArray/BinaryViewArray to match the Utf8View/BinaryView types - // produced by Datum::try_new (which uses execute_arrow(None, ctx)) - let arrow_rhs: &dyn arrow_array::Datum = match rhs_const.dtype() { - DType::Utf8(_) => &rhs_const + // The RHS scalar must match the LHS Arrow data type. VarBin with i64 + // offsets is converted to LargeBinary/LargeUtf8 (see + // `preferred_arrow_type`), and Arrow refuses to compare LargeBinary + // with Binary (or LargeUtf8 with Utf8). + let arrow_rhs: &dyn arrow_array::Datum = match (rhs_const.dtype(), lhs.data_type()) { + (DType::Utf8(_), DataType::LargeUtf8) => &rhs_const + .as_utf8() + .value() + .map(LargeStringArray::new_scalar) + .unwrap_or_else(|| arrow_array::Scalar::new(LargeStringArray::new_null(1))), + (DType::Utf8(_), _) => &rhs_const .as_utf8() .value() .map(StringArray::new_scalar) .unwrap_or_else(|| arrow_array::Scalar::new(StringArray::new_null(1))), - DType::Binary(_) => &rhs_const + (DType::Binary(_), DataType::LargeBinary) => &rhs_const + .as_binary() + .value() + .map(LargeBinaryArray::new_scalar) + .unwrap_or_else(|| arrow_array::Scalar::new(LargeBinaryArray::new_null(1))), + (DType::Binary(_), _) => &rhs_const .as_binary() .value() .map(BinaryArray::new_scalar) diff --git a/vortex-btrblocks/src/schemes/string.rs b/vortex-btrblocks/src/schemes/string.rs index ade42f88668..b4f3d79a623 100644 --- a/vortex-btrblocks/src/schemes/string.rs +++ b/vortex-btrblocks/src/schemes/string.rs @@ -90,7 +90,15 @@ impl Scheme for FSSTScheme { ) -> VortexResult { let utf8 = data.array_as_utf8().into_owned(); let compressor_fsst = fsst_train_compressor(&utf8); - let fsst = fsst_compress(&utf8, utf8.len(), utf8.dtype(), &compressor_fsst, exec_ctx); + let total_uncompressed = utf8.views().iter().map(|v| v.len() as usize).sum::(); + let fsst = fsst_compress( + &utf8, + utf8.len(), + total_uncompressed, + utf8.dtype(), + &compressor_fsst, + exec_ctx, + ); let uncompressed_lengths_primitive = fsst .uncompressed_lengths() diff --git a/vortex-cuda/src/kernel/encodings/fsst.rs b/vortex-cuda/src/kernel/encodings/fsst.rs index 5d3d66eaf04..d70addf384e 100644 --- a/vortex-cuda/src/kernel/encodings/fsst.rs +++ b/vortex-cuda/src/kernel/encodings/fsst.rs @@ -244,8 +244,16 @@ mod tests { let compressor = fsst_train_compressor(&varbin); let dtype = varbin.dtype().clone(); let len = varbin.len(); - let fsst_array = - fsst_compress(&varbin, len, &dtype, &compressor, cuda_ctx.execution_ctx()).into_array(); + let total_uncompressed = varbin.bytes().len(); + let fsst_array = fsst_compress( + &varbin, + len, + total_uncompressed, + &dtype, + &compressor, + cuda_ctx.execution_ctx(), + ) + .into_array(); let cpu_result = crate::canonicalize_cpu(fsst_array.clone())?; let gpu_result = FSSTExecutor diff --git a/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/fsst.rs b/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/fsst.rs index 17314629737..e511ac491f2 100644 --- a/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/fsst.rs +++ b/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/fsst.rs @@ -127,6 +127,7 @@ impl FlatLayoutFixture for FsstFixture { fsst_compress( &url_col, url_col.len(), + url_col.bytes().len(), url_col.dtype(), &url_comp, &mut ctx, @@ -135,6 +136,7 @@ impl FlatLayoutFixture for FsstFixture { fsst_compress( &log_col, log_col.len(), + log_col.bytes().len(), log_col.dtype(), &log_comp, &mut ctx, @@ -143,6 +145,7 @@ impl FlatLayoutFixture for FsstFixture { fsst_compress( &nullable_col, nullable_col.len(), + nullable_col.bytes().len(), nullable_col.dtype(), &nullable_comp, &mut ctx, @@ -151,6 +154,7 @@ impl FlatLayoutFixture for FsstFixture { fsst_compress( &short_col, short_col.len(), + short_col.bytes().len(), short_col.dtype(), &short_comp, &mut ctx, @@ -159,6 +163,7 @@ impl FlatLayoutFixture for FsstFixture { fsst_compress( &empty_and_unicode_col, empty_and_unicode_col.len(), + empty_and_unicode_col.bytes().len(), empty_and_unicode_col.dtype(), &empty_and_unicode_comp, &mut ctx, @@ -167,6 +172,7 @@ impl FlatLayoutFixture for FsstFixture { fsst_compress( &suffix_shared_col, suffix_shared_col.len(), + suffix_shared_col.bytes().len(), suffix_shared_col.dtype(), &suffix_shared_comp, &mut ctx, @@ -175,6 +181,7 @@ impl FlatLayoutFixture for FsstFixture { fsst_compress( &high_entropy_col, high_entropy_col.len(), + high_entropy_col.bytes().len(), high_entropy_col.dtype(), &high_entropy_comp, &mut ctx, @@ -183,6 +190,7 @@ impl FlatLayoutFixture for FsstFixture { fsst_compress( &all_null_clustered, all_null_clustered.len(), + all_null_clustered.bytes().len(), all_null_clustered.dtype(), &all_null_clustered_comp, &mut ctx, diff --git a/vortex/benches/common_encoding_tree_throughput.rs b/vortex/benches/common_encoding_tree_throughput.rs index 552f66c321f..e6714471e83 100644 --- a/vortex/benches/common_encoding_tree_throughput.rs +++ b/vortex/benches/common_encoding_tree_throughput.rs @@ -237,9 +237,15 @@ mod setup { let mut ctx = LEGACY_SESSION.create_execution_ctx(); let unique_varbinview = VarBinViewArray::from_iter_str(unique_strings); let fsst_compressor = fsst_train_compressor(&unique_varbinview); + let total_uncompressed = unique_varbinview + .views() + .iter() + .map(|v| v.len() as usize) + .sum::(); let fsst_values = fsst_compress( &unique_varbinview, unique_varbinview.len(), + total_uncompressed, unique_varbinview.dtype(), &fsst_compressor, &mut ctx, @@ -276,9 +282,15 @@ mod setup { let mut ctx = LEGACY_SESSION.create_execution_ctx(); let unique_varbinview = VarBinViewArray::from_iter_str(unique_strings); let fsst_compressor = fsst_train_compressor(&unique_varbinview); + let total_uncompressed = unique_varbinview + .views() + .iter() + .map(|v| v.len() as usize) + .sum::(); let fsst = fsst_compress( &unique_varbinview, unique_varbinview.len(), + total_uncompressed, unique_varbinview.dtype(), &fsst_compressor, &mut ctx, diff --git a/vortex/benches/single_encoding_throughput.rs b/vortex/benches/single_encoding_throughput.rs index be253187956..eff5c49c594 100644 --- a/vortex/benches/single_encoding_throughput.rs +++ b/vortex/benches/single_encoding_throughput.rs @@ -417,10 +417,24 @@ fn bench_fsst_compress_string(bencher: Bencher) { VarBinViewArray::from_iter_str(gen_varbin_words(NUM_VALUES as usize, 0.00005)); let fsst_compressor = fsst_train_compressor(&varbinview_arr); let nbytes = varbinview_arr.nbytes() as u64; + let total_uncompressed = varbinview_arr + .views() + .iter() + .map(|v| v.len() as usize) + .sum::(); with_byte_counter(bencher, nbytes) .with_inputs(|| (&varbinview_arr, LEGACY_SESSION.create_execution_ctx())) - .bench_refs(|(a, ctx)| fsst_compress(*a, a.len(), a.dtype(), &fsst_compressor, ctx)); + .bench_refs(|(a, ctx)| { + fsst_compress( + *a, + a.len(), + total_uncompressed, + a.dtype(), + &fsst_compressor, + ctx, + ) + }); } #[divan::bench(name = "fsst_decompress_string")] @@ -428,9 +442,15 @@ fn bench_fsst_decompress_string(bencher: Bencher) { let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(NUM_VALUES as usize, 0.00005)); let fsst_compressor = fsst_train_compressor(&varbinview_arr); + let total_uncompressed = varbinview_arr + .views() + .iter() + .map(|v| v.len() as usize) + .sum::(); let fsst_array = fsst_compress( &varbinview_arr, varbinview_arr.len(), + total_uncompressed, varbinview_arr.dtype(), &fsst_compressor, &mut LEGACY_SESSION.create_execution_ctx(),