Skip to content

Commit 03394f9

Browse files
[core][python] Fix commitKind mislabeled as OVERWRITE for data-evolution merge into (#7639)
1 parent cf96eed commit 03394f9

6 files changed

Lines changed: 123 additions & 8 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -318,6 +318,10 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
318318
checkAppendFiles = true;
319319
allowRollback = true;
320320
}
321+
if (conflictDetection.hasRowIdCheckFromSnapshot()) {
322+
checkAppendFiles = true;
323+
allowRollback = true;
324+
}
321325

322326
attempts +=
323327
tryCommit(

paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -119,6 +119,10 @@ public void setRowIdCheckFromSnapshot(@Nullable Long rowIdCheckFromSnapshot) {
119119
this.rowIdCheckFromSnapshot = rowIdCheckFromSnapshot;
120120
}
121121

122+
public boolean hasRowIdCheckFromSnapshot() {
123+
return rowIdCheckFromSnapshot != null;
124+
}
125+
122126
@Nullable
123127
public Comparator<InternalRow> keyComparator() {
124128
return keyComparator;
@@ -140,7 +144,7 @@ public <T extends FileEntry> boolean shouldBeOverwriteCommit(
140144
return true;
141145
}
142146
}
143-
return rowIdCheckFromSnapshot != null;
147+
return false;
144148
}
145149

146150
public Optional<RuntimeException> checkConflicts(

paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java

Lines changed: 49 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,8 @@
2525
import org.apache.paimon.manifest.IndexManifestEntry;
2626
import org.apache.paimon.manifest.SimpleFileEntry;
2727
import org.apache.paimon.manifest.SimpleFileEntryWithDV;
28+
import org.apache.paimon.table.BucketMode;
29+
import org.apache.paimon.types.RowType;
2830

2931
import org.junit.jupiter.api.Test;
3032

@@ -340,4 +342,51 @@ private void assertConflict(
340342
}
341343
assert (deleteCount > 0);
342344
}
345+
346+
@Test
347+
void testShouldBeOverwriteCommit() {
348+
ConflictDetection detection = createConflictDetection();
349+
350+
List<SimpleFileEntry> addOnlyEntries = new ArrayList<>();
351+
addOnlyEntries.add(createFileEntry("f1", ADD));
352+
addOnlyEntries.add(createFileEntry("f2", ADD));
353+
assertThat(detection.shouldBeOverwriteCommit(addOnlyEntries, Collections.emptyList()))
354+
.isFalse();
355+
356+
assertThat(
357+
detection.shouldBeOverwriteCommit(
358+
Collections.emptyList(), Collections.emptyList()))
359+
.isFalse();
360+
361+
List<SimpleFileEntry> deleteEntries = new ArrayList<>();
362+
deleteEntries.add(createFileEntry("f1", DELETE));
363+
deleteEntries.add(createFileEntry("f2", ADD));
364+
assertThat(detection.shouldBeOverwriteCommit(deleteEntries, Collections.emptyList()))
365+
.isTrue();
366+
367+
List<IndexManifestEntry> dvIndexFiles = new ArrayList<>();
368+
dvIndexFiles.add(createDvIndexEntry("dv1", ADD, Arrays.asList("f1")));
369+
assertThat(detection.shouldBeOverwriteCommit(Collections.emptyList(), dvIndexFiles))
370+
.isTrue();
371+
372+
detection.setRowIdCheckFromSnapshot(1L);
373+
assertThat(detection.shouldBeOverwriteCommit(addOnlyEntries, Collections.emptyList()))
374+
.isFalse();
375+
}
376+
377+
private ConflictDetection createConflictDetection() {
378+
return new ConflictDetection(
379+
"test-table",
380+
"test-user",
381+
RowType.of(),
382+
null,
383+
null,
384+
BucketMode.HASH_FIXED,
385+
false,
386+
true,
387+
false,
388+
null,
389+
null,
390+
null);
391+
}
343392
}

paimon-python/pypaimon/tests/shard_table_updator_test.py

Lines changed: 60 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
import shutil
2020
import tempfile
2121
import unittest
22+
from unittest.mock import patch
2223

2324
import pyarrow as pa
2425

@@ -591,6 +592,65 @@ def test_read_projection(self):
591592
% actual_columns
592593
)
593594

595+
def test_shard_update_passes_allow_rollback_true(self):
596+
table_schema = pa.schema([
597+
('a', pa.int32()),
598+
('b', pa.int32()),
599+
])
600+
schema = Schema.from_pyarrow_schema(
601+
table_schema,
602+
options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}
603+
)
604+
name = self._create_unique_table_name('rollback')
605+
self.catalog.create_table(name, schema, False)
606+
table = self.catalog.get_table(name)
607+
608+
write_builder = table.new_batch_write_builder()
609+
tw = write_builder.new_write().with_write_type(['a', 'b'])
610+
tc = write_builder.new_commit()
611+
tw.write_arrow(pa.Table.from_pydict(
612+
{'a': [1, 2], 'b': [10, 20]},
613+
schema=table_schema,
614+
))
615+
tc.commit(tw.prepare_commit())
616+
tw.close()
617+
tc.close()
618+
619+
upd = write_builder.new_update()
620+
upd.with_read_projection(['a'])
621+
upd.with_update_type(['b'])
622+
shard = upd.new_shard_updator(0, 1)
623+
reader = shard.arrow_reader()
624+
for batch in iter(reader.read_next_batch, None):
625+
shard.update_by_arrow_batch(pa.RecordBatch.from_pydict(
626+
{'b': [99] * batch.num_rows},
627+
schema=pa.schema([('b', pa.int32())]),
628+
))
629+
commit_messages = shard.prepare_commit()
630+
631+
from pypaimon.write.file_store_commit import FileStoreCommit
632+
original_try_commit = FileStoreCommit._try_commit
633+
captured_args = {}
634+
635+
def spy_try_commit(self_inner, **kwargs):
636+
captured_args.update(kwargs)
637+
return original_try_commit(self_inner, **kwargs)
638+
639+
with patch.object(FileStoreCommit, '_try_commit', spy_try_commit):
640+
tc2 = write_builder.new_commit()
641+
tc2.commit(commit_messages)
642+
tc2.close()
643+
644+
self.assertTrue(
645+
captured_args.get('allow_rollback', False),
646+
"Row-id-check commits must pass allow_rollback=True so that "
647+
"concurrent COMPACT snapshots can be rolled back on conflict."
648+
)
649+
self.assertTrue(
650+
captured_args.get('detect_conflicts', False),
651+
"Row-id-check commits must enable conflict detection."
652+
)
653+
594654

595655
if __name__ == '__main__':
596656
unittest.main()

paimon-python/pypaimon/write/commit/conflict_detection.py

Lines changed: 2 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,6 @@
1919
Conflict detection for commit operations.
2020
"""
2121

22-
2322
from pypaimon.manifest.manifest_list_manager import ManifestListManager
2423
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
2524
from pypaimon.manifest.schema.file_entry import FileEntry
@@ -53,13 +52,9 @@ def __init__(self, data_evolution_enabled, snapshot_manager,
5352
self.commit_scanner = commit_scanner
5453

5554
def should_be_overwrite_commit(self):
56-
"""Check if the commit should be treated as an overwrite commit.
57-
58-
returns True if rowIdCheckFromSnapshot is set.
55+
return False
5956

60-
Returns:
61-
True if the commit should be treated as OVERWRITE.
62-
"""
57+
def has_row_id_check_from_snapshot(self):
6358
return self._row_id_check_from_snapshot is not None
6459

6560
def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_kind):

paimon-python/pypaimon/write/file_store_commit.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -147,6 +147,9 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
147147
commit_kind = "OVERWRITE"
148148
detect_conflicts = True
149149
allow_rollback = True
150+
if self.conflict_detection.has_row_id_check_from_snapshot():
151+
detect_conflicts = True
152+
allow_rollback = True
150153

151154
self._try_commit(commit_kind=commit_kind,
152155
commit_identifier=commit_identifier,

0 commit comments

Comments
 (0)