From 013bb8b9443e692038e36cda713649d13e451e7c Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 10 Jun 2026 11:25:45 +0800 Subject: [PATCH 1/4] fix(table/scan): keep key-overlapping PK files in one split MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit plan_snapshot bin-packed a bucket's files purely by size, so files whose primary-key ranges overlap could land in different splits. Each split runs its own sort-merge reader, so the same key surfaced once per split instead of merging to a single row (reproducible with three commits of one key under source.split.target-size=1b). Port the Java MergeTreeSplitGenerator/IntervalPartition algorithm: sort files by decoded (min_key, max_key), group transitively overlapping files into sections, then bin-pack whole sections via pack_for_ordered. Keys are decoded with the trimmed-PK data types and compared through datum_cmp — BinaryRow stores fields little-endian, so raw byte comparison would order int 256 before int 1. Undecodable key ranges collapse the bucket into one section, trading parallelism for correctness. Append-only tables keep the file-level bin pack. --- .../src/table/merge_tree_split_generator.rs | 353 ++++++++++++++++++ crates/paimon/src/table/mod.rs | 1 + crates/paimon/src/table/table_scan.rs | 33 +- crates/paimon/src/table/table_write.rs | 135 +++++++ 4 files changed, 521 insertions(+), 1 deletion(-) create mode 100644 crates/paimon/src/table/merge_tree_split_generator.rs diff --git a/crates/paimon/src/table/merge_tree_split_generator.rs b/crates/paimon/src/table/merge_tree_split_generator.rs new file mode 100644 index 00000000..7cdcc467 --- /dev/null +++ b/crates/paimon/src/table/merge_tree_split_generator.rs @@ -0,0 +1,353 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Split generation for primary-key (merge-tree) tables. +//! +//! Files whose primary-key ranges overlap must be read by the same +//! sort-merge reader, so they have to stay in the same split. This module +//! groups a bucket's files into key-range "sections" and then bin-packs +//! whole sections into splits, mirroring the Java implementation. +//! +//! References: +//! [MergeTreeSplitGenerator](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java), +//! [IntervalPartition](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java) + +use super::bin_pack::pack_for_ordered; +use crate::spec::{datum_cmp, BinaryRow, DataFileMeta, DataType, Datum}; +use std::cmp::{self, Ordering}; + +/// Compares serialized `BinaryRow` keys field-by-field using the trimmed +/// primary-key data types. +/// +/// BinaryRow stores fields little-endian, so raw byte comparison would order +/// e.g. int 256 (`[00 01 00 00]`) before int 1 (`[01 00 00 00]`); keys must +/// be decoded before comparing. +pub(crate) struct KeyComparator { + key_types: Vec, +} + +/// A decoded key: one `Option` per trimmed primary-key field +/// (`None` = SQL NULL). +type DecodedKey = Vec>; + +impl KeyComparator { + pub(crate) fn new(key_types: Vec) -> Self { + Self { key_types } + } + + /// Decode a serialized min/max key. Returns `None` when the key is empty + /// or malformed, letting callers degrade to the safe "treat everything as + /// overlapping" path instead of failing the scan. + fn decode(&self, key: &[u8]) -> Option { + if key.is_empty() { + return None; + } + let row = BinaryRow::from_serialized_bytes(key).ok()?; + if (row.arity() as usize) != self.key_types.len() { + return None; + } + self.key_types + .iter() + .enumerate() + .map(|(pos, dt)| row.get_datum(pos, dt).ok()) + .collect() + } +} + +/// Compare decoded keys field-by-field. NULL sorts first; fields that +/// `datum_cmp` cannot order (e.g. float NaN) compare as equal, which forces +/// the files into the same section — conservative but never incorrect. +fn compare_decoded(a: &DecodedKey, b: &DecodedKey) -> Ordering { + for (fa, fb) in a.iter().zip(b.iter()) { + let ord = match (fa, fb) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(da), Some(db)) => datum_cmp(da, db).unwrap_or(Ordering::Equal), + }; + if ord != Ordering::Equal { + return ord; + } + } + Ordering::Equal +} + +/// A file paired with its decoded min/max keys. +struct KeyedFile { + file: DataFileMeta, + min: DecodedKey, + max: DecodedKey, +} + +/// Decode every file's key range up front. Returns `None` if any file lacks +/// a usable key range, in which case callers must assume full overlap. +fn decode_all( + files: Vec, + comparator: &KeyComparator, +) -> Result, Vec> { + let mut keyed = Vec::with_capacity(files.len()); + let mut undecodable = false; + for file in &files { + match ( + comparator.decode(&file.min_key), + comparator.decode(&file.max_key), + ) { + (Some(min), Some(max)) if !undecodable => keyed.push(KeyedFile { + file: file.clone(), + min, + max, + }), + _ => undecodable = true, + } + } + if undecodable { + Err(files) + } else { + Ok(keyed) + } +} + +/// Group files into sections by primary-key range overlap. +/// +/// Files are sorted by `(min_key, max_key)`; a running upper bound tracks the +/// max key seen in the current section, and a file whose min key exceeds the +/// bound starts a new section. Sections never overlap each other, while files +/// inside one section all transitively overlap and must be merged together. +/// +/// Files with empty or undecodable key ranges collapse everything into one +/// section: no parallelism, but never a missed merge. +pub(crate) fn interval_partition( + files: Vec, + comparator: &KeyComparator, +) -> Vec> { + if files.len() <= 1 { + return if files.is_empty() { + Vec::new() + } else { + vec![files] + }; + } + + let mut keyed = match decode_all(files, comparator) { + Ok(keyed) => keyed, + Err(files) => return vec![files], + }; + keyed.sort_by(|a, b| { + compare_decoded(&a.min, &b.min).then_with(|| compare_decoded(&a.max, &b.max)) + }); + + let mut sections: Vec> = Vec::new(); + let mut current: Vec = Vec::new(); + let mut bound: Option = None; + + for kf in keyed { + if let Some(ref b) = bound { + if compare_decoded(&kf.min, b) == Ordering::Greater { + sections.push(std::mem::take(&mut current)); + bound = None; + } + } + match bound { + Some(ref b) if compare_decoded(&kf.max, b) != Ordering::Greater => {} + _ => bound = Some(kf.max), + } + current.push(kf.file); + } + if !current.is_empty() { + sections.push(current); + } + sections +} + +/// Bin-pack whole sections into splits. A section is atomic: its files +/// overlap on primary key and must never be separated, even when the section +/// alone exceeds `target_split_size`. +pub(crate) fn pack_sections( + sections: Vec>, + target_split_size: i64, + open_file_cost: i64, +) -> Vec> { + pack_for_ordered( + sections, + |section| { + section + .iter() + .map(|f| cmp::max(f.file_size, open_file_cost)) + .sum() + }, + target_split_size, + ) + .into_iter() + .map(|sections| sections.into_iter().flatten().collect()) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::spec::stats::BinaryTableStats; + use crate::spec::{BinaryRowBuilder, IntType}; + use chrono::{DateTime, Utc}; + + fn int_key(value: i32) -> Vec { + let mut builder = BinaryRowBuilder::new(1); + builder.write_int(0, value); + builder.build_serialized() + } + + fn keyed_file(name: &str, min: i32, max: i32, file_size: i64, level: i32) -> DataFileMeta { + DataFileMeta { + file_name: name.to_string(), + file_size, + row_count: 100, + min_key: int_key(min), + max_key: int_key(max), + key_stats: BinaryTableStats::new(Vec::new(), Vec::new(), Vec::new()), + value_stats: BinaryTableStats::new(Vec::new(), Vec::new(), Vec::new()), + min_sequence_number: 0, + max_sequence_number: 0, + schema_id: 0, + level, + extra_files: Vec::new(), + creation_time: DateTime::::from_timestamp(0, 0), + delete_row_count: None, + embedded_index: None, + first_row_id: None, + write_cols: None, + external_path: None, + file_source: None, + value_stats_cols: None, + } + } + + fn int_comparator() -> KeyComparator { + KeyComparator::new(vec![DataType::Int(IntType::new())]) + } + + fn section_names(sections: &[Vec]) -> Vec> { + sections + .iter() + .map(|s| s.iter().map(|f| f.file_name.as_str()).collect()) + .collect() + } + + /// Int keys must be ordered numerically, not by little-endian bytes + /// (byte-wise, 256 = [00 01 00 00] would sort before 1 = [01 00 00 00]). + #[test] + fn key_comparator_orders_ints_numerically() { + let comparator = int_comparator(); + let one = comparator.decode(&int_key(1)).unwrap(); + let two = comparator.decode(&int_key(2)).unwrap(); + let big = comparator.decode(&int_key(256)).unwrap(); + assert_eq!(compare_decoded(&one, &two), Ordering::Less); + assert_eq!(compare_decoded(&two, &big), Ordering::Less); + assert_eq!(compare_decoded(&big, &one), Ordering::Greater); + } + + #[test] + fn interval_partition_groups_overlapping_files() { + let files = vec![ + keyed_file("a", 1, 10, 100, 0), + keyed_file("b", 5, 15, 100, 0), + keyed_file("c", 20, 30, 100, 0), + keyed_file("d", 25, 28, 100, 0), + ]; + let sections = interval_partition(files, &int_comparator()); + assert_eq!( + section_names(§ions), + vec![vec!["a", "b"], vec!["c", "d"]] + ); + } + + #[test] + fn interval_partition_keeps_disjoint_files_separate() { + let files = vec![ + keyed_file("b", 3, 4, 100, 0), + keyed_file("a", 1, 2, 100, 0), + keyed_file("c", 5, 6, 100, 0), + ]; + let sections = interval_partition(files, &int_comparator()); + assert_eq!( + section_names(§ions), + vec![vec!["a"], vec!["b"], vec!["c"]] + ); + } + + /// A later file can extend the section bound past an earlier file's max: + /// [1,100] chains [50,60] and [90,110] into one section with [105,120]. + #[test] + fn interval_partition_tracks_running_bound() { + let files = vec![ + keyed_file("a", 1, 100, 100, 0), + keyed_file("b", 50, 60, 100, 0), + keyed_file("c", 90, 110, 100, 0), + keyed_file("d", 105, 120, 100, 0), + keyed_file("e", 121, 130, 100, 0), + ]; + let sections = interval_partition(files, &int_comparator()); + assert_eq!( + section_names(§ions), + vec![vec!["a", "b", "c", "d"], vec!["e"]] + ); + } + + #[test] + fn interval_partition_empty_key_degrades_to_single_section() { + let mut no_key = keyed_file("a", 1, 2, 100, 0); + no_key.min_key = Vec::new(); + no_key.max_key = Vec::new(); + let files = vec![no_key, keyed_file("b", 10, 20, 100, 0)]; + let sections = interval_partition(files, &int_comparator()); + assert_eq!(section_names(§ions), vec![vec!["a", "b"]]); + } + + #[test] + fn pack_sections_respects_target_size() { + let sections = vec![ + vec![keyed_file("a", 1, 2, 100, 0)], + vec![keyed_file("b", 3, 4, 100, 0)], + vec![keyed_file("c", 5, 6, 100, 0)], + ]; + let splits = pack_sections(sections, 250, 1); + assert_eq!(section_names(&splits), vec![vec!["a", "b"], vec!["c"]]); + } + + #[test] + fn pack_sections_never_splits_a_section() { + let sections = vec![vec![ + keyed_file("a", 1, 10, 100, 0), + keyed_file("b", 5, 15, 100, 0), + ]]; + let splits = pack_sections(sections, 50, 1); + assert_eq!(section_names(&splits), vec![vec!["a", "b"]]); + } + + #[test] + fn pack_sections_applies_open_file_cost() { + let sections = vec![ + vec![keyed_file("a", 1, 1, 2, 0)], + vec![keyed_file("b", 2, 2, 2, 0)], + vec![keyed_file("c", 3, 3, 2, 0)], + ]; + // Weight is max(file_size=2, open_file_cost=100) = 100 each. + let splits = pack_sections(sections, 150, 100); + assert_eq!( + section_names(&splits), + vec![vec!["a"], vec!["b"], vec!["c"]] + ); + } +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index c425a9eb..7a758b50 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -37,6 +37,7 @@ mod full_text_search_builder; pub(crate) mod global_index_scanner; mod kv_file_reader; mod kv_file_writer; +pub(crate) mod merge_tree_split_generator; mod partition_filter; mod postpone_file_writer; mod prepared_files; diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index be0d2df4..ccb88fdf 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -31,10 +31,11 @@ use super::Table; use crate::io::FileIO; use crate::spec::{ avro::SharedSchemaCache, bucket_dir_name, BinaryRow, CoreOptions, DataField, DataFileMeta, - FileKind, IndexManifest, ManifestEntry, PartitionComputer, Predicate, Snapshot, + DataType, FileKind, IndexManifest, ManifestEntry, PartitionComputer, Predicate, Snapshot, TimeTravelSelector, }; use crate::table::bin_pack::split_for_batch; +use crate::table::merge_tree_split_generator::{interval_partition, pack_sections, KeyComparator}; use crate::table::source::{ any_range_overlaps_file, intersect_ranges_with_file, merge_row_ranges, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange, @@ -627,6 +628,28 @@ impl<'a> TableScan<'a> { None }; + // Primary-key tables must keep key-overlapping files in one split so the + // sort-merge reader sees every version of a key. The comparator decodes + // the trimmed-PK min/max keys written by the kv writer. + let pk_comparator = { + let trimmed_pks = self.table.schema().trimmed_primary_keys(); + if trimmed_pks.is_empty() { + None + } else { + let fields = self.table.schema().fields(); + let key_types: Vec = trimmed_pks + .iter() + .filter_map(|name| { + fields + .iter() + .find(|f| f.name() == name) + .map(|f| f.data_type().clone()) + }) + .collect(); + Some(KeyComparator::new(key_types)) + } + }; + // Read deletion vector index manifest once (like Java generateSplits / scanDvIndex). let (deletion_files_map, effective_row_ranges) = if let Some(index_manifest_name) = snapshot.index_manifest() { @@ -721,6 +744,14 @@ impl<'a> TableScan<'a> { } result + } else if let Some(ref comparator) = pk_comparator { + // Merge-tree path: section files by key-range overlap first, then + // bin-pack whole sections. Overlapping files always share a split. + pack_sections( + interval_partition(data_files, comparator), + target_split_size, + open_file_cost, + ) } else { split_for_batch(data_files, target_split_size, open_file_cost) }; diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index fea6ff79..d8add25f 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -2567,4 +2567,139 @@ mod tests { assert_eq!(ids, vec![1]); assert_eq!(values, vec![20]); } + + fn tiny_split_pk_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_tiny_split_pk"), + table_path.to_string(), + pk_changelog_schema(&[ + ("source.split.target-size", "1b"), + ("source.split.open-file-cost", "1b"), + ]), + None, + ) + } + + async fn commit_one_batch(table: &Table, ids: Vec, values: Vec) { + let mut tw = TableWrite::new(table, "test-user".to_string()).unwrap(); + tw.write_arrow_batch(&make_batch(ids, values)) + .await + .unwrap(); + let msgs = tw.prepare_commit().await.unwrap(); + TableCommit::new(table.clone(), "test-user".to_string()) + .commit(msgs) + .await + .unwrap(); + } + + async fn read_id_value_rows(table: &Table) -> Vec<(i32, i32)> { + let rb = table.new_read_builder(); + let plan = rb.new_scan().plan().await.unwrap(); + let read = rb.new_read().unwrap(); + let batches: Vec = + futures::TryStreamExt::try_collect(read.to_arrow(plan.splits()).unwrap()) + .await + .unwrap(); + let mut rows: Vec<(i32, i32)> = batches + .iter() + .flat_map(|b| { + let ids = b.column(0).as_any().downcast_ref::().unwrap(); + let values = b.column(1).as_any().downcast_ref::().unwrap(); + (0..b.num_rows()).map(|i| (ids.value(i), values.value(i))) + }) + .collect(); + rows.sort_unstable(); + rows + } + + /// Regression test: three commits of the same primary key produce three + /// key-overlapping files. Even with a 1-byte split target they must stay + /// in one split so the sort-merge reader merges them, instead of each + /// split emitting its own (stale) version of the row. + #[tokio::test] + async fn test_pk_plan_keeps_overlapping_files_in_one_split_under_tiny_target() { + let file_io = test_file_io(); + let table_path = "memory:/test_tiny_split_overlap"; + setup_dirs(&file_io, table_path).await; + let table = tiny_split_pk_table(&file_io, table_path); + + commit_one_batch(&table, vec![1], vec![10]).await; + commit_one_batch(&table, vec![1], vec![20]).await; + commit_one_batch(&table, vec![1], vec![30]).await; + + let plan = table.new_read_builder().new_scan().plan().await.unwrap(); + assert_eq!( + plan.splits().len(), + 1, + "overlapping files must share a split" + ); + assert_eq!(plan.splits()[0].data_files().len(), 3); + + assert_eq!(read_id_value_rows(&table).await, vec![(1, 30)]); + } + + /// Files with disjoint key ranges may still be distributed across splits + /// for parallelism when the split target is small. + #[tokio::test] + async fn test_pk_plan_separates_disjoint_files_under_tiny_target() { + let file_io = test_file_io(); + let table_path = "memory:/test_tiny_split_disjoint"; + setup_dirs(&file_io, table_path).await; + let table = tiny_split_pk_table(&file_io, table_path); + + commit_one_batch(&table, vec![1], vec![10]).await; + commit_one_batch(&table, vec![2], vec![20]).await; + commit_one_batch(&table, vec![3], vec![30]).await; + + let plan = table.new_read_builder().new_scan().plan().await.unwrap(); + assert_eq!( + plan.splits().len(), + 3, + "disjoint files keep split parallelism" + ); + + assert_eq!( + read_id_value_rows(&table).await, + vec![(1, 10), (2, 20), (3, 30)] + ); + } + + /// Append-only tables have no primary keys (and empty min/max keys); they + /// must keep using plain file-level bin packing instead of degrading to a + /// single all-files section. + #[tokio::test] + async fn test_append_table_plan_uses_file_level_bin_pack_under_tiny_target() { + let file_io = test_file_io(); + let table_path = "memory:/test_tiny_split_append"; + setup_dirs(&file_io, table_path).await; + + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .option("bucket", "1") + .option("bucket-key", "id") + .option("source.split.target-size", "1b") + .option("source.split.open-file-cost", "1b") + .build() + .unwrap(); + let table = Table::new( + file_io.clone(), + Identifier::new("default", "test_tiny_split_append"), + table_path.to_string(), + TableSchema::new(0, &schema), + None, + ); + + commit_one_batch(&table, vec![1], vec![10]).await; + commit_one_batch(&table, vec![2], vec![20]).await; + commit_one_batch(&table, vec![3], vec![30]).await; + + let plan = table.new_read_builder().new_scan().plan().await.unwrap(); + assert_eq!( + plan.splits().len(), + 3, + "append tables keep file-level bin pack" + ); + } } From 144ce5610d24b492e4e4532f15e65cb270fb2ece Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 10 Jun 2026 11:34:29 +0800 Subject: [PATCH 2/4] fix(table/read): merge compacted PK files with overlapping key ranges MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit read_pk routed a split to the raw (non-merging) reader whenever it had no level-0 files. A split can still hold compacted files on different levels whose key ranges overlap — e.g. produced by Java/Spark compaction — and raw-reading those emits one row per version of a key. Extend the dispatch with split_requires_merge: any level-0 file or any key-range overlap among the split's files sends it through the sort-merge reader; only disjoint compacted files keep the raw fast path. Key ranges are compared with the same decoded-key comparator the split planner uses (KeyComparator::from_table_schema, extracted from plan_snapshot), and undecodable key ranges fall back to merging. --- .../src/table/merge_tree_split_generator.rs | 126 +++++++++++++++++- crates/paimon/src/table/table_read.rs | 25 +++- crates/paimon/src/table/table_scan.rs | 21 +-- 3 files changed, 146 insertions(+), 26 deletions(-) diff --git a/crates/paimon/src/table/merge_tree_split_generator.rs b/crates/paimon/src/table/merge_tree_split_generator.rs index 7cdcc467..be88e3ba 100644 --- a/crates/paimon/src/table/merge_tree_split_generator.rs +++ b/crates/paimon/src/table/merge_tree_split_generator.rs @@ -27,7 +27,7 @@ //! [IntervalPartition](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java) use super::bin_pack::pack_for_ordered; -use crate::spec::{datum_cmp, BinaryRow, DataFileMeta, DataType, Datum}; +use crate::spec::{datum_cmp, BinaryRow, DataFileMeta, DataType, Datum, TableSchema}; use std::cmp::{self, Ordering}; /// Compares serialized `BinaryRow` keys field-by-field using the trimmed @@ -49,6 +49,29 @@ impl KeyComparator { Self { key_types } } + /// Build a comparator over a table's trimmed primary keys, matching the + /// key layout the kv writer uses for min/max keys. Returns `None` for + /// tables without primary keys. + pub(crate) fn from_table_schema(schema: &TableSchema) -> Option { + let trimmed_pks = schema.trimmed_primary_keys(); + if trimmed_pks.is_empty() { + return None; + } + let fields = schema.fields(); + let key_types: Vec = trimmed_pks + .iter() + .filter_map(|name| { + fields + .iter() + .find(|f| f.name() == name) + .map(|f| f.data_type().clone()) + }) + .collect(); + // A PK name missing from the fields (should not happen) leaves the + // arity short; decode then fails and callers degrade safely. + Some(Self::new(key_types)) + } + /// Decode a serialized min/max key. Returns `None` when the key is empty /// or malformed, letting callers degrade to the safe "treat everything as /// overlapping" path instead of failing the scan. @@ -196,6 +219,43 @@ pub(crate) fn pack_sections( .collect() } +/// Whether any two files in the group overlap on primary-key range. +/// +/// Undecodable key ranges report `true` (overlap assumed), so callers fall +/// back to the merging read path rather than risk emitting unmerged rows. +pub(crate) fn has_key_overlap(files: &[DataFileMeta], comparator: &KeyComparator) -> bool { + if files.len() <= 1 { + return false; + } + let keyed = match decode_all(files.to_vec(), comparator) { + Ok(keyed) => keyed, + Err(_) => return true, + }; + let mut ranges: Vec<(&DecodedKey, &DecodedKey)> = + keyed.iter().map(|kf| (&kf.min, &kf.max)).collect(); + ranges.sort_by(|a, b| compare_decoded(a.0, b.0)); + let mut bound = ranges[0].1; + for &(min, max) in &ranges[1..] { + if compare_decoded(min, bound) != Ordering::Greater { + return true; + } + if compare_decoded(max, bound) == Ordering::Greater { + bound = max; + } + } + false +} + +/// Whether a split's files must go through the sort-merge reader. +/// +/// Level-0 files may carry unmerged duplicates of any key, and files whose +/// key ranges overlap (e.g. compacted files on different levels) hold +/// multiple versions of the same key; both require merging. Disjoint +/// compacted files can be read raw. +pub(crate) fn split_requires_merge(files: &[DataFileMeta], comparator: &KeyComparator) -> bool { + files.iter().any(|f| f.level == 0) || has_key_overlap(files, comparator) +} + #[cfg(test)] mod tests { use super::*; @@ -350,4 +410,68 @@ mod tests { vec![vec!["a"], vec!["b"], vec!["c"]] ); } + + #[test] + fn has_key_overlap_detects_cross_level_overlap() { + let comparator = int_comparator(); + let overlapping = vec![ + keyed_file("l1", 1, 50, 100, 1), + keyed_file("l2", 40, 90, 100, 2), + ]; + assert!(has_key_overlap(&overlapping, &comparator)); + + let disjoint = vec![ + keyed_file("l1", 1, 30, 100, 1), + keyed_file("l2", 31, 90, 100, 2), + ]; + assert!(!has_key_overlap(&disjoint, &comparator)); + } + + /// A wide earlier range must keep bounding later files: [1,100] overlaps + /// [50,60] even though the middle file [10,20] is disjoint from it. + #[test] + fn has_key_overlap_tracks_running_bound() { + let comparator = int_comparator(); + let files = vec![ + keyed_file("wide", 1, 100, 100, 1), + keyed_file("mid", 10, 20, 100, 2), + keyed_file("late", 50, 60, 100, 3), + ]; + assert!(has_key_overlap(&files, &comparator)); + } + + #[test] + fn has_key_overlap_assumes_overlap_for_undecodable_keys() { + let comparator = int_comparator(); + let mut no_key = keyed_file("a", 1, 2, 100, 1); + no_key.min_key = Vec::new(); + let files = vec![no_key, keyed_file("b", 10, 20, 100, 2)]; + assert!(has_key_overlap(&files, &comparator)); + } + + #[test] + fn split_requires_merge_for_level_zero_or_overlap() { + let comparator = int_comparator(); + + // Any level-0 file forces merging, even with disjoint ranges. + let with_level_zero = vec![ + keyed_file("l0", 1, 10, 100, 0), + keyed_file("l1", 11, 20, 100, 1), + ]; + assert!(split_requires_merge(&with_level_zero, &comparator)); + + // Compacted files with overlapping ranges force merging too. + let overlapping_compacted = vec![ + keyed_file("l1", 1, 50, 100, 1), + keyed_file("l2", 40, 90, 100, 2), + ]; + assert!(split_requires_merge(&overlapping_compacted, &comparator)); + + // Disjoint compacted files can be read raw. + let disjoint_compacted = vec![ + keyed_file("l1", 1, 30, 100, 1), + keyed_file("l2", 31, 90, 100, 2), + ]; + assert!(!split_requires_merge(&disjoint_compacted, &comparator)); + } } diff --git a/crates/paimon/src/table/table_read.rs b/crates/paimon/src/table/table_read.rs index 54939383..ea10873f 100644 --- a/crates/paimon/src/table/table_read.rs +++ b/crates/paimon/src/table/table_read.rs @@ -18,6 +18,7 @@ use super::data_evolution_reader::DataEvolutionReader; use super::data_file_reader::DataFileReader; use super::kv_file_reader::{KeyValueFileReader, KeyValueReadConfig}; +use super::merge_tree_split_generator::{split_requires_merge, KeyComparator}; use super::read_builder::split_scan_predicates; use super::{ArrowRecordBatchStream, Table}; use crate::arrow::filtering::reader_pruning_predicates; @@ -76,9 +77,10 @@ impl<'a> TableRead<'a> { let core_options = CoreOptions::new(self.table.schema.options()); let merge_engine = core_options.merge_engine()?; - // PK table with Deduplicate engine: splits containing level-0 files - // need KeyValueFileReader for sort-merge dedup; splits with only - // compacted files (level > 0) can use the faster DataFileReader. + // PK table with Deduplicate engine: splits that may hold multiple + // versions of a key (level-0 files or key-overlapping compacted + // files) need KeyValueFileReader for sort-merge dedup; splits of + // disjoint compacted files can use the faster DataFileReader. if has_primary_keys && matches!( merge_engine, @@ -95,8 +97,11 @@ impl<'a> TableRead<'a> { } } - /// Read PK table with Deduplicate engine: level-0 splits go through - /// KeyValueFileReader for sort-merge dedup, compacted splits use DataFileReader. + /// Read PK table with Deduplicate engine: splits that may hold multiple + /// versions of a key (any level-0 file, or compacted files with + /// overlapping key ranges) go through KeyValueFileReader for sort-merge + /// dedup; splits of disjoint compacted files use the faster + /// DataFileReader. fn read_pk( &self, data_splits: &[DataSplit], @@ -106,10 +111,18 @@ impl<'a> TableRead<'a> { return self.read_kv(data_splits, core_options); } + // No comparator means no usable trimmed PK — fall back to merging + // everything rather than risk raw-reading duplicate keys. + let comparator = KeyComparator::from_table_schema(self.table.schema()); + let mut kv_splits = Vec::new(); let mut raw_splits = Vec::new(); for split in data_splits { - if split.data_files().iter().any(|f| f.level == 0) { + let needs_merge = match comparator { + Some(ref comparator) => split_requires_merge(split.data_files(), comparator), + None => true, + }; + if needs_merge { kv_splits.push(split.clone()); } else { raw_splits.push(split.clone()); diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index ccb88fdf..18b48f18 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -31,7 +31,7 @@ use super::Table; use crate::io::FileIO; use crate::spec::{ avro::SharedSchemaCache, bucket_dir_name, BinaryRow, CoreOptions, DataField, DataFileMeta, - DataType, FileKind, IndexManifest, ManifestEntry, PartitionComputer, Predicate, Snapshot, + FileKind, IndexManifest, ManifestEntry, PartitionComputer, Predicate, Snapshot, TimeTravelSelector, }; use crate::table::bin_pack::split_for_batch; @@ -631,24 +631,7 @@ impl<'a> TableScan<'a> { // Primary-key tables must keep key-overlapping files in one split so the // sort-merge reader sees every version of a key. The comparator decodes // the trimmed-PK min/max keys written by the kv writer. - let pk_comparator = { - let trimmed_pks = self.table.schema().trimmed_primary_keys(); - if trimmed_pks.is_empty() { - None - } else { - let fields = self.table.schema().fields(); - let key_types: Vec = trimmed_pks - .iter() - .filter_map(|name| { - fields - .iter() - .find(|f| f.name() == name) - .map(|f| f.data_type().clone()) - }) - .collect(); - Some(KeyComparator::new(key_types)) - } - }; + let pk_comparator = KeyComparator::from_table_schema(self.table.schema()); // Read deletion vector index manifest once (like Java generateSplits / scanDvIndex). let (deletion_files_map, effective_row_ranges) = From b7caca1ced9d88e4ad6fbb96d85fd4d1cdbd0cd4 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 10 Jun 2026 11:44:08 +0800 Subject: [PATCH 3/4] test(datafusion): cover cross-split PK merge correctness e2e Reproduce the reviewer scenario from apache/paimon-rust#340: a 1-byte source.split.target-size forces every data file into its own split candidate, so versions of one key used to surface once per split. Cover both deduplicate (three commits of one key read back as the latest row) and partial-update (per-column updates over three commits merge into one row). Both tests fail without the merge-tree split generator fix. --- .../datafusion/tests/pk_tables.rs | 108 +++++++++++++++++- 1 file changed, 105 insertions(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs index bf1a317b..3c483f38 100644 --- a/crates/integrations/datafusion/tests/pk_tables.rs +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -19,7 +19,8 @@ //! //! Covers: basic write+read, dedup within/across commits, partitioned PK tables, //! multi-bucket, column projection, FirstRow merge engine, sequence.field, -//! INSERT OVERWRITE, filter pushdown, and error cases. +//! INSERT OVERWRITE, filter pushdown, cross-split merge correctness, and +//! error cases. //! //! Dynamic bucket and cross-partition tests are in separate files: //! - `dynamic_bucket_tables.rs` @@ -28,8 +29,8 @@ mod common; use common::{ - collect_id_name, collect_id_value, create_sql_context, create_test_env, row_count, - setup_sql_context, + collect_id_name, collect_id_value, collect_int_int_str, create_sql_context, create_test_env, + row_count, setup_sql_context, }; use datafusion::arrow::array::{Array, Int32Array, StringArray}; use paimon::catalog::Identifier; @@ -1979,3 +1980,104 @@ async fn test_pk_dv_deduplicate_read_no_error() { result.err() ); } + +// ======================= Cross-Split Merge Correctness ======================= + +/// Regression: a 1-byte split target forces every data file into its own +/// split candidate. Files holding versions of the same key overlap on key +/// range and must still be merged into a single row — previously each split +/// emitted its own (stale) version. +#[tokio::test] +async fn test_pk_dedup_merges_across_tiny_splits() { + let (_tmp, sql_context) = setup_sql_context().await; + + sql_context + .sql( + "CREATE TABLE paimon.test_db.t_tiny_split ( + id INT NOT NULL, value INT, + PRIMARY KEY (id) + ) WITH ( + 'bucket' = '1', + 'source.split.target-size' = '1b', + 'source.split.open-file-cost' = '1b' + )", + ) + .await + .unwrap(); + + for value in [10, 20, 30] { + sql_context + .sql(&format!( + "INSERT INTO paimon.test_db.t_tiny_split VALUES (1, {value})" + )) + .await + .unwrap() + .collect() + .await + .unwrap(); + } + + let rows = collect_id_value( + &sql_context, + "SELECT id, value FROM paimon.test_db.t_tiny_split", + ) + .await; + assert_eq!(rows, vec![(1, 30)]); +} + +/// Same regression for the partial-update engine: per-column updates of one +/// key spread over three commits/files must merge into a single row even +/// when the split target would otherwise separate the files. +#[tokio::test] +async fn test_pk_partial_update_merges_across_tiny_splits() { + let (_tmp, sql_context) = setup_sql_context().await; + + sql_context + .sql( + "CREATE TABLE paimon.test_db.t_tiny_split_pu ( + id INT NOT NULL, v_int INT, v_str STRING, + PRIMARY KEY (id) + ) WITH ( + 'bucket' = '1', + 'merge-engine' = 'partial-update', + 'source.split.target-size' = '1b', + 'source.split.open-file-cost' = '1b' + )", + ) + .await + .unwrap(); + + sql_context + .sql("INSERT INTO paimon.test_db.t_tiny_split_pu VALUES (1, 10, CAST(NULL AS STRING))") + .await + .unwrap() + .collect() + .await + .unwrap(); + sql_context + .sql("INSERT INTO paimon.test_db.t_tiny_split_pu VALUES (1, CAST(NULL AS INT), 'hello')") + .await + .unwrap() + .collect() + .await + .unwrap(); + sql_context + .sql("INSERT INTO paimon.test_db.t_tiny_split_pu VALUES (1, 100, CAST(NULL AS STRING))") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = sql_context + .sql("SELECT id, v_int, v_str FROM paimon.test_db.t_tiny_split_pu") + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!( + collect_int_int_str(&batches), + vec![(1, 100, "hello".to_string())] + ); +} From adb6aeee4005b93a25f81a7b0c0f2099fbc1e4cd Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 10 Jun 2026 11:59:00 +0800 Subject: [PATCH 4/4] fix(table): keep raw-read fast path for deletion-vector and first-row tables MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deletion-vector tables resolve stale key versions through DVs, not read-time merging, and KeyValueFileReader rejects splits with deletion vectors — routing their key-overlapping compacted files to the sort-merge reader broke reads of Spark-written DV tables ("KeyValueFileReader does not support deletion vectors"). Mirror Java MergeTreeSplitGenerator's fast path: when deletion vectors are enabled or the merge engine is first-row, plan_snapshot keeps plain size-based packing (no key-range sectioning), and read_pk keeps the plain level-0 dispatch instead of the overlap-aware one. Caught by test_read_primary_key_table_via_datafusion against the Spark-provisioned warehouse. --- crates/paimon/src/table/table_read.rs | 22 +++++++++++++++++----- crates/paimon/src/table/table_scan.rs | 15 ++++++++++++++- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/crates/paimon/src/table/table_read.rs b/crates/paimon/src/table/table_read.rs index ea10873f..c95d36fa 100644 --- a/crates/paimon/src/table/table_read.rs +++ b/crates/paimon/src/table/table_read.rs @@ -80,7 +80,9 @@ impl<'a> TableRead<'a> { // PK table with Deduplicate engine: splits that may hold multiple // versions of a key (level-0 files or key-overlapping compacted // files) need KeyValueFileReader for sort-merge dedup; splits of - // disjoint compacted files can use the faster DataFileReader. + // disjoint compacted files — and all compacted files of + // deletion-vector tables, where DVs mask stale versions — use the + // faster DataFileReader. if has_primary_keys && matches!( merge_engine, @@ -101,7 +103,9 @@ impl<'a> TableRead<'a> { /// versions of a key (any level-0 file, or compacted files with /// overlapping key ranges) go through KeyValueFileReader for sort-merge /// dedup; splits of disjoint compacted files use the faster - /// DataFileReader. + /// DataFileReader. Deletion-vector tables are exempt from the overlap + /// check: their stale versions are masked by DVs, and KeyValueFileReader + /// does not support DVs. fn read_pk( &self, data_splits: &[DataSplit], @@ -111,6 +115,10 @@ impl<'a> TableRead<'a> { return self.read_kv(data_splits, core_options); } + // Deletion-vector tables read raw by design: stale versions of a key + // are masked by DVs, not merged, and KeyValueFileReader does not + // support DVs. Keep the plain level-0 dispatch for them. + let dv_enabled = core_options.deletion_vectors_enabled(); // No comparator means no usable trimmed PK — fall back to merging // everything rather than risk raw-reading duplicate keys. let comparator = KeyComparator::from_table_schema(self.table.schema()); @@ -118,9 +126,13 @@ impl<'a> TableRead<'a> { let mut kv_splits = Vec::new(); let mut raw_splits = Vec::new(); for split in data_splits { - let needs_merge = match comparator { - Some(ref comparator) => split_requires_merge(split.data_files(), comparator), - None => true, + let needs_merge = if dv_enabled { + split.data_files().iter().any(|f| f.level == 0) + } else { + match comparator { + Some(ref comparator) => split_requires_merge(split.data_files(), comparator), + None => true, + } }; if needs_merge { kv_splits.push(split.clone()); diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 18b48f18..312bb2ce 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -631,7 +631,20 @@ impl<'a> TableScan<'a> { // Primary-key tables must keep key-overlapping files in one split so the // sort-merge reader sees every version of a key. The comparator decodes // the trimmed-PK min/max keys written by the kv writer. - let pk_comparator = KeyComparator::from_table_schema(self.table.schema()); + // + // Deletion-vector and first-row tables read without merging (stale rows + // are masked by DVs / level-0 is skipped), so they keep plain size-based + // packing like Java's MergeTreeSplitGenerator fast path. + let read_merges_overlapping_keys = !core_options.deletion_vectors_enabled() + && !matches!( + core_options.merge_engine(), + Ok(crate::spec::MergeEngine::FirstRow) + ); + let pk_comparator = if read_merges_overlapping_keys { + KeyComparator::from_table_schema(self.table.schema()) + } else { + None + }; // Read deletion vector index manifest once (like Java generateSplits / scanDvIndex). let (deletion_files_map, effective_row_ranges) =