From 9e7753575711692857ba01b29d9527d1e096c734 Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Tue, 24 Feb 2026 16:53:32 -0600 Subject: [PATCH] feat: Add FFI_TableProviderFactory support This wraps the new FFI_TableProviderFactory APIs in datafusion-ffi. --- .../tests/_test_table_provider_factory.py | 43 +++++++++ examples/datafusion-ffi-example/src/lib.rs | 3 + .../src/table_provider_factory.rs | 87 +++++++++++++++++++ python/datafusion/catalog.py | 9 ++ python/datafusion/context.py | 15 ++++ src/context.rs | 31 ++++++- 6 files changed, 187 insertions(+), 1 deletion(-) create mode 100644 examples/datafusion-ffi-example/python/tests/_test_table_provider_factory.py create mode 100644 examples/datafusion-ffi-example/src/table_provider_factory.rs diff --git a/examples/datafusion-ffi-example/python/tests/_test_table_provider_factory.py b/examples/datafusion-ffi-example/python/tests/_test_table_provider_factory.py new file mode 100644 index 000000000..a9a219d64 --- /dev/null +++ b/examples/datafusion-ffi-example/python/tests/_test_table_provider_factory.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import pyarrow as pa +import pytest +from datafusion import SessionContext +from datafusion_ffi_example import MyTableProviderFactory + + +def test_table_provider_factory_ffi() -> None: + ctx = SessionContext() + table = MyTableProviderFactory() + + ctx.register_table_factory("MY_FORMAT", table) + + # Create a new external table + ctx.sql(""" + CREATE EXTERNAL TABLE + foo + STORED AS my_format + LOCATION ''; + """) + + # Query the pre-populated table + result = ctx.sql("SELECT * FROM foo;").collect() + assert len(result) == 2 + assert result[0].num_columns == 2 diff --git a/examples/datafusion-ffi-example/src/lib.rs b/examples/datafusion-ffi-example/src/lib.rs index 23f2001a2..405cc0a46 100644 --- a/examples/datafusion-ffi-example/src/lib.rs +++ b/examples/datafusion-ffi-example/src/lib.rs @@ -22,6 +22,7 @@ use crate::catalog_provider::{FixedSchemaProvider, MyCatalogProvider, MyCatalogP use crate::scalar_udf::IsNullUDF; use crate::table_function::MyTableFunction; use crate::table_provider::MyTableProvider; +use crate::table_provider_factory::MyTableProviderFactory; use crate::window_udf::MyRankUDF; pub(crate) mod aggregate_udf; @@ -29,6 +30,7 @@ pub(crate) mod catalog_provider; pub(crate) mod scalar_udf; pub(crate) mod table_function; pub(crate) mod table_provider; +pub(crate) mod table_provider_factory; pub(crate) mod utils; pub(crate) mod window_udf; @@ -37,6 +39,7 @@ fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> { pyo3_log::init(); m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/examples/datafusion-ffi-example/src/table_provider_factory.rs b/examples/datafusion-ffi-example/src/table_provider_factory.rs new file mode 100644 index 000000000..1e139e919 --- /dev/null +++ b/examples/datafusion-ffi-example/src/table_provider_factory.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion_catalog::{Session, TableProvider, TableProviderFactory}; +use datafusion_common::error::Result as DataFusionResult; +use datafusion_expr::CreateExternalTable; +use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory; +use pyo3::types::PyCapsule; +use pyo3::{Bound, PyAny, PyResult, Python, pyclass, pymethods}; + +use crate::catalog_provider; +use crate::utils::ffi_logical_codec_from_pycapsule; + +#[derive(Debug)] +pub(crate) struct ExampleTableProviderFactory {} + +impl ExampleTableProviderFactory { + fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl TableProviderFactory for ExampleTableProviderFactory { + async fn create( + &self, + _state: &dyn Session, + _cmd: &CreateExternalTable, + ) -> DataFusionResult> { + Ok(catalog_provider::my_table()) + } +} + +#[pyclass( + name = "MyTableProviderFactory", + module = "datafusion_ffi_example", + subclass +)] +#[derive(Debug)] +pub struct MyTableProviderFactory { + inner: Arc, +} + +impl Default for MyTableProviderFactory { + fn default() -> Self { + let inner = Arc::new(ExampleTableProviderFactory::new()); + Self { inner } + } +} + +#[pymethods] +impl MyTableProviderFactory { + #[new] + pub fn new() -> Self { + Self::default() 
+ } + + pub fn __datafusion_table_provider_factory__<'py>( + &self, + py: Python<'py>, + codec: Bound, + ) -> PyResult> { + let name = cr"datafusion_table_provider_factory".into(); + let codec = ffi_logical_codec_from_pycapsule(codec)?; + let factory = Arc::clone(&self.inner) as Arc; + let factory = FFI_TableProviderFactory::new_with_ffi_codec(factory, None, codec); + + PyCapsule::new(py, factory, Some(name)) + } +} diff --git a/python/datafusion/catalog.py b/python/datafusion/catalog.py index bc43cf349..1826e5517 100644 --- a/python/datafusion/catalog.py +++ b/python/datafusion/catalog.py @@ -243,6 +243,15 @@ def kind(self) -> str: return self._inner.kind +class TableProviderFactoryExportable(Protocol): + """Type hint for object that has __datafusion_table_provider_factory__ PyCapsule. + + https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProviderFactory.html + """ + + def __datafusion_table_provider_factory__(self, session: Any) -> object: ... + + class CatalogProviderList(ABC): """Abstract class for defining a Python based Catalog Provider List.""" diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 0d8259774..b765fcae4 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -37,6 +37,7 @@ CatalogProviderExportable, CatalogProviderList, CatalogProviderListExportable, + TableProviderFactoryExportable, ) from datafusion.dataframe import DataFrame from datafusion.expr import sort_list_to_raw_sort_list @@ -830,6 +831,20 @@ def deregister_table(self, name: str) -> None: """Remove a table from the session.""" self.ctx.deregister_table(name) + def register_table_factory( + self, format: str, factory: TableProviderFactoryExportable + ) -> None: + """Register a :py:class:`~datafusion.TableProviderFactoryExportable`. + + The registered factory can be referenced from SQL DDL statements executed + against this context. + + Args: + format: The value to be used in the `STORED AS ${format}` clause. 
+ factory: An object that implements TableProviderFactoryExportable. + """ + self.ctx.register_table_factory(format, factory) + def catalog_names(self) -> set[str]: """Returns the list of catalogs in this context.""" return self.ctx.catalog_names() diff --git a/src/context.rs b/src/context.rs index 2eaf5a737..6083d01d6 100644 --- a/src/context.rs +++ b/src/context.rs @@ -27,7 +27,7 @@ use arrow::pyarrow::FromPyArrow; use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion::arrow::pyarrow::PyArrowType; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::catalog::{CatalogProvider, CatalogProviderList}; +use datafusion::catalog::{CatalogProvider, CatalogProviderList, TableProviderFactory}; use datafusion::common::{ScalarValue, TableReference, exec_err}; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::datasource::file_format::parquet::ParquetFormat; @@ -51,6 +51,7 @@ use datafusion_ffi::catalog_provider::FFI_CatalogProvider; use datafusion_ffi::catalog_provider_list::FFI_CatalogProviderList; use datafusion_ffi::execution::FFI_TaskContextProvider; use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec; +use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory; use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec; use object_store::ObjectStore; use pyo3::IntoPyObjectExt; @@ -659,6 +660,34 @@ impl PySessionContext { Ok(()) } + pub fn register_table_factory( + &self, + format: &str, + factory: Bound<'_, PyAny>, + ) -> PyDataFusionResult<()> { + let py = factory.py(); + let codec_capsule = create_logical_extension_capsule(py, self.logical_codec.as_ref())?; + + let capsule = factory + .getattr("__datafusion_table_provider_factory__")? 
+ .call1((codec_capsule,))?; + let capsule = capsule.cast::().map_err(py_datafusion_err)?; + validate_pycapsule(capsule, "datafusion_table_provider_factory")?; + + let factory: NonNull = capsule + .pointer_checked(Some(c_str!("datafusion_table_provider_factory")))? + .cast(); + let factory = unsafe { factory.as_ref() }; + let factory: Arc = factory.into(); + + let st = self.ctx.state_ref(); + let mut lock = st.write(); + lock.table_factories_mut() + .insert(format.to_owned(), factory); + + Ok(()) + } + pub fn register_catalog_provider_list( &self, mut provider: Bound,