From 537b974f820c70efd84929dc975c153919da1d32 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 16 Apr 2026 11:27:44 +0800 Subject: [PATCH] revert change in table_scan --- .../paimon/operation/FileStoreCommitImpl.java | 4 ++ .../operation/commit/ConflictDetection.java | 6 +- .../commit/ConflictDetectionTest.java | 49 +++++++++++++++ .../tests/shard_table_updator_test.py | 60 +++++++++++++++++++ .../write/commit/conflict_detection.py | 9 +-- .../pypaimon/write/file_store_commit.py | 3 + 6 files changed, 123 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 74a27760fc90..d2e20ebaeffb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -318,6 +318,10 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { checkAppendFiles = true; allowRollback = true; } + if (conflictDetection.hasRowIdCheckFromSnapshot()) { + checkAppendFiles = true; + allowRollback = true; + } attempts += tryCommit( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 1e84f58e517a..db15db027d20 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -119,6 +119,10 @@ public void setRowIdCheckFromSnapshot(@Nullable Long rowIdCheckFromSnapshot) { this.rowIdCheckFromSnapshot = rowIdCheckFromSnapshot; } + public boolean hasRowIdCheckFromSnapshot() { + return rowIdCheckFromSnapshot != null; + } + @Nullable public Comparator keyComparator() { return keyComparator; @@ -140,7 +144,7 @@ public boolean shouldBeOverwriteCommit( return true; } } - return rowIdCheckFromSnapshot != null; + return false; } public Optional checkConflicts( diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index 522cb73e7205..c3e0258da28f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -25,6 +25,8 @@ import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.SimpleFileEntry; import org.apache.paimon.manifest.SimpleFileEntryWithDV; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.types.RowType; import org.junit.jupiter.api.Test; @@ -340,4 +342,51 @@ private void assertConflict( } assert (deleteCount > 0); } + + @Test + void testShouldBeOverwriteCommit() { + ConflictDetection detection = createConflictDetection(); + + List addOnlyEntries = new ArrayList<>(); + addOnlyEntries.add(createFileEntry("f1", ADD)); + addOnlyEntries.add(createFileEntry("f2", ADD)); + assertThat(detection.shouldBeOverwriteCommit(addOnlyEntries, Collections.emptyList())) + .isFalse(); + + assertThat( + detection.shouldBeOverwriteCommit( + Collections.emptyList(), Collections.emptyList())) + .isFalse(); + + List deleteEntries = new ArrayList<>(); + deleteEntries.add(createFileEntry("f1", DELETE)); + deleteEntries.add(createFileEntry("f2", ADD)); + assertThat(detection.shouldBeOverwriteCommit(deleteEntries, Collections.emptyList())) + .isTrue(); + + List dvIndexFiles = new ArrayList<>(); + dvIndexFiles.add(createDvIndexEntry("dv1", ADD, Arrays.asList("f1"))); + assertThat(detection.shouldBeOverwriteCommit(Collections.emptyList(), dvIndexFiles)) + .isTrue(); + + detection.setRowIdCheckFromSnapshot(1L); + assertThat(detection.shouldBeOverwriteCommit(addOnlyEntries, Collections.emptyList())) + .isFalse(); + } + + private ConflictDetection createConflictDetection() { + return new ConflictDetection( + "test-table", + "test-user", + RowType.of(), + null, + null, + BucketMode.HASH_FIXED, + false, + true, + false, + null, + null, + null); + } } diff --git a/paimon-python/pypaimon/tests/shard_table_updator_test.py b/paimon-python/pypaimon/tests/shard_table_updator_test.py index 1ff658c609ad..92436c584d9f 100644 --- a/paimon-python/pypaimon/tests/shard_table_updator_test.py +++ b/paimon-python/pypaimon/tests/shard_table_updator_test.py @@ -19,6 +19,7 @@ import shutil import tempfile import unittest +from unittest.mock import patch import pyarrow as pa @@ -591,6 +592,65 @@ def test_read_projection(self): % actual_columns ) + def test_shard_update_passes_allow_rollback_true(self): + table_schema = pa.schema([ + ('a', pa.int32()), + ('b', pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + table_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + name = self._create_unique_table_name('rollback') + self.catalog.create_table(name, schema, False) + table = self.catalog.get_table(name) + + write_builder = table.new_batch_write_builder() + tw = write_builder.new_write().with_write_type(['a', 'b']) + tc = write_builder.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'a': [1, 2], 'b': [10, 20]}, + schema=table_schema, + )) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + upd = write_builder.new_update() + upd.with_read_projection(['a']) + upd.with_update_type(['b']) + shard = upd.new_shard_updator(0, 1) + reader = shard.arrow_reader() + for batch in iter(reader.read_next_batch, None): + shard.update_by_arrow_batch(pa.RecordBatch.from_pydict( + {'b': [99] * batch.num_rows}, + schema=pa.schema([('b', pa.int32())]), + )) + commit_messages = shard.prepare_commit() + + from pypaimon.write.file_store_commit import FileStoreCommit + original_try_commit = FileStoreCommit._try_commit + captured_args = {} + + def spy_try_commit(self_inner, **kwargs): + captured_args.update(kwargs) + return original_try_commit(self_inner, **kwargs) + + with patch.object(FileStoreCommit, '_try_commit', spy_try_commit): + tc2 = write_builder.new_commit() + tc2.commit(commit_messages) + tc2.close() + + self.assertTrue( + captured_args.get('allow_rollback', False), + "Row-id-check commits must pass allow_rollback=True so that " + "concurrent COMPACT snapshots can be rolled back on conflict." + ) + self.assertTrue( + captured_args.get('detect_conflicts', False), + "Row-id-check commits must enable conflict detection." + ) + if __name__ == '__main__': unittest.main() diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py b/paimon-python/pypaimon/write/commit/conflict_detection.py index 8e62c946e04d..cfbdfd4e137c 100644 --- a/paimon-python/pypaimon/write/commit/conflict_detection.py +++ b/paimon-python/pypaimon/write/commit/conflict_detection.py @@ -19,7 +19,6 @@ Conflict detection for commit operations. """ - from pypaimon.manifest.manifest_list_manager import ManifestListManager from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.file_entry import FileEntry @@ -53,13 +52,9 @@ def __init__(self, data_evolution_enabled, snapshot_manager, self.commit_scanner = commit_scanner def should_be_overwrite_commit(self): - """Check if the commit should be treated as an overwrite commit. - - returns True if rowIdCheckFromSnapshot is set. + return False - Returns: - True if the commit should be treated as OVERWRITE. - """ + def has_row_id_check_from_snapshot(self): return self._row_id_check_from_snapshot is not None def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_kind): diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 893784268090..22d0fea7acfa 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -147,6 +147,9 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): commit_kind = "OVERWRITE" detect_conflicts = True allow_rollback = True + if self.conflict_detection.has_row_id_check_from_snapshot(): + detect_conflicts = True + allow_rollback = True self._try_commit(commit_kind=commit_kind, commit_identifier=commit_identifier,