From f434b572e7f5e32372d8719439654dd82f712741 Mon Sep 17 00:00:00 2001 From: Alfonso Subiotto Marques Date: Tue, 5 May 2026 15:55:46 +0200 Subject: [PATCH] feat[vortex-array]: add ArrowExportPlugin registry for extension types This allows extension types to define their arrow exporting logic. This commit replaces the temporal executor module. Signed-off-by: Alfonso Subiotto Marques --- vortex-array/public-api.lock | 170 ++++++++++++ vortex-array/src/arrow/executor/mod.rs | 52 ++-- vortex-array/src/arrow/executor/temporal.rs | 170 ------------ vortex-array/src/arrow/export_plugin.rs | 46 ++++ vortex-array/src/arrow/export_session.rs | 69 +++++ vortex-array/src/arrow/mod.rs | 4 + vortex-array/src/dtype/arrow.rs | 39 +-- vortex-array/src/extension/datetime/arrow.rs | 266 +++++++++++++++++++ vortex-array/src/extension/datetime/mod.rs | 2 + 9 files changed, 604 insertions(+), 214 deletions(-) delete mode 100644 vortex-array/src/arrow/executor/temporal.rs create mode 100644 vortex-array/src/arrow/export_plugin.rs create mode 100644 vortex-array/src/arrow/export_session.rs create mode 100644 vortex-array/src/extension/datetime/arrow.rs diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index a849acdb20e..fc956c8752e 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -6756,6 +6756,72 @@ pub fn vortex_array::arrow::byte_view::canonical_varbinview_to_arrow(array: &vortex_array::arrays::VarBinViewArray, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub mod vortex_array::arrow::export_plugin + +pub trait vortex_array::arrow::export_plugin::ArrowExportPlugin: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::export_plugin::ArrowExportPlugin::execute_to_arrow(&self, array: vortex_array::ArrayRef, target: &arrow_schema::datatype::DataType, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn 
vortex_array::arrow::export_plugin::ArrowExportPlugin::id(&self) -> vortex_array::dtype::extension::ExtId + +pub fn vortex_array::arrow::export_plugin::ArrowExportPlugin::to_arrow_data_type(&self, ext_dtype: &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +impl vortex_array::arrow::ArrowExportPlugin for vortex_array::extension::datetime::DateArrowExport + +pub fn vortex_array::extension::datetime::DateArrowExport::execute_to_arrow(&self, array: vortex_array::ArrayRef, target: &arrow_schema::datatype::DataType, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::extension::datetime::DateArrowExport::id(&self) -> vortex_array::dtype::extension::ExtId + +pub fn vortex_array::extension::datetime::DateArrowExport::to_arrow_data_type(&self, ext_dtype: &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +impl vortex_array::arrow::ArrowExportPlugin for vortex_array::extension::datetime::TimeArrowExport + +pub fn vortex_array::extension::datetime::TimeArrowExport::execute_to_arrow(&self, array: vortex_array::ArrayRef, target: &arrow_schema::datatype::DataType, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::extension::datetime::TimeArrowExport::id(&self) -> vortex_array::dtype::extension::ExtId + +pub fn vortex_array::extension::datetime::TimeArrowExport::to_arrow_data_type(&self, ext_dtype: &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +impl vortex_array::arrow::ArrowExportPlugin for vortex_array::extension::datetime::TimestampArrowExport + +pub fn vortex_array::extension::datetime::TimestampArrowExport::execute_to_arrow(&self, array: vortex_array::ArrayRef, target: &arrow_schema::datatype::DataType, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::extension::datetime::TimestampArrowExport::id(&self) -> vortex_array::dtype::extension::ExtId + +pub fn 
vortex_array::extension::datetime::TimestampArrowExport::to_arrow_data_type(&self, ext_dtype: &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +pub type vortex_array::arrow::export_plugin::ArrowExportPluginRef = alloc::sync::Arc + +pub mod vortex_array::arrow::export_session + +pub struct vortex_array::arrow::export_session::ArrowExportSession + +impl vortex_array::arrow::ArrowExportSession + +pub fn vortex_array::arrow::ArrowExportSession::find(&self, id: &vortex_array::dtype::extension::ExtId) -> core::option::Option + +pub fn vortex_array::arrow::ArrowExportSession::register(&self, plugin: impl vortex_array::arrow::ArrowExportPlugin) + +pub fn vortex_array::arrow::ArrowExportSession::registry(&self) -> &vortex_array::arrow::ArrowExportRegistry + +impl core::default::Default for vortex_array::arrow::ArrowExportSession + +pub fn vortex_array::arrow::ArrowExportSession::default() -> Self + +impl core::fmt::Debug for vortex_array::arrow::ArrowExportSession + +pub fn vortex_array::arrow::ArrowExportSession::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub trait vortex_array::arrow::export_session::ArrowExportSessionExt: vortex_session::SessionExt + +pub fn vortex_array::arrow::export_session::ArrowExportSessionExt::arrow_exports(&self) -> vortex_session::Ref<'_, vortex_array::arrow::ArrowExportSession> + +impl vortex_array::arrow::ArrowExportSessionExt for S + +pub fn S::arrow_exports(&self) -> vortex_session::Ref<'_, vortex_array::arrow::ArrowExportSession> + +pub type vortex_array::arrow::export_session::ArrowExportRegistry = vortex_session::registry::Registry + pub mod vortex_array::arrow::null pub fn vortex_array::arrow::null::canonical_null_to_arrow(array: &vortex_array::arrays::null::NullArray) -> arrow_array::array::ArrayRef @@ -6780,6 +6846,24 @@ impl vortex_array::iter::ArrayIterator for vortex_array::arrow::ArrowArrayStream pub fn vortex_array::arrow::ArrowArrayStreamAdapter::dtype(&self) -> 
&vortex_array::dtype::DType +pub struct vortex_array::arrow::ArrowExportSession + +impl vortex_array::arrow::ArrowExportSession + +pub fn vortex_array::arrow::ArrowExportSession::find(&self, id: &vortex_array::dtype::extension::ExtId) -> core::option::Option + +pub fn vortex_array::arrow::ArrowExportSession::register(&self, plugin: impl vortex_array::arrow::ArrowExportPlugin) + +pub fn vortex_array::arrow::ArrowExportSession::registry(&self) -> &vortex_array::arrow::ArrowExportRegistry + +impl core::default::Default for vortex_array::arrow::ArrowExportSession + +pub fn vortex_array::arrow::ArrowExportSession::default() -> Self + +impl core::fmt::Debug for vortex_array::arrow::ArrowExportSession + +pub fn vortex_array::arrow::ArrowExportSession::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + pub struct vortex_array::arrow::Datum impl vortex_array::arrow::Datum @@ -6816,6 +6900,46 @@ pub fn vortex_array::ArrayRef::execute_record_batch(self, schema: &arrow_schema: pub fn vortex_array::ArrayRef::execute_record_batches(self, schema: &arrow_schema::schema::Schema, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +pub trait vortex_array::arrow::ArrowExportPlugin: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::ArrowExportPlugin::execute_to_arrow(&self, array: vortex_array::ArrayRef, target: &arrow_schema::datatype::DataType, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowExportPlugin::id(&self) -> vortex_array::dtype::extension::ExtId + +pub fn vortex_array::arrow::ArrowExportPlugin::to_arrow_data_type(&self, ext_dtype: &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +impl vortex_array::arrow::ArrowExportPlugin for vortex_array::extension::datetime::DateArrowExport + +pub fn vortex_array::extension::datetime::DateArrowExport::execute_to_arrow(&self, array: vortex_array::ArrayRef, target: 
&arrow_schema::datatype::DataType, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::extension::datetime::DateArrowExport::id(&self) -> vortex_array::dtype::extension::ExtId + +pub fn vortex_array::extension::datetime::DateArrowExport::to_arrow_data_type(&self, ext_dtype: &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +impl vortex_array::arrow::ArrowExportPlugin for vortex_array::extension::datetime::TimeArrowExport + +pub fn vortex_array::extension::datetime::TimeArrowExport::execute_to_arrow(&self, array: vortex_array::ArrayRef, target: &arrow_schema::datatype::DataType, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::extension::datetime::TimeArrowExport::id(&self) -> vortex_array::dtype::extension::ExtId + +pub fn vortex_array::extension::datetime::TimeArrowExport::to_arrow_data_type(&self, ext_dtype: &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +impl vortex_array::arrow::ArrowExportPlugin for vortex_array::extension::datetime::TimestampArrowExport + +pub fn vortex_array::extension::datetime::TimestampArrowExport::execute_to_arrow(&self, array: vortex_array::ArrayRef, target: &arrow_schema::datatype::DataType, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::extension::datetime::TimestampArrowExport::id(&self) -> vortex_array::dtype::extension::ExtId + +pub fn vortex_array::extension::datetime::TimestampArrowExport::to_arrow_data_type(&self, ext_dtype: &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +pub trait vortex_array::arrow::ArrowExportSessionExt: vortex_session::SessionExt + +pub fn vortex_array::arrow::ArrowExportSessionExt::arrow_exports(&self) -> vortex_session::Ref<'_, vortex_array::arrow::ArrowExportSession> + +impl vortex_array::arrow::ArrowExportSessionExt for S + +pub fn S::arrow_exports(&self) -> vortex_session::Ref<'_, 
vortex_array::arrow::ArrowExportSession> + pub trait vortex_array::arrow::FromArrowArray pub fn vortex_array::arrow::FromArrowArray::from_arrow(array: A, nullable: bool) -> vortex_error::VortexResult where Self: core::marker::Sized @@ -6986,6 +7110,10 @@ pub fn vortex_array::arrow::to_arrow_null_buffer(validity: vortex_array::validit pub fn vortex_array::arrow::to_null_buffer(mask: vortex_mask::Mask) -> core::option::Option +pub type vortex_array::arrow::ArrowExportPluginRef = alloc::sync::Arc + +pub type vortex_array::arrow::ArrowExportRegistry = vortex_session::registry::Registry + pub mod vortex_array::buffer pub struct vortex_array::buffer::BufferHandle(_) @@ -12628,6 +12756,20 @@ pub fn vortex_array::extension::datetime::Date::validate_dtype(ext_dtype: &vorte pub fn vortex_array::extension::datetime::Date::validate_scalar_value(ext_dtype: &vortex_array::dtype::extension::ExtDType, storage_value: &vortex_array::scalar::ScalarValue) -> vortex_error::VortexResult<()> +pub struct vortex_array::extension::datetime::DateArrowExport + +impl core::fmt::Debug for vortex_array::extension::datetime::DateArrowExport + +pub fn vortex_array::extension::datetime::DateArrowExport::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::arrow::ArrowExportPlugin for vortex_array::extension::datetime::DateArrowExport + +pub fn vortex_array::extension::datetime::DateArrowExport::execute_to_arrow(&self, array: vortex_array::ArrayRef, target: &arrow_schema::datatype::DataType, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::extension::datetime::DateArrowExport::id(&self) -> vortex_array::dtype::extension::ExtId + +pub fn vortex_array::extension::datetime::DateArrowExport::to_arrow_data_type(&self, ext_dtype: &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + pub struct vortex_array::extension::datetime::Time impl vortex_array::extension::datetime::Time @@ -12684,6 +12826,20 @@ pub 
fn vortex_array::extension::datetime::Time::validate_dtype(ext_dtype: &vorte pub fn vortex_array::extension::datetime::Time::validate_scalar_value(ext_dtype: &vortex_array::dtype::extension::ExtDType, storage_value: &vortex_array::scalar::ScalarValue) -> vortex_error::VortexResult<()> +pub struct vortex_array::extension::datetime::TimeArrowExport + +impl core::fmt::Debug for vortex_array::extension::datetime::TimeArrowExport + +pub fn vortex_array::extension::datetime::TimeArrowExport::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::arrow::ArrowExportPlugin for vortex_array::extension::datetime::TimeArrowExport + +pub fn vortex_array::extension::datetime::TimeArrowExport::execute_to_arrow(&self, array: vortex_array::ArrayRef, target: &arrow_schema::datatype::DataType, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::extension::datetime::TimeArrowExport::id(&self) -> vortex_array::dtype::extension::ExtId + +pub fn vortex_array::extension::datetime::TimeArrowExport::to_arrow_data_type(&self, ext_dtype: &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + pub struct vortex_array::extension::datetime::Timestamp impl vortex_array::extension::datetime::Timestamp @@ -12742,6 +12898,20 @@ pub fn vortex_array::extension::datetime::Timestamp::validate_dtype(ext_dtype: & pub fn vortex_array::extension::datetime::Timestamp::validate_scalar_value(ext_dtype: &vortex_array::dtype::extension::ExtDType, storage_value: &vortex_array::scalar::ScalarValue) -> vortex_error::VortexResult<()> +pub struct vortex_array::extension::datetime::TimestampArrowExport + +impl core::fmt::Debug for vortex_array::extension::datetime::TimestampArrowExport + +pub fn vortex_array::extension::datetime::TimestampArrowExport::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::arrow::ArrowExportPlugin for vortex_array::extension::datetime::TimestampArrowExport + 
+pub fn vortex_array::extension::datetime::TimestampArrowExport::execute_to_arrow(&self, array: vortex_array::ArrayRef, target: &arrow_schema::datatype::DataType, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::extension::datetime::TimestampArrowExport::id(&self) -> vortex_array::dtype::extension::ExtId + +pub fn vortex_array::extension::datetime::TimestampArrowExport::to_arrow_data_type(&self, ext_dtype: &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + pub struct vortex_array::extension::datetime::TimestampOptions pub vortex_array::extension::datetime::TimestampOptions::tz: core::option::Option> diff --git a/vortex-array/src/arrow/executor/mod.rs b/vortex-array/src/arrow/executor/mod.rs index 890e7f8a46a..684c61b502c 100644 --- a/vortex-array/src/arrow/executor/mod.rs +++ b/vortex-array/src/arrow/executor/mod.rs @@ -13,7 +13,6 @@ pub mod null; pub mod primitive; mod run_end; mod struct_; -mod temporal; mod validity; use arrow_array::ArrayRef as ArrowArrayRef; @@ -46,7 +45,7 @@ use crate::arrow::executor::null::to_arrow_null; use crate::arrow::executor::primitive::to_arrow_primitive; use crate::arrow::executor::run_end::to_arrow_run_end; use crate::arrow::executor::struct_::to_arrow_struct; -use crate::arrow::executor::temporal::to_arrow_temporal; +use crate::arrow::export_session::ArrowExportSessionExt; use crate::dtype::DType; use crate::dtype::PType; use crate::executor::ExecutionCtx; @@ -89,11 +88,30 @@ impl ArrowArrayExecutor for ArrayRef { ) -> VortexResult { let len = self.len(); - // Resolve the DataType if it is a leaf type - // we should likely make this extensible. 
+ if let Some(ext) = self.dtype().as_extension_opt() { + let plugin = ctx.session().arrow_exports().find(&ext.id()); + let Some(plugin) = plugin else { + vortex_bail!( + "no ArrowExportPlugin registered for extension id {}", + ext.id() + ); + }; + let target = match data_type { + Some(dt) => dt.clone(), + None => plugin.to_arrow_data_type(ext)?, + }; + let arrow = plugin.execute_to_arrow(self, &target, ctx)?; + vortex_ensure!( + arrow.len() == len, + "Arrow array length does not match Vortex array length after conversion to {:?}", + arrow + ); + return Ok(arrow); + } + let resolved_type: DataType = match data_type { Some(dt) => dt.clone(), - None => preferred_arrow_type(&self)?, + None => preferred_arrow_type(&self, ctx)?, }; let arrow = match &resolved_type { @@ -110,11 +128,6 @@ impl ArrowArrayExecutor for ArrayRef { DataType::Float16 => to_arrow_primitive::(self, ctx), DataType::Float32 => to_arrow_primitive::(self, ctx), DataType::Float64 => to_arrow_primitive::(self, ctx), - DataType::Timestamp(..) - | DataType::Date32 - | DataType::Date64 - | DataType::Time32(_) - | DataType::Time64(_) => to_arrow_temporal(self, &resolved_type, ctx), DataType::Binary => to_arrow_byte_array::(self, ctx), DataType::LargeBinary => to_arrow_byte_array::(self, ctx), DataType::Utf8 => to_arrow_byte_array::(self, ctx), @@ -157,7 +170,12 @@ impl ArrowArrayExecutor for ArrayRef { DataType::RunEndEncoded(ends_type, values_type) => { to_arrow_run_end(self, ends_type.data_type(), values_type, ctx) } - DataType::FixedSizeBinary(_) + DataType::Timestamp(..) + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::FixedSizeBinary(_) | DataType::Map(..) 
| DataType::Duration(_) | DataType::Interval(_) @@ -192,7 +210,7 @@ impl ArrowArrayExecutor for ArrayRef { /// However, some encodings have cheaper Arrow representations: /// - `VarBinArray`: Uses `Utf8`/`Binary` (offset-based) instead of `Utf8View`/`BinaryView` /// - `ListArray`: Uses `List` instead of `ListView` -fn preferred_arrow_type(array: &ArrayRef) -> VortexResult { +fn preferred_arrow_type(array: &ArrayRef, ctx: &ExecutionCtx) -> VortexResult { // VarBinArray: use offset-based Binary/Utf8 instead of View types if let Some(varbin) = array.as_opt::() { let offsets_ptype = PType::try_from(varbin.offsets().dtype())?; @@ -211,8 +229,7 @@ fn preferred_arrow_type(array: &ArrayRef) -> VortexResult { if let Some(list) = array.as_opt::() { let offsets_ptype = PType::try_from(list.offsets().dtype())?; let use_large = matches!(offsets_ptype, PType::I64 | PType::U64); - // Recursively get the preferred type for elements - let elem_dtype = preferred_arrow_type(list.elements())?; + let elem_dtype = preferred_arrow_type(list.elements(), ctx)?; let field = FieldRef::new(Field::new_list_field( elem_dtype, list.elements().dtype().is_nullable(), @@ -225,6 +242,11 @@ fn preferred_arrow_type(array: &ArrayRef) -> VortexResult { }); } - // Everything else: use canonical dtype conversion + if let Some(ext) = array.dtype().as_extension_opt() + && let Some(plugin) = ctx.session().arrow_exports().find(&ext.id()) + { + return plugin.to_arrow_data_type(ext); + } + array.dtype().to_arrow_dtype() } diff --git a/vortex-array/src/arrow/executor/temporal.rs b/vortex-array/src/arrow/executor/temporal.rs deleted file mode 100644 index b3941b69916..00000000000 --- a/vortex-array/src/arrow/executor/temporal.rs +++ /dev/null @@ -1,170 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::sync::Arc; - -use arrow_array::ArrayRef as ArrowArrayRef; -use arrow_array::PrimitiveArray; -use arrow_array::PrimitiveArray as 
ArrowPrimitiveArray; -use arrow_array::types::ArrowTemporalType; -use arrow_array::types::ArrowTimestampType; -use arrow_array::types::Date32Type; -use arrow_array::types::Date64Type; -use arrow_array::types::Time32MillisecondType; -use arrow_array::types::Time32SecondType; -use arrow_array::types::Time64MicrosecondType; -use arrow_array::types::Time64NanosecondType; -use arrow_array::types::TimestampMicrosecondType; -use arrow_array::types::TimestampMillisecondType; -use arrow_array::types::TimestampNanosecondType; -use arrow_array::types::TimestampSecondType; -use arrow_schema::DataType; -use arrow_schema::TimeUnit as ArrowTimeUnit; -use vortex_error::VortexResult; -use vortex_error::vortex_bail; -use vortex_error::vortex_ensure; -use vortex_error::vortex_err; - -use crate::ArrayRef; -use crate::ExecutionCtx; -use crate::arrays::ExtensionArray; -use crate::arrays::PrimitiveArray as VortexPrimitiveArray; -use crate::arrays::extension::ExtensionArrayExt; -use crate::arrow::null_buffer::to_null_buffer; -use crate::dtype::NativePType; -use crate::extension::datetime::AnyTemporal; -use crate::extension::datetime::TemporalMetadata; -use crate::extension::datetime::TimeUnit; - -pub(super) fn to_arrow_temporal( - array: ArrayRef, - data_type: &DataType, - ctx: &mut ExecutionCtx, -) -> VortexResult { - let temporal_options = array - .dtype() - .as_extension() - .metadata_opt::() - .ok_or_else(|| { - vortex_err!( - "Array dtype {} is not a temporal extension type", - array.dtype() - ) - })?; - - match (temporal_options, &data_type) { - (TemporalMetadata::Date(TimeUnit::Days), DataType::Date32) => { - to_temporal::(array, ctx) - } - (TemporalMetadata::Date(TimeUnit::Milliseconds), DataType::Date64) => { - to_temporal::(array, ctx) - } - (TemporalMetadata::Time(TimeUnit::Seconds), DataType::Time32(ArrowTimeUnit::Second)) => { - to_temporal::(array, ctx) - } - ( - TemporalMetadata::Time(TimeUnit::Milliseconds), - DataType::Time32(ArrowTimeUnit::Millisecond), - ) => 
to_temporal::(array, ctx), - ( - TemporalMetadata::Time(TimeUnit::Microseconds), - DataType::Time64(ArrowTimeUnit::Microsecond), - ) => to_temporal::(array, ctx), - - ( - TemporalMetadata::Time(TimeUnit::Nanoseconds), - DataType::Time64(ArrowTimeUnit::Nanosecond), - ) => to_temporal::(array, ctx), - - (TemporalMetadata::Timestamp(unit, tz), DataType::Timestamp(arrow_unit, arrow_tz)) => { - vortex_ensure!( - tz == arrow_tz, - "Cannot convert {} array to Arrow type {} due to timezone mismatch", - array.dtype(), - data_type - ); - - match (unit, arrow_unit) { - (TimeUnit::Seconds, ArrowTimeUnit::Second) => { - to_arrow_timestamp::(array, arrow_tz, ctx) - } - (TimeUnit::Milliseconds, ArrowTimeUnit::Millisecond) => { - to_arrow_timestamp::(array, arrow_tz, ctx) - } - (TimeUnit::Microseconds, ArrowTimeUnit::Microsecond) => { - to_arrow_timestamp::(array, arrow_tz, ctx) - } - (TimeUnit::Nanoseconds, ArrowTimeUnit::Nanosecond) => { - to_arrow_timestamp::(array, arrow_tz, ctx) - } - _ => vortex_bail!( - "Cannot convert {} array to Arrow type {}", - array.dtype(), - data_type - ), - } - } - _ => vortex_bail!( - "Cannot convert {} array to Arrow type {}", - array.dtype(), - data_type - ), - } -} - -fn to_temporal( - array: ArrayRef, - ctx: &mut ExecutionCtx, -) -> VortexResult -where - T::Native: NativePType, -{ - // We cast the array to the native primitive type. 
- Ok(Arc::new(to_arrow_temporal_primitive::(array, ctx)?)) -} - -fn to_arrow_timestamp( - array: ArrayRef, - arrow_tz: &Option>, - ctx: &mut ExecutionCtx, -) -> VortexResult -where - T::Native: NativePType, -{ - Ok(Arc::new( - to_arrow_temporal_primitive::(array, ctx)?.with_timezone_opt(arrow_tz.clone()), - )) -} - -fn to_arrow_temporal_primitive( - array: ArrayRef, - ctx: &mut ExecutionCtx, -) -> VortexResult> -where - T::Native: NativePType, -{ - debug_assert!(array.dtype().as_extension().is::()); - - let ext_array = array.execute::(ctx)?; - let primitive = ext_array - .storage_array() - .clone() - .execute::(ctx)?; - vortex_ensure!( - primitive.ptype() == T::Native::PTYPE, - "Expected temporal array to produce vector of width {}, found {}", - T::Native::PTYPE, - primitive.ptype() - ); - - let validity = primitive - .as_ref() - .validity()? - .to_mask(primitive.as_ref().len(), ctx)?; - let buffer = primitive.to_buffer::(); - - let values = buffer.into_arrow_scalar_buffer(); - let nulls = to_null_buffer(validity); - - Ok(PrimitiveArray::::new(values, nulls)) -} diff --git a/vortex-array/src/arrow/export_plugin.rs b/vortex-array/src/arrow/export_plugin.rs new file mode 100644 index 00000000000..92ea1dd6ac9 --- /dev/null +++ b/vortex-array/src/arrow/export_plugin.rs @@ -0,0 +1,46 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Plugin trait for exporting extension-typed arrays to Arrow. + +use std::fmt::Debug; +use std::sync::Arc; + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_schema::DataType; +use vortex_error::VortexResult; + +use crate::ArrayRef; +use crate::dtype::extension::ExtDTypeRef; +use crate::dtype::extension::ExtId; +use crate::executor::ExecutionCtx; + +/// Shared reference to an [`ArrowExportPlugin`]. +pub type ArrowExportPluginRef = Arc; + +/// Plugin for exporting an extension-typed array to Arrow. 
+/// +/// Extension types register an [`ArrowExportPlugin`] to own the mapping from their extension +/// dtype to Arrow [`DataType`] and the conversion of array data. The core arrow executor +/// delegates to these plugins instead of hard-coding extension-specific behavior. +pub trait ArrowExportPlugin: 'static + Send + Sync + Debug { + /// The extension type id this plugin handles. + fn id(&self) -> ExtId; + + /// Preferred Arrow [`DataType`] for this extension type, given its metadata. + /// + /// Called by the executor when no target Arrow type was supplied by the caller. + fn to_arrow_data_type(&self, ext_dtype: &ExtDTypeRef) -> VortexResult; + + /// Execute the extension-typed `array` to an Arrow array of type `target`. + /// + /// `array` is the full extension-typed array; the plugin is responsible for unwrapping it + /// to storage. If `target` is not a type this plugin can produce, the plugin must + /// `vortex_bail!` rather than attempting a best-effort conversion. + fn execute_to_arrow( + &self, + array: ArrayRef, + target: &DataType, + ctx: &mut ExecutionCtx, + ) -> VortexResult; +} diff --git a/vortex-array/src/arrow/export_session.rs b/vortex-array/src/arrow/export_session.rs new file mode 100644 index 00000000000..5b73fded254 --- /dev/null +++ b/vortex-array/src/arrow/export_session.rs @@ -0,0 +1,69 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Session-scoped registry of [`ArrowExportPlugin`]s. + +use std::sync::Arc; + +use vortex_session::Ref; +use vortex_session::SessionExt; +use vortex_session::registry::Registry; + +use crate::arrow::export_plugin::ArrowExportPlugin; +use crate::arrow::export_plugin::ArrowExportPluginRef; +use crate::dtype::extension::ExtId; +use crate::extension::datetime::DateArrowExport; +use crate::extension::datetime::TimeArrowExport; +use crate::extension::datetime::TimestampArrowExport; + +/// Registry of Arrow export plugins keyed by extension id. 
+pub type ArrowExportRegistry = Registry; + +/// Session for managing Arrow export plugins. +#[derive(Debug)] +pub struct ArrowExportSession { + registry: ArrowExportRegistry, +} + +impl Default for ArrowExportSession { + fn default() -> Self { + let this = Self { + registry: Registry::default(), + }; + this.register(DateArrowExport); + this.register(TimeArrowExport); + this.register(TimestampArrowExport); + this + } +} + +impl ArrowExportSession { + /// Register an Arrow export plugin, replacing any existing plugin with the same [`ExtId`]. + pub fn register(&self, plugin: impl ArrowExportPlugin) { + let id = plugin.id(); + self.registry + .register(id, Arc::new(plugin) as ArrowExportPluginRef); + } + + /// Find the plugin registered for `id`, if any. + pub fn find(&self, id: &ExtId) -> Option { + self.registry.find(id) + } + + /// Return the underlying registry. + pub fn registry(&self) -> &ArrowExportRegistry { + &self.registry + } +} + +/// Extension trait for accessing the [`ArrowExportSession`] from a Vortex session. +pub trait ArrowExportSessionExt: SessionExt { + /// Get the Arrow export session. 
+ fn arrow_exports(&self) -> Ref<'_, ArrowExportSession>; +} + +impl ArrowExportSessionExt for S { + fn arrow_exports(&self) -> Ref<'_, ArrowExportSession> { + self.get::() + } +} diff --git a/vortex-array/src/arrow/mod.rs b/vortex-array/src/arrow/mod.rs index bcc9e6094a1..c063f9a7002 100644 --- a/vortex-array/src/arrow/mod.rs +++ b/vortex-array/src/arrow/mod.rs @@ -10,12 +10,16 @@ use vortex_error::VortexResult; mod convert; mod datum; mod executor; +pub mod export_plugin; +pub mod export_session; mod iter; mod null_buffer; mod record_batch; pub use datum::*; pub use executor::*; +pub use export_plugin::*; +pub use export_session::*; pub use iter::*; pub use null_buffer::to_arrow_null_buffer; pub use null_buffer::to_null_buffer; diff --git a/vortex-array/src/dtype/arrow.rs b/vortex-array/src/dtype/arrow.rs index 17af749cfc0..783c11783dd 100644 --- a/vortex-array/src/dtype/arrow.rs +++ b/vortex-array/src/dtype/arrow.rs @@ -14,6 +14,7 @@ //! materialize an Arrow ArrayRef at the very end of the processing chain. use std::sync::Arc; +use std::sync::LazyLock; use arrow_schema::DataType; use arrow_schema::Field; @@ -28,21 +29,23 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_err; -use vortex_error::vortex_panic; +use crate::arrow::export_session::ArrowExportSession; use crate::dtype::DType; use crate::dtype::DecimalDType; use crate::dtype::FieldName; use crate::dtype::Nullability; use crate::dtype::PType; use crate::dtype::StructFields; -use crate::extension::datetime::AnyTemporal; use crate::extension::datetime::Date; -use crate::extension::datetime::TemporalMetadata; use crate::extension::datetime::Time; use crate::extension::datetime::TimeUnit; use crate::extension::datetime::Timestamp; +/// Default [`ArrowExportSession`] used by [`DType::to_arrow_dtype`] when no session is available. 
+static DEFAULT_ARROW_EXPORTS: LazyLock = + LazyLock::new(ArrowExportSession::default); + /// Trait for converting Arrow types to Vortex types. pub trait FromArrowType: Sized { /// Convert the Arrow type to a Vortex type. @@ -324,32 +327,10 @@ impl DType { "DType::Variant requires Arrow Field metadata; use to_arrow_schema or a Field helper" ), DType::Extension(ext_dtype) => { - // Try and match against the known extension DTypes. - if let Some(temporal) = ext_dtype.metadata_opt::() { - return Ok(match temporal { - TemporalMetadata::Timestamp(unit, tz) => { - DataType::Timestamp(ArrowTimeUnit::try_from(*unit)?, tz.clone()) - } - TemporalMetadata::Date(unit) => match unit { - TimeUnit::Days => DataType::Date32, - TimeUnit::Milliseconds => DataType::Date64, - TimeUnit::Nanoseconds | TimeUnit::Microseconds | TimeUnit::Seconds => { - vortex_panic!(InvalidArgument: "Invalid TimeUnit {} for {}", unit, ext_dtype.id()) - } - }, - TemporalMetadata::Time(unit) => match unit { - TimeUnit::Seconds => DataType::Time32(ArrowTimeUnit::Second), - TimeUnit::Milliseconds => DataType::Time32(ArrowTimeUnit::Millisecond), - TimeUnit::Microseconds => DataType::Time64(ArrowTimeUnit::Microsecond), - TimeUnit::Nanoseconds => DataType::Time64(ArrowTimeUnit::Nanosecond), - TimeUnit::Days => { - vortex_panic!(InvalidArgument: "Invalid TimeUnit {} for {}", unit, ext_dtype.id()) - } - }, - }); - }; - - vortex_bail!("Unsupported extension type \"{}\"", ext_dtype.id()) + let plugin = DEFAULT_ARROW_EXPORTS.find(&ext_dtype.id()).ok_or_else(|| { + vortex_err!("Unsupported extension type \"{}\"", ext_dtype.id()) + })?; + return plugin.to_arrow_data_type(ext_dtype); } }) } diff --git a/vortex-array/src/extension/datetime/arrow.rs b/vortex-array/src/extension/datetime/arrow.rs new file mode 100644 index 00000000000..15c502da546 --- /dev/null +++ b/vortex-array/src/extension/datetime/arrow.rs @@ -0,0 +1,266 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex 
contributors + +//! Arrow export plugins for the built-in temporal extension types. + +use std::sync::Arc; + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_array::PrimitiveArray; +use arrow_array::types::ArrowTemporalType; +use arrow_array::types::ArrowTimestampType; +use arrow_array::types::Date32Type; +use arrow_array::types::Date64Type; +use arrow_array::types::Time32MillisecondType; +use arrow_array::types::Time32SecondType; +use arrow_array::types::Time64MicrosecondType; +use arrow_array::types::Time64NanosecondType; +use arrow_array::types::TimestampMicrosecondType; +use arrow_array::types::TimestampMillisecondType; +use arrow_array::types::TimestampNanosecondType; +use arrow_array::types::TimestampSecondType; +use arrow_schema::DataType; +use arrow_schema::TimeUnit as ArrowTimeUnit; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; + +use crate::ArrayRef; +use crate::arrays::ExtensionArray; +use crate::arrays::PrimitiveArray as VortexPrimitiveArray; +use crate::arrays::extension::ExtensionArrayExt; +use crate::arrow::export_plugin::ArrowExportPlugin; +use crate::arrow::to_null_buffer; +use crate::dtype::NativePType; +use crate::dtype::extension::ExtDTypeRef; +use crate::dtype::extension::ExtId; +use crate::dtype::extension::ExtVTable; +use crate::executor::ExecutionCtx; +use crate::extension::datetime::Date; +use crate::extension::datetime::Time; +use crate::extension::datetime::TimeUnit; +use crate::extension::datetime::Timestamp; + +/// Arrow export plugin for [`Date`] extension types. 
+#[derive(Debug)] +pub struct DateArrowExport; + +impl ArrowExportPlugin for DateArrowExport { + fn id(&self) -> ExtId { + Date.id() + } + + fn to_arrow_data_type(&self, ext_dtype: &ExtDTypeRef) -> VortexResult<DataType> { + let unit = date_metadata(ext_dtype)?; + date_arrow_type(unit) + } + + fn execute_to_arrow( + &self, + array: ArrayRef, + target: &DataType, + ctx: &mut ExecutionCtx, + ) -> VortexResult<ArrowArrayRef> { + match target { + DataType::Date32 => to_temporal::<Date32Type>(array, ctx), + DataType::Date64 => to_temporal::<Date64Type>(array, ctx), + _ => vortex_bail!( + "Cannot convert {} array to Arrow type {target}", + array.dtype() + ), + } + } +} + +/// Arrow export plugin for [`Time`] extension types. +#[derive(Debug)] +pub struct TimeArrowExport; + +impl ArrowExportPlugin for TimeArrowExport { + fn id(&self) -> ExtId { + Time.id() + } + + fn to_arrow_data_type(&self, ext_dtype: &ExtDTypeRef) -> VortexResult<DataType> { + let unit = time_metadata(ext_dtype)?; + time_arrow_type(unit) + } + + fn execute_to_arrow( + &self, + array: ArrayRef, + target: &DataType, + ctx: &mut ExecutionCtx, + ) -> VortexResult<ArrowArrayRef> { + match target { + DataType::Time32(ArrowTimeUnit::Second) => to_temporal::<Time32SecondType>(array, ctx), + DataType::Time32(ArrowTimeUnit::Millisecond) => { + to_temporal::<Time32MillisecondType>(array, ctx) + } + DataType::Time64(ArrowTimeUnit::Microsecond) => { + to_temporal::<Time64MicrosecondType>(array, ctx) + } + DataType::Time64(ArrowTimeUnit::Nanosecond) => { + to_temporal::<Time64NanosecondType>(array, ctx) + } + _ => vortex_bail!( + "Cannot convert {} array to Arrow type {target}", + array.dtype() + ), + } + } +} + +/// Arrow export plugin for [`Timestamp`] extension types.
+#[derive(Debug)] +pub struct TimestampArrowExport; + +impl ArrowExportPlugin for TimestampArrowExport { + fn id(&self) -> ExtId { + Timestamp.id() + } + + fn to_arrow_data_type(&self, ext_dtype: &ExtDTypeRef) -> VortexResult { + let opts = ext_dtype + .metadata_opt::() + .ok_or_else(|| vortex_err!("expected Timestamp metadata, got {}", ext_dtype.id()))?; + Ok(DataType::Timestamp( + ArrowTimeUnit::try_from(opts.unit)?, + opts.tz.clone(), + )) + } + + fn execute_to_arrow( + &self, + array: ArrayRef, + target: &DataType, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let DataType::Timestamp(arrow_unit, arrow_tz) = target else { + vortex_bail!( + "Cannot convert {} array to Arrow type {target}", + array.dtype() + ); + }; + + let opts = array + .dtype() + .as_extension() + .metadata_opt::() + .ok_or_else(|| vortex_err!("expected Timestamp metadata, got {}", array.dtype()))?; + + vortex_ensure!( + &opts.tz == arrow_tz, + "Cannot convert {} array to Arrow type {} due to timezone mismatch", + array.dtype(), + target + ); + + match (&opts.unit, arrow_unit) { + (TimeUnit::Seconds, ArrowTimeUnit::Second) => { + to_arrow_timestamp::(array, arrow_tz, ctx) + } + (TimeUnit::Milliseconds, ArrowTimeUnit::Millisecond) => { + to_arrow_timestamp::(array, arrow_tz, ctx) + } + (TimeUnit::Microseconds, ArrowTimeUnit::Microsecond) => { + to_arrow_timestamp::(array, arrow_tz, ctx) + } + (TimeUnit::Nanoseconds, ArrowTimeUnit::Nanosecond) => { + to_arrow_timestamp::(array, arrow_tz, ctx) + } + _ => vortex_bail!( + "Cannot convert {} array to Arrow type {}", + array.dtype(), + target + ), + } + } +} + +fn date_metadata(ext_dtype: &ExtDTypeRef) -> VortexResult<&TimeUnit> { + ext_dtype + .metadata_opt::() + .ok_or_else(|| vortex_err!("expected Date metadata, got {}", ext_dtype.id())) +} + +fn time_metadata(ext_dtype: &ExtDTypeRef) -> VortexResult<&TimeUnit> { + ext_dtype + .metadata_opt::