diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 9701759a787..4a56284cc03 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -29,7 +29,7 @@ arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-data = { workspace = true } arrow-ord = { workspace = true } -arrow-schema = { workspace = true } +arrow-schema = { workspace = true, features = ["canonical_extension_types"] } arrow-select = { workspace = true } arrow-string = { workspace = true } async-lock = { workspace = true } diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 87eca1bb0c2..e77be646425 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -7076,6 +7076,18 @@ pub mod vortex_array::arrow::primitive pub fn vortex_array::arrow::primitive::canonical_primitive_to_arrow(vortex_array::arrays::PrimitiveArray, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult where ::Native: vortex_array::dtype::NativePType +pub enum vortex_array::arrow::ArrowExport + +pub vortex_array::arrow::ArrowExport::Exported(arrow_array::array::ArrayRef) + +pub vortex_array::arrow::ArrowExport::Unsupported(vortex_array::ArrayRef) + +pub enum vortex_array::arrow::ArrowImport + +pub vortex_array::arrow::ArrowImport::Imported(vortex_array::ArrayRef) + +pub vortex_array::arrow::ArrowImport::Unsupported(arrow_array::array::ArrayRef) + pub struct vortex_array::arrow::ArrowArrayStreamAdapter impl vortex_array::arrow::ArrowArrayStreamAdapter @@ -7092,6 +7104,44 @@ impl vortex_array::iter::ArrayIterator for vortex_array::arrow::ArrowArrayStream pub fn vortex_array::arrow::ArrowArrayStreamAdapter::dtype(&self) -> &vortex_array::dtype::DType +pub struct vortex_array::arrow::ArrowSession + +impl vortex_array::arrow::ArrowSession + +pub fn vortex_array::arrow::ArrowSession::execute_arrow(&self, vortex_array::ArrayRef, core::option::Option<&arrow_schema::field::Field>, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::from_arrow_array(&self, arrow_array::array::ArrayRef, &arrow_schema::field::Field) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::from_arrow_field(&self, &arrow_schema::field::Field) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::from_arrow_record_batch(&self, arrow_array::record_batch::RecordBatch, &arrow_schema::schema::Schema) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::from_arrow_schema(&self, &arrow_schema::schema::Schema) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::register_exporter(&self, vortex_array::arrow::ArrowExportVTableRef) + +pub fn vortex_array::arrow::ArrowSession::register_importer(&self, vortex_array::arrow::ArrowImportVTableRef) + +pub fn vortex_array::arrow::ArrowSession::to_arrow_data_type(&self, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::to_arrow_field(&self, &str, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::to_arrow_schema(&self, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +impl core::default::Default for vortex_array::arrow::ArrowSession + +pub fn vortex_array::arrow::ArrowSession::default() -> Self + +impl core::fmt::Debug for vortex_array::arrow::ArrowSession + +pub fn vortex_array::arrow::ArrowSession::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_session::SessionVar for vortex_array::arrow::ArrowSession + +pub fn vortex_array::arrow::ArrowSession::as_any(&self) -> &dyn core::any::Any + +pub fn vortex_array::arrow::ArrowSession::as_any_mut(&mut self) -> &mut dyn core::any::Any + pub struct vortex_array::arrow::Datum impl vortex_array::arrow::Datum @@ -7128,6 +7178,50 @@ pub fn vortex_array::ArrayRef::execute_record_batch(self, &arrow_schema::schema: pub fn vortex_array::ArrayRef::execute_record_batches(self, &arrow_schema::schema::Schema, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +pub trait vortex_array::arrow::ArrowExportVTable: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::ArrowExportVTable::arrow_ext_id(&self) -> vortex_session::registry::Id + +pub fn vortex_array::arrow::ArrowExportVTable::execute_arrow(&self, vortex_array::ArrayRef, &arrow_schema::field::Field, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowExportVTable::to_arrow_field(&self, &str, &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::ArrowExportVTable::vortex_ext_id(&self) -> vortex_array::dtype::extension::ExtId + +impl vortex_array::arrow::ArrowExportVTable for vortex_array::extension::uuid::Uuid + +pub fn vortex_array::extension::uuid::Uuid::arrow_ext_id(&self) -> vortex_session::registry::Id + +pub fn vortex_array::extension::uuid::Uuid::execute_arrow(&self, vortex_array::ArrayRef, &arrow_schema::field::Field, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::extension::uuid::Uuid::to_arrow_field(&self, &str, &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult> + +pub fn vortex_array::extension::uuid::Uuid::vortex_ext_id(&self) -> vortex_array::dtype::extension::ExtId + +pub trait vortex_array::arrow::ArrowImportVTable: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::ArrowImportVTable::arrow_ext_id(&self) -> vortex_session::registry::Id + +pub fn vortex_array::arrow::ArrowImportVTable::from_arrow_array(&self, arrow_array::array::ArrayRef, &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowImportVTable::from_arrow_field(&self, &arrow_schema::field::Field) -> vortex_error::VortexResult> + +impl vortex_array::arrow::ArrowImportVTable for vortex_array::extension::uuid::Uuid + +pub fn vortex_array::extension::uuid::Uuid::arrow_ext_id(&self) -> vortex_session::registry::Id + +pub fn vortex_array::extension::uuid::Uuid::from_arrow_array(&self, arrow_array::array::ArrayRef, &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +pub fn vortex_array::extension::uuid::Uuid::from_arrow_field(&self, &arrow_schema::field::Field) -> vortex_error::VortexResult> + +pub trait vortex_array::arrow::ArrowSessionExt: vortex_session::SessionExt + +pub fn vortex_array::arrow::ArrowSessionExt::arrow(&self) -> vortex_session::Ref<'_, vortex_array::arrow::ArrowSession> + +impl vortex_array::arrow::ArrowSessionExt for S + +pub fn S::arrow(&self) -> vortex_session::Ref<'_, vortex_array::arrow::ArrowSession> + pub trait vortex_array::arrow::FromArrowArray pub fn vortex_array::arrow::FromArrowArray::from_arrow(A, bool) -> vortex_error::VortexResult where Self: core::marker::Sized @@ -7298,6 +7392,10 @@ pub fn vortex_array::arrow::to_arrow_null_buffer(vortex_array::validity::Validit pub fn vortex_array::arrow::to_null_buffer(vortex_mask::Mask) -> core::option::Option +pub type vortex_array::arrow::ArrowExportVTableRef = alloc::sync::Arc + +pub type vortex_array::arrow::ArrowImportVTableRef = alloc::sync::Arc + pub mod vortex_array::buffer pub struct vortex_array::buffer::BufferHandle(_) @@ -13118,6 +13216,24 @@ pub fn vortex_array::extension::uuid::Uuid::hash<__H: core::hash::Hasher>(&self, impl core::marker::StructuralPartialEq for vortex_array::extension::uuid::Uuid +impl vortex_array::arrow::ArrowExportVTable for vortex_array::extension::uuid::Uuid + +pub fn vortex_array::extension::uuid::Uuid::arrow_ext_id(&self) -> vortex_session::registry::Id + +pub fn vortex_array::extension::uuid::Uuid::execute_arrow(&self, vortex_array::ArrayRef, &arrow_schema::field::Field, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::extension::uuid::Uuid::to_arrow_field(&self, &str, &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult> + +pub fn vortex_array::extension::uuid::Uuid::vortex_ext_id(&self) -> vortex_array::dtype::extension::ExtId + +impl vortex_array::arrow::ArrowImportVTable for vortex_array::extension::uuid::Uuid + +pub fn vortex_array::extension::uuid::Uuid::arrow_ext_id(&self) -> vortex_session::registry::Id + +pub fn vortex_array::extension::uuid::Uuid::from_arrow_array(&self, arrow_array::array::ArrayRef, &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +pub fn vortex_array::extension::uuid::Uuid::from_arrow_field(&self, &arrow_schema::field::Field) -> vortex_error::VortexResult> + impl vortex_array::dtype::extension::ExtVTable for vortex_array::extension::uuid::Uuid pub type vortex_array::extension::uuid::Uuid::Metadata = vortex_array::extension::uuid::UuidMetadata diff --git a/vortex-array/src/arrow/convert.rs b/vortex-array/src/arrow/convert.rs index ba95bdf0451..7c64d937704 100644 --- a/vortex-array/src/arrow/convert.rs +++ b/vortex-array/src/arrow/convert.rs @@ -331,7 +331,7 @@ impl FromArrowArray<&ArrowBooleanArray> for ArrayRef { } /// Strip out the nulls from this array and return a new array without nulls. -fn remove_nulls(data: arrow_data::ArrayData) -> arrow_data::ArrayData { +pub(crate) fn remove_nulls(data: arrow_data::ArrayData) -> arrow_data::ArrayData { if data.null_count() == 0 { // No nulls to remove, return the array as is return data; @@ -476,7 +476,7 @@ impl FromArrowArray<&DictionaryArray> for DictArra } } -fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity { +pub(crate) fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity { if nullable { nulls .map(|nulls| { @@ -909,8 +909,10 @@ mod tests { Arc::new(Date32Array::from(vec![18000_i32, 18001, 18002, 18003])), )] #[case::date64( - Arc::new(Date64Array::from(vec![Some(1555200000000), None, Some(1555286400000), Some(1555372800000)])), - Arc::new(Date64Array::from(vec![1555200000000_i64, 1555213600000, 1555286400000, 1555372800000])), + Arc::new(Date64Array::from(vec![Some(1555200000000), None, Some(1555286400000), Some(1555372800000)] + )), + Arc::new(Date64Array::from(vec![1555200000000_i64, 1555213600000, 1555286400000, 1555372800000] + )), )] fn test_temporal_array_conversion( #[case] nullable: Arc, diff --git a/vortex-array/src/arrow/executor/fixed_size_list.rs b/vortex-array/src/arrow/executor/fixed_size_list.rs index 7abea9aaa7a..caeb5804da2 100644 --- a/vortex-array/src/arrow/executor/fixed_size_list.rs +++ b/vortex-array/src/arrow/executor/fixed_size_list.rs @@ -12,8 +12,8 @@ use crate::ExecutionCtx; use crate::arrays::FixedSizeList; use crate::arrays::FixedSizeListArray; use crate::arrays::fixed_size_list::FixedSizeListArrayExt; -use crate::arrow::ArrowArrayExecutor; use crate::arrow::executor::validity::to_arrow_null_buffer; +use crate::arrow::session::ArrowSessionExt; pub(super) fn to_arrow_fixed_list( array: ArrayRef, @@ -44,10 +44,11 @@ fn list_to_list( list_size ); - let elements = array - .elements() - .clone() - .execute_arrow(Some(elements_field.data_type()), ctx)?; + let elements = ctx.session().clone().arrow().execute_arrow( + array.elements().clone(), + Some(elements_field.as_ref()), + ctx, + )?; vortex_ensure!( elements_field.is_nullable() || elements.null_count() == 0, "Cannot convert FixedSizeListArray to non-nullable Arrow array when elements are nullable" diff --git a/vortex-array/src/arrow/executor/list.rs b/vortex-array/src/arrow/executor/list.rs index 6c7c271e933..d59088ade34 100644 --- a/vortex-array/src/arrow/executor/list.rs +++ b/vortex-array/src/arrow/executor/list.rs @@ -26,8 +26,8 @@ use crate::arrays::list::ListArrayExt; use crate::arrays::listview::ListViewArrayExt; use crate::arrays::listview::ListViewDataParts; use crate::arrays::listview::ListViewRebuildMode; -use crate::arrow::ArrowArrayExecutor; use crate::arrow::executor::validity::to_arrow_null_buffer; +use crate::arrow::session::ArrowSessionExt; use crate::builtins::ArrayBuiltins; use crate::dtype::DType; use crate::dtype::NativePType; @@ -96,10 +96,11 @@ fn list_to_list( .to_buffer::() .into_arrow_offset_buffer(); - let elements = array - .elements() - .clone() - .execute_arrow(Some(elements_field.data_type()), ctx)?; + let elements = ctx.session().clone().arrow().execute_arrow( + array.elements().clone(), + Some(elements_field.as_ref()), + ctx, + )?; vortex_ensure!( elements_field.is_nullable() || elements.null_count() == 0, "Cannot convert to non-nullable Arrow array with null elements" @@ -124,10 +125,11 @@ fn list_view_zctl( assert!(array.is_zero_copy_to_list()); if array.is_empty() { - let elements = array - .elements() - .clone() - .execute_arrow(Some(elements_field.data_type()), ctx)?; + let elements = ctx.session().clone().arrow().execute_arrow( + array.elements().clone(), + Some(elements_field.as_ref()), + ctx, + )?; return Ok(Arc::new(GenericListArray::::new( Arc::clone(elements_field), OffsetBuffer::new_empty(), @@ -176,7 +178,11 @@ fn list_view_zctl( }); // Extract the elements array. - let elements = elements.execute_arrow(Some(elements_field.data_type()), ctx)?; + let elements = ctx.session().clone().arrow().execute_arrow( + elements, + Some(elements_field.as_ref()), + ctx, + )?; vortex_ensure!( elements_field.is_nullable() || elements.null_count() == 0, "Cannot convert to non-nullable Arrow array with null elements" diff --git a/vortex-array/src/arrow/executor/list_view.rs b/vortex-array/src/arrow/executor/list_view.rs index a6795b1adaa..ef858fa9916 100644 --- a/vortex-array/src/arrow/executor/list_view.rs +++ b/vortex-array/src/arrow/executor/list_view.rs @@ -15,8 +15,8 @@ use crate::arrays::ListView; use crate::arrays::ListViewArray; use crate::arrays::PrimitiveArray; use crate::arrays::listview::ListViewDataParts; -use crate::arrow::ArrowArrayExecutor; use crate::arrow::executor::validity::to_arrow_null_buffer; +use crate::arrow::session::ArrowSessionExt; use crate::builtins::ArrayBuiltins; use crate::dtype::DType; use crate::dtype::IntegerPType; @@ -51,7 +51,11 @@ fn list_view_to_list_view( .. } = array.into_data_parts(); - let elements = elements.execute_arrow(Some(elements_field.data_type()), ctx)?; + let elements = ctx.session().clone().arrow().execute_arrow( + elements, + Some(elements_field.as_ref()), + ctx, + )?; vortex_ensure!( elements_field.is_nullable() || elements.null_count() == 0, "Elements field is non-nullable but elements array contains nulls" diff --git a/vortex-array/src/arrow/executor/mod.rs b/vortex-array/src/arrow/executor/mod.rs index 890e7f8a46a..4e737d6e9f9 100644 --- a/vortex-array/src/arrow/executor/mod.rs +++ b/vortex-array/src/arrow/executor/mod.rs @@ -46,7 +46,10 @@ use crate::arrow::executor::null::to_arrow_null; use crate::arrow::executor::primitive::to_arrow_primitive; use crate::arrow::executor::run_end::to_arrow_run_end; use crate::arrow::executor::struct_::to_arrow_struct; -use crate::arrow::executor::temporal::to_arrow_temporal; +use crate::arrow::executor::temporal::to_arrow_date; +use crate::arrow::executor::temporal::to_arrow_time; +use crate::arrow::executor::temporal::to_arrow_timestamp; +use crate::arrow::session::ArrowSessionExt; use crate::dtype::DType; use crate::dtype::PType; use crate::executor::ExecutionCtx; @@ -87,92 +90,22 @@ impl ArrowArrayExecutor for ArrayRef { data_type: Option<&DataType>, ctx: &mut ExecutionCtx, ) -> VortexResult { - let len = self.len(); - - // Resolve the DataType if it is a leaf type - // we should likely make this extensible. - let resolved_type: DataType = match data_type { - Some(dt) => dt.clone(), - None => preferred_arrow_type(&self)?, - }; - - let arrow = match &resolved_type { - DataType::Null => to_arrow_null(self, ctx), - DataType::Boolean => to_arrow_bool(self, ctx), - DataType::Int8 => to_arrow_primitive::(self, ctx), - DataType::Int16 => to_arrow_primitive::(self, ctx), - DataType::Int32 => to_arrow_primitive::(self, ctx), - DataType::Int64 => to_arrow_primitive::(self, ctx), - DataType::UInt8 => to_arrow_primitive::(self, ctx), - DataType::UInt16 => to_arrow_primitive::(self, ctx), - DataType::UInt32 => to_arrow_primitive::(self, ctx), - DataType::UInt64 => to_arrow_primitive::(self, ctx), - DataType::Float16 => to_arrow_primitive::(self, ctx), - DataType::Float32 => to_arrow_primitive::(self, ctx), - DataType::Float64 => to_arrow_primitive::(self, ctx), - DataType::Timestamp(..) - | DataType::Date32 - | DataType::Date64 - | DataType::Time32(_) - | DataType::Time64(_) => to_arrow_temporal(self, &resolved_type, ctx), - DataType::Binary => to_arrow_byte_array::(self, ctx), - DataType::LargeBinary => to_arrow_byte_array::(self, ctx), - DataType::Utf8 => to_arrow_byte_array::(self, ctx), - DataType::LargeUtf8 => to_arrow_byte_array::(self, ctx), - DataType::BinaryView => to_arrow_byte_view::(self, ctx), - DataType::Utf8View => to_arrow_byte_view::(self, ctx), - // TODO(joe): pass down preferred - DataType::List(elements_field) => to_arrow_list::(self, elements_field, ctx), - // TODO(joe): pass down preferred - DataType::LargeList(elements_field) => to_arrow_list::(self, elements_field, ctx), - // TODO(joe): pass down preferred - DataType::FixedSizeList(elements_field, list_size) => { - to_arrow_fixed_list(self, *list_size, elements_field, ctx) - } - // TODO(joe): pass down preferred - DataType::ListView(elements_field) => { - to_arrow_list_view::(self, elements_field, ctx) - } - // TODO(joe): pass down preferred - DataType::LargeListView(elements_field) => { - to_arrow_list_view::(self, elements_field, ctx) - } - DataType::Struct(fields) => { - let fields = if data_type.is_none() { - None - } else { - Some(fields) - }; - to_arrow_struct(self, fields, ctx) + // Clone the session out of `ctx` to break the immutable borrow chain that prevents + // `ctx` from being passed back through to the session method. + let session = ctx.session().clone(); + let target = match data_type { + Some(dt) => Some(Field::new("", dt.clone(), self.dtype().is_nullable())), + // No target supplied: if the source dtype tree contains any Vortex extension, + // synthesize a Field via session-aware inference so registered plugins run and + // ARROW:extension:name metadata is preserved end-to-end. For non-extension + // trees we leave target as None so canonical preferred-type logic (e.g. + // VarBin → Utf8 instead of Utf8View) keeps running. + None if dtype_has_extension(self.dtype()) => { + Some(session.arrow().to_arrow_field("", self.dtype())?) } - // TODO(joe): pass down preferred - DataType::Dictionary(codes_type, values_type) => { - to_arrow_dictionary(self, codes_type, values_type, ctx) - } - dt @ DataType::Decimal32(..) => to_arrow_decimal(self, dt, ctx), - dt @ DataType::Decimal64(..) => to_arrow_decimal(self, dt, ctx), - dt @ DataType::Decimal128(..) => to_arrow_decimal(self, dt, ctx), - dt @ DataType::Decimal256(..) => to_arrow_decimal(self, dt, ctx), - // TODO(joe): pass down preferred - DataType::RunEndEncoded(ends_type, values_type) => { - to_arrow_run_end(self, ends_type.data_type(), values_type, ctx) - } - DataType::FixedSizeBinary(_) - | DataType::Map(..) - | DataType::Duration(_) - | DataType::Interval(_) - | DataType::Union(..) => { - vortex_bail!("Conversion to Arrow type {resolved_type} is not supported"); - } - }?; - - vortex_ensure!( - arrow.len() == len, - "Arrow array length does not match Vortex array length after conversion to {:?}", - arrow - ); - - Ok(arrow) + None => None, + }; + session.arrow().execute_arrow(self, target.as_ref(), ctx) } fn execute_record_batches( @@ -186,6 +119,98 @@ impl ArrowArrayExecutor for ArrayRef { } } +/// Canonical Vortex → Arrow conversion, dispatched by Arrow [`DataType`]. +/// +/// This is the fallback path used by [`crate::arrow::ArrowSession::execute_arrow`] when no +/// extension plugin matches. Callers normally go through the session; this is `pub(crate)` +/// purely so the session can hand off after its own dispatch. +pub(crate) fn canonical_execute_arrow( + array: ArrayRef, + data_type: Option<&DataType>, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let len = array.len(); + + let resolved_type: DataType = match data_type { + Some(dt) => dt.clone(), + None => preferred_arrow_type(&array)?, + }; + + let arrow = match &resolved_type { + DataType::Null => to_arrow_null(array, ctx), + DataType::Boolean => to_arrow_bool(array, ctx), + DataType::Int8 => to_arrow_primitive::(array, ctx), + DataType::Int16 => to_arrow_primitive::(array, ctx), + DataType::Int32 => to_arrow_primitive::(array, ctx), + DataType::Int64 => to_arrow_primitive::(array, ctx), + DataType::UInt8 => to_arrow_primitive::(array, ctx), + DataType::UInt16 => to_arrow_primitive::(array, ctx), + DataType::UInt32 => to_arrow_primitive::(array, ctx), + DataType::UInt64 => to_arrow_primitive::(array, ctx), + DataType::Float16 => to_arrow_primitive::(array, ctx), + DataType::Float32 => to_arrow_primitive::(array, ctx), + DataType::Float64 => to_arrow_primitive::(array, ctx), + DataType::Binary => to_arrow_byte_array::(array, ctx), + DataType::LargeBinary => to_arrow_byte_array::(array, ctx), + DataType::Utf8 => to_arrow_byte_array::(array, ctx), + DataType::LargeUtf8 => to_arrow_byte_array::(array, ctx), + DataType::BinaryView => to_arrow_byte_view::(array, ctx), + DataType::Utf8View => to_arrow_byte_view::(array, ctx), + // TODO(joe): pass down preferred + DataType::List(elements_field) => to_arrow_list::(array, elements_field, ctx), + // TODO(joe): pass down preferred + DataType::LargeList(elements_field) => to_arrow_list::(array, elements_field, ctx), + // TODO(joe): pass down preferred + DataType::FixedSizeList(elements_field, list_size) => { + to_arrow_fixed_list(array, *list_size, elements_field, ctx) + } + // TODO(joe): pass down preferred + DataType::ListView(elements_field) => to_arrow_list_view::(array, elements_field, ctx), + // TODO(joe): pass down preferred + DataType::LargeListView(elements_field) => { + to_arrow_list_view::(array, elements_field, ctx) + } + DataType::Struct(fields) => { + let fields = if data_type.is_none() { + None + } else { + Some(fields) + }; + to_arrow_struct(array, fields, ctx) + } + // TODO(joe): pass down preferred + DataType::Dictionary(codes_type, values_type) => { + to_arrow_dictionary(array, codes_type, values_type, ctx) + } + dt @ DataType::Decimal32(..) => to_arrow_decimal(array, dt, ctx), + dt @ DataType::Decimal64(..) => to_arrow_decimal(array, dt, ctx), + dt @ DataType::Decimal128(..) => to_arrow_decimal(array, dt, ctx), + dt @ DataType::Decimal256(..) => to_arrow_decimal(array, dt, ctx), + // TODO(joe): pass down preferred + DataType::RunEndEncoded(ends_type, values_type) => { + to_arrow_run_end(array, ends_type.data_type(), values_type, ctx) + } + dt @ (DataType::Date32 | DataType::Date64) => to_arrow_date(array, dt, ctx), + dt @ (DataType::Time32(_) | DataType::Time64(_)) => to_arrow_time(array, dt, ctx), + dt @ DataType::Timestamp(..) => to_arrow_timestamp(array, dt, ctx), + DataType::FixedSizeBinary(_) + | DataType::Map(..) + | DataType::Duration(_) + | DataType::Interval(_) + | DataType::Union(..) => { + vortex_bail!("Conversion to Arrow type {resolved_type} is not supported"); + } + }?; + + vortex_ensure!( + arrow.len() == len, + "Arrow array length does not match Vortex array length after conversion to {:?}", + arrow + ); + + Ok(arrow) +} + /// Determine the preferred (cheapest) Arrow type for an array. /// /// For most arrays, this returns the canonical Arrow type from `dtype.to_arrow_dtype()`. @@ -228,3 +253,17 @@ fn preferred_arrow_type(array: &ArrayRef) -> VortexResult { // Everything else: use canonical dtype conversion array.dtype().to_arrow_dtype() } + +/// Recursively check whether a dtype tree contains a [`DType::Extension`] node. +/// +/// Used by the executor entry to decide whether to synthesize a session-aware target +/// [`Field`] (so plugins run + extension metadata survives) or to fall through to the +/// canonical `preferred_arrow_type` path. +fn dtype_has_extension(dtype: &DType) -> bool { + match dtype { + DType::Extension(_) => true, + DType::List(elem, _) | DType::FixedSizeList(elem, ..) => dtype_has_extension(elem), + DType::Struct(fields, _) => fields.fields().any(|f| dtype_has_extension(&f)), + _ => false, + } +} diff --git a/vortex-array/src/arrow/executor/struct_.rs b/vortex-array/src/arrow/executor/struct_.rs index 909fd78059d..ebea596e219 100644 --- a/vortex-array/src/arrow/executor/struct_.rs +++ b/vortex-array/src/arrow/executor/struct_.rs @@ -23,6 +23,7 @@ use crate::arrays::scalar_fn::ScalarFnArrayExt; use crate::arrays::struct_::StructDataParts; use crate::arrow::ArrowArrayExecutor; use crate::arrow::executor::validity::to_arrow_null_buffer; +use crate::arrow::session::ArrowSessionExt; use crate::builtins::ArrayBuiltins; use crate::dtype::DType; use crate::dtype::FieldNames; @@ -132,9 +133,13 @@ fn create_from_fields( let mut arrow_arrays = Vec::with_capacity(vortex_fields.len()); for (field, vx_field) in fields.iter().zip_eq(vortex_fields.iter()) { - let arrow_field = vx_field - .clone() - .execute_arrow(Some(field.data_type()), ctx)?; + // Route through the session with the full Field (not just data_type) so any + // ARROW:extension:name metadata reaches the export-plugin dispatcher. + let arrow_field = ctx.session().clone().arrow().execute_arrow( + vx_field.clone(), + Some(field.as_ref()), + ctx, + )?; vortex_ensure!( field.is_nullable() || arrow_field.null_count() == 0, "Cannot convert field '{}' to non-nullable Arrow field because it contains nulls", diff --git a/vortex-array/src/arrow/executor/temporal.rs b/vortex-array/src/arrow/executor/temporal.rs index c68a4f3ee9f..bed70f8baec 100644 --- a/vortex-array/src/arrow/executor/temporal.rs +++ b/vortex-array/src/arrow/executor/temporal.rs @@ -1,10 +1,16 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +//! Canonical Vortex → Arrow conversion for the temporal extension types. +//! +//! `Date`, `Time`, and `Timestamp` are Vortex builtin extension types that map directly to +//! native Arrow temporal types (`Date32`/`Date64`, `Time32`/`Time64`, `Timestamp`). These +//! conversions live in the canonical executor rather than in the plugin layer because they +//! aren't Arrow extensions and the mapping is fully determined by the source `ExtDType`. + use std::sync::Arc; use arrow_array::ArrayRef as ArrowArrayRef; -use arrow_array::PrimitiveArray; use arrow_array::PrimitiveArray as ArrowPrimitiveArray; use arrow_array::types::ArrowTemporalType; use arrow_array::types::ArrowTimestampType; @@ -22,149 +28,166 @@ use arrow_schema::DataType; use arrow_schema::TimeUnit as ArrowTimeUnit; use vortex_error::VortexResult; use vortex_error::vortex_bail; -use vortex_error::vortex_ensure; -use vortex_error::vortex_err; use crate::ArrayRef; use crate::ExecutionCtx; -use crate::arrays::ExtensionArray; -use crate::arrays::PrimitiveArray as VortexPrimitiveArray; -use crate::arrays::extension::ExtensionArrayExt; +use crate::arrays::PrimitiveArray; use crate::arrow::null_buffer::to_null_buffer; +use crate::builtins::ArrayBuiltins; +use crate::dtype::DType; use crate::dtype::NativePType; +use crate::dtype::Nullability; use crate::extension::datetime::AnyTemporal; use crate::extension::datetime::TemporalMetadata; use crate::extension::datetime::TimeUnit; -pub(super) fn to_arrow_temporal( +pub(super) fn to_arrow_date( array: ArrayRef, - data_type: &DataType, + target: &DataType, ctx: &mut ExecutionCtx, ) -> VortexResult { - let temporal_options = array - .dtype() - .as_extension() - .metadata_opt::() - .ok_or_else(|| { - vortex_err!( - "Array dtype {} is not a temporal extension type", - array.dtype() - ) - })?; + validate_temporal_extension(&array, target)?; + Ok(match target { + DataType::Date32 => Arc::new(to_temporal_primitive::(array, ctx)?), + DataType::Date64 => Arc::new(to_temporal_primitive::(array, ctx)?), + _ => unreachable!("to_arrow_date called with non-Date type {target}"), + }) +} - match (temporal_options, &data_type) { - (TemporalMetadata::Date(TimeUnit::Days), DataType::Date32) => { - to_temporal::(array, ctx) +pub(super) fn to_arrow_time( + array: ArrayRef, + target: &DataType, + ctx: &mut ExecutionCtx, +) -> VortexResult { + validate_temporal_extension(&array, target)?; + Ok(match target { + DataType::Time32(ArrowTimeUnit::Second) => { + Arc::new(to_temporal_primitive::(array, ctx)?) } - (TemporalMetadata::Date(TimeUnit::Milliseconds), DataType::Date64) => { - to_temporal::(array, ctx) + DataType::Time32(ArrowTimeUnit::Millisecond) => { + Arc::new(to_temporal_primitive::(array, ctx)?) } - (TemporalMetadata::Time(TimeUnit::Seconds), DataType::Time32(ArrowTimeUnit::Second)) => { - to_temporal::(array, ctx) + DataType::Time64(ArrowTimeUnit::Microsecond) => { + Arc::new(to_temporal_primitive::(array, ctx)?) } - ( - TemporalMetadata::Time(TimeUnit::Milliseconds), - DataType::Time32(ArrowTimeUnit::Millisecond), - ) => to_temporal::(array, ctx), - ( - TemporalMetadata::Time(TimeUnit::Microseconds), - DataType::Time64(ArrowTimeUnit::Microsecond), - ) => to_temporal::(array, ctx), - - ( - TemporalMetadata::Time(TimeUnit::Nanoseconds), - DataType::Time64(ArrowTimeUnit::Nanosecond), - ) => to_temporal::(array, ctx), - - (TemporalMetadata::Timestamp(unit, tz), DataType::Timestamp(arrow_unit, arrow_tz)) => { - vortex_ensure!( - tz == arrow_tz, - "Cannot convert {} array to Arrow type {} due to timezone mismatch", - array.dtype(), - data_type - ); - - match (unit, arrow_unit) { - (TimeUnit::Seconds, ArrowTimeUnit::Second) => { - to_arrow_timestamp::(array, arrow_tz.as_ref(), ctx) - } - (TimeUnit::Milliseconds, ArrowTimeUnit::Millisecond) => { - to_arrow_timestamp::(array, arrow_tz.as_ref(), ctx) - } - (TimeUnit::Microseconds, ArrowTimeUnit::Microsecond) => { - to_arrow_timestamp::(array, arrow_tz.as_ref(), ctx) - } - (TimeUnit::Nanoseconds, ArrowTimeUnit::Nanosecond) => { - to_arrow_timestamp::(array, arrow_tz.as_ref(), ctx) - } - _ => vortex_bail!( - "Cannot convert {} array to Arrow type {}", - array.dtype(), - data_type - ), - } + DataType::Time64(ArrowTimeUnit::Nanosecond) => { + Arc::new(to_temporal_primitive::(array, ctx)?) } - _ => vortex_bail!( - "Cannot convert {} array to Arrow type {}", - array.dtype(), - data_type - ), - } + _ => unreachable!("to_arrow_time called with non-Time type {target}"), + }) } -fn to_temporal( +pub(super) fn to_arrow_timestamp( array: ArrayRef, + target: &DataType, ctx: &mut ExecutionCtx, -) -> VortexResult -where - T::Native: NativePType, -{ - // We cast the array to the native primitive type. - Ok(Arc::new(to_arrow_temporal_primitive::(array, ctx)?)) +) -> VortexResult { + validate_temporal_extension(&array, target)?; + let DataType::Timestamp(unit, tz) = target else { + unreachable!("to_arrow_timestamp called with non-Timestamp type {target}"); + }; + Ok(match unit { + ArrowTimeUnit::Second => with_timezone::(array, tz.as_ref(), ctx)?, + ArrowTimeUnit::Millisecond => { + with_timezone::(array, tz.as_ref(), ctx)? + } + ArrowTimeUnit::Microsecond => { + with_timezone::(array, tz.as_ref(), ctx)? + } + ArrowTimeUnit::Nanosecond => { + with_timezone::(array, tz.as_ref(), ctx)? + } + }) } -fn to_arrow_timestamp( +fn with_timezone( array: ArrayRef, - arrow_tz: Option<&Arc>, + tz: Option<&Arc>, ctx: &mut ExecutionCtx, ) -> VortexResult where + T: ArrowTimestampType, T::Native: NativePType, { Ok(Arc::new( - to_arrow_temporal_primitive::(array, ctx)?.with_timezone_opt(arrow_tz.cloned()), + to_temporal_primitive::(array, ctx)?.with_timezone_opt(tz.cloned()), )) } -fn to_arrow_temporal_primitive( +fn to_temporal_primitive( array: ArrayRef, ctx: &mut ExecutionCtx, ) -> VortexResult> where + T: ArrowTemporalType, T::Native: NativePType, { - debug_assert!(array.dtype().as_extension().is::()); - - let ext_array = array.execute::(ctx)?; - let primitive = ext_array - .storage_array() - .clone() - .execute::(ctx)?; - vortex_ensure!( - primitive.ptype() == T::Native::PTYPE, - "Expected temporal array to produce vector of width {}, found {}", - T::Native::PTYPE, - primitive.ptype() - ); - + let array = array.cast(DType::Primitive(T::Native::PTYPE, Nullability::Nullable))?; + let primitive = array.execute::(ctx)?; let validity = primitive .as_ref() .validity()? .execute_mask(primitive.as_ref().len(), ctx)?; - let buffer = primitive.to_buffer::(); - - let values = buffer.into_arrow_scalar_buffer(); - let nulls = to_null_buffer(validity); + let buffer = primitive.into_buffer::(); + Ok(ArrowPrimitiveArray::::new( + buffer.into_arrow_scalar_buffer(), + to_null_buffer(validity), + )) +} - Ok(PrimitiveArray::::new(values, nulls)) +/// Verify that the source array is compatible with the target Arrow temporal type. +/// +/// Vortex does not silently reinterpret across mismatched temporal units or timezones, so a +/// source with temporal extension metadata must agree exactly with `target`. Non-extension +/// sources are passed through; the cast in `to_temporal_primitive` will surface any width +/// mismatch. +fn validate_temporal_extension(array: &ArrayRef, target: &DataType) -> VortexResult<()> { + let Some(ext) = array.dtype().as_extension_opt() else { + return Ok(()); + }; + let Some(temporal) = ext.metadata_opt::() else { + vortex_bail!( + "Cannot convert extension {} to Arrow type {target}", + ext.id() + ); + }; + match (temporal, target) { + (TemporalMetadata::Date(TimeUnit::Days), DataType::Date32) => Ok(()), + (TemporalMetadata::Date(TimeUnit::Milliseconds), DataType::Date64) => Ok(()), + ( + TemporalMetadata::Time(unit), + DataType::Time32(arrow_unit) | DataType::Time64(arrow_unit), + ) if matches!( + (unit, arrow_unit), + (TimeUnit::Seconds, ArrowTimeUnit::Second) + | (TimeUnit::Milliseconds, ArrowTimeUnit::Millisecond) + | (TimeUnit::Microseconds, ArrowTimeUnit::Microsecond) + | (TimeUnit::Nanoseconds, ArrowTimeUnit::Nanosecond) + ) => + { + Ok(()) + } + (TemporalMetadata::Timestamp(unit, src_tz), DataType::Timestamp(arrow_unit, tgt_tz)) => { + let src_arrow_unit = ArrowTimeUnit::try_from(*unit)?; + if src_arrow_unit != *arrow_unit { + vortex_bail!( + "Cannot convert Timestamp({unit}) to Arrow Timestamp({arrow_unit:?}): unit mismatch" + ); + } + if src_tz != tgt_tz { + vortex_bail!( + "Cannot convert Timestamp(tz={src_tz:?}) to Arrow Timestamp(tz={tgt_tz:?}): timezone mismatch" + ); + } + Ok(()) + } + (temporal, target) => vortex_bail!( + "Cannot convert {} to Arrow type {target}", + match temporal { + TemporalMetadata::Date(unit) => format!("Date({unit})"), + TemporalMetadata::Time(unit) => format!("Time({unit})"), + TemporalMetadata::Timestamp(unit, tz) => format!("Timestamp({unit}, tz={tz:?})"), + } + ), + } } diff --git a/vortex-array/src/arrow/mod.rs b/vortex-array/src/arrow/mod.rs index efc83aa6af6..8f26699659d 100644 --- a/vortex-array/src/arrow/mod.rs +++ b/vortex-array/src/arrow/mod.rs @@ -13,12 +13,15 @@ mod executor; mod iter; mod null_buffer; mod record_batch; +mod session; +pub(crate) use convert::nulls; pub use datum::*; pub use executor::*; pub use iter::*; pub use null_buffer::to_arrow_null_buffer; pub use null_buffer::to_null_buffer; +pub use session::*; use crate::ArrayRef; use crate::LEGACY_SESSION; diff --git a/vortex-array/src/arrow/session.rs b/vortex-array/src/arrow/session.rs new file mode 100644 index 00000000000..dd6546fd61c --- /dev/null +++ b/vortex-array/src/arrow/session.rs @@ -0,0 +1,777 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Plugin layer for moving Arrow extension types in and out of Vortex. +//! +//! Vortex's canonical Arrow conversion (see [`crate::dtype::arrow`] and the executor in +//! [`crate::arrow::executor`]) handles every non-extension Arrow type and the builtin temporal +//! extensions. The plugins registered here cover the remaining case: **Arrow extension types**. +//! +//! * An [`ArrowExportVTable`] is dispatched purely by the **target Arrow extension Id** — +//! the plugin is selected when the caller asks for an Arrow [`Field`] carrying matching +//! `ARROW:extension:name` metadata. The Vortex source dtype/encoding is irrelevant to +//! dispatch. +//! * An [`ArrowImportVTable`] is dispatched by the **source Arrow extension name** carried +//! on the incoming [`Field`]. The plugin is responsible for both preserving extension +//! identity and re-encoding storage if needed (e.g. Arrow `FixedSizeBinary[16]` for UUID +//! becomes Vortex `FixedSizeList`). +//! +//! Multiple plugins may register against the same key. They are tried in registration order; +//! each may return [`ArrowExport::Unsupported`] / [`ArrowImport::Unsupported`] to defer to +//! the next. + +use std::any::Any; +use std::fmt::Debug; +use std::sync::Arc; + +use arc_swap::ArcSwap; +use arrow_array::Array as _; +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_array::RecordBatch; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::Schema; +use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; +use arrow_schema::extension::ExtensionType; +use vortex_error::VortexResult; +use vortex_error::vortex_ensure; +use vortex_session::Ref; +use vortex_session::SessionExt; +use vortex_session::SessionVar; +use vortex_session::registry::Id; +use vortex_utils::aliases::hash_map::HashMap; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::IntoArray; +use crate::arrays::StructArray; +use crate::arrow::FromArrowArray; +use crate::arrow::executor::canonical_execute_arrow; +use crate::dtype::DType; +use crate::dtype::FieldName; +use crate::dtype::FieldNames; +use crate::dtype::Nullability; +use crate::dtype::StructFields; +use crate::dtype::arrow::FromArrowType; +use crate::dtype::extension::ExtDTypeRef; +use crate::dtype::extension::ExtId; +use crate::extension::uuid::Uuid; +use crate::validity::Validity; + +/// Outcome of a successful call to [`ArrowExportVTable::execute_arrow`]. +/// +/// Plugins that don't handle the supplied array return [`Unsupported`][Self::Unsupported] +/// with ownership of the input so the session can probe the next plugin or fall back to the +/// canonical path. Errors are propagated through [`VortexResult`]. +pub enum ArrowExport { + /// The plugin does not handle this input; the session may try another plugin. + Unsupported(ArrayRef), + /// A successful export. + Exported(ArrowArrayRef), +} + +/// Outcome of a successful call to [`ArrowImportVTable::from_arrow_array`]. +/// +/// Plugins that don't handle the supplied array return [`Unsupported`][Self::Unsupported] +/// with ownership of the input so the session can probe the next plugin or fall back to the +/// canonical path. Errors are propagated through [`VortexResult`]. +pub enum ArrowImport { + /// The plugin does not handle this input; the session may try another plugin. + Unsupported(ArrowArrayRef), + /// A successful import. + Imported(ArrayRef), +} + +/// Plugin layer for exporting a Vortex array to an Arrow extension type. +/// +/// This is purely an implementation trait, its methods should not be called directly. Instead, +/// use the methods on [`ArrowSession`]. +pub trait ArrowExportVTable: 'static + Send + Sync + Debug { + /// The Arrow extension ID this plugin produces. + fn arrow_ext_id(&self) -> Id; + + /// The Vortex extension ID this plugin maps from. Used only for inference by + /// [`ArrowSession::to_arrow_field`] / [`ArrowSession::to_arrow_schema`]; never as a + /// dispatch key for [`execute_arrow`][Self::execute_arrow]. + fn vortex_ext_id(&self) -> ExtId; + + /// Build the Arrow [`Field`] this plugin produces for the given Vortex extension + /// `dtype`. Used during schema inference. + fn to_arrow_field(&self, name: &str, dtype: &ExtDTypeRef) -> VortexResult>; + + /// Convert a Vortex array into an Arrow array shaped to `target`. + /// + /// Returns ownership of `array` via [`ArrowExport::Unsupported`] when the plugin cannot + /// handle the input. + fn execute_arrow( + &self, + array: ArrayRef, + target: &Field, + ctx: &mut ExecutionCtx, + ) -> VortexResult; +} + +/// Plugin layer for importing an Arrow extension-typed array into a Vortex extension array. +/// +/// Plugins are dispatched by `arrow_ext_id`. +/// +/// This is purely an implementation trait, its methods should not be called directly. Instead, +/// use the methods on [`ArrowSession`]. +pub trait ArrowImportVTable: 'static + Send + Sync + Debug { + /// The Arrow extension name this plugin handles. + fn arrow_ext_id(&self) -> Id; + + /// Build the Vortex [`DType`] that corresponds to `field` (which carries this plugin's + /// Arrow extension metadata). + #[allow(clippy::wrong_self_convention)] + fn from_arrow_field(&self, field: &Field) -> VortexResult>; + + /// Convert an Arrow array into a Vortex extension array of `dtype`. + /// + /// Returns ownership of `array` via [`ArrowImport::Unsupported`] when the plugin cannot + /// handle the input. + #[allow(clippy::wrong_self_convention)] + fn from_arrow_array( + &self, + array: ArrowArrayRef, + dtype: &ExtDTypeRef, + ) -> VortexResult; +} + +pub type ArrowExportVTableRef = Arc; +pub type ArrowImportVTableRef = Arc; + +type ExportMap = HashMap>; +type ImportMap = HashMap>; +type ExportDTypeMap = HashMap>; + +/// Session-scoped registry of Arrow extension plugins. +/// +/// Exporters are stored in two indices: one keyed by Arrow extension Id (used for +/// `execute_arrow` dispatch) and one keyed by Vortex extension Id (used **only** by +/// `to_arrow_field` / `to_arrow_schema` inference, when callers need to translate a Vortex +/// extension `DType` into an Arrow `Field` with no target schema in hand). Importers are +/// keyed by Arrow extension name. The default session pre-registers the builtin UUID +/// plugin; temporal extensions are handled by the canonical Arrow ↔ Vortex path and do not +/// need plugins. +#[derive(Debug)] +pub struct ArrowSession { + exporters: ArcSwap, + exporters_by_vortex: ArcSwap, + importers: ArcSwap, +} + +impl Default for ArrowSession { + fn default() -> Self { + let session = Self { + exporters: ArcSwap::from_pointee(ExportMap::default()), + exporters_by_vortex: ArcSwap::from_pointee(ExportDTypeMap::default()), + importers: ArcSwap::from_pointee(ImportMap::default()), + }; + + session.register_exporter(Arc::new(Uuid)); + session.register_importer(Arc::new(Uuid)); + + session + } +} + +impl ArrowSession { + /// Register an [`ArrowExportVTable`] under its target Arrow extension Id (for dispatch) + /// and its source Vortex extension Id (for schema inference). + pub fn register_exporter(&self, exporter: ArrowExportVTableRef) { + Self::insert( + &self.exporters, + exporter.arrow_ext_id(), + ArrowExportVTableRef::clone(&exporter), + ); + Self::insert( + &self.exporters_by_vortex, + exporter.vortex_ext_id(), + exporter, + ); + } + + /// Register an [`ArrowImportVTable`] under its source Arrow extension name. + pub fn register_importer(&self, importer: ArrowImportVTableRef) { + Self::insert(&self.importers, importer.arrow_ext_id(), importer); + } + + fn insert(slot: &ArcSwap>>, key: K, value: T) + where + K: Clone + Eq + std::hash::Hash, + T: Clone, + { + slot.rcu(move |map| { + let mut next = (**map).clone(); + let entry = next.entry(key.clone()).or_insert_with(|| Arc::from([])); + let mut extended: Vec = entry.iter().cloned().collect(); + extended.push(value.clone()); + *entry = Arc::from(extended); + next + }); + } + + fn exporters(&self, id: &Id) -> Arc<[ArrowExportVTableRef]> { + self.exporters + .load() + .get(id) + .cloned() + .unwrap_or_else(|| Arc::from([])) + } + + fn exporters_by_vortex(&self, id: &ExtId) -> Arc<[ArrowExportVTableRef]> { + self.exporters_by_vortex + .load() + .get(id) + .cloned() + .unwrap_or_else(|| Arc::from([])) + } + + fn importers(&self, id: &Id) -> Arc<[ArrowImportVTableRef]> { + self.importers + .load() + .get(id) + .cloned() + .unwrap_or_else(|| Arc::from([])) + } + + /// Build the Arrow [`Field`] for a Vortex [`DType`]. + /// + /// For [`DType::Extension`]s, plugins registered against the extension's Vortex Id + /// are tried in registration order; the first plugin to return `Some(field)` wins. + /// If every registered plugin returns `None` (or none are registered) the field is + /// built from [`Self::to_arrow_data_type`]. Container types ([`DType::List`], + /// [`DType::FixedSizeList`], [`DType::Struct`]) recurse through this method so + /// extension metadata on nested fields is preserved. + pub fn to_arrow_field(&self, name: &str, dtype: &DType) -> VortexResult { + if let Some(ext) = dtype.as_extension_opt() { + for plugin in self.exporters_by_vortex(&ext.id()).iter() { + if let Some(field) = plugin.to_arrow_field(name, ext)? { + return Ok(field); + } + } + } + Ok(Field::new( + name, + self.to_arrow_data_type(dtype)?, + dtype.is_nullable(), + )) + } + + /// Build the Arrow [`DataType`] for a Vortex [`DType`]. + /// + /// Recurses into container types so nested extension fields go through registered + /// plugins; non-recursive, non-extension dtypes delegate to + /// [`DType::to_arrow_dtype`]. Note that a bare [`DataType`] cannot carry Arrow + /// extension metadata — for inference that needs `ARROW:extension:name` on the + /// outermost node, use [`Self::to_arrow_field`] instead. + pub fn to_arrow_data_type(&self, dtype: &DType) -> VortexResult { + Ok(match dtype { + DType::Extension(ext) => { + let mut data_type = None; + for plugin in self.exporters_by_vortex(&ext.id()).iter() { + if let Some(field) = plugin.to_arrow_field("", ext)? { + data_type = Some(field.data_type().clone()); + break; + } + } + match data_type { + Some(dt) => dt, + None => dtype.to_arrow_dtype()?, + } + } + DType::List(elem, _) => DataType::List(Arc::new(self.to_arrow_field("item", elem)?)), + DType::FixedSizeList(elem, size, _) => { + DataType::FixedSizeList(Arc::new(self.to_arrow_field("item", elem)?), *size as i32) + } + DType::Struct(struct_dtype, _) => { + let mut fields = Vec::with_capacity(struct_dtype.names().len()); + for (name, field_dtype) in struct_dtype.names().iter().zip(struct_dtype.fields()) { + fields.push(self.to_arrow_field(name.as_ref(), &field_dtype)?); + } + DataType::Struct(fields.into()) + } + _ => dtype.to_arrow_dtype()?, + }) + } + + /// Build the Arrow [`Schema`] for a Vortex top-level [`DType::Struct`], dispatching + /// extension fields through registered export plugins for inference. Nested + /// extensions are preserved via [`Self::to_arrow_field`]. + pub fn to_arrow_schema(&self, dtype: &DType) -> VortexResult { + let DType::Struct(struct_dtype, _) = dtype else { + vortex_error::vortex_bail!( + "to_arrow_schema requires a top-level struct dtype, got {dtype}" + ); + }; + let mut fields = Vec::with_capacity(struct_dtype.names().len()); + for (name, field_dtype) in struct_dtype.names().iter().zip(struct_dtype.fields()) { + fields.push(self.to_arrow_field(name.as_ref(), &field_dtype)?); + } + Ok(Schema::new(fields)) + } + + /// Build the Vortex [`DType`] for an Arrow [`Field`]. + /// + /// Plugins registered against the field's Arrow extension name are tried in + /// registration order; the first plugin to return `Some(dtype)` wins. If none + /// match (or all return `None`), recurses into container types ([`DataType::List`] + /// family, [`DataType::FixedSizeList`], [`DataType::Struct`]) so extension metadata + /// on nested element/struct fields is preserved. Leaf types use the canonical + /// Arrow → Vortex mapping via [`DType::from_arrow`]. + pub fn from_arrow_field(&self, field: &Field) -> VortexResult { + if let Some(name) = field.metadata().get(EXTENSION_TYPE_NAME_KEY) { + for plugin in self.importers(&Id::new(name)).iter() { + if let Some(dtype) = plugin.from_arrow_field(field)? { + return Ok(dtype); + } + } + } + let nullability: Nullability = field.is_nullable().into(); + Ok(match field.data_type() { + DataType::List(elem) + | DataType::LargeList(elem) + | DataType::ListView(elem) + | DataType::LargeListView(elem) => { + DType::List(Arc::new(self.from_arrow_field(elem.as_ref())?), nullability) + } + DataType::FixedSizeList(elem, size) => DType::FixedSizeList( + Arc::new(self.from_arrow_field(elem.as_ref())?), + *size as u32, + nullability, + ), + DataType::Struct(fields) => { + let entries = fields + .iter() + .map(|f| { + self.from_arrow_field(f) + .map(|dt| (FieldName::from(f.name().as_str()), dt)) + }) + .collect::>>()?; + DType::Struct(StructFields::from_iter(entries), nullability) + } + _ => DType::from_arrow(field), + }) + } + + /// Build the Vortex [`DType`] for an Arrow [`Schema`], dispatching extension fields + /// through registered import plugins. The result is a top-level non-nullable struct + /// matching the schema's fields. + pub fn from_arrow_schema(&self, schema: &Schema) -> VortexResult { + let entries = schema + .fields() + .iter() + .map(|f| { + self.from_arrow_field(f) + .map(|dt| (FieldName::from(f.name().as_str()), dt)) + }) + .collect::>>()?; + Ok(DType::Struct( + StructFields::from_iter(entries), + Nullability::NonNullable, + )) + } + + /// Decode an Arrow [`RecordBatch`] into a Vortex struct array, dispatching each + /// extension column through its registered import plugin. + /// + /// `schema` is the authoritative Arrow schema used for dispatch — the columns are + /// consumed positionally. Pass an external schema (rather than relying on + /// `batch.schema()`) when upstream DataFusion plumbing may have stripped Field-level + /// extension metadata from the runtime RecordBatch. + pub fn from_arrow_record_batch( + &self, + batch: RecordBatch, + schema: &Schema, + ) -> VortexResult { + vortex_ensure!( + batch.num_columns() == schema.fields().len(), + "RecordBatch has {} columns but schema has {} fields", + batch.num_columns(), + schema.fields().len() + ); + let length = batch.num_rows(); + let names = FieldNames::from_iter( + schema + .fields() + .iter() + .map(|f| FieldName::from(f.name().as_str())), + ); + let mut columns = Vec::with_capacity(schema.fields().len()); + for (col, field) in batch.columns().iter().zip(schema.fields().iter()) { + columns.push(self.from_arrow_array(ArrowArrayRef::clone(col), field)?); + } + Ok(StructArray::try_new(names, columns, length, Validity::NonNullable)?.into_array()) + } + + /// Execute a Vortex array into an Arrow array. + /// + /// If `target` carries an `ARROW:extension:name`, the matching export plugin runs. If no + /// plugin matches (or all return [`ArrowExport::Unsupported`]), falls back to the + /// canonical Vortex → Arrow path. With `target = None` the canonical path picks the + /// array's preferred Arrow type. + pub fn execute_arrow( + &self, + array: ArrayRef, + target: Option<&Field>, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let Some(target_field) = target else { + return canonical_execute_arrow(array, None, ctx); + }; + let Some(arrow_ext_name) = target_field.metadata().get(EXTENSION_TYPE_NAME_KEY) else { + return canonical_execute_arrow(array, Some(target_field.data_type()), ctx); + }; + + let exporters = self.exporters(&Id::new(arrow_ext_name)); + if exporters.is_empty() { + return canonical_execute_arrow(array, Some(target_field.data_type()), ctx); + } + + let len = array.len(); + let mut current = array; + for plugin in exporters.iter() { + match plugin.execute_arrow(current, target_field, ctx)? { + ArrowExport::Exported(arrow) => { + vortex_ensure!( + arrow.len() == len, + "Arrow array length does not match Vortex array length after conversion to {:?}", + arrow + ); + return Ok(arrow); + } + ArrowExport::Unsupported(array) => current = array, + } + } + + // Fallback to canonical execution path + canonical_execute_arrow(current, Some(target_field.data_type()), ctx) + } + + /// Decode an Arrow array into a Vortex array. + /// + /// Routes through the registered import plugin if `field` carries an Arrow extension + /// name we recognize, probing each plugin in registration order until one handles the + /// input or all return [`ArrowImport::Unsupported`]. Otherwise recurses into container + /// arrays ([`arrow_array::StructArray`], [`arrow_array::GenericListArray`], + /// [`arrow_array::FixedSizeListArray`], [`arrow_array::GenericListViewArray`]) so + /// extension fields nested inside containers reach their importers; leaf types fall + /// through to the canonical Arrow → Vortex array conversion. + pub fn from_arrow_array(&self, array: ArrowArrayRef, field: &Field) -> VortexResult { + if let Some(extension_name) = field.metadata().get(EXTENSION_TYPE_NAME_KEY) { + let importers = self.importers(&Id::new(extension_name)); + if !importers.is_empty() { + let dtype = self.from_arrow_field(field)?; + if let DType::Extension(ext_dtype) = dtype { + let mut current = array; + for plugin in importers.iter() { + match plugin.from_arrow_array(current, &ext_dtype)? { + ArrowImport::Imported(arr) => return Ok(arr), + ArrowImport::Unsupported(arr) => current = arr, + } + } + return ArrayRef::from_arrow(current.as_ref(), field.is_nullable()); + } + } + } + self.from_arrow_array_canonical(array, field) + } + + /// Recurse into Arrow container arrays so nested fields with extension metadata reach + /// their importers, falling through to [`ArrayRef::from_arrow`] for leaf types. + #[allow(clippy::wrong_self_convention)] + fn from_arrow_array_canonical( + &self, + array: ArrowArrayRef, + field: &Field, + ) -> VortexResult { + use arrow_array::cast::AsArray; + + match field.data_type() { + DataType::Struct(fields) => { + let arrow_struct = array.as_struct(); + let names = FieldNames::from_iter( + fields.iter().map(|f| FieldName::from(f.name().as_str())), + ); + let columns = arrow_struct + .columns() + .iter() + .zip(fields.iter()) + .map(|(col, child_field)| { + // Arrow pushes nulls into non-nullable fields; strip before recursing + // so Vortex's stricter validity invariants are upheld. + let inner = if col.null_count() > 0 && !child_field.is_nullable() { + arrow_array::make_array(crate::arrow::convert::remove_nulls( + col.to_data(), + )) + } else { + ArrowArrayRef::clone(col) + }; + self.from_arrow_array(inner, child_field.as_ref()) + }) + .collect::>>()?; + let validity = + crate::arrow::convert::nulls(arrow_struct.nulls(), field.is_nullable()); + Ok( + StructArray::try_new(names, columns, arrow_struct.len(), validity)? + .into_array(), + ) + } + DataType::List(elem_field) => { + let list = array.as_list::(); + let elements = self + .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?; + let offsets = list.offsets().clone().into_array(); + let validity = crate::arrow::convert::nulls(list.nulls(), field.is_nullable()); + Ok(crate::arrays::ListArray::try_new(elements, offsets, validity)?.into_array()) + } + DataType::LargeList(elem_field) => { + let list = array.as_list::(); + let elements = self + .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?; + let offsets = list.offsets().clone().into_array(); + let validity = crate::arrow::convert::nulls(list.nulls(), field.is_nullable()); + Ok(crate::arrays::ListArray::try_new(elements, offsets, validity)?.into_array()) + } + DataType::FixedSizeList(elem_field, list_size) => { + let fsl = array.as_fixed_size_list(); + let elements = + self.from_arrow_array(ArrowArrayRef::clone(fsl.values()), elem_field.as_ref())?; + let validity = crate::arrow::convert::nulls(fsl.nulls(), field.is_nullable()); + Ok(crate::arrays::FixedSizeListArray::try_new( + elements, + *list_size as u32, + validity, + fsl.len(), + )? + .into_array()) + } + DataType::ListView(elem_field) => { + let list = array.as_list_view::(); + let elements = self + .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?; + let offsets = list.offsets().clone().into_array(); + let sizes = list.sizes().clone().into_array(); + let validity = crate::arrow::convert::nulls(list.nulls(), field.is_nullable()); + Ok( + crate::arrays::ListViewArray::try_new(elements, offsets, sizes, validity)? + .into_array(), + ) + } + DataType::LargeListView(elem_field) => { + let list = array.as_list_view::(); + let elements = self + .from_arrow_array(ArrowArrayRef::clone(list.values()), elem_field.as_ref())?; + let offsets = list.offsets().clone().into_array(); + let sizes = list.sizes().clone().into_array(); + let validity = crate::arrow::convert::nulls(list.nulls(), field.is_nullable()); + Ok( + crate::arrays::ListViewArray::try_new(elements, offsets, sizes, validity)? + .into_array(), + ) + } + _ => ArrayRef::from_arrow(array.as_ref(), field.is_nullable()), + } + } +} + +// NOTE(aduffy): We should remove this once we bump Arrow to 0.59.0. This is replicating the +// `Field::has_valid_extension_type` method on Arrow added in 58.2.0, we polyfill it here so that +// this crate can build with minimal-versions declared. +pub(crate) fn has_valid_extension_type(field: &Field) -> bool { + if field.extension_type_name() != Some(E::NAME) { + return false; + } + + E::try_new_from_field_metadata(field.data_type(), field.metadata()).is_ok() +} + +impl SessionVar for ArrowSession { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +/// Extension trait for accessing the [`ArrowSession`] on a Vortex session. +pub trait ArrowSessionExt: SessionExt { + /// Get the Arrow session. + fn arrow(&self) -> Ref<'_, ArrowSession>; +} + +impl ArrowSessionExt for S { + fn arrow(&self) -> Ref<'_, ArrowSession> { + self.get::() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::FixedSizeBinaryArray; + use arrow_array::cast::AsArray; + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::extension::Uuid as ArrowUuid; + use vortex_error::VortexResult; + + use super::*; + use crate::LEGACY_SESSION; + use crate::VortexSessionExecute; + use crate::arrow::ArrowArrayExecutor; + use crate::dtype::DType; + use crate::dtype::FieldName; + use crate::dtype::Nullability; + use crate::dtype::PType; + use crate::dtype::StructFields; + use crate::dtype::extension::ExtDType; + use crate::dtype::extension::ExtVTable; + use crate::extension::uuid::Uuid; + use crate::extension::uuid::UuidMetadata; + + fn uuid_dtype(nullable: bool) -> DType { + let storage = DType::FixedSizeList( + Arc::new(DType::Primitive(PType::U8, Nullability::NonNullable)), + 16, + nullable.into(), + ); + DType::Extension( + ExtDType::try_with_vtable(Uuid, UuidMetadata::default(), storage) + .expect("uuid ext dtype") + .erased(), + ) + } + + #[test] + fn to_arrow_field_top_level_uuid_carries_extension_metadata() -> VortexResult<()> { + let session = ArrowSession::default(); + let field = session.to_arrow_field("id", &uuid_dtype(false))?; + assert!(has_valid_extension_type::(&field)); + Ok(()) + } + + #[test] + fn to_arrow_field_struct_with_nested_uuid_preserves_metadata() -> VortexResult<()> { + let session = ArrowSession::default(); + let dtype = DType::Struct( + StructFields::from_iter([(FieldName::from("id"), uuid_dtype(false))]), + Nullability::NonNullable, + ); + let field = session.to_arrow_field("row", &dtype)?; + let DataType::Struct(inner) = field.data_type() else { + panic!("expected Struct, got {:?}", field.data_type()); + }; + assert_eq!(inner.len(), 1); + assert_eq!(inner[0].data_type(), &DataType::FixedSizeBinary(16)); + assert!(has_valid_extension_type::(&inner[0])); + Ok(()) + } + + #[test] + fn to_arrow_field_list_of_uuid_preserves_metadata() -> VortexResult<()> { + let session = ArrowSession::default(); + let dtype = DType::List(Arc::new(uuid_dtype(true)), Nullability::NonNullable); + let field = session.to_arrow_field("ids", &dtype)?; + let DataType::List(elem) = field.data_type() else { + panic!("expected List, got {:?}", field.data_type()); + }; + assert!(has_valid_extension_type::(elem)); + Ok(()) + } + + #[test] + fn to_arrow_field_fixed_size_list_of_uuid_preserves_metadata() -> VortexResult<()> { + let session = ArrowSession::default(); + let dtype = DType::FixedSizeList(Arc::new(uuid_dtype(false)), 3, Nullability::NonNullable); + let field = session.to_arrow_field("triple", &dtype)?; + let DataType::FixedSizeList(elem, size) = field.data_type() else { + panic!("expected FixedSizeList, got {:?}", field.data_type()); + }; + assert_eq!(*size, 3); + assert!(has_valid_extension_type::(elem)); + Ok(()) + } + + #[test] + fn to_arrow_schema_struct_of_struct_uuid() -> VortexResult<()> { + let session = ArrowSession::default(); + let inner = DType::Struct( + StructFields::from_iter([(FieldName::from("id"), uuid_dtype(true))]), + Nullability::NonNullable, + ); + let outer = DType::Struct( + StructFields::from_iter([(FieldName::from("payload"), inner)]), + Nullability::NonNullable, + ); + let schema = session.to_arrow_schema(&outer)?; + let payload = schema.field(0); + let DataType::Struct(inner_fields) = payload.data_type() else { + panic!("expected Struct, got {:?}", payload.data_type()); + }; + assert!(has_valid_extension_type::(&inner_fields[0])); + Ok(()) + } + + #[test] + fn from_arrow_field_recurses_into_nested_uuid() -> VortexResult<()> { + let session = ArrowSession::default(); + let mut elem = Field::new("item", DataType::FixedSizeBinary(16), false); + elem.try_with_extension_type(ArrowUuid)?; + let outer = Field::new("ids", DataType::List(Arc::new(elem)), false); + + let dtype = session.from_arrow_field(&outer)?; + let DType::List(inner_dt, _) = dtype else { + panic!("expected List dtype, got {dtype}"); + }; + assert!( + matches!(inner_dt.as_ref(), DType::Extension(ext) if ext.id() == Uuid.id()), + "expected Uuid extension element, got {inner_dt}", + ); + Ok(()) + } + + #[test] + fn schema_roundtrip_preserves_nested_uuid() -> VortexResult<()> { + let session = ArrowSession::default(); + let dtype = DType::Struct( + StructFields::from_iter([ + (FieldName::from("id"), uuid_dtype(false)), + ( + FieldName::from("ids"), + DType::List(Arc::new(uuid_dtype(true)), Nullability::NonNullable), + ), + ]), + Nullability::NonNullable, + ); + let schema = session.to_arrow_schema(&dtype)?; + let roundtripped = session.from_arrow_schema(&schema)?; + assert_eq!(roundtripped, dtype); + Ok(()) + } + + #[test] + fn execute_arrow_target_none_preserves_top_level_uuid_metadata() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let session = LEGACY_SESSION.arrow(); + + let mut field = Field::new("id", DataType::FixedSizeBinary(16), false); + field.try_with_extension_type(ArrowUuid)?; + let arrow_array: ArrowArrayRef = Arc::new(FixedSizeBinaryArray::try_from_iter( + [*b"0123456789abcdef", *b"fedcba9876543210"].into_iter(), + )?); + + let vortex_array = session.from_arrow_array(arrow_array, &field)?; + let exported = vortex_array.execute_arrow(None, &mut ctx)?; + assert_eq!(exported.data_type(), &DataType::FixedSizeBinary(16)); + let fsb = exported.as_fixed_size_binary(); + assert_eq!(fsb.len(), 2); + assert_eq!(fsb.value(0), b"0123456789abcdef"); + assert_eq!(fsb.value(1), b"fedcba9876543210"); + Ok(()) + } +} diff --git a/vortex-array/src/dtype/arrow.rs b/vortex-array/src/dtype/arrow.rs index 17af749cfc0..1db7b31aab8 100644 --- a/vortex-array/src/dtype/arrow.rs +++ b/vortex-array/src/dtype/arrow.rs @@ -224,6 +224,11 @@ impl FromArrowType<&Field> for DType { impl DType { /// Convert a Vortex [`DType`] into an Arrow [`Schema`]. + /// + /// **Prefer `ArrowSession::to_arrow_schema`.** This method is not plugin-aware and + /// strips any `ARROW:extension:name` metadata for non-builtin extensions (only + /// `arrow.parquet.variant` is special-cased here). Use the session method when you + /// need round-trippable extension metadata. pub fn to_arrow_schema(&self) -> VortexResult { let DType::Struct(struct_dtype, nullable) = self else { vortex_bail!("only DType::Struct can be converted to arrow schema"); @@ -258,6 +263,12 @@ impl DType { } /// Returns the Arrow [`DataType`] that best corresponds to this Vortex [`DType`]. + /// + /// **Prefer `ArrowSession::to_arrow_data_type` (or `to_arrow_field`).** This method + /// has no awareness of registered Arrow extension plugins, so any [`DType::Extension`] + /// outside the builtin temporal set will fail or silently lose its + /// `ARROW:extension:name` metadata. The session methods recurse through containers + /// and dispatch plugins at every extension node. pub fn to_arrow_dtype(&self) -> VortexResult { Ok(match self { DType::Null => DataType::Null, diff --git a/vortex-array/src/dtype/session.rs b/vortex-array/src/dtype/session.rs index 2314658869f..757ffbd0ae5 100644 --- a/vortex-array/src/dtype/session.rs +++ b/vortex-array/src/dtype/session.rs @@ -16,6 +16,7 @@ use crate::dtype::extension::ExtVTable; use crate::extension::datetime::Date; use crate::extension::datetime::Time; use crate::extension::datetime::Timestamp; +use crate::extension::uuid::Uuid; /// Registry for extension dtypes. pub type ExtDTypeRegistry = Registry; @@ -36,6 +37,7 @@ impl Default for DTypeSession { this.register(Date); this.register(Time); this.register(Timestamp); + this.register(Uuid); this } diff --git a/vortex-array/src/extension/uuid/arrow.rs b/vortex-array/src/extension/uuid/arrow.rs new file mode 100644 index 00000000000..b808e2c917b --- /dev/null +++ b/vortex-array/src/extension/uuid/arrow.rs @@ -0,0 +1,173 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Arrow plugin impls for the UUID extension type. +//! +//! UUIDs are a canonical Arrow extension type backed by `FixedSizeBinary[16]`. The Vortex side +//! stores them as `FixedSizeList`, so the conversion is a zero-copy reinterpretation +//! of the byte buffer in both directions. + +use std::sync::Arc; + +use arrow_array::Array; +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_array::FixedSizeBinaryArray; +use arrow_array::cast::AsArray; +use arrow_array::types::UInt8Type; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::extension::ExtensionType; +use arrow_schema::extension::Uuid as ArrowUuid; +use vortex_array::arrow::has_valid_extension_type; +use vortex_buffer::Alignment; +use vortex_buffer::Buffer; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_session::registry::CachedId; +use vortex_session::registry::Id; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::IntoArray; +use crate::arrays::ExtensionArray; +use crate::arrays::FixedSizeListArray; +use crate::arrays::PrimitiveArray; +use crate::arrays::extension::ExtensionArrayExt; +use crate::arrow::ArrowArrayExecutor; +use crate::arrow::ArrowExport; +use crate::arrow::ArrowExportVTable; +use crate::arrow::ArrowImport; +use crate::arrow::ArrowImportVTable; +use crate::arrow::nulls; +use crate::buffer::BufferHandle; +use crate::dtype::DType; +use crate::dtype::Nullability; +use crate::dtype::PType; +use crate::dtype::extension::ExtDType; +use crate::dtype::extension::ExtDTypeRef; +use crate::dtype::extension::ExtId; +use crate::dtype::extension::ExtVTable; +use crate::extension::uuid::Uuid; +use crate::extension::uuid::UuidMetadata; +use crate::validity::Validity; + +const UUID_BYTE_LEN: i32 = 16; + +static ARROW_UUID: CachedId = CachedId::new(ArrowUuid::NAME); + +impl ArrowExportVTable for Uuid { + fn arrow_ext_id(&self) -> Id { + *ARROW_UUID + } + + fn vortex_ext_id(&self) -> ExtId { + Uuid.id() + } + + fn to_arrow_field(&self, name: &str, dtype: &ExtDTypeRef) -> VortexResult> { + let mut field = Field::new( + name.to_string(), + DataType::FixedSizeBinary(UUID_BYTE_LEN), + dtype.is_nullable(), + ); + field + .try_with_extension_type(ArrowUuid) + .vortex_expect("FixedSizeBinary[16] is correct type for ArrowUuid"); + Ok(Some(field)) + } + + fn execute_arrow( + &self, + array: ArrayRef, + _target: &Field, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let is_uuid = array + .dtype() + .as_extension_opt() + .map(|ext| ext.is::()) + .unwrap_or(false); + if !is_uuid { + return Ok(ArrowExport::Unsupported(array)); + } + Ok(ArrowExport::Exported(try_fsl_to_fsb(array, ctx)?)) + } +} + +impl ArrowImportVTable for Uuid { + fn arrow_ext_id(&self) -> Id { + *ARROW_UUID + } + + fn from_arrow_field(&self, field: &Field) -> VortexResult> { + if !has_valid_extension_type::(field) { + return Ok(None); + } + + let storage_dtype = DType::FixedSizeList( + Arc::new(DType::Primitive(PType::U8, Nullability::NonNullable)), + UUID_BYTE_LEN as u32, + field.is_nullable().into(), + ); + + Ok(Some(DType::Extension( + ExtDType::try_with_vtable(Uuid, UuidMetadata::default(), storage_dtype)?.erased(), + ))) + } + + fn from_arrow_array( + &self, + array: ArrowArrayRef, + dtype: &ExtDTypeRef, + ) -> VortexResult { + if !matches!(array.data_type(), DataType::FixedSizeBinary(UUID_BYTE_LEN)) + || !dtype.is::() + { + return Ok(ArrowImport::Unsupported(array)); + } + + let fsb = array.as_fixed_size_binary(); + let buffer = Buffer::from_arrow_buffer(fsb.values().clone(), Alignment::none()); + let u8_array = PrimitiveArray::from_buffer_handle( + BufferHandle::new_host(buffer), + PType::U8, + Validity::NonNullable, + ); + let validity = nulls(fsb.nulls(), dtype.is_nullable()); + + let storage = FixedSizeListArray::new( + u8_array.into_array(), + fsb.value_length() as u32, + validity, + fsb.len(), + ) + .into_array(); + Ok(ArrowImport::Imported( + ExtensionArray::new(dtype.clone(), storage).into_array(), + )) + } +} + +fn try_fsl_to_fsb(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + let executed = array.execute::(ctx)?; + let storage = executed.storage_array().clone(); + let storage_arrow_type = DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::UInt8, false)), + UUID_BYTE_LEN, + ); + let arrow_storage = storage.execute_arrow(Some(&storage_arrow_type), ctx)?; + + let fsl = arrow_storage.as_fixed_size_list(); + let bytes = fsl + .values() + .as_primitive::() + .values() + .inner() + .clone(); + + Ok(Arc::new(FixedSizeBinaryArray::new( + fsl.value_length(), + bytes, + fsl.nulls().cloned(), + ))) +} diff --git a/vortex-array/src/extension/uuid/mod.rs b/vortex-array/src/extension/uuid/mod.rs index e4347c2513c..8a178035d2f 100644 --- a/vortex-array/src/extension/uuid/mod.rs +++ b/vortex-array/src/extension/uuid/mod.rs @@ -13,6 +13,7 @@ mod metadata; pub use metadata::UuidMetadata; +mod arrow; pub(crate) mod vtable; /// The VTable for the UUID extension type. diff --git a/vortex-datafusion/src/convert/schema.rs b/vortex-datafusion/src/convert/schema.rs index a6a4d6c40bf..26d9d86ce18 100644 --- a/vortex-datafusion/src/convert/schema.rs +++ b/vortex-datafusion/src/convert/schema.rs @@ -6,6 +6,7 @@ use arrow_schema::Field; use arrow_schema::Schema; use datafusion_common::Result as DFResult; use datafusion_common::exec_datafusion_err; +use vortex::array::arrow::ArrowSession; use vortex::dtype::DType; /// Calculate the physical Arrow schema for a Vortex file given its DType and the expected logical schema. @@ -22,6 +23,7 @@ use vortex::dtype::DType; pub fn calculate_physical_schema( dtype: &DType, reference_logical_schema: &Schema, + arrow_session: &ArrowSession, ) -> DFResult { let DType::Struct(struct_dtype, _) = dtype else { return Err(exec_datafusion_err!( @@ -34,23 +36,23 @@ pub fn calculate_physical_schema( .iter() .zip(struct_dtype.fields()) .map(|(name, field_dtype)| { - let arrow_type = match reference_logical_schema.field_with_name(name.as_ref()).ok() { + let logical_field = reference_logical_schema.field_with_name(name.as_ref()).ok(); + match logical_field { Some(logical_field) => { - calculate_physical_field_type(&field_dtype, logical_field.data_type())? + let arrow_type = calculate_physical_field_type( + &field_dtype, + logical_field.data_type(), + arrow_session, + )?; + Ok( + Field::new(name.to_string(), arrow_type, field_dtype.is_nullable()) + .with_metadata(logical_field.metadata().clone()), + ) } - None => { - // Field not in logical schema, use default conversion - field_dtype.to_arrow_dtype().map_err(|e| { - exec_datafusion_err!("Failed to convert dtype to arrow: {e}") - })? - } - }; - - Ok(Field::new( - name.to_string(), - arrow_type, - field_dtype.is_nullable(), - )) + None => arrow_session + .to_arrow_field(name.as_ref(), &field_dtype) + .map_err(|e| exec_datafusion_err!("Failed to convert dtype to arrow: {e}")), + } }) .collect::>>()?; @@ -59,7 +61,11 @@ pub fn calculate_physical_schema( /// Calculate the physical Arrow type for a field, preferring the logical type when the /// DType doesn't roundtrip cleanly. -fn calculate_physical_field_type(dtype: &DType, logical_type: &DataType) -> DFResult { +fn calculate_physical_field_type( + dtype: &DType, + logical_type: &DataType, + arrow_session: &ArrowSession, +) -> DFResult { // Check if the logical type is one that doesn't roundtrip through DType Ok(match logical_type { // Dictionary types lose their encoding when converted to DType @@ -87,22 +93,26 @@ fn calculate_physical_field_type(dtype: &DType, logical_type: &DataType) -> DFRe .iter() .zip(struct_dtype.fields()) .map(|(name, field_dtype)| { - let arrow_type = - match logical_fields.iter().find(|f| f.name() == name.as_ref()) { - Some(logical_field) => calculate_physical_field_type( + match logical_fields.iter().find(|f| f.name() == name.as_ref()) { + Some(logical_field) => { + let arrow_type = calculate_physical_field_type( &field_dtype, logical_field.data_type(), - )?, - None => field_dtype.to_arrow_dtype().map_err(|e| { + arrow_session, + )?; + Ok(Field::new( + name.to_string(), + arrow_type, + field_dtype.is_nullable(), + ) + .with_metadata(logical_field.metadata().clone())) + } + None => arrow_session + .to_arrow_field(name.as_ref(), &field_dtype) + .map_err(|e| { exec_datafusion_err!("Failed to convert dtype to arrow: {e}") - })?, - }; - - Ok(Field::new( - name.to_string(), - arrow_type, - field_dtype.is_nullable(), - )) + }), + } }) .collect::>>()?; @@ -117,8 +127,11 @@ fn calculate_physical_field_type(dtype: &DType, logical_type: &DataType) -> DFRe // For list types, recursively check the element type DataType::List(logical_elem) | DataType::LargeList(logical_elem) => { if let DType::List(elem_dtype, _) = dtype { - let physical_elem_type = - calculate_physical_field_type(elem_dtype, logical_elem.data_type())?; + let physical_elem_type = calculate_physical_field_type( + elem_dtype, + logical_elem.data_type(), + arrow_session, + )?; let physical_field = Field::new( logical_elem.name(), physical_elem_type, @@ -139,8 +152,11 @@ fn calculate_physical_field_type(dtype: &DType, logical_type: &DataType) -> DFRe // For fixed-size list types, recursively check the element type DataType::FixedSizeList(logical_elem, size) => { if let DType::FixedSizeList(elem_dtype, ..) = dtype { - let physical_elem_type = - calculate_physical_field_type(elem_dtype, logical_elem.data_type())?; + let physical_elem_type = calculate_physical_field_type( + elem_dtype, + logical_elem.data_type(), + arrow_session, + )?; let physical_field = Field::new( logical_elem.name(), physical_elem_type, @@ -157,8 +173,11 @@ fn calculate_physical_field_type(dtype: &DType, logical_type: &DataType) -> DFRe // For list view types, recursively check the element type DataType::ListView(logical_elem) | DataType::LargeListView(logical_elem) => { if let DType::List(elem_dtype, _) = dtype { - let physical_elem_type = - calculate_physical_field_type(elem_dtype, logical_elem.data_type())?; + let physical_elem_type = calculate_physical_field_type( + elem_dtype, + logical_elem.data_type(), + arrow_session, + )?; let physical_field = Field::new( logical_elem.name(), physical_elem_type, @@ -175,10 +194,13 @@ fn calculate_physical_field_type(dtype: &DType, logical_type: &DataType) -> DFRe )); } } - // All other types roundtrip cleanly, use the DType's natural conversion - _ => dtype - .to_arrow_dtype() - .map_err(|e| exec_datafusion_err!("Failed to convert dtype to arrow: {e}"))?, + // All other types roundtrip cleanly, use the session-aware Arrow Field inference + // (canonical for non-extension dtypes, plugin-routed for extensions like UUID). + _ => arrow_session + .to_arrow_field("", dtype) + .map_err(|e| exec_datafusion_err!("Failed to convert dtype to arrow: {e}"))? + .data_type() + .clone(), }) } @@ -209,7 +231,8 @@ mod tests { Nullability::NonNullable, ); - let physical_schema = calculate_physical_schema(&dtype, &logical_schema).unwrap(); + let physical_schema = + calculate_physical_schema(&dtype, &logical_schema, &ArrowSession::default()).unwrap(); // Should preserve the dictionary type from the logical schema assert_eq!( @@ -239,7 +262,8 @@ mod tests { Nullability::NonNullable, ); - let physical_schema = calculate_physical_schema(&dtype, &logical_schema).unwrap(); + let physical_schema = + calculate_physical_schema(&dtype, &logical_schema, &ArrowSession::default()).unwrap(); assert_eq!(physical_schema.field(0).data_type(), &DataType::Utf8); assert_eq!(physical_schema.field(1).data_type(), &DataType::LargeUtf8); @@ -259,7 +283,7 @@ mod tests { Nullability::NonNullable, ); - let result = calculate_physical_schema(&dtype, &logical_schema); + let result = calculate_physical_schema(&dtype, &logical_schema, &ArrowSession::default()); assert!( result .unwrap_err() @@ -279,7 +303,7 @@ mod tests { Nullability::NonNullable, ); - let result = calculate_physical_schema(&dtype, &logical_schema); + let result = calculate_physical_schema(&dtype, &logical_schema, &ArrowSession::default()); assert!( result .unwrap_err() @@ -325,7 +349,8 @@ mod tests { Nullability::NonNullable, ); - let physical_schema = calculate_physical_schema(&dtype, &logical_schema).unwrap(); + let physical_schema = + calculate_physical_schema(&dtype, &logical_schema, &ArrowSession::default()).unwrap(); // Check outer structure assert_eq!(physical_schema.fields().len(), 2); @@ -366,7 +391,8 @@ mod tests { Nullability::NonNullable, ); - let physical_schema = calculate_physical_schema(&dtype, &logical_schema).unwrap(); + let physical_schema = + calculate_physical_schema(&dtype, &logical_schema, &ArrowSession::default()).unwrap(); if let DataType::List(elem_field) = physical_schema.field(0).data_type() { assert_eq!( @@ -385,7 +411,7 @@ mod tests { let dtype = DType::Primitive(PType::I32, Nullability::NonNullable); - let result = calculate_physical_schema(&dtype, &logical_schema); + let result = calculate_physical_schema(&dtype, &logical_schema, &ArrowSession::default()); assert!(result.is_err()); assert!( result diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index cb24be972c2..b00dfadaaf0 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -44,11 +44,11 @@ use futures::stream; use object_store::ObjectMeta; use object_store::ObjectStore; use vortex::VortexSessionDefault; +use vortex::array::arrow::ArrowSessionExt; use vortex::array::memory::MemorySessionExt; use vortex::dtype::DType; use vortex::dtype::Nullability; use vortex::dtype::PType; -use vortex::dtype::arrow::FromArrowType; use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_err; @@ -378,7 +378,9 @@ impl FileFormat for VortexFormat { .as_any() .downcast_ref::() { - let inferred_schema = cached_vortex.footer().dtype().to_arrow_schema()?; + let inferred_schema = session + .arrow() + .to_arrow_schema(cached_vortex.footer().dtype())?; return VortexResult::Ok((object.location, inferred_schema)); } @@ -402,7 +404,7 @@ impl FileFormat for VortexFormat { let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata); cache.put(&object.location, entry); - let inferred_schema = vxf.dtype().to_arrow_schema()?; + let inferred_schema = session.arrow().to_arrow_schema(vxf.dtype())?; VortexResult::Ok((object.location, inferred_schema)) }) .map(|f| f.vortex_expect("Failed to spawn infer_schema")) @@ -526,7 +528,16 @@ impl FileFormat for VortexFormat { let column_size = stats_set.get_as::(Stat::UncompressedSizeInBytes, &PType::U64.into()); - let target_dtype = DType::from_arrow(field.as_ref()); + let target_dtype = + session + .arrow() + .from_arrow_field(field.as_ref()) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to derive Vortex DType for field {}: {e}", + field.name() + )) + })?; let min = scalar_stat_to_df( Stat::Min, stats_set.get(Stat::Min), diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index acff2a22806..578d26c6dbb 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -34,6 +34,7 @@ use object_store::path::Path; use tracing::Instrument; use vortex::array::VortexSessionExecute; use vortex::array::arrow::ArrowArrayExecutor; +use vortex::array::arrow::ArrowSessionExt; use vortex::dtype::FieldMask; use vortex::error::VortexError; use vortex::error::VortexExpect; @@ -216,6 +217,7 @@ impl FileOpener for VortexOpener { let this_file_schema = Arc::new(calculate_physical_schema( vxf.dtype(), &unified_file_schema, + &session.arrow(), )?); let projected_physical_schema = projection.project_schema(&unified_file_schema)?; @@ -273,7 +275,8 @@ impl FileOpener for VortexOpener { .collect(); Schema::new(fields) }; - let stream_schema = calculate_physical_schema(&scan_dtype, &scan_reference_schema)?; + let stream_schema = + calculate_physical_schema(&scan_dtype, &scan_reference_schema, &session.arrow())?; let leftover_projection = leftover_projection .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; diff --git a/vortex-datafusion/src/persistent/sink.rs b/vortex-datafusion/src/persistent/sink.rs index 263579a8cd5..d48181efe11 100644 --- a/vortex-datafusion/src/persistent/sink.rs +++ b/vortex-datafusion/src/persistent/sink.rs @@ -25,11 +25,8 @@ use futures::StreamExt; use object_store::ObjectStore; use object_store::path::Path; use tokio_stream::wrappers::ReceiverStream; -use vortex::array::ArrayRef; -use vortex::array::arrow::FromArrowArray; +use vortex::array::arrow::ArrowSessionExt; use vortex::array::stream::ArrayStreamAdapter; -use vortex::dtype::DType; -use vortex::dtype::arrow::FromArrowType; use vortex::file::WriteOptionsSessionExt; use vortex::file::WriteSummary; use vortex::io::VortexWrite; @@ -115,12 +112,23 @@ impl FileSink for VortexSink { let session = self.session.clone(); let object_store = Arc::clone(&object_store); let writer_schema = get_writer_schema(&self.config); - let dtype = DType::from_arrow(writer_schema); + let dtype = session + .arrow() + .from_arrow_schema(&writer_schema) + .map_err(|e| { + exec_datafusion_err!("Failed to derive Vortex DType from writer schema: {e}") + })?; // We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered, // the demux task might deadlock itself. + let arrow_session = session.clone(); + let import_schema = Arc::clone(&writer_schema); file_write_tasks.spawn(async move { - let stream = ReceiverStream::new(rx).map(move |rb| ArrayRef::from_arrow(rb, false)); + let stream = ReceiverStream::new(rx).map(move |rb| { + arrow_session + .arrow() + .from_arrow_record_batch(rb, &import_schema) + }); let stream_adapter = ArrayStreamAdapter::new(dtype, stream); diff --git a/vortex-datafusion/src/persistent/tests.rs b/vortex-datafusion/src/persistent/tests.rs index 194184e904e..c11cdc45a9c 100644 --- a/vortex-datafusion/src/persistent/tests.rs +++ b/vortex-datafusion/src/persistent/tests.rs @@ -148,7 +148,6 @@ async fn test_query_file(#[values(Some(1), None)] limit: Option) -> anyho #[tokio::test] async fn test_addition_pushdown() -> anyhow::Result<()> { let ctx = TestSessionContext::default(); - dbg!(&ctx.store); ctx.session .sql( @@ -432,3 +431,139 @@ async fn test_repartitioned_scan_matches_non_repartitioned_for_uneven_splits() - Ok(()) } + +/// Roundtrip an `arrow.uuid` extension column through a Vortex file: write the column directly +/// via the session-aware Arrow→Vortex conversion, then `SELECT *` and assert both the field +/// metadata and the underlying values survive the trip. +#[tokio::test] +async fn arrow_uuid_extension_roundtrip() -> anyhow::Result<()> { + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::Schema; + use arrow_schema::extension::Uuid; + use datafusion::arrow::array::FixedSizeBinaryArray; + use datafusion::arrow::array::RecordBatch; + use datafusion::assert_batches_sorted_eq; + use vortex::array::arrow::ArrowSessionExt; + + let ctx = TestSessionContext::default(); + // Default vortex session has importer/exporter for Arrow UUID + let session = VortexSession::default(); + + let mut uuid_field = Field::new("id", DataType::FixedSizeBinary(16), false); + uuid_field.try_with_extension_type(Uuid)?; + let schema = Arc::new(Schema::new(vec![uuid_field])); + + let uuids = FixedSizeBinaryArray::try_from_iter( + [*b"0123456789abcdef", *b"fedcba9876543210"].into_iter(), + )?; + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(uuids)])?; + let array = session.arrow().from_arrow_record_batch(batch, &schema)?; + + let mut writer = ObjectStoreWrite::new(Arc::clone(&ctx.store), &"uuid.vortex".into()).await?; + session + .write_options() + .write(&mut writer, array.to_array_stream()) + .await?; + writer.shutdown().await?; + + let result = ctx + .session + .sql("SELECT * FROM '/uuid.vortex'") + .await? + .collect() + .await?; + + assert!( + result[0] + .schema_ref() + .field(0) + .has_valid_extension_type::() + ); + + assert_batches_sorted_eq!( + [ + "+----------------------------------+", + "| id |", + "+----------------------------------+", + "| 30313233343536373839616263646566 |", + "| 66656463626139383736353433323130 |", + "+----------------------------------+", + ], + &result + ); + + Ok(()) +} + +/// Same as [`arrow_uuid_extension_roundtrip`] but with the `arrow.uuid` field nested inside a +/// top-level `Struct`, exercising recursive session-aware Field/Schema inference: if any layer +/// falls back to the non-plugin canonical path, the inner field loses its extension metadata. +#[tokio::test] +async fn arrow_uuid_extension_roundtrip_nested_struct() -> anyhow::Result<()> { + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::Fields; + use arrow_schema::Schema; + use arrow_schema::extension::Uuid; + use datafusion::arrow::array::Array; + use datafusion::arrow::array::FixedSizeBinaryArray; + use datafusion::arrow::array::RecordBatch; + use datafusion::arrow::array::StructArray as ArrowStructArray; + use datafusion::assert_batches_sorted_eq; + use vortex::array::arrow::ArrowSessionExt; + + let ctx = TestSessionContext::default(); + let session = VortexSession::default(); + + let mut inner_uuid_field = Field::new("id", DataType::FixedSizeBinary(16), false); + inner_uuid_field.try_with_extension_type(Uuid)?; + let payload_fields = Fields::from(vec![inner_uuid_field]); + let payload_field = Field::new("payload", DataType::Struct(payload_fields.clone()), false); + let schema = Arc::new(Schema::new(vec![payload_field])); + + let uuids: Arc = Arc::new(FixedSizeBinaryArray::try_from_iter( + [*b"0123456789abcdef", *b"fedcba9876543210"].into_iter(), + )?); + let payload_array = ArrowStructArray::new(payload_fields, vec![uuids], None); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(payload_array)])?; + let array = session.arrow().from_arrow_record_batch(batch, &schema)?; + + let mut writer = + ObjectStoreWrite::new(Arc::clone(&ctx.store), &"uuid_struct.vortex".into()).await?; + session + .write_options() + .write(&mut writer, array.to_array_stream()) + .await?; + writer.shutdown().await?; + + let result = ctx + .session + .sql("SELECT payload FROM '/uuid_struct.vortex'") + .await? + .collect() + .await?; + + let read_payload = result[0].schema_ref().field(0); + let DataType::Struct(read_inner) = read_payload.data_type() else { + panic!( + "expected Struct payload, got {:?}", + read_payload.data_type() + ); + }; + assert!(read_inner[0].has_valid_extension_type::()); + + assert_batches_sorted_eq!( + [ + "+----------------------------------------+", + "| payload |", + "+----------------------------------------+", + "| {id: 30313233343536373839616263646566} |", + "| {id: 66656463626139383736353433323130} |", + "+----------------------------------------+", + ], + &result + ); + + Ok(()) +}