Skip to content

Commit d8c3718

Browse files
committed
feat: add primary-key table read/write support with sort-merge deduplication
Add KV file reader/writer, sort-merge reader with LoserTree, and RowKind support to enable reading and writing primary-key tables. Includes integration tests and DataFusion pk_tables tests.
1 parent fa8091e commit d8c3718

22 files changed

Lines changed: 4007 additions & 83 deletions

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ arrow-buffer = "58.0"
3434
arrow-schema = "58.0"
3535
arrow-cast = "58.0"
3636
arrow-ord = "58.0"
37+
arrow-row = "58.0"
3738
arrow-select = "58.0"
3839
datafusion = "53.0.0"
3940
datafusion-ffi = "53.0.0"

crates/integration_tests/tests/append_tables.rs

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -563,33 +563,6 @@ async fn test_partitioned_fixed_bucket_write_read() {
563563
assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30, 40]);
564564
}
565565

566-
// ---------------------------------------------------------------------------
567-
// Unsupported: primary key table should be rejected
568-
// ---------------------------------------------------------------------------
569-
570-
#[tokio::test]
571-
async fn test_reject_primary_key_table() {
572-
let schema = Schema::builder()
573-
.column("id", DataType::Int(IntType::new()))
574-
.column("value", DataType::Int(IntType::new()))
575-
.primary_key(["id"])
576-
.build()
577-
.unwrap();
578-
let table_schema = TableSchema::new(0, &schema);
579-
580-
let file_io = memory_file_io();
581-
let path = "memory:/append_reject_pk";
582-
let table = make_table(&file_io, path, table_schema);
583-
584-
let result = table.new_write_builder().new_write();
585-
assert!(result.is_err());
586-
let err = result.err().unwrap();
587-
assert!(
588-
matches!(&err, paimon::Error::Unsupported { message } if message.contains("primary keys")),
589-
"Expected Unsupported error for PK table, got: {err:?}"
590-
);
591-
}
592-
593566
#[tokio::test]
594567
async fn test_reject_fixed_bucket_without_bucket_key() {
595568
let schema = Schema::builder()

crates/integration_tests/tests/read_tables.rs

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1478,9 +1478,9 @@ async fn test_read_complex_type_table() {
14781478
// PK-without-DV and non-PK-with-DV tests
14791479
// ---------------------------------------------------------------------------
14801480

1481-
/// Reading a primary-key table without deletion vectors should return an Unsupported error.
1481+
/// Reading a primary-key table without deletion vectors should work via sort-merge reader.
14821482
#[tokio::test]
1483-
async fn test_read_pk_table_without_dv_returns_error() {
1483+
async fn test_read_pk_table_without_dv_via_sort_merge() {
14841484
let catalog = create_file_system_catalog();
14851485
let table = get_table_from_catalog(&catalog, "simple_pk_table").await;
14861486

@@ -1493,16 +1493,73 @@ async fn test_read_pk_table_without_dv_returns_error() {
14931493
);
14941494

14951495
let read = table.new_read_builder().new_read();
1496-
let result = read
1496+
let stream = read
14971497
.expect("new_read should succeed")
1498-
.to_arrow(plan.splits());
1499-
let err = result
1500-
.err()
1501-
.expect("Reading PK table without DV should fail");
1498+
.to_arrow(plan.splits())
1499+
.expect("to_arrow should succeed for PK table via sort-merge");
15021500

1501+
let batches: Vec<_> = futures::TryStreamExt::try_collect(stream)
1502+
.await
1503+
.expect("Reading PK table without DV should succeed via sort-merge reader");
15031504
assert!(
1504-
matches!(&err, Error::Unsupported { message } if message.contains("primary-key")),
1505-
"Expected Unsupported error about primary-key tables, got: {err:?}"
1505+
!batches.is_empty(),
1506+
"PK table read should return non-empty results"
1507+
);
1508+
1509+
let actual = extract_id_name(&batches);
1510+
let expected = vec![
1511+
(1, "alice".to_string()),
1512+
(2, "bob".to_string()),
1513+
(3, "carol".to_string()),
1514+
];
1515+
assert_eq!(
1516+
actual, expected,
1517+
"PK table without DV should return correct rows via sort-merge reader"
1518+
);
1519+
}
1520+
1521+
/// Reading a first-row merge engine PK table should return only the first-inserted row per key.
1522+
/// The table has been compacted so all files are level > 0, and the scan skips level-0 files.
1523+
#[tokio::test]
1524+
async fn test_read_first_row_pk_table() {
1525+
let catalog = create_file_system_catalog();
1526+
let table = get_table_from_catalog(&catalog, "first_row_pk_table").await;
1527+
1528+
let read_builder = table.new_read_builder();
1529+
let scan = read_builder.new_scan();
1530+
let plan = scan.plan().await.expect("Failed to plan scan");
1531+
assert!(
1532+
!plan.splits().is_empty(),
1533+
"first-row PK table should have splits to read"
1534+
);
1535+
1536+
let read = table.new_read_builder().new_read();
1537+
let stream = read
1538+
.expect("new_read should succeed")
1539+
.to_arrow(plan.splits())
1540+
.expect("to_arrow should succeed for first-row PK table");
1541+
1542+
let batches: Vec<_> = futures::TryStreamExt::try_collect(stream)
1543+
.await
1544+
.expect("Reading first-row PK table should succeed");
1545+
assert!(
1546+
!batches.is_empty(),
1547+
"first-row PK table read should return non-empty results"
1548+
);
1549+
1550+
let actual = extract_id_name(&batches);
1551+
// first-row keeps the earliest row per key:
1552+
// commit 1: (1, alice), (2, bob), (3, carol)
1553+
// commit 2: (2, bob-v2), (3, carol-v2), (4, dave) — id=2,3 ignored, id=4 is new
1554+
let expected = vec![
1555+
(1, "alice".to_string()),
1556+
(2, "bob".to_string()),
1557+
(3, "carol".to_string()),
1558+
(4, "dave".to_string()),
1559+
];
1560+
assert_eq!(
1561+
actual, expected,
1562+
"first-row PK table should keep earliest row per key"
15061563
);
15071564
}
15081565

0 commit comments

Comments
 (0)