From bb75019175b5f0c8699e7572cfe40cdefa0d53e5 Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Wed, 10 Jun 2026 12:31:19 +0800 Subject: [PATCH] feat: add table update to write builder Signed-off-by: QuakeWang --- .../integrations/datafusion/src/merge_into.rs | 20 ++--- crates/integrations/datafusion/src/update.rs | 25 +++--- crates/paimon/src/lib.rs | 4 +- crates/paimon/src/table/mod.rs | 2 + crates/paimon/src/table/table_update.rs | 53 ++++++++++++ crates/paimon/src/table/write_builder.rs | 80 ++++++++++++++++++- 6 files changed, 160 insertions(+), 24 deletions(-) create mode 100644 crates/paimon/src/table/table_update.rs diff --git a/crates/integrations/datafusion/src/merge_into.rs b/crates/integrations/datafusion/src/merge_into.rs index cd78ddec..9961aea2 100644 --- a/crates/integrations/datafusion/src/merge_into.rs +++ b/crates/integrations/datafusion/src/merge_into.rs @@ -18,7 +18,7 @@ //! MERGE INTO execution for Paimon tables. //! //! Supports two execution paths: -//! - **Data evolution tables**: partial-column writes via [`paimon::table::DataEvolutionWriter`]. +//! - **Data evolution tables**: partial-column writes via [`paimon::table::TableUpdate`]. //! - **Append-only tables** (no PK, no deletion vectors): copy-on-write file rewriting //! via [`paimon::table::CopyOnWriteMergeWriter`]. @@ -37,7 +37,7 @@ use datafusion::sql::sqlparser::ast::{ use futures::TryStreamExt; use paimon::spec::{datums_to_binary_row, extract_datum_from_arrow, CoreOptions}; -use paimon::table::{CopyOnWriteMergeWriter, DataEvolutionWriter, DataSplitBuilder, Table}; +use paimon::table::{CopyOnWriteMergeWriter, DataSplitBuilder, Table}; use crate::error::to_datafusion_error; use crate::sql_context::SQLContext; @@ -119,7 +119,7 @@ where /// Execute a MERGE INTO statement on a Paimon table. /// /// Dispatches to the appropriate execution path based on table type: -/// - Data evolution tables → partial-column writes via `DataEvolutionWriter` +/// - Data evolution tables → partial-column writes via `TableUpdate` /// - Append-only tables (no PK) → copy-on-write file rewriting via `CopyOnWriteMergeWriter` pub(crate) async fn execute_merge_into( ctx: &SQLContext, @@ -632,8 +632,12 @@ async fn execute_merge_into_once( let parsed = extract_merge_clauses(merge)?; // Validate preconditions early and create writer (before executing any SQL) + let wb = table.new_write_builder(); let update_writer = if let Some(ref upd) = parsed.update { - Some(DataEvolutionWriter::new(table, upd.columns.clone()).map_err(to_datafusion_error)?) + Some( + wb.new_update(upd.columns.clone()) + .map_err(to_datafusion_error)?, + ) } else { None }; @@ -729,10 +733,7 @@ async fn execute_merge_into_once( .await?; let insert_count: usize = insert_batches.iter().map(|b| b.num_rows()).sum(); if insert_count > 0 { - let mut table_write = table - .new_write_builder() - .new_write() - .map_err(to_datafusion_error)?; + let mut table_write = wb.new_write().map_err(to_datafusion_error)?; for batch in &insert_batches { table_write .write_arrow_batch(batch) @@ -750,8 +751,7 @@ async fn execute_merge_into_once( // 6. Commit all messages atomically if !all_messages.is_empty() { - let commit = table.new_write_builder().new_commit(); - commit + wb.new_commit() .commit(all_messages) .await .map_err(to_datafusion_error)?; diff --git a/crates/integrations/datafusion/src/update.rs b/crates/integrations/datafusion/src/update.rs index 8803dcc6..a987aaa8 100644 --- a/crates/integrations/datafusion/src/update.rs +++ b/crates/integrations/datafusion/src/update.rs @@ -18,7 +18,7 @@ //! UPDATE execution for Paimon tables. //! //! Supports two execution paths: -//! - **Data evolution tables**: partial-column writes via [`paimon::table::DataEvolutionWriter`]. +//! - **Data evolution tables**: partial-column writes via [`paimon::table::TableUpdate`]. //! - **Append-only tables** (no PK, no deletion vectors): copy-on-write file rewriting //! via [`paimon::table::CopyOnWriteMergeWriter`]. @@ -31,7 +31,7 @@ use datafusion::prelude::{DataFrame, SessionContext}; use datafusion::sql::sqlparser::ast::{AssignmentTarget, TableFactor, Update}; use paimon::spec::CoreOptions; -use paimon::table::{CopyOnWriteMergeWriter, DataEvolutionWriter, Table}; +use paimon::table::{CopyOnWriteMergeWriter, Table}; use crate::error::to_datafusion_error; use crate::merge_into::{ @@ -113,9 +113,11 @@ async fn execute_update_once( exprs.push(assignment.value.to_string()); } - // 2. Create DataEvolutionWriter (validates preconditions) - let mut writer = - DataEvolutionWriter::new(table, columns.clone()).map_err(to_datafusion_error)?; + // 2. Create TableUpdate through the table write builder (validates preconditions) + let wb = table.new_write_builder(); + let mut table_update = wb + .new_update(columns.clone()) + .map_err(to_datafusion_error)?; // 3. Query the target table directly with WHERE filter. let table_ref = update.table.to_string(); @@ -144,16 +146,21 @@ async fn execute_update_once( let update_batches = project_update_columns(&batches, &columns)?; for batch in update_batches { - writer + table_update .add_matched_batch(batch) .map_err(to_datafusion_error)?; } // 5. Commit - let messages = writer.prepare_commit().await.map_err(to_datafusion_error)?; + let messages = table_update + .prepare_commit() + .await + .map_err(to_datafusion_error)?; if !messages.is_empty() { - let commit = table.new_write_builder().new_commit(); - commit.commit(messages).await.map_err(to_datafusion_error)?; + wb.new_commit() + .commit(messages) + .await + .map_err(to_datafusion_error)?; } ok_result(ctx.ctx(), total_count) diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index 612477a1..15931227 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -45,6 +45,6 @@ pub use catalog::FileSystemCatalog; pub use table::{ CommitMessage, DataEvolutionWriter, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RESTEnv, RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange, - SnapshotCommit, SnapshotManager, Table, TableCommit, TableRead, TableScan, TableWrite, - TagManager, WriteBuilder, + SnapshotCommit, SnapshotManager, Table, TableCommit, TableRead, TableScan, TableUpdate, + TableWrite, TagManager, WriteBuilder, }; diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index c425a9eb..6c4901af 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -53,6 +53,7 @@ mod stats_filter; pub(crate) mod table_commit; mod table_read; mod table_scan; +mod table_update; pub(crate) mod table_write; mod tag_manager; mod vector_search_builder; @@ -78,6 +79,7 @@ pub use source::{ pub use table_commit::TableCommit; pub use table_read::TableRead; pub use table_scan::TableScan; +pub use table_update::TableUpdate; pub use table_write::TableWrite; pub use tag_manager::TagManager; pub use vector_search_builder::VectorSearchBuilder; diff --git a/crates/paimon/src/table/table_update.rs b/crates/paimon/src/table/table_update.rs new file mode 100644 index 00000000..78449e62 --- /dev/null +++ b/crates/paimon/src/table/table_update.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Table update API for row-ID-based data evolution updates. + +use arrow_array::RecordBatch; + +use crate::table::{CommitMessage, DataEvolutionWriter, Table}; + +/// Table-level update API for data-evolution row-id updates. +/// +/// `TableUpdate` wraps the lower-level [`DataEvolutionWriter`] so callers can +/// create update writers through [`WriteBuilder`](super::WriteBuilder). +#[must_use = "update must be used to call prepare_commit()"] +pub struct TableUpdate { + writer: DataEvolutionWriter, +} + +impl TableUpdate { + pub(crate) fn new(table: &Table, update_columns: Vec) -> crate::Result { + Ok(Self { + writer: DataEvolutionWriter::new(table, update_columns)?, + }) + } + + /// Add a batch of matched rows to update. + /// + /// The batch must contain a non-null `_ROW_ID` column plus the update + /// columns passed to [`WriteBuilder::new_update`](super::WriteBuilder::new_update). + pub fn add_matched_batch(&mut self, batch: RecordBatch) -> crate::Result<()> { + self.writer.add_matched_batch(batch) + } + + /// Prepare commit messages for the caller to commit via [`TableCommit`](super::TableCommit). + #[must_use = "commit messages must be passed to TableCommit"] + pub async fn prepare_commit(self) -> crate::Result> { + self.writer.prepare_commit().await + } +} diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs index d24ba28c..e3a4dbce 100644 --- a/crates/paimon/src/table/write_builder.rs +++ b/crates/paimon/src/table/write_builder.rs @@ -19,7 +19,7 @@ //! //! Reference: [pypaimon WriteBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/write_builder.py) -use crate::table::{Table, TableCommit, TableWrite}; +use crate::table::{Table, TableCommit, TableUpdate, TableWrite}; use uuid::Uuid; /// Builder for creating table writers and committers. @@ -87,6 +87,11 @@ impl<'a> WriteBuilder<'a> { write }) } + + /// Create a new TableUpdate for data-evolution row-id updates. + pub fn new_update(&self, update_columns: Vec) -> crate::Result { + TableUpdate::new(self.table, update_columns) + } } fn validate_commit_user(commit_user: &str) -> crate::Result<()> { @@ -112,8 +117,10 @@ mod tests { use super::*; use crate::catalog::Identifier; use crate::io::{FileIO, FileIOBuilder}; - use crate::spec::{CommitKind, DataType, IntType, Schema, TableSchema, POSTPONE_BUCKET}; - use arrow_array::{Int32Array, RecordBatch}; + use crate::spec::{ + CommitKind, DataType, IntType, Schema, TableSchema, VarCharType, POSTPONE_BUCKET, + }; + use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray}; use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; use std::sync::Arc; @@ -182,6 +189,41 @@ mod tests { ) } + fn test_data_evolution_table(file_io: &FileIO, table_path: &str) -> Table { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column( + "name", + DataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()), + ) + .option("data-evolution.enabled", "true") + .option("row-tracking.enabled", "true") + .build() + .unwrap(); + Table::new( + file_io.clone(), + Identifier::new("default", "test_data_evolution_table"), + table_path.to_string(), + TableSchema::new(0, &schema), + None, + ) + } + + fn make_empty_matched_batch() -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("_ROW_ID", ArrowDataType::Int64, false), + ArrowField::new("name", ArrowDataType::Utf8, true), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int64Array::from(Vec::::new())), + Arc::new(StringArray::from(Vec::<&str>::new())), + ], + ) + .unwrap() + } + #[test] fn test_with_commit_user_rejects_invalid_file_name_segments() { let table = test_postpone_pk_table(&test_file_io(), "memory:/test_invalid_commit_user"); @@ -285,4 +327,36 @@ mod tests { .unwrap(); assert_eq!(snapshot.commit_kind(), &CommitKind::APPEND); } + + #[test] + fn test_new_update_rejects_non_data_evolution_table() { + let table = test_postpone_pk_table(&test_file_io(), "memory:/test_new_update_invalid"); + let err = table + .new_write_builder() + .new_update(vec!["value".to_string()]) + .err() + .unwrap(); + + assert!( + matches!(err, crate::Error::Unsupported { ref message } + if message.contains("data-evolution.enabled")), + "Expected unsupported data-evolution error, got: {err:?}" + ); + } + + #[tokio::test] + async fn test_new_update_prepares_empty_commit_for_empty_batch() { + let file_io = test_file_io(); + let table = test_data_evolution_table(&file_io, "memory:/test_new_update_empty"); + let mut update = table + .new_write_builder() + .new_update(vec!["name".to_string()]) + .unwrap(); + + update + .add_matched_batch(make_empty_matched_batch()) + .unwrap(); + let messages = update.prepare_commit().await.unwrap(); + assert!(messages.is_empty()); + } }