Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions crates/integrations/datafusion/src/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! MERGE INTO execution for Paimon tables.
//!
//! Supports two execution paths:
//! - **Data evolution tables**: partial-column writes via [`paimon::table::DataEvolutionWriter`].
//! - **Data evolution tables**: partial-column writes via [`paimon::table::TableUpdate`].
//! - **Append-only tables** (no PK, no deletion vectors): copy-on-write file rewriting
//! via [`paimon::table::CopyOnWriteMergeWriter`].

Expand All @@ -37,7 +37,7 @@ use datafusion::sql::sqlparser::ast::{
use futures::TryStreamExt;

use paimon::spec::{datums_to_binary_row, extract_datum_from_arrow, CoreOptions};
use paimon::table::{CopyOnWriteMergeWriter, DataEvolutionWriter, DataSplitBuilder, Table};
use paimon::table::{CopyOnWriteMergeWriter, DataSplitBuilder, Table};

use crate::error::to_datafusion_error;
use crate::sql_context::SQLContext;
Expand Down Expand Up @@ -119,7 +119,7 @@ where
/// Execute a MERGE INTO statement on a Paimon table.
///
/// Dispatches to the appropriate execution path based on table type:
/// - Data evolution tables → partial-column writes via `DataEvolutionWriter`
/// - Data evolution tables → partial-column writes via `TableUpdate`
/// - Append-only tables (no PK) → copy-on-write file rewriting via `CopyOnWriteMergeWriter`
pub(crate) async fn execute_merge_into(
ctx: &SQLContext,
Expand Down Expand Up @@ -632,8 +632,12 @@ async fn execute_merge_into_once(
let parsed = extract_merge_clauses(merge)?;

// Validate preconditions early and create writer (before executing any SQL)
let wb = table.new_write_builder();
let update_writer = if let Some(ref upd) = parsed.update {
Some(DataEvolutionWriter::new(table, upd.columns.clone()).map_err(to_datafusion_error)?)
Some(
wb.new_update(upd.columns.clone())
.map_err(to_datafusion_error)?,
)
} else {
None
};
Expand Down Expand Up @@ -729,10 +733,7 @@ async fn execute_merge_into_once(
.await?;
let insert_count: usize = insert_batches.iter().map(|b| b.num_rows()).sum();
if insert_count > 0 {
let mut table_write = table
.new_write_builder()
.new_write()
.map_err(to_datafusion_error)?;
let mut table_write = wb.new_write().map_err(to_datafusion_error)?;
for batch in &insert_batches {
table_write
.write_arrow_batch(batch)
Expand All @@ -750,8 +751,7 @@ async fn execute_merge_into_once(

// 6. Commit all messages atomically
if !all_messages.is_empty() {
let commit = table.new_write_builder().new_commit();
commit
wb.new_commit()
.commit(all_messages)
.await
.map_err(to_datafusion_error)?;
Expand Down
25 changes: 16 additions & 9 deletions crates/integrations/datafusion/src/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! UPDATE execution for Paimon tables.
//!
//! Supports two execution paths:
//! - **Data evolution tables**: partial-column writes via [`paimon::table::DataEvolutionWriter`].
//! - **Data evolution tables**: partial-column writes via [`paimon::table::TableUpdate`].
//! - **Append-only tables** (no PK, no deletion vectors): copy-on-write file rewriting
//! via [`paimon::table::CopyOnWriteMergeWriter`].

Expand All @@ -31,7 +31,7 @@ use datafusion::prelude::{DataFrame, SessionContext};
use datafusion::sql::sqlparser::ast::{AssignmentTarget, TableFactor, Update};

use paimon::spec::CoreOptions;
use paimon::table::{CopyOnWriteMergeWriter, DataEvolutionWriter, Table};
use paimon::table::{CopyOnWriteMergeWriter, Table};

use crate::error::to_datafusion_error;
use crate::merge_into::{
Expand Down Expand Up @@ -113,9 +113,11 @@ async fn execute_update_once(
exprs.push(assignment.value.to_string());
}

// 2. Create DataEvolutionWriter (validates preconditions)
let mut writer =
DataEvolutionWriter::new(table, columns.clone()).map_err(to_datafusion_error)?;
// 2. Create TableUpdate through the table write builder (validates preconditions)
let wb = table.new_write_builder();
let mut table_update = wb
.new_update(columns.clone())
.map_err(to_datafusion_error)?;

// 3. Query the target table directly with WHERE filter.
let table_ref = update.table.to_string();
Expand Down Expand Up @@ -144,16 +146,21 @@ async fn execute_update_once(

let update_batches = project_update_columns(&batches, &columns)?;
for batch in update_batches {
writer
table_update
.add_matched_batch(batch)
.map_err(to_datafusion_error)?;
}

// 5. Commit
let messages = writer.prepare_commit().await.map_err(to_datafusion_error)?;
let messages = table_update
.prepare_commit()
.await
.map_err(to_datafusion_error)?;
if !messages.is_empty() {
let commit = table.new_write_builder().new_commit();
commit.commit(messages).await.map_err(to_datafusion_error)?;
wb.new_commit()
.commit(messages)
.await
.map_err(to_datafusion_error)?;
}

ok_result(ctx.ctx(), total_count)
Expand Down
4 changes: 2 additions & 2 deletions crates/paimon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ pub use catalog::FileSystemCatalog;
pub use table::{
CommitMessage, DataEvolutionWriter, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket,
Plan, RESTEnv, RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange,
SnapshotCommit, SnapshotManager, Table, TableCommit, TableRead, TableScan, TableWrite,
TagManager, WriteBuilder,
SnapshotCommit, SnapshotManager, Table, TableCommit, TableRead, TableScan, TableUpdate,
TableWrite, TagManager, WriteBuilder,
};
2 changes: 2 additions & 0 deletions crates/paimon/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ mod stats_filter;
pub(crate) mod table_commit;
mod table_read;
mod table_scan;
mod table_update;
pub(crate) mod table_write;
mod tag_manager;
mod vector_search_builder;
Expand All @@ -78,6 +79,7 @@ pub use source::{
pub use table_commit::TableCommit;
pub use table_read::TableRead;
pub use table_scan::TableScan;
pub use table_update::TableUpdate;
pub use table_write::TableWrite;
pub use tag_manager::TagManager;
pub use vector_search_builder::VectorSearchBuilder;
Expand Down
53 changes: 53 additions & 0 deletions crates/paimon/src/table/table_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Table update API for row-ID-based data evolution updates.

use arrow_array::RecordBatch;

use crate::table::{CommitMessage, DataEvolutionWriter, Table};

/// Table-level update API for data-evolution row-id updates.
///
/// `TableUpdate` wraps the lower-level [`DataEvolutionWriter`] so callers can
/// create update writers through [`WriteBuilder`](super::WriteBuilder).
#[must_use = "update must be used to call prepare_commit()"]
pub struct TableUpdate {
writer: DataEvolutionWriter,
}

impl TableUpdate {
pub(crate) fn new(table: &Table, update_columns: Vec<String>) -> crate::Result<Self> {
Ok(Self {
writer: DataEvolutionWriter::new(table, update_columns)?,
})
}

/// Add a batch of matched rows to update.
///
/// The batch must contain a non-null `_ROW_ID` column plus the update
/// columns passed to [`WriteBuilder::new_update`](super::WriteBuilder::new_update).
pub fn add_matched_batch(&mut self, batch: RecordBatch) -> crate::Result<()> {
self.writer.add_matched_batch(batch)
}

/// Prepare commit messages for the caller to commit via [`TableCommit`](super::TableCommit).
#[must_use = "commit messages must be passed to TableCommit"]
pub async fn prepare_commit(self) -> crate::Result<Vec<CommitMessage>> {
self.writer.prepare_commit().await
}
}
80 changes: 77 additions & 3 deletions crates/paimon/src/table/write_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//!
//! Reference: [pypaimon WriteBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/write_builder.py)

use crate::table::{Table, TableCommit, TableWrite};
use crate::table::{Table, TableCommit, TableUpdate, TableWrite};
use uuid::Uuid;

/// Builder for creating table writers and committers.
Expand Down Expand Up @@ -87,6 +87,11 @@ impl<'a> WriteBuilder<'a> {
write
})
}

/// Create a new TableUpdate for data-evolution row-id updates.
pub fn new_update(&self, update_columns: Vec<String>) -> crate::Result<TableUpdate> {
TableUpdate::new(self.table, update_columns)
}
}

fn validate_commit_user(commit_user: &str) -> crate::Result<()> {
Expand All @@ -112,8 +117,10 @@ mod tests {
use super::*;
use crate::catalog::Identifier;
use crate::io::{FileIO, FileIOBuilder};
use crate::spec::{CommitKind, DataType, IntType, Schema, TableSchema, POSTPONE_BUCKET};
use arrow_array::{Int32Array, RecordBatch};
use crate::spec::{
CommitKind, DataType, IntType, Schema, TableSchema, VarCharType, POSTPONE_BUCKET,
};
use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
use std::sync::Arc;

Expand Down Expand Up @@ -182,6 +189,41 @@ mod tests {
)
}

fn test_data_evolution_table(file_io: &FileIO, table_path: &str) -> Table {
let schema = Schema::builder()
.column("id", DataType::Int(IntType::new()))
.column(
"name",
DataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()),
)
.option("data-evolution.enabled", "true")
.option("row-tracking.enabled", "true")
.build()
.unwrap();
Table::new(
file_io.clone(),
Identifier::new("default", "test_data_evolution_table"),
table_path.to_string(),
TableSchema::new(0, &schema),
None,
)
}

fn make_empty_matched_batch() -> RecordBatch {
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("_ROW_ID", ArrowDataType::Int64, false),
ArrowField::new("name", ArrowDataType::Utf8, true),
]));
RecordBatch::try_new(
schema,
vec![
Arc::new(Int64Array::from(Vec::<i64>::new())),
Arc::new(StringArray::from(Vec::<&str>::new())),
],
)
.unwrap()
}

#[test]
fn test_with_commit_user_rejects_invalid_file_name_segments() {
let table = test_postpone_pk_table(&test_file_io(), "memory:/test_invalid_commit_user");
Expand Down Expand Up @@ -285,4 +327,36 @@ mod tests {
.unwrap();
assert_eq!(snapshot.commit_kind(), &CommitKind::APPEND);
}

#[test]
fn test_new_update_rejects_non_data_evolution_table() {
let table = test_postpone_pk_table(&test_file_io(), "memory:/test_new_update_invalid");
let err = table
.new_write_builder()
.new_update(vec!["value".to_string()])
.err()
.unwrap();

assert!(
matches!(err, crate::Error::Unsupported { ref message }
if message.contains("data-evolution.enabled")),
"Expected unsupported data-evolution error, got: {err:?}"
);
}

#[tokio::test]
async fn test_new_update_prepares_empty_commit_for_empty_batch() {
let file_io = test_file_io();
let table = test_data_evolution_table(&file_io, "memory:/test_new_update_empty");
let mut update = table
.new_write_builder()
.new_update(vec!["name".to_string()])
.unwrap();

update
.add_matched_batch(make_empty_matched_batch())
.unwrap();
let messages = update.prepare_commit().await.unwrap();
assert!(messages.is_empty());
}
}
Loading