diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index c120b9fa719ec..84e1f82020abb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -686,7 +686,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac }; try { bookkeeper.asyncOpenLedger(ledgerId, digestType, getConfig().getPassword(), openCallback, - null); + null, true); } catch (Throwable t) { log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}", ledger.getName(), ledgerId, name, t); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 6dd886fe4ea2b..b7affb01929a4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -491,7 +491,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { log.debug("[{}] Opening ledger {}", name, id); } mbean.startDataLedgerOpenOp(); - bookKeeper.asyncOpenLedger(id, digestType, config.getPassword(), opencb, null); + bookKeeper.asyncOpenLedger(id, digestType, config.getPassword(), opencb, null, true); } else { initializeBookKeeper(callback); } @@ -1918,7 +1918,7 @@ synchronized void addEntryFailedDueToConcurrentlyModified(final LedgerHandle cur handleBadVersion(new BadVersionException("the current ledger " + currentLedger.getId() + " was concurrent modified by a other bookie client. The error code is: " + errorCode)); } - }, null); + }, null, true); } synchronized void ledgerClosed(final LedgerHandle lh) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index bfb9b6ecca1b3..bacc157743542 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -5945,6 +5945,15 @@ public void asyncOpenLedger(final long lId, final DigestType digestType, final b super.asyncOpenLedger(lId, digestType, passwd, cb, ctx); } } + + public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd, + final OpenCallback cb, final Object ctx, boolean keepMetadataUpdate) { + if (ledgerErrors.containsKey(lId)) { + cb.openComplete(ledgerErrors.get(lId), null, ctx); + } else { + super.asyncOpenLedger(lId, digestType, passwd, cb, ctx, keepMetadataUpdate); + } + } } private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java index ecc3423e292e9..95f0a6b8c775e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -137,6 +138,11 @@ private void setup() { cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class)); return null; }).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(), any()); + doAnswer(inv -> { + AsyncCallback.OpenCallback cb = inv.getArgument(3, AsyncCallback.OpenCallback.class); + cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class)); + return null; + }).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(), any(), anyBoolean()); doAnswer(inv -> { AsyncCallback.CreateCallback cb = inv.getArgument(5, AsyncCallback.CreateCallback.class); cb.createComplete(0, newLedgerHandle, inv.getArgument(6, Object.class)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index ba37092e88d9b..fa7408d7e1574 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -213,7 +213,7 @@ private CompletableFuture openLedger(Long ledgerId) { } else { future.complete(handle); } - }, null + }, null, true ); return future; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index b931239a32cea..e38bf48f1fdb1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -630,7 +630,7 @@ private CompletableFuture openLedger(Long ledgerId) { } else { future.complete(handle); } - }, null + }, null, true ); return future; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 21cc0e45acdaf..2ceaab83f2232 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -184,7 +184,7 @@ private static CompletableFuture openCompactedLedger(Book } else { promise.complete(ledger); } - }, null); + }, null, true); return promise.thenApply((ledger) -> new CompactedTopicContext( ledger, createCache(ledger, DEFAULT_MAX_CACHE_SIZE))); }