Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
checkAppendFiles = true;
allowRollback = true;
}
if (conflictDetection.hasRowIdCheckFromSnapshot()) {
checkAppendFiles = true;
allowRollback = true;
}

attempts +=
tryCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ public void setRowIdCheckFromSnapshot(@Nullable Long rowIdCheckFromSnapshot) {
this.rowIdCheckFromSnapshot = rowIdCheckFromSnapshot;
}

/**
 * Tells whether a row-id check snapshot has been set on this detection.
 *
 * @return {@code true} when {@code rowIdCheckFromSnapshot} is non-null, {@code false} otherwise
 */
public boolean hasRowIdCheckFromSnapshot() {
    final boolean snapshotMissing = this.rowIdCheckFromSnapshot == null;
    return !snapshotMissing;
}

@Nullable
public Comparator<InternalRow> keyComparator() {
return keyComparator;
Expand All @@ -140,7 +144,7 @@ public <T extends FileEntry> boolean shouldBeOverwriteCommit(
return true;
}
}
return rowIdCheckFromSnapshot != null;
return false;
}

public Optional<RuntimeException> checkConflicts(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.manifest.SimpleFileEntryWithDV;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -340,4 +342,51 @@ private void assertConflict(
}
assert (deleteCount > 0);
}

/**
 * Exercises {@code shouldBeOverwriteCommit}: ADD-only and empty inputs are not overwrite
 * commits, a DELETE entry or a DV index file makes one, and setting the row-id check snapshot
 * on its own does not.
 */
@Test
void testShouldBeOverwriteCommit() {
    ConflictDetection conflictDetection = createConflictDetection();
    List<IndexManifestEntry> noIndexFiles = Collections.emptyList();

    // Only ADD entries and no index files.
    List<SimpleFileEntry> appendOnly =
            Arrays.asList(createFileEntry("f1", ADD), createFileEntry("f2", ADD));
    boolean appendOnlyIsOverwrite =
            conflictDetection.shouldBeOverwriteCommit(appendOnly, noIndexFiles);
    assertThat(appendOnlyIsOverwrite).isFalse();

    // No file entries and no index files at all.
    boolean emptyIsOverwrite =
            conflictDetection.shouldBeOverwriteCommit(Collections.emptyList(), noIndexFiles);
    assertThat(emptyIsOverwrite).isFalse();

    // A DELETE entry among the file entries.
    List<SimpleFileEntry> withDelete =
            Arrays.asList(createFileEntry("f1", DELETE), createFileEntry("f2", ADD));
    boolean deleteIsOverwrite =
            conflictDetection.shouldBeOverwriteCommit(withDelete, noIndexFiles);
    assertThat(deleteIsOverwrite).isTrue();

    // No file entries, but one DV index file.
    List<IndexManifestEntry> dvIndexOnly =
            Collections.singletonList(createDvIndexEntry("dv1", ADD, Arrays.asList("f1")));
    boolean dvIndexIsOverwrite =
            conflictDetection.shouldBeOverwriteCommit(Collections.emptyList(), dvIndexOnly);
    assertThat(dvIndexIsOverwrite).isTrue();

    // Setting the row-id check snapshot must not change the ADD-only answer.
    conflictDetection.setRowIdCheckFromSnapshot(1L);
    boolean appendOnlyWithRowIdCheck =
            conflictDetection.shouldBeOverwriteCommit(appendOnly, noIndexFiles);
    assertThat(appendOnlyWithRowIdCheck).isFalse();
}

/**
 * Builds the {@link ConflictDetection} instance used by {@code testShouldBeOverwriteCommit}.
 *
 * @return a detection created with fixed test arguments and {@code null} for every optional
 *     collaborator
 */
private ConflictDetection createConflictDetection() {
    ConflictDetection detection =
            new ConflictDetection(
                    "test-table",
                    "test-user",
                    RowType.of(),
                    null,
                    null,
                    BucketMode.HASH_FIXED,
                    false,
                    true,
                    false,
                    null,
                    null,
                    null);
    return detection;
}
}
60 changes: 60 additions & 0 deletions paimon-python/pypaimon/tests/shard_table_updator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import shutil
import tempfile
import unittest
from unittest.mock import patch

import pyarrow as pa

Expand Down Expand Up @@ -591,6 +592,65 @@ def test_read_projection(self):
% actual_columns
)

def test_shard_update_passes_allow_rollback_true(self):
    """Check the flags a shard-update commit forwards to ``_try_commit``.

    Creates a table with row tracking and data evolution enabled, writes two
    rows, rewrites column ``b`` through a shard updator, and then commits the
    resulting messages while recording the keyword arguments that reach
    ``FileStoreCommit._try_commit``. Both ``allow_rollback`` and
    ``detect_conflicts`` must be truthy in the recorded call.
    """
    arrow_schema = pa.schema([
        ('a', pa.int32()),
        ('b', pa.int32()),
    ])
    table_options = {'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}
    schema = Schema.from_pyarrow_schema(arrow_schema, options=table_options)
    table_name = self._create_unique_table_name('rollback')
    self.catalog.create_table(table_name, schema, False)
    table = self.catalog.get_table(table_name)

    # Seed the table with two rows so the shard updator has data to rewrite.
    write_builder = table.new_batch_write_builder()
    seed_writer = write_builder.new_write().with_write_type(['a', 'b'])
    seed_commit = write_builder.new_commit()
    seed_rows = pa.Table.from_pydict(
        {'a': [1, 2], 'b': [10, 20]},
        schema=arrow_schema,
    )
    seed_writer.write_arrow(seed_rows)
    seed_commit.commit(seed_writer.prepare_commit())
    seed_writer.close()
    seed_commit.close()

    # Read column 'a' and overwrite column 'b' with a constant in every batch.
    table_update = write_builder.new_update()
    table_update.with_read_projection(['a'])
    table_update.with_update_type(['b'])
    shard_updator = table_update.new_shard_updator(0, 1)
    batch_reader = shard_updator.arrow_reader()
    replacement_schema = pa.schema([('b', pa.int32())])
    for source_batch in iter(batch_reader.read_next_batch, None):
        replacement = pa.RecordBatch.from_pydict(
            {'b': [99] * source_batch.num_rows},
            schema=replacement_schema,
        )
        shard_updator.update_by_arrow_batch(replacement)
    commit_messages = shard_updator.prepare_commit()

    from pypaimon.write.file_store_commit import FileStoreCommit
    real_try_commit = FileStoreCommit._try_commit
    captured_args = {}

    def recording_try_commit(commit_self, **call_kwargs):
        # Remember the keyword arguments, then delegate to the real method.
        captured_args.update(call_kwargs)
        return real_try_commit(commit_self, **call_kwargs)

    with patch.object(FileStoreCommit, '_try_commit', recording_try_commit):
        update_commit = write_builder.new_commit()
        update_commit.commit(commit_messages)
        update_commit.close()

    allow_rollback = captured_args.get('allow_rollback', False)
    self.assertTrue(
        allow_rollback,
        "Row-id-check commits must pass allow_rollback=True so that "
        "concurrent COMPACT snapshots can be rolled back on conflict."
    )
    detect_conflicts = captured_args.get('detect_conflicts', False)
    self.assertTrue(
        detect_conflicts,
        "Row-id-check commits must enable conflict detection."
    )


# Run this module's tests through unittest when the file is executed directly.
if __name__ == '__main__':
unittest.main()
9 changes: 2 additions & 7 deletions paimon-python/pypaimon/write/commit/conflict_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
Conflict detection for commit operations.
"""


from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.file_entry import FileEntry
Expand Down Expand Up @@ -53,13 +52,9 @@ def __init__(self, data_evolution_enabled, snapshot_manager,
self.commit_scanner = commit_scanner

def should_be_overwrite_commit(self):
    """Check if the commit should be treated as an overwrite commit.

    A row-id check snapshot does not promote the commit to OVERWRITE;
    that condition is reported by ``has_row_id_check_from_snapshot``.

    Returns:
        Always False: this check never marks the commit as OVERWRITE.
    """
    return False
def has_row_id_check_from_snapshot(self):
    """Tell whether a row-id check snapshot has been recorded.

    Returns:
        True when ``_row_id_check_from_snapshot`` holds a value, False when
        it is None.
    """
    recorded_snapshot = self._row_id_check_from_snapshot
    return recorded_snapshot is not None

def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_kind):
Expand Down
3 changes: 3 additions & 0 deletions paimon-python/pypaimon/write/file_store_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
commit_kind = "OVERWRITE"
detect_conflicts = True
allow_rollback = True
if self.conflict_detection.has_row_id_check_from_snapshot():
detect_conflicts = True
allow_rollback = True

self._try_commit(commit_kind=commit_kind,
commit_identifier=commit_identifier,
Expand Down
Loading