From edb6550df016472469d02874ffd2406898395ad5 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Sun, 22 Feb 2026 11:16:11 +0100 Subject: [PATCH] refactor(common): rename execution context modules and types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Align module and type names to remove legacy `planning/query` terminology in favor of clearer `plan/exec` naming. - Rename `context::planning` → `context::plan`, `context::query` → `context::exec` - Rename `PlanningTable` → `PlanTable`, `QueryEnv` → `ExecEnv` - Rename `PlanSqlError` → `SqlError` in both plan and exec contexts to eliminate stutter - Rename `PlanningPlanSql` error variant → `PlanSql` in flight handler - Update all downstream imports and type aliases across workspace Signed-off-by: Lorenzo Delgado --- .../common/src/catalog/logical/for_query.rs | 2 +- crates/core/common/src/context.rs | 4 +- .../common/src/context/{query.rs => exec.rs} | 48 ++++++------ .../src/context/{planning.rs => plan.rs} | 24 +++--- .../core/common/src/detached_logical_plan.rs | 12 +-- .../common/src/{query_env.rs => exec_env.rs} | 8 +- crates/core/common/src/lib.rs | 4 +- .../src/{planning_table.rs => plan_table.rs} | 8 +- crates/core/common/src/streaming_query.rs | 76 +++++++++---------- .../worker-datasets-derived/src/dataset.rs | 16 ++-- .../services/admin-api/src/handlers/common.rs | 8 +- .../services/admin-api/src/handlers/schema.rs | 8 +- crates/services/server/src/flight.rs | 69 ++++++++--------- crates/services/server/src/service.rs | 2 +- tests/src/tests/it_reorg.rs | 2 +- 15 files changed, 143 insertions(+), 148 deletions(-) rename crates/core/common/src/context/{query.rs => exec.rs} (95%) rename crates/core/common/src/context/{planning.rs => plan.rs} (92%) rename crates/core/common/src/{query_env.rs => exec_env.rs} (96%) rename crates/core/common/src/{planning_table.rs => plan_table.rs} (88%) diff --git a/crates/core/common/src/catalog/logical/for_query.rs 
b/crates/core/common/src/catalog/logical/for_query.rs index 17a93e663..3b910fd2a 100644 --- a/crates/core/common/src/catalog/logical/for_query.rs +++ b/crates/core/common/src/catalog/logical/for_query.rs @@ -45,7 +45,7 @@ pub type ResolvedReferences = ( /// 1. Extract table references and function names from the query /// 2. Resolve dataset names to hashes via the dataset store /// 3. Build logical catalog with schemas and UDFs -/// 4. Return logical catalog for use with `PlanningContext::new()` +/// 4. Return logical catalog for use with `PlanContext` /// /// Unlike `catalog_for_sql`, this does not query the metadata database for physical /// parquet locations, making it faster for planning-only operations. diff --git a/crates/core/common/src/context.rs b/crates/core/common/src/context.rs index db4654184..ad7fcb949 100644 --- a/crates/core/common/src/context.rs +++ b/crates/core/common/src/context.rs @@ -1,3 +1,3 @@ mod common; -pub mod planning; -pub mod query; +pub mod exec; +pub mod plan; diff --git a/crates/core/common/src/context/query.rs b/crates/core/common/src/context/exec.rs similarity index 95% rename from crates/core/common/src/context/query.rs rename to crates/core/common/src/context/exec.rs index 25788eba2..b5b325fd1 100644 --- a/crates/core/common/src/context/query.rs +++ b/crates/core/common/src/context/exec.rs @@ -35,26 +35,26 @@ use crate::{ context::common::{ ReadOnlyCheckError, SqlToPlanError, builtin_udfs, read_only_check, sql_to_plan, }, + exec_env::ExecEnv, memory_pool::{MemoryPoolKind, TieredMemoryPool, make_memory_pool}, plan_visitors::{extract_table_references_from_plan, forbid_duplicate_field_names}, - query_env::QueryEnv, sql::{TableReference, TableReferenceConversionError}, }; /// A context for executing queries against a catalog. 
#[derive(Clone)] -pub struct QueryContext { - pub env: QueryEnv, +pub struct ExecContext { + pub env: ExecEnv, session_config: SessionConfig, catalog: CatalogSnapshot, /// Per-query memory pool (if per-query limits are enabled) tiered_memory_pool: Arc, } -impl QueryContext { +impl ExecContext { /// Creates a query context from a physical catalog. pub async fn for_catalog( - env: QueryEnv, + env: ExecEnv, catalog: Catalog, ignore_canonical_segments: bool, ) -> Result { @@ -88,17 +88,17 @@ impl QueryContext { } /// Converts a parsed SQL statement into a logical plan against the physical catalog. - pub async fn plan_sql(&self, query: parser::Statement) -> Result { + pub async fn plan_sql(&self, query: parser::Statement) -> Result { let ctx = new_session_ctx( self.session_config.clone(), &self.tiered_memory_pool, &self.env, ); - register_catalog(&self.env, &ctx, &self.catalog).map_err(PlanSqlError::RegisterTable)?; + register_catalog(&self.env, &ctx, &self.catalog).map_err(SqlError::RegisterTable)?; let plan = sql_to_plan(&ctx, query) .await - .map_err(PlanSqlError::SqlToPlan)?; + .map_err(SqlError::SqlToPlan)?; Ok(plan) } @@ -225,9 +225,9 @@ impl QueryContext { } } -/// Failed to create a `QueryContext` from a catalog +/// Failed to create an `ExecContext` from a catalog /// -/// This error covers failures during `QueryContext::for_catalog()`. +/// This error covers failures during `ExecContext::for_catalog()`. #[derive(Debug, thiserror::Error)] pub enum CreateContextError { /// Failed to create a catalog snapshot from the physical catalog @@ -240,14 +240,14 @@ pub enum CreateContextError { /// Failed to plan a SQL query in the query context /// -/// This error covers failures during `QueryContext::plan_sql()`. +/// This error covers failures during `ExecContext::plan_sql()`.
#[derive(Debug, thiserror::Error)] -pub enum PlanSqlError { - /// Failed to create a query session context +pub enum SqlError { + /// Failed to create an exec session context /// /// This occurs when building a `SessionContext` for query execution fails, /// typically due to a table registration error during context setup. - #[error("failed to create query session context")] + #[error("failed to create exec session context")] RegisterTable(#[source] RegisterTableError), /// Failed to convert SQL to a logical plan @@ -289,13 +289,13 @@ pub enum ExecuteError { CollectExplainResults(#[source] DataFusionError), } -/// Failed to execute a plan via `QueryContext::execute_plan` +/// Failed to execute a plan via `ExecContext::execute_plan` /// /// This error wraps session context creation and inner execution errors. #[derive(Debug, thiserror::Error)] pub enum ExecutePlanError { - /// Failed to create a query session context - #[error("failed to create query session context")] + /// Failed to create an exec session context + #[error("failed to create exec session context")] RegisterTable(#[source] RegisterTableError), /// Failed during plan execution @@ -305,11 +305,11 @@ pub enum ExecutePlanError { /// Failed to execute a plan and concatenate results /// -/// This error covers `QueryContext::execute_and_concat()`. +/// This error covers `ExecContext::execute_and_concat()`.
#[derive(Debug, thiserror::Error)] pub enum ExecuteAndConcatError { - /// Failed to create a query session context - #[error("failed to create query session context")] + /// Failed to create an exec session context + #[error("failed to create exec session context")] RegisterTable(#[source] RegisterTableError), /// Failed during plan execution @@ -359,7 +359,7 @@ pub enum CommonRangesError { fn new_session_ctx( config: SessionConfig, tiered_memory_pool: &Arc, - env: &QueryEnv, + env: &ExecEnv, ) -> SessionContext { let runtime_env = Arc::new(RuntimeEnv { memory_pool: tiered_memory_pool.clone(), @@ -387,7 +387,7 @@ fn new_session_ctx( /// Registers the tables and UDFs from a [`CatalogSnapshot`] into a [`SessionContext`]. fn register_catalog( - env: &QueryEnv, + env: &ExecEnv, ctx: &SessionContext, catalog: &CatalogSnapshot, ) -> Result<(), RegisterTableError> { @@ -428,13 +428,13 @@ fn register_catalog( Ok(()) } -/// Failed to register a dataset table with the query session context +/// Failed to register a dataset table with the exec session context /// /// This occurs when DataFusion rejects a table registration during query /// session creation, typically because a table with the same name already /// exists or the table metadata is invalid. #[derive(Debug, thiserror::Error)] -#[error("Failed to register dataset table with query session context")] +#[error("Failed to register dataset table with exec session context")] pub struct RegisterTableError(#[source] DataFusionError); /// `logical_optimize` controls whether logical optimizations should be applied to `plan`.
diff --git a/crates/core/common/src/context/planning.rs b/crates/core/common/src/context/plan.rs similarity index 92% rename from crates/core/common/src/context/planning.rs rename to crates/core/common/src/context/plan.rs index e3609c413..52c9eedc8 100644 --- a/crates/core/common/src/context/planning.rs +++ b/crates/core/common/src/context/plan.rs @@ -12,16 +12,16 @@ use crate::{ catalog::logical::{LogicalCatalog, LogicalTable}, context::common::{SqlToPlanError, builtin_udfs, sql_to_plan}, detached_logical_plan::DetachedLogicalPlan, - planning_table::PlanningTable, + plan_table::PlanTable, }; /// A context for planning SQL queries. -pub struct PlanningContext { +pub struct PlanContext { session_config: SessionConfig, catalog: LogicalCatalog, } -impl PlanningContext { +impl PlanContext { /// Creates a planning context from a logical catalog. pub fn new(session_config: SessionConfig, catalog: LogicalCatalog) -> Self { Self { @@ -39,12 +39,12 @@ impl PlanningContext { pub async fn sql_output_schema( &self, query: parser::Statement, - ) -> Result { + ) -> Result { let ctx = new_session_ctx(self.session_config.clone()); - register_catalog(&ctx, &self.catalog).map_err(PlanSqlError::RegisterTable)?; + register_catalog(&ctx, &self.catalog).map_err(SqlError::RegisterTable)?; let plan = sql_to_plan(&ctx, query) .await - .map_err(PlanSqlError::SqlToPlan)?; + .map_err(SqlError::SqlToPlan)?; Ok(plan.schema().clone()) } @@ -52,12 +52,12 @@ impl PlanningContext { pub async fn plan_sql( &self, query: parser::Statement, - ) -> Result { + ) -> Result { let ctx = new_session_ctx(self.session_config.clone()); - register_catalog(&ctx, &self.catalog).map_err(PlanSqlError::RegisterTable)?; + register_catalog(&ctx, &self.catalog).map_err(SqlError::RegisterTable)?; let plan = sql_to_plan(&ctx, query) .await - .map_err(PlanSqlError::SqlToPlan)?; + .map_err(SqlError::SqlToPlan)?; Ok(DetachedLogicalPlan::new(plan)) } @@ -125,7 +125,7 @@ fn register_catalog( let table_schema = 
table.schema().clone(); ctx.register_table( table.table_ref().clone(), - Arc::new(PlanningTable::new(table_schema)), + Arc::new(PlanTable::new(table_schema)), ) .map_err(RegisterTableError)?; } @@ -152,7 +152,7 @@ pub struct RegisterTableError(#[source] DataFusionError); /// This error is shared by `plan_sql` and `sql_output_schema` because they /// produce the exact same error variants. #[derive(Debug, thiserror::Error)] -pub enum PlanSqlError { +pub enum SqlError { /// Failed to create a planning session context /// /// This occurs when building a `SessionContext` for SQL planning fails, @@ -168,7 +168,7 @@ pub enum PlanSqlError { SqlToPlan(#[source] SqlToPlanError), } -impl PlanSqlError { +impl SqlError { /// Returns `true` if this error represents an invalid plan due to user input /// (forbidden aliases or read-only violations) rather than an internal failure. pub fn is_invalid_plan(&self) -> bool { diff --git a/crates/core/common/src/detached_logical_plan.rs b/crates/core/common/src/detached_logical_plan.rs index 8bd691143..7cf9ef38b 100644 --- a/crates/core/common/src/detached_logical_plan.rs +++ b/crates/core/common/src/detached_logical_plan.rs @@ -11,14 +11,14 @@ use datafusion::{ }; use crate::{ - context::query::QueryContext, + context::exec::ExecContext, incrementalizer::NonIncrementalQueryError, plan_visitors::{is_incremental, propagate_block_num}, sql::TableReference, }; -/// A plan that has `PlanningTable` for its `TableProvider`s. It cannot be executed before being -first "attached" to a `QueryContext`. +/// A plan that has `PlanTable` for its `TableProvider`s. It cannot be executed before being +first "attached" to an `ExecContext`.
#[derive(Debug, Clone)] pub struct DetachedLogicalPlan(LogicalPlan); @@ -28,10 +28,10 @@ impl DetachedLogicalPlan { Self(plan) } - /// Attaches this plan to a query context by replacing `PlanningTable` providers + /// Attaches this plan to a query context by replacing `PlanTable` providers /// with actual `TableSnapshot` providers from the catalog. #[tracing::instrument(skip_all, err)] - pub fn attach_to(self, ctx: &QueryContext) -> Result { + pub fn attach_to(self, ctx: &ExecContext) -> Result { Ok(self .0 .transform_with_subqueries(|mut node| match &mut node { @@ -91,7 +91,7 @@ impl std::ops::Deref for DetachedLogicalPlan { /// Failed to attach a detached logical plan to a query context /// -/// This occurs when transforming `PlanningTable` references into actual +/// This occurs when transforming `PlanTable` references into actual /// `TableSnapshot` references fails during plan attachment. #[derive(Debug, thiserror::Error)] #[error("failed to attach plan to query context")] diff --git a/crates/core/common/src/query_env.rs b/crates/core/common/src/exec_env.rs similarity index 96% rename from crates/core/common/src/query_env.rs rename to crates/core/common/src/exec_env.rs index fe0519414..a8da9c246 100644 --- a/crates/core/common/src/query_env.rs +++ b/crates/core/common/src/exec_env.rs @@ -56,7 +56,7 @@ pub fn default_session_config() -> Result Result { +) -> Result { let spill_allowed = !spill_location.is_empty(); let disk_manager_mode = if spill_allowed { DiskManagerMode::Directories(spill_location.to_vec()) @@ -111,7 +111,7 @@ pub fn create( let session_config = default_session_config()?; - Ok(QueryEnv { + Ok(ExecEnv { session_config, global_memory_pool: runtime_env.memory_pool, disk_manager: runtime_env.disk_manager, diff --git a/crates/core/common/src/lib.rs b/crates/core/common/src/lib.rs index 5a45ddc58..70f8f343a 100644 --- a/crates/core/common/src/lib.rs +++ b/crates/core/common/src/lib.rs @@ -14,13 +14,13 @@ pub mod dataset_store; pub mod 
datasets_derived; pub mod detached_logical_plan; pub mod evm; +pub mod exec_env; pub mod func_catalog; pub mod incrementalizer; pub mod memory_pool; pub mod metadata; +pub mod plan_table; pub mod plan_visitors; -pub mod planning_table; -pub mod query_env; pub mod sql; pub mod stream_helpers; pub mod streaming_query; diff --git a/crates/core/common/src/planning_table.rs b/crates/core/common/src/plan_table.rs similarity index 88% rename from crates/core/common/src/planning_table.rs rename to crates/core/common/src/plan_table.rs index 6fa40c71e..4dd21c513 100644 --- a/crates/core/common/src/planning_table.rs +++ b/crates/core/common/src/plan_table.rs @@ -16,9 +16,9 @@ use datafusion::{ /// Must be replaced with actual `TableSnapshot` providers via /// `DetachedLogicalPlan::attach_to` before execution. #[derive(Clone, Debug)] -pub struct PlanningTable(SchemaRef); +pub struct PlanTable(SchemaRef); -impl PlanningTable { +impl PlanTable { /// Creates a planning-only table provider from a schema. 
pub(crate) fn new(schema: SchemaRef) -> Self { Self(schema) @@ -26,7 +26,7 @@ impl PlanningTable { } #[async_trait] -impl TableProvider for PlanningTable { +impl TableProvider for PlanTable { fn as_any(&self) -> &dyn Any { self } @@ -47,7 +47,7 @@ impl TableProvider for PlanningTable { _limit: Option, ) -> Result, DataFusionError> { Err(DataFusionError::External( - "PlanningTable should never be scanned".into(), + "PlanTable should never be scanned".into(), )) } } diff --git a/crates/core/common/src/streaming_query.rs b/crates/core/common/src/streaming_query.rs index 2470992aa..ee6a715e4 100644 --- a/crates/core/common/src/streaming_query.rs +++ b/crates/core/common/src/streaming_query.rs @@ -37,13 +37,13 @@ use crate::{ logical::LogicalTable, physical::{CanonicalChainError, Catalog, PhysicalTable}, }, - context::{planning::PlanningContext, query::QueryContext}, + context::{exec::ExecContext, plan::PlanContext}, dataset_store::{DatasetStore, ResolveRevisionError}, detached_logical_plan::DetachedLogicalPlan, + exec_env::ExecEnv, incrementalizer::incrementalize_plan, metadata::segments::{Segment, WatermarkNotFoundError}, plan_visitors::{order_by_block_num, unproject_special_block_num_column}, - query_env::QueryEnv, sql_str::SqlStr, }; @@ -86,7 +86,7 @@ pub enum SpawnError { /// Optimization failures prevent the streaming query from starting with an /// efficient execution plan. #[error("failed to optimize query plan")] - OptimizePlan(#[source] crate::context::planning::OptimizePlanError), + OptimizePlan(#[source] crate::context::plan::OptimizePlanError), /// Query references tables from multiple blockchain networks /// @@ -289,7 +289,7 @@ impl StreamingQueryHandle { /// This follows a 'microbatch' model where it processes data in chunks based on a block range /// stream. 
pub struct StreamingQuery { - query_env: QueryEnv, + exec_env: ExecEnv, catalog: Catalog, plan: DetachedLogicalPlan, start_block: BlockNum, @@ -316,7 +316,7 @@ impl StreamingQuery { #[instrument(skip_all, err)] #[expect(clippy::too_many_arguments)] pub async fn spawn( - query_env: QueryEnv, + exec_env: ExecEnv, catalog: Catalog, dataset_store: &DatasetStore, plan: DetachedLogicalPlan, @@ -346,8 +346,7 @@ impl StreamingQuery { .propagate_block_num() .map_err(SpawnError::PropagateBlockNum)?; - let ctx = - PlanningContext::new(query_env.session_config.clone(), catalog.logical().clone()); + let ctx = PlanContext::new(exec_env.session_config.clone(), catalog.logical().clone()); ctx.optimize_plan(&plan) .await .map_err(SpawnError::OptimizePlan)? @@ -367,7 +366,7 @@ impl StreamingQuery { let blocks_table = resolve_blocks_table( dataset_store, - query_env.store.clone(), + exec_env.store.clone(), unique_refs.into_iter(), network, ) @@ -380,7 +379,7 @@ impl StreamingQuery { .transpose() .map_err(SpawnError::ConvertResumeWatermark)?; let streaming_query = Self { - query_env, + exec_env, catalog, plan, tx, @@ -413,10 +412,9 @@ impl StreamingQuery { self.table_updates.changed().await; // The table snapshots to execute the microbatch against. - let ctx = - QueryContext::for_catalog(self.query_env.clone(), self.catalog.clone(), false) - .await - .map_err(StreamingQueryExecutionError::CreateQueryContext)?; + let ctx = ExecContext::for_catalog(self.exec_env.clone(), self.catalog.clone(), false) + .await + .map_err(StreamingQueryExecutionError::CreateExecContext)?; // Get the next execution range let Some(MicrobatchRange { range, direction }) = self @@ -495,7 +493,7 @@ impl StreamingQuery { #[instrument(skip_all, err)] async fn next_microbatch_range( &mut self, - ctx: &QueryContext, + ctx: &ExecContext, ) -> Result, NextMicrobatchRangeError> { // Gather the chains for each source table. 
let chains = ctx @@ -518,9 +516,9 @@ impl StreamingQuery { let logical = LogicalCatalog::from_tables(std::iter::once(&resolved_table)); Catalog::new(vec![self.blocks_table.clone()], logical) }; - QueryContext::for_catalog(self.query_env.clone(), catalog, false) + ExecContext::for_catalog(self.exec_env.clone(), catalog, false) .await - .map_err(NextMicrobatchRangeError::CreateQueryContext)? + .map_err(NextMicrobatchRangeError::CreateExecContext)? }; // The latest common watermark across the source tables. @@ -571,7 +569,7 @@ impl StreamingQuery { #[instrument(skip_all, err)] async fn next_microbatch_start( &self, - ctx: &QueryContext, + ctx: &ExecContext, ) -> Result, NextMicrobatchStartError> { match &self.prev_watermark { // start stream @@ -610,7 +608,7 @@ impl StreamingQuery { #[instrument(skip_all, err)] async fn next_microbatch_end( &mut self, - ctx: &QueryContext, + ctx: &ExecContext, start: &SegmentStart, common_watermark: BlockWatermark, ) -> Result, NextMicrobatchEndError> { @@ -646,7 +644,7 @@ impl StreamingQuery { #[instrument(skip_all, err)] async fn latest_src_watermark( &self, - ctx: &QueryContext, + ctx: &ExecContext, chains: impl Iterator, ) -> Result, LatestSrcWatermarkError> { // For each chain, collect the latest segment @@ -677,7 +675,7 @@ impl StreamingQuery { #[instrument(skip_all, err)] async fn reorg_base( &self, - ctx: &QueryContext, + ctx: &ExecContext, prev_watermark: &Watermark, ) -> Result, ReorgBaseError> { // context for querying forked blocks @@ -686,9 +684,9 @@ impl StreamingQuery { ctx.catalog().physical_tables().cloned().collect(), ctx.catalog().logical().clone(), ); - QueryContext::for_catalog(ctx.env.clone(), catalog, true) + ExecContext::for_catalog(ctx.env.clone(), catalog, true) .await - .map_err(ReorgBaseError::CreateQueryContext)? + .map_err(ReorgBaseError::CreateExecContext)? 
}; let mut min_fork_block_num = prev_watermark.number; @@ -742,7 +740,7 @@ impl StreamingQuery { #[instrument(skip_all, err)] async fn blocks_table_contains( &self, - ctx: &QueryContext, + ctx: &ExecContext, watermark: &Watermark, ) -> Result, BlocksTableContainsError> { // Panic safety: The `blocks_ctx` always has a single table. @@ -782,7 +780,7 @@ impl StreamingQuery { #[instrument(skip(self, ctx), err)] async fn blocks_table_fetch( &self, - ctx: &QueryContext, + ctx: &ExecContext, number: BlockNum, hash: Option<&BlockHash>, ) -> Result, BlocksTableFetchError> { @@ -858,11 +856,11 @@ pub enum StreamingQueryExecutionError { #[error("streaming task join timed out")] TaskTimeout, - /// Failed to create a query context + /// Failed to create an exec context /// - /// This occurs when the query context cannot be created. - #[error("failed to create query context: {0}")] - CreateQueryContext(#[source] crate::context::query::CreateContextError), + /// This occurs when the exec context cannot be created. + #[error("failed to create exec context: {0}")] + CreateExecContext(#[source] crate::context::exec::CreateContextError), /// Failed to get the next microbatch range /// @@ -892,7 +890,7 @@ pub enum StreamingQueryExecutionError { /// /// This occurs when the plan cannot be executed. #[error("failed to execute the plan: {0}")] - ExecutePlan(#[source] crate::context::query::ExecutePlanError), + ExecutePlan(#[source] crate::context::exec::ExecutePlanError), /// Failed to stream item /// @@ -906,11 +904,11 @@ pub enum StreamingQueryExecutionError { /// This error type is used by `StreamingQuery::next_microbatch_range()`. #[derive(Debug, thiserror::Error)] pub enum NextMicrobatchRangeError { - /// Failed to create a query context + /// Failed to create an exec context /// - /// This occurs when the query context cannot be created. 
- #[error("failed to create query context: {0}")] - CreateQueryContext(#[source] crate::context::query::CreateContextError), + /// This occurs when the exec context cannot be created. + #[error("failed to create exec context: {0}")] + CreateExecContext(#[source] crate::context::exec::CreateContextError), /// Failed to get the latest source watermark /// @@ -975,11 +973,11 @@ pub struct LatestSrcWatermarkError(#[source] BlocksTableContainsError); /// This error type is used by `StreamingQuery::reorg_base()`. #[derive(Debug, thiserror::Error)] pub enum ReorgBaseError { - /// Failed to create a query context + /// Failed to create an exec context /// - /// This occurs when the query context cannot be created. - #[error("failed to create query context: {0}")] - CreateQueryContext(#[source] crate::context::query::CreateContextError), + /// This occurs when the exec context cannot be created. + #[error("failed to create exec context: {0}")] + CreateExecContext(#[source] crate::context::exec::CreateContextError), /// Failed to fetch the blocks table /// @@ -1022,13 +1020,13 @@ pub enum BlocksTableFetchError { /// /// This occurs when the SQL cannot be planned. #[error("failed to plan the SQL: {0}")] - PlanSql(#[source] crate::context::query::PlanSqlError), + PlanSql(#[source] crate::context::exec::SqlError), /// Failed to execute the SQL /// /// This occurs when the SQL cannot be executed. 
#[error("failed to execute the SQL: {0}")] - ExecuteSql(#[source] crate::context::query::ExecuteAndConcatError), + ExecuteSql(#[source] crate::context::exec::ExecuteAndConcatError), /// Failed to extract a hash value from query results /// diff --git a/crates/core/worker-datasets-derived/src/dataset.rs b/crates/core/worker-datasets-derived/src/dataset.rs index dc6cf7208..077262504 100644 --- a/crates/core/worker-datasets-derived/src/dataset.rs +++ b/crates/core/worker-datasets-derived/src/dataset.rs @@ -121,12 +121,12 @@ use common::{ for_dump as physical_catalog, }, }, - context::{planning::PlanningContext, query::QueryContext}, + context::{exec::ExecContext, plan::PlanContext}, dataset_store::ResolveRevisionError, detached_logical_plan::DetachedLogicalPlan, + exec_env::ExecEnv, metadata::Generation, parquet::errors::ParquetError, - query_env::QueryEnv, sql::{ ParseSqlError, ResolveFunctionReferencesError, ResolveTableReferencesError, resolve_function_references, resolve_table_references, @@ -242,7 +242,7 @@ pub async fn dump( // Process all tables in parallel using FailFastJoinSet let mut join_set = tasks::FailFastJoinSet::>::new(); - let env = common::query_env::create( + let env = common::exec_env::create( ctx.config.max_mem_mb, ctx.config.query_max_mem_mb, &ctx.config.spill_location, @@ -430,7 +430,7 @@ impl Error { async fn dump_table( ctx: Ctx, manifest: &DerivedManifest, - env: QueryEnv, + env: ExecEnv, table: Arc, compactor: Arc, opts: Arc, @@ -504,7 +504,7 @@ async fn dump_table( .await .map_err(DumpTableError::CreatePhysicalCatalog)? 
}; - let planning_ctx = PlanningContext::new(env.session_config.clone(), catalog.logical().clone()); + let planning_ctx = PlanContext::new(env.session_config.clone(), catalog.logical().clone()); let manifest_start_block = manifest.start_block; join_set.spawn( @@ -543,7 +543,7 @@ async fn dump_table( let resolved = resolve_end_block(&end, start, async { let query_ctx = - QueryContext::for_catalog(env.clone(), catalog.clone(), false).await?; + ExecContext::for_catalog(env.clone(), catalog.clone(), false).await?; let max_end_blocks = query_ctx .max_end_blocks(&plan.clone().attach_to(&query_ctx)?) .await?; @@ -730,7 +730,7 @@ pub enum DumpTableSpawnError { /// This occurs when DataFusion cannot create an execution plan /// from the parsed SQL query. #[error("failed to plan SQL query: {0}")] - PlanSql(#[source] common::context::planning::PlanSqlError), + PlanSql(#[source] common::context::plan::SqlError), /// The query is not incremental and cannot be synced /// @@ -787,7 +787,7 @@ pub enum DumpTableSpawnError { #[expect(clippy::too_many_arguments)] async fn dump_sql_query( ctx: &Ctx, - env: &QueryEnv, + env: &ExecEnv, catalog: &Catalog, query: DetachedLogicalPlan, start: BlockNum, diff --git a/crates/services/admin-api/src/handlers/common.rs b/crates/services/admin-api/src/handlers/common.rs index 8f2178919..fdb52733a 100644 --- a/crates/services/admin-api/src/handlers/common.rs +++ b/crates/services/admin-api/src/handlers/common.rs @@ -10,10 +10,10 @@ use common::{ self as catalog, CreateLogicalCatalogError, ResolveTablesError, ResolveUdfsError, TableReferencesMap, }, - context::planning::{PlanSqlError as PlanningPlanSqlError, PlanningContext}, + context::plan::{PlanContext, SqlError as PlanSqlError}, dataset_store::{DatasetStore, GetDatasetError}, + exec_env::default_session_config, metadata::{AmpMetadataFromParquetError, amp_metadata_from_parquet_file}, - query_env::default_session_config, sql::{ ResolveFunctionReferencesError, ResolveTableReferencesError, 
resolve_function_references, resolve_table_references, @@ -339,7 +339,7 @@ pub async fn validate_derived_manifest( references, ) .await - .map(|catalog| PlanningContext::new(session_config, catalog)) + .map(|catalog| PlanContext::new(session_config, catalog)) .map_err(|err| match &err { CreateLogicalCatalogError::ResolveTables(resolve_error) => match resolve_error { ResolveTablesError::UnqualifiedTable { .. } => { @@ -583,7 +583,7 @@ pub enum ManifestValidationError { /// The table whose SQL query failed to plan table_name: TableName, #[source] - source: PlanningPlanSqlError, + source: PlanSqlError, }, /// Failed to create DataFusion session configuration diff --git a/crates/services/admin-api/src/handlers/schema.rs b/crates/services/admin-api/src/handlers/schema.rs index 3f37c8269..30ae89411 100644 --- a/crates/services/admin-api/src/handlers/schema.rs +++ b/crates/services/admin-api/src/handlers/schema.rs @@ -10,11 +10,11 @@ use common::{ self as catalog, CreateLogicalCatalogError, ResolveTablesError, ResolveUdfsError, TableReferencesMap, }, - context::planning::{PlanSqlError as PlanningPlanSqlError, PlanningContext}, + context::plan::{PlanContext, SqlError as PlanSqlError}, dataset_store::GetDatasetError, + exec_env::default_session_config, incrementalizer::NonIncrementalQueryError, plan_visitors::prepend_special_block_num_field, - query_env::default_session_config, sql::{ ResolveFunctionReferencesError, ResolveTableReferencesError, resolve_function_references, resolve_table_references, @@ -304,7 +304,7 @@ pub async fn handler( // Create planning context from catalog let session_config = default_session_config().map_err(Error::SessionConfig)?; - let planning_ctx = PlanningContext::new(session_config, catalog); + let planning_ctx = PlanContext::new(session_config, catalog); // Infer schema for each table and extract networks let mut schemas = BTreeMap::new(); @@ -666,7 +666,7 @@ enum Error { table_name: TableName, /// The underlying query context error #[source] 
- source: PlanningPlanSqlError, + source: PlanSqlError, }, } diff --git a/crates/services/server/src/flight.rs b/crates/services/server/src/flight.rs index 0705d7f80..50f4fe38f 100644 --- a/crates/services/server/src/flight.rs +++ b/crates/services/server/src/flight.rs @@ -42,12 +42,12 @@ use common::{ }, }, context::{ - planning::{self, PlanningContext}, - query::{self, QueryContext}, + exec::{self, ExecContext}, + plan::{self, PlanContext}, }, dataset_store::{DatasetStore, GetDatasetError}, detached_logical_plan::{AttachPlanError, DetachedLogicalPlan}, - query_env::QueryEnv, + exec_env::ExecEnv, sql::{ ResolveFunctionReferencesError, ResolveTableReferencesError, resolve_function_references, resolve_table_references, @@ -85,7 +85,7 @@ type TonicStream = Pin> + Send + 'sta #[derive(Clone)] pub struct Service { config: Arc, - env: QueryEnv, + env: ExecEnv, dataset_store: DatasetStore, notification_multiplexer: Arc, metrics: Option>, @@ -99,13 +99,13 @@ impl Service { dataset_store: DatasetStore, meter: Option, ) -> Result { - let env = common::query_env::create( + let env = common::exec_env::create( config.max_mem_mb, config.query_max_mem_mb, &config.spill_location, data_store, ) - .map_err(InitError::QueryEnv)?; + .map_err(InitError::ExecEnv)?; let notification_multiplexer = Arc::new(notification_multiplexer::spawn(metadata_db.clone())); let metrics = meter.as_ref().map(|m| Arc::new(MetricsRegistry::new(m))); @@ -143,11 +143,8 @@ impl Service { .map_err(Error::PhysicalCatalogError) }?; - let ctx = PlanningContext::new(self.env.session_config.clone(), catalog.logical().clone()); - let plan = ctx - .plan_sql(query.clone()) - .await - .map_err(Error::PlanningPlanSql)?; + let ctx = PlanContext::new(self.env.session_config.clone(), catalog.logical().clone()); + let plan = ctx.plan_sql(query.clone()).await.map_err(Error::PlanSql)?; let is_streaming = is_streaming.unwrap_or_else(|| common::stream_helpers::is_streaming(&query)); @@ -194,9 +191,9 @@ impl Service { // If not 
streaming or metadata db is not available, execute once if !is_streaming { - let ctx = QueryContext::for_catalog(self.env.clone(), catalog, false) + let ctx = ExecContext::for_catalog(self.env.clone(), catalog, false) .await - .map_err(Error::CreateQueryContext)?; + .map_err(Error::CreateExecContext)?; let plan = plan.attach_to(&ctx).map_err(Error::AttachPlan)?; let block_ranges = ctx @@ -332,7 +329,7 @@ impl Service { ) .await .map_err(Error::CreateLogicalCatalogError)?; - PlanningContext::new(self.env.session_config.clone(), catalog) + PlanContext::new(self.env.session_config.clone(), catalog) }; let is_streaming = streaming_override @@ -340,7 +337,7 @@ impl Service { let schema = plan_ctx .sql_output_schema(query) .await - .map_err(Error::PlanningPlanSql)?; + .map_err(Error::PlanSql)?; let ticket = AmpTicket { query: sql_query.query, is_streaming, @@ -976,19 +973,19 @@ pub enum Error { SqlParse(#[source] common::sql::ParseSqlError), #[error("failed to plan SQL query")] - PlanningPlanSql(#[source] planning::PlanSqlError), + PlanSql(#[source] plan::SqlError), - #[error("failed to create query context")] - CreateQueryContext(#[source] query::CreateContextError), + #[error("failed to create exec context")] + CreateExecContext(#[source] exec::CreateContextError), #[error("failed to attach plan to query context")] AttachPlan(#[source] AttachPlanError), #[error("failed to compute common ranges")] - QueryCommonRanges(#[source] query::CommonRangesError), + QueryCommonRanges(#[source] exec::CommonRangesError), #[error("failed to execute plan")] - QueryExecutePlan(#[source] query::ExecutePlanError), + QueryExecutePlan(#[source] exec::ExecutePlanError), #[error("invalid query: {0}")] InvalidQuery(String), @@ -1016,25 +1013,25 @@ impl Error { Error::TableReferenceResolution(_) => "TABLE_REFERENCE_RESOLUTION", Error::FunctionReferenceResolution(_) => "FUNCTION_REFERENCE_RESOLUTION", Error::SqlParse(_) => "SQL_PARSE_ERROR", - Error::PlanningPlanSql(e) if e.is_invalid_plan() => 
"INVALID_PLAN", - Error::PlanningPlanSql(_) => "PLANNING_ERROR", - Error::CreateQueryContext(query::CreateContextError::CatalogSnapshot(_)) => { + Error::PlanSql(e) if e.is_invalid_plan() => "INVALID_PLAN", + Error::PlanSql(_) => "PLANNING_ERROR", + Error::CreateExecContext(exec::CreateContextError::CatalogSnapshot(_)) => { "CATALOG_SNAPSHOT_ERROR" } Error::AttachPlan(_) => "PLANNING_ERROR", - Error::QueryCommonRanges(query::CommonRangesError::TableNotFound(_)) => { + Error::QueryCommonRanges(exec::CommonRangesError::TableNotFound(_)) => { "TABLE_NOT_FOUND_ERROR" } - Error::QueryCommonRanges(query::CommonRangesError::ExtractTableReferences(_)) => { + Error::QueryCommonRanges(exec::CommonRangesError::ExtractTableReferences(_)) => { "EXTRACT_TABLE_REFERENCES_ERROR" } - Error::QueryCommonRanges(query::CommonRangesError::TableReferenceConversion(_)) => { + Error::QueryCommonRanges(exec::CommonRangesError::TableReferenceConversion(_)) => { "TABLE_REFERENCE_CONVERSION_ERROR" } - Error::QueryExecutePlan(query::ExecutePlanError::RegisterTable(_)) => { + Error::QueryExecutePlan(exec::ExecutePlanError::RegisterTable(_)) => { "REGISTER_TABLE_ERROR" } - Error::QueryExecutePlan(query::ExecutePlanError::Execute(_)) => "CORE_EXECUTION_ERROR", + Error::QueryExecutePlan(exec::ExecutePlanError::Execute(_)) => "CORE_EXECUTION_ERROR", Error::InvalidQuery(_) => "INVALID_QUERY", Error::StreamingExecutionError(_) => "STREAMING_EXECUTION_ERROR", Error::TicketEncodingError(_) => "TICKET_ENCODING_ERROR", @@ -1060,11 +1057,11 @@ impl IntoResponse for Error { fn into_response(self) -> axum::response::Response { let status_code = match &self { Error::SqlParse(_) => StatusCode::BAD_REQUEST, - Error::PlanningPlanSql(e) if e.is_invalid_plan() => StatusCode::BAD_REQUEST, - Error::PlanningPlanSql(_) => StatusCode::INTERNAL_SERVER_ERROR, - Error::CreateQueryContext(_) => StatusCode::INTERNAL_SERVER_ERROR, + Error::PlanSql(e) if e.is_invalid_plan() => StatusCode::BAD_REQUEST, + Error::PlanSql(_) => 
StatusCode::INTERNAL_SERVER_ERROR, + Error::CreateExecContext(_) => StatusCode::INTERNAL_SERVER_ERROR, Error::AttachPlan(_) => StatusCode::INTERNAL_SERVER_ERROR, - Error::QueryCommonRanges(query::CommonRangesError::TableNotFound(_)) => { + Error::QueryCommonRanges(exec::CommonRangesError::TableNotFound(_)) => { StatusCode::NOT_FOUND } Error::QueryCommonRanges(_) => StatusCode::INTERNAL_SERVER_ERROR, @@ -1104,11 +1101,11 @@ impl From for Status { Error::UnsupportedFlightDescriptorCommand(_) => Status::invalid_argument(message), Error::DatasetStoreError(_) => Status::internal(message), Error::SqlParse(_) => Status::invalid_argument(message), - Error::PlanningPlanSql(e) if e.is_invalid_plan() => Status::invalid_argument(message), - Error::PlanningPlanSql(_) => Status::internal(message), - Error::CreateQueryContext(_) => Status::internal(message), + Error::PlanSql(e) if e.is_invalid_plan() => Status::invalid_argument(message), + Error::PlanSql(_) => Status::internal(message), + Error::CreateExecContext(_) => Status::internal(message), Error::AttachPlan(_) => Status::internal(message), - Error::QueryCommonRanges(query::CommonRangesError::TableNotFound(_)) => { + Error::QueryCommonRanges(exec::CommonRangesError::TableNotFound(_)) => { Status::not_found(message) } Error::QueryCommonRanges(_) => Status::internal(message), diff --git a/crates/services/server/src/service.rs b/crates/services/server/src/service.rs index b1461ef40..34f4257c5 100644 --- a/crates/services/server/src/service.rs +++ b/crates/services/server/src/service.rs @@ -188,7 +188,7 @@ pub enum InitError { /// - DataFusion session configuration is invalid /// - Query execution pool initialization fails - #[error("failed to create query environment: {0}")] - QueryEnv(#[source] DataFusionError), + #[error("failed to create execution environment: {0}")] + ExecEnv(#[source] DataFusionError), /// Failed to initialize Flight server /// diff --git a/tests/src/tests/it_reorg.rs b/tests/src/tests/it_reorg.rs index 3627364a8..858f869af 100644 --- a/tests/src/tests/it_reorg.rs
+++ b/tests/src/tests/it_reorg.rs @@ -474,7 +474,7 @@ impl ReorgTestCtx { let test_env = &self.ctx; let sql_query = SqlStr::new_unchecked(format!("select * from {}.blocks", dataset)); let sql = sql::parse(&sql_query).expect("Failed to parse SQL for dataset.blocks"); - let env = common::query_env::create( + let env = common::exec_env::create( test_env.daemon_server().config().max_mem_mb, test_env.daemon_server().config().query_max_mem_mb, &test_env.daemon_server().config().spill_location,