forked from apache/paimon
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconflict_detection.py
More file actions
198 lines (164 loc) · 7.96 KB
/
conflict_detection.py
File metadata and controls
198 lines (164 loc) · 7.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Conflict detection for commit operations.
"""
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.file_entry import FileEntry
from pypaimon.utils.range import Range
from pypaimon.utils.range_helper import RangeHelper
from pypaimon.write.commit.commit_scanner import CommitScanner
class ConflictDetection:
"""Detects conflicts between base and delta files during commit.
This class provides row ID range conflict checks and row ID from snapshot conflict checks
for Data Evolution tables.
"""
def __init__(self, data_evolution_enabled, snapshot_manager,
manifest_list_manager: ManifestListManager, table, commit_scanner: CommitScanner):
"""Initialize ConflictDetection.
Args:
data_evolution_enabled: Whether data evolution feature is enabled.
snapshot_manager: Manager for reading snapshot metadata.
manifest_list_manager: Manager for reading manifest lists.
table: The FileStoreTable instance.
"""
self.data_evolution_enabled = data_evolution_enabled
self.snapshot_manager = snapshot_manager
self.manifest_list_manager = manifest_list_manager
self.table = table
self._row_id_check_from_snapshot = None
self.commit_scanner = commit_scanner
def should_be_overwrite_commit(self):
return False
def has_row_id_check_from_snapshot(self):
return self._row_id_check_from_snapshot is not None
def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_kind):
"""Run all conflict checks and return the first detected conflict.
merges base_entries and delta_entries, then runs conflict checks
on the merged result.
Args:
latest_snapshot: The latest snapshot at commit time.
base_entries: All entries read from the latest snapshot.
delta_entries: The delta entries being committed.
commit_kind: The kind of commit (e.g. "APPEND", "COMPACT", "OVERWRITE").
Returns:
A RuntimeError if a conflict is detected, otherwise None.
"""
all_entries = list(base_entries) + list(delta_entries)
try:
merged_entries = FileEntry.merge_entries(all_entries)
except Exception as e:
return RuntimeError(
"File deletion conflicts detected! Give up committing. " + str(e))
conflict = self.check_row_id_range_conflicts(commit_kind, merged_entries)
if conflict is not None:
return conflict
return self.check_row_id_from_snapshot(latest_snapshot, delta_entries)
def check_row_id_range_conflicts(self, commit_kind, commit_entries):
"""Check for row ID range conflicts among merged entries.
only enabled when data evolution is active, and checks that
overlapping row ID ranges in non-blob data files are identical.
Args:
commit_kind: The kind of commit (e.g. "APPEND", "COMPACT").
commit_entries: The entries being committed.
Returns:
A RuntimeError if conflict is detected, otherwise None.
"""
if not self.data_evolution_enabled:
return None
if self._row_id_check_from_snapshot is None and commit_kind != "COMPACT":
return None
entries_with_row_id = [
entry for entry in commit_entries
if entry.file.first_row_id is not None
]
if not entries_with_row_id:
return None
range_helper = RangeHelper(lambda entry: entry.file.row_id_range())
merged_groups = range_helper.merge_overlapping_ranges(entries_with_row_id)
for group in merged_groups:
data_files = [
entry for entry in group
if not DataFileMeta.is_blob_file(entry.file.file_name)
]
if not range_helper.are_all_ranges_same(data_files):
file_descriptions = [
"{name}(rowId={row_id}, count={count})".format(
name=entry.file.file_name,
row_id=entry.file.first_row_id,
count=entry.file.row_count,
)
for entry in data_files
]
return RuntimeError(
"For Data Evolution table, multiple 'MERGE INTO' and 'COMPACT' "
"operations have encountered conflicts, data files: "
+ str(file_descriptions))
return None
def check_row_id_from_snapshot(self, latest_snapshot, commit_entries):
"""Check for row ID conflicts from a specific snapshot onwards.
collects row ID ranges from delta entries, then checks if any
incremental changes between the check snapshot and latest snapshot
have overlapping row ID ranges.
Args:
latest_snapshot: The latest snapshot at commit time.
commit_entries: The delta entries being committed.
Returns:
A RuntimeError if conflict is detected, otherwise None.
"""
if not self.data_evolution_enabled:
return None
if self._row_id_check_from_snapshot is None:
return None
history_id_ranges = []
for entry in commit_entries:
first_row_id = entry.file.first_row_id
row_count = entry.file.row_count
if first_row_id is not None:
history_id_ranges.append(
Range(first_row_id, first_row_id + row_count - 1))
check_snapshot = self.snapshot_manager.get_snapshot_by_id(
self._row_id_check_from_snapshot)
if check_snapshot is None or check_snapshot.next_row_id is None:
raise RuntimeError(
"Next row id cannot be null for snapshot "
"{snapshot}.".format(snapshot=self._row_id_check_from_snapshot))
check_next_row_id = check_snapshot.next_row_id
for snapshot_id in range(
self._row_id_check_from_snapshot + 1,
latest_snapshot.id + 1):
snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
if snapshot is None:
continue
if snapshot.commit_kind == "COMPACT":
continue
incremental_entries = self.commit_scanner.read_incremental_entries_from_changed_partitions(
snapshot, commit_entries)
for entry in incremental_entries:
file_range = entry.file.row_id_range()
if file_range is None:
continue
if file_range.from_ < check_next_row_id:
for history_range in history_id_ranges:
if history_range.overlaps(file_range):
return RuntimeError(
"For Data Evolution table, multiple 'MERGE INTO' "
"operations have encountered conflicts, updating "
"the same file, which can render some updates "
"ineffective.")
return None