Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 30 additions & 24 deletions encodings/parquet-variant/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ use parquet_variant_compute::GetOptions;
use parquet_variant_compute::VariantArray as ArrowVariantArray;
use parquet_variant_compute::unshred_variant;
use vortex_array::ArrayRef;
use vortex_array::CanonicalView;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::OrCanonical;
use vortex_array::OrCanonicalMatch;
use vortex_array::VTable;
use vortex_array::arrays::Variant;
use vortex_array::arrays::variant::VariantArrayExt;
use vortex_array::arrow::ArrowExport;
use vortex_array::arrow::ArrowExportVTable;
Expand All @@ -30,7 +32,6 @@ use vortex_array::arrow::to_arrow_null_buffer;
use vortex_array::dtype::DType;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;
use vortex_session::registry::CachedId;
use vortex_session::registry::Id;

Expand Down Expand Up @@ -117,24 +118,34 @@ pub(crate) fn export_unshredded_storage_to_target<T: ParquetVariantArrayExt>(
}

pub(crate) fn parquet_variant_for_export(
array: ArrayRef,
mut array: ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
let executed = array.execute_until::<ParquetVariant>(ctx)?;
if executed.is::<ParquetVariant>() {
return Ok(executed);
}
// Execution may converge to either `ParquetVariant` storage or the canonical `Variant` form;
// `OrCanonical` surfaces both rather than erroring on the canonical fallback.
let variant = match array.execute_until::<OrCanonical<ParquetVariant>>(ctx)? {
OrCanonicalMatch::Matched(parquet) => return Ok(parquet.into_owned().into_array()),
OrCanonicalMatch::Canonical(CanonicalView::Variant(variant)) => variant,
OrCanonicalMatch::Canonical(_) => {
vortex_bail!("cannot export Variant without ParquetVariant storage")
}
};

let mut core_storage = variant.core_storage().clone();
// Materialize the ParquetVariant components from the executed core storage now, before the
// `shredded` branch, so the early-return path can still hand back the owned core array.
let (core_validity, core_metadata, core_value) =
match core_storage.execute_until::<OrCanonical<ParquetVariant>>(ctx)? {
OrCanonicalMatch::Matched(parquet_core) => (
ParquetVariantArrayExt::validity(&parquet_core),
parquet_core.metadata_array().clone(),
parquet_core.value_array().cloned(),
),
OrCanonicalMatch::Canonical(_) => {
vortex_bail!("cannot export Variant without ParquetVariant core storage")
}
};

let variant = executed
.as_opt::<Variant>()
.ok_or_else(|| vortex_err!("cannot export Variant without ParquetVariant storage"))?;
let core_storage = variant
.core_storage()
.clone()
.execute_until::<ParquetVariant>(ctx)?;
let parquet_core = core_storage
.as_opt::<ParquetVariant>()
.ok_or_else(|| vortex_err!("cannot export Variant without ParquetVariant core storage"))?;
let Some(shredded) = variant.shredded() else {
return Ok(core_storage);
};
Expand All @@ -144,13 +155,8 @@ pub(crate) fn parquet_variant_for_export(
// `to_arrow` and `unshred_variant` can consume.
let typed_value = parquet_typed_value_from_logical_shredded(shredded.clone(), ctx)?;

ParquetVariant::try_new(
ParquetVariantArrayExt::validity(&parquet_core),
parquet_core.metadata_array().clone(),
parquet_core.value_array().cloned(),
Some(typed_value),
)
.map(IntoArray::into_array)
ParquetVariant::try_new(core_validity, core_metadata, core_value, Some(typed_value))
.map(IntoArray::into_array)
}

impl ArrowExportVTable for ParquetVariant {
Expand Down
11 changes: 8 additions & 3 deletions encodings/runend/src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,14 @@ mod tests {

// Keep every other row = 112/2 = 56 rows.
let mask = Mask::from_iter((0..sliced.len()).map(|i| i % 2 == 0));
let filtered = sliced.filter(mask)?;

let executed = filtered.execute_until::<RunEnd>(&mut ctx)?;
let mut filtered = sliced.filter(mask)?;

// `execute_until::<RunEnd>` errors unless execution converges to RunEnd, so reaching
// a match already proves the encoding was preserved.
let executed = filtered
.execute_until::<RunEnd>(&mut ctx)?
.into_owned()
.into_array();
assert_eq!(
executed.encoding_id().as_ref(),
"vortex.runend",
Expand Down
44 changes: 38 additions & 6 deletions vortex-array/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

//! Encodings that enable zero-copy sharing of data with Arrow.

use std::marker::PhantomData;
use std::sync::Arc;

use vortex_buffer::BitBuffer;
Expand Down Expand Up @@ -543,12 +544,8 @@ impl From<Canonical> for ArrayRef {
/// canonical form. Callers should prefer to execute into `Columnar` if they are able to optimize
/// their use for constant arrays.
impl Executable for Canonical {
fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
let result = array.execute_until::<AnyCanonical>(ctx)?;
Ok(result
.as_opt::<AnyCanonical>()
.map(Canonical::from)
.vortex_expect("execute_until::<AnyCanonical> must return a canonical array"))
fn execute(mut array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
Ok(Canonical::from(array.execute_until::<AnyCanonical>(ctx)?))
}
}

Expand Down Expand Up @@ -1114,6 +1111,41 @@ impl Matcher for AnyCanonical {
}
}

/// A matcher that matches `M`, falling back to any canonical array.
///
/// [`ArrayRef::execute_until`] errors if execution converges to a canonical form that the
/// requested matcher does not match. Wrapping the matcher in `OrCanonical` instead surfaces that
/// canonical result as [`OrCanonicalMatch::Canonical`], letting the caller decide how to handle
/// it rather than erroring.
///
/// `OrCanonical` is a type-level marker: it carries `M` only through [`PhantomData`] and is
/// named as a generic argument, e.g. `array.execute_until::<OrCanonical<M>>(ctx)`.
pub struct OrCanonical<M>(PhantomData<M>);

/// The match produced by [`OrCanonical`]: either the inner matcher matched, or execution
/// converged to a canonical form the inner matcher did not match.
///
/// Both variants borrow from the matched array for the lifetime `'a`. When an array would
/// satisfy both `M` and the canonical fallback, [`OrCanonical`] reports it as
/// [`OrCanonicalMatch::Matched`], because `M` is tried first.
pub enum OrCanonicalMatch<'a, M: Matcher> {
/// The inner matcher `M` matched; holds the view `M` produced.
Matched(M::Match<'a>),
/// `M` did not match, but the array reached canonical form; holds the canonical view.
Canonical(CanonicalView<'a>),
}

impl<M: Matcher> Matcher for OrCanonical<M> {
    type Match<'a> = OrCanonicalMatch<'a, M>;

    /// Accepts an array when either `M` or [`AnyCanonical`] accepts it.
    #[inline]
    fn matches(array: &ArrayRef) -> bool {
        M::matches(array) || AnyCanonical::matches(array)
    }

    /// Tries `M` first; only when `M` yields no match is the array offered to
    /// [`AnyCanonical`], whose view is reported as [`OrCanonicalMatch::Canonical`].
    #[inline]
    fn try_match(array: &ArrayRef) -> Option<Self::Match<'_>> {
        match M::try_match(array) {
            Some(inner) => Some(OrCanonicalMatch::Matched(inner)),
            None => AnyCanonical::try_match(array).map(OrCanonicalMatch::Canonical),
        }
    }
}

#[cfg(test)]
mod test {
use std::sync::Arc;
Expand Down
18 changes: 6 additions & 12 deletions vortex-array/src/columnar.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_error::VortexExpect;
use vortex_error::VortexResult;

use crate::AnyCanonical;
Expand Down Expand Up @@ -69,17 +68,12 @@ impl IntoArray for Columnar {

/// Execute into [`Columnar`] by running `execute_until` with the [`AnyColumnar`] matcher.
impl Executable for Columnar {
fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
let result = array.execute_until::<AnyColumnar>(ctx)?;
if let Some(constant) = result.as_opt::<Constant>() {
Ok(Columnar::Constant(constant.into_owned()))
} else {
Ok(Columnar::Canonical(
result
.as_opt::<AnyCanonical>()
.map(Canonical::from)
.vortex_expect("execute_until::<AnyColumnar> must return a columnar array"),
))
fn execute(mut array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
match array.execute_until::<AnyColumnar>(ctx)? {
ColumnarView::Constant(constant) => Ok(Columnar::Constant(constant.into_owned())),
ColumnarView::Canonical(canonical) => {
Ok(Columnar::Canonical(Canonical::from(canonical)))
}
}
}
}
Expand Down
50 changes: 43 additions & 7 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@ use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;

use crate::AnyCanonical;
use crate::ArrayRef;
use crate::Canonical;
use crate::IntoArray;
use crate::array::Array;
use crate::array::ArrayId;
use crate::arrays::Null;
use crate::builders::ArrayBuilder;
use crate::builders::builder_with_capacity_in;
use crate::dtype::DType;
Expand Down Expand Up @@ -92,11 +95,42 @@ impl ArrayRef {
E::execute(self, ctx)
}

/// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
/// Drive execution of this array until the [`Matcher`] `M` accepts it, and hand back the
/// resulting typed view.
///
/// The executed array replaces the contents of `self`, and the returned [`Matcher::Match`]
/// borrows from that stored array. Callers therefore get a typed view of the result without
/// downcasting the executed array a second time.
///
/// # Errors
///
/// Returns an error when execution converges to a canonical form that `M` does not match,
/// since no further execution progress is possible. Wrap `M` in
/// [`OrCanonical`](crate::OrCanonical) to receive the canonical fallback instead of an error.
///
/// Also returns an error once execution reaches a configurable maximum number of iterations
/// (default `2^22`, override with `VORTEX_MAX_ITERATIONS`).
///
/// If the execution loop itself fails, `self` is left holding the `Null` placeholder that was
/// swapped in while the loop ran, not the original input.
pub fn execute_until<'a, M: Matcher>(
    &'a mut self,
    ctx: &mut ExecutionCtx,
) -> VortexResult<M::Match<'a>> {
    // The loop takes the array by value so it can extract owned buffers. Park a cheap,
    // alloc-free placeholder in `self` while it runs, then store the executed array back.
    let placeholder = Array::<Null>::new(0).into_array();
    let input = std::mem::replace(self, placeholder);
    *self = input.execute_until_done(ctx, M::matches)?;

    match M::try_match(self) {
        Some(matched) => Ok(matched),
        None => Err(vortex_err!(
            "execute_until did not converge to a matching array (got {})",
            self.encoding_id()
        )),
    }
}

/// Iteratively execute this array until `done` matches, using an explicit work
/// stack plus an optional builder for `AppendChild`.
///
/// Note: the returned array may not match `M`. If execution converges to a canonical form
/// that does not match `M`, the canonical array is returned since no further execution
/// Note: the returned array may not satisfy `done`. If execution converges to a canonical form
/// that `done` does not match, the canonical array is returned since no further execution
/// progress is possible.
///
/// For safety, this errors once execution reaches a configurable maximum number of
Expand Down Expand Up @@ -163,7 +197,11 @@ impl ArrayRef {
/// partially consumes `current_array`: some slots already live in the builder, so a
/// parent rewrite would observe inconsistent state and could discard accumulated builder
/// data.
pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
fn execute_until_done(
self,
ctx: &mut ExecutionCtx,
done: DonePredicate,
) -> VortexResult<ArrayRef> {
let mut current_array = self;
let mut current_builder: Option<Box<dyn ArrayBuilder>> = None;
let mut stack: Vec<StackFrame> = Vec::new();
Expand All @@ -172,9 +210,7 @@ impl ArrayRef {
let max_iterations = max_iterations();

for _ in 0..max_iterations {
let is_done = stack
.last()
.map_or(M::matches as DonePredicate, |frame| frame.done);
let is_done = stack.last().map_or(done, |frame| frame.done);

if is_done(&current_array) || AnyCanonical::matches(&current_array) {
match stack.pop() {
Expand Down
16 changes: 4 additions & 12 deletions vortex-tensor/src/scalar_fns/l2_norm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,9 @@ mod tests {
let input = literal_vector_array(&[3.0f64, 4.0], 4);

let scalar_fn = L2Norm::new().erased();
let result = ScalarFnArray::try_new(scalar_fn, vec![input])?.into_array();
let mut result = ScalarFnArray::try_new(scalar_fn, vec![input])?.into_array();
let mut ctx = SESSION.create_execution_ctx();
let output = result.execute_until::<Constant>(&mut ctx)?;

let constant = output
.as_opt::<Constant>()
.expect("L2Norm over a constant input must produce a constant output");
let constant = result.execute_until::<Constant>(&mut ctx)?;
assert_eq!(constant.len(), 4);
let norm = constant
.scalar()
Expand All @@ -390,13 +386,9 @@ mod tests {
let input = ConstantArray::new(null_scalar, 3).into_array();

let scalar_fn = L2Norm::new().erased();
let result = ScalarFnArray::try_new(scalar_fn, vec![input])?.into_array();
let mut result = ScalarFnArray::try_new(scalar_fn, vec![input])?.into_array();
let mut ctx = SESSION.create_execution_ctx();
let output = result.execute_until::<Constant>(&mut ctx)?;

let constant = output
.as_opt::<Constant>()
.expect("null constant input must produce a constant output");
let constant = result.execute_until::<Constant>(&mut ctx)?;
assert_eq!(constant.len(), 3);
assert!(constant.scalar().is_null());
assert_eq!(
Expand Down
Loading