diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs index bf1a317b..3c483f38 100644 --- a/crates/integrations/datafusion/tests/pk_tables.rs +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -19,7 +19,8 @@ //! //! Covers: basic write+read, dedup within/across commits, partitioned PK tables, //! multi-bucket, column projection, FirstRow merge engine, sequence.field, -//! INSERT OVERWRITE, filter pushdown, and error cases. +//! INSERT OVERWRITE, filter pushdown, cross-split merge correctness, and +//! error cases. //! //! Dynamic bucket and cross-partition tests are in separate files: //! - `dynamic_bucket_tables.rs` @@ -28,8 +29,8 @@ mod common; use common::{ - collect_id_name, collect_id_value, create_sql_context, create_test_env, row_count, - setup_sql_context, + collect_id_name, collect_id_value, collect_int_int_str, create_sql_context, create_test_env, + row_count, setup_sql_context, }; use datafusion::arrow::array::{Array, Int32Array, StringArray}; use paimon::catalog::Identifier; @@ -1979,3 +1980,104 @@ async fn test_pk_dv_deduplicate_read_no_error() { result.err() ); } + +// ======================= Cross-Split Merge Correctness ======================= + +/// Regression: a 1-byte split target forces every data file into its own +/// split candidate. Files holding versions of the same key overlap on key +/// range and must still be merged into a single row — previously each split +/// emitted its own (stale) version. 
+#[tokio::test] +async fn test_pk_dedup_merges_across_tiny_splits() { + let (_tmp, sql_context) = setup_sql_context().await; + + sql_context + .sql( + "CREATE TABLE paimon.test_db.t_tiny_split ( + id INT NOT NULL, value INT, + PRIMARY KEY (id) + ) WITH ( + 'bucket' = '1', + 'source.split.target-size' = '1b', + 'source.split.open-file-cost' = '1b' + )", + ) + .await + .unwrap(); + + for value in [10, 20, 30] { + sql_context + .sql(&format!( + "INSERT INTO paimon.test_db.t_tiny_split VALUES (1, {value})" + )) + .await + .unwrap() + .collect() + .await + .unwrap(); + } + + let rows = collect_id_value( + &sql_context, + "SELECT id, value FROM paimon.test_db.t_tiny_split", + ) + .await; + assert_eq!(rows, vec![(1, 30)]); +} + +/// Same regression for the partial-update engine: per-column updates of one +/// key spread over three commits/files must merge into a single row even +/// when the split target would otherwise separate the files. +#[tokio::test] +async fn test_pk_partial_update_merges_across_tiny_splits() { + let (_tmp, sql_context) = setup_sql_context().await; + + sql_context + .sql( + "CREATE TABLE paimon.test_db.t_tiny_split_pu ( + id INT NOT NULL, v_int INT, v_str STRING, + PRIMARY KEY (id) + ) WITH ( + 'bucket' = '1', + 'merge-engine' = 'partial-update', + 'source.split.target-size' = '1b', + 'source.split.open-file-cost' = '1b' + )", + ) + .await + .unwrap(); + + sql_context + .sql("INSERT INTO paimon.test_db.t_tiny_split_pu VALUES (1, 10, CAST(NULL AS STRING))") + .await + .unwrap() + .collect() + .await + .unwrap(); + sql_context + .sql("INSERT INTO paimon.test_db.t_tiny_split_pu VALUES (1, CAST(NULL AS INT), 'hello')") + .await + .unwrap() + .collect() + .await + .unwrap(); + sql_context + .sql("INSERT INTO paimon.test_db.t_tiny_split_pu VALUES (1, 100, CAST(NULL AS STRING))") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = sql_context + .sql("SELECT id, v_int, v_str FROM paimon.test_db.t_tiny_split_pu") + .await + .unwrap() + 
.collect() + .await + .unwrap(); + assert_eq!( + collect_int_int_str(&batches), + vec![(1, 100, "hello".to_string())] + ); +} diff --git a/crates/paimon/src/table/merge_tree_split_generator.rs b/crates/paimon/src/table/merge_tree_split_generator.rs new file mode 100644 index 00000000..be88e3ba --- /dev/null +++ b/crates/paimon/src/table/merge_tree_split_generator.rs @@ -0,0 +1,477 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Split generation for primary-key (merge-tree) tables. +//! +//! Files whose primary-key ranges overlap must be read by the same +//! sort-merge reader, so they have to stay in the same split. This module +//! groups a bucket's files into key-range "sections" and then bin-packs +//! whole sections into splits, mirroring the Java implementation. +//! +//! References: +//! [MergeTreeSplitGenerator](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java), +//! 
[IntervalPartition](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java) + +use super::bin_pack::pack_for_ordered; +use crate::spec::{datum_cmp, BinaryRow, DataFileMeta, DataType, Datum, TableSchema}; +use std::cmp::{self, Ordering}; + +/// Compares serialized `BinaryRow` keys field-by-field using the trimmed +/// primary-key data types. +/// +/// BinaryRow stores fields little-endian, so raw byte comparison would order +/// e.g. int 256 (`[00 01 00 00]`) before int 1 (`[01 00 00 00]`); keys must +/// be decoded before comparing. +pub(crate) struct KeyComparator { + key_types: Vec, +} + +/// A decoded key: one `Option` per trimmed primary-key field +/// (`None` = SQL NULL). +type DecodedKey = Vec>; + +impl KeyComparator { + pub(crate) fn new(key_types: Vec) -> Self { + Self { key_types } + } + + /// Build a comparator over a table's trimmed primary keys, matching the + /// key layout the kv writer uses for min/max keys. Returns `None` for + /// tables without primary keys. + pub(crate) fn from_table_schema(schema: &TableSchema) -> Option { + let trimmed_pks = schema.trimmed_primary_keys(); + if trimmed_pks.is_empty() { + return None; + } + let fields = schema.fields(); + let key_types: Vec = trimmed_pks + .iter() + .filter_map(|name| { + fields + .iter() + .find(|f| f.name() == name) + .map(|f| f.data_type().clone()) + }) + .collect(); + // A PK name missing from the fields (should not happen) leaves the + // arity short; decode then fails and callers degrade safely. + Some(Self::new(key_types)) + } + + /// Decode a serialized min/max key. Returns `None` when the key is empty + /// or malformed, letting callers degrade to the safe "treat everything as + /// overlapping" path instead of failing the scan. 
+ fn decode(&self, key: &[u8]) -> Option { + if key.is_empty() { + return None; + } + let row = BinaryRow::from_serialized_bytes(key).ok()?; + if (row.arity() as usize) != self.key_types.len() { + return None; + } + self.key_types + .iter() + .enumerate() + .map(|(pos, dt)| row.get_datum(pos, dt).ok()) + .collect() + } +} + +/// Compare decoded keys field-by-field. NULL sorts first; fields that +/// `datum_cmp` cannot order (e.g. float NaN) compare as equal, which forces +/// the files into the same section — conservative but never incorrect. +fn compare_decoded(a: &DecodedKey, b: &DecodedKey) -> Ordering { + for (fa, fb) in a.iter().zip(b.iter()) { + let ord = match (fa, fb) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(da), Some(db)) => datum_cmp(da, db).unwrap_or(Ordering::Equal), + }; + if ord != Ordering::Equal { + return ord; + } + } + Ordering::Equal +} + +/// A file paired with its decoded min/max keys. +struct KeyedFile { + file: DataFileMeta, + min: DecodedKey, + max: DecodedKey, +} + +/// Decode every file's key range up front. Returns `Err(files)` if any file +/// lacks a usable key range, in which case callers must assume full overlap. +fn decode_all( + files: Vec, + comparator: &KeyComparator, +) -> Result, Vec> { + let mut keyed = Vec::with_capacity(files.len()); + let mut undecodable = false; + for file in &files { + match ( + comparator.decode(&file.min_key), + comparator.decode(&file.max_key), + ) { + (Some(min), Some(max)) if !undecodable => keyed.push(KeyedFile { + file: file.clone(), + min, + max, + }), + _ => undecodable = true, + } + } + if undecodable { + Err(files) + } else { + Ok(keyed) + } +} + +/// Group files into sections by primary-key range overlap. +/// +/// Files are sorted by `(min_key, max_key)`; a running upper bound tracks the +/// max key seen in the current section, and a file whose min key exceeds the +/// bound starts a new section. 
Sections never overlap each other, while files +/// inside one section all transitively overlap and must be merged together. +/// +/// Files with empty or undecodable key ranges collapse everything into one +/// section: no parallelism, but never a missed merge. +pub(crate) fn interval_partition( + files: Vec, + comparator: &KeyComparator, +) -> Vec> { + if files.len() <= 1 { + return if files.is_empty() { + Vec::new() + } else { + vec![files] + }; + } + + let mut keyed = match decode_all(files, comparator) { + Ok(keyed) => keyed, + Err(files) => return vec![files], + }; + keyed.sort_by(|a, b| { + compare_decoded(&a.min, &b.min).then_with(|| compare_decoded(&a.max, &b.max)) + }); + + let mut sections: Vec> = Vec::new(); + let mut current: Vec = Vec::new(); + let mut bound: Option = None; + + for kf in keyed { + if let Some(ref b) = bound { + if compare_decoded(&kf.min, b) == Ordering::Greater { + sections.push(std::mem::take(&mut current)); + bound = None; + } + } + match bound { + Some(ref b) if compare_decoded(&kf.max, b) != Ordering::Greater => {} + _ => bound = Some(kf.max), + } + current.push(kf.file); + } + if !current.is_empty() { + sections.push(current); + } + sections +} + +/// Bin-pack whole sections into splits. A section is atomic: its files +/// overlap on primary key and must never be separated, even when the section +/// alone exceeds `target_split_size`. +pub(crate) fn pack_sections( + sections: Vec>, + target_split_size: i64, + open_file_cost: i64, +) -> Vec> { + pack_for_ordered( + sections, + |section| { + section + .iter() + .map(|f| cmp::max(f.file_size, open_file_cost)) + .sum() + }, + target_split_size, + ) + .into_iter() + .map(|sections| sections.into_iter().flatten().collect()) + .collect() +} + +/// Whether any two files in the group overlap on primary-key range. +/// +/// Undecodable key ranges report `true` (overlap assumed), so callers fall +/// back to the merging read path rather than risk emitting unmerged rows. 
+pub(crate) fn has_key_overlap(files: &[DataFileMeta], comparator: &KeyComparator) -> bool { + if files.len() <= 1 { + return false; + } + let keyed = match decode_all(files.to_vec(), comparator) { + Ok(keyed) => keyed, + Err(_) => return true, + }; + let mut ranges: Vec<(&DecodedKey, &DecodedKey)> = + keyed.iter().map(|kf| (&kf.min, &kf.max)).collect(); + ranges.sort_by(|a, b| compare_decoded(a.0, b.0)); + let mut bound = ranges[0].1; + for &(min, max) in &ranges[1..] { + if compare_decoded(min, bound) != Ordering::Greater { + return true; + } + if compare_decoded(max, bound) == Ordering::Greater { + bound = max; + } + } + false +} + +/// Whether a split's files must go through the sort-merge reader. +/// +/// Level-0 files may carry unmerged duplicates of any key, and files whose +/// key ranges overlap (e.g. compacted files on different levels) hold +/// multiple versions of the same key; both require merging. Disjoint +/// compacted files can be read raw. +pub(crate) fn split_requires_merge(files: &[DataFileMeta], comparator: &KeyComparator) -> bool { + files.iter().any(|f| f.level == 0) || has_key_overlap(files, comparator) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::spec::stats::BinaryTableStats; + use crate::spec::{BinaryRowBuilder, IntType}; + use chrono::{DateTime, Utc}; + + fn int_key(value: i32) -> Vec { + let mut builder = BinaryRowBuilder::new(1); + builder.write_int(0, value); + builder.build_serialized() + } + + fn keyed_file(name: &str, min: i32, max: i32, file_size: i64, level: i32) -> DataFileMeta { + DataFileMeta { + file_name: name.to_string(), + file_size, + row_count: 100, + min_key: int_key(min), + max_key: int_key(max), + key_stats: BinaryTableStats::new(Vec::new(), Vec::new(), Vec::new()), + value_stats: BinaryTableStats::new(Vec::new(), Vec::new(), Vec::new()), + min_sequence_number: 0, + max_sequence_number: 0, + schema_id: 0, + level, + extra_files: Vec::new(), + creation_time: DateTime::::from_timestamp(0, 0), + 
+ delete_row_count: None, + embedded_index: None, + first_row_id: None, + write_cols: None, + external_path: None, + file_source: None, + value_stats_cols: None, + } + } + + fn int_comparator() -> KeyComparator { + KeyComparator::new(vec![DataType::Int(IntType::new())]) + } + + fn section_names(sections: &[Vec]) -> Vec> { + sections + .iter() + .map(|s| s.iter().map(|f| f.file_name.as_str()).collect()) + .collect() + } + + /// Int keys must be ordered numerically, not by little-endian bytes + /// (byte-wise, 256 = [00 01 00 00] would sort before 1 = [01 00 00 00]). + #[test] + fn key_comparator_orders_ints_numerically() { + let comparator = int_comparator(); + let one = comparator.decode(&int_key(1)).unwrap(); + let two = comparator.decode(&int_key(2)).unwrap(); + let big = comparator.decode(&int_key(256)).unwrap(); + assert_eq!(compare_decoded(&one, &two), Ordering::Less); + assert_eq!(compare_decoded(&two, &big), Ordering::Less); + assert_eq!(compare_decoded(&big, &one), Ordering::Greater); + } + + #[test] + fn interval_partition_groups_overlapping_files() { + let files = vec![ + keyed_file("a", 1, 10, 100, 0), + keyed_file("b", 5, 15, 100, 0), + keyed_file("c", 20, 30, 100, 0), + keyed_file("d", 25, 28, 100, 0), + ]; + let sections = interval_partition(files, &int_comparator()); + assert_eq!( + section_names(&sections), + vec![vec!["a", "b"], vec!["c", "d"]] + ); + } + + #[test] + fn interval_partition_keeps_disjoint_files_separate() { + let files = vec![ + keyed_file("b", 3, 4, 100, 0), + keyed_file("a", 1, 2, 100, 0), + keyed_file("c", 5, 6, 100, 0), + ]; + let sections = interval_partition(files, &int_comparator()); + assert_eq!( + section_names(&sections), + vec![vec!["a"], vec!["b"], vec!["c"]] + ); + } + + /// A later file can extend the section bound past an earlier file's max: + /// [1,100] chains [50,60] and [90,110] into one section with [105,120]. 
+ #[test] + fn interval_partition_tracks_running_bound() { + let files = vec![ + keyed_file("a", 1, 100, 100, 0), + keyed_file("b", 50, 60, 100, 0), + keyed_file("c", 90, 110, 100, 0), + keyed_file("d", 105, 120, 100, 0), + keyed_file("e", 121, 130, 100, 0), + ]; + let sections = interval_partition(files, &int_comparator()); + assert_eq!( + section_names(&sections), + vec![vec!["a", "b", "c", "d"], vec!["e"]] + ); + } + + #[test] + fn interval_partition_empty_key_degrades_to_single_section() { + let mut no_key = keyed_file("a", 1, 2, 100, 0); + no_key.min_key = Vec::new(); + no_key.max_key = Vec::new(); + let files = vec![no_key, keyed_file("b", 10, 20, 100, 0)]; + let sections = interval_partition(files, &int_comparator()); + assert_eq!(section_names(&sections), vec![vec!["a", "b"]]); + } + + #[test] + fn pack_sections_respects_target_size() { + let sections = vec![ + vec![keyed_file("a", 1, 2, 100, 0)], + vec![keyed_file("b", 3, 4, 100, 0)], + vec![keyed_file("c", 5, 6, 100, 0)], + ]; + let splits = pack_sections(sections, 250, 1); + assert_eq!(section_names(&splits), vec![vec!["a", "b"], vec!["c"]]); + } + + #[test] + fn pack_sections_never_splits_a_section() { + let sections = vec![vec![ + keyed_file("a", 1, 10, 100, 0), + keyed_file("b", 5, 15, 100, 0), + ]]; + let splits = pack_sections(sections, 50, 1); + assert_eq!(section_names(&splits), vec![vec!["a", "b"]]); + } + + #[test] + fn pack_sections_applies_open_file_cost() { + let sections = vec![ + vec![keyed_file("a", 1, 1, 2, 0)], + vec![keyed_file("b", 2, 2, 2, 0)], + vec![keyed_file("c", 3, 3, 2, 0)], + ]; + // Weight is max(file_size=2, open_file_cost=100) = 100 each. 
+ let splits = pack_sections(sections, 150, 100); + assert_eq!( + section_names(&splits), + vec![vec!["a"], vec!["b"], vec!["c"]] + ); + } + + #[test] + fn has_key_overlap_detects_cross_level_overlap() { + let comparator = int_comparator(); + let overlapping = vec![ + keyed_file("l1", 1, 50, 100, 1), + keyed_file("l2", 40, 90, 100, 2), + ]; + assert!(has_key_overlap(&overlapping, &comparator)); + + let disjoint = vec![ + keyed_file("l1", 1, 30, 100, 1), + keyed_file("l2", 31, 90, 100, 2), + ]; + assert!(!has_key_overlap(&disjoint, &comparator)); + } + + /// A wide earlier range must keep bounding later files: [1,100] overlaps + /// [50,60] even though the middle file [10,20] is disjoint from it. + #[test] + fn has_key_overlap_tracks_running_bound() { + let comparator = int_comparator(); + let files = vec![ + keyed_file("wide", 1, 100, 100, 1), + keyed_file("mid", 10, 20, 100, 2), + keyed_file("late", 50, 60, 100, 3), + ]; + assert!(has_key_overlap(&files, &comparator)); + } + + #[test] + fn has_key_overlap_assumes_overlap_for_undecodable_keys() { + let comparator = int_comparator(); + let mut no_key = keyed_file("a", 1, 2, 100, 1); + no_key.min_key = Vec::new(); + let files = vec![no_key, keyed_file("b", 10, 20, 100, 2)]; + assert!(has_key_overlap(&files, &comparator)); + } + + #[test] + fn split_requires_merge_for_level_zero_or_overlap() { + let comparator = int_comparator(); + + // Any level-0 file forces merging, even with disjoint ranges. + let with_level_zero = vec![ + keyed_file("l0", 1, 10, 100, 0), + keyed_file("l1", 11, 20, 100, 1), + ]; + assert!(split_requires_merge(&with_level_zero, &comparator)); + + // Compacted files with overlapping ranges force merging too. + let overlapping_compacted = vec![ + keyed_file("l1", 1, 50, 100, 1), + keyed_file("l2", 40, 90, 100, 2), + ]; + assert!(split_requires_merge(&overlapping_compacted, &comparator)); + + // Disjoint compacted files can be read raw. 
+ let disjoint_compacted = vec![ + keyed_file("l1", 1, 30, 100, 1), + keyed_file("l2", 31, 90, 100, 2), + ]; + assert!(!split_requires_merge(&disjoint_compacted, &comparator)); + } +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index c425a9eb..7a758b50 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -37,6 +37,7 @@ mod full_text_search_builder; pub(crate) mod global_index_scanner; mod kv_file_reader; mod kv_file_writer; +pub(crate) mod merge_tree_split_generator; mod partition_filter; mod postpone_file_writer; mod prepared_files; diff --git a/crates/paimon/src/table/table_read.rs b/crates/paimon/src/table/table_read.rs index 54939383..c95d36fa 100644 --- a/crates/paimon/src/table/table_read.rs +++ b/crates/paimon/src/table/table_read.rs @@ -18,6 +18,7 @@ use super::data_evolution_reader::DataEvolutionReader; use super::data_file_reader::DataFileReader; use super::kv_file_reader::{KeyValueFileReader, KeyValueReadConfig}; +use super::merge_tree_split_generator::{split_requires_merge, KeyComparator}; use super::read_builder::split_scan_predicates; use super::{ArrowRecordBatchStream, Table}; use crate::arrow::filtering::reader_pruning_predicates; @@ -76,9 +77,12 @@ impl<'a> TableRead<'a> { let core_options = CoreOptions::new(self.table.schema.options()); let merge_engine = core_options.merge_engine()?; - // PK table with Deduplicate engine: splits containing level-0 files - // need KeyValueFileReader for sort-merge dedup; splits with only - // compacted files (level > 0) can use the faster DataFileReader. + // PK table with Deduplicate engine: splits that may hold multiple + // versions of a key (level-0 files or key-overlapping compacted + // files) need KeyValueFileReader for sort-merge dedup; splits of + // disjoint compacted files — and all compacted files of + // deletion-vector tables, where DVs mask stale versions — use the + // faster DataFileReader. 
if has_primary_keys && matches!( merge_engine, @@ -95,8 +99,13 @@ impl<'a> TableRead<'a> { } } - /// Read PK table with Deduplicate engine: level-0 splits go through - /// KeyValueFileReader for sort-merge dedup, compacted splits use DataFileReader. + /// Read PK table with Deduplicate engine: splits that may hold multiple + /// versions of a key (any level-0 file, or compacted files with + /// overlapping key ranges) go through KeyValueFileReader for sort-merge + /// dedup; splits of disjoint compacted files use the faster + /// DataFileReader. Deletion-vector tables are exempt from the overlap + /// check: their stale versions are masked by DVs, and KeyValueFileReader + /// does not support DVs. fn read_pk( &self, data_splits: &[DataSplit], @@ -106,10 +115,26 @@ impl<'a> TableRead<'a> { return self.read_kv(data_splits, core_options); } + // Deletion-vector tables read raw by design: stale versions of a key + // are masked by DVs, not merged, and KeyValueFileReader does not + // support DVs. Keep the plain level-0 dispatch for them. + let dv_enabled = core_options.deletion_vectors_enabled(); + // No comparator means no usable trimmed PK — fall back to merging + // everything rather than risk raw-reading duplicate keys. 
+ let comparator = KeyComparator::from_table_schema(self.table.schema()); + let mut kv_splits = Vec::new(); let mut raw_splits = Vec::new(); for split in data_splits { - if split.data_files().iter().any(|f| f.level == 0) { + let needs_merge = if dv_enabled { + split.data_files().iter().any(|f| f.level == 0) + } else { + match comparator { + Some(ref comparator) => split_requires_merge(split.data_files(), comparator), + None => true, + } + }; + if needs_merge { kv_splits.push(split.clone()); } else { raw_splits.push(split.clone()); diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index be0d2df4..312bb2ce 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -35,6 +35,7 @@ use crate::spec::{ TimeTravelSelector, }; use crate::table::bin_pack::split_for_batch; +use crate::table::merge_tree_split_generator::{interval_partition, pack_sections, KeyComparator}; use crate::table::source::{ any_range_overlaps_file, intersect_ranges_with_file, merge_row_ranges, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange, @@ -627,6 +628,24 @@ impl<'a> TableScan<'a> { None }; + // Primary-key tables must keep key-overlapping files in one split so the + // sort-merge reader sees every version of a key. The comparator decodes + // the trimmed-PK min/max keys written by the kv writer. + // + // Deletion-vector and first-row tables read without merging (stale rows + // are masked by DVs / level-0 is skipped), so they keep plain size-based + // packing like Java's MergeTreeSplitGenerator fast path. 
+ let read_merges_overlapping_keys = !core_options.deletion_vectors_enabled() + && !matches!( + core_options.merge_engine(), + Ok(crate::spec::MergeEngine::FirstRow) + ); + let pk_comparator = if read_merges_overlapping_keys { + KeyComparator::from_table_schema(self.table.schema()) + } else { + None + }; + // Read deletion vector index manifest once (like Java generateSplits / scanDvIndex). let (deletion_files_map, effective_row_ranges) = if let Some(index_manifest_name) = snapshot.index_manifest() { @@ -721,6 +740,14 @@ impl<'a> TableScan<'a> { } result + } else if let Some(ref comparator) = pk_comparator { + // Merge-tree path: section files by key-range overlap first, then + // bin-pack whole sections. Overlapping files always share a split. + pack_sections( + interval_partition(data_files, comparator), + target_split_size, + open_file_cost, + ) } else { split_for_batch(data_files, target_split_size, open_file_cost) }; diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index fea6ff79..d8add25f 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -2567,4 +2567,139 @@ mod tests { assert_eq!(ids, vec![1]); assert_eq!(values, vec![20]); } + + fn tiny_split_pk_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_tiny_split_pk"), + table_path.to_string(), + pk_changelog_schema(&[ + ("source.split.target-size", "1b"), + ("source.split.open-file-cost", "1b"), + ]), + None, + ) + } + + async fn commit_one_batch(table: &Table, ids: Vec, values: Vec) { + let mut tw = TableWrite::new(table, "test-user".to_string()).unwrap(); + tw.write_arrow_batch(&make_batch(ids, values)) + .await + .unwrap(); + let msgs = tw.prepare_commit().await.unwrap(); + TableCommit::new(table.clone(), "test-user".to_string()) + .commit(msgs) + .await + .unwrap(); + } + + async fn read_id_value_rows(table: &Table) -> Vec<(i32, i32)> { + let rb = 
table.new_read_builder(); + let plan = rb.new_scan().plan().await.unwrap(); + let read = rb.new_read().unwrap(); + let batches: Vec = + futures::TryStreamExt::try_collect(read.to_arrow(plan.splits()).unwrap()) + .await + .unwrap(); + let mut rows: Vec<(i32, i32)> = batches + .iter() + .flat_map(|b| { + let ids = b.column(0).as_any().downcast_ref::().unwrap(); + let values = b.column(1).as_any().downcast_ref::().unwrap(); + (0..b.num_rows()).map(|i| (ids.value(i), values.value(i))) + }) + .collect(); + rows.sort_unstable(); + rows + } + + /// Regression test: three commits of the same primary key produce three + /// key-overlapping files. Even with a 1-byte split target they must stay + /// in one split so the sort-merge reader merges them, instead of each + /// split emitting its own (stale) version of the row. + #[tokio::test] + async fn test_pk_plan_keeps_overlapping_files_in_one_split_under_tiny_target() { + let file_io = test_file_io(); + let table_path = "memory:/test_tiny_split_overlap"; + setup_dirs(&file_io, table_path).await; + let table = tiny_split_pk_table(&file_io, table_path); + + commit_one_batch(&table, vec![1], vec![10]).await; + commit_one_batch(&table, vec![1], vec![20]).await; + commit_one_batch(&table, vec![1], vec![30]).await; + + let plan = table.new_read_builder().new_scan().plan().await.unwrap(); + assert_eq!( + plan.splits().len(), + 1, + "overlapping files must share a split" + ); + assert_eq!(plan.splits()[0].data_files().len(), 3); + + assert_eq!(read_id_value_rows(&table).await, vec![(1, 30)]); + } + + /// Files with disjoint key ranges may still be distributed across splits + /// for parallelism when the split target is small. 
+ #[tokio::test] + async fn test_pk_plan_separates_disjoint_files_under_tiny_target() { + let file_io = test_file_io(); + let table_path = "memory:/test_tiny_split_disjoint"; + setup_dirs(&file_io, table_path).await; + let table = tiny_split_pk_table(&file_io, table_path); + + commit_one_batch(&table, vec![1], vec![10]).await; + commit_one_batch(&table, vec![2], vec![20]).await; + commit_one_batch(&table, vec![3], vec![30]).await; + + let plan = table.new_read_builder().new_scan().plan().await.unwrap(); + assert_eq!( + plan.splits().len(), + 3, + "disjoint files keep split parallelism" + ); + + assert_eq!( + read_id_value_rows(&table).await, + vec![(1, 10), (2, 20), (3, 30)] + ); + } + + /// Append-only tables have no primary keys (and empty min/max keys); they + /// must keep using plain file-level bin packing instead of degrading to a + /// single all-files section. + #[tokio::test] + async fn test_append_table_plan_uses_file_level_bin_pack_under_tiny_target() { + let file_io = test_file_io(); + let table_path = "memory:/test_tiny_split_append"; + setup_dirs(&file_io, table_path).await; + + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .option("bucket", "1") + .option("bucket-key", "id") + .option("source.split.target-size", "1b") + .option("source.split.open-file-cost", "1b") + .build() + .unwrap(); + let table = Table::new( + file_io.clone(), + Identifier::new("default", "test_tiny_split_append"), + table_path.to_string(), + TableSchema::new(0, &schema), + None, + ); + + commit_one_batch(&table, vec![1], vec![10]).await; + commit_one_batch(&table, vec![2], vec![20]).await; + commit_one_batch(&table, vec![3], vec![30]).await; + + let plan = table.new_read_builder().new_scan().plan().await.unwrap(); + assert_eq!( + plan.splits().len(), + 3, + "append tables keep file-level bin pack" + ); + } }