Skip to content

Commit d7e55a7

Browse files
committed
HIVE-29244: Add catalog field into ShowLocksRequest
1 parent fb5e870 commit d7e55a7

85 files changed

Lines changed: 12556 additions & 6561 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreLock.java

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public class MetastoreLock implements HiveLock {
7878

7979
private final ClientPool<IMetaStoreClient, TException> metaClients;
8080

81+
private final String catalogName;
8182
private final String databaseName;
8283
private final String tableName;
8384
private final String fullName;
@@ -100,6 +101,7 @@ public MetastoreLock(Configuration conf, ClientPool<IMetaStoreClient, TException
100101
String catalogName, String databaseName, String tableName) {
101102
this.metaClients = metaClients;
102103
this.fullName = catalogName + "." + databaseName + "." + tableName;
104+
this.catalogName = catalogName;
103105
this.databaseName = databaseName;
104106
this.tableName = tableName;
105107

@@ -179,8 +181,8 @@ public void unlock() {
179181
@SuppressWarnings("checkstyle:CyclomaticComplexity")
180182
private long acquireLock() throws LockException {
181183
if (hmsLockId.isPresent()) {
182-
throw new IllegalArgumentException(String.format("HMS lock ID=%s already acquired for table %s.%s",
183-
hmsLockId.get(), databaseName, tableName));
184+
throw new IllegalArgumentException(String.format("HMS lock ID=%s already acquired for table %s.%s.%s",
185+
hmsLockId.get(), catalogName, databaseName, tableName));
184186
}
185187
LockInfo lockInfo = createLock();
186188

@@ -212,12 +214,13 @@ private long acquireLock() throws LockException {
212214
lockInfo.lockState = newState;
213215
if (newState.equals(LockState.WAITING)) {
214216
throw new WaitingForLockException(String.format(
215-
"Waiting for lock on table %s.%s", databaseName, tableName));
217+
"Waiting for lock on table %s.%s.%s", catalogName, databaseName, tableName));
216218
}
217219
} catch (InterruptedException e) {
218220
Thread.interrupted(); // Clear the interrupt status flag
219221
LOG.warn(
220-
"Interrupted while waiting for lock on table {}.{}",
222+
"Interrupted while waiting for lock on table {}.{}.{}",
223+
catalogName,
221224
databaseName,
222225
tableName,
223226
e);
@@ -239,19 +242,19 @@ private long acquireLock() throws LockException {
239242
if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
240243
// timeout and do not have lock acquired
241244
if (timeout) {
242-
throw new LockException("Timed out after %s ms waiting for lock on %s.%s",
243-
duration, databaseName, tableName);
245+
throw new LockException("Timed out after %s ms waiting for lock on %s.%s.%s",
246+
duration, catalogName, databaseName, tableName);
244247
}
245248

246249
if (thriftError != null) {
247250
throw new LockException(
248-
thriftError, "Metastore operation failed for %s.%s", databaseName, tableName);
251+
thriftError, "Metastore operation failed for %s.%s.%s", catalogName, databaseName, tableName);
249252
}
250253

251254
// Just for safety. We should not get here.
252255
throw new LockException(
253-
"Could not acquire the lock on %s.%s, lock request ended in state %s",
254-
databaseName, tableName, lockInfo.lockState);
256+
"Could not acquire the lock on %s.%s.%s, lock request ended in state %s",
257+
catalogName, databaseName, tableName, lockInfo.lockState);
255258
} else {
256259
return lockInfo.lockId;
257260
}
@@ -277,6 +280,7 @@ private LockInfo createLock() throws LockException {
277280

278281
LockComponent lockComponent =
279282
new LockComponent(LockType.EXCL_WRITE, LockLevel.TABLE, databaseName);
283+
lockComponent.setCatName(catalogName);
280284
lockComponent.setTablename(tableName);
281285
LockRequest lockRequest =
282286
new LockRequest(
@@ -318,27 +322,32 @@ private LockInfo createLock() throws LockException {
318322
}
319323
}
320324

321-
throw new LockException("Failed to find lock for table %s.%s", databaseName, tableName);
325+
throw new LockException("Failed to find lock for table %s.%s.%s", catalogName, databaseName,
326+
tableName);
322327
} catch (InterruptedException e) {
323328
Thread.currentThread().interrupt();
324329
interrupted.set(true);
325330
LOG.warn(
326-
"Interrupted while trying to find lock for table {}.{}", databaseName, tableName, e);
331+
"Interrupted while trying to find lock for table {}.{}.{}", catalogName, databaseName,
332+
tableName, e);
327333
throw new LockException(
328-
e, "Interrupted while trying to find lock for table %s.%s", databaseName, tableName);
334+
e, "Interrupted while trying to find lock for table %s.%s.%s", catalogName, databaseName,
335+
tableName);
329336
}
330337
} catch (InterruptedException e) {
331338
Thread.currentThread().interrupt();
332339
interrupted.set(true);
333-
LOG.warn("Interrupted while creating lock on table {}.{}", databaseName, tableName, e);
340+
LOG.warn("Interrupted while creating lock on table {}.{}.{}", catalogName, databaseName,
341+
tableName, e);
334342
throw new LockException(
335-
e, "Interrupted while creating lock on table %s.%s", databaseName, tableName);
343+
e, "Interrupted while creating lock on table %s.%s.%s", catalogName, databaseName,
344+
tableName);
336345
}
337346
},
338347
LockException.class);
339348

340349
// This should be initialized always, or exception should be thrown.
341-
LOG.debug("Lock {} created for table {}.{}", lockInfo, databaseName, tableName);
350+
LOG.debug("Lock {} created for table {}.{}.{}", lockInfo, catalogName, databaseName, tableName);
342351
return lockInfo;
343352
}
344353

@@ -354,13 +363,14 @@ private LockInfo findLock() throws LockException, InterruptedException {
354363
HiveVersion.min(HiveVersion.HIVE_2),
355364
"Minimally Hive 2 HMS client is needed to find the Lock using the showLocks API call");
356365
ShowLocksRequest showLocksRequest = new ShowLocksRequest();
366+
showLocksRequest.setCatname(catalogName);
357367
showLocksRequest.setDbname(databaseName);
358368
showLocksRequest.setTablename(tableName);
359369
ShowLocksResponse response;
360370
try {
361371
response = metaClients.run(client -> client.showLocks(showLocksRequest));
362372
} catch (TException e) {
363-
throw new LockException(e, "Failed to find lock for table %s.%s", databaseName, tableName);
373+
throw new LockException(e, "Failed to find lock for table %s.%s.%s", catalogName, databaseName, tableName);
364374
}
365375
for (ShowLocksResponseElement lock : response.getLocks()) {
366376
if (lock.getAgentInfo().equals(agentInfo)) {
@@ -403,19 +413,22 @@ private void unlock(Optional<Long> lockId) {
403413
// Interrupted unlock. We try to unlock one more time if we have a lockId
404414
try {
405415
Thread.interrupted(); // Clear the interrupt status flag for now, so we can retry unlock
406-
LOG.warn("Interrupted unlock we try one more time {}.{}", databaseName, tableName, ie);
416+
LOG.warn("Interrupted unlock we try one more time {}.{}.{}", catalogName, databaseName,
417+
tableName, ie);
407418
doUnlock(id);
408419
} catch (Exception e) {
409-
LOG.warn("Failed to unlock even on 2nd attempt {}.{}", databaseName, tableName, e);
420+
LOG.warn("Failed to unlock even on 2nd attempt {}.{}.{}", catalogName, databaseName,
421+
tableName, e);
410422
} finally {
411423
Thread.currentThread().interrupt(); // Set back the interrupt status
412424
}
413425
} else {
414426
Thread.currentThread().interrupt(); // Set back the interrupt status
415-
LOG.warn("Interrupted finding locks to unlock {}.{}", databaseName, tableName, ie);
427+
LOG.warn("Interrupted finding locks to unlock {}.{}.{}", catalogName, databaseName,
428+
tableName, ie);
416429
}
417430
} catch (Exception e) {
418-
LOG.warn("Failed to unlock {}.{}", databaseName, tableName, e);
431+
LOG.warn("Failed to unlock {}.{}.{}", catalogName, databaseName, tableName, e);
419432
}
420433
}
421434

iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ public void testUnLockAfterInterruptedLock() throws TException {
369369
.isInstanceOf(RuntimeException.class)
370370
.hasMessage(
371371
"org.apache.iceberg.hive.LockException: " +
372-
"Interrupted while creating lock on table hivedb.tbl");
372+
"Interrupted while creating lock on table hive.hivedb.tbl");
373373

374374
verify(spyClient, times(1)).unlock(eq(dummyLockId));
375375
// Make sure that we exit the lock loop on InterruptedException
@@ -393,7 +393,7 @@ public void testUnLockAfterInterruptedLockCheck() throws TException {
393393
.isInstanceOf(RuntimeException.class)
394394
.hasMessage(
395395
"org.apache.iceberg.hive.LockException: " +
396-
"Could not acquire the lock on hivedb.tbl, lock request ended in state WAITING");
396+
"Could not acquire the lock on hive.hivedb.tbl, lock request ended in state WAITING");
397397

398398
verify(spyClient, times(1)).unlock(eq(dummyLockId));
399399
// Make sure that we exit the checkLock loop on InterruptedException
@@ -452,7 +452,7 @@ public void testLockFailureAtFirstTime() throws TException {
452452
.isInstanceOf(CommitFailedException.class)
453453
.hasMessage(
454454
"org.apache.iceberg.hive.LockException: " +
455-
"Could not acquire the lock on hivedb.tbl, lock request ended in state NOT_ACQUIRED");
455+
"Could not acquire the lock on hive.hivedb.tbl, lock request ended in state NOT_ACQUIRED");
456456
}
457457

458458
@Test
@@ -470,7 +470,7 @@ public void testLockFailureAfterRetries() throws TException {
470470
.isInstanceOf(CommitFailedException.class)
471471
.hasMessage(
472472
"org.apache.iceberg.hive.LockException: " +
473-
"Could not acquire the lock on hivedb.tbl, lock request ended in state NOT_ACQUIRED");
473+
"Could not acquire the lock on hive.hivedb.tbl, lock request ended in state NOT_ACQUIRED");
474474
}
475475

476476
@Test
@@ -482,7 +482,7 @@ public void testLockTimeoutAfterRetries() throws TException {
482482
.isInstanceOf(CommitFailedException.class)
483483
.hasMessageStartingWith("org.apache.iceberg.hive.LockException")
484484
.hasMessageContaining("Timed out after")
485-
.hasMessageEndingWith("waiting for lock on hivedb.tbl");
485+
.hasMessageEndingWith("waiting for lock on hive.hivedb.tbl");
486486
}
487487

488488
@Test
@@ -507,7 +507,7 @@ public void testPassThroughThriftExceptionsForHiveVersion_1()
507507
assertThatThrownBy(() -> spyOps.doCommit(metadataV2, metadataV1))
508508
.isInstanceOf(CommitFailedException.class)
509509
.hasMessage(
510-
"org.apache.iceberg.hive.LockException: Failed to find lock for table hivedb.tbl");
510+
"org.apache.iceberg.hive.LockException: Failed to find lock for table hive.hivedb.tbl");
511511
}
512512
}
513513

@@ -520,7 +520,7 @@ public void testPassThroughThriftExceptions() throws TException {
520520
assertThatThrownBy(() -> spyOps.doCommit(metadataV2, metadataV1))
521521
.isInstanceOf(RuntimeException.class)
522522
.hasMessage(
523-
"org.apache.iceberg.hive.LockException: Metastore operation failed for hivedb.tbl");
523+
"org.apache.iceberg.hive.LockException: Metastore operation failed for hive.hivedb.tbl");
524524
}
525525

526526
@Test
@@ -536,7 +536,7 @@ public void testPassThroughInterruptions() throws TException {
536536
.isInstanceOf(CommitFailedException.class)
537537
.hasMessage(
538538
"org.apache.iceberg.hive.LockException: " +
539-
"Could not acquire the lock on hivedb.tbl, lock request ended in state WAITING");
539+
"Could not acquire the lock on hive.hivedb.tbl, lock request ended in state WAITING");
540540
}
541541

542542
@Test

itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hadoop.hive.cli.CliSessionState;
2424
import org.apache.hadoop.hive.conf.HiveConf;
2525
import org.apache.hadoop.hive.conf.HiveConfForTest;
26+
import org.apache.hadoop.hive.metastore.Warehouse;
2627
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
2728
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
2829
import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
@@ -70,6 +71,7 @@ public class BaseReplicationScenariosAcidTables {
7071

7172
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
7273
private static final Path REPLICA_EXTERNAL_BASE = new Path("/replica_external_base");
74+
protected static final String PRIMARY_CAT_NAME = Warehouse.DEFAULT_CATALOG_NAME;
7375
protected static String fullyQualifiedReplicaExternalBase;
7476
static WarehouseInstance primary;
7577
static WarehouseInstance replica, replicaNonAcid;
@@ -348,8 +350,8 @@ List<Long> openTxns(int numTxns, TxnStore txnHandler, HiveConf primaryConf) thro
348350
return txns;
349351
}
350352

351-
List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryDbName, Map<String, Long> tables,
352-
TxnStore txnHandler,
353+
List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryCatName, String primaryDbName,
354+
Map<String, Long> tables, TxnStore txnHandler,
353355
List<Long> txns, HiveConf primaryConf) throws Throwable {
354356
AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest();
355357
rqst.setDbName(primaryDbName);
@@ -361,6 +363,7 @@ List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryDbName, Map<St
361363
for (long txnId : txns) {
362364
LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
363365
primaryDbName);
366+
comp.setCatName(primaryCatName);
364367
comp.setTablename(entry.getKey());
365368
comp.setOperationType(DataOperationType.UPDATE);
366369
List<LockComponent> components = new ArrayList<LockComponent>(1);

itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ public void testReverseBootstrap() throws Throwable {
563563
Map<String, Long> tablesInSecDb = new HashMap<>();
564564
tablesInSecDb.put("t1", (long) numTxnsForSecDb + 4);
565565
tablesInSecDb.put("t2", (long) numTxnsForSecDb + 4);
566-
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
566+
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
567567
tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
568568
tearDownLockIds.addAll(lockIdsForSecDb);
569569

@@ -576,8 +576,8 @@ public void testReverseBootstrap() throws Throwable {
576576
Map<String, Long> tablesInSourceDb = new HashMap<>();
577577
tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 6);
578578
tablesInSourceDb.put("t2", (long) numTxnsForPrimaryDb);
579-
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
580-
txnsForSourceDb, replica.getConf());
579+
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, replicatedDbName,
580+
tablesInSourceDb, txnHandler, txnsForSourceDb, replica.getConf());
581581
tearDownLockIds.addAll(lockIdsForSourceDb);
582582

583583
//Open 1 txn with no hive locks acquired
@@ -1092,7 +1092,7 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
10921092
Map<String, Long> tablesInSecDb = new HashMap<>();
10931093
tablesInSecDb.put("t1", (long) numTxnsForSecDb);
10941094
tablesInSecDb.put("t2", (long) numTxnsForSecDb);
1095-
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
1095+
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
10961096
tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
10971097
tearDownLockIds.addAll(lockIdsForSecDb);
10981098

@@ -1105,8 +1105,8 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
11051105
Map<String, Long> tablesInSourceDb = new HashMap<>();
11061106
tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb);
11071107
tablesInSourceDb.put("t5", (long) numTxnsForPrimaryDb);
1108-
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, tablesInSourceDb, txnHandler,
1109-
txnsForSourceDb, primary.getConf());
1108+
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName,
1109+
tablesInSourceDb, txnHandler, txnsForSourceDb, primary.getConf());
11101110
tearDownLockIds.addAll(lockIdsForSourceDb);
11111111

11121112
//Open 1 txn with no hive locks acquired
@@ -1157,7 +1157,7 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
11571157
Map<String, Long> newTablesForSecDb = new HashMap<>();
11581158
newTablesForSecDb.put("t1", (long) numTxnsForSecDb + 1);
11591159
newTablesForSecDb.put("t2", (long) numTxnsForSecDb + 1);
1160-
List<Long> newLockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
1160+
List<Long> newLockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
11611161
newTablesForSecDb, txnHandler, newTxnsForSecDb, primaryConf);
11621162
tearDownLockIds.addAll(newLockIdsForSecDb);
11631163

@@ -1169,8 +1169,8 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
11691169
Map<String, Long> newTablesInSourceDb = new HashMap<>();
11701170
newTablesInSourceDb.put("t1", (long) 5);
11711171
newTablesInSourceDb.put("t5", (long) 3);
1172-
List<Long> newLockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, newTablesInSourceDb, txnHandler,
1173-
newTxnsForSourceDb, primary.getConf());
1172+
List<Long> newLockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName,
1173+
newTablesInSourceDb, txnHandler, newTxnsForSourceDb, primary.getConf());
11741174
tearDownLockIds.addAll(newLockIdsForSourceDb);
11751175

11761176
//Open 1 txn with no hive locks acquired

0 commit comments

Comments
 (0)