From cc71e610a127bd32c53e5598aaad0b700a490971 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 6 Mar 2026 15:51:36 +0800 Subject: [PATCH 1/5] Add SQLLogicTest coverage for UPDATE ... FROM aliases Enhance testing for UPDATE ... FROM alias and shape variants in update.slt. Introduce targeted planner/unit tests for qualifier and joined-assignment patterns in dml_planning.rs, currently asserting for the existing guard error. --- .../custom_sources_cases/dml_planning.rs | 79 +++++++++++++++++++ datafusion/sqllogictest/test_files/update.slt | 18 +++++ 2 files changed, 97 insertions(+) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 8c4bae5e98b36..cb7b6de891fc4 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -761,6 +761,85 @@ async fn test_update_from_drops_non_target_predicates() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_update_from_alias_variants_are_rejected() -> Result<()> { + // UPDATE ... FROM is currently not working + // TODO fix https://github.com/apache/datafusion/issues/19950 + let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t1", Arc::clone(&target_provider) as Arc)?; + + let source_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("status", DataType::Utf8, true), + Field::new("value", DataType::Int32, true), + ])); + let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); + ctx.register_table("t2", Arc::new(source_table))?; + + let alias_queries = [ + "UPDATE t1 AS dst \ + SET status = src.status, value = src.value + dst.value \ + FROM t2 AS src \ + WHERE dst.id = src.id AND src.status = 'active'", + "UPDATE t1 \ + FROM t2 AS src \ + SET status = src.status, value = t1.value + src.value \ + WHERE t1.id = src.id", + ]; + + for sql in alias_queries { + let result = ctx.sql(sql).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.to_string().contains("UPDATE ... FROM is not supported"), + "Expected 'UPDATE ... FROM is not supported' error for `{sql}`, got: {err}" + ); + } + + Ok(()) +} + +#[tokio::test] +async fn test_update_from_joined_assignments_are_rejected() -> Result<()> { + // This captures joined assignment patterns that currently fail if UPDATE ... FROM + // planning is enabled without full qualifier-safe assignment handling. + // TODO fix https://github.com/apache/datafusion/issues/19950 + let target_provider = Arc::new(CaptureUpdateProvider::new(test_schema())); + let ctx = SessionContext::new(); + ctx.register_table("t1", Arc::clone(&target_provider) as Arc)?; + + let source_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("status", DataType::Utf8, true), + Field::new("value", DataType::Int32, true), + ])); + let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); + ctx.register_table("t2", Arc::new(source_table))?; + + let result = ctx + .sql( + "UPDATE t1 AS dst \ + SET status = src.status, value = src.value + dst.value \ + FROM t2 AS src \ + WHERE dst.id = src.id AND src.value > 10 AND dst.value > 0", + ) + .await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.to_string().contains("UPDATE ... FROM is not supported"), + "Expected 'UPDATE ... FROM is not supported' error, got: {err}" + ); + + Ok(()) +} + #[tokio::test] async fn test_delete_qualifier_stripping_and_validation() -> Result<()> { // Test that filter qualifiers are properly stripped and validated diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index 1cd2b626e3b8e..fa94975b6d78b 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -72,6 +72,18 @@ physical_plan_error This feature is not implemented: Physical plan does not supp query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; +# update from (FROM before SET syntax) +# UPDATE ... FROM is currently unsupported +# TODO fix https://github.com/apache/datafusion/issues/19950 +query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +explain update t1 from t2 set b = t2.b, c = t1.a + t2.a where t1.a = t2.a; + +# update from with explicit aliases and joined assignment expressions +# UPDATE ... FROM is currently unsupported +# TODO fix https://github.com/apache/datafusion/issues/19950 +query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +explain update t1 as dst set b = src.b, c = src.a + dst.a, d = dst.d + src.d from t2 as src where dst.a = src.a and src.c > dst.c; + # test update from other table with actual data statement ok insert into t1 values (1, 'zoo', 2.0, 10), (2, 'qux', 3.0, 20), (3, 'bar', 4.0, 30); @@ -112,3 +124,9 @@ insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200); # TODO fix https://github.com/apache/datafusion/issues/19950 statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; + +# test source alias permutations with joined assignments +# UPDATE ... FROM is currently unsupported +# TODO fix https://github.com/apache/datafusion/issues/19950 +statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +update t1 as dst set b = src.b, c = src.a + dst.a, d = src.d from t2 as src where dst.a = src.a and src.c > 1.0; From 4ac5722f0cba5125a9565003d63481bbd768eba0 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 6 Mar 2026 16:26:03 +0800 Subject: [PATCH 2/5] Enable UPDATE ... FROM and improve assignment extraction Remove hard guard for UPDATE ... FROM in SQL planner. Enhance assignment extraction to preserve qualifiers for multi-table updates and filter identity assignments with target-table awareness. Update API docs to clarify multi-table update behavior and add integration and unit tests to validate new functionality and expectations. --- datafusion/catalog/src/table.rs | 3 + datafusion/core/src/physical_planner.rs | 221 ++++++++++++++++-- .../custom_sources_cases/dml_planning.rs | 115 ++++----- datafusion/sql/src/statement.rs | 6 - datafusion/sqllogictest/test_files/update.slt | 79 ++++++- 5 files changed, 343 insertions(+), 81 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index f31d4d52ce88b..98ea359b33486 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -345,6 +345,9 @@ pub trait TableProvider: Debug + Sync + Send { /// /// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64). /// Empty `filters` updates all rows. + /// + /// Assignment expressions may include qualified column references for + /// multi-table UPDATE statements (for example, `UPDATE t1 SET c = t2.c FROM t2`). async fn update( &self, _state: &dyn Session, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b4fb44f670e8d..b5363d3b8779b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -785,7 +785,7 @@ impl DefaultPhysicalPlanner { // We pass the filters and let the provider handle the projection let filters = extract_dml_filters(input, table_name)?; // Extract assignments from the projection in input plan - let assignments = extract_update_assignments(input)?; + let assignments = extract_update_assignments(input, table_name)?; provider .table_provider .update(session_state, assignments, filters) @@ -2233,9 +2233,16 @@ fn strip_column_qualifiers(expr: Expr) -> Result { /// Extract column assignments from an UPDATE input plan. /// For UPDATE statements, the SQL planner encodes assignments as a projection /// over the source table. This function extracts column name and expression pairs -/// from the projection. Column qualifiers are stripped from the expressions. +/// from the projection. /// -fn extract_update_assignments(input: &Arc) -> Result> { +/// For single-table UPDATE, qualifiers are stripped for compatibility with +/// table providers that evaluate assignment expressions against unqualified +/// target schemas. For multi-table UPDATE ... FROM, qualifiers are preserved. +/// +fn extract_update_assignments( + input: &Arc, + target_table: &TableReference, +) -> Result> { // The UPDATE input plan structure is: // Projection(updated columns as expressions with aliases) // Filter(optional WHERE clause) @@ -2244,6 +2251,9 @@ fn extract_update_assignments(input: &Arc) -> Result) -> Result) -> Result) -> Result bool { +fn is_identity_assignment( + expr: &Expr, + column_name: &str, + target_refs: &[TableReference], +) -> bool { match expr { - Expr::Column(col) => col.name == column_name, + Expr::Column(col) => { + col.name == column_name + && col.relation.as_ref().is_none_or(|relation| { + target_refs + .iter() + .any(|target| relation.resolved_eq(target)) + }) + } _ => false, } } +fn plan_contains_join(input: &Arc) -> Result { + let mut has_join = false; + input.apply(|node| { + if matches!(node, LogicalPlan::Join(_)) { + has_join = true; + return Ok(TreeNodeRecursion::Stop); + } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(has_join) +} + +fn collect_update_target_references( + input: &Arc, + target_table: &TableReference, +) -> Result> { + let mut refs = vec![target_table.clone()]; + input.apply(|node| { + if let LogicalPlan::SubqueryAlias(alias) = node + && plan_contains_table_scan(&alias.input, target_table)? + && !refs.iter().any(|r| r.resolved_eq(&alias.alias)) + { + refs.push(alias.alias.clone()); + } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(refs) +} + +fn plan_contains_table_scan( + input: &Arc, + target_table: &TableReference, +) -> Result { + let mut found = false; + input.apply(|node| { + if let LogicalPlan::TableScan(TableScan { table_name, .. }) = node + && table_name.resolved_eq(target_table) + { + found = true; + return Ok(TreeNodeRecursion::Stop); + } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(found) +} + /// Check if window bounds are valid after schema information is available, and /// window_frame bounds are casted to the corresponding column type. /// queries like: @@ -3115,7 +3194,8 @@ mod tests { use crate::execution::session_state::SessionStateBuilder; use arrow::array::{ArrayRef, DictionaryArray, Int32Array}; - use arrow::datatypes::{DataType, Field, Int32Type}; + use arrow::datatypes::{DataType, Field, Int32Type, Schema}; + use arrow::record_batch::RecordBatch; use arrow_schema::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{ @@ -3125,13 +3205,53 @@ mod tests { use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::builder::subquery_alias; use datafusion_expr::{ - LogicalPlanBuilder, TableSource, UserDefinedLogicalNodeCore, col, lit, + DmlStatement, LogicalPlanBuilder, TableSource, UserDefinedLogicalNodeCore, + WriteOp, col, lit, }; use datafusion_functions_aggregate::count::count_all; use datafusion_functions_aggregate::expr_fn::sum; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; + async fn make_update_from_plan( + sql: &str, + ) -> Result<(Arc, TableReference)> { + let ctx = SessionContext::new(); + let t1_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), + ])); + let t2_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), + ])); + let t1 = MemTable::try_new( + Arc::clone(&t1_schema), + vec![vec![RecordBatch::new_empty(Arc::clone(&t1_schema))]], + )?; + let t2 = MemTable::try_new( + Arc::clone(&t2_schema), + vec![vec![RecordBatch::new_empty(Arc::clone(&t2_schema))]], + )?; + ctx.register_table("t1", Arc::new(t1))?; + ctx.register_table("t2", Arc::new(t2))?; + + let plan = ctx.sql(sql).await?.into_unoptimized_plan(); + match plan { + LogicalPlan::Dml(DmlStatement { + table_name, + op: WriteOp::Update, + input, + .. + }) => Ok((input, table_name)), + other => internal_err!("Expected UPDATE DML plan, got: {other}"), + } + } + fn make_session_state() -> SessionState { let runtime = Arc::new(RuntimeEnv::default()); let config = SessionConfig::new().with_target_partitions(4); @@ -3418,6 +3538,77 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_extract_update_assignments_preserves_source_qualifiers_for_update_from() + -> Result<()> { + let (input, table_name) = make_update_from_plan( + "UPDATE t1 AS dst \ + SET b = src.b, d = src.d \ + FROM t2 AS src \ + WHERE dst.a = src.a", + ) + .await?; + + let assignments = extract_update_assignments(&input, &table_name)?; + let b_expr = assignments + .iter() + .find(|(name, _)| name == "b") + .map(|(_, expr)| expr.to_string()) + .ok_or_else(|| { + internal_datafusion_err!("Expected assignment for target column b") + })?; + let d_expr = assignments + .iter() + .find(|(name, _)| name == "d") + .map(|(_, expr)| expr.to_string()) + .ok_or_else(|| { + internal_datafusion_err!("Expected assignment for target column d") + })?; + + assert!( + b_expr.contains("src.b"), + "Unexpected b assignment: {b_expr}" + ); + assert!( + d_expr.contains("src.d"), + "Unexpected d assignment: {d_expr}" + ); + assert!( + assignments.iter().all(|(name, _)| name != "a"), + "Identity target columns should not be extracted as assignments" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_extract_update_assignments_strips_target_qualifiers_single_table() + -> Result<()> { + let (input, table_name) = + make_update_from_plan("UPDATE t1 AS dst SET d = dst.d + 1 WHERE dst.a > 0") + .await?; + + let assignments = extract_update_assignments(&input, &table_name)?; + let d_expr = assignments + .iter() + .find(|(name, _)| name == "d") + .map(|(_, expr)| expr.to_string()) + .ok_or_else(|| { + internal_datafusion_err!("Expected assignment for target column d") + })?; + + assert!( + !d_expr.contains("dst."), + "Single-table assignment should not keep target qualifiers: {d_expr}" + ); + assert!( + d_expr.contains("d"), + "Unexpected assignment expression: {d_expr}" + ); + + Ok(()) + } + #[tokio::test] async fn test_create_not() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]); diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index cb7b6de891fc4..c95185d2c4153 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -725,117 +725,126 @@ async fn test_delete_target_table_scoping() -> Result<()> { #[tokio::test] async fn test_update_from_drops_non_target_predicates() -> Result<()> { - // UPDATE ... FROM is currently not working - // TODO fix https://github.com/apache/datafusion/issues/19950 + let target_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), + ])); let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( - test_schema(), + Arc::clone(&target_schema), TableProviderFilterPushDown::Exact, )); let ctx = SessionContext::new(); ctx.register_table("t1", Arc::clone(&target_provider) as Arc)?; let source_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("status", DataType::Utf8, true), - // t2-only column to avoid false negatives after qualifier stripping + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), Field::new("src_only", DataType::Utf8, true), ])); let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); ctx.register_table("t2", Arc::new(source_table))?; - let result = ctx + let df = ctx .sql( - "UPDATE t1 SET value = 1 FROM t2 \ - WHERE t1.id = t2.id AND t2.src_only = 'active' AND t1.value > 10", + "UPDATE t1 SET d = 1 FROM t2 \ + WHERE t1.a = t2.a AND t2.src_only = 'active' AND t1.d > 10", ) - .await; + .await?; + + df.collect().await?; - // Verify UPDATE ... FROM is rejected with appropriate error - // TODO fix https://github.com/apache/datafusion/issues/19950 - assert!(result.is_err()); - let err = result.unwrap_err(); + let filters = target_provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!( + filters.len(), + 1, + "only target-table predicates should be retained for provider update" + ); assert!( - err.to_string().contains("UPDATE ... FROM is not supported"), - "Expected 'UPDATE ... FROM is not supported' error, got: {err}" + filters[0].to_string().contains("d"), + "Expected target predicate on d, got: {}", + filters[0] ); Ok(()) } #[tokio::test] -async fn test_update_from_alias_variants_are_rejected() -> Result<()> { - // UPDATE ... FROM is currently not working - // TODO fix https://github.com/apache/datafusion/issues/19950 +async fn test_update_from_alias_variants_are_accepted() -> Result<()> { + let target_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), + ])); let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( - test_schema(), + Arc::clone(&target_schema), TableProviderFilterPushDown::Exact, )); let ctx = SessionContext::new(); ctx.register_table("t1", Arc::clone(&target_provider) as Arc)?; let source_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("status", DataType::Utf8, true), - Field::new("value", DataType::Int32, true), + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), ])); let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); ctx.register_table("t2", Arc::new(source_table))?; let alias_queries = [ "UPDATE t1 AS dst \ - SET status = src.status, value = src.value + dst.value \ + SET b = src.b, d = src.d \ FROM t2 AS src \ - WHERE dst.id = src.id AND src.status = 'active'", + WHERE dst.a = src.a AND src.b = 'active'", "UPDATE t1 \ FROM t2 AS src \ - SET status = src.status, value = t1.value + src.value \ - WHERE t1.id = src.id", + SET b = src.b, d = src.d \ + WHERE t1.a = src.a", ]; for sql in alias_queries { - let result = ctx.sql(sql).await; - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.to_string().contains("UPDATE ... FROM is not supported"), - "Expected 'UPDATE ... FROM is not supported' error for `{sql}`, got: {err}" - ); + let _ = ctx.sql(sql).await?; } Ok(()) } #[tokio::test] -async fn test_update_from_joined_assignments_are_rejected() -> Result<()> { - // This captures joined assignment patterns that currently fail if UPDATE ... FROM - // planning is enabled without full qualifier-safe assignment handling. - // TODO fix https://github.com/apache/datafusion/issues/19950 - let target_provider = Arc::new(CaptureUpdateProvider::new(test_schema())); +async fn test_update_from_joined_assignments_plan_success() -> Result<()> { + let target_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), + ])); + let target_provider = + Arc::new(CaptureUpdateProvider::new(Arc::clone(&target_schema))); let ctx = SessionContext::new(); ctx.register_table("t1", Arc::clone(&target_provider) as Arc)?; let source_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("status", DataType::Utf8, true), - Field::new("value", DataType::Int32, true), + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + Field::new("d", DataType::Int32, true), ])); let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); ctx.register_table("t2", Arc::new(source_table))?; - let result = ctx + let _ = ctx .sql( "UPDATE t1 AS dst \ - SET status = src.status, value = src.value + dst.value \ + SET b = src.b, d = src.d \ FROM t2 AS src \ - WHERE dst.id = src.id AND src.value > 10 AND dst.value > 0", + WHERE dst.a = src.a AND src.b = 'active'", ) - .await; - - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.to_string().contains("UPDATE ... FROM is not supported"), - "Expected 'UPDATE ... FROM is not supported' error, got: {err}" - ); + .await?; Ok(()) } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index b91e38e53776a..d76e5b44c36ae 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1084,12 +1084,6 @@ impl SqlToRel<'_, S> { } let update_from = from_clauses.and_then(|mut f| f.pop()); - // UPDATE ... FROM is currently not working - // TODO fix https://github.com/apache/datafusion/issues/19950 - if update_from.is_some() { - return not_impl_err!("UPDATE ... FROM is not supported"); - } - if returning.is_some() { plan_err!("Update-returning clause not yet supported")?; } diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index fa94975b6d78b..5e69d3fe79400 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -69,20 +69,58 @@ physical_plan_error This feature is not implemented: Physical plan does not supp # set from other table # UPDATE ... FROM is currently unsupported # TODO fix https://github.com/apache/datafusion/issues/19950 -query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +query TT explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; +---- +logical_plan +01)Dml: op=[Update] table=[t1] +02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d +03)----Filter: t1.a = t2.a AND t1.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > Float64(1) +04)------Cross Join: +05)--------TableScan: t1 +06)--------TableScan: t2 +physical_plan_error +01)UPDATE operation on table 't1' +02)caused by +03)Schema error: No field named t2.b. Did you mean 'b'?. # update from (FROM before SET syntax) # UPDATE ... FROM is currently unsupported # TODO fix https://github.com/apache/datafusion/issues/19950 -query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +query TT explain update t1 from t2 set b = t2.b, c = t1.a + t2.a where t1.a = t2.a; +---- +logical_plan +01)Dml: op=[Update] table=[t1] +02)--Projection: t1.a AS a, t2.b AS b, CAST(t1.a + t2.a AS Float64) AS c, t1.d AS d +03)----Filter: t1.a = t2.a +04)------Cross Join: +05)--------TableScan: t1 +06)--------TableScan: t2 +physical_plan_error +01)UPDATE operation on table 't1' +02)caused by +03)Schema error: No field named t2.b. Did you mean 'b'?. # update from with explicit aliases and joined assignment expressions # UPDATE ... FROM is currently unsupported # TODO fix https://github.com/apache/datafusion/issues/19950 -query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +query TT explain update t1 as dst set b = src.b, c = src.a + dst.a, d = dst.d + src.d from t2 as src where dst.a = src.a and src.c > dst.c; +---- +logical_plan +01)Dml: op=[Update] table=[t1] +02)--Projection: dst.a AS a, src.b AS b, CAST(src.a + dst.a AS Float64) AS c, dst.d + src.d AS d +03)----Filter: dst.a = src.a AND src.c > dst.c +04)------Cross Join: +05)--------SubqueryAlias: dst +06)----------TableScan: t1 +07)--------SubqueryAlias: src +08)----------TableScan: t2 +physical_plan_error +01)UPDATE operation on table 't1' +02)caused by +03)Schema error: No field named src.b. Did you mean 'b'?. # test update from other table with actual data statement ok @@ -94,8 +132,13 @@ insert into t2 values (1, 'updated_b', 5.0, 40), (2, 'updated_b2', 2.5, 50), (4, # UPDATE ... FROM is currently unsupported - qualifier stripping breaks source column references # causing assignments like 'b = t2.b' to resolve to target table's 'b' instead of source table's 'b' # TODO fix https://github.com/apache/datafusion/issues/19950 -statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +statement error update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; +---- +DataFusion error: UPDATE operation on table 't1' +caused by +Schema error: No field named t2.b. Did you mean 'b'?. + # set from multiple tables, DataFusion only supports from one table statement error DataFusion error: This feature is not implemented: Multiple tables in UPDATE SET FROM not yet supported @@ -104,8 +147,21 @@ explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a an # test table alias # UPDATE ... FROM is currently unsupported # TODO fix https://github.com/apache/datafusion/issues/19950 -statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +query TT explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; +---- +logical_plan +01)Dml: op=[Update] table=[t1] +02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d +03)----Filter: t.a = t2.a AND t.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > Float64(1) +04)------Cross Join: +05)--------SubqueryAlias: t +06)----------TableScan: t1 +07)--------TableScan: t2 +physical_plan_error +01)UPDATE operation on table 't1' +02)caused by +03)Schema error: No field named t2.b. Did you mean 'b'?. # test update with table alias with actual data statement ok @@ -122,11 +178,20 @@ insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200); # UPDATE ... FROM is currently unsupported # TODO fix https://github.com/apache/datafusion/issues/19950 -statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +statement error update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; +---- +DataFusion error: UPDATE operation on table 't1' +caused by +Schema error: No field named t2.b. Did you mean 'b'?. + # test source alias permutations with joined assignments # UPDATE ... FROM is currently unsupported # TODO fix https://github.com/apache/datafusion/issues/19950 -statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +statement error update t1 as dst set b = src.b, c = src.a + dst.a, d = src.d from t2 as src where dst.a = src.a and src.c > 1.0; +---- +DataFusion error: UPDATE operation on table 't1' +caused by +Schema error: No field named src.b. Did you mean 'b'?. From 812dc38adf6a2646ee857d1708785ab6340e110a Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 6 Mar 2026 17:15:54 +0800 Subject: [PATCH 3/5] Implement support for UPDATE ... FROM statements in TableProvider and related components --- datafusion/catalog/src/memory/table.rs | 156 +++++++++++++++++- datafusion/catalog/src/table.rs | 18 ++ datafusion/core/src/physical_planner.rs | 37 +++-- .../custom_sources_cases/dml_planning.rs | 15 ++ datafusion/sqllogictest/test_files/update.slt | 49 ++---- 5 files changed, 229 insertions(+), 46 deletions(-) diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 9b91062657a07..f4f20e9a8641a 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -25,10 +25,11 @@ use std::sync::Arc; use crate::TableProvider; use arrow::array::{ - Array, ArrayRef, BooleanArray, RecordBatch as ArrowRecordBatch, UInt64Array, + Array, ArrayRef, BooleanArray, MutableArrayData, RecordBatch as ArrowRecordBatch, + UInt64Array, make_array, }; use arrow::compute::kernels::zip::zip; -use arrow::compute::{and, filter_record_batch}; +use arrow::compute::{and, concat_batches, filter_record_batch}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::error::Result; @@ -47,7 +48,7 @@ use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning, - PhysicalExpr, PlanProperties, common, + PhysicalExpr, PlanProperties, collect, common, }; use datafusion_session::Session; @@ -500,6 +501,155 @@ impl TableProvider for MemTable { Ok(Arc::new(DmlResultExec::new(total_updated))) } + + async fn update_from( + &self, + state: &dyn Session, + input: Arc, + filters: Vec, + ) -> Result> { + if self.batches.is_empty() { + return Ok(Arc::new(DmlResultExec::new(0))); + } + + self.schema() + .logically_equivalent_names_and_types(&input.schema())?; + + let replacement_batches = collect(input, state.task_ctx()).await?; + let replacement_row_count = replacement_batches + .iter() + .map(ArrowRecordBatch::num_rows) + .sum::(); + + *self.sort_order.lock() = vec![]; + + let replacement_batch = if replacement_row_count == 0 { + ArrowRecordBatch::new_empty(Arc::clone(&self.schema)) + } else { + concat_batches(&self.schema, &replacement_batches)? + }; + + let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?; + let mut expected_updates = 0usize; + + for partition_data in &self.batches { + let partition = partition_data.read().await; + for batch in partition.iter() { + let filter_mask = evaluate_filters_to_mask( + &filters, + batch, + &df_schema, + state.execution_props(), + )?; + let update_count = match filter_mask { + Some(mask) => mask.iter().filter(|v| v == &Some(true)).count(), + None => batch.num_rows(), + }; + expected_updates += update_count; + } + } + + if expected_updates != replacement_row_count { + return plan_err!( + "UPDATE ... FROM produced {replacement_row_count} replacement rows but target filters matched {expected_updates} rows" + ); + } + + let mut replace_cursor = 0usize; + + for partition_data in &self.batches { + let mut partition = partition_data.write().await; + let mut new_batches = Vec::with_capacity(partition.len()); + + for batch in partition.iter() { + if batch.num_rows() == 0 { + continue; + } + + let filter_mask = evaluate_filters_to_mask( + &filters, + batch, + &df_schema, + state.execution_props(), + )?; + + let (update_count, update_mask) = match filter_mask { + Some(mask) => { + let count = mask.iter().filter(|v| v == &Some(true)).count(); + let normalized: BooleanArray = + mask.iter().map(|v| Some(v == Some(true))).collect(); + (count, normalized) + } + None => ( + batch.num_rows(), + BooleanArray::from(vec![true; batch.num_rows()]), + ), + }; + + if update_count == 0 { + new_batches.push(batch.clone()); + continue; + } + + let mut new_columns = Vec::with_capacity(batch.num_columns()); + for column_idx in 0..batch.num_columns() { + let original_column = batch.column(column_idx); + let replacement_segment = replacement_batch + .column(column_idx) + .slice(replace_cursor, update_count); + let merged = merge_with_update_mask( + original_column, + replacement_segment.as_ref(), + &update_mask, + )?; + new_columns.push(merged); + } + + replace_cursor += update_count; + let updated_batch = + ArrowRecordBatch::try_new(Arc::clone(&self.schema), new_columns)?; + new_batches.push(updated_batch); + } + + *partition = new_batches; + } + + Ok(Arc::new(DmlResultExec::new(expected_updates as u64))) + } +} + +fn merge_with_update_mask( + original: &ArrayRef, + replacement_rows: &dyn Array, + update_mask: &BooleanArray, +) -> Result { + let original_data = original.to_data(); + let replacement_data = replacement_rows.to_data(); + let mut mutable = MutableArrayData::new( + vec![&original_data, &replacement_data], + false, + update_mask.len(), + ); + let mut replacement_idx = 0usize; + + for row_idx in 0..update_mask.len() { + if update_mask.value(row_idx) { + mutable.extend(1, replacement_idx, replacement_idx + 1); + replacement_idx += 1; + } else { + mutable.extend(0, row_idx, row_idx + 1); + } + } + + if replacement_idx != replacement_rows.len() { + return plan_err!( + "Invalid UPDATE ... FROM replacement rows: expected {}, consumed {}", + replacement_rows.len(), + replacement_idx + ); + } + + Ok(make_array(mutable.freeze())) } /// Evaluate filter expressions against a batch and return a combined boolean mask. diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 98ea359b33486..6eb219014ef3f 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -357,6 +357,24 @@ pub trait TableProvider: Debug + Sync + Send { not_impl_err!("UPDATE not supported for {} table", self.table_type()) } + /// Update rows using precomputed row values from a physical input plan. + /// + /// This is used for multi-table `UPDATE ... FROM` statements where + /// assignment expressions may reference external tables. + /// + /// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64). + async fn update_from( + &self, + _state: &dyn Session, + _input: Arc, + _filters: Vec, + ) -> Result> { + not_impl_err!( + "UPDATE ... FROM not supported for {} table", + self.table_type() + ) + } + /// Remove all rows from the table. /// /// Should return an [ExecutionPlan] producing a single row with count (UInt64), diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b5363d3b8779b..7803bc9b2c119 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -781,18 +781,33 @@ impl DefaultPhysicalPlanner { if let Some(provider) = target.as_any().downcast_ref::() { - // For UPDATE, the assignments are encoded in the projection of input - // We pass the filters and let the provider handle the projection let filters = extract_dml_filters(input, table_name)?; - // Extract assignments from the projection in input plan - let assignments = extract_update_assignments(input, table_name)?; - provider - .table_provider - .update(session_state, assignments, filters) - .await - .map_err(|e| { - e.context(format!("UPDATE operation on table '{table_name}'")) - })? + if plan_contains_join(input)? { + let input_exec = children.one()?; + provider + .table_provider + .update_from(session_state, input_exec, filters) + .await + .map_err(|e| { + e.context(format!( + "UPDATE operation on table '{table_name}'" + )) + })? + } else { + // For single-table UPDATE, assignments are encoded in the + // projection of input and can be evaluated using only target + // columns. + let assignments = extract_update_assignments(input, table_name)?; + provider + .table_provider + .update(session_state, assignments, filters) + .await + .map_err(|e| { + e.context(format!( + "UPDATE operation on table '{table_name}'" + )) + })? + } } else { return exec_err!( "Table source can't be downcasted to DefaultTableSource" diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index c95185d2c4153..6b9f6838830eb 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -238,6 +238,21 @@ impl TableProvider for CaptureUpdateProvider { Ok(vec![self.filter_pushdown.clone(); filters.len()]) } + + async fn update_from( + &self, + _state: &dyn Session, + _input: Arc, + filters: Vec, + ) -> Result> { + *self.received_filters.lock().unwrap() = Some(filters); + // Multi-table update_from uses projected input rows and does not pass + // assignment expressions directly. + *self.received_assignments.lock().unwrap() = Some(vec![]); + Ok(Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![ + Field::new("count", DataType::UInt64, false), + ]))))) + } } /// A TableProvider that captures whether truncate() was called. diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index 5e69d3fe79400..6cef06a807b49 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -79,10 +79,9 @@ logical_plan 04)------Cross Join: 05)--------TableScan: t1 06)--------TableScan: t2 -physical_plan_error -01)UPDATE operation on table 't1' -02)caused by -03)Schema error: No field named t2.b. Did you mean 'b'?. +physical_plan +01)CooperativeExec +02)--DmlResultExec: rows_affected=0 # update from (FROM before SET syntax) # UPDATE ... FROM is currently unsupported @@ -97,10 +96,9 @@ logical_plan 04)------Cross Join: 05)--------TableScan: t1 06)--------TableScan: t2 -physical_plan_error -01)UPDATE operation on table 't1' -02)caused by -03)Schema error: No field named t2.b. Did you mean 'b'?. +physical_plan +01)CooperativeExec +02)--DmlResultExec: rows_affected=0 # update from with explicit aliases and joined assignment expressions # UPDATE ... FROM is currently unsupported @@ -117,10 +115,9 @@ logical_plan 06)----------TableScan: t1 07)--------SubqueryAlias: src 08)----------TableScan: t2 -physical_plan_error -01)UPDATE operation on table 't1' -02)caused by -03)Schema error: No field named src.b. Did you mean 'b'?. +physical_plan +01)CooperativeExec +02)--DmlResultExec: rows_affected=0 # test update from other table with actual data statement ok @@ -129,15 +126,9 @@ insert into t1 values (1, 'zoo', 2.0, 10), (2, 'qux', 3.0, 20), (3, 'bar', 4.0, statement ok insert into t2 values (1, 'updated_b', 5.0, 40), (2, 'updated_b2', 2.5, 50), (4, 'updated_b3', 1.5, 60); -# UPDATE ... FROM is currently unsupported - qualifier stripping breaks source column references -# causing assignments like 'b = t2.b' to resolve to target table's 'b' instead of source table's 'b' -# TODO fix https://github.com/apache/datafusion/issues/19950 -statement error +# UPDATE ... FROM assignment binding now resolves source-table references +statement ok update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; ----- -DataFusion error: UPDATE operation on table 't1' -caused by -Schema error: No field named t2.b. Did you mean 'b'?. # set from multiple tables, DataFusion only supports from one table @@ -158,10 +149,9 @@ logical_plan 05)--------SubqueryAlias: t 06)----------TableScan: t1 07)--------TableScan: t2 -physical_plan_error -01)UPDATE operation on table 't1' -02)caused by -03)Schema error: No field named t2.b. Did you mean 'b'?. +physical_plan +01)CooperativeExec +02)--DmlResultExec: rows_affected=2 # test update with table alias with actual data statement ok @@ -176,14 +166,9 @@ insert into t1 values (1, 'zebra', 1.5, 5), (2, 'wolf', 2.0, 10), (3, 'apple', 3 statement ok insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200); -# UPDATE ... FROM is currently unsupported -# TODO fix https://github.com/apache/datafusion/issues/19950 -statement error +# UPDATE ... FROM with target alias resolves and executes +statement ok update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; ----- -DataFusion error: UPDATE operation on table 't1' -caused by -Schema error: No field named t2.b. Did you mean 'b'?. # test source alias permutations with joined assignments @@ -194,4 +179,4 @@ update t1 as dst set b = src.b, c = src.a + dst.a, d = src.d from t2 as src wher ---- DataFusion error: UPDATE operation on table 't1' caused by -Schema error: No field named src.b. Did you mean 'b'?. +Error during planning: UPDATE ... FROM produced 2 replacement rows but target filters matched 3 rows From 6a68d565c5aab9daccb8e58a02fd23be6beecba9 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 6 Mar 2026 17:30:34 +0800 Subject: [PATCH 4/5] Add test cases for UPDATE ... FROM statements with single table assignments --- datafusion/sqllogictest/test_files/update.slt | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index 6cef06a807b49..c846e50404616 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -130,6 +130,13 @@ insert into t2 values (1, 'updated_b', 5.0, 40), (2, 'updated_b2', 2.5, 50), (4, statement ok update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; +query ITRI +select * from t1 order by a; +---- +1 updated_b 1 1 +2 updated_b2 2 1 +3 bar 4 30 + # set from multiple tables, DataFusion only supports from one table statement error DataFusion error: This feature is not implemented: Multiple tables in UPDATE SET FROM not yet supported @@ -170,6 +177,13 @@ insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200); statement ok update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; +query ITRI +select * from t1 order by a; +---- +1 new_val 1 1 +2 new_val2 2 1 +3 apple 3.5 15 + # test source alias permutations with joined assignments # UPDATE ... FROM is currently unsupported From d9c891ea0ed6d24bd51a8fc5b760c6351560d506 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 6 Mar 2026 17:35:44 +0800 Subject: [PATCH 5/5] Optimize input execution for UPDATE ... FROM to ensure join partitioning modes are resolved --- datafusion/core/src/physical_planner.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 7803bc9b2c119..8d2f716aa0458 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -784,6 +784,14 @@ impl DefaultPhysicalPlanner { let filters = extract_dml_filters(input, table_name)?; if plan_contains_join(input)? { let input_exec = children.one()?; + // `update_from` may execute the input plan eagerly as part of + // table mutation. Ensure join partitioning modes (e.g. Auto) + // are fully resolved before handing the plan to providers. + let input_exec = self.optimize_physical_plan( + input_exec, + session_state, + |_, _| {}, + )?; provider .table_provider .update_from(session_state, input_exec, filters)