Skip to content

Commit 427c785

Browse files
committed
Cleaner should handle retries of killed compaction attemps
1 parent bd816ff commit 427c785

7 files changed

Lines changed: 108 additions & 8 deletions

File tree

ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1380,7 +1380,7 @@ public static AcidDirectory getAcidState(FileSystem fileSystem, Path candidateDi
13801380
// Filter out all delta directories that are shadowed by others
13811381
findBestWorkingDeltas(writeIdList, directory);
13821382

1383-
if(directory.getOldestBase() != null && directory.getBase() == null &&
1383+
if (directory.getOldestBase() != null && directory.getBase() == null &&
13841384
isCompactedBase(directory.getOldestBase(), fs, dirSnapshots)) {
13851385
/*
13861386
* If here, it means there was a base_x (> 1 perhaps) but none were suitable for given

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ fs, path, getConf(), validWriteIdList, Ref.from(false), false,
163163

164164
// Make sure there are no leftovers below the compacted watermark
165165
boolean success = false;
166-
getConf().set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
166+
if (info.minOpenWriteId < 0) {
167+
getConf().set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
168+
}
167169

168170
dir = AcidUtils.getAcidState(
169171
fs, path, getConf(),

ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,10 @@ String makeDeleteDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
729729
}
730730

731731
protected long compactInTxn(CompactionRequest rqst) throws Exception {
732+
return compactInTxn(rqst, true);
733+
}
734+
735+
protected long compactInTxn(CompactionRequest rqst, Boolean commit) throws Exception {
732736
txnHandler.compact(rqst);
733737
FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest();
734738
findNextCompactRequest.setWorkerId("fred");
@@ -748,9 +752,17 @@ protected long compactInTxn(CompactionRequest rqst) throws Exception {
748752

749753
ci.highestWriteId = tblValidWriteIds.getHighWatermark();
750754
txnHandler.updateCompactorState(ci, compactorTxnId);
751-
txnHandler.markCompacted(ci);
752-
txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));
753-
Thread.sleep(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
755+
756+
if (commit != null) {
757+
if (commit) {
758+
txnHandler.markCompacted(ci);
759+
txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));
760+
} else {
761+
txnHandler.abortTxn(new AbortTxnRequest(compactorTxnId));
762+
}
763+
Thread.sleep(MetastoreConf.getTimeVar(
764+
conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
765+
}
754766
return compactorTxnId;
755767
}
756768

ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1215,7 +1215,7 @@ public void testCleanupOnConcurrentMinorCompactions() throws Exception {
12151215
Assert.assertTrue(sawDelta);
12161216
}
12171217

1218-
private void allocateTableWriteId(String dbName, String tblName, long txnId) throws Exception {
1218+
void allocateTableWriteId(String dbName, String tblName, long txnId) throws Exception {
12191219
AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(dbName, tblName);
12201220
awiRqst.setTxnIds(Collections.singletonList(txnId));
12211221
AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);

ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,94 @@
1818

1919
package org.apache.hadoop.hive.ql.txn.compactor;
2020

21+
import org.apache.hadoop.fs.Path;
22+
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
23+
import org.apache.hadoop.hive.metastore.api.CompactionType;
24+
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
25+
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
26+
import org.apache.hadoop.hive.metastore.api.Table;
27+
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
28+
import org.apache.hadoop.hive.metastore.txn.TxnStore;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
31+
32+
import java.util.List;
33+
34+
import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix;
35+
import static org.junit.jupiter.api.Assertions.assertEquals;
36+
2137
public class TestCleanerWithMinHistoryWriteId extends TestCleaner {
38+
39+
@Override
40+
@BeforeEach
41+
public void setup() throws Exception {
42+
super.setup();
43+
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 0);
44+
}
45+
2246
@Override
2347
protected boolean useMinHistoryWriteId() {
2448
return true;
2549
}
50+
51+
@Test
52+
public void cleanupAfterAbortedAndRetriedMajorCompaction() throws Exception {
53+
Table t = prepareTestTable();
54+
CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
55+
long compactTxn = compactInTxn(rqst, false);
56+
addBaseFile(t, null, 25L, 25, compactTxn);
57+
58+
txnHandler.revokeTimedoutWorkers(1L);
59+
compactTxn = compactInTxn(rqst);
60+
addBaseFile(t, null, 25L, 25, compactTxn);
61+
62+
startCleaner();
63+
64+
// Check there are no compactions requests left.
65+
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
66+
assertEquals(1, rsp.getCompactsSize());
67+
assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
68+
69+
// Check that the files are removed
70+
List<Path> paths = getDirectories(conf, t, null);
71+
assertEquals(1, paths.size());
72+
assertEquals(addVisibilitySuffix("base_25", 27), paths.getFirst().getName());
73+
}
74+
75+
@Test
76+
public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception {
77+
Table t = prepareTestTable();
78+
CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
79+
long compactTxn = compactInTxn(rqst, null);
80+
addBaseFile(t, null, 25L, 25, compactTxn);
81+
82+
txnHandler.revokeTimedoutWorkers(1L);
83+
compactTxn = compactInTxn(rqst);
84+
addBaseFile(t, null, 25L, 25, compactTxn);
85+
86+
startCleaner();
87+
88+
// Check there are no compactions requests left.
89+
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
90+
assertEquals(1, rsp.getCompactsSize());
91+
assertEquals(TxnStore.FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
92+
assertEquals("txnid:26 is open and <= hwm: 27", rsp.getCompacts().getFirst().getErrorMessage());
93+
94+
// Check that the files are not removed
95+
List<Path> paths = getDirectories(conf, t, null);
96+
assertEquals(6, paths.size());
97+
}
98+
99+
private Table prepareTestTable() throws Exception {
100+
Table t = newTable("default", "camtc", false);
101+
102+
addBaseFile(t, null, 20L, 20);
103+
addDeltaFile(t, null, 21L, 22L, 2);
104+
addDeltaFile(t, null, 23L, 24L, 2);
105+
addDeltaFile(t, null, 25L, 25, 2);
106+
107+
burnThroughTransactions("default", "camtc", 25);
108+
return t;
109+
}
110+
26111
}

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns
8888
if (isAbortCleanup) {
8989
exceptions[i] = txnId;
9090
} else {
91-
assert false : JavaUtils.txnIdToString(txnId) + " is open and <= hwm:" + highWatermark;
91+
throw new IllegalStateException(
92+
JavaUtils.txnIdToString(txnId) + " is open and <= hwm: " + highWatermark);
9293
}
9394
}
9495
++i;

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public String getParameterizedQueryString(DatabaseProduct databaseProduct) throw
8888
"ON \"cq1\".\"CQ_DATABASE\" = \"hwm\".\"MH_DATABASE\"" +
8989
" AND \"cq1\".\"CQ_TABLE\" = \"hwm\".\"MH_TABLE\"";
9090

91-
whereClause += " AND (\"CQ_HIGHEST_WRITE_ID\" < \"MIN_OPEN_WRITE_ID\" OR \"MIN_OPEN_WRITE_ID\" IS NULL)";
91+
whereClause += " AND (\"CQ_HIGHEST_WRITE_ID\" < \"MIN_OPEN_WRITE_ID\"-1 OR \"MIN_OPEN_WRITE_ID\" IS NULL)";
9292

9393
} else if (minOpenTxnWaterMark > 0) {
9494
whereClause += " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)";

0 commit comments

Comments
 (0)