Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions encodings/alp/benches/alp_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,16 @@ fn compress_rd<T: ALPRDFloat + NativePType>(bencher: Bencher, args: (usize, f64)
let encoder = RDEncoder::new(primitive.as_slice::<T>());

bencher
.with_inputs(|| (&primitive, &encoder, SESSION.create_execution_ctx()))
.bench_refs(|(primitive, encoder, ctx)| encoder.encode(primitive.as_view(), ctx))
.with_inputs(|| (&primitive, &encoder))
.bench_refs(|(primitive, encoder)| encoder.encode(primitive.as_view()))
}

#[divan::bench(types = [f32, f64], args = RD_BENCH_ARGS)]
fn decompress_rd<T: ALPRDFloat + NativePType>(bencher: Bencher, args: (usize, f64)) {
let (n, fraction_patch) = args;
let primitive = make_rd_array::<T>(n, fraction_patch);
let encoder = RDEncoder::new(primitive.as_slice::<T>());
let encoded = encoder.encode(primitive.as_view(), &mut SESSION.create_execution_ctx());
let encoded = encoder.encode(primitive.as_view());

bencher
.with_inputs(|| (&encoded, SESSION.create_execution_ctx()))
Expand Down
9 changes: 7 additions & 2 deletions encodings/alp/src/alp/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,15 @@ impl ALPData {

let expected_type = DType::Primitive(T::PTYPE, encoded.dtype().nullability());
vortex_ensure!(
patches.dtype() == &expected_type,
"Expected patches type {expected_type}, actual {}",
patches.dtype().eq_ignore_nullability(&expected_type),
"Expected patches type {expected_type} (ignoring nullability), actual {}",
patches.dtype(),
);
vortex_ensure!(
patches.values().validity()?.definitely_no_nulls(),
"ALP patch values must contain no nulls, got {}",
patches.values(),
);

Ok(())
}
Expand Down
21 changes: 13 additions & 8 deletions encodings/alp/src/alp/compute/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,13 @@ impl MaskKernel for ALP {
) -> VortexResult<Option<ArrayRef>> {
let vortex_mask = Validity::Array(mask.not()?).execute_mask(array.len(), ctx)?;
let masked_encoded = array.encoded().clone().mask(mask.clone())?;
let masked_dtype = array
.dtype()
.with_nullability(masked_encoded.dtype().nullability());
// Patch values are concrete floats with no nulls; `Patches::mask` filters them and
// preserves that, so no cast is needed.
let masked_patches = array
.patches()
.map(|p| p.mask(&vortex_mask, ctx))
.transpose()?
.flatten()
.map(|patches| patches.cast_values(&masked_dtype))
.transpose()?;
.flatten();
Ok(Some(
ALP::new(masked_encoded, array.exponents(), masked_patches).into_array(),
))
Expand Down Expand Up @@ -99,7 +96,7 @@ mod test {
}

#[test]
fn test_mask_alp_with_patches_casts_surviving_patch_values_to_nullable() {
fn test_mask_alp_with_patches_keeps_patch_values_without_nulls() {
let mut ctx = array_session().create_execution_ctx();
let values = PrimitiveArray::from_iter([1.234f32, f32::NAN, 2.345, f32::INFINITY, 3.456]);
let alp = alp_encode(values.as_view(), None, &mut ctx).unwrap();
Expand All @@ -113,7 +110,15 @@ mod test {
let masked_alp = masked.as_opt::<crate::ALP>().unwrap();
let masked_patches = masked_alp.patches().unwrap();

// Masking introduces nulls into the array, but the surviving patch values stay concrete
// (no cast), so they remain free of nulls.
assert_eq!(masked.dtype().nullability(), Nullability::Nullable);
assert_eq!(masked_patches.dtype().nullability(), Nullability::Nullable);
assert!(
masked_patches
.values()
.validity()
.unwrap()
.definitely_no_nulls()
);
}
}
13 changes: 4 additions & 9 deletions encodings/alp/src/alp/compute/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,14 @@ impl TakeExecute for ALP {
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
let taken_encoded = array.encoded().take(indices.clone())?;
// Patch values are concrete floats with no nulls; `Patches::take` preserves that, so no
// cast is needed — `ALP::validate_patches` checks the dtype (ignoring nullability) and
// `definitely_no_nulls`.
let taken_patches = array
.patches()
.map(|p| p.take(indices, ctx))
.transpose()?
.flatten()
.map(|patches| {
patches.cast_values(
&array
.dtype()
.with_nullability(taken_encoded.dtype().nullability()),
)
})
.transpose()?;
.flatten();
Ok(Some(
ALP::new(taken_encoded, array.exponents(), taken_patches).into_array(),
))
Expand Down
64 changes: 28 additions & 36 deletions encodings/alp/src/alp_rd/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ use vortex_array::ArrayParts;
use vortex_array::ArrayRef;
use vortex_array::ArraySlots;
use vortex_array::ArrayView;
use vortex_array::Canonical;
use vortex_array::EqMode;
use vortex_array::ExecutionCtx;
use vortex_array::ExecutionResult;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::TypedArrayRef;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::Primitive;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::buffer::BufferHandle;
Expand Down Expand Up @@ -209,13 +206,7 @@ impl VTable for ALPRD {
)
})
.transpose()?;
// NOTE: `VTable::deserialize` has a fixed trait signature without `ExecutionCtx`, so we
// cannot plumb a ctx in here. We construct a legacy ctx locally at this trait boundary.
let left_parts_patches = ALPRDData::canonicalize_patches(
&left_parts,
left_parts_patches,
&mut LEGACY_SESSION.create_execution_ctx(),
)?;
let left_parts_patches = ALPRDData::validate_patches(&left_parts, left_parts_patches)?;
let slots = ALPRDData::make_slots(&left_parts, &right_parts, left_parts_patches.as_ref());
let data = ALPRDData::new(
left_parts_dictionary,
Expand Down Expand Up @@ -368,11 +359,9 @@ impl ALPRD {
right_parts: ArrayRef,
right_bit_width: u8,
left_parts_patches: Option<Patches>,
ctx: &mut ExecutionCtx,
) -> VortexResult<ALPRDArray> {
let len = left_parts.len();
let left_parts_patches =
ALPRDData::canonicalize_patches(&left_parts, left_parts_patches, ctx)?;
let left_parts_patches = ALPRDData::validate_patches(&left_parts, left_parts_patches)?;
let slots = ALPRDData::make_slots(&left_parts, &right_parts, left_parts_patches.as_ref());
let data = ALPRDData::new(left_parts_dictionary, right_bit_width, left_parts_patches);
Array::try_from_parts(ArrayParts::new(ALPRD, dtype, len, data).with_slots(slots))
Expand Down Expand Up @@ -400,26 +389,31 @@ impl ALPRD {
}

impl ALPRDData {
fn canonicalize_patches(
/// Validate that `left_parts_patches` are well-formed for an `ALPRDArray` without performing
/// any execution.
///
/// Patch values must have the same dtype as `left_parts` (ignoring nullability) and contain no
/// nulls. Both are checked statically — `definitely_no_nulls` requires no execution — so the
/// values never need to be cast. Callers are responsible for constructing patches with the
/// correct value dtype (the encoder, `deserialize`, and the compute kernels all already do so).
fn validate_patches(
left_parts: &ArrayRef,
left_parts_patches: Option<Patches>,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<Patches>> {
left_parts_patches
.map(|patches| {
if !patches.values().all_valid(ctx)? {
vortex_bail!("patches must be all valid: {}", patches.values());
}
// TODO(ngates): assert the DType, don't cast it.
// TODO(joe): assert the DType, don't cast it in the next PR.
let mut patches = patches.cast_values(&left_parts.dtype().as_nonnullable())?;
// Force execution of the lazy cast so patch values are materialized
// before serialization.
let canonical = patches.values().clone().execute::<Canonical>(ctx)?;
*patches.values_mut() = canonical.into_array();
Ok(patches)
})
.transpose()
if let Some(patches) = &left_parts_patches {
vortex_ensure!(
patches.dtype().eq_ignore_nullability(left_parts.dtype()),
"ALPRD patch values dtype {} must match left-parts dtype {}",
patches.dtype(),
left_parts.dtype(),
);
vortex_ensure!(
patches.values().validity()?.definitely_no_nulls(),
"ALPRD patch values must contain no nulls, got {}",
patches.values(),
);
}
Ok(left_parts_patches)
}

/// Build a new `ALPRDArray` from components.
Expand Down Expand Up @@ -555,11 +549,9 @@ fn validate_parts(
left_parts.dtype(),
);
vortex_ensure!(
patches
.values()
.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?,
"patches must be all valid: {}",
patches.values()
patches.values().validity()?.definitely_no_nulls(),
"patches must contain no nulls: {}",
patches.values(),
);
}

Expand Down Expand Up @@ -664,7 +656,7 @@ mod test {
// Pick a seed that we know will trigger lots of patches.
let encoder: alp_rd::RDEncoder = alp_rd::RDEncoder::new(&[seed.powi(-2)]);

let rd_array = encoder.encode(real_array.as_view(), &mut ctx);
let rd_array = encoder.encode(real_array.as_view());

let decoded = rd_array
.as_array()
Expand Down
14 changes: 7 additions & 7 deletions encodings/alp/src/alp_rd/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mod tests {
let values = vec![1.0f32, 1.1, 1.2, 1.3, 1.4];
let arr = PrimitiveArray::from_iter(values.clone());
let encoder = RDEncoder::new(&values);
let alprd = encoder.encode(arr.as_view(), &mut ctx);
let alprd = encoder.encode(arr.as_view());

let casted = alprd
.into_array()
Expand All @@ -92,7 +92,7 @@ mod tests {
PrimitiveArray::from_option_iter([Some(10.0f64), None, Some(10.1), Some(10.2), None]);
let values = vec![10.0f64, 10.1, 10.2];
let encoder = RDEncoder::new(&values);
let alprd = encoder.encode(arr.as_view(), &mut ctx);
let alprd = encoder.encode(arr.as_view());

// Cast to NonNullable should fail since we have nulls. The failure surfaces during
// execution since the reduce path defers when the validity stat is not cached.
Expand Down Expand Up @@ -122,31 +122,31 @@ mod tests {
let values = vec![1.23f32, 4.56, 7.89, 10.11, 12.13];
let arr = PrimitiveArray::from_iter(values.clone());
let encoder = RDEncoder::new(&values);
encoder.encode(arr.as_view(), &mut array_session().create_execution_ctx())
encoder.encode(arr.as_view())
})]
#[case::f64({
let values = vec![100.1f64, 200.2, 300.3, 400.4, 500.5];
let arr = PrimitiveArray::from_iter(values.clone());
let encoder = RDEncoder::new(&values);
encoder.encode(arr.as_view(), &mut array_session().create_execution_ctx())
encoder.encode(arr.as_view())
})]
#[case::single({
let values = vec![42.42f64];
let arr = PrimitiveArray::from_iter(values.clone());
let encoder = RDEncoder::new(&values);
encoder.encode(arr.as_view(), &mut array_session().create_execution_ctx())
encoder.encode(arr.as_view())
})]
#[case::negative({
let values = vec![0.0f32, -1.5, 2.5, -3.5, 4.5];
let arr = PrimitiveArray::from_iter(values.clone());
let encoder = RDEncoder::new(&values);
encoder.encode(arr.as_view(), &mut array_session().create_execution_ctx())
encoder.encode(arr.as_view())
})]
#[case::nullable({
let arr = PrimitiveArray::from_option_iter([Some(1.1f32), None, Some(2.2), Some(3.3), None]);
let values = vec![1.1f32, 2.2, 3.3];
let encoder = RDEncoder::new(&values);
encoder.encode(arr.as_view(), &mut array_session().create_execution_ctx())
encoder.encode(arr.as_view())
})]
fn test_cast_alprd_conformance(#[case] alprd: crate::alp_rd::ALPRDArray) {
test_cast_conformance(
Expand Down
9 changes: 2 additions & 7 deletions encodings/alp/src/alp_rd/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ impl FilterKernel for ALPRD {
array.right_parts().filter(mask.clone())?,
array.right_bit_width(),
left_parts_exceptions,
ctx,
)?
.into_array(),
))
Expand Down Expand Up @@ -71,7 +70,7 @@ mod test {
fn test_filter<T: ALPRDFloat>(#[case] a: T, #[case] b: T, #[case] outlier: T) {
let mut ctx = SESSION.create_execution_ctx();
let array = PrimitiveArray::new(buffer![a, b, outlier], Validity::NonNullable);
let encoded = RDEncoder::new(&[a, b]).encode(array.as_view(), &mut ctx);
let encoded = RDEncoder::new(&[a, b]).encode(array.as_view());

// Make sure that we're testing the exception pathway.
assert!(encoded.left_parts_patches().is_some());
Expand All @@ -90,10 +89,7 @@ mod test {
let mut ctx = SESSION.create_execution_ctx();
test_filter_conformance(
&RDEncoder::new(&[a, b])
.encode(
PrimitiveArray::from_iter([a, b, outlier, b, outlier]).as_view(),
&mut ctx,
)
.encode(PrimitiveArray::from_iter([a, b, outlier, b, outlier]).as_view())
.into_array(),
&mut ctx,
);
Expand All @@ -109,7 +105,6 @@ mod test {
.encode(
PrimitiveArray::from_option_iter([Some(a), None, Some(outlier), Some(a), None])
.as_view(),
&mut ctx,
)
.into_array(),
&mut ctx,
Expand Down
11 changes: 1 addition & 10 deletions encodings/alp/src/alp_rd/compute/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
use vortex_array::ArrayRef;
use vortex_array::ArrayView;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::scalar_fn::ScalarFnFactoryExt;
use vortex_array::scalar_fn::EmptyOptions;
use vortex_array::scalar_fn::fns::mask::Mask as MaskExpr;
Expand All @@ -22,8 +20,6 @@ impl MaskReduce for ALPRD {
EmptyOptions,
[array.left_parts().clone(), mask.clone()],
)?;
// NOTE: `MaskReduce::mask` has a fixed trait signature without `ExecutionCtx`, so we
// construct a legacy ctx locally at this trait boundary.
Ok(Some(
ALPRD::try_new(
array.dtype().as_nullable(),
Expand All @@ -32,7 +28,6 @@ impl MaskReduce for ALPRD {
array.right_parts().clone(),
array.right_bit_width(),
array.left_parts_patches(),
&mut LEGACY_SESSION.create_execution_ctx(),
)?
.into_array(),
))
Expand All @@ -58,10 +53,7 @@ mod tests {
let mut ctx = array_session().create_execution_ctx();
test_mask_conformance(
&RDEncoder::new(&[a, b])
.encode(
PrimitiveArray::from_iter([a, b, outlier, b, outlier]).as_view(),
&mut ctx,
)
.encode(PrimitiveArray::from_iter([a, b, outlier, b, outlier]).as_view())
.into_array(),
&mut ctx,
);
Expand All @@ -77,7 +69,6 @@ mod tests {
.encode(
PrimitiveArray::from_option_iter([Some(a), None, Some(outlier), Some(a), None])
.as_view(),
&mut ctx,
)
.into_array(),
&mut ctx,
Expand Down
Loading