From 8e4a261a47d8d6bf8f470ffa06636794a1cc6568 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 5 May 2026 08:25:12 -0400 Subject: [PATCH 1/2] pluggable arrow exec Signed-off-by: Andrew Duffy --- vortex-array/public-api.lock | 350 ++++++++++++ vortex-array/src/arrow/canonical.rs | 132 +++++ vortex-array/src/arrow/decoder.rs | 42 ++ vortex-array/src/arrow/decoders/canonical.rs | 49 ++ vortex-array/src/arrow/decoders/mod.rs | 7 + vortex-array/src/arrow/dtype_converter.rs | 62 ++ vortex-array/src/arrow/encoder.rs | 55 ++ vortex-array/src/arrow/encoders/list.rs | 77 +++ vortex-array/src/arrow/encoders/mod.rs | 13 + vortex-array/src/arrow/encoders/temporal.rs | 99 ++++ vortex-array/src/arrow/encoders/varbin.rs | 76 +++ vortex-array/src/arrow/executor/bool.rs | 2 +- vortex-array/src/arrow/executor/byte.rs | 2 +- vortex-array/src/arrow/executor/byte_view.rs | 2 +- vortex-array/src/arrow/executor/decimal.rs | 2 +- vortex-array/src/arrow/executor/dictionary.rs | 2 +- .../src/arrow/executor/fixed_size_list.rs | 2 +- vortex-array/src/arrow/executor/list.rs | 2 +- vortex-array/src/arrow/executor/list_view.rs | 2 +- vortex-array/src/arrow/executor/mod.rs | 194 +------ vortex-array/src/arrow/executor/null.rs | 2 +- vortex-array/src/arrow/executor/primitive.rs | 2 +- vortex-array/src/arrow/executor/run_end.rs | 2 +- vortex-array/src/arrow/executor/struct_.rs | 2 +- vortex-array/src/arrow/executor/temporal.rs | 2 +- vortex-array/src/arrow/mod.rs | 22 + vortex-array/src/arrow/session.rs | 533 ++++++++++++++++++ vortex-array/src/dtype/arrow.rs | 5 + vortex-array/src/lib.rs | 8 +- vortex/src/lib.rs | 2 + 30 files changed, 1570 insertions(+), 182 deletions(-) create mode 100644 vortex-array/src/arrow/canonical.rs create mode 100644 vortex-array/src/arrow/decoder.rs create mode 100644 vortex-array/src/arrow/decoders/canonical.rs create mode 100644 vortex-array/src/arrow/decoders/mod.rs create mode 100644 vortex-array/src/arrow/dtype_converter.rs create mode 100644 vortex-array/src/arrow/encoder.rs create mode 100644 vortex-array/src/arrow/encoders/list.rs create mode 100644 vortex-array/src/arrow/encoders/mod.rs create mode 100644 vortex-array/src/arrow/encoders/temporal.rs create mode 100644 vortex-array/src/arrow/encoders/varbin.rs create mode 100644 vortex-array/src/arrow/session.rs diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index b6de5284907..b189bbe0210 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -7084,6 +7084,210 @@ pub fn vortex_array::arrow::byte_view::canonical_varbinview_to_arrow(&vortex_array::arrays::VarBinViewArray, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub mod vortex_array::arrow::canonical + +pub struct vortex_array::arrow::canonical::CanonicalArrowEncoder + +impl core::default::Default for vortex_array::arrow::canonical::CanonicalArrowEncoder + +pub fn vortex_array::arrow::canonical::CanonicalArrowEncoder::default() -> vortex_array::arrow::canonical::CanonicalArrowEncoder + +impl core::fmt::Debug for vortex_array::arrow::canonical::CanonicalArrowEncoder + +pub fn vortex_array::arrow::canonical::CanonicalArrowEncoder::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::canonical::CanonicalArrowEncoder + +pub fn vortex_array::arrow::canonical::CanonicalArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::canonical::CanonicalArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::canonical::canonical_arrow_type_for_dtype(&vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub mod vortex_array::arrow::decoder + +pub trait vortex_array::arrow::decoder::ArrowDecoder: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::decoder::ArrowDecoder::try_decode(&self, &dyn arrow_array::array::Array, &arrow_schema::field::Field, &vortex_session::VortexSession) -> vortex_error::VortexResult> + +impl vortex_array::arrow::decoder::ArrowDecoder for vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder + +pub fn vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder::try_decode(&self, &dyn arrow_array::array::Array, &arrow_schema::field::Field, &vortex_session::VortexSession) -> vortex_error::VortexResult> + +pub type vortex_array::arrow::decoder::ArrowDecoderRef = alloc::sync::Arc + +pub mod vortex_array::arrow::decoders + +pub mod vortex_array::arrow::decoders::canonical + +pub struct vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader + +impl core::default::Default for vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader + +pub fn vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader::default() -> vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader + +impl core::fmt::Debug for vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader + +pub fn vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::arrow::dtype_converter::ArrowDTypeReader for vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader + +pub fn vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader::try_read_dtype(&self, &arrow_schema::field::Field) -> vortex_error::VortexResult> + +pub struct vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder + +impl core::default::Default for vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder + +pub fn vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder::default() -> vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder + +impl core::fmt::Debug for vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder + +pub fn vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::arrow::decoder::ArrowDecoder for vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder + +pub fn vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder::try_decode(&self, &dyn arrow_array::array::Array, &arrow_schema::field::Field, &vortex_session::VortexSession) -> vortex_error::VortexResult> + +pub mod vortex_array::arrow::dtype_converter + +pub trait vortex_array::arrow::dtype_converter::ArrowDTypeConverter: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::dtype_converter::ArrowDTypeConverter::to_arrow_data_type(&self, &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::dtype_converter::ArrowDTypeConverter::to_arrow_field(&self, &vortex_array::dtype::extension::ExtDTypeRef, &str) -> vortex_error::VortexResult + +impl vortex_array::arrow::dtype_converter::ArrowDTypeConverter for vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter::to_arrow_data_type(&self, &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter::to_arrow_field(&self, &vortex_array::dtype::extension::ExtDTypeRef, &str) -> vortex_error::VortexResult + +pub trait vortex_array::arrow::dtype_converter::ArrowDTypeReader: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::dtype_converter::ArrowDTypeReader::try_read_dtype(&self, &arrow_schema::field::Field) -> vortex_error::VortexResult> + +impl vortex_array::arrow::dtype_converter::ArrowDTypeReader for vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader + +pub fn vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader::try_read_dtype(&self, &arrow_schema::field::Field) -> vortex_error::VortexResult> + +pub type vortex_array::arrow::dtype_converter::ArrowDTypeConverterRef = alloc::sync::Arc + +pub type vortex_array::arrow::dtype_converter::ArrowDTypeReaderRef = alloc::sync::Arc + +pub mod vortex_array::arrow::encoder + +pub trait vortex_array::arrow::encoder::ArrowEncoder: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::encoder::ArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::encoder::ArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::canonical::CanonicalArrowEncoder + +pub fn vortex_array::arrow::canonical::CanonicalArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::canonical::CanonicalArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::encoders::list::ListArrowEncoder + +pub fn vortex_array::arrow::encoders::list::ListArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::encoders::list::ListArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::encoders::temporal::TemporalArrowEncoder + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::encoders::varbin::VarBinArrowEncoder + +pub fn vortex_array::arrow::encoders::varbin::VarBinArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::encoders::varbin::VarBinArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +pub type vortex_array::arrow::encoder::ArrowEncoderRef = alloc::sync::Arc + +pub mod vortex_array::arrow::encoders + +pub mod vortex_array::arrow::encoders::list + +pub struct vortex_array::arrow::encoders::list::ListArrowEncoder + +impl vortex_array::arrow::encoders::list::ListArrowEncoder + +pub fn vortex_array::arrow::encoders::list::ListArrowEncoder::array_id() -> vortex_array::ArrayId + +impl core::default::Default for vortex_array::arrow::encoders::list::ListArrowEncoder + +pub fn vortex_array::arrow::encoders::list::ListArrowEncoder::default() -> vortex_array::arrow::encoders::list::ListArrowEncoder + +impl core::fmt::Debug for vortex_array::arrow::encoders::list::ListArrowEncoder + +pub fn vortex_array::arrow::encoders::list::ListArrowEncoder::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::encoders::list::ListArrowEncoder + +pub fn vortex_array::arrow::encoders::list::ListArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::encoders::list::ListArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +pub mod vortex_array::arrow::encoders::temporal + +pub struct vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter + +impl core::default::Default for vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter::default() -> vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter + +impl core::fmt::Debug for vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::arrow::dtype_converter::ArrowDTypeConverter for vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter::to_arrow_data_type(&self, &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter::to_arrow_field(&self, &vortex_array::dtype::extension::ExtDTypeRef, &str) -> vortex_error::VortexResult + +pub struct vortex_array::arrow::encoders::temporal::TemporalArrowEncoder + +impl core::default::Default for vortex_array::arrow::encoders::temporal::TemporalArrowEncoder + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowEncoder::default() -> vortex_array::arrow::encoders::temporal::TemporalArrowEncoder + +impl core::fmt::Debug for vortex_array::arrow::encoders::temporal::TemporalArrowEncoder + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowEncoder::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::encoders::temporal::TemporalArrowEncoder + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +pub mod vortex_array::arrow::encoders::varbin + +pub struct vortex_array::arrow::encoders::varbin::VarBinArrowEncoder + +impl vortex_array::arrow::encoders::varbin::VarBinArrowEncoder + +pub fn vortex_array::arrow::encoders::varbin::VarBinArrowEncoder::array_id() -> vortex_array::ArrayId + +impl core::default::Default for vortex_array::arrow::encoders::varbin::VarBinArrowEncoder + +pub fn vortex_array::arrow::encoders::varbin::VarBinArrowEncoder::default() -> vortex_array::arrow::encoders::varbin::VarBinArrowEncoder + +impl core::fmt::Debug for vortex_array::arrow::encoders::varbin::VarBinArrowEncoder + +pub fn vortex_array::arrow::encoders::varbin::VarBinArrowEncoder::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::encoders::varbin::VarBinArrowEncoder + +pub fn vortex_array::arrow::encoders::varbin::VarBinArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::encoders::varbin::VarBinArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + pub mod vortex_array::arrow::null pub fn vortex_array::arrow::null::canonical_null_to_arrow(&vortex_array::arrays::null::NullArray) -> arrow_array::array::ArrayRef @@ -7108,6 +7312,78 @@ impl vortex_array::iter::ArrayIterator for vortex_array::arrow::ArrowArrayStream pub fn vortex_array::arrow::ArrowArrayStreamAdapter::dtype(&self) -> &vortex_array::dtype::DType +pub struct vortex_array::arrow::ArrowSession + +impl vortex_array::arrow::ArrowSession + +pub fn vortex_array::arrow::ArrowSession::canonical_encoder(&self) -> core::option::Option + +pub fn vortex_array::arrow::ArrowSession::decoders(&self) -> alloc::vec::Vec + +pub fn vortex_array::arrow::ArrowSession::default_decoder(&self) -> core::option::Option + +pub fn vortex_array::arrow::ArrowSession::default_dtype_reader(&self) -> core::option::Option + +pub fn vortex_array::arrow::ArrowSession::dtype_converter_for(&self, &vortex_array::dtype::extension::ExtId) -> core::option::Option + +pub fn vortex_array::arrow::ArrowSession::dtype_readers(&self) -> alloc::vec::Vec + +pub fn vortex_array::arrow::ArrowSession::encoder_for_encoding(&self, &vortex_array::ArrayId) -> core::option::Option + +pub fn vortex_array::arrow::ArrowSession::encoder_for_extension(&self, &vortex_array::dtype::extension::ExtId) -> core::option::Option + +pub fn vortex_array::arrow::ArrowSession::register_decoder(&self, impl core::convert::Into) + +pub fn vortex_array::arrow::ArrowSession::register_dtype_converter(&self, impl core::convert::Into, impl core::convert::Into) + +pub fn vortex_array::arrow::ArrowSession::register_dtype_reader(&self, impl core::convert::Into) + +pub fn vortex_array::arrow::ArrowSession::register_encoder_for_encoding(&self, impl core::convert::Into, impl core::convert::Into) + +pub fn vortex_array::arrow::ArrowSession::register_encoder_for_extension(&self, impl core::convert::Into, impl core::convert::Into) + +pub fn vortex_array::arrow::ArrowSession::set_canonical_encoder(&self, impl core::convert::Into) + +pub fn vortex_array::arrow::ArrowSession::set_default_decoder(&self, impl core::convert::Into) + +pub fn vortex_array::arrow::ArrowSession::set_default_dtype_reader(&self, impl core::convert::Into) + +impl vortex_array::arrow::ArrowSession + +pub fn vortex_array::arrow::ArrowSession::from_arrow_array(&self, &dyn arrow_array::array::Array, &arrow_schema::field::Field, &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::from_arrow_field(&self, &arrow_schema::field::Field) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::from_arrow_schema(&self, &arrow_schema::schema::Schema) -> vortex_error::VortexResult + +impl vortex_array::arrow::ArrowSession + +pub fn vortex_array::arrow::ArrowSession::resolve_preferred_arrow_type(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::to_arrow_array(&self, vortex_array::ArrayRef, core::option::Option<&arrow_schema::datatype::DataType>, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::to_arrow_data_type(&self, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::to_arrow_field(&self, &str, &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::to_arrow_record_batch(&self, vortex_array::ArrayRef, &arrow_schema::schema::Schema, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowSession::to_arrow_schema(&self, &vortex_array::dtype::StructFields, vortex_array::dtype::Nullability) -> vortex_error::VortexResult + +impl core::default::Default for vortex_array::arrow::ArrowSession + +pub fn vortex_array::arrow::ArrowSession::default() -> Self + +impl core::fmt::Debug for vortex_array::arrow::ArrowSession + +pub fn vortex_array::arrow::ArrowSession::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_session::SessionVar for vortex_array::arrow::ArrowSession + +pub fn vortex_array::arrow::ArrowSession::as_any(&self) -> &dyn core::any::Any + +pub fn vortex_array::arrow::ArrowSession::as_any_mut(&mut self) -> &mut dyn core::any::Any + pub struct vortex_array::arrow::Datum impl vortex_array::arrow::Datum @@ -7144,6 +7420,72 @@ pub fn vortex_array::ArrayRef::execute_record_batch(self, &arrow_schema::schema: pub fn vortex_array::ArrayRef::execute_record_batches(self, &arrow_schema::schema::Schema, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +pub trait vortex_array::arrow::ArrowDTypeConverter: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::ArrowDTypeConverter::to_arrow_data_type(&self, &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::ArrowDTypeConverter::to_arrow_field(&self, &vortex_array::dtype::extension::ExtDTypeRef, &str) -> vortex_error::VortexResult + +impl vortex_array::arrow::dtype_converter::ArrowDTypeConverter for vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter::to_arrow_data_type(&self, &vortex_array::dtype::extension::ExtDTypeRef) -> vortex_error::VortexResult + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowDTypeConverter::to_arrow_field(&self, &vortex_array::dtype::extension::ExtDTypeRef, &str) -> vortex_error::VortexResult + +pub trait vortex_array::arrow::ArrowDTypeReader: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::ArrowDTypeReader::try_read_dtype(&self, &arrow_schema::field::Field) -> vortex_error::VortexResult> + +impl vortex_array::arrow::dtype_converter::ArrowDTypeReader for vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader + +pub fn vortex_array::arrow::decoders::canonical::CanonicalArrowDTypeReader::try_read_dtype(&self, &arrow_schema::field::Field) -> vortex_error::VortexResult> + +pub trait vortex_array::arrow::ArrowDecoder: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::ArrowDecoder::try_decode(&self, &dyn arrow_array::array::Array, &arrow_schema::field::Field, &vortex_session::VortexSession) -> vortex_error::VortexResult> + +impl vortex_array::arrow::decoder::ArrowDecoder for vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder + +pub fn vortex_array::arrow::decoders::canonical::CanonicalArrowDecoder::try_decode(&self, &dyn arrow_array::array::Array, &arrow_schema::field::Field, &vortex_session::VortexSession) -> vortex_error::VortexResult> + +pub trait vortex_array::arrow::ArrowEncoder: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug + +pub fn vortex_array::arrow::ArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::ArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::canonical::CanonicalArrowEncoder + +pub fn vortex_array::arrow::canonical::CanonicalArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::canonical::CanonicalArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::encoders::list::ListArrowEncoder + +pub fn vortex_array::arrow::encoders::list::ListArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::encoders::list::ListArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::encoders::temporal::TemporalArrowEncoder + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::encoders::temporal::TemporalArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +impl vortex_array::arrow::encoder::ArrowEncoder for vortex_array::arrow::encoders::varbin::VarBinArrowEncoder + +pub fn vortex_array::arrow::encoders::varbin::VarBinArrowEncoder::preferred_arrow_type(&self, &vortex_array::ArrayRef, &vortex_array::arrow::ArrowSession) -> vortex_error::VortexResult> + +pub fn vortex_array::arrow::encoders::varbin::VarBinArrowEncoder::to_arrow_array(&self, vortex_array::ArrayRef, &arrow_schema::datatype::DataType, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + +pub trait vortex_array::arrow::ArrowSessionExt: vortex_session::SessionExt + +pub fn vortex_array::arrow::ArrowSessionExt::arrow(&self) -> vortex_session::Ref<'_, vortex_array::arrow::ArrowSession> + +impl vortex_array::arrow::ArrowSessionExt for S + +pub fn S::arrow(&self) -> vortex_session::Ref<'_, vortex_array::arrow::ArrowSession> + pub trait vortex_array::arrow::FromArrowArray pub fn vortex_array::arrow::FromArrowArray::from_arrow(A, bool) -> vortex_error::VortexResult where Self: core::marker::Sized @@ -7314,6 +7656,14 @@ pub fn vortex_array::arrow::to_arrow_null_buffer(vortex_array::validity::Validit pub fn vortex_array::arrow::to_null_buffer(vortex_mask::Mask) -> core::option::Option +pub type vortex_array::arrow::ArrowDTypeConverterRef = alloc::sync::Arc + +pub type vortex_array::arrow::ArrowDTypeReaderRef = alloc::sync::Arc + +pub type vortex_array::arrow::ArrowDecoderRef = alloc::sync::Arc + +pub type vortex_array::arrow::ArrowEncoderRef = alloc::sync::Arc + pub mod vortex_array::buffer pub struct vortex_array::buffer::BufferHandle(_) diff --git a/vortex-array/src/arrow/canonical.rs b/vortex-array/src/arrow/canonical.rs new file mode 100644 index 00000000000..8c7b19110e4 --- /dev/null +++ b/vortex-array/src/arrow/canonical.rs @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`CanonicalArrowEncoder`] — fallback Vortex → Arrow encoder for canonical encodings. +//! +//! The canonical encoder handles every `DataType` that maps directly to a canonical Vortex +//! encoding (`Bool`, `Primitive`, `VarBinView`, `ListView`, `Struct`, `FixedSizeList`, +//! `Decimal`, `Extension`). For now it also handles a few non-canonical optimizations +//! (offset-based byte/list arrays); those are slated to move into encoding-keyed +//! [`ArrowEncoder`](super::ArrowEncoder) plugins in subsequent work. + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_array::types::*; +use arrow_schema::DataType; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::arrow::ArrowEncoder; +use crate::arrow::ArrowSession; +use crate::arrow::executor::bool::to_arrow_bool; +use crate::arrow::executor::byte::to_arrow_byte_array; +use crate::arrow::executor::byte_view::to_arrow_byte_view; +use crate::arrow::executor::decimal::to_arrow_decimal; +use crate::arrow::executor::dictionary::to_arrow_dictionary; +use crate::arrow::executor::fixed_size_list::to_arrow_fixed_list; +use crate::arrow::executor::list::to_arrow_list; +use crate::arrow::executor::list_view::to_arrow_list_view; +use crate::arrow::executor::null::to_arrow_null; +use crate::arrow::executor::primitive::to_arrow_primitive; +use crate::arrow::executor::run_end::to_arrow_run_end; +use crate::arrow::executor::struct_::to_arrow_struct; +use crate::arrow::executor::temporal::to_arrow_temporal; +use crate::dtype::DType; + +/// Returns the canonical Arrow [`DataType`] for a Vortex [`DType`]. +/// +/// This mirrors [`DType::to_arrow_dtype`] but is kept separate so encoders can use it without +/// touching the deprecated dtype shim. +pub fn canonical_arrow_type_for_dtype(dtype: &DType) -> VortexResult { + dtype.to_arrow_dtype() +} + +/// The default canonical Vortex → Arrow encoder. Registered automatically in +/// [`crate::arrow::ArrowSession::default`]. +#[derive(Debug, Default)] +pub struct CanonicalArrowEncoder; + +impl ArrowEncoder for CanonicalArrowEncoder { + fn preferred_arrow_type( + &self, + array: &ArrayRef, + _session: &ArrowSession, + ) -> VortexResult> { + // The canonical encoder mirrors `DType::to_arrow_dtype()`. Encoding-specific + // shortcuts (e.g. VarBin → Utf8 instead of Utf8View) live on their own plugins. + Ok(Some(canonical_arrow_type_for_dtype(array.dtype())?)) + } + + fn to_arrow_array( + &self, + array: ArrayRef, + target: &DataType, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + let len = array.len(); + let arrow = match target { + DataType::Null => to_arrow_null(array, ctx), + DataType::Boolean => to_arrow_bool(array, ctx), + DataType::Int8 => to_arrow_primitive::(array, ctx), + DataType::Int16 => to_arrow_primitive::(array, ctx), + DataType::Int32 => to_arrow_primitive::(array, ctx), + DataType::Int64 => to_arrow_primitive::(array, ctx), + DataType::UInt8 => to_arrow_primitive::(array, ctx), + DataType::UInt16 => to_arrow_primitive::(array, ctx), + DataType::UInt32 => to_arrow_primitive::(array, ctx), + DataType::UInt64 => to_arrow_primitive::(array, ctx), + DataType::Float16 => to_arrow_primitive::(array, ctx), + DataType::Float32 => to_arrow_primitive::(array, ctx), + DataType::Float64 => to_arrow_primitive::(array, ctx), + DataType::Timestamp(..) + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) => to_arrow_temporal(array, target, ctx), + DataType::Binary => to_arrow_byte_array::(array, ctx), + DataType::LargeBinary => to_arrow_byte_array::(array, ctx), + DataType::Utf8 => to_arrow_byte_array::(array, ctx), + DataType::LargeUtf8 => to_arrow_byte_array::(array, ctx), + DataType::BinaryView => to_arrow_byte_view::(array, ctx), + DataType::Utf8View => to_arrow_byte_view::(array, ctx), + DataType::List(elements_field) => to_arrow_list::(array, elements_field, ctx), + DataType::LargeList(elements_field) => to_arrow_list::(array, elements_field, ctx), + DataType::FixedSizeList(elements_field, list_size) => { + to_arrow_fixed_list(array, *list_size, elements_field, ctx) + } + DataType::ListView(elements_field) => { + to_arrow_list_view::(array, elements_field, ctx) + } + DataType::LargeListView(elements_field) => { + to_arrow_list_view::(array, elements_field, ctx) + } + DataType::Struct(fields) => to_arrow_struct(array, Some(fields), ctx), + DataType::Dictionary(codes_type, values_type) => { + to_arrow_dictionary(array, codes_type, values_type, ctx) + } + dt @ (DataType::Decimal32(..) + | DataType::Decimal64(..) + | DataType::Decimal128(..) + | DataType::Decimal256(..)) => to_arrow_decimal(array, dt, ctx), + DataType::RunEndEncoded(ends_type, values_type) => { + to_arrow_run_end(array, ends_type.data_type(), values_type, ctx) + } + DataType::FixedSizeBinary(_) + | DataType::Map(..) + | DataType::Duration(_) + | DataType::Interval(_) + | DataType::Union(..) => { + vortex_bail!("Conversion to Arrow type {target} is not supported"); + } + }?; + + vortex_ensure!( + arrow.len() == len, + "Arrow array length does not match Vortex array length after conversion to {:?}", + arrow + ); + Ok(Some(arrow)) + } +} diff --git a/vortex-array/src/arrow/decoder.rs b/vortex-array/src/arrow/decoder.rs new file mode 100644 index 00000000000..29794daf13d --- /dev/null +++ b/vortex-array/src/arrow/decoder.rs @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`ArrowDecoder`] — pluggable Arrow → Vortex array conversion. + +use std::fmt::Debug; +use std::sync::Arc; + +use arrow_array::Array as ArrowArray; +use arrow_schema::Field; +use vortex_error::VortexResult; +use vortex_session::VortexSession; + +use crate::ArrayRef; + +/// Reference-counted pointer to an [`ArrowDecoder`]. +pub type ArrowDecoderRef = Arc; + +/// Plugin trait that converts an Arrow array into a Vortex [`ArrayRef`]. +/// +/// Decoders are registered as a chain on [`crate::arrow::ArrowSession`] and walked in +/// registration order, with user-registered decoders running before the built-in canonical +/// decoders so external crates can override built-in behavior. +/// +/// Returning [`Ok(None)`] passes the request to the next decoder in the chain. Returning +/// [`Ok(Some(_))`] short-circuits the chain. The dispatcher hard-fails if no decoder claims +/// the request. +pub trait ArrowDecoder: 'static + Send + Sync + Debug { + /// Try to decode `array` into a Vortex [`ArrayRef`]. + /// + /// `field` carries the Arrow extension-name metadata (`ARROW:extension:name`) that lets + /// extension-aware decoders dispatch on logical type rather than physical [`arrow_schema::DataType`]. + /// + /// `session` is the active [`VortexSession`], available so decoders can recurse into child + /// arrays via [`crate::arrow::ArrowSession`] porcelain. + fn try_decode( + &self, + array: &dyn ArrowArray, + field: &Field, + session: &VortexSession, + ) -> VortexResult>; +} diff --git a/vortex-array/src/arrow/decoders/canonical.rs b/vortex-array/src/arrow/decoders/canonical.rs new file mode 100644 index 00000000000..8912e3a5fd5 --- /dev/null +++ b/vortex-array/src/arrow/decoders/canonical.rs @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Canonical [`ArrowDecoder`] / [`ArrowDTypeReader`] — the default fallback for the reverse +//! (Arrow → Vortex) direction. +//! +//! These plugins should always be the **last** entries in their respective chains: they accept +//! every Arrow type the legacy [`FromArrowArray`] / [`FromArrowType`] implementations support, +//! so anything earlier in the chain that wants to override their behavior must run first. + +use arrow_array::Array as ArrowArray; +use arrow_schema::Field; +use vortex_error::VortexResult; +use vortex_session::VortexSession; + +use crate::ArrayRef; +use crate::arrow::ArrowDTypeReader; +use crate::arrow::ArrowDecoder; +use crate::arrow::FromArrowArray; +use crate::dtype::DType; +use crate::dtype::arrow::FromArrowType; + +/// Default [`ArrowDecoder`] that delegates to the legacy +/// [`FromArrowArray`](crate::arrow::FromArrowArray) implementation. +#[derive(Debug, Default)] +pub struct CanonicalArrowDecoder; + +impl ArrowDecoder for CanonicalArrowDecoder { + fn try_decode( + &self, + array: &dyn ArrowArray, + field: &Field, + _session: &VortexSession, + ) -> VortexResult> { + Ok(Some(ArrayRef::from_arrow(array, field.is_nullable())?)) + } +} + +/// Default [`ArrowDTypeReader`] that delegates to the legacy +/// [`FromArrowType`](crate::dtype::arrow::FromArrowType) implementation. Matches every +/// Arrow [`Field`]. +#[derive(Debug, Default)] +pub struct CanonicalArrowDTypeReader; + +impl ArrowDTypeReader for CanonicalArrowDTypeReader { + fn try_read_dtype(&self, field: &Field) -> VortexResult> { + Ok(Some(DType::from_arrow(field))) + } +} diff --git a/vortex-array/src/arrow/decoders/mod.rs b/vortex-array/src/arrow/decoders/mod.rs new file mode 100644 index 00000000000..c4c5e25870b --- /dev/null +++ b/vortex-array/src/arrow/decoders/mod.rs @@ -0,0 +1,7 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Built-in [`ArrowDecoder`](super::ArrowDecoder) and [`ArrowDTypeReader`](super::ArrowDTypeReader) +//! implementations. + +pub mod canonical; diff --git a/vortex-array/src/arrow/dtype_converter.rs b/vortex-array/src/arrow/dtype_converter.rs new file mode 100644 index 00000000000..cbffd6cee40 --- /dev/null +++ b/vortex-array/src/arrow/dtype_converter.rs @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`ArrowDTypeConverter`] and [`ArrowDTypeReader`] — pluggable DType ↔ Arrow conversion. + +use std::fmt::Debug; +use std::sync::Arc; + +use arrow_schema::DataType; +use arrow_schema::Field; +use vortex_error::VortexResult; + +use crate::dtype::DType; +use crate::dtype::extension::ExtDTypeRef; + +/// Reference-counted pointer to an [`ArrowDTypeConverter`]. +pub type ArrowDTypeConverterRef = Arc; + +/// Reference-counted pointer to an [`ArrowDTypeReader`]. +pub type ArrowDTypeReaderRef = Arc; + +/// Plugin trait that converts a Vortex extension [`crate::dtype::DType`] into an Arrow +/// [`DataType`]. +/// +/// Converters are registered against an [`crate::dtype::extension::ExtId`] on the +/// [`crate::arrow::ArrowSession`]. Each extension type that wants Arrow representation must +/// register a converter; the dispatcher hard-fails on unknown extensions. +/// +/// Implementations may also produce a full [`Field`] — useful when the Arrow representation +/// needs `ARROW:extension:name` metadata (for example `arrow.parquet.variant`). +pub trait ArrowDTypeConverter: 'static + Send + Sync + Debug { + /// Convert the extension dtype to its Arrow [`DataType`]. + fn to_arrow_data_type(&self, ext: &ExtDTypeRef) -> VortexResult; + + /// Convert the extension dtype to a fully-decorated Arrow [`Field`]. + /// + /// The default implementation builds a `Field` from `name`, the result of + /// [`ArrowDTypeConverter::to_arrow_data_type`], and `ext.is_nullable()`. Override when the + /// extension type needs Arrow extension metadata on the field. + fn to_arrow_field(&self, ext: &ExtDTypeRef, name: &str) -> VortexResult { + Ok(Field::new( + name, + self.to_arrow_data_type(ext)?, + ext.is_nullable(), + )) + } +} + +/// Plugin trait that converts an Arrow [`Field`] into a Vortex [`DType`]. +/// +/// Readers are registered as a chain on [`crate::arrow::ArrowSession`] and walked in +/// registration order, with user-registered readers running before the built-in readers so +/// external crates can override built-in behavior. +/// +/// Returning [`Ok(None)`] passes the request to the next reader in the chain. +pub trait ArrowDTypeReader: 'static + Send + Sync + Debug { + /// Try to read a Vortex [`DType`] from an Arrow [`Field`]. + /// + /// Implementations typically inspect [`Field::metadata`] for the `ARROW:extension:name` + /// key and dispatch on it. + fn try_read_dtype(&self, field: &Field) -> VortexResult>; +} diff --git a/vortex-array/src/arrow/encoder.rs b/vortex-array/src/arrow/encoder.rs new file mode 100644 index 00000000000..9c7f562a529 --- /dev/null +++ b/vortex-array/src/arrow/encoder.rs @@ -0,0 +1,55 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`ArrowEncoder`] — pluggable Vortex → Arrow array conversion. + +use std::fmt::Debug; +use std::sync::Arc; + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_schema::DataType; +use vortex_error::VortexResult; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::arrow::ArrowSession; + +/// Reference-counted pointer to an [`ArrowEncoder`]. +pub type ArrowEncoderRef = Arc; + +/// Plugin trait that converts a Vortex [`ArrayRef`] into an Arrow array. +/// +/// Encoders are registered against an [`crate::array::ArrayId`] (encoding-keyed) or an +/// [`crate::dtype::extension::ExtId`] (extension-keyed) on the [`crate::arrow::ArrowSession`]. +/// Returning [`None`] from [`ArrowEncoder::to_arrow_array`] tells the dispatcher to fall through +/// to the canonical encoder. This is the only way an encoder can decline a request. +pub trait ArrowEncoder: 'static + Send + Sync + Debug { + /// Returns the Arrow [`DataType`] this encoder would prefer to emit for `array`. + /// + /// `session` is provided so encoders for nested types (e.g. + /// [`crate::arrays::List`]) can recursively resolve the preferred Arrow type of their + /// children via [`ArrowSession::resolve_preferred_arrow_type`]. + /// + /// Returning [`None`] defers to the canonical encoder's preference (e.g. + /// [`DataType::Utf8View`] for [`crate::dtype::DType::Utf8`]). Implementations should only + /// override the canonical preference when they can produce a cheaper Arrow representation + /// (for example, [`crate::arrays::VarBin`] preferring offset-based [`DataType::Utf8`] over + /// [`DataType::Utf8View`]). + fn preferred_arrow_type( + &self, + array: &ArrayRef, + session: &ArrowSession, + ) -> VortexResult>; + + /// Convert `array` into an Arrow array of type `target`. + /// + /// Returning [`Ok(None)`] tells the dispatcher to canonicalize the array and re-dispatch + /// through the canonical encoder. Encoders may decline requests they don't recognize but + /// must not silently mis-convert. + fn to_arrow_array( + &self, + array: ArrayRef, + target: &DataType, + ctx: &mut ExecutionCtx, + ) -> VortexResult>; +} diff --git a/vortex-array/src/arrow/encoders/list.rs b/vortex-array/src/arrow/encoders/list.rs new file mode 100644 index 00000000000..93a3e2c6d94 --- /dev/null +++ b/vortex-array/src/arrow/encoders/list.rs @@ -0,0 +1,77 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`ListArrowEncoder`] — short-circuits List → Arrow [`arrow_array::GenericListArray`] for +//! offset-based targets. + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_schema::DataType; +use vortex_error::VortexResult; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::array::ArrayId; +use crate::array::ArrayPlugin; +use crate::arrays::List; +use crate::arrays::list::ListArrayExt; +use crate::arrow::ArrowEncoder; +use crate::arrow::ArrowSession; +use crate::arrow::executor::list::to_arrow_list; +use crate::dtype::PType; + +/// Forward [`ArrowEncoder`] keyed by the [`crate::arrays::List`] [`ArrayId`]. +/// +/// Handles [`DataType::List`] and [`DataType::LargeList`] targets directly without canonicalizing +/// to [`crate::arrays::ListView`]. Returns [`None`] for any other target. +#[derive(Debug, Default)] +pub struct ListArrowEncoder; + +impl ListArrowEncoder { + /// The encoding [`ArrayId`] this encoder is registered against. + pub fn array_id() -> ArrayId { + List.id() + } +} + +impl ArrowEncoder for ListArrowEncoder { + fn preferred_arrow_type( + &self, + array: &ArrayRef, + session: &ArrowSession, + ) -> VortexResult> { + let Some(list) = array.as_opt::() else { + return Ok(None); + }; + let offsets_ptype = PType::try_from(list.offsets().dtype())?; + let use_large = matches!(offsets_ptype, PType::I64 | PType::U64); + // Recurse via the session so nested List/VarBin children pick up their own + // encoder-specific preferences. + let elem_dtype = session.resolve_preferred_arrow_type(list.elements())?; + let field = arrow_schema::FieldRef::new(arrow_schema::Field::new_list_field( + elem_dtype, + list.elements().dtype().is_nullable(), + )); + Ok(Some(if use_large { + DataType::LargeList(field) + } else { + DataType::List(field) + })) + } + + fn to_arrow_array( + &self, + array: ArrayRef, + target: &DataType, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + match target { + DataType::List(elements_field) => { + to_arrow_list::(array, elements_field, ctx).map(Some) + } + DataType::LargeList(elements_field) => { + to_arrow_list::(array, elements_field, ctx).map(Some) + } + _ => Ok(None), + } + } +} diff --git a/vortex-array/src/arrow/encoders/mod.rs b/vortex-array/src/arrow/encoders/mod.rs new file mode 100644 index 00000000000..33b8733f01b --- /dev/null +++ b/vortex-array/src/arrow/encoders/mod.rs @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Built-in encoding-keyed [`ArrowEncoder`](super::ArrowEncoder) plugins. +//! +//! These plugins ride alongside the [`CanonicalArrowEncoder`](super::canonical::CanonicalArrowEncoder) +//! to short-circuit forward conversion for encodings that have a cheaper Arrow representation +//! than canonicalizing the array first (for example, [`crate::arrays::VarBin`] preferring +//! offset-based [`arrow_schema::DataType::Utf8`] over [`arrow_schema::DataType::Utf8View`]). + +pub mod list; +pub mod temporal; +pub mod varbin; diff --git a/vortex-array/src/arrow/encoders/temporal.rs b/vortex-array/src/arrow/encoders/temporal.rs new file mode 100644 index 00000000000..cb0a5356d53 --- /dev/null +++ b/vortex-array/src/arrow/encoders/temporal.rs @@ -0,0 +1,99 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Temporal-extension Arrow plugins for `vortex.date`, `vortex.time`, and `vortex.timestamp`. + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_schema::DataType; +use arrow_schema::TimeUnit as ArrowTimeUnit; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::arrow::ArrowEncoder; +use crate::arrow::ArrowSession; +use crate::arrow::dtype_converter::ArrowDTypeConverter; +use crate::arrow::executor::temporal::to_arrow_temporal; +use crate::dtype::DType; +use crate::dtype::extension::ExtDTypeRef; +use crate::extension::datetime::AnyTemporal; +use crate::extension::datetime::TemporalMetadata; +use crate::extension::datetime::TimeUnit; + +/// Map a temporal extension dtype to its preferred Arrow [`DataType`]. +fn temporal_arrow_data_type(ext: &ExtDTypeRef) -> VortexResult { + let temporal = ext + .metadata_opt::() + .ok_or_else(|| vortex_err!("ExtDType {} is not a temporal extension", ext.id()))?; + Ok(match temporal { + TemporalMetadata::Timestamp(unit, tz) => { + DataType::Timestamp(ArrowTimeUnit::try_from(*unit)?, tz.clone()) + } + TemporalMetadata::Date(unit) => match unit { + TimeUnit::Days => DataType::Date32, + TimeUnit::Milliseconds => DataType::Date64, + TimeUnit::Nanoseconds | TimeUnit::Microseconds | TimeUnit::Seconds => { + vortex_panic!(InvalidArgument: "Invalid TimeUnit {} for {}", unit, ext.id()) + } + }, + TemporalMetadata::Time(unit) => match unit { + TimeUnit::Seconds => DataType::Time32(ArrowTimeUnit::Second), + TimeUnit::Milliseconds => DataType::Time32(ArrowTimeUnit::Millisecond), + TimeUnit::Microseconds => DataType::Time64(ArrowTimeUnit::Microsecond), + TimeUnit::Nanoseconds => DataType::Time64(ArrowTimeUnit::Nanosecond), + TimeUnit::Days => { + vortex_panic!(InvalidArgument: "Invalid TimeUnit {} for {}", unit, ext.id()) + } + }, + }) +} + +/// [`ArrowDTypeConverter`] for the built-in temporal extensions +/// (`vortex.date`, `vortex.time`, `vortex.timestamp`). +#[derive(Debug, Default)] +pub struct TemporalArrowDTypeConverter; + +impl ArrowDTypeConverter for TemporalArrowDTypeConverter { + fn to_arrow_data_type(&self, ext: &ExtDTypeRef) -> VortexResult { + temporal_arrow_data_type(ext) + } +} + +/// [`ArrowEncoder`] for the built-in temporal extensions. Registered against the +/// `vortex.date`, `vortex.time`, and `vortex.timestamp` [`crate::dtype::extension::ExtId`]s. +#[derive(Debug, Default)] +pub struct TemporalArrowEncoder; + +impl ArrowEncoder for TemporalArrowEncoder { + fn preferred_arrow_type( + &self, + array: &ArrayRef, + _session: &ArrowSession, + ) -> VortexResult> { + let DType::Extension(ext) = array.dtype() else { + return Ok(None); + }; + if ext.metadata_opt::().is_none() { + return Ok(None); + } + Ok(Some(temporal_arrow_data_type(ext)?)) + } + + fn to_arrow_array( + &self, + array: ArrayRef, + target: &DataType, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + match target { + DataType::Timestamp(..) + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) => to_arrow_temporal(array, target, ctx).map(Some), + _ => Ok(None), + } + } +} diff --git a/vortex-array/src/arrow/encoders/varbin.rs b/vortex-array/src/arrow/encoders/varbin.rs new file mode 100644 index 00000000000..e16851a115c --- /dev/null +++ b/vortex-array/src/arrow/encoders/varbin.rs @@ -0,0 +1,76 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`VarBinArrowEncoder`] — short-circuits VarBin → Arrow byte arrays for offset-based targets. + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_array::types::BinaryType; +use arrow_array::types::LargeBinaryType; +use arrow_array::types::LargeUtf8Type; +use arrow_array::types::Utf8Type; +use arrow_schema::DataType; +use vortex_error::VortexResult; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::array::ArrayId; +use crate::array::ArrayPlugin; +use crate::arrays::VarBin; +use crate::arrays::varbin::VarBinArrayExt; +use crate::arrow::ArrowEncoder; +use crate::arrow::ArrowSession; +use crate::arrow::executor::byte::to_arrow_byte_array; +use crate::dtype::DType; +use crate::dtype::PType; + +/// Forward [`ArrowEncoder`] keyed by the [`crate::arrays::VarBin`] [`ArrayId`]. +/// +/// Handles the four offset-based byte targets ([`DataType::Utf8`], [`DataType::LargeUtf8`], +/// [`DataType::Binary`], [`DataType::LargeBinary`]) without going through +/// [`crate::arrays::VarBinView`]. Returns [`None`] for any other target so the dispatcher +/// falls back to the canonical encoder. +#[derive(Debug, Default)] +pub struct VarBinArrowEncoder; + +impl VarBinArrowEncoder { + /// The encoding [`ArrayId`] this encoder is registered against. + pub fn array_id() -> ArrayId { + VarBin.id() + } +} + +impl ArrowEncoder for VarBinArrowEncoder { + fn preferred_arrow_type( + &self, + array: &ArrayRef, + _session: &ArrowSession, + ) -> VortexResult> { + let Some(varbin) = array.as_opt::() else { + return Ok(None); + }; + let offsets_ptype = PType::try_from(varbin.offsets().dtype())?; + let use_large = matches!(offsets_ptype, PType::I64 | PType::U64); + Ok(Some(match (varbin.dtype(), use_large) { + (DType::Utf8(_), false) => DataType::Utf8, + (DType::Utf8(_), true) => DataType::LargeUtf8, + (DType::Binary(_), false) => DataType::Binary, + (DType::Binary(_), true) => DataType::LargeBinary, + _ => unreachable!("VarBinArray must have Utf8 or Binary dtype"), + })) + } + + fn to_arrow_array( + &self, + array: ArrayRef, + target: &DataType, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + match target { + DataType::Utf8 => to_arrow_byte_array::(array, ctx).map(Some), + DataType::LargeUtf8 => to_arrow_byte_array::(array, ctx).map(Some), + DataType::Binary => to_arrow_byte_array::(array, ctx).map(Some), + DataType::LargeBinary => to_arrow_byte_array::(array, ctx).map(Some), + _ => Ok(None), + } + } +} diff --git a/vortex-array/src/arrow/executor/bool.rs b/vortex-array/src/arrow/executor/bool.rs index 68f2451cd77..0f59f0f1cc0 100644 --- a/vortex-array/src/arrow/executor/bool.rs +++ b/vortex-array/src/arrow/executor/bool.rs @@ -29,7 +29,7 @@ pub fn canonical_bool_to_arrow( ))) } -pub(super) fn to_arrow_bool( +pub(in crate::arrow) fn to_arrow_bool( array: ArrayRef, ctx: &mut ExecutionCtx, ) -> VortexResult { diff --git a/vortex-array/src/arrow/executor/byte.rs b/vortex-array/src/arrow/executor/byte.rs index bace6665bca..d2805306516 100644 --- a/vortex-array/src/arrow/executor/byte.rs +++ b/vortex-array/src/arrow/executor/byte.rs @@ -26,7 +26,7 @@ use crate::dtype::NativePType; use crate::dtype::Nullability; /// Convert a Vortex array into an Arrow GenericBinaryArray. -pub(super) fn to_arrow_byte_array( +pub(in crate::arrow) fn to_arrow_byte_array( array: ArrayRef, ctx: &mut ExecutionCtx, ) -> VortexResult diff --git a/vortex-array/src/arrow/executor/byte_view.rs b/vortex-array/src/arrow/executor/byte_view.rs index b88b1895d53..c73828580eb 100644 --- a/vortex-array/src/arrow/executor/byte_view.rs +++ b/vortex-array/src/arrow/executor/byte_view.rs @@ -63,7 +63,7 @@ pub fn execute_varbinview_to_arrow( })) } -pub(super) fn to_arrow_byte_view( +pub(in crate::arrow) fn to_arrow_byte_view( array: ArrayRef, ctx: &mut ExecutionCtx, ) -> VortexResult { diff --git a/vortex-array/src/arrow/executor/decimal.rs b/vortex-array/src/arrow/executor/decimal.rs index 077495354cc..19717a45cdb 100644 --- a/vortex-array/src/arrow/executor/decimal.rs +++ b/vortex-array/src/arrow/executor/decimal.rs @@ -23,7 +23,7 @@ use crate::arrays::DecimalArray; use crate::arrow::null_buffer::to_null_buffer; use crate::dtype::DecimalType; -pub(super) fn to_arrow_decimal( +pub(in crate::arrow) fn to_arrow_decimal( array: ArrayRef, data_type: &DataType, ctx: &mut ExecutionCtx, diff --git a/vortex-array/src/arrow/executor/dictionary.rs b/vortex-array/src/arrow/executor/dictionary.rs index 99e315bd8f7..3811421cfc8 100644 --- a/vortex-array/src/arrow/executor/dictionary.rs +++ b/vortex-array/src/arrow/executor/dictionary.rs @@ -24,7 +24,7 @@ use crate::arrays::DictArray; use crate::arrays::dict::DictArraySlotsExt; use crate::arrow::ArrowArrayExecutor; -pub(super) fn to_arrow_dictionary( +pub(in crate::arrow) fn to_arrow_dictionary( array: ArrayRef, codes_type: &DataType, values_type: &DataType, diff --git a/vortex-array/src/arrow/executor/fixed_size_list.rs b/vortex-array/src/arrow/executor/fixed_size_list.rs index 7abea9aaa7a..a3cd8942817 100644 --- a/vortex-array/src/arrow/executor/fixed_size_list.rs +++ b/vortex-array/src/arrow/executor/fixed_size_list.rs @@ -15,7 +15,7 @@ use crate::arrays::fixed_size_list::FixedSizeListArrayExt; use crate::arrow::ArrowArrayExecutor; use crate::arrow::executor::validity::to_arrow_null_buffer; -pub(super) fn to_arrow_fixed_list( +pub(in crate::arrow) fn to_arrow_fixed_list( array: ArrayRef, list_size: i32, elements_field: &FieldRef, diff --git a/vortex-array/src/arrow/executor/list.rs b/vortex-array/src/arrow/executor/list.rs index 6c7c271e933..2a5d46453ac 100644 --- a/vortex-array/src/arrow/executor/list.rs +++ b/vortex-array/src/arrow/executor/list.rs @@ -34,7 +34,7 @@ use crate::dtype::NativePType; use crate::dtype::Nullability; /// Convert a Vortex VarBinArray into an Arrow [`GenericListArray`](arrow_array:array::GenericListArray). -pub(super) fn to_arrow_list( +pub(in crate::arrow) fn to_arrow_list( array: ArrayRef, elements_field: &FieldRef, ctx: &mut ExecutionCtx, diff --git a/vortex-array/src/arrow/executor/list_view.rs b/vortex-array/src/arrow/executor/list_view.rs index a6795b1adaa..fec120c61a8 100644 --- a/vortex-array/src/arrow/executor/list_view.rs +++ b/vortex-array/src/arrow/executor/list_view.rs @@ -22,7 +22,7 @@ use crate::dtype::DType; use crate::dtype::IntegerPType; use crate::dtype::Nullability::NonNullable; -pub(super) fn to_arrow_list_view( +pub(in crate::arrow) fn to_arrow_list_view( array: ArrayRef, elements_field: &FieldRef, ctx: &mut ExecutionCtx, diff --git a/vortex-array/src/arrow/executor/mod.rs b/vortex-array/src/arrow/executor/mod.rs index 890e7f8a46a..69b01763478 100644 --- a/vortex-array/src/arrow/executor/mod.rs +++ b/vortex-array/src/arrow/executor/mod.rs @@ -2,56 +2,37 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors pub mod bool; -mod byte; +pub(crate) mod byte; pub mod byte_view; -mod decimal; -mod dictionary; -mod fixed_size_list; -mod list; -mod list_view; +pub(crate) mod decimal; +pub(crate) mod dictionary; +pub(crate) mod fixed_size_list; +pub(crate) mod list; +pub(crate) mod list_view; pub mod null; pub mod primitive; -mod run_end; -mod struct_; -mod temporal; -mod validity; +pub(crate) mod run_end; +pub(crate) mod struct_; +pub(crate) mod temporal; +pub(crate) mod validity; use arrow_array::ArrayRef as ArrowArrayRef; use arrow_array::RecordBatch; -use arrow_array::cast::AsArray; -use arrow_array::types::*; use arrow_schema::DataType; -use arrow_schema::Field; -use arrow_schema::FieldRef; use arrow_schema::Schema; use itertools::Itertools; use vortex_error::VortexResult; -use vortex_error::vortex_bail; -use vortex_error::vortex_ensure; use crate::ArrayRef; -use crate::arrays::List; -use crate::arrays::VarBin; -use crate::arrays::list::ListArrayExt; -use crate::arrays::varbin::VarBinArrayExt; -use crate::arrow::executor::bool::to_arrow_bool; -use crate::arrow::executor::byte::to_arrow_byte_array; -use crate::arrow::executor::byte_view::to_arrow_byte_view; -use crate::arrow::executor::decimal::to_arrow_decimal; -use crate::arrow::executor::dictionary::to_arrow_dictionary; -use crate::arrow::executor::fixed_size_list::to_arrow_fixed_list; -use crate::arrow::executor::list::to_arrow_list; -use crate::arrow::executor::list_view::to_arrow_list_view; -use crate::arrow::executor::null::to_arrow_null; -use crate::arrow::executor::primitive::to_arrow_primitive; -use crate::arrow::executor::run_end::to_arrow_run_end; -use crate::arrow::executor::struct_::to_arrow_struct; -use crate::arrow::executor::temporal::to_arrow_temporal; -use crate::dtype::DType; -use crate::dtype::PType; +use crate::arrow::ArrowSessionExt; use crate::executor::ExecutionCtx; /// Trait for executing a Vortex array to produce an Arrow array. +/// +/// Prefer [`crate::arrow::ArrowSession`] porcelain on the active +/// [`vortex_session::VortexSession`] (e.g. `session.arrow().to_arrow_array(...)` via +/// [`crate::arrow::ArrowSessionExt`]). This trait is the underlying shim that delegates +/// to that porcelain and is retained for callers that already hold an [`ArrayRef`]. pub trait ArrowArrayExecutor: Sized { /// Execute the array to produce an Arrow array. /// @@ -68,10 +49,7 @@ pub trait ArrowArrayExecutor: Sized { self, schema: &Schema, ctx: &mut ExecutionCtx, - ) -> VortexResult { - let array = self.execute_arrow(Some(&DataType::Struct(schema.fields.clone())), ctx)?; - Ok(RecordBatch::from(array.as_struct())) - } + ) -> VortexResult; /// Execute the array to produce Arrow `RecordBatch`'s with the given schema. fn execute_record_batches( @@ -87,92 +65,17 @@ impl ArrowArrayExecutor for ArrayRef { data_type: Option<&DataType>, ctx: &mut ExecutionCtx, ) -> VortexResult { - let len = self.len(); - - // Resolve the DataType if it is a leaf type - // we should likely make this extensible. - let resolved_type: DataType = match data_type { - Some(dt) => dt.clone(), - None => preferred_arrow_type(&self)?, - }; - - let arrow = match &resolved_type { - DataType::Null => to_arrow_null(self, ctx), - DataType::Boolean => to_arrow_bool(self, ctx), - DataType::Int8 => to_arrow_primitive::(self, ctx), - DataType::Int16 => to_arrow_primitive::(self, ctx), - DataType::Int32 => to_arrow_primitive::(self, ctx), - DataType::Int64 => to_arrow_primitive::(self, ctx), - DataType::UInt8 => to_arrow_primitive::(self, ctx), - DataType::UInt16 => to_arrow_primitive::(self, ctx), - DataType::UInt32 => to_arrow_primitive::(self, ctx), - DataType::UInt64 => to_arrow_primitive::(self, ctx), - DataType::Float16 => to_arrow_primitive::(self, ctx), - DataType::Float32 => to_arrow_primitive::(self, ctx), - DataType::Float64 => to_arrow_primitive::(self, ctx), - DataType::Timestamp(..) - | DataType::Date32 - | DataType::Date64 - | DataType::Time32(_) - | DataType::Time64(_) => to_arrow_temporal(self, &resolved_type, ctx), - DataType::Binary => to_arrow_byte_array::(self, ctx), - DataType::LargeBinary => to_arrow_byte_array::(self, ctx), - DataType::Utf8 => to_arrow_byte_array::(self, ctx), - DataType::LargeUtf8 => to_arrow_byte_array::(self, ctx), - DataType::BinaryView => to_arrow_byte_view::(self, ctx), - DataType::Utf8View => to_arrow_byte_view::(self, ctx), - // TODO(joe): pass down preferred - DataType::List(elements_field) => to_arrow_list::(self, elements_field, ctx), - // TODO(joe): pass down preferred - DataType::LargeList(elements_field) => to_arrow_list::(self, elements_field, ctx), - // TODO(joe): pass down preferred - DataType::FixedSizeList(elements_field, list_size) => { - to_arrow_fixed_list(self, *list_size, elements_field, ctx) - } - // TODO(joe): pass down preferred - DataType::ListView(elements_field) => { - to_arrow_list_view::(self, elements_field, ctx) - } - // TODO(joe): pass down preferred - DataType::LargeListView(elements_field) => { - to_arrow_list_view::(self, elements_field, ctx) - } - DataType::Struct(fields) => { - let fields = if data_type.is_none() { - None - } else { - Some(fields) - }; - to_arrow_struct(self, fields, ctx) - } - // TODO(joe): pass down preferred - DataType::Dictionary(codes_type, values_type) => { - to_arrow_dictionary(self, codes_type, values_type, ctx) - } - dt @ DataType::Decimal32(..) => to_arrow_decimal(self, dt, ctx), - dt @ DataType::Decimal64(..) => to_arrow_decimal(self, dt, ctx), - dt @ DataType::Decimal128(..) => to_arrow_decimal(self, dt, ctx), - dt @ DataType::Decimal256(..) => to_arrow_decimal(self, dt, ctx), - // TODO(joe): pass down preferred - DataType::RunEndEncoded(ends_type, values_type) => { - to_arrow_run_end(self, ends_type.data_type(), values_type, ctx) - } - DataType::FixedSizeBinary(_) - | DataType::Map(..) - | DataType::Duration(_) - | DataType::Interval(_) - | DataType::Union(..) => { - vortex_bail!("Conversion to Arrow type {resolved_type} is not supported"); - } - }?; - - vortex_ensure!( - arrow.len() == len, - "Arrow array length does not match Vortex array length after conversion to {:?}", - arrow - ); + let session = ctx.session().clone(); + session.arrow().to_arrow_array(self, data_type, ctx) + } - Ok(arrow) + fn execute_record_batch( + self, + schema: &Schema, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let session = ctx.session().clone(); + session.arrow().to_arrow_record_batch(self, schema, ctx) } fn execute_record_batches( @@ -185,46 +88,3 @@ impl ArrowArrayExecutor for ArrayRef { .try_collect() } } - -/// Determine the preferred (cheapest) Arrow type for an array. -/// -/// For most arrays, this returns the canonical Arrow type from `dtype.to_arrow_dtype()`. -/// However, some encodings have cheaper Arrow representations: -/// - `VarBinArray`: Uses `Utf8`/`Binary` (offset-based) instead of `Utf8View`/`BinaryView` -/// - `ListArray`: Uses `List` instead of `ListView` -fn preferred_arrow_type(array: &ArrayRef) -> VortexResult { - // VarBinArray: use offset-based Binary/Utf8 instead of View types - if let Some(varbin) = array.as_opt::() { - let offsets_ptype = PType::try_from(varbin.offsets().dtype())?; - let use_large = matches!(offsets_ptype, PType::I64 | PType::U64); - - return Ok(match (varbin.dtype(), use_large) { - (DType::Utf8(_), false) => DataType::Utf8, - (DType::Utf8(_), true) => DataType::LargeUtf8, - (DType::Binary(_), false) => DataType::Binary, - (DType::Binary(_), true) => DataType::LargeBinary, - _ => unreachable!("VarBinArray must have Utf8 or Binary dtype"), - }); - } - - // ListArray: use List with appropriate offset size - if let Some(list) = array.as_opt::() { - let offsets_ptype = PType::try_from(list.offsets().dtype())?; - let use_large = matches!(offsets_ptype, PType::I64 | PType::U64); - // Recursively get the preferred type for elements - let elem_dtype = preferred_arrow_type(list.elements())?; - let field = FieldRef::new(Field::new_list_field( - elem_dtype, - list.elements().dtype().is_nullable(), - )); - - return Ok(if use_large { - DataType::LargeList(field) - } else { - DataType::List(field) - }); - } - - // Everything else: use canonical dtype conversion - array.dtype().to_arrow_dtype() -} diff --git a/vortex-array/src/arrow/executor/null.rs b/vortex-array/src/arrow/executor/null.rs index 9d2b5ef6fc4..f23567d1242 100644 --- a/vortex-array/src/arrow/executor/null.rs +++ b/vortex-array/src/arrow/executor/null.rs @@ -16,7 +16,7 @@ pub fn canonical_null_to_arrow(array: &NullArray) -> ArrowArrayRef { Arc::new(ArrowNullArray::new(array.len())) } -pub(super) fn to_arrow_null( +pub(in crate::arrow) fn to_arrow_null( array: ArrayRef, ctx: &mut ExecutionCtx, ) -> VortexResult { diff --git a/vortex-array/src/arrow/executor/primitive.rs b/vortex-array/src/arrow/executor/primitive.rs index 0d41176972b..d77e7e0eb1c 100644 --- a/vortex-array/src/arrow/executor/primitive.rs +++ b/vortex-array/src/arrow/executor/primitive.rs @@ -34,7 +34,7 @@ where Ok(Arc::new(ArrowPrimitiveArray::::new(buffer, null_buffer))) } -pub(super) fn to_arrow_primitive( +pub(in crate::arrow) fn to_arrow_primitive( array: ArrayRef, ctx: &mut ExecutionCtx, ) -> VortexResult diff --git a/vortex-array/src/arrow/executor/run_end.rs b/vortex-array/src/arrow/executor/run_end.rs index b579cab7e4d..4d1a222adfb 100644 --- a/vortex-array/src/arrow/executor/run_end.rs +++ b/vortex-array/src/arrow/executor/run_end.rs @@ -41,7 +41,7 @@ struct RunEndMetadata { pub offset: u64, } -pub(super) fn to_arrow_run_end( +pub(in crate::arrow) fn to_arrow_run_end( array: ArrayRef, ends_type: &DataType, values_type: &Field, diff --git a/vortex-array/src/arrow/executor/struct_.rs b/vortex-array/src/arrow/executor/struct_.rs index 909fd78059d..d94327bc51a 100644 --- a/vortex-array/src/arrow/executor/struct_.rs +++ b/vortex-array/src/arrow/executor/struct_.rs @@ -30,7 +30,7 @@ use crate::dtype::StructFields; use crate::dtype::arrow::FromArrowType; use crate::scalar_fn::fns::pack::Pack; -pub(super) fn to_arrow_struct( +pub(in crate::arrow) fn to_arrow_struct( array: ArrayRef, target_fields: Option<&Fields>, ctx: &mut ExecutionCtx, diff --git a/vortex-array/src/arrow/executor/temporal.rs b/vortex-array/src/arrow/executor/temporal.rs index c68a4f3ee9f..dd0bf80fc22 100644 --- a/vortex-array/src/arrow/executor/temporal.rs +++ b/vortex-array/src/arrow/executor/temporal.rs @@ -36,7 +36,7 @@ use crate::extension::datetime::AnyTemporal; use crate::extension::datetime::TemporalMetadata; use crate::extension::datetime::TimeUnit; -pub(super) fn to_arrow_temporal( +pub(in crate::arrow) fn to_arrow_temporal( array: ArrayRef, data_type: &DataType, ctx: &mut ExecutionCtx, diff --git a/vortex-array/src/arrow/mod.rs b/vortex-array/src/arrow/mod.rs index efc83aa6af6..db99a2c26a0 100644 --- a/vortex-array/src/arrow/mod.rs +++ b/vortex-array/src/arrow/mod.rs @@ -7,23 +7,45 @@ use arrow_array::ArrayRef as ArrowArrayRef; use arrow_schema::DataType; use vortex_error::VortexResult; +pub mod canonical; mod convert; mod datum; +pub mod decoder; +pub mod decoders; +pub mod dtype_converter; +pub mod encoder; +pub mod encoders; mod executor; mod iter; mod null_buffer; mod record_batch; +mod session; pub use datum::*; +pub use decoder::ArrowDecoder; +pub use decoder::ArrowDecoderRef; +pub use dtype_converter::ArrowDTypeConverter; +pub use dtype_converter::ArrowDTypeConverterRef; +pub use dtype_converter::ArrowDTypeReader; +pub use dtype_converter::ArrowDTypeReaderRef; +pub use encoder::ArrowEncoder; +pub use encoder::ArrowEncoderRef; pub use executor::*; pub use iter::*; pub use null_buffer::to_arrow_null_buffer; pub use null_buffer::to_null_buffer; +pub use session::ArrowSession; +pub use session::ArrowSessionExt; use crate::ArrayRef; use crate::LEGACY_SESSION; use crate::VortexSessionExecute; +/// Convert an Arrow array into a Vortex [`ArrayRef`]. +/// +/// Prefer the porcelain on [`ArrowSession`] (`session.arrow().from_arrow_array(...)`), +/// which dispatches through registered [`ArrowDecoder`] plugins. This trait is retained +/// as the underlying canonical implementation that the default decoder delegates to. pub trait FromArrowArray { fn from_arrow(array: A, nullable: bool) -> VortexResult where diff --git a/vortex-array/src/arrow/session.rs b/vortex-array/src/arrow/session.rs new file mode 100644 index 00000000000..f56021c6594 --- /dev/null +++ b/vortex-array/src/arrow/session.rs @@ -0,0 +1,533 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`ArrowSession`] — pluggable Vortex ↔ Arrow conversion session facet. + +use std::any::Any; +use std::sync::Arc; + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_array::RecordBatch; +use arrow_array::cast::AsArray; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::Schema; +use parking_lot::RwLock; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_session::Ref; +use vortex_session::SessionExt; +use vortex_session::SessionVar; +use vortex_session::registry::Id; +use vortex_session::registry::Registry; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::array::ArrayId; +use crate::arrow::canonical::CanonicalArrowEncoder; +use crate::arrow::decoder::ArrowDecoderRef; +use crate::arrow::decoders::canonical::CanonicalArrowDTypeReader; +use crate::arrow::decoders::canonical::CanonicalArrowDecoder; +use crate::arrow::dtype_converter::ArrowDTypeConverterRef; +use crate::arrow::dtype_converter::ArrowDTypeReaderRef; +use crate::arrow::encoder::ArrowEncoderRef; +use crate::arrow::encoders::list::ListArrowEncoder; +use crate::arrow::encoders::temporal::TemporalArrowDTypeConverter; +use crate::arrow::encoders::temporal::TemporalArrowEncoder; +use crate::arrow::encoders::varbin::VarBinArrowEncoder; +use crate::dtype::DType; +use crate::dtype::Nullability; +use crate::dtype::StructFields; +use crate::dtype::extension::ExtId; +use crate::dtype::extension::ExtVTable; +use crate::extension::datetime::Date; +use crate::extension::datetime::Time; +use crate::extension::datetime::Timestamp; + +/// Registry for [`crate::arrow::ArrowEncoder`]s keyed by [`ArrayId`] (encoding-keyed dispatch). +pub type ArrowEncoderByEncodingRegistry = Registry; + +/// Registry for [`crate::arrow::ArrowEncoder`]s keyed by [`ExtId`] (extension-keyed dispatch). +pub type ArrowEncoderByExtensionRegistry = Registry; + +/// Registry for [`crate::arrow::ArrowDTypeConverter`]s keyed by [`ExtId`]. +pub type ArrowDTypeConverterRegistry = Registry; + +/// Session facet for Vortex ↔ Arrow conversion plugins. +/// +/// `ArrowSession` holds four kinds of plugin registries: +/// +/// - **encoder_by_encoding** — [`crate::arrow::ArrowEncoder`]s keyed by [`ArrayId`]. Consulted +/// first during forward (Vortex → Arrow) array conversion, before canonicalization. +/// - **encoder_by_extension** — [`crate::arrow::ArrowEncoder`]s keyed by [`ExtId`]. Consulted +/// by the canonical encoder when handling [`crate::dtype::DType::Extension`] arrays. +/// - **dtype_converters** — [`crate::arrow::ArrowDTypeConverter`]s keyed by [`ExtId`]. +/// Consulted when computing the Arrow schema for an extension dtype. +/// - **decoders** / **dtype_readers** — chains for the reverse (Arrow → Vortex) direction. +/// Walked in registration order with user-registered plugins before built-ins. +/// +/// The single `canonical_encoder` slot holds the fallback encoder that handles all canonical +/// Vortex encodings. It is set by the default initializer. +#[derive(Debug)] +pub struct ArrowSession { + encoder_by_encoding: ArrowEncoderByEncodingRegistry, + encoder_by_extension: ArrowEncoderByExtensionRegistry, + dtype_converters: ArrowDTypeConverterRegistry, + /// User-registered decoder chain. Walked before [`ArrowSession::default_decoder`]. + decoders: RwLock>, + /// User-registered dtype-reader chain. Walked before [`ArrowSession::default_dtype_reader`]. + dtype_readers: RwLock>, + canonical_encoder: RwLock>, + /// Fallback decoder used after the user chain has declined. + default_decoder: RwLock>, + /// Fallback dtype reader used after the user chain has declined. + default_dtype_reader: RwLock>, +} + +impl Default for ArrowSession { + fn default() -> Self { + let this = Self { + encoder_by_encoding: Registry::default(), + encoder_by_extension: Registry::default(), + dtype_converters: Registry::default(), + decoders: RwLock::new(Vec::new()), + dtype_readers: RwLock::new(Vec::new()), + canonical_encoder: RwLock::new(None), + default_decoder: RwLock::new(None), + default_dtype_reader: RwLock::new(None), + }; + this.set_canonical_encoder(Arc::new(CanonicalArrowEncoder) as ArrowEncoderRef); + this.set_default_decoder(Arc::new(CanonicalArrowDecoder) as ArrowDecoderRef); + this.set_default_dtype_reader(Arc::new(CanonicalArrowDTypeReader) as ArrowDTypeReaderRef); + // Built-in encoding-keyed encoders for non-canonical optimizations. + this.register_encoder_for_encoding( + VarBinArrowEncoder::array_id(), + Arc::new(VarBinArrowEncoder) as ArrowEncoderRef, + ); + this.register_encoder_for_encoding( + ListArrowEncoder::array_id(), + Arc::new(ListArrowEncoder) as ArrowEncoderRef, + ); + + // Built-in temporal extension plugins. + let temporal_encoder: ArrowEncoderRef = Arc::new(TemporalArrowEncoder); + let temporal_converter: ArrowDTypeConverterRef = Arc::new(TemporalArrowDTypeConverter); + for ext_id in [ + ExtVTable::id(&Date), + ExtVTable::id(&Time), + ExtVTable::id(&Timestamp), + ] { + this.register_encoder_for_extension(ext_id, Arc::clone(&temporal_encoder)); + this.register_dtype_converter(ext_id, Arc::clone(&temporal_converter)); + } + this + } +} + +impl SessionVar for ArrowSession { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl ArrowSession { + /// Register a forward array encoder keyed by encoding [`ArrayId`]. + pub fn register_encoder_for_encoding( + &self, + key: impl Into, + plugin: impl Into, + ) { + self.encoder_by_encoding.register(key, plugin); + } + + /// Register a forward array encoder keyed by extension [`ExtId`]. + pub fn register_encoder_for_extension( + &self, + key: impl Into, + plugin: impl Into, + ) { + self.encoder_by_extension.register(key, plugin); + } + + /// Register a dtype converter keyed by extension [`ExtId`]. + pub fn register_dtype_converter( + &self, + key: impl Into, + plugin: impl Into, + ) { + self.dtype_converters.register(key, plugin); + } + + /// Register a reverse-direction array decoder. Decoders are walked in registration order. + pub fn register_decoder(&self, plugin: impl Into) { + self.decoders.write().push(plugin.into()); + } + + /// Register a reverse-direction dtype reader. Readers are walked in registration order. + pub fn register_dtype_reader(&self, plugin: impl Into) { + self.dtype_readers.write().push(plugin.into()); + } + + /// Set the canonical (fallback) encoder slot. + /// + /// This single plugin must handle every canonical Vortex encoding. Callers replace any + /// previously-registered canonical encoder. + pub fn set_canonical_encoder(&self, plugin: impl Into) { + *self.canonical_encoder.write() = Some(plugin.into()); + } + + /// Set the fallback Arrow → Vortex array decoder. + /// + /// Replaces any previously-registered fallback. The fallback runs after the user chain + /// has declined. + pub fn set_default_decoder(&self, plugin: impl Into) { + *self.default_decoder.write() = Some(plugin.into()); + } + + /// Set the fallback Arrow → Vortex dtype reader. + pub fn set_default_dtype_reader(&self, plugin: impl Into) { + *self.default_dtype_reader.write() = Some(plugin.into()); + } + + /// Find a forward encoder for the given encoding [`ArrayId`]. + pub fn encoder_for_encoding(&self, id: &ArrayId) -> Option { + self.encoder_by_encoding.find(id) + } + + /// Find a forward encoder for the given extension [`ExtId`]. + pub fn encoder_for_extension(&self, id: &ExtId) -> Option { + self.encoder_by_extension.find(id) + } + + /// Find a dtype converter for the given extension [`ExtId`]. + pub fn dtype_converter_for(&self, id: &ExtId) -> Option { + self.dtype_converters.find(id) + } + + /// Snapshot the current decoder chain (in registration order). + pub fn decoders(&self) -> Vec { + self.decoders.read().clone() + } + + /// Snapshot the current dtype-reader chain (in registration order). + pub fn dtype_readers(&self) -> Vec { + self.dtype_readers.read().clone() + } + + /// The currently-registered canonical encoder, if any. + pub fn canonical_encoder(&self) -> Option { + self.canonical_encoder.read().clone() + } + + /// The currently-registered fallback decoder, if any. + pub fn default_decoder(&self) -> Option { + self.default_decoder.read().clone() + } + + /// The currently-registered fallback dtype reader, if any. + pub fn default_dtype_reader(&self) -> Option { + self.default_dtype_reader.read().clone() + } +} + +// --- Forward porcelain (Vortex → Arrow) --- + +impl ArrowSession { + /// Convert a Vortex [`DType`] into the Arrow [`DataType`] this session would emit. + pub fn to_arrow_data_type(&self, dtype: &DType) -> VortexResult { + if let DType::Extension(ext) = dtype + && let Some(converter) = self.dtype_converter_for(&ext.id()) + { + return converter.to_arrow_data_type(ext); + } + // Non-extension types and extensions without a converter fall back to the canonical + // dtype mapping. The shim retains historical hard-coded behavior for callers that + // don't go through the session. + dtype.to_arrow_dtype() + } + + /// Build an Arrow [`Field`] for `dtype` with the given column name. + pub fn to_arrow_field(&self, name: &str, dtype: &DType) -> VortexResult { + if let DType::Extension(ext) = dtype + && let Some(converter) = self.dtype_converter_for(&ext.id()) + { + return converter.to_arrow_field(ext, name); + } + Ok(Field::new( + name, + self.to_arrow_data_type(dtype)?, + dtype.is_nullable(), + )) + } + + /// Build an Arrow [`Schema`] for `struct_dtype`. + pub fn to_arrow_schema( + &self, + struct_dtype: &StructFields, + nullability: Nullability, + ) -> VortexResult { + // Defer to the existing top-level helper to preserve extension-metadata handling for + // Variant fields. The wrapper exists so callers can stop reaching for `DType` directly. + DType::Struct(struct_dtype.clone(), nullability).to_arrow_schema() + } + + /// Resolve the Arrow [`DataType`] this session would emit for `array` when no target is + /// specified. Walks the encoding-keyed encoder, then the extension-keyed encoder, and + /// finally the canonical encoder. + pub fn resolve_preferred_arrow_type(&self, array: &ArrayRef) -> VortexResult { + if let Some(plugin) = self.encoder_for_encoding(&array.encoding_id()) + && let Some(t) = plugin.preferred_arrow_type(array, self)? + { + return Ok(t); + } + if let DType::Extension(ext) = array.dtype() + && let Some(plugin) = self.encoder_for_extension(&ext.id()) + && let Some(t) = plugin.preferred_arrow_type(array, self)? + { + return Ok(t); + } + let canonical = self + .canonical_encoder() + .ok_or_else(|| vortex_err!("ArrowSession has no canonical encoder registered"))?; + canonical + .preferred_arrow_type(array, self)? + .ok_or_else(|| vortex_err!("canonical encoder produced no preferred Arrow type")) + } + + /// Convert a Vortex [`ArrayRef`] into an Arrow array. + /// + /// `target` selects the Arrow [`DataType`] to emit. Passing [`None`] uses the cheapest + /// representation the session can produce. + pub fn to_arrow_array( + &self, + array: ArrayRef, + target: Option<&DataType>, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let target_owned; + let target = match target { + Some(t) => t, + None => { + target_owned = self.resolve_preferred_arrow_type(&array)?; + &target_owned + } + }; + + if let Some(plugin) = self.encoder_for_encoding(&array.encoding_id()) + && let Some(out) = plugin.to_arrow_array(array.clone(), target, ctx)? + { + return Ok(out); + } + + if let DType::Extension(ext) = array.dtype() + && let Some(plugin) = self.encoder_for_extension(&ext.id()) + && let Some(out) = plugin.to_arrow_array(array.clone(), target, ctx)? + { + return Ok(out); + } + + let canonical = self + .canonical_encoder() + .ok_or_else(|| vortex_err!("ArrowSession has no canonical encoder registered"))?; + match canonical.to_arrow_array(array, target, ctx)? { + Some(out) => Ok(out), + None => vortex_bail!("canonical encoder declined Arrow target {target}"), + } + } + + /// Convert a Vortex [`ArrayRef`] into an Arrow [`RecordBatch`] matching `schema`. + pub fn to_arrow_record_batch( + &self, + array: ArrayRef, + schema: &Schema, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let target = DataType::Struct(schema.fields.clone()); + let arrow = self.to_arrow_array(array, Some(&target), ctx)?; + Ok(RecordBatch::from(arrow.as_struct())) + } +} + +// --- Reverse porcelain (Arrow → Vortex) --- + +impl ArrowSession { + /// Read a Vortex [`DType`] from an Arrow [`Field`]. + pub fn from_arrow_field(&self, field: &Field) -> VortexResult { + for reader in self.dtype_readers() { + if let Some(dtype) = reader.try_read_dtype(field)? { + return Ok(dtype); + } + } + if let Some(default) = self.default_dtype_reader() + && let Some(dtype) = default.try_read_dtype(field)? + { + return Ok(dtype); + } + vortex_bail!( + "no ArrowDTypeReader claimed Arrow field with type {}", + field.data_type() + ) + } + + /// Read a Vortex [`StructFields`] from an Arrow [`Schema`]. + pub fn from_arrow_schema(&self, schema: &Schema) -> VortexResult { + let mut entries = Vec::with_capacity(schema.fields().len()); + for field in schema.fields() { + let dtype = self.from_arrow_field(field)?; + entries.push((crate::dtype::FieldName::from(field.name().as_str()), dtype)); + } + Ok(StructFields::from_iter(entries)) + } + + /// Convert an Arrow array into a Vortex [`ArrayRef`]. + pub fn from_arrow_array( + &self, + array: &dyn arrow_array::Array, + field: &Field, + session: &vortex_session::VortexSession, + ) -> VortexResult { + for decoder in self.decoders() { + if let Some(out) = decoder.try_decode(array, field, session)? { + return Ok(out); + } + } + if let Some(default) = self.default_decoder() + && let Some(out) = default.try_decode(array, field, session)? + { + return Ok(out); + } + vortex_bail!( + "no ArrowDecoder claimed Arrow array of type {}", + array.data_type() + ) + } +} + +/// Extension trait for accessing the [`ArrowSession`] facet. +pub trait ArrowSessionExt: SessionExt { + /// Get the Arrow session. + fn arrow(&self) -> Ref<'_, ArrowSession> { + self.get::() + } +} + +impl ArrowSessionExt for S {} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::Int32Array; + use arrow_array::cast::AsArray; + use arrow_array::types::Int32Type; + use arrow_schema::DataType; + use arrow_schema::Field; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use super::*; + use crate::IntoArray; + use crate::VortexSessionExecute; + use crate::arrays::PrimitiveArray; + use crate::dtype::PType; + use crate::session::ArraySession; + use crate::validity::Validity; + + fn test_session() -> VortexSession { + VortexSession::empty() + .with::() + .with::() + } + + #[test] + fn forward_canonical_primitive_roundtrips() -> VortexResult<()> { + let session = test_session(); + let array = PrimitiveArray::new(buffer![1i32, 2, 3], Validity::NonNullable).into_array(); + let mut ctx = session.create_execution_ctx(); + let arrow = session.arrow().to_arrow_array(array, None, &mut ctx)?; + assert_eq!(arrow.data_type(), &DataType::Int32); + let primitive = arrow.as_primitive::(); + assert_eq!(primitive.values().as_ref(), &[1, 2, 3]); + Ok(()) + } + + #[test] + fn reverse_canonical_primitive_roundtrips() -> VortexResult<()> { + let session = test_session(); + let arrow_array: Arc = Arc::new(Int32Array::from(vec![1, 2, 3])); + let field = Field::new("x", DataType::Int32, false); + let vortex = session + .arrow() + .from_arrow_array(arrow_array.as_ref(), &field, &session)?; + let primitive = vortex.as_::(); + assert_eq!(primitive.ptype(), PType::I32); + Ok(()) + } + + /// Custom decoder used to demonstrate that user-registered plugins run before the default + /// canonical decoder. + #[derive(Debug, Default)] + struct OverrideInt32Decoder; + + impl crate::arrow::ArrowDecoder for OverrideInt32Decoder { + fn try_decode( + &self, + array: &dyn arrow_array::Array, + _field: &Field, + _session: &VortexSession, + ) -> VortexResult> { + if matches!(array.data_type(), DataType::Int32) { + // Return all-zero array of the same length so the test can observe the override. + let len = array.len(); + let zeros: Vec = vec![0; len]; + Ok(Some( + PrimitiveArray::new( + vortex_buffer::Buffer::::from(zeros), + Validity::NonNullable, + ) + .into_array(), + )) + } else { + Ok(None) + } + } + } + + #[test] + fn user_registered_decoder_runs_before_default() -> VortexResult<()> { + let session = test_session(); + session + .arrow() + .register_decoder(Arc::new(OverrideInt32Decoder) as ArrowDecoderRef); + let arrow_array: Arc = Arc::new(Int32Array::from(vec![1, 2, 3])); + let field = Field::new("x", DataType::Int32, false); + let vortex = session + .arrow() + .from_arrow_array(arrow_array.as_ref(), &field, &session)?; + let primitive = vortex.as_::(); + // Override returned all-zero values, proving it ran instead of the canonical decoder. + assert_eq!(primitive.as_slice::(), &[0, 0, 0]); + Ok(()) + } + + #[test] + fn temporal_extension_dispatches_to_plugin() -> VortexResult<()> { + let session = test_session(); + let date_dtype = DType::Extension( + Date::new( + crate::extension::datetime::TimeUnit::Days, + Nullability::NonNullable, + ) + .erased(), + ); + let arrow_dt = session.arrow().to_arrow_data_type(&date_dtype)?; + assert_eq!(arrow_dt, DataType::Date32); + Ok(()) + } +} diff --git a/vortex-array/src/dtype/arrow.rs b/vortex-array/src/dtype/arrow.rs index 17af749cfc0..f303fa2829f 100644 --- a/vortex-array/src/dtype/arrow.rs +++ b/vortex-array/src/dtype/arrow.rs @@ -258,6 +258,11 @@ impl DType { } /// Returns the Arrow [`DataType`] that best corresponds to this Vortex [`DType`]. + /// + /// Prefer `session.arrow().to_arrow_data_type(dtype)` via + /// [`crate::arrow::ArrowSessionExt`], which dispatches through registered + /// [`crate::arrow::ArrowDTypeConverter`] plugins. This method retains the canonical + /// dtype mapping for callers without a session. pub fn to_arrow_dtype(&self) -> VortexResult { Ok(match self { DType::Null => DataType::Null, diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 0869eac3ff4..9d69f1b32ef 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -26,6 +26,7 @@ pub use vortex_array_macros::array_slots; use vortex_session::VortexSession; use vortex_session::registry::Context; +use crate::arrow::ArrowSession; use crate::session::ArraySession; pub mod accessor; @@ -79,7 +80,10 @@ pub mod flatbuffers { // TODO(ngates): canonicalize doesn't currently take a session, therefore we cannot invoke execute // from the new array encodings to support back-compat for legacy encodings. So we hold a session // here... -pub static LEGACY_SESSION: LazyLock = - LazyLock::new(|| VortexSession::empty().with::()); +pub static LEGACY_SESSION: LazyLock = LazyLock::new(|| { + VortexSession::empty() + .with::() + .with::() +}); pub type ArrayContext = Context; diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index ae803ee98ae..f55bbbb42d3 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -7,6 +7,7 @@ // vortex::compute is deprecated and will be ported over to expressions. pub use vortex_array::aggregate_fn; use vortex_array::aggregate_fn::session::AggregateFnSession; +use vortex_array::arrow::ArrowSession; pub use vortex_array::compute; use vortex_array::dtype::session::DTypeSession; // vortex::expr is in the process of having its dependencies inverted, and will eventually be @@ -165,6 +166,7 @@ impl VortexSessionDefault for VortexSession { let session = VortexSession::empty() .with::() .with::() + .with::() .with::() .with::() .with::() From fba88e0693b060ce1e57db70595690eea8b1bc4a Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 5 May 2026 14:11:00 -0400 Subject: [PATCH 2/2] switch to ArcSwap Signed-off-by: Andrew Duffy --- vortex-array/src/arrow/session.rs | 47 +++++++++++++++++-------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/vortex-array/src/arrow/session.rs b/vortex-array/src/arrow/session.rs index f56021c6594..42326df473e 100644 --- a/vortex-array/src/arrow/session.rs +++ b/vortex-array/src/arrow/session.rs @@ -6,13 +6,13 @@ use std::any::Any; use std::sync::Arc; +use arc_swap::ArcSwap; use arrow_array::ArrayRef as ArrowArrayRef; use arrow_array::RecordBatch; use arrow_array::cast::AsArray; use arrow_schema::DataType; use arrow_schema::Field; use arrow_schema::Schema; -use parking_lot::RwLock; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_err; @@ -75,14 +75,14 @@ pub struct ArrowSession { encoder_by_extension: ArrowEncoderByExtensionRegistry, dtype_converters: ArrowDTypeConverterRegistry, /// User-registered decoder chain. Walked before [`ArrowSession::default_decoder`]. - decoders: RwLock>, + decoders: ArcSwap>, /// User-registered dtype-reader chain. Walked before [`ArrowSession::default_dtype_reader`]. - dtype_readers: RwLock>, - canonical_encoder: RwLock>, + dtype_readers: ArcSwap>, + canonical_encoder: ArcSwap>, /// Fallback decoder used after the user chain has declined. - default_decoder: RwLock>, + default_decoder: ArcSwap>, /// Fallback dtype reader used after the user chain has declined. - default_dtype_reader: RwLock>, + default_dtype_reader: ArcSwap>, } impl Default for ArrowSession { @@ -91,11 +91,11 @@ impl Default for ArrowSession { encoder_by_encoding: Registry::default(), encoder_by_extension: Registry::default(), dtype_converters: Registry::default(), - decoders: RwLock::new(Vec::new()), - dtype_readers: RwLock::new(Vec::new()), - canonical_encoder: RwLock::new(None), - default_decoder: RwLock::new(None), - default_dtype_reader: RwLock::new(None), + decoders: ArcSwap::from_pointee(Vec::new()), + dtype_readers: ArcSwap::from_pointee(Vec::new()), + canonical_encoder: ArcSwap::from_pointee(None), + default_decoder: ArcSwap::from_pointee(None), + default_dtype_reader: ArcSwap::from_pointee(None), }; this.set_canonical_encoder(Arc::new(CanonicalArrowEncoder) as ArrowEncoderRef); this.set_default_decoder(Arc::new(CanonicalArrowDecoder) as ArrowDecoderRef); @@ -165,12 +165,16 @@ impl ArrowSession { /// Register a reverse-direction array decoder. Decoders are walked in registration order. pub fn register_decoder(&self, plugin: impl Into) { - self.decoders.write().push(plugin.into()); + let mut next = (**self.decoders.load()).clone(); + next.push(plugin.into()); + self.decoders.store(Arc::new(next)); } /// Register a reverse-direction dtype reader. Readers are walked in registration order. pub fn register_dtype_reader(&self, plugin: impl Into) { - self.dtype_readers.write().push(plugin.into()); + let mut next = (**self.dtype_readers.load()).clone(); + next.push(plugin.into()); + self.dtype_readers.store(Arc::new(next)); } /// Set the canonical (fallback) encoder slot. @@ -178,7 +182,7 @@ impl ArrowSession { /// This single plugin must handle every canonical Vortex encoding. Callers replace any /// previously-registered canonical encoder. pub fn set_canonical_encoder(&self, plugin: impl Into) { - *self.canonical_encoder.write() = Some(plugin.into()); + self.canonical_encoder.store(Arc::new(Some(plugin.into()))); } /// Set the fallback Arrow → Vortex array decoder. @@ -186,12 +190,13 @@ impl ArrowSession { /// Replaces any previously-registered fallback. The fallback runs after the user chain /// has declined. pub fn set_default_decoder(&self, plugin: impl Into) { - *self.default_decoder.write() = Some(plugin.into()); + self.default_decoder.store(Arc::new(Some(plugin.into()))); } /// Set the fallback Arrow → Vortex dtype reader. pub fn set_default_dtype_reader(&self, plugin: impl Into) { - *self.default_dtype_reader.write() = Some(plugin.into()); + self.default_dtype_reader + .store(Arc::new(Some(plugin.into()))); } /// Find a forward encoder for the given encoding [`ArrayId`]. @@ -211,27 +216,27 @@ impl ArrowSession { /// Snapshot the current decoder chain (in registration order). pub fn decoders(&self) -> Vec { - self.decoders.read().clone() + (**self.decoders.load()).clone() } /// Snapshot the current dtype-reader chain (in registration order). pub fn dtype_readers(&self) -> Vec { - self.dtype_readers.read().clone() + (**self.dtype_readers.load()).clone() } /// The currently-registered canonical encoder, if any. pub fn canonical_encoder(&self) -> Option { - self.canonical_encoder.read().clone() + (**self.canonical_encoder.load()).clone() } /// The currently-registered fallback decoder, if any. pub fn default_decoder(&self) -> Option { - self.default_decoder.read().clone() + (**self.default_decoder.load()).clone() } /// The currently-registered fallback dtype reader, if any. pub fn default_dtype_reader(&self) -> Option { - self.default_dtype_reader.read().clone() + (**self.default_dtype_reader.load()).clone() } }