Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 105 additions & 3 deletions crates/integrations/datafusion/tests/pk_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
//!
//! Covers: basic write+read, dedup within/across commits, partitioned PK tables,
//! multi-bucket, column projection, FirstRow merge engine, sequence.field,
//! INSERT OVERWRITE, filter pushdown, and error cases.
//! INSERT OVERWRITE, filter pushdown, cross-split merge correctness, and
//! error cases.
//!
//! Dynamic bucket and cross-partition tests are in separate files:
//! - `dynamic_bucket_tables.rs`
Expand All @@ -28,8 +29,8 @@
mod common;

use common::{
collect_id_name, collect_id_value, create_sql_context, create_test_env, row_count,
setup_sql_context,
collect_id_name, collect_id_value, collect_int_int_str, create_sql_context, create_test_env,
row_count, setup_sql_context,
};
use datafusion::arrow::array::{Array, Int32Array, StringArray};
use paimon::catalog::Identifier;
Expand Down Expand Up @@ -1979,3 +1980,104 @@ async fn test_pk_dv_deduplicate_read_no_error() {
result.err()
);
}

// ======================= Cross-Split Merge Correctness =======================

/// Regression: a 1-byte split target forces every data file into its own
/// split candidate. Files holding versions of the same key overlap on key
/// range and must still be merged into a single row — previously each split
/// emitted its own (stale) version.
#[tokio::test]
async fn test_pk_dedup_merges_across_tiny_splits() {
let (_tmp, sql_context) = setup_sql_context().await;

sql_context
.sql(
"CREATE TABLE paimon.test_db.t_tiny_split (
id INT NOT NULL, value INT,
PRIMARY KEY (id)
) WITH (
'bucket' = '1',
'source.split.target-size' = '1b',
'source.split.open-file-cost' = '1b'
)",
)
.await
.unwrap();

for value in [10, 20, 30] {
sql_context
.sql(&format!(
"INSERT INTO paimon.test_db.t_tiny_split VALUES (1, {value})"
))
.await
.unwrap()
.collect()
.await
.unwrap();
}

let rows = collect_id_value(
&sql_context,
"SELECT id, value FROM paimon.test_db.t_tiny_split",
)
.await;
assert_eq!(rows, vec![(1, 30)]);
}

/// Same regression for the partial-update engine: per-column updates of one
/// key spread over three commits/files must merge into a single row even
/// when the split target would otherwise separate the files.
#[tokio::test]
async fn test_pk_partial_update_merges_across_tiny_splits() {
let (_tmp, sql_context) = setup_sql_context().await;

sql_context
.sql(
"CREATE TABLE paimon.test_db.t_tiny_split_pu (
id INT NOT NULL, v_int INT, v_str STRING,
PRIMARY KEY (id)
) WITH (
'bucket' = '1',
'merge-engine' = 'partial-update',
'source.split.target-size' = '1b',
'source.split.open-file-cost' = '1b'
)",
)
.await
.unwrap();

sql_context
.sql("INSERT INTO paimon.test_db.t_tiny_split_pu VALUES (1, 10, CAST(NULL AS STRING))")
.await
.unwrap()
.collect()
.await
.unwrap();
sql_context
.sql("INSERT INTO paimon.test_db.t_tiny_split_pu VALUES (1, CAST(NULL AS INT), 'hello')")
.await
.unwrap()
.collect()
.await
.unwrap();
sql_context
.sql("INSERT INTO paimon.test_db.t_tiny_split_pu VALUES (1, 100, CAST(NULL AS STRING))")
.await
.unwrap()
.collect()
.await
.unwrap();

let batches = sql_context
.sql("SELECT id, v_int, v_str FROM paimon.test_db.t_tiny_split_pu")
.await
.unwrap()
.collect()
.await
.unwrap();
assert_eq!(
collect_int_int_str(&batches),
vec![(1, 100, "hello".to_string())]
);
}
Loading
Loading