Skip to content

Commit 6d51f88

Browse files
authored
[fix][broker]Fix ledgerHandle failed to read by using new BK API (#25199)
1 parent bea6f8a commit 6d51f88

7 files changed

Lines changed: 21 additions & 6 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -686,7 +686,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
686686
};
687687
try {
688688
bookkeeper.asyncOpenLedger(ledgerId, digestType, getConfig().getPassword(), openCallback,
689-
null);
689+
null, true);
690690
} catch (Throwable t) {
691691
log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}",
692692
ledger.getName(), ledgerId, name, t);

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
491491
log.debug("[{}] Opening ledger {}", name, id);
492492
}
493493
mbean.startDataLedgerOpenOp();
494-
bookKeeper.asyncOpenLedger(id, digestType, config.getPassword(), opencb, null);
494+
bookKeeper.asyncOpenLedger(id, digestType, config.getPassword(), opencb, null, true);
495495
} else {
496496
initializeBookKeeper(callback);
497497
}
@@ -1918,7 +1918,7 @@ synchronized void addEntryFailedDueToConcurrentlyModified(final LedgerHandle cur
19181918
handleBadVersion(new BadVersionException("the current ledger " + currentLedger.getId()
19191919
+ " was concurrent modified by a other bookie client. The error code is: " + errorCode));
19201920
}
1921-
}, null);
1921+
}, null, true);
19221922
}
19231923

19241924
synchronized void ledgerClosed(final LedgerHandle lh) {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6045,6 +6045,15 @@ public void asyncOpenLedger(final long lId, final DigestType digestType, final b
60456045
super.asyncOpenLedger(lId, digestType, passwd, cb, ctx);
60466046
}
60476047
}
6048+
6049+
public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
6050+
final OpenCallback cb, final Object ctx, boolean keepMetadataUpdate) {
6051+
if (ledgerErrors.containsKey(lId)) {
6052+
cb.openComplete(ledgerErrors.get(lId), null, ctx);
6053+
} else {
6054+
super.asyncOpenLedger(lId, digestType, passwd, cb, ctx, keepMetadataUpdate);
6055+
}
6056+
}
60486057
}
60496058

60506059
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.anyBoolean;
2223
import static org.mockito.ArgumentMatchers.anyInt;
2324
import static org.mockito.ArgumentMatchers.anyLong;
2425
import static org.mockito.ArgumentMatchers.anyString;
@@ -137,6 +138,11 @@ private void setup() {
137138
cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class));
138139
return null;
139140
}).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(), any());
141+
doAnswer(inv -> {
142+
AsyncCallback.OpenCallback cb = inv.getArgument(3, AsyncCallback.OpenCallback.class);
143+
cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class));
144+
return null;
145+
}).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(), any(), anyBoolean());
140146
doAnswer(inv -> {
141147
AsyncCallback.CreateCallback cb = inv.getArgument(5, AsyncCallback.CreateCallback.class);
142148
cb.createComplete(0, newLedgerHandle, inv.getArgument(6, Object.class));

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
213213
} else {
214214
future.complete(handle);
215215
}
216-
}, null
216+
}, null, true
217217
);
218218
return future;
219219
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,7 @@ private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
630630
} else {
631631
future.complete(handle);
632632
}
633-
}, null
633+
}, null, true
634634
);
635635
return future;
636636
}

pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ private static CompletableFuture<CompactedTopicContext> openCompactedLedger(Book
184184
} else {
185185
promise.complete(ledger);
186186
}
187-
}, null);
187+
}, null, true);
188188
return promise.thenApply((ledger) -> new CompactedTopicContext(
189189
ledger, createCache(ledger, DEFAULT_MAX_CACHE_SIZE)));
190190
}

0 commit comments

Comments
 (0)