-
Notifications
You must be signed in to change notification settings - Fork 150
pluggable arrow exec #7793
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pluggable arrow exec #7793
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
|
|
||
| //! [`CanonicalArrowEncoder`] — fallback Vortex → Arrow encoder for canonical encodings. | ||
| //! | ||
| //! The canonical encoder handles every `DataType` that maps directly to a canonical Vortex | ||
| //! encoding (`Bool`, `Primitive`, `VarBinView`, `ListView`, `Struct`, `FixedSizeList`, | ||
| //! `Decimal`, `Extension`). For now it also handles a few non-canonical optimizations | ||
| //! (offset-based byte/list arrays); those are slated to move into encoding-keyed | ||
| //! [`ArrowEncoder`](super::ArrowEncoder) plugins in subsequent work. | ||
|
|
||
| use arrow_array::ArrayRef as ArrowArrayRef; | ||
| use arrow_array::types::*; | ||
| use arrow_schema::DataType; | ||
| use vortex_error::VortexResult; | ||
| use vortex_error::vortex_bail; | ||
| use vortex_error::vortex_ensure; | ||
|
|
||
| use crate::ArrayRef; | ||
| use crate::ExecutionCtx; | ||
| use crate::arrow::ArrowEncoder; | ||
| use crate::arrow::ArrowSession; | ||
| use crate::arrow::executor::bool::to_arrow_bool; | ||
| use crate::arrow::executor::byte::to_arrow_byte_array; | ||
| use crate::arrow::executor::byte_view::to_arrow_byte_view; | ||
| use crate::arrow::executor::decimal::to_arrow_decimal; | ||
| use crate::arrow::executor::dictionary::to_arrow_dictionary; | ||
| use crate::arrow::executor::fixed_size_list::to_arrow_fixed_list; | ||
| use crate::arrow::executor::list::to_arrow_list; | ||
| use crate::arrow::executor::list_view::to_arrow_list_view; | ||
| use crate::arrow::executor::null::to_arrow_null; | ||
| use crate::arrow::executor::primitive::to_arrow_primitive; | ||
| use crate::arrow::executor::run_end::to_arrow_run_end; | ||
| use crate::arrow::executor::struct_::to_arrow_struct; | ||
| use crate::arrow::executor::temporal::to_arrow_temporal; | ||
| use crate::dtype::DType; | ||
|
|
||
| /// Returns the canonical Arrow [`DataType`] for a Vortex [`DType`]. | ||
| /// | ||
| /// This mirrors [`DType::to_arrow_dtype`] but is kept separate so encoders can use it without | ||
| /// touching the deprecated dtype shim. | ||
| pub fn canonical_arrow_type_for_dtype(dtype: &DType) -> VortexResult<DataType> { | ||
| dtype.to_arrow_dtype() | ||
| } | ||
|
|
||
| /// The default canonical Vortex → Arrow encoder. Registered automatically in | ||
| /// [`crate::arrow::ArrowSession::default`]. | ||
| #[derive(Debug, Default)] | ||
| pub struct CanonicalArrowEncoder; | ||
|
|
||
| impl ArrowEncoder for CanonicalArrowEncoder { | ||
| fn preferred_arrow_type( | ||
| &self, | ||
| array: &ArrayRef, | ||
| _session: &ArrowSession, | ||
| ) -> VortexResult<Option<DataType>> { | ||
| // The canonical encoder mirrors `DType::to_arrow_dtype()`. Encoding-specific | ||
| // shortcuts (e.g. VarBin → Utf8 instead of Utf8View) live on their own plugins. | ||
| Ok(Some(canonical_arrow_type_for_dtype(array.dtype())?)) | ||
| } | ||
|
|
||
| fn to_arrow_array( | ||
| &self, | ||
| array: ArrayRef, | ||
| target: &DataType, | ||
| ctx: &mut ExecutionCtx, | ||
| ) -> VortexResult<Option<ArrowArrayRef>> { | ||
| let len = array.len(); | ||
| let arrow = match target { | ||
| DataType::Null => to_arrow_null(array, ctx), | ||
| DataType::Boolean => to_arrow_bool(array, ctx), | ||
| DataType::Int8 => to_arrow_primitive::<Int8Type>(array, ctx), | ||
| DataType::Int16 => to_arrow_primitive::<Int16Type>(array, ctx), | ||
| DataType::Int32 => to_arrow_primitive::<Int32Type>(array, ctx), | ||
| DataType::Int64 => to_arrow_primitive::<Int64Type>(array, ctx), | ||
| DataType::UInt8 => to_arrow_primitive::<UInt8Type>(array, ctx), | ||
| DataType::UInt16 => to_arrow_primitive::<UInt16Type>(array, ctx), | ||
| DataType::UInt32 => to_arrow_primitive::<UInt32Type>(array, ctx), | ||
| DataType::UInt64 => to_arrow_primitive::<UInt64Type>(array, ctx), | ||
| DataType::Float16 => to_arrow_primitive::<Float16Type>(array, ctx), | ||
| DataType::Float32 => to_arrow_primitive::<Float32Type>(array, ctx), | ||
| DataType::Float64 => to_arrow_primitive::<Float64Type>(array, ctx), | ||
| DataType::Timestamp(..) | ||
| | DataType::Date32 | ||
| | DataType::Date64 | ||
| | DataType::Time32(_) | ||
| | DataType::Time64(_) => to_arrow_temporal(array, target, ctx), | ||
| DataType::Binary => to_arrow_byte_array::<BinaryType>(array, ctx), | ||
| DataType::LargeBinary => to_arrow_byte_array::<LargeBinaryType>(array, ctx), | ||
| DataType::Utf8 => to_arrow_byte_array::<Utf8Type>(array, ctx), | ||
| DataType::LargeUtf8 => to_arrow_byte_array::<LargeUtf8Type>(array, ctx), | ||
| DataType::BinaryView => to_arrow_byte_view::<BinaryViewType>(array, ctx), | ||
| DataType::Utf8View => to_arrow_byte_view::<StringViewType>(array, ctx), | ||
| DataType::List(elements_field) => to_arrow_list::<i32>(array, elements_field, ctx), | ||
| DataType::LargeList(elements_field) => to_arrow_list::<i64>(array, elements_field, ctx), | ||
| DataType::FixedSizeList(elements_field, list_size) => { | ||
| to_arrow_fixed_list(array, *list_size, elements_field, ctx) | ||
| } | ||
| DataType::ListView(elements_field) => { | ||
| to_arrow_list_view::<i32>(array, elements_field, ctx) | ||
| } | ||
| DataType::LargeListView(elements_field) => { | ||
| to_arrow_list_view::<i64>(array, elements_field, ctx) | ||
| } | ||
| DataType::Struct(fields) => to_arrow_struct(array, Some(fields), ctx), | ||
| DataType::Dictionary(codes_type, values_type) => { | ||
| to_arrow_dictionary(array, codes_type, values_type, ctx) | ||
| } | ||
| dt @ (DataType::Decimal32(..) | ||
| | DataType::Decimal64(..) | ||
| | DataType::Decimal128(..) | ||
| | DataType::Decimal256(..)) => to_arrow_decimal(array, dt, ctx), | ||
| DataType::RunEndEncoded(ends_type, values_type) => { | ||
| to_arrow_run_end(array, ends_type.data_type(), values_type, ctx) | ||
| } | ||
| DataType::FixedSizeBinary(_) | ||
| | DataType::Map(..) | ||
| | DataType::Duration(_) | ||
| | DataType::Interval(_) | ||
| | DataType::Union(..) => { | ||
| vortex_bail!("Conversion to Arrow type {target} is not supported"); | ||
| } | ||
| }?; | ||
|
|
||
| vortex_ensure!( | ||
| arrow.len() == len, | ||
| "Arrow array length does not match Vortex array length after conversion to {:?}", | ||
| arrow | ||
| ); | ||
| Ok(Some(arrow)) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
|
|
||
| //! [`ArrowDecoder`] — pluggable Arrow → Vortex array conversion. | ||
|
|
||
| use std::fmt::Debug; | ||
| use std::sync::Arc; | ||
|
|
||
| use arrow_array::Array as ArrowArray; | ||
| use arrow_schema::Field; | ||
| use vortex_error::VortexResult; | ||
| use vortex_session::VortexSession; | ||
|
|
||
| use crate::ArrayRef; | ||
|
|
||
| /// Reference-counted pointer to an [`ArrowDecoder`]. | ||
| pub type ArrowDecoderRef = Arc<dyn ArrowDecoder>; | ||
|
|
||
| /// Plugin trait that converts an Arrow array into a Vortex [`ArrayRef`]. | ||
| /// | ||
| /// Decoders are registered as a chain on [`crate::arrow::ArrowSession`] and walked in | ||
| /// registration order, with user-registered decoders running before the built-in canonical | ||
| /// decoders so external crates can override built-in behavior. | ||
| /// | ||
| /// Returning [`Ok(None)`] passes the request to the next decoder in the chain. Returning | ||
| /// [`Ok(Some(_))`] short-circuits the chain. The dispatcher hard-fails if no decoder claims | ||
| /// the request. | ||
| pub trait ArrowDecoder: 'static + Send + Sync + Debug { | ||
| /// Try to decode `array` into a Vortex [`ArrayRef`]. | ||
| /// | ||
| /// `field` carries the Arrow extension-name metadata (`ARROW:extension:name`) that lets | ||
| /// extension-aware decoders dispatch on logical type rather than physical [`arrow_schema::DataType`]. | ||
| /// | ||
| /// `session` is the active [`VortexSession`], available so decoders can recurse into child | ||
| /// arrays via [`crate::arrow::ArrowSession`] porcelain. | ||
| fn try_decode( | ||
| &self, | ||
| array: &dyn ArrowArray, | ||
| field: &Field, | ||
| session: &VortexSession, | ||
| ) -> VortexResult<Option<ArrayRef>>; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
|
|
||
| //! Canonical [`ArrowDecoder`] / [`ArrowDTypeReader`] — the default fallback for the reverse | ||
| //! (Arrow → Vortex) direction. | ||
| //! | ||
| //! These plugins should always be the **last** entries in their respective chains: they accept | ||
| //! every Arrow type the legacy [`FromArrowArray`] / [`FromArrowType`] implementations support, | ||
| //! so anything earlier in the chain that wants to override their behavior must run first. | ||
|
|
||
| use arrow_array::Array as ArrowArray; | ||
| use arrow_schema::Field; | ||
| use vortex_error::VortexResult; | ||
| use vortex_session::VortexSession; | ||
|
|
||
| use crate::ArrayRef; | ||
| use crate::arrow::ArrowDTypeReader; | ||
| use crate::arrow::ArrowDecoder; | ||
| use crate::arrow::FromArrowArray; | ||
| use crate::dtype::DType; | ||
| use crate::dtype::arrow::FromArrowType; | ||
|
|
||
| /// Default [`ArrowDecoder`] that delegates to the legacy | ||
| /// [`FromArrowArray`](crate::arrow::FromArrowArray) implementation. | ||
| #[derive(Debug, Default)] | ||
| pub struct CanonicalArrowDecoder; | ||
|
|
||
| impl ArrowDecoder for CanonicalArrowDecoder { | ||
| fn try_decode( | ||
| &self, | ||
| array: &dyn ArrowArray, | ||
| field: &Field, | ||
| _session: &VortexSession, | ||
| ) -> VortexResult<Option<ArrayRef>> { | ||
| Ok(Some(ArrayRef::from_arrow(array, field.is_nullable())?)) | ||
| } | ||
| } | ||
|
|
||
| /// Default [`ArrowDTypeReader`] that delegates to the legacy | ||
| /// [`FromArrowType`](crate::dtype::arrow::FromArrowType) implementation. Matches every | ||
| /// Arrow [`Field`]. | ||
| #[derive(Debug, Default)] | ||
| pub struct CanonicalArrowDTypeReader; | ||
|
|
||
| impl ArrowDTypeReader for CanonicalArrowDTypeReader { | ||
| fn try_read_dtype(&self, field: &Field) -> VortexResult<Option<DType>> { | ||
| Ok(Some(DType::from_arrow(field))) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
|
|
||
| //! Built-in [`ArrowDecoder`](super::ArrowDecoder) and [`ArrowDTypeReader`](super::ArrowDTypeReader) | ||
| //! implementations. | ||
|
|
||
| pub mod canonical; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
|
|
||
| //! [`ArrowDTypeConverter`] and [`ArrowDTypeReader`] — pluggable DType ↔ Arrow conversion. | ||
|
|
||
| use std::fmt::Debug; | ||
| use std::sync::Arc; | ||
|
|
||
| use arrow_schema::DataType; | ||
| use arrow_schema::Field; | ||
| use vortex_error::VortexResult; | ||
|
|
||
| use crate::dtype::DType; | ||
| use crate::dtype::extension::ExtDTypeRef; | ||
|
|
||
| /// Reference-counted pointer to an [`ArrowDTypeConverter`]. | ||
| pub type ArrowDTypeConverterRef = Arc<dyn ArrowDTypeConverter>; | ||
|
|
||
| /// Reference-counted pointer to an [`ArrowDTypeReader`]. | ||
| pub type ArrowDTypeReaderRef = Arc<dyn ArrowDTypeReader>; | ||
|
|
||
| /// Plugin trait that converts a Vortex extension [`crate::dtype::DType`] into an Arrow | ||
| /// [`DataType`]. | ||
| /// | ||
| /// Converters are registered against an [`crate::dtype::extension::ExtId`] on the | ||
| /// [`crate::arrow::ArrowSession`]. Each extension type that wants Arrow representation must | ||
| /// register a converter; the dispatcher hard-fails on unknown extensions. | ||
| /// | ||
| /// Implementations may also produce a full [`Field`] — useful when the Arrow representation | ||
| /// needs `ARROW:extension:name` metadata (for example `arrow.parquet.variant`). | ||
| pub trait ArrowDTypeConverter: 'static + Send + Sync + Debug { | ||
| /// Convert the extension dtype to its Arrow [`DataType`]. | ||
| fn to_arrow_data_type(&self, ext: &ExtDTypeRef) -> VortexResult<DataType>; | ||
|
|
||
| /// Convert the extension dtype to a fully-decorated Arrow [`Field`]. | ||
| /// | ||
| /// The default implementation builds a `Field` from `name`, the result of | ||
| /// [`ArrowDTypeConverter::to_arrow_data_type`], and `ext.is_nullable()`. Override when the | ||
| /// extension type needs Arrow extension metadata on the field. | ||
| fn to_arrow_field(&self, ext: &ExtDTypeRef, name: &str) -> VortexResult<Field> { | ||
| Ok(Field::new( | ||
| name, | ||
| self.to_arrow_data_type(ext)?, | ||
| ext.is_nullable(), | ||
| )) | ||
| } | ||
| } | ||
|
|
||
| /// Plugin trait that converts an Arrow [`Field`] into a Vortex [`DType`]. | ||
| /// | ||
| /// Readers are registered as a chain on [`crate::arrow::ArrowSession`] and walked in | ||
| /// registration order, with user-registered readers running before the built-in readers so | ||
| /// external crates can override built-in behavior. | ||
| /// | ||
| /// Returning [`Ok(None)`] passes the request to the next reader in the chain. | ||
| pub trait ArrowDTypeReader: 'static + Send + Sync + Debug { | ||
| /// Try to read a Vortex [`DType`] from an Arrow [`Field`]. | ||
| /// | ||
| /// Implementations typically inspect [`Field::metadata`] for the `ARROW:extension:name` | ||
| /// key and dispatch on it. | ||
| fn try_read_dtype(&self, field: &Field) -> VortexResult<Option<DType>>; | ||
| } | ||
|
Comment on lines
+55
to
+62
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why have both fields and data_types? I guess nullability?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yea both is kinda funny. i think if you want to cover extension dtypes then you need the Field b/c that has metadata. and also nullability yea. |
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,55 @@ | ||||||
| // SPDX-License-Identifier: Apache-2.0 | ||||||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||||||
|
|
||||||
| //! [`ArrowEncoder`] — pluggable Vortex → Arrow array conversion. | ||||||
|
|
||||||
| use std::fmt::Debug; | ||||||
| use std::sync::Arc; | ||||||
|
|
||||||
| use arrow_array::ArrayRef as ArrowArrayRef; | ||||||
| use arrow_schema::DataType; | ||||||
| use vortex_error::VortexResult; | ||||||
|
|
||||||
| use crate::ArrayRef; | ||||||
| use crate::ExecutionCtx; | ||||||
| use crate::arrow::ArrowSession; | ||||||
|
|
||||||
| /// Reference-counted pointer to an [`ArrowEncoder`]. | ||||||
| pub type ArrowEncoderRef = Arc<dyn ArrowEncoder>; | ||||||
|
|
||||||
| /// Plugin trait that converts a Vortex [`ArrayRef`] into an Arrow array. | ||||||
| /// | ||||||
| /// Encoders are registered against an [`crate::array::ArrayId`] (encoding-keyed) or an | ||||||
| /// [`crate::dtype::extension::ExtId`] (extension-keyed) on the [`crate::arrow::ArrowSession`]. | ||||||
| /// Returning [`None`] from [`ArrowEncoder::to_arrow_array`] tells the dispatcher to fall through | ||||||
| /// to the canonical encoder. This is the only way an encoder can decline a request. | ||||||
| pub trait ArrowEncoder: 'static + Send + Sync + Debug { | ||||||
| /// Returns the Arrow [`DataType`] this encoder would prefer to emit for `array`. | ||||||
| /// | ||||||
| /// `session` is provided so encoders for nested types (e.g. | ||||||
| /// [`crate::arrays::List`]) can recursively resolve the preferred Arrow type of their | ||||||
| /// children via [`ArrowSession::resolve_preferred_arrow_type`]. | ||||||
| /// | ||||||
| /// Returning [`None`] defers to the canonical encoder's preference (e.g. | ||||||
| /// [`DataType::Utf8View`] for [`crate::dtype::DType::Utf8`]). Implementations should only | ||||||
| /// override the canonical preference when they can produce a cheaper Arrow representation | ||||||
| /// (for example, [`crate::arrays::VarBin`] preferring offset-based [`DataType::Utf8`] over | ||||||
| /// [`DataType::Utf8View`]). | ||||||
| fn preferred_arrow_type( | ||||||
| &self, | ||||||
| array: &ArrayRef, | ||||||
| session: &ArrowSession, | ||||||
| ) -> VortexResult<Option<DataType>>; | ||||||
|
Comment on lines
+38
to
+42
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. two passes might be expensive? |
||||||
|
|
||||||
| /// Convert `array` into an Arrow array of type `target`. | ||||||
| /// | ||||||
| /// Returning [`Ok(None)`] tells the dispatcher to canonicalize the array and re-dispatch | ||||||
| /// through the canonical encoder. Encoders may decline requests they don't recognize but | ||||||
| /// must not silently mis-convert. | ||||||
| fn to_arrow_array( | ||||||
| &self, | ||||||
| array: ArrayRef, | ||||||
| target: &DataType, | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we want this to be an optional so if we don't mind we can return encoding specific values
Suggested change
Sorry if I missed a comment regarding this.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the idea was that you must pass in an explicit target physical type, whether that's derived from user or by calling |
||||||
| ctx: &mut ExecutionCtx, | ||||||
| ) -> VortexResult<Option<ArrowArrayRef>>; | ||||||
| } | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to make sure this doens't regress any benchmarks