Skip to content

Commit 7bdc0f6

Browse files
author
dailiang
committed
Fix wrong merge order of increment diff split read due to concurrent write of a key with non-deterministic sequence number.
1 parent 395c706 commit 7bdc0f6

2 files changed

Lines changed: 28 additions & 4 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,8 @@ public KeyValue getResult() {
213213
} else if (kvs.size() == 2) {
214214
KeyValue before = kvs.get(0);
215215
KeyValue after = kvs.get(1);
216-
if (after.level() == AFTER_LEVEL) {
217-
if (!valueAndRowKindEquals(before, after)) {
218-
toReturn = after;
219-
}
216+
if (!valueAndRowKindEquals(before, after)) {
217+
toReturn = after.level() == AFTER_LEVEL ? after : before;
220218
}
221219
} else {
222220
throw new IllegalArgumentException("Illegal kv number: " + kvs.size());

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,6 +1019,32 @@ public void testIncrementScanMode() throws Exception {
10191019
Row.of("+I", 2, "B"), Row.of("-D", 2, "B"), Row.of("+I", 3, "C"));
10201020
}
10211021

1022+
@Test
1023+
public void testIncrementScanModeWithInsertOverwrite() throws Exception {
1024+
1025+
sql("CREATE TABLE test_scan_mode (id INT PRIMARY KEY NOT ENFORCED, v STRING)");
1026+
1027+
// snapshot 1
1028+
sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'A'), (1, 'B'), (1, 'C')");
1029+
// snapshot 2
1030+
sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'C'), (1, 'D')");
1031+
1032+
List<Row> result =
1033+
sql(
1034+
"SELECT * FROM `test_scan_mode$audit_log` "
1035+
+ "/*+ OPTIONS('incremental-between'='1,2','incremental-between-scan-mode'='diff') */");
1036+
assertThat(result).containsExactlyInAnyOrder(Row.of("+I", 1, "D"));
1037+
1038+
// snapshot 3
1039+
sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'D')");
1040+
1041+
result =
1042+
sql(
1043+
"SELECT * FROM `test_scan_mode$audit_log` "
1044+
+ "/*+ OPTIONS('incremental-between'='2,2','incremental-between-scan-mode'='diff') */");
1045+
assertThat(result).isEmpty();
1046+
}
1047+
10221048
@Test
10231049
public void testAuditLogTableWithComputedColumn() throws Exception {
10241050
sql("CREATE TABLE test_table (a int, b int, c AS a + b);");

0 commit comments

Comments
 (0)