fix: merge primary-key rows correctly when bucket files span multiple splits #374

Open
TheR1sing3un wants to merge 4 commits into apache:main from TheR1sing3un:fix/pk-split-merge-tree-generator


TheR1sing3un commented Jun 10, 2026

Member

Purpose

Linked issue: close #373

PK tables could return unmerged (duplicate-key) rows in two ways sharing one
root cause — split planning and read dispatch were blind to key-range overlap:

  1. plan_snapshot bin-packed a bucket's files purely by size, so files
    holding versions of the same key could land in different splits; each
    split runs its own sort-merge reader and emits its own version.
    Reproducible with source.split.target-size=1b and three commits of one
    key: SELECT returned 3 rows instead of 1.
  2. read_pk sent splits without level-0 files to the raw (non-merging)
    reader, but compacted files on different levels can still overlap on key
    range (e.g. produced by Java/Spark compaction) and need merging.
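
For illustration, the first failure mode can be reproduced in miniature. The sketch below is not paimon-rust code: `Version`, `merge_split`, `read_plan`, and `three_commits` are hypothetical stand-ins, and the merge is a simplified deduplicate that keeps the highest sequence number per key.

```rust
use std::collections::BTreeMap;

/// One key-value version as a merging reader would see it.
/// Hypothetical shape for illustration only.
#[derive(Clone, Debug, PartialEq)]
struct Version {
    key: i32,
    sequence: u64,
    value: &'static str,
}

/// Deduplicate within ONE split: keep the highest-sequence version per key.
fn merge_split(split: &[Vec<Version>]) -> Vec<Version> {
    let mut latest: BTreeMap<i32, Version> = BTreeMap::new();
    for file in split {
        for v in file {
            let is_newer = latest
                .get(&v.key)
                .map_or(true, |cur| v.sequence > cur.sequence);
            if is_newer {
                latest.insert(v.key, v.clone());
            }
        }
    }
    latest.into_values().collect()
}

/// Each split is merged independently and the outputs are concatenated,
/// so versions of a key in different splits never meet.
fn read_plan(splits: &[Vec<Vec<Version>>]) -> Vec<Version> {
    splits.iter().flat_map(|s| merge_split(s)).collect()
}

/// Three commits of key 1, one data file per commit.
fn three_commits() -> Vec<Vec<Version>> {
    vec![
        vec![Version { key: 1, sequence: 1, value: "a" }],
        vec![Version { key: 1, sequence: 2, value: "b" }],
        vec![Version { key: 1, sequence: 3, value: "c" }],
    ]
}
```

Planning `three_commits()` as one file per split makes `read_plan` return one row per split, while a single split holding all three files returns only the latest row.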

Reported by @JingsongLi while reviewing #340; affects deduplicate and
partial-update on main, and aggregation once #340 lands.

Changes

  • table/merge_tree_split_generator.rs (new) — port of Java
    MergeTreeSplitGenerator / IntervalPartition:
    • KeyComparator decodes serialized BinaryRow min/max keys with the
      trimmed-PK data types and compares via datum_cmp. BinaryRow stores
      fields little-endian, so raw byte comparison would order int 256 before
      int 1 — decoding is mandatory for correctness.
    • interval_partition sorts files by decoded (min_key, max_key) and
      groups transitively overlapping files into sections; sections never
      overlap each other.
    • pack_sections bin-packs whole sections into splits (reusing
      pack_for_ordered); a section is atomic and never separated.
    • Fail-safe: empty/undecodable key ranges collapse the bucket into one
      section — losing parallelism, never correctness.
  • table_scan.rs — PK tables route through
    pack_sections(interval_partition(...)) on the non-data-evolution path;
    append-only tables keep the existing file-level split_for_batch.
  • table_read.rs: read_pk dispatch is now overlap-aware
    (split_requires_merge): any level-0 file or key-overlapping compacted
    files → sort-merge reader; only disjoint compacted files keep the raw
    fast path.
  • Deletion-vector / first-row fast path — mirroring Java
    MergeTreeSplitGenerator: DV tables resolve stale versions through DVs
    (and KeyValueFileReader rejects DV splits), and first-row tables skip
    level-0 at plan time, so both keep plain size-based packing and the plain
    level-0 read dispatch.
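
The sectioning and packing steps in the list above can be sketched as follows. This is a simplified model, not the code in this PR: `FileMeta` is a hypothetical struct with already-decoded `i64` keys, and `pack_sections` inlines an ordered bin pack instead of calling `pack_for_ordered`.

```rust
/// Simplified file metadata: decoded key range plus size in bytes.
/// Hypothetical; real files carry serialized BinaryRow min/max keys.
#[derive(Clone, Debug, PartialEq)]
struct FileMeta {
    min_key: i64,
    max_key: i64,
    size: u64,
}

/// Group transitively overlapping files into sections.
/// Sections come out sorted and never overlap each other.
fn interval_partition(mut files: Vec<FileMeta>) -> Vec<Vec<FileMeta>> {
    files.sort_by_key(|f| (f.min_key, f.max_key));
    let mut sections: Vec<Vec<FileMeta>> = Vec::new();
    // Running upper bound (largest max_key) of the current section.
    let mut bound = i64::MIN;
    for f in files {
        let overlaps = !sections.is_empty() && f.min_key <= bound;
        if overlaps {
            bound = bound.max(f.max_key);
            sections
                .last_mut()
                .expect("non-empty: checked above")
                .push(f);
        } else {
            bound = f.max_key;
            sections.push(vec![f]);
        }
    }
    sections
}

/// Bin-pack whole sections in order; a section is never separated.
fn pack_sections(sections: Vec<Vec<FileMeta>>, target_size: u64) -> Vec<Vec<FileMeta>> {
    let mut splits = Vec::new();
    let mut current: Vec<FileMeta> = Vec::new();
    let mut current_size = 0u64;
    for section in sections {
        let section_size: u64 = section.iter().map(|f| f.size).sum();
        // Close the current split if this section would overflow it.
        if !current.is_empty() && current_size + section_size > target_size {
            splits.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current_size += section_size;
        current.extend(section);
    }
    if !current.is_empty() {
        splits.push(current);
    }
    splits
}
```

Because the running bound grows as files join a section, a chain such as [1, 5], [4, 8], [8, 9] stays in one section even though the first and last ranges do not touch directly. A section larger than the target size still becomes a single split, which is the parallelism-for-correctness trade described above.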

Tests

  • 16 unit tests: comparator ordering (little-endian regression), section
    grouping, running-bound chains, atomic packing, overlap detection,
    undecodable-key degradation.
  • 3 plan-level regression tests (memory FileIO, real write→commit→plan→read):
    overlapping files share one split and read back merged; disjoint files keep
    split parallelism; append tables keep file-level bin pack.
  • 2 DataFusion e2e tests reproducing the reviewer scenario for deduplicate
    and partial-update. All new tests fail without the fix.
  • Verified against the Spark-provisioned warehouse
    (make docker-up + cargo test -p paimon-datafusion --all-targets),
    which caught and now guards the deletion-vector interaction.

Out of scope

  • first-row reads never go through read_pk (pre-existing routing); the
    same overlap consideration for its raw path can be tracked separately.
  • Java's rawConvertible flag plumbing through DataSplit; this PR derives
    the same decision read-side from file metadata instead.

plan_snapshot bin-packed a bucket's files purely by size, so files whose
primary-key ranges overlap could land in different splits. Each split
runs its own sort-merge reader, so the same key surfaced once per split
instead of merging to a single row (reproducible with three commits of
one key under source.split.target-size=1b).

Port the Java MergeTreeSplitGenerator/IntervalPartition algorithm: sort
files by decoded (min_key, max_key), group transitively overlapping
files into sections, then bin-pack whole sections via pack_for_ordered.
Keys are decoded with the trimmed-PK data types and compared through
datum_cmp — BinaryRow stores fields little-endian, so raw byte
comparison would order int 256 before int 1. Undecodable key ranges
collapse the bucket into one section, trading parallelism for
correctness. Append-only tables keep the file-level bin pack.
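
The little-endian point can be checked with the standard library alone. The sketch below compares a bare 4-byte int field; it ignores the rest of the BinaryRow layout, and `raw_byte_cmp` and `decoded_cmp` are hypothetical helper names, not functions from this PR.

```rust
use std::cmp::Ordering;

/// Lexicographic comparison of the serialized bytes (the unsafe shortcut).
fn raw_byte_cmp(a: &[u8], b: &[u8]) -> Ordering {
    a.cmp(b)
}

/// Decode the little-endian int first, then compare the values.
fn decoded_cmp(a: [u8; 4], b: [u8; 4]) -> Ordering {
    i32::from_le_bytes(a).cmp(&i32::from_le_bytes(b))
}
```

With `256i32.to_le_bytes()` (`[0, 1, 0, 0]`) and `1i32.to_le_bytes()` (`[1, 0, 0, 0]`), `raw_byte_cmp` reports `Less` while `decoded_cmp` reports `Greater`.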

read_pk routed a split to the raw (non-merging) reader whenever it had
no level-0 files. A split can still hold compacted files on different
levels whose key ranges overlap — e.g. produced by Java/Spark
compaction — and raw-reading those emits one row per version of a key.

Extend the dispatch with split_requires_merge: any level-0 file or any
key-range overlap among the split's files sends it through the
sort-merge reader; only disjoint compacted files keep the raw fast
path. Key ranges are compared with the same decoded-key comparator the
split planner uses (KeyComparator::from_table_schema, extracted from
plan_snapshot), and undecodable key ranges fall back to merging.
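
A minimal sketch of that dispatch rule, assuming a hypothetical `SplitFile` struct whose key bounds are already decoded (`None` models an empty or undecodable key). The real `split_requires_merge` works on file metadata and `KeyComparator`; only the decision logic is mirrored here.

```rust
/// Hypothetical per-file view: LSM level plus decoded key bounds.
/// `None` stands for an empty or undecodable serialized key.
#[derive(Clone, Debug)]
struct SplitFile {
    level: u32,
    min_key: Option<i64>,
    max_key: Option<i64>,
}

/// True when the split must go through the sort-merge reader.
fn split_requires_merge(files: &[SplitFile]) -> bool {
    // Level-0 files may overlap anything, so they always need merging.
    if files.iter().any(|f| f.level == 0) {
        return true;
    }
    let mut ranges = Vec::with_capacity(files.len());
    for f in files {
        match (f.min_key, f.max_key) {
            (Some(lo), Some(hi)) => ranges.push((lo, hi)),
            // Fail safe: an undecodable range falls back to merging.
            _ => return true,
        }
    }
    // Once sorted by min key, any overlapping pair implies an
    // overlapping adjacent pair, so checking neighbours is enough.
    ranges.sort();
    ranges.windows(2).any(|w| w[1].0 <= w[0].1)
}
```

Only a split made entirely of compacted files with pairwise disjoint, decodable key ranges returns `false` and keeps the raw fast path.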

Reproduce the reviewer scenario from apache#340: a 1-byte
source.split.target-size forces every data file into its own split
candidate, so versions of one key used to surface once per split.
Cover both deduplicate (three commits of one key read back as the
latest row) and partial-update (per-column updates over three commits
merge into one row). Both tests fail without the merge-tree split
generator fix.

… tables

Deletion-vector tables resolve stale key versions through DVs, not
read-time merging, and KeyValueFileReader rejects splits with deletion
vectors — routing their key-overlapping compacted files to the
sort-merge reader broke reads of Spark-written DV tables
("KeyValueFileReader does not support deletion vectors").

Mirror Java MergeTreeSplitGenerator's fast path: when deletion vectors
are enabled or the merge engine is first-row, plan_snapshot keeps plain
size-based packing (no key-range sectioning), and read_pk keeps the
plain level-0 dispatch instead of the overlap-aware one.

Caught by test_read_primary_key_table_via_datafusion against the
Spark-provisioned warehouse.
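
The fast-path condition described above amounts to a single check shared by planning and reading. In the sketch below, `MergeEngine`, `Strategy`, and `choose_strategy` are hypothetical names used for illustration; the PR makes the same decision inside plan_snapshot and read_pk.

```rust
/// Hypothetical enum of the merge engines mentioned in this PR.
#[derive(Clone, Copy, Debug, PartialEq)]
enum MergeEngine {
    Deduplicate,
    PartialUpdate,
    Aggregation,
    FirstRow,
}

/// Which planning and read behaviour a table gets.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Strategy {
    /// Plan with key-range sections instead of plain size-based packing.
    key_range_sections: bool,
    /// Use the overlap-aware read dispatch instead of the level-0 check.
    overlap_aware_dispatch: bool,
}

/// Deletion-vector and first-row tables keep the plain fast path.
fn choose_strategy(deletion_vectors_enabled: bool, engine: MergeEngine) -> Strategy {
    let fast_path = deletion_vectors_enabled || engine == MergeEngine::FirstRow;
    Strategy {
        key_range_sections: !fast_path,
        overlap_aware_dispatch: !fast_path,
    }
}
```

Keeping both flags tied to one condition avoids the mismatch that broke DV reads, where overlap-aware dispatch sent DV splits to a reader that rejects them.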
Development

Successfully merging this pull request may close these issues.

Primary-key tables return unmerged rows when one bucket's files span multiple splits
