From c2b2d56885e024cb9606149d367829b7ed753ade Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 6 May 2026 22:54:05 -0400 Subject: [PATCH 01/11] save Signed-off-by: Andrew Duffy --- vortex-array/src/arrow/convert.rs | 87 ++++++++- vortex-array/src/arrow/mod.rs | 3 + vortex-array/src/arrow/session.rs | 183 +++++++++++++++++++ vortex-array/src/extension/datetime/arrow.rs | 2 + vortex-array/src/extension/datetime/mod.rs | 1 + vortex-array/src/extension/uuid/arrow.rs | 147 +++++++++++++++ vortex-array/src/extension/uuid/mod.rs | 1 + 7 files changed, 418 insertions(+), 6 deletions(-) create mode 100644 vortex-array/src/arrow/session.rs create mode 100644 vortex-array/src/extension/datetime/arrow.rs create mode 100644 vortex-array/src/extension/uuid/arrow.rs diff --git a/vortex-array/src/arrow/convert.rs b/vortex-array/src/arrow/convert.rs index ba95bdf0451..5602f374e29 100644 --- a/vortex-array/src/arrow/convert.rs +++ b/vortex-array/src/arrow/convert.rs @@ -3,7 +3,6 @@ use std::sync::Arc; -use arrow_array::AnyDictionaryArray; use arrow_array::Array as ArrowArray; use arrow_array::ArrowPrimitiveType; use arrow_array::BooleanArray as ArrowBooleanArray; @@ -49,15 +48,17 @@ use arrow_array::types::UInt8Type; use arrow_array::types::UInt16Type; use arrow_array::types::UInt32Type; use arrow_array::types::UInt64Type; +use arrow_array::{AnyDictionaryArray, ArrayRef as ArrowArrayRef}; use arrow_buffer::ArrowNativeType; use arrow_buffer::BooleanBuffer; use arrow_buffer::Buffer as ArrowBuffer; use arrow_buffer::ScalarBuffer; use arrow_buffer::buffer::NullBuffer; use arrow_buffer::buffer::OffsetBuffer; -use arrow_schema::DataType; use arrow_schema::TimeUnit as ArrowTimeUnit; +use arrow_schema::{DataType, Field}; use itertools::Itertools; +use vortex_array::arrow::session::ArrowVTable; use vortex_buffer::Alignment; use vortex_buffer::BitBuffer; use vortex_buffer::Buffer; @@ -66,7 +67,7 @@ use vortex_error::VortexExpect as _; use vortex_error::VortexResult; use 
vortex_error::vortex_bail; use vortex_error::vortex_panic; - +use vortex_session::VortexSession; use crate::ArrayRef; use crate::IntoArray; use crate::arrays::BoolArray; @@ -476,7 +477,7 @@ impl FromArrowArray<&DictionaryArray> for DictArra } } -fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity { +pub(crate) fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity { if nullable { nulls .map(|nulls| { @@ -627,6 +628,78 @@ impl FromArrowArray<&RecordBatch> for ArrayRef { } } +#[derive(Debug)] +struct CanonicalArrowVTable; + +impl ArrowVTable for CanonicalArrowVTable { + fn preferred_physical_type(&self, array: &ArrayRef) -> VortexResult> { + todo!() + } + + fn from_arrow_array( + &self, + _array: ArrowArrayRef, + _field: &Field, + ) -> VortexResult> { + todo!() + } + + fn from_arrow_field(&self, _field: &Field) -> VortexResult> { + todo!() + } + + fn execute_arrow( + &self, + _array: ArrayRef, + _physical_type: &Field, + _session: &VortexSession, + ) -> VortexResult> { + todo!() + } + + fn to_arrow_field(&self, name: &str, dtype: &DType, session: &VortexSession) -> VortexResult> { + let (data_type, is_nullable) = match dtype { + DType::Null => (DataType::Null, true), + DType::Bool(nullability) => (DataType::Boolean, nullability.is_nullable()), + DType::Primitive(ptype, nullability) => { + let data_type = match ptype { + PType::U8 => DataType::UInt8, + PType::U16 => {} + PType::U32 => {} + PType::U64 => {} + PType::I8 => {} + PType::I16 => {} + PType::I32 => {} + PType::I64 => {} + PType::F16 => {} + PType::F32 => {} + PType::F64 => {} + }; + + (data_type, nullability.is_nullable()) + } + DType::Decimal(decimal_type, nullability) => { + // Conversion for decimal? + } + DType::Utf8(nullability) => { + (DataType::Utf8, nullability.is_nullable()) + } + DType::Binary(nullability) => { + (DataType::Binary, nullability.is_nullable()) + } + DType::List(elem, nullability) => { + // We might need to query the field type recursively... 
+ } + DType::FixedSizeList(_, _, _) => {} + DType::Struct(_, _) => {} + DType::Extension(_) => {} + DType::Variant(_) => {} + }; + + Ok(Some(Field::new(name.to_string(), data_type, is_nullable))) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -909,8 +982,10 @@ mod tests { Arc::new(Date32Array::from(vec![18000_i32, 18001, 18002, 18003])), )] #[case::date64( - Arc::new(Date64Array::from(vec![Some(1555200000000), None, Some(1555286400000), Some(1555372800000)])), - Arc::new(Date64Array::from(vec![1555200000000_i64, 1555213600000, 1555286400000, 1555372800000])), + Arc::new(Date64Array::from(vec![Some(1555200000000), None, Some(1555286400000), Some(1555372800000)] + )), + Arc::new(Date64Array::from(vec![1555200000000_i64, 1555213600000, 1555286400000, 1555372800000] + )), )] fn test_temporal_array_conversion( #[case] nullable: Arc, diff --git a/vortex-array/src/arrow/mod.rs b/vortex-array/src/arrow/mod.rs index efc83aa6af6..aaa9a36cf2c 100644 --- a/vortex-array/src/arrow/mod.rs +++ b/vortex-array/src/arrow/mod.rs @@ -13,10 +13,13 @@ mod executor; mod iter; mod null_buffer; mod record_batch; +mod session; +pub use convert::*; pub use datum::*; pub use executor::*; pub use iter::*; +pub use session::*; pub use null_buffer::to_arrow_null_buffer; pub use null_buffer::to_null_buffer; diff --git a/vortex-array/src/arrow/session.rs b/vortex-array/src/arrow/session.rs new file mode 100644 index 00000000000..66ebeabeef5 --- /dev/null +++ b/vortex-array/src/arrow/session.rs @@ -0,0 +1,183 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use crate::dtype::DType; +use crate::{ArrayRef, ExecutionCtx}; +use arc_swap::ArcSwap; +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_schema::{DataType, Field}; +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; +use vortex_error::{vortex_bail, VortexResult}; +use vortex_session::{Ref, SessionExt, SessionVar, VortexSession}; + +/// A 
[`SessionVar`] that allows callers to register new Arrow conversion plugins at runtime for +/// custom extension types and encodings. +pub struct ArrowSession { + /// Set of registered plugins. + /// + /// The core plugins are registered at the start of the vec, and the user-defined plugins will + /// be at the end. Methods should scan in reverse order to make sure that configured plugins + /// override default behavior. + plugins: ArcSwap>, +} + +impl Debug for ArrowSession { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ArrowSession").finish_non_exhaustive() + } +} + +impl Default for ArrowSession { + fn default() -> Self { + // TODO(aduffy): register the default plugins. + Self { + plugins: ArcSwap::from_pointee(Vec::new()), + } + } +} + +impl ArrowSession { + /// Register a new plugin. This plugin will be registered using the given extension ID + /// type, which will enable us to deploy all of these instead. + pub fn register_plugin(&self, plugin: ArrowVTableRef) { + self.plugins.rcu(move |plugins| { + let mut plugins = (**plugins).clone(); + plugins.push(plugin); + plugins + }); + } + + /// Register yet another plugin system here...I think? + pub fn register(&self) {} + + /// Find the preferred Arrow type for a particular DType. + /// + /// Because we allow different ArrowPlugins to be stored and queried, we might have a set + /// of conversion functions which should be visible for this to be accessed separately. + pub fn preferred_physical_type(&self, dtype: &DType) -> VortexResult { + // Lookup the preferred physical type instead. + todo!() + } + + /// Execute a Vortex array into an Arrow array, using the plugins registered in the session + /// to perform the conversion. + /// + /// The caller must pass a `target` physical Arrow type for the result. The plugins will be + /// scanned until one is found that supports emitting the given logically encoded Vortex array + /// as the target Arrow type. 
+ /// + /// If no suitable plugin is found in the registry, then the array will be executed to canonical + /// Vortex form, and then the canonical Arrow exporter will be called. + pub fn execute_arrow( + &self, + array: ArrayRef, + target: &Field, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let plugins = (**self.plugins.load()).clone(); + + // Iterate from back-to-front so we try the user overrides before we use the builtin + // plugins. + for plugin in plugins.iter().rev() { + // Attempt to execute, and give it access to the session. + if let Some(array) = plugin.execute_arrow(array, target, ctx)? { + return Ok(array); + } + } + + vortex_bail!("No plugin found for {:?}", target) + } + + /// Attempt to decode an [Arrow array][ArrowArrayRef] into a suitable Vortex array. + /// + /// The `ArrowSession` can be configured with one or more plugins that can override this + /// behavior to enable mapping Arrow extension types to new Vortex encodings. + /// + /// We might decide that we want to read a specifc Arrow array directly into the nearest + /// Vortex type, in which case I think this works as expected. + pub fn from_arrow_array(array: ArrowArrayRef, field: &Field) -> VortexResult { + todo!() + } +} + +impl SessionVar for ArrowSession { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +/// Plugin type that enables custom behavior for conversion between Arrow data types and physical +/// arrays and Vortex logical types and encodings. +pub trait ArrowVTable: 'static + Send + Sync + Debug { + /// Try to execute a Vortex encoding out as an Arrow physical array. + /// + /// If this plugin doesn't support the encoding/target combo, it should return `Ok(None)`. + fn execute_arrow( + &self, + _array: ArrayRef, + _physical_type: &Field, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + Ok(None) + } + + /// Try to convert Arrow data into a Vortex encoding. 
The Arrow physical data layout is provided + /// as well as the `Field` which contains any extension metadata which might be necessary for + /// decoding. + /// + /// If the plugin does not support Arrow arrays of this shape, it should return `Ok(None)`. + fn from_arrow_array( + &self, + _array: ArrowArrayRef, + _field: &Field, + ) -> VortexResult> { + Ok(None) + } + + /// Try to convert an Arrow physical `Field` type to a Vortex `DType`. + /// + /// If the plugin does not know how to handle fields of this type, it should return `Ok(None)`. + fn from_arrow_field(&self, _field: &Field) -> VortexResult> { + Ok(None) + } + + /// Get a preferred Arrow `Field` type for a Vortex type. + /// + /// If we have the field types consumed this way, I think this ends up being a lot cleaner + /// and simpler to implement. + fn to_arrow_field( + &self, + _name: &str, + _dtype: &DType, + _session: &VortexSession, + ) -> VortexResult> { + Ok(None) + } + + /// Find the preferred Arrow physical data type that most closely matches a particular + /// Vortex encoding. + /// + /// If the plugin does not know how to handle the array encoding, should return `Ok(None)`. + fn preferred_physical_type(&self, _array: &ArrayRef) -> VortexResult> { + Ok(None) + } +} + +/// Shared reference to an [`ArrowVTable`] that can be cheaply cloned and passed around. 
+pub type ArrowVTableRef = Arc; + +pub trait ArrowSessionExt: SessionExt { + fn arrow(&self) -> Ref<'_, ArrowSession>; +} + +impl ArrowSessionExt for S { + fn arrow(&self) -> Ref<'_, ArrowSession> { + self.get::() + } +} diff --git a/vortex-array/src/extension/datetime/arrow.rs b/vortex-array/src/extension/datetime/arrow.rs new file mode 100644 index 00000000000..0d735177e5d --- /dev/null +++ b/vortex-array/src/extension/datetime/arrow.rs @@ -0,0 +1,2 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors diff --git a/vortex-array/src/extension/datetime/mod.rs b/vortex-array/src/extension/datetime/mod.rs index 86c6df0737f..fdb4fee4e2e 100644 --- a/vortex-array/src/extension/datetime/mod.rs +++ b/vortex-array/src/extension/datetime/mod.rs @@ -8,6 +8,7 @@ mod matcher; mod time; mod timestamp; mod unit; +mod arrow; pub use date::*; pub use matcher::*; diff --git a/vortex-array/src/extension/uuid/arrow.rs b/vortex-array/src/extension/uuid/arrow.rs new file mode 100644 index 00000000000..a96ed13721a --- /dev/null +++ b/vortex-array/src/extension/uuid/arrow.rs @@ -0,0 +1,147 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use crate::arrays::extension::ExtensionArrayExt; +use crate::arrays::{BoolArray, FixedSizeListArray, PrimitiveArray}; +use crate::arrow::{ArrowSessionExt, ArrowVTable, nulls}; +use crate::buffer::BufferHandle; +use crate::dtype::DType; +use crate::extension::uuid::Uuid; +use crate::validity::Validity; +use crate::{ArrayRef, IntoArray}; +use arrow_array::cast::AsArray; +use arrow_array::types::UInt8Type; +use arrow_array::{Array, ArrayRef as ArrowArrayRef, FixedSizeBinaryArray}; +use arrow_schema::extension::Uuid as ArrowUuid; +use arrow_schema::{DataType, Field}; +use std::sync::Arc; +use vortex_array::ExecutionCtx; +use vortex_array::arrays::ExtensionArray; +use vortex_array::dtype::PType; +use vortex_buffer::{Alignment, Buffer}; +use 
vortex_error::{VortexExpect, VortexResult}; +use vortex_session::VortexSession; + +impl ArrowVTable for Uuid { + // We implement a special execution pathway to make sure we transmute from Vortex's + // FixedSizeList format to Arrow's expected FixedSizeBinary[16] format. + fn execute_arrow( + &self, + array: ArrayRef, + physical_type: &Field, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + // This VTable can only handle Arrow UUID extension type + if !physical_type.has_valid_extension_type::() { + return Ok(None); + } + + // The Arrow canonical UUID extension type can only be applied to FixedSizeBinary[16], but + // in Vortex we use a FixedSizeList. We need to handle the conversion on our end. + let executed = array.execute::(ctx)?; + let storage = match executed.try_into_parts() { + Ok(parts) => parts.slots[0].expect("ExtensionArray must have exactly 1 slot"), + Err(array) => array.storage_array().clone(), + }; + + // Execute the storage array into FixedSizeList, then we convert it to FixedSizeBinary + // (no copy). + let values = ctx.session().arrow().execute_arrow(storage, todo!(), ctx)?; + let fsl = values.as_fixed_size_list(); + let bytes = fsl + .values() + .as_primitive::() + .values() + .inner() + .clone(); + + Ok(Some(Arc::new(FixedSizeBinaryArray::new( + fsl.value_length(), + bytes, + fsl.nulls().cloned(), + )))) + } + + // When we observe an Arrow FixedSizeBinary array with UUID extension metadata, we should + // convert it into a Vortex FixedSizeList which is how we store UUID data. + fn from_arrow_array( + &self, + array: ArrowArrayRef, + field: &Field, + ) -> VortexResult> { + // Execute from an arrow array into a Vortex array. This should be doable + // more or less without copying. 
+ if !field.has_valid_extension_type::() { + return Ok(None); + } + + if !matches!(array.data_type(), DataType::FixedSizeBinary(_)) { + return Ok(None); + } + + // Cast the elements first + let fsb = array.as_fixed_size_binary(); + + let binary = fsb.values().clone(); + + // TODO(aduffy): isn't this weird b/c we lose the alignment? + let buffer = Buffer::from_arrow_buffer(binary, Alignment::none()); + + // Capture values into nulls buffer + + let u8_array = PrimitiveArray::from_buffer_handle( + BufferHandle::new_host(buffer), + PType::U8, + Validity::NonNullable, + ); + + let validity = nulls(fsb.nulls(), field.is_nullable()); + + Ok(Some( + FixedSizeListArray::new( + u8_array.into_array(), + fsb.value_length() as u32, + validity, + fsb.len(), + ) + .into_array(), + )) + } + + // The Arrow Field equivalent of a Vortex UUID is an Arrow UUID extension type. + fn to_arrow_field( + &self, + name: &str, + dtype: &DType, + session: &VortexSession, + ) -> VortexResult> { + let DType::Extension(ext_dtype) = dtype else { + return Ok(None); + }; + + if !ext_dtype.metadata_opt::().is_some() { + return Ok(None); + } + + let mut field = Field::new( + name.to_string(), + DataType::FixedSizeBinary(16), + dtype.is_nullable(), + ); + + field + .try_with_extension_type(ArrowUuid) + .vortex_expect("FixedSizeBinary[16] is correct type for ArrowUuid"); + + Ok(Some(field)) + } +} + +#[cfg(test)] +mod tests { + #[test] + fn test_to_arrow() { + // Convert some of these other things to Arrow and make sure we can convert them back + // again. + } +} diff --git a/vortex-array/src/extension/uuid/mod.rs b/vortex-array/src/extension/uuid/mod.rs index e4347c2513c..bdfbf6d6072 100644 --- a/vortex-array/src/extension/uuid/mod.rs +++ b/vortex-array/src/extension/uuid/mod.rs @@ -14,6 +14,7 @@ mod metadata; pub use metadata::UuidMetadata; pub(crate) mod vtable; +mod arrow; /// The VTable for the UUID extension type. 
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] From 336cf94a5c3af099245c00ba69554549dbe1e701 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 6 May 2026 23:25:29 -0400 Subject: [PATCH 02/11] more of the owl Signed-off-by: Andrew Duffy --- vortex-array/Cargo.toml | 2 +- vortex-array/src/arrow/convert.rs | 79 +---- vortex-array/src/arrow/executor/mod.rs | 33 +- vortex-array/src/arrow/executor/temporal.rs | 170 ---------- vortex-array/src/arrow/mod.rs | 4 +- vortex-array/src/arrow/session.rs | 245 +++++++-------- vortex-array/src/extension/datetime/arrow.rs | 311 +++++++++++++++++++ vortex-array/src/extension/datetime/mod.rs | 2 +- vortex-array/src/extension/uuid/arrow.rs | 214 ++++++------- vortex-array/src/extension/uuid/mod.rs | 2 +- 10 files changed, 555 insertions(+), 507 deletions(-) delete mode 100644 vortex-array/src/arrow/executor/temporal.rs diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 9701759a787..4a56284cc03 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -29,7 +29,7 @@ arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-data = { workspace = true } arrow-ord = { workspace = true } -arrow-schema = { workspace = true } +arrow-schema = { workspace = true, features = ["canonical_extension_types"] } arrow-select = { workspace = true } arrow-string = { workspace = true } async-lock = { workspace = true } diff --git a/vortex-array/src/arrow/convert.rs b/vortex-array/src/arrow/convert.rs index 5602f374e29..2cf1d3a3389 100644 --- a/vortex-array/src/arrow/convert.rs +++ b/vortex-array/src/arrow/convert.rs @@ -3,6 +3,7 @@ use std::sync::Arc; +use arrow_array::AnyDictionaryArray; use arrow_array::Array as ArrowArray; use arrow_array::ArrowPrimitiveType; use arrow_array::BooleanArray as ArrowBooleanArray; @@ -48,17 +49,15 @@ use arrow_array::types::UInt8Type; use arrow_array::types::UInt16Type; use arrow_array::types::UInt32Type; use arrow_array::types::UInt64Type; -use 
arrow_array::{AnyDictionaryArray, ArrayRef as ArrowArrayRef}; use arrow_buffer::ArrowNativeType; use arrow_buffer::BooleanBuffer; use arrow_buffer::Buffer as ArrowBuffer; use arrow_buffer::ScalarBuffer; use arrow_buffer::buffer::NullBuffer; use arrow_buffer::buffer::OffsetBuffer; +use arrow_schema::DataType; use arrow_schema::TimeUnit as ArrowTimeUnit; -use arrow_schema::{DataType, Field}; use itertools::Itertools; -use vortex_array::arrow::session::ArrowVTable; use vortex_buffer::Alignment; use vortex_buffer::BitBuffer; use vortex_buffer::Buffer; @@ -67,7 +66,7 @@ use vortex_error::VortexExpect as _; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_panic; -use vortex_session::VortexSession; + use crate::ArrayRef; use crate::IntoArray; use crate::arrays::BoolArray; @@ -628,78 +627,6 @@ impl FromArrowArray<&RecordBatch> for ArrayRef { } } -#[derive(Debug)] -struct CanonicalArrowVTable; - -impl ArrowVTable for CanonicalArrowVTable { - fn preferred_physical_type(&self, array: &ArrayRef) -> VortexResult> { - todo!() - } - - fn from_arrow_array( - &self, - _array: ArrowArrayRef, - _field: &Field, - ) -> VortexResult> { - todo!() - } - - fn from_arrow_field(&self, _field: &Field) -> VortexResult> { - todo!() - } - - fn execute_arrow( - &self, - _array: ArrayRef, - _physical_type: &Field, - _session: &VortexSession, - ) -> VortexResult> { - todo!() - } - - fn to_arrow_field(&self, name: &str, dtype: &DType, session: &VortexSession) -> VortexResult> { - let (data_type, is_nullable) = match dtype { - DType::Null => (DataType::Null, true), - DType::Bool(nullability) => (DataType::Boolean, nullability.is_nullable()), - DType::Primitive(ptype, nullability) => { - let data_type = match ptype { - PType::U8 => DataType::UInt8, - PType::U16 => {} - PType::U32 => {} - PType::U64 => {} - PType::I8 => {} - PType::I16 => {} - PType::I32 => {} - PType::I64 => {} - PType::F16 => {} - PType::F32 => {} - PType::F64 => {} - }; - - (data_type, 
nullability.is_nullable()) - } - DType::Decimal(decimal_type, nullability) => { - // Conversion for decimal? - } - DType::Utf8(nullability) => { - (DataType::Utf8, nullability.is_nullable()) - } - DType::Binary(nullability) => { - (DataType::Binary, nullability.is_nullable()) - } - DType::List(elem, nullability) => { - // We might need to query the field type recursively... - } - DType::FixedSizeList(_, _, _) => {} - DType::Struct(_, _) => {} - DType::Extension(_) => {} - DType::Variant(_) => {} - }; - - Ok(Some(Field::new(name.to_string(), data_type, is_nullable))) - } -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/vortex-array/src/arrow/executor/mod.rs b/vortex-array/src/arrow/executor/mod.rs index 890e7f8a46a..d68355e3ecc 100644 --- a/vortex-array/src/arrow/executor/mod.rs +++ b/vortex-array/src/arrow/executor/mod.rs @@ -13,7 +13,6 @@ pub mod null; pub mod primitive; mod run_end; mod struct_; -mod temporal; mod validity; use arrow_array::ArrayRef as ArrowArrayRef; @@ -46,7 +45,7 @@ use crate::arrow::executor::null::to_arrow_null; use crate::arrow::executor::primitive::to_arrow_primitive; use crate::arrow::executor::run_end::to_arrow_run_end; use crate::arrow::executor::struct_::to_arrow_struct; -use crate::arrow::executor::temporal::to_arrow_temporal; +use crate::arrow::session::ArrowSessionExt; use crate::dtype::DType; use crate::dtype::PType; use crate::executor::ExecutionCtx; @@ -96,6 +95,24 @@ impl ArrowArrayExecutor for ArrayRef { None => preferred_arrow_type(&self)?, }; + // Extension dispatch: if this Vortex array carries an extension dtype with a registered + // ArrowVTable plugin, hand the conversion off to the plugin. Falls through to the + // canonical match below for all other arrays. 
+ let plugin = self + .dtype() + .as_extension_opt() + .and_then(|ext| ctx.session().arrow().for_vortex_ext(&ext.id())); + if let Some(plugin) = plugin { + let target = Field::new("", resolved_type.clone(), self.dtype().is_nullable()); + let arrow = plugin.execute_arrow(self, &target, ctx)?; + vortex_ensure!( + arrow.len() == len, + "Arrow array length does not match Vortex array length after conversion to {:?}", + arrow + ); + return Ok(arrow); + } + let arrow = match &resolved_type { DataType::Null => to_arrow_null(self, ctx), DataType::Boolean => to_arrow_bool(self, ctx), @@ -110,11 +127,6 @@ impl ArrowArrayExecutor for ArrayRef { DataType::Float16 => to_arrow_primitive::(self, ctx), DataType::Float32 => to_arrow_primitive::(self, ctx), DataType::Float64 => to_arrow_primitive::(self, ctx), - DataType::Timestamp(..) - | DataType::Date32 - | DataType::Date64 - | DataType::Time32(_) - | DataType::Time64(_) => to_arrow_temporal(self, &resolved_type, ctx), DataType::Binary => to_arrow_byte_array::(self, ctx), DataType::LargeBinary => to_arrow_byte_array::(self, ctx), DataType::Utf8 => to_arrow_byte_array::(self, ctx), @@ -161,7 +173,12 @@ impl ArrowArrayExecutor for ArrayRef { | DataType::Map(..) | DataType::Duration(_) | DataType::Interval(_) - | DataType::Union(..) => { + | DataType::Union(..) + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Timestamp(..) 
=> { vortex_bail!("Conversion to Arrow type {resolved_type} is not supported"); } }?; diff --git a/vortex-array/src/arrow/executor/temporal.rs b/vortex-array/src/arrow/executor/temporal.rs deleted file mode 100644 index c68a4f3ee9f..00000000000 --- a/vortex-array/src/arrow/executor/temporal.rs +++ /dev/null @@ -1,170 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::sync::Arc; - -use arrow_array::ArrayRef as ArrowArrayRef; -use arrow_array::PrimitiveArray; -use arrow_array::PrimitiveArray as ArrowPrimitiveArray; -use arrow_array::types::ArrowTemporalType; -use arrow_array::types::ArrowTimestampType; -use arrow_array::types::Date32Type; -use arrow_array::types::Date64Type; -use arrow_array::types::Time32MillisecondType; -use arrow_array::types::Time32SecondType; -use arrow_array::types::Time64MicrosecondType; -use arrow_array::types::Time64NanosecondType; -use arrow_array::types::TimestampMicrosecondType; -use arrow_array::types::TimestampMillisecondType; -use arrow_array::types::TimestampNanosecondType; -use arrow_array::types::TimestampSecondType; -use arrow_schema::DataType; -use arrow_schema::TimeUnit as ArrowTimeUnit; -use vortex_error::VortexResult; -use vortex_error::vortex_bail; -use vortex_error::vortex_ensure; -use vortex_error::vortex_err; - -use crate::ArrayRef; -use crate::ExecutionCtx; -use crate::arrays::ExtensionArray; -use crate::arrays::PrimitiveArray as VortexPrimitiveArray; -use crate::arrays::extension::ExtensionArrayExt; -use crate::arrow::null_buffer::to_null_buffer; -use crate::dtype::NativePType; -use crate::extension::datetime::AnyTemporal; -use crate::extension::datetime::TemporalMetadata; -use crate::extension::datetime::TimeUnit; - -pub(super) fn to_arrow_temporal( - array: ArrayRef, - data_type: &DataType, - ctx: &mut ExecutionCtx, -) -> VortexResult { - let temporal_options = array - .dtype() - .as_extension() - .metadata_opt::() - .ok_or_else(|| { - vortex_err!( - 
"Array dtype {} is not a temporal extension type", - array.dtype() - ) - })?; - - match (temporal_options, &data_type) { - (TemporalMetadata::Date(TimeUnit::Days), DataType::Date32) => { - to_temporal::(array, ctx) - } - (TemporalMetadata::Date(TimeUnit::Milliseconds), DataType::Date64) => { - to_temporal::(array, ctx) - } - (TemporalMetadata::Time(TimeUnit::Seconds), DataType::Time32(ArrowTimeUnit::Second)) => { - to_temporal::(array, ctx) - } - ( - TemporalMetadata::Time(TimeUnit::Milliseconds), - DataType::Time32(ArrowTimeUnit::Millisecond), - ) => to_temporal::(array, ctx), - ( - TemporalMetadata::Time(TimeUnit::Microseconds), - DataType::Time64(ArrowTimeUnit::Microsecond), - ) => to_temporal::(array, ctx), - - ( - TemporalMetadata::Time(TimeUnit::Nanoseconds), - DataType::Time64(ArrowTimeUnit::Nanosecond), - ) => to_temporal::(array, ctx), - - (TemporalMetadata::Timestamp(unit, tz), DataType::Timestamp(arrow_unit, arrow_tz)) => { - vortex_ensure!( - tz == arrow_tz, - "Cannot convert {} array to Arrow type {} due to timezone mismatch", - array.dtype(), - data_type - ); - - match (unit, arrow_unit) { - (TimeUnit::Seconds, ArrowTimeUnit::Second) => { - to_arrow_timestamp::(array, arrow_tz.as_ref(), ctx) - } - (TimeUnit::Milliseconds, ArrowTimeUnit::Millisecond) => { - to_arrow_timestamp::(array, arrow_tz.as_ref(), ctx) - } - (TimeUnit::Microseconds, ArrowTimeUnit::Microsecond) => { - to_arrow_timestamp::(array, arrow_tz.as_ref(), ctx) - } - (TimeUnit::Nanoseconds, ArrowTimeUnit::Nanosecond) => { - to_arrow_timestamp::(array, arrow_tz.as_ref(), ctx) - } - _ => vortex_bail!( - "Cannot convert {} array to Arrow type {}", - array.dtype(), - data_type - ), - } - } - _ => vortex_bail!( - "Cannot convert {} array to Arrow type {}", - array.dtype(), - data_type - ), - } -} - -fn to_temporal( - array: ArrayRef, - ctx: &mut ExecutionCtx, -) -> VortexResult -where - T::Native: NativePType, -{ - // We cast the array to the native primitive type. 
- Ok(Arc::new(to_arrow_temporal_primitive::(array, ctx)?)) -} - -fn to_arrow_timestamp( - array: ArrayRef, - arrow_tz: Option<&Arc>, - ctx: &mut ExecutionCtx, -) -> VortexResult -where - T::Native: NativePType, -{ - Ok(Arc::new( - to_arrow_temporal_primitive::(array, ctx)?.with_timezone_opt(arrow_tz.cloned()), - )) -} - -fn to_arrow_temporal_primitive( - array: ArrayRef, - ctx: &mut ExecutionCtx, -) -> VortexResult> -where - T::Native: NativePType, -{ - debug_assert!(array.dtype().as_extension().is::()); - - let ext_array = array.execute::(ctx)?; - let primitive = ext_array - .storage_array() - .clone() - .execute::(ctx)?; - vortex_ensure!( - primitive.ptype() == T::Native::PTYPE, - "Expected temporal array to produce vector of width {}, found {}", - T::Native::PTYPE, - primitive.ptype() - ); - - let validity = primitive - .as_ref() - .validity()? - .execute_mask(primitive.as_ref().len(), ctx)?; - let buffer = primitive.to_buffer::(); - - let values = buffer.into_arrow_scalar_buffer(); - let nulls = to_null_buffer(validity); - - Ok(PrimitiveArray::::new(values, nulls)) -} diff --git a/vortex-array/src/arrow/mod.rs b/vortex-array/src/arrow/mod.rs index aaa9a36cf2c..8f26699659d 100644 --- a/vortex-array/src/arrow/mod.rs +++ b/vortex-array/src/arrow/mod.rs @@ -15,13 +15,13 @@ mod null_buffer; mod record_batch; mod session; -pub use convert::*; +pub(crate) use convert::nulls; pub use datum::*; pub use executor::*; pub use iter::*; -pub use session::*; pub use null_buffer::to_arrow_null_buffer; pub use null_buffer::to_null_buffer; +pub use session::*; use crate::ArrayRef; use crate::LEGACY_SESSION; diff --git a/vortex-array/src/arrow/session.rs b/vortex-array/src/arrow/session.rs index 66ebeabeef5..000f25649fa 100644 --- a/vortex-array/src/arrow/session.rs +++ b/vortex-array/src/arrow/session.rs @@ -1,178 +1,141 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use crate::dtype::DType; -use crate::{ArrayRef, 
ExecutionCtx}; -use arc_swap::ArcSwap; -use arrow_array::ArrayRef as ArrowArrayRef; -use arrow_schema::{DataType, Field}; +//! Session-scoped registry of pluggable Arrow conversions. +//! +//! Each [`ArrowVTable`] is keyed by the Vortex extension ID it owns, and optionally by an Arrow +//! extension name (e.g. `arrow.uuid`). Dispatch is `O(1)`: callers consult the relevant index, and +//! fall back to the canonical Arrow conversion path when no plugin matches. + use std::any::Any; -use std::fmt::{Debug, Formatter}; +use std::fmt::Debug; use std::sync::Arc; -use vortex_error::{vortex_bail, VortexResult}; -use vortex_session::{Ref, SessionExt, SessionVar, VortexSession}; - -/// A [`SessionVar`] that allows callers to register new Arrow conversion plugins at runtime for -/// custom extension types and encodings. -pub struct ArrowSession { - /// Set of registered plugins. - /// - /// The core plugins are registered at the start of the vec, and the user-defined plugins will - /// be at the end. Methods should scan in reverse order to make sure that configured plugins - /// override default behavior. - plugins: ArcSwap>, -} - -impl Debug for ArrowSession { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ArrowSession").finish_non_exhaustive() - } -} -impl Default for ArrowSession { - fn default() -> Self { - // TODO(aduffy): register the default plugins. 
- Self { - plugins: ArcSwap::from_pointee(Vec::new()), - } - } -} +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_schema::Field; +use vortex_error::VortexResult; +use vortex_session::Ref; +use vortex_session::SessionExt; +use vortex_session::SessionVar; +use vortex_session::VortexSession; +use vortex_session::registry::Id; +use vortex_session::registry::Registry; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::dtype::DType; +use crate::dtype::extension::ExtId; +use crate::extension::datetime::Date; +use crate::extension::datetime::Time; +use crate::extension::datetime::Timestamp; +use crate::extension::uuid::Uuid; + +/// A plugin that lets users plugin conversion between Vortex extension types and Arrow arrays +/// and data types. +pub trait ArrowVTable: 'static + Send + Sync + Debug { + /// Vortex extension type ID that this plugin handles. + fn vortex_ext_id(&self) -> ExtId; -impl ArrowSession { - /// Register a new plugin. This plugin will be registered using the given extension ID - /// type, which will enable us to deploy all of these instead. - pub fn register_plugin(&self, plugin: ArrowVTableRef) { - self.plugins.rcu(move |plugins| { - let mut plugins = (**plugins).clone(); - plugins.push(plugin); - plugins - }); + /// The name of the Vortex extension type handled by this plugin (e.g. `"arrow.uuid"`), if any. + fn arrow_ext_name(&self) -> Option<&'static str> { + Nones } - /// Register yet another plugin system here...I think? - pub fn register(&self) {} + /// Build the Arrow [`Field`] that represents `dtype` (which carries this plugin's + /// extension metadata). + fn to_arrow_field( + &self, + name: &str, + dtype: &DType, + session: &VortexSession, + ) -> VortexResult; - /// Find the preferred Arrow type for a particular DType. - /// - /// Because we allow different ArrowPlugins to be stored and queried, we might have a set - /// of conversion functions which should be visible for this to be accessed separately. 
- pub fn preferred_physical_type(&self, dtype: &DType) -> VortexResult { - // Lookup the preferred physical type instead. - todo!() - } + /// Build the Vortex [`DType`] that corresponds to `field` (which carries this plugin's + /// Arrow extension metadata). + fn from_arrow_field(&self, field: &Field) -> VortexResult<DType>; - /// Execute a Vortex array into an Arrow array, using the plugins registered in the session - /// to perform the conversion. - /// - /// The caller must pass a `target` physical Arrow type for the result. The plugins will be - /// scanned until one is found that supports emitting the given logically encoded Vortex array - /// as the target Arrow type. - /// - /// If no suitable plugin is found in the registry, then the array will be executed to canonical - /// Vortex form, and then the canonical Arrow exporter will be called. - pub fn execute_arrow( + /// Convert a Vortex extension array into an Arrow array shaped to `target`. + fn execute_arrow( &self, array: ArrayRef, target: &Field, ctx: &mut ExecutionCtx, - ) -> VortexResult { - let plugins = (**self.plugins.load()).clone(); - - // Iterate from back-to-front so we try the user overrides before we use the builtin - // plugins. - for plugin in plugins.iter().rev() { - // Attempt to execute, and give it access to the session. - if let Some(array) = plugin.execute_arrow(array, target, ctx)? { - return Ok(array); - } - } - - vortex_bail!("No plugin found for {:?}", target) - } + ) -> VortexResult<ArrowArrayRef>; - /// Attempt to decode an [Arrow array][ArrowArrayRef] into a suitable Vortex array. - /// - /// The `ArrowSession` can be configured with one or more plugins that can override this - /// behavior to enable mapping Arrow extension types to new Vortex encodings. - /// - /// We might decide that we want to read a specifc Arrow array directly into the nearest - /// Vortex type, in which case I think this works as expected.
- pub fn from_arrow_array(array: ArrowArrayRef, field: &Field) -> VortexResult { - todo!() - } + /// Convert an Arrow array (whose `field` carries this plugin's extension metadata) + /// back into a Vortex array. + fn from_arrow_array(&self, array: ArrowArrayRef, field: &Field) -> VortexResult<ArrayRef>; } -impl SessionVar for ArrowSession { - fn as_any(&self) -> &dyn Any { - self - } +/// Reference-counted pointer to an [`ArrowVTable`]. +pub type ArrowVTableRef = Arc<dyn ArrowVTable>; - fn as_any_mut(&mut self) -> &mut dyn Any { - self +/// Session-scoped registry of [`ArrowVTable`] plugins. +/// +/// Plugins are stored under two indices: [`ExtId`] for Vortex-side dispatch, and Arrow extension +/// name for Arrow-side dispatch. A single registration populates both indices. +/// +/// The default session pre-registers the builtin extension types (`uuid`, `date`, `time`, +/// `timestamp`). User code can override any builtin by registering a new plugin with the same +/// ID; last-write-wins. +#[derive(Debug)] +pub struct ArrowSession { + by_vortex_ext: Registry<ArrowVTableRef>, + by_arrow_ext: Registry<ArrowVTableRef>, +} + +impl Default for ArrowSession { + fn default() -> Self { + let this = Self { + by_vortex_ext: Registry::default(), + by_arrow_ext: Registry::default(), + }; + + // Builtin extension-type plugins. User registrations with the same ID will replace them. + this.register(Uuid); + this.register(Date); + this.register(Time); + this.register(Timestamp); + + this } } -/// Plugin type that enables custom behavior for conversion between Arrow data types and physical -/// arrays and Vortex logical types and encodings. -pub trait ArrowVTable: 'static + Send + Sync + Debug { - /// Try to execute a Vortex encoding out as an Arrow physical array. - /// - /// If this plugin doesn't support the encoding/target combo, it should return `Ok(None)`.
- fn execute_arrow( - &self, - _array: ArrayRef, - _physical_type: &Field, - _ctx: &mut ExecutionCtx, - ) -> VortexResult> { - Ok(None) +impl ArrowSession { + /// Register a plugin under its [`ExtId`] (and its Arrow extension name, if any). + pub fn register<V: ArrowVTable>(&self, plugin: V) { + let plugin: ArrowVTableRef = Arc::new(plugin); + self.by_vortex_ext + .register(plugin.vortex_ext_id(), plugin.clone()); + if let Some(name) = plugin.arrow_ext_name() { + self.by_arrow_ext.register(Id::new_static(name), plugin); + } } - /// Try to convert Arrow data into a Vortex encoding. The Arrow physical data layout is provided - /// as well as the `Field` which contains any extension metadata which might be necessary for - /// decoding. - /// - /// If the plugin does not support Arrow arrays of this shape, it should return `Ok(None)`. - fn from_arrow_array( - &self, - _array: ArrowArrayRef, - _field: &Field, - ) -> VortexResult> { - Ok(None) + /// Look up the plugin registered for the given Vortex extension ID. + pub fn for_vortex_ext(&self, id: &ExtId) -> Option<ArrowVTableRef> { + self.by_vortex_ext.find(id) } - /// Try to convert an Arrow physical `Field` type to a Vortex `DType`. - /// - /// If the plugin does not know how to handle fields of this type, it should return `Ok(None)`. - fn from_arrow_field(&self, _field: &Field) -> VortexResult> { - Ok(None) + /// Look up the plugin registered for the given Arrow extension name. + pub fn for_arrow_ext(&self, name: &str) -> Option<ArrowVTableRef> { + self.by_arrow_ext.find(&Id::new(name)) } +} - /// Get a preferred Arrow `Field` type for a Vortex type. - /// - /// If we have the field types consumed this way, I think this ends up being a lot cleaner - /// and simpler to implement.
- fn to_arrow_field( - &self, - _name: &str, - _dtype: &DType, - _session: &VortexSession, - ) -> VortexResult> { - Ok(None) +impl SessionVar for ArrowSession { + fn as_any(&self) -> &dyn Any { + self } - /// Find the preferred Arrow physical data type that most closely matches a particular - /// Vortex encoding. - /// - /// If the plugin does not know how to handle the array encoding, should return `Ok(None)`. - fn preferred_physical_type(&self, _array: &ArrayRef) -> VortexResult> { - Ok(None) + fn as_any_mut(&mut self) -> &mut dyn Any { + self } } -/// Shared reference to an [`ArrowVTable`] that can be cheaply cloned and passed around. -pub type ArrowVTableRef = Arc; - +/// Extension trait for accessing the [`ArrowSession`] on a Vortex session. pub trait ArrowSessionExt: SessionExt { + /// Get the Arrow session. fn arrow(&self) -> Ref<'_, ArrowSession>; } diff --git a/vortex-array/src/extension/datetime/arrow.rs b/vortex-array/src/extension/datetime/arrow.rs index 0d735177e5d..9822a43686a 100644 --- a/vortex-array/src/extension/datetime/arrow.rs +++ b/vortex-array/src/extension/datetime/arrow.rs @@ -1,2 +1,313 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`ArrowVTable`] impls for the temporal extension types. 
+ +use std::sync::Arc; + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_array::PrimitiveArray as ArrowPrimitiveArray; +use arrow_array::types::ArrowTemporalType; +use arrow_array::types::ArrowTimestampType; +use arrow_array::types::Date32Type; +use arrow_array::types::Date64Type; +use arrow_array::types::Time32MillisecondType; +use arrow_array::types::Time32SecondType; +use arrow_array::types::Time64MicrosecondType; +use arrow_array::types::Time64NanosecondType; +use arrow_array::types::TimestampMicrosecondType; +use arrow_array::types::TimestampMillisecondType; +use arrow_array::types::TimestampNanosecondType; +use arrow_array::types::TimestampSecondType; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::TimeUnit as ArrowTimeUnit; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_session::VortexSession; + +use crate::ArrayPlugin; +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::arrays::ExtensionArray; +use crate::arrays::PrimitiveArray as VortexPrimitiveArray; +use crate::arrays::extension::ExtensionArrayExt; +use crate::arrow::ArrowVTable; +use crate::arrow::to_null_buffer; +use crate::dtype::NativePType; +use crate::dtype::extension::ExtId; +use crate::extension::datetime::Date; +use crate::extension::datetime::Time; +use crate::extension::datetime::TimeUnit; +use crate::extension::datetime::Timestamp; + +impl ArrowVTable for Date { + fn vortex_ext_id(&self) -> ExtId { + Date.id() + } + + fn to_arrow_field( + &self, + name: &str, + dtype: &crate::dtype::DType, + _session: &VortexSession, + ) -> VortexResult { + let unit = unit_for::(dtype)?; + let data_type = match unit { + TimeUnit::Days => DataType::Date32, + TimeUnit::Milliseconds => DataType::Date64, + other => vortex_bail!("Date does not support time unit {other}"), + }; + Ok(Field::new(name, data_type, dtype.is_nullable())) + } + + fn from_arrow_field(&self, field: &Field) -> VortexResult { + let unit = 
match field.data_type() { + DataType::Date32 => TimeUnit::Days, + DataType::Date64 => TimeUnit::Milliseconds, + other => vortex_bail!("Date plugin cannot convert Arrow type {other}"), + }; + Ok(crate::dtype::DType::Extension( + Date::new(unit, field.is_nullable().into()).erased(), + )) + } + + fn execute_arrow( + &self, + array: ArrayRef, + target: &Field, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + match (unit_for::(array.dtype())?, target.data_type()) { + (TimeUnit::Days, DataType::Date32) => to_temporal_array::(array, ctx), + (TimeUnit::Milliseconds, DataType::Date64) => { + to_temporal_array::(array, ctx) + } + (unit, dt) => vortex_bail!("Cannot convert Date({unit}) array to Arrow type {dt}"), + } + } + + fn from_arrow_array(&self, _array: ArrowArrayRef, _field: &Field) -> VortexResult { + vortex_bail!("Date::from_arrow_array is not yet implemented") + } +} + +impl ArrowVTable for Time { + fn vortex_ext_id(&self) -> ExtId { + Time.id() + } + + fn to_arrow_field( + &self, + name: &str, + dtype: &crate::dtype::DType, + _session: &VortexSession, + ) -> VortexResult { + let unit = unit_for::