Skip to content

Commit 7ede3c9

Browse files
committed
Fix comment
1 parent 1ab4835 commit 7ede3c9

4 files changed

Lines changed: 137 additions & 5 deletions

File tree

crates/integrations/datafusion/tests/pk_tables.rs

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ use std::sync::Arc;
2525

2626
use datafusion::arrow::array::{Int32Array, StringArray};
2727
use datafusion::prelude::SessionContext;
28-
use paimon::{CatalogOptions, FileSystemCatalog, Options};
28+
use paimon::catalog::Identifier;
29+
use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
2930
use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner, PaimonSqlHandler};
3031
use tempfile::TempDir;
3132

@@ -1146,3 +1147,112 @@ async fn test_pk_multiple_value_columns() {
11461147
]
11471148
);
11481149
}
1150+
1151+
// ======================= FirstRow Engine: INSERT OVERWRITE =======================
1152+
1153+
/// INSERT OVERWRITE on a partitioned FirstRow-engine PK table should delete
1154+
/// level-0 files. Before the fix, `skip_level_zero` was applied in the overwrite
1155+
/// scan path, causing level-0 files to survive the overwrite.
1156+
///
1157+
/// Verifies via TableScan (scan_all_files) that the overwrite correctly produces
1158+
/// delete entries for level-0 files, leaving exactly one live file in each partition.
1159+
#[tokio::test]
1160+
async fn test_pk_first_row_insert_overwrite() {
1161+
let (_tmp, catalog) = create_test_env();
1162+
let handler = create_handler(catalog.clone());
1163+
handler
1164+
.sql("CREATE SCHEMA paimon.test_db")
1165+
.await
1166+
.expect("CREATE SCHEMA failed");
1167+
1168+
handler
1169+
.sql(
1170+
"CREATE TABLE paimon.test_db.t_fr_ow (
1171+
dt STRING, id INT NOT NULL, name STRING,
1172+
PRIMARY KEY (dt, id)
1173+
) PARTITIONED BY (dt STRING)
1174+
WITH ('bucket' = '1', 'merge-engine' = 'first-row')",
1175+
)
1176+
.await
1177+
.unwrap();
1178+
1179+
// First commit: two partitions, creates level-0 files
1180+
handler
1181+
.sql(
1182+
"INSERT INTO paimon.test_db.t_fr_ow VALUES \
1183+
('2024-01-01', 1, 'alice'), ('2024-01-01', 2, 'bob'), \
1184+
('2024-01-02', 3, 'carol')",
1185+
)
1186+
.await
1187+
.unwrap()
1188+
.collect()
1189+
.await
1190+
.unwrap();
1191+
1192+
// Verify via scan_all_files: 2 level-0 files (one per partition)
1193+
let table = catalog
1194+
.get_table(&Identifier::new("test_db", "t_fr_ow"))
1195+
.await
1196+
.unwrap();
1197+
let plan = table
1198+
.new_read_builder()
1199+
.new_scan()
1200+
.with_scan_all_files()
1201+
.plan()
1202+
.await
1203+
.unwrap();
1204+
let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum();
1205+
assert_eq!(file_count, 2, "After INSERT: 2 level-0 files (one per partition)");
1206+
1207+
// INSERT OVERWRITE partition 2024-01-01 — must delete the old level-0 file
1208+
handler
1209+
.sql("INSERT OVERWRITE paimon.test_db.t_fr_ow VALUES ('2024-01-01', 10, 'new_alice')")
1210+
.await
1211+
.unwrap()
1212+
.collect()
1213+
.await
1214+
.unwrap();
1215+
1216+
let table = catalog
1217+
.get_table(&Identifier::new("test_db", "t_fr_ow"))
1218+
.await
1219+
.unwrap();
1220+
let plan = table
1221+
.new_read_builder()
1222+
.new_scan()
1223+
.with_scan_all_files()
1224+
.plan()
1225+
.await
1226+
.unwrap();
1227+
let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum();
1228+
assert_eq!(
1229+
file_count, 2,
1230+
"After OVERWRITE: 2 files (1 replaced for 2024-01-01 + 1 unchanged for 2024-01-02)"
1231+
);
1232+
1233+
// Second overwrite on the same partition — no stale files should accumulate
1234+
handler
1235+
.sql("INSERT OVERWRITE paimon.test_db.t_fr_ow VALUES ('2024-01-01', 20, 'newer_alice')")
1236+
.await
1237+
.unwrap()
1238+
.collect()
1239+
.await
1240+
.unwrap();
1241+
1242+
let table = catalog
1243+
.get_table(&Identifier::new("test_db", "t_fr_ow"))
1244+
.await
1245+
.unwrap();
1246+
let plan = table
1247+
.new_read_builder()
1248+
.new_scan()
1249+
.with_scan_all_files()
1250+
.plan()
1251+
.await
1252+
.unwrap();
1253+
let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum();
1254+
assert_eq!(
1255+
file_count, 2,
1256+
"After second OVERWRITE: still 2 files (no stale level-0 files accumulated)"
1257+
);
1258+
}

crates/paimon/src/table/table_commit.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,8 @@ impl TableCommit {
477477
None,
478478
None,
479479
None,
480-
);
480+
)
481+
.with_scan_all_files();
481482
let current_entries = scan.plan_manifest_entries(snap).await?;
482483
for entry in current_entries {
483484
entries.push(entry.with_kind(FileKind::Delete));
@@ -567,7 +568,8 @@ impl TableCommit {
567568
};
568569

569570
// Read all current files from the latest snapshot.
570-
let scan = TableScan::new(&self.table, None, vec![], None, None, None);
571+
let scan =
572+
TableScan::new(&self.table, None, vec![], None, None, None).with_scan_all_files();
571573
let existing_entries = scan.plan_manifest_entries(snap).await?;
572574

573575
// Build index: (partition, bucket, first_row_id, row_count)

crates/paimon/src/table/table_scan.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,10 @@ pub struct TableScan<'a> {
308308
/// When set, the scan will try to return only enough splits to satisfy the limit.
309309
limit: Option<usize>,
310310
row_ranges: Option<Vec<RowRange>>,
311+
/// When true, disables level-0 filtering so all files are visible.
312+
/// Used by non-read paths (overwrite, truncate, writer restore) that need
313+
/// the complete file set. Normal read scans leave this as `false`.
314+
scan_all_files: bool,
311315
}
312316

313317
impl<'a> TableScan<'a> {
@@ -326,9 +330,19 @@ impl<'a> TableScan<'a> {
326330
bucket_predicate,
327331
limit,
328332
row_ranges,
333+
scan_all_files: false,
329334
}
330335
}
331336

337+
/// Disable level-0 filtering so all files are visible.
338+
///
339+
/// Used by non-read paths (overwrite, truncate, writer restore) that need
340+
/// the complete file set regardless of merge engine or DV settings.
341+
pub fn with_scan_all_files(mut self) -> Self {
342+
self.scan_all_files = true;
343+
self
344+
}
345+
332346
/// Set row ranges for scan-time filtering.
333347
///
334348
/// This replaces any existing row_ranges. Typically used to inject
@@ -467,7 +481,12 @@ impl<'a> TableScan<'a> {
467481
// - DV mode: level-0 files are unmerged, DV handles dedup at higher levels
468482
// - FirstRow engine without DV: reads go through DataFileReader (no merge),
469483
// so only compacted (level > 0) files are safe to read directly
470-
let skip_level_zero = if has_primary_keys {
484+
//
485+
// Non-read paths (overwrite, truncate, writer restore) set scan_all_files=true
486+
// to see all files including level-0, matching Java's CommitScanner behavior.
487+
let skip_level_zero = if self.scan_all_files {
488+
false
489+
} else if has_primary_keys {
471490
deletion_vectors_enabled
472491
|| core_options
473492
.merge_engine()

crates/paimon/src/table/table_write.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,8 @@ impl TableWrite {
239239
let mut bucket_seq: HashMap<i32, i64> = HashMap::new();
240240
if let Some(snapshot) = latest_snapshot {
241241
let partition_predicate = Self::build_partition_predicate(table, partition_bytes)?;
242-
let scan = TableScan::new(table, partition_predicate, vec![], None, None, None);
242+
let scan = TableScan::new(table, partition_predicate, vec![], None, None, None)
243+
.with_scan_all_files();
243244
let entries = scan.plan_manifest_entries(&snapshot).await?;
244245
for entry in &entries {
245246
let bucket = entry.bucket();

0 commit comments

Comments
 (0)