diff --git a/encodings/parquet-variant/src/arrow.rs b/encodings/parquet-variant/src/arrow.rs index da4e648a138..8b01d83e3dc 100644 --- a/encodings/parquet-variant/src/arrow.rs +++ b/encodings/parquet-variant/src/arrow.rs @@ -15,10 +15,12 @@ use parquet_variant_compute::GetOptions; use parquet_variant_compute::VariantArray as ArrowVariantArray; use parquet_variant_compute::unshred_variant; use vortex_array::ArrayRef; +use vortex_array::CanonicalView; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; +use vortex_array::OrCanonical; +use vortex_array::OrCanonicalMatch; use vortex_array::VTable; -use vortex_array::arrays::Variant; use vortex_array::arrays::variant::VariantArrayExt; use vortex_array::arrow::ArrowExport; use vortex_array::arrow::ArrowExportVTable; @@ -30,7 +32,6 @@ use vortex_array::arrow::to_arrow_null_buffer; use vortex_array::dtype::DType; use vortex_error::VortexResult; use vortex_error::vortex_bail; -use vortex_error::vortex_err; use vortex_session::registry::CachedId; use vortex_session::registry::Id; @@ -117,24 +118,34 @@ pub(crate) fn export_unshredded_storage_to_target( } pub(crate) fn parquet_variant_for_export( - array: ArrayRef, + mut array: ArrayRef, ctx: &mut ExecutionCtx, ) -> VortexResult { - let executed = array.execute_until::(ctx)?; - if executed.is::() { - return Ok(executed); - } + // Execution may converge to either `ParquetVariant` storage or the canonical `Variant` form; + // `OrCanonical` surfaces both rather than erroring on the canonical fallback. + let variant = match array.execute_until::>(ctx)? { + OrCanonicalMatch::Matched(parquet) => return Ok(parquet.into_owned().into_array()), + OrCanonicalMatch::Canonical(CanonicalView::Variant(variant)) => variant, + OrCanonicalMatch::Canonical(_) => { + vortex_bail!("cannot export Variant without ParquetVariant storage") + } + }; + + let mut core_storage = variant.core_storage().clone(); + // Materialize the ParquetVariant components from the executed core storage now, before the + // `shredded` branch, so the early-return path can still hand back the owned core array. + let (core_validity, core_metadata, core_value) = + match core_storage.execute_until::>(ctx)? { + OrCanonicalMatch::Matched(parquet_core) => ( + ParquetVariantArrayExt::validity(&parquet_core), + parquet_core.metadata_array().clone(), + parquet_core.value_array().cloned(), + ), + OrCanonicalMatch::Canonical(_) => { + vortex_bail!("cannot export Variant without ParquetVariant core storage") + } + }; - let variant = executed - .as_opt::() - .ok_or_else(|| vortex_err!("cannot export Variant without ParquetVariant storage"))?; - let core_storage = variant - .core_storage() - .clone() - .execute_until::(ctx)?; - let parquet_core = core_storage - .as_opt::() - .ok_or_else(|| vortex_err!("cannot export Variant without ParquetVariant core storage"))?; let Some(shredded) = variant.shredded() else { return Ok(core_storage); }; @@ -144,13 +155,8 @@ pub(crate) fn parquet_variant_for_export( // `to_arrow` and `unshred_variant` can consume. let typed_value = parquet_typed_value_from_logical_shredded(shredded.clone(), ctx)?; - ParquetVariant::try_new( - ParquetVariantArrayExt::validity(&parquet_core), - parquet_core.metadata_array().clone(), - parquet_core.value_array().cloned(), - Some(typed_value), - ) - .map(IntoArray::into_array) + ParquetVariant::try_new(core_validity, core_metadata, core_value, Some(typed_value)) + .map(IntoArray::into_array) } impl ArrowExportVTable for ParquetVariant { diff --git a/encodings/runend/src/compute/filter.rs b/encodings/runend/src/compute/filter.rs index ca99fdf1912..3bd491486ef 100644 --- a/encodings/runend/src/compute/filter.rs +++ b/encodings/runend/src/compute/filter.rs @@ -173,9 +173,14 @@ mod tests { // Keep every other row = 112/2 = 56 rows. let mask = Mask::from_iter((0..sliced.len()).map(|i| i % 2 == 0)); - let filtered = sliced.filter(mask)?; - - let executed = filtered.execute_until::(&mut ctx)?; + let mut filtered = sliced.filter(mask)?; + + // `execute_until::` now errors unless execution converges to RunEnd, so reaching + // a match already proves the encoding was preserved. + let executed = filtered + .execute_until::(&mut ctx)? + .into_owned() + .into_array(); assert_eq!( executed.encoding_id().as_ref(), "vortex.runend", diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 28953a93e4e..d4081289886 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -3,6 +3,7 @@ //! Encodings that enable zero-copy sharing of data with Arrow. +use std::marker::PhantomData; use std::sync::Arc; use vortex_buffer::BitBuffer; @@ -543,12 +544,8 @@ impl From for ArrayRef { /// canonical form. Callers should prefer to execute into `Columnar` if they are able to optimize /// their use for constant arrays. impl Executable for Canonical { - fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { - let result = array.execute_until::(ctx)?; - Ok(result - .as_opt::() - .map(Canonical::from) - .vortex_expect("execute_until:: must return a canonical array")) + fn execute(mut array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + Ok(Canonical::from(array.execute_until::(ctx)?)) } } @@ -1114,6 +1111,41 @@ impl Matcher for AnyCanonical { } } +/// A matcher that matches `M`, falling back to any canonical array. +/// +/// [`ArrayRef::execute_until`] errors if execution converges to a canonical form that the +/// requested matcher does not match. Wrapping the matcher in `OrCanonical` instead surfaces that +/// canonical result as [`OrCanonicalMatch::Canonical`], letting the caller decide how to handle +/// it rather than erroring. +pub struct OrCanonical(PhantomData); + +/// The match produced by [`OrCanonical`]: either the inner matcher matched, or execution +/// converged to a canonical form the inner matcher did not match. +pub enum OrCanonicalMatch<'a, M: Matcher> { + /// The inner matcher `M` matched. + Matched(M::Match<'a>), + /// `M` did not match, but the array reached canonical form. + Canonical(CanonicalView<'a>), +} + +impl Matcher for OrCanonical { + type Match<'a> = OrCanonicalMatch<'a, M>; + + #[inline] + fn matches(array: &ArrayRef) -> bool { + M::matches(array) || AnyCanonical::matches(array) + } + + #[inline] + fn try_match(array: &ArrayRef) -> Option> { + if let Some(matched) = M::try_match(array) { + Some(OrCanonicalMatch::Matched(matched)) + } else { + AnyCanonical::try_match(array).map(OrCanonicalMatch::Canonical) + } + } +} + #[cfg(test)] mod test { use std::sync::Arc; diff --git a/vortex-array/src/columnar.rs b/vortex-array/src/columnar.rs index 2e4bdc328fd..3ad104bbef8 100644 --- a/vortex-array/src/columnar.rs +++ b/vortex-array/src/columnar.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use vortex_error::VortexExpect; use vortex_error::VortexResult; use crate::AnyCanonical; @@ -69,17 +68,12 @@ impl IntoArray for Columnar { /// Execute into [`Columnar`] by running `execute_until` with the [`AnyColumnar`] matcher. impl Executable for Columnar { - fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { - let result = array.execute_until::(ctx)?; - if let Some(constant) = result.as_opt::() { - Ok(Columnar::Constant(constant.into_owned())) - } else { - Ok(Columnar::Canonical( - result - .as_opt::() - .map(Canonical::from) - .vortex_expect("execute_until:: must return a columnar array"), - )) + fn execute(mut array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + match array.execute_until::(ctx)? { + ColumnarView::Constant(constant) => Ok(Columnar::Constant(constant.into_owned())), + ColumnarView::Canonical(canonical) => { + Ok(Columnar::Canonical(Canonical::from(canonical))) + } } } } diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 0c083f18ee5..5e387868ae4 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -26,6 +26,7 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_ensure; +use vortex_error::vortex_err; use vortex_error::vortex_panic; use vortex_session::VortexSession; @@ -33,7 +34,9 @@ use crate::AnyCanonical; use crate::ArrayRef; use crate::Canonical; use crate::IntoArray; +use crate::array::Array; use crate::array::ArrayId; +use crate::arrays::Null; use crate::builders::ArrayBuilder; use crate::builders::builder_with_capacity_in; use crate::dtype::DType; @@ -92,11 +95,42 @@ impl ArrayRef { E::execute(self, ctx) } - /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work + /// Iteratively execute this array until the [`Matcher`] `M` matches, returning the matched + /// view borrowing the executed array. + /// + /// The fully-executed array is written back into `self`, and the returned [`Matcher::Match`] + /// borrows from it. This lets callers obtain a typed view of the result without re-downcasting + /// the returned array. + /// + /// # Errors + /// + /// Errors if execution converges to a canonical form that `M` does not match, since no further + /// execution progress is possible. Wrap `M` in [`OrCanonical`](crate::OrCanonical) to handle + /// the canonical fallback without erroring. + /// + /// Also errors once execution reaches a configurable maximum number of iterations (default + /// `2^22`, override with `VORTEX_MAX_ITERATIONS`). + pub fn execute_until<'a, M: Matcher>( + &'a mut self, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + // The loop consumes the array to extract owned buffers, so swap in a cheap, alloc-free + // placeholder while it runs, then write the executed array back into `self`. + let owned = std::mem::replace(self, Array::::new(0).into_array()); + *self = owned.execute_until_done(ctx, M::matches)?; + M::try_match(self).ok_or_else(|| { + vortex_err!( + "execute_until did not converge to a matching array (got {})", + self.encoding_id() + ) + }) + } + + /// Iteratively execute this array until `done` matches, using an explicit work /// stack plus an optional builder for `AppendChild`. /// - /// Note: the returned array may not match `M`. If execution converges to a canonical form - /// that does not match `M`, the canonical array is returned since no further execution + /// Note: the returned array may not satisfy `done`. If execution converges to a canonical form + /// that `done` does not match, the canonical array is returned since no further execution /// progress is possible. /// /// For safety, this errors once execution reaches a configurable maximum number of @@ -163,7 +197,11 @@ impl ArrayRef { /// partially consumes `current_array`: some slots already live in the builder, so a /// parent rewrite would observe inconsistent state and could discard accumulated builder /// data. - pub fn execute_until(self, ctx: &mut ExecutionCtx) -> VortexResult { + fn execute_until_done( + self, + ctx: &mut ExecutionCtx, + done: DonePredicate, + ) -> VortexResult { let mut current_array = self; let mut current_builder: Option> = None; let mut stack: Vec = Vec::new(); @@ -172,9 +210,7 @@ impl ArrayRef { let max_iterations = max_iterations(); for _ in 0..max_iterations { - let is_done = stack - .last() - .map_or(M::matches as DonePredicate, |frame| frame.done); + let is_done = stack.last().map_or(done, |frame| frame.done); if is_done(¤t_array) || AnyCanonical::matches(¤t_array) { match stack.pop() { diff --git a/vortex-tensor/src/scalar_fns/l2_norm.rs b/vortex-tensor/src/scalar_fns/l2_norm.rs index aa85ab4c78e..d3187e93a21 100644 --- a/vortex-tensor/src/scalar_fns/l2_norm.rs +++ b/vortex-tensor/src/scalar_fns/l2_norm.rs @@ -359,13 +359,9 @@ mod tests { let input = literal_vector_array(&[3.0f64, 4.0], 4); let scalar_fn = L2Norm::new().erased(); - let result = ScalarFnArray::try_new(scalar_fn, vec![input])?.into_array(); + let mut result = ScalarFnArray::try_new(scalar_fn, vec![input])?.into_array(); let mut ctx = SESSION.create_execution_ctx(); - let output = result.execute_until::(&mut ctx)?; - - let constant = output - .as_opt::() - .expect("L2Norm over a constant input must produce a constant output"); + let constant = result.execute_until::(&mut ctx)?; assert_eq!(constant.len(), 4); let norm = constant .scalar() @@ -390,13 +386,9 @@ mod tests { let input = ConstantArray::new(null_scalar, 3).into_array(); let scalar_fn = L2Norm::new().erased(); - let result = ScalarFnArray::try_new(scalar_fn, vec![input])?.into_array(); + let mut result = ScalarFnArray::try_new(scalar_fn, vec![input])?.into_array(); let mut ctx = SESSION.create_execution_ctx(); - let output = result.execute_until::(&mut ctx)?; - - let constant = output - .as_opt::() - .expect("null constant input must produce a constant output"); + let constant = result.execute_until::(&mut ctx)?; assert_eq!(constant.len(), 3); assert!(constant.scalar().is_null()); assert_eq!(