Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,18 @@ impl PySessionContext {
self.ctx.remove_optimizer_rule(name)
}

// Wraps the Python `rule` object in an optimizer-rule adapter and appends it
// to the wrapped session context. Fails with the Python error raised while
// building the adapter (e.g. when calling the rule's `name()` method).
pub fn add_optimizer_rule(&self, rule: Bound<'_, PyAny>) -> PyResult<()> {
    crate::optimizer_rules::build_optimizer_rule(rule).map(|wrapped| {
        self.ctx.add_optimizer_rule(wrapped);
    })
}

// Wraps the Python `rule` object in an analyzer-rule adapter and appends it
// to the wrapped session context. Fails with the Python error raised while
// building the adapter (e.g. when calling the rule's `name()` method).
pub fn add_analyzer_rule(&self, rule: Bound<'_, PyAny>) -> PyResult<()> {
    crate::optimizer_rules::build_analyzer_rule(rule).map(|wrapped| {
        self.ctx.add_analyzer_rule(wrapped);
    })
}

pub fn table_provider(&self, name: &str, py: Python) -> PyResult<PyTable> {
let provider = wait_for_future(py, self.ctx.table_provider(name))
// Outer error: runtime/async failure
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub mod expr;
#[allow(clippy::borrow_deref_ref)]
mod functions;
pub mod metrics;
pub mod optimizer_rules;
mod options;
pub mod physical_plan;
mod pyarrow_filter_expression;
Expand Down
168 changes: 168 additions & 0 deletions crates/core/src/optimizer_rules.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Bridges between user-provided Python rule classes and the upstream
//! [`OptimizerRule`] / [`AnalyzerRule`] traits.
//!
//! The Python side defines abstract base classes ``OptimizerRule`` and
//! ``AnalyzerRule`` with ``name()`` plus, respectively, ``rewrite(plan)``
//! and ``analyze(plan)``. Instances are wrapped in
//! [`PyOptimizerRuleAdapter`] / [`PyAnalyzerRuleAdapter`] before being
//! handed to [`SessionContext::add_optimizer_rule`] /
//! [`SessionContext::add_analyzer_rule`].
//!
//! `rewrite` may return ``None`` to signal "no transformation" — the
//! adapter maps that to [`Transformed::no`]. Any returned
//! ``LogicalPlan`` becomes [`Transformed::yes`]. `analyze` must always
//! return a plan (the input plan itself when nothing changes); returning
//! ``None`` is an error.
//!
//! The upstream ``&dyn OptimizerConfig`` / ``&ConfigOptions`` arguments
//! are not surfaced to Python in this MVP. Rules that need configuration
//! access should be implemented in Rust today; Python rules read state
//! from the plan and from any captured ``SessionContext`` they were
//! constructed with.

use std::fmt;
use std::sync::Arc;

use datafusion::common::config::ConfigOptions;
use datafusion::common::tree_node::Transformed;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::analyzer::AnalyzerRule;
use datafusion::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
use pyo3::prelude::*;

use crate::errors::to_datafusion_err;
use crate::sql::logical::PyLogicalPlan;

/// Wraps a Python ``OptimizerRule`` instance so that it can be registered
/// with the upstream optimizer pipeline.
///
/// The rule's name is read from Python once, when the adapter is built, and
/// served from the cached copy afterwards.
pub struct PyOptimizerRuleAdapter {
/// Unbound handle to the Python rule object; its ``rewrite`` method is
/// invoked for every plan passed to the adapter.
rule: Py<PyAny>,
/// Value returned by the rule's ``name()`` method at construction time.
name: String,
}

impl PyOptimizerRuleAdapter {
    /// Builds an adapter around the Python object `rule`.
    ///
    /// The object's ``name()`` method is called immediately and its result is
    /// stored, so later name lookups never call back into Python.
    ///
    /// # Errors
    ///
    /// Returns the Python error if ``name()`` raises or does not return a
    /// value convertible to a string.
    pub fn new(rule: Bound<'_, PyAny>) -> PyResult<Self> {
        let name: String = rule.call_method0("name")?.extract()?;
        let rule = rule.unbind();
        Ok(Self { rule, name })
    }
}

impl fmt::Debug for PyOptimizerRuleAdapter {
    /// Formats the adapter showing only its cached name; the wrapped Python
    /// object is deliberately left out of the output.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = formatter.debug_struct("PyOptimizerRuleAdapter");
        builder.field("name", &self.name);
        builder.finish()
    }
}

impl OptimizerRule for PyOptimizerRuleAdapter {
    /// Returns the name cached when the adapter was constructed.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Forwards `plan` to the Python rule's ``rewrite`` method.
    ///
    /// A ``None`` result hands the original plan back as
    /// [`Transformed::no`]; any other result is extracted as a logical plan
    /// and reported as [`Transformed::yes`]. The optimizer configuration is
    /// not passed to Python.
    ///
    /// # Errors
    ///
    /// Python exceptions raised by ``rewrite`` and failures to extract a
    /// logical plan from its return value are converted with
    /// `to_datafusion_err`.
    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> DataFusionResult<Transformed<LogicalPlan>> {
        Python::attach(|py| {
            // Python receives a copy so the original can still be returned
            // untouched when the rule declines to rewrite.
            let py_arg = PyLogicalPlan::from(plan.clone());
            let outcome = self
                .rule
                .bind(py)
                .call_method1("rewrite", (py_arg,))
                .map_err(to_datafusion_err)?;

            if outcome.is_none() {
                Ok(Transformed::no(plan))
            } else {
                outcome
                    .extract::<PyLogicalPlan>()
                    .map(|replacement| Transformed::yes(LogicalPlan::from(replacement)))
                    .map_err(to_datafusion_err)
            }
        })
    }
}

/// Wraps a Python ``AnalyzerRule`` instance so that it can be registered
/// with the upstream analyzer pipeline.
///
/// The rule's name is read from Python once, when the adapter is built, and
/// served from the cached copy afterwards.
pub struct PyAnalyzerRuleAdapter {
/// Unbound handle to the Python rule object; its ``analyze`` method is
/// invoked for every plan passed to the adapter.
rule: Py<PyAny>,
/// Value returned by the rule's ``name()`` method at construction time.
name: String,
}

impl PyAnalyzerRuleAdapter {
    /// Builds an adapter around the Python object `rule`.
    ///
    /// The object's ``name()`` method is called immediately and its result is
    /// stored, so later name lookups never call back into Python.
    ///
    /// # Errors
    ///
    /// Returns the Python error if ``name()`` raises or does not return a
    /// value convertible to a string.
    pub fn new(rule: Bound<'_, PyAny>) -> PyResult<Self> {
        let name: String = rule.call_method0("name")?.extract()?;
        let rule = rule.unbind();
        Ok(Self { rule, name })
    }
}

impl fmt::Debug for PyAnalyzerRuleAdapter {
    /// Formats the adapter showing only its cached name; the wrapped Python
    /// object is deliberately left out of the output.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = formatter.debug_struct("PyAnalyzerRuleAdapter");
        builder.field("name", &self.name);
        builder.finish()
    }
}

impl AnalyzerRule for PyAnalyzerRuleAdapter {
    /// Returns the name cached when the adapter was constructed.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Forwards `plan` to the Python rule's ``analyze`` method and returns
    /// the plan it produces. The configuration options are not passed to
    /// Python.
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::Execution`] when ``analyze`` returns
    /// ``None``. Python exceptions and failures to extract a logical plan
    /// from the return value are converted with `to_datafusion_err`.
    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> DataFusionResult<LogicalPlan> {
        Python::attach(|py| {
            let outcome = self
                .rule
                .bind(py)
                .call_method1("analyze", (PyLogicalPlan::from(plan),))
                .map_err(to_datafusion_err)?;

            // Happy path first: anything other than ``None`` must be a plan.
            if !outcome.is_none() {
                return outcome
                    .extract::<PyLogicalPlan>()
                    .map(LogicalPlan::from)
                    .map_err(to_datafusion_err);
            }

            Err(DataFusionError::Execution(format!(
                "AnalyzerRule {} returned None from analyze(); analyzer rules \
                 must return a LogicalPlan",
                self.name
            )))
        })
    }
}

/// Wraps a Python ``OptimizerRule`` instance in a [`PyOptimizerRuleAdapter`]
/// and returns it as a shared trait object.
///
/// # Errors
///
/// Propagates any error from [`PyOptimizerRuleAdapter::new`].
pub(crate) fn build_optimizer_rule(
    rule: Bound<'_, PyAny>,
) -> PyResult<Arc<dyn OptimizerRule + Send + Sync>> {
    let adapter = PyOptimizerRuleAdapter::new(rule)?;
    Ok(Arc::new(adapter))
}

/// Wraps a Python ``AnalyzerRule`` instance in a [`PyAnalyzerRuleAdapter`]
/// and returns it as a shared trait object.
///
/// # Errors
///
/// Propagates any error from [`PyAnalyzerRuleAdapter::new`].
pub(crate) fn build_analyzer_rule(
    rule: Bound<'_, PyAny>,
) -> PyResult<Arc<dyn AnalyzerRule + Send + Sync>> {
    let adapter = PyAnalyzerRuleAdapter::new(rule)?;
    Ok(Arc::new(adapter))
}
47 changes: 47 additions & 0 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
from datafusion.catalog import CatalogProvider, Table
from datafusion.common import DFSchema
from datafusion.expr import Expr, SortKey
from datafusion.optimizer import AnalyzerRule, OptimizerRule
from datafusion.plan import ExecutionPlan, LogicalPlan
from datafusion.user_defined import (
AggregateUDF,
Expand Down Expand Up @@ -1260,6 +1261,52 @@ def register_udwf(self, udwf: WindowUDF) -> None:
"""Register a user-defined window function (UDWF) with the context."""
self.ctx.register_udwf(udwf._udwf)

def add_optimizer_rule(self, rule: OptimizerRule) -> None:
    """Register a user-defined :class:`OptimizerRule` with this session.

    During query planning the rule's :py:meth:`OptimizerRule.rewrite`
    method is called with the current plan. A return value of ``None``
    means the plan is left as is; a returned
    :class:`~datafusion.plan.LogicalPlan` replaces it.

    Args:
        rule: An instance of a class that implements
            :class:`datafusion.optimizer.OptimizerRule`.

    Examples:
        >>> from datafusion.optimizer import OptimizerRule
        >>> class NoopRule(OptimizerRule):
        ...     def name(self) -> str: return "noop"
        ...     def rewrite(self, plan): return None
        >>> ctx = dfn.SessionContext()
        >>> ctx.add_optimizer_rule(NoopRule())
        >>> ctx.remove_optimizer_rule("noop")
        True
    """
    # Delegate to the wrapped native context, which builds the adapter.
    native_ctx = self.ctx
    native_ctx.add_optimizer_rule(rule)

def add_analyzer_rule(self, rule: AnalyzerRule) -> None:
    """Register a user-defined :class:`AnalyzerRule` with this session.

    During the analysis phase of query planning the rule's
    :py:meth:`AnalyzerRule.analyze` method is called with the current
    plan. It must always return a
    :class:`~datafusion.plan.LogicalPlan`; when no rewrite applies,
    return the input plan unchanged.

    Args:
        rule: An instance of a class that implements
            :class:`datafusion.optimizer.AnalyzerRule`.

    Examples:
        >>> from datafusion.optimizer import AnalyzerRule
        >>> class Identity(AnalyzerRule):
        ...     def name(self) -> str: return "identity"
        ...     def analyze(self, plan): return plan
        >>> ctx = dfn.SessionContext()
        >>> ctx.add_analyzer_rule(Identity())
    """
    # Delegate to the wrapped native context, which builds the adapter.
    native_ctx = self.ctx
    native_ctx.add_analyzer_rule(rule)

def deregister_udwf(self, name: str) -> None:
"""Remove a user-defined window function from the session.

Expand Down
Loading
Loading