Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/common/src/catalog/logical/for_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub type ResolvedReferences = (
/// 1. Extract table references and function names from the query
/// 2. Resolve dataset names to hashes via the dataset store
/// 3. Build logical catalog with schemas and UDFs
/// 4. Return logical catalog for use with `PlanningContext::new()`
/// 4. Return logical catalog for use with `PlanContext`
///
/// Unlike `catalog_for_sql`, this does not query the metadata database for physical
/// parquet locations, making it faster for planning-only operations.
Expand Down
4 changes: 2 additions & 2 deletions crates/core/common/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod common;
pub mod planning;
pub mod query;
pub mod exec;
pub mod plan;
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,26 @@ use crate::{
context::common::{
ReadOnlyCheckError, SqlToPlanError, builtin_udfs, read_only_check, sql_to_plan,
},
exec_env::ExecEnv,
memory_pool::{MemoryPoolKind, TieredMemoryPool, make_memory_pool},
plan_visitors::{extract_table_references_from_plan, forbid_duplicate_field_names},
query_env::QueryEnv,
sql::{TableReference, TableReferenceConversionError},
};

/// A context for executing queries against a catalog.
#[derive(Clone)]
pub struct QueryContext {
pub env: QueryEnv,
pub struct ExecContext {
pub env: ExecEnv,
session_config: SessionConfig,
catalog: CatalogSnapshot,
/// Per-query memory pool (if per-query limits are enabled)
tiered_memory_pool: Arc<TieredMemoryPool>,
}

impl QueryContext {
impl ExecContext {
/// Creates a query context from a physical catalog.
pub async fn for_catalog(
env: QueryEnv,
env: ExecEnv,
catalog: Catalog,
ignore_canonical_segments: bool,
) -> Result<Self, CreateContextError> {
Expand Down Expand Up @@ -88,17 +88,17 @@ impl QueryContext {
}

/// Converts a parsed SQL statement into a logical plan against the physical catalog.
pub async fn plan_sql(&self, query: parser::Statement) -> Result<LogicalPlan, PlanSqlError> {
pub async fn plan_sql(&self, query: parser::Statement) -> Result<LogicalPlan, SqlError> {
let ctx = new_session_ctx(
self.session_config.clone(),
&self.tiered_memory_pool,
&self.env,
);
register_catalog(&self.env, &ctx, &self.catalog).map_err(PlanSqlError::RegisterTable)?;
register_catalog(&self.env, &ctx, &self.catalog).map_err(SqlError::RegisterTable)?;

let plan = sql_to_plan(&ctx, query)
.await
.map_err(PlanSqlError::SqlToPlan)?;
.map_err(SqlError::SqlToPlan)?;
Ok(plan)
}

Expand Down Expand Up @@ -225,9 +225,9 @@ impl QueryContext {
}
}

/// Failed to create a `QueryContext` from a catalog
/// Failed to create an `ExecContext` from a catalog
///
/// This error covers failures during `QueryContext::for_catalog()`.
/// This error covers failures during `ExecContext::for_catalog()`.
#[derive(Debug, thiserror::Error)]
pub enum CreateContextError {
/// Failed to create a catalog snapshot from the physical catalog
Expand All @@ -240,14 +240,14 @@ pub enum CreateContextError {

/// Failed to plan a SQL query in the query context
///
/// This error covers failures during `QueryContext::plan_sql()`.
/// This error covers failures during `ExecContext::plan_sql()`.
#[derive(Debug, thiserror::Error)]
pub enum PlanSqlError {
/// Failed to create a query session context
pub enum SqlError {
/// Failed to create an exec session context
///
/// This occurs when building a `SessionContext` for query execution fails,
/// typically due to a table registration error during context setup.
#[error("failed to create query session context")]
#[error("failed to create exec session context")]
RegisterTable(#[source] RegisterTableError),

/// Failed to convert SQL to a logical plan
Expand Down Expand Up @@ -289,13 +289,13 @@ pub enum ExecuteError {
CollectExplainResults(#[source] DataFusionError),
}

/// Failed to execute a plan via `QueryContext::execute_plan`
/// Failed to execute a plan via `ExecContext::execute_plan`
///
/// This error wraps session context creation and inner execution errors.
#[derive(Debug, thiserror::Error)]
pub enum ExecutePlanError {
/// Failed to create a query session context
#[error("failed to create query session context")]
/// Failed to create an exec session context
#[error("failed to create exec session context")]
RegisterTable(#[source] RegisterTableError),

/// Failed during plan execution
Expand All @@ -305,11 +305,11 @@ pub enum ExecutePlanError {

/// Failed to execute a plan and concatenate results
///
/// This error covers `QueryContext::execute_and_concat()`.
/// This error covers `ExecContext::execute_and_concat()`.
#[derive(Debug, thiserror::Error)]
pub enum ExecuteAndConcatError {
/// Failed to create a query session context
#[error("failed to create query session context")]
/// Failed to create an exec session context
#[error("failed to create exec session context")]
RegisterTable(#[source] RegisterTableError),

/// Failed during plan execution
Expand Down Expand Up @@ -359,7 +359,7 @@ pub enum CommonRangesError {
fn new_session_ctx(
config: SessionConfig,
tiered_memory_pool: &Arc<TieredMemoryPool>,
env: &QueryEnv,
env: &ExecEnv,
) -> SessionContext {
let runtime_env = Arc::new(RuntimeEnv {
memory_pool: tiered_memory_pool.clone(),
Expand Down Expand Up @@ -387,7 +387,7 @@ fn new_session_ctx(

/// Registers the tables and UDFs from a [`CatalogSnapshot`] into a [`SessionContext`].
fn register_catalog(
env: &QueryEnv,
env: &ExecEnv,
ctx: &SessionContext,
catalog: &CatalogSnapshot,
) -> Result<(), RegisterTableError> {
Expand Down Expand Up @@ -428,13 +428,13 @@ fn register_catalog(
Ok(())
}

/// Failed to register a dataset table with the query session context
/// Failed to register a dataset table with the exec session context
///
/// This occurs when DataFusion rejects a table registration during query
/// session creation, typically because a table with the same name already
/// exists or the table metadata is invalid.
#[derive(Debug, thiserror::Error)]
#[error("Failed to register dataset table with query session context")]
#[error("Failed to register dataset table with exec session context")]
pub struct RegisterTableError(#[source] DataFusionError);

/// `logical_optimize` controls whether logical optimizations should be applied to `plan`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ use crate::{
catalog::logical::{LogicalCatalog, LogicalTable},
context::common::{SqlToPlanError, builtin_udfs, sql_to_plan},
detached_logical_plan::DetachedLogicalPlan,
planning_table::PlanningTable,
plan_table::PlanTable,
};

/// A context for planning SQL queries.
pub struct PlanningContext {
pub struct PlanContext {
session_config: SessionConfig,
catalog: LogicalCatalog,
}

impl PlanningContext {
impl PlanContext {
/// Creates a planning context from a logical catalog.
pub fn new(session_config: SessionConfig, catalog: LogicalCatalog) -> Self {
Self {
Expand All @@ -39,25 +39,25 @@ impl PlanningContext {
pub async fn sql_output_schema(
&self,
query: parser::Statement,
) -> Result<DFSchemaRef, PlanSqlError> {
) -> Result<DFSchemaRef, SqlError> {
let ctx = new_session_ctx(self.session_config.clone());
register_catalog(&ctx, &self.catalog).map_err(PlanSqlError::RegisterTable)?;
register_catalog(&ctx, &self.catalog).map_err(SqlError::RegisterTable)?;
let plan = sql_to_plan(&ctx, query)
.await
.map_err(PlanSqlError::SqlToPlan)?;
.map_err(SqlError::SqlToPlan)?;
Ok(plan.schema().clone())
}

/// Converts a parsed SQL statement into a detached logical plan.
pub async fn plan_sql(
&self,
query: parser::Statement,
) -> Result<DetachedLogicalPlan, PlanSqlError> {
) -> Result<DetachedLogicalPlan, SqlError> {
let ctx = new_session_ctx(self.session_config.clone());
register_catalog(&ctx, &self.catalog).map_err(PlanSqlError::RegisterTable)?;
register_catalog(&ctx, &self.catalog).map_err(SqlError::RegisterTable)?;
let plan = sql_to_plan(&ctx, query)
.await
.map_err(PlanSqlError::SqlToPlan)?;
.map_err(SqlError::SqlToPlan)?;
Ok(DetachedLogicalPlan::new(plan))
}

Expand Down Expand Up @@ -125,7 +125,7 @@ fn register_catalog(
let table_schema = table.schema().clone();
ctx.register_table(
table.table_ref().clone(),
Arc::new(PlanningTable::new(table_schema)),
Arc::new(PlanTable::new(table_schema)),
)
.map_err(RegisterTableError)?;
}
Expand All @@ -152,7 +152,7 @@ pub struct RegisterTableError(#[source] DataFusionError);
/// This error is shared by `plan_sql` and `sql_output_schema` because they
/// produce the exact same error variants.
#[derive(Debug, thiserror::Error)]
pub enum PlanSqlError {
pub enum SqlError {
/// Failed to create a planning session context
///
/// This occurs when building a `SessionContext` for SQL planning fails,
Expand All @@ -168,7 +168,7 @@ pub enum PlanSqlError {
SqlToPlan(#[source] SqlToPlanError),
}

impl PlanSqlError {
impl SqlError {
/// Returns `true` if this error represents an invalid plan due to user input
/// (forbidden aliases or read-only violations) rather than an internal failure.
pub fn is_invalid_plan(&self) -> bool {
Expand Down
12 changes: 6 additions & 6 deletions crates/core/common/src/detached_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use datafusion::{
};

use crate::{
context::query::QueryContext,
context::exec::ExecContext,
incrementalizer::NonIncrementalQueryError,
plan_visitors::{is_incremental, propagate_block_num},
sql::TableReference,
};

/// A plan that has `PlanningTable` for its `TableProvider`s. It cannot be executed before being
/// first "attached" to a `QueryContext`.
/// A plan that has `PlanTable` for its `TableProvider`s. It cannot be executed before being
/// first "attached" to an `ExecContext`.
#[derive(Debug, Clone)]
pub struct DetachedLogicalPlan(LogicalPlan);

Expand All @@ -28,10 +28,10 @@ impl DetachedLogicalPlan {
Self(plan)
}

/// Attaches this plan to a query context by replacing `PlanningTable` providers
/// Attaches this plan to a query context by replacing `PlanTable` providers
/// with actual `TableSnapshot` providers from the catalog.
#[tracing::instrument(skip_all, err)]
pub fn attach_to(self, ctx: &QueryContext) -> Result<LogicalPlan, AttachPlanError> {
pub fn attach_to(self, ctx: &ExecContext) -> Result<LogicalPlan, AttachPlanError> {
Ok(self
.0
.transform_with_subqueries(|mut node| match &mut node {
Expand Down Expand Up @@ -91,7 +91,7 @@ impl std::ops::Deref for DetachedLogicalPlan {

/// Failed to attach a detached logical plan to a query context
///
/// This occurs when transforming `PlanningTable` references into actual
/// This occurs when transforming `PlanTable` references into actual
/// `TableSnapshot` references fails during plan attachment.
#[derive(Debug, thiserror::Error)]
#[error("failed to attach plan to query context")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub fn default_session_config() -> Result<SessionConfig, datafusion::error::Data

/// Handle to the environment resources used by the query engine.
#[derive(Clone, Debug)]
pub struct QueryEnv {
pub struct ExecEnv {
/// DataFusion session configuration with project-wide defaults.
pub session_config: SessionConfig,

Expand All @@ -76,7 +76,7 @@ pub struct QueryEnv {
pub store: DataStore,
}

/// Creates a QueryEnv with specified memory and cache configuration
/// Creates an `ExecEnv` with specified memory and cache configuration
///
/// Configures DataFusion runtime environment including memory pools, disk spilling,
/// and parquet footer caching for query execution.
Expand All @@ -85,7 +85,7 @@ pub fn create(
query_max_mem_mb: usize,
spill_location: &[PathBuf],
store: DataStore,
) -> Result<QueryEnv, datafusion::error::DataFusionError> {
) -> Result<ExecEnv, datafusion::error::DataFusionError> {
let spill_allowed = !spill_location.is_empty();
let disk_manager_mode = if spill_allowed {
DiskManagerMode::Directories(spill_location.to_vec())
Expand All @@ -111,7 +111,7 @@ pub fn create(

let session_config = default_session_config()?;

Ok(QueryEnv {
Ok(ExecEnv {
session_config,
global_memory_pool: runtime_env.memory_pool,
disk_manager: runtime_env.disk_manager,
Expand Down
4 changes: 2 additions & 2 deletions crates/core/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ pub mod dataset_store;
pub mod datasets_derived;
pub mod detached_logical_plan;
pub mod evm;
pub mod exec_env;
pub mod func_catalog;
pub mod incrementalizer;
pub mod memory_pool;
pub mod metadata;
pub mod plan_table;
pub mod plan_visitors;
pub mod planning_table;
pub mod query_env;
pub mod sql;
pub mod stream_helpers;
pub mod streaming_query;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ use datafusion::{
/// Must be replaced with actual `TableSnapshot` providers via
/// `DetachedLogicalPlan::attach_to` before execution.
#[derive(Clone, Debug)]
pub struct PlanningTable(SchemaRef);
pub struct PlanTable(SchemaRef);

impl PlanningTable {
impl PlanTable {
/// Creates a planning-only table provider from a schema.
pub(crate) fn new(schema: SchemaRef) -> Self {
Self(schema)
}
}

#[async_trait]
impl TableProvider for PlanningTable {
impl TableProvider for PlanTable {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -47,7 +47,7 @@ impl TableProvider for PlanningTable {
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
Err(DataFusionError::External(
"PlanningTable should never be scanned".into(),
"PlanTable should never be scanned".into(),
))
}
}
Loading
Loading