Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ message ReadRequest {
optional int64 previousLAC = 4;
// Used as a timeout (in milliseconds) for the long polling request
optional int64 timeOut = 5;
// Bitmask of additional read flags from BookieProtocol.FLAG_*.
optional int32 readFlags = 6;
}

message AddRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, Object ctx, byte
// TODO: Shouldn't this be async?
ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, NoLedgerException, BookieException;

default ByteBuf readEntry(long ledgerId, long entryId, boolean noReadAhead)
throws IOException, NoLedgerException, BookieException {
return readEntry(ledgerId, entryId);
}

long readLastAddConfirmed(long ledgerId) throws IOException, BookieException;
PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedgerException;

Expand Down Expand Up @@ -127,4 +133,4 @@ public long getEntry() {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,12 @@ public CompletableFuture<Boolean> fenceLedger(long ledgerId, byte[] masterKey)

public ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, NoLedgerException, BookieException {
return readEntry(ledgerId, entryId, false);
}

@Override
public ByteBuf readEntry(long ledgerId, long entryId, boolean noReadAhead)
throws IOException, NoLedgerException, BookieException {
long requestNanos = MathUtils.nowInNano();
boolean success = false;
int entrySize = 0;
Expand All @@ -1167,7 +1173,7 @@ public ByteBuf readEntry(long ledgerId, long entryId)
.attr("entryId", entryId)
.attr("ledgerId", ledgerId)
.log("Reading entry");
ByteBuf entry = handle.readEntry(entryId);
ByteBuf entry = handle.readEntry(entryId, noReadAhead);
entrySize = entry.readableBytes();
bookieStats.getReadBytes().addCount(entrySize);
success = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ static ByteBuf createLedgerFenceEntry(Long ledgerId) {
abstract long addEntry(ByteBuf entry) throws IOException, BookieException;
abstract ByteBuf readEntry(long entryId) throws IOException, BookieException;

ByteBuf readEntry(long entryId, boolean noReadAhead) throws IOException, BookieException {
return readEntry(entryId);
}

abstract long getLastAddConfirmed() throws IOException, BookieException;
abstract boolean waitForLastAddConfirmedUpdate(long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ ByteBuf readEntry(long entryId) throws IOException, BookieException {
return ledgerStorage.getEntry(ledgerId, entryId);
}

@Override
ByteBuf readEntry(long entryId, boolean noReadAhead) throws IOException, BookieException {
return ledgerStorage.getEntry(ledgerId, entryId, noReadAhead);
}

@Override
long getLastAddConfirmed() throws IOException, BookieException {
return ledgerStorage.getLastAddConfirmed(ledgerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ void initialize(ServerConfiguration conf,
*/
ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieException;

/**
* Read an entry from storage with options that apply to this request.
*
* @param noReadAhead whether this request should avoid triggering read-ahead
*/
default ByteBuf getEntry(long ledgerId, long entryId, boolean noReadAhead) throws IOException, BookieException {
return getEntry(ledgerId, entryId);
}

/**
* Get last add confirmed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,11 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE
return getLedgerStorage(ledgerId).getEntry(ledgerId, entryId);
}

@Override
public ByteBuf getEntry(long ledgerId, long entryId, boolean noReadAhead) throws IOException, BookieException {
return getLedgerStorage(ledgerId).getEntry(ledgerId, entryId, noReadAhead);
}

@Override
public long getLastAddConfirmed(long ledgerId) throws IOException, BookieException {
return getLedgerStorage(ledgerId).getLastAddConfirmed(ledgerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,14 @@ private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)

@Override
public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieException {
return getEntry(ledgerId, entryId, false);
}

@Override
public ByteBuf getEntry(long ledgerId, long entryId, boolean noReadAhead) throws IOException, BookieException {
long startTime = MathUtils.nowInNano();
try {
ByteBuf entry = doGetEntry(ledgerId, entryId);
ByteBuf entry = doGetEntry(ledgerId, entryId, noReadAhead);
recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
return entry;
} catch (IOException e) {
Expand All @@ -579,7 +584,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE
}
}

private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, BookieException {
private ByteBuf doGetEntry(long ledgerId, long entryId, boolean noReadAhead) throws IOException, BookieException {
log.debug()
.attr("ledgerId", ledgerId)
.attr("entryId", entryId)
Expand Down Expand Up @@ -658,8 +663,10 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, Book
readCache.put(ledgerId, entryId, entry);

// Try to read more entries
long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
if (!noReadAhead) {
long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
}

return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadOptions;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
Expand Down Expand Up @@ -638,9 +639,26 @@ void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc
*/
public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
throws InterruptedException, BKException {
return readEntries(firstEntry, lastEntry, ReadOptions.DEFAULT);
}

/**
* Read a sequence of entries synchronously with options that apply to this request.
*
* @param firstEntry
* id of first entry of sequence (included)
* @param lastEntry
* id of last entry of sequence (included)
* @param options
* options for this read request
*
* @see #asyncReadEntries(long, long, ReadCallback, Object, ReadOptions)
*/
public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry, ReadOptions options)
throws InterruptedException, BKException {
CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<>();

asyncReadEntries(firstEntry, lastEntry, new SyncReadCallback(result), null);
asyncReadEntries(firstEntry, lastEntry, new SyncReadCallback(result), null, options);

return SyncCallbackUtils.waitForResult(result);
}
Expand Down Expand Up @@ -681,9 +699,29 @@ public Enumeration<LedgerEntry> batchReadEntries(long startEntry, int maxCount,
*/
public Enumeration<LedgerEntry> readUnconfirmedEntries(long firstEntry, long lastEntry)
throws InterruptedException, BKException {
return readUnconfirmedEntries(firstEntry, lastEntry, ReadOptions.DEFAULT);
}

/**
* Read a sequence of entries synchronously with options that apply to this request, allowing to read after the
* LastAddConfirmed range.
*
* @param firstEntry
* id of first entry of sequence (included)
* @param lastEntry
* id of last entry of sequence (included)
* @param options
* options for this read request
*
* @see #readEntries(long, long, ReadOptions)
* @see #asyncReadUnconfirmedEntries(long, long, ReadCallback, java.lang.Object, ReadOptions)
* @see #asyncReadLastConfirmed(ReadLastConfirmedCallback, java.lang.Object)
*/
public Enumeration<LedgerEntry> readUnconfirmedEntries(long firstEntry, long lastEntry, ReadOptions options)
throws InterruptedException, BKException {
CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<>();

asyncReadUnconfirmedEntries(firstEntry, lastEntry, new SyncReadCallback(result), null);
asyncReadUnconfirmedEntries(firstEntry, lastEntry, new SyncReadCallback(result), null, options);

return SyncCallbackUtils.waitForResult(result);
}
Expand Down Expand Up @@ -722,6 +760,24 @@ public Enumeration<LedgerEntry> batchReadUnconfirmedEntries(long firstEntry, int
* control object
*/
public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) {
asyncReadEntries(firstEntry, lastEntry, cb, ctx, ReadOptions.DEFAULT);
}

/**
* Read a sequence of entries asynchronously with options that apply to this request.
*
* @param firstEntry
* id of first entry of sequence
* @param lastEntry
* id of last entry of sequence
* @param cb
* object implementing read callback interface
* @param ctx
* control object
* @param options
* options for this read request
*/
public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx, ReadOptions options) {
// Little sanity check
if (firstEntry < 0 || firstEntry > lastEntry) {
log.error()
Expand All @@ -742,7 +798,7 @@ public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, O
return;
}

asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false, options);
}

/**
Expand Down Expand Up @@ -819,6 +875,30 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
* @see #readUnconfirmedEntries(long, long)
*/
public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) {
asyncReadUnconfirmedEntries(firstEntry, lastEntry, cb, ctx, ReadOptions.DEFAULT);
}

/**
* Read a sequence of entries asynchronously with options that apply to this request, allowing to read after the
* LastAddConfirmed range.
*
* @param firstEntry
* id of first entry of sequence
* @param lastEntry
* id of last entry of sequence
* @param cb
* object implementing read callback interface
* @param ctx
* control object
* @param options
* options for this read request
*
* @see #asyncReadEntries(long, long, ReadCallback, Object, ReadOptions)
* @see #asyncReadLastConfirmed(ReadLastConfirmedCallback, Object)
* @see #readUnconfirmedEntries(long, long, ReadOptions)
*/
public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx,
ReadOptions options) {
// Little sanity check
if (firstEntry < 0 || firstEntry > lastEntry) {
log.error()
Expand All @@ -829,7 +909,7 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal
return;
}

asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false, options);
}

/**
Expand Down Expand Up @@ -886,6 +966,21 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
*/
@Override
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
return readAsync(firstEntry, lastEntry, ReadOptions.DEFAULT);
}

/**
* Read a sequence of entries asynchronously with options that apply to this request.
*
* @param firstEntry
* id of first entry of sequence
* @param lastEntry
* id of last entry of sequence
* @param options
* options for this read request
*/
@Override
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry, ReadOptions options) {
// Little sanity check
if (firstEntry < 0 || firstEntry > lastEntry) {
log.error()
Expand All @@ -904,7 +999,7 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
return FutureUtils.exception(new BKReadException());
}

return readEntriesInternalAsync(firstEntry, lastEntry, false);
return readEntriesInternalAsync(firstEntry, lastEntry, false, options);
}

/**
Expand Down Expand Up @@ -975,6 +1070,10 @@ private boolean notSupportBatchRead() {
return ledgerMetadata.getEnsembleSize() != ledgerMetadata.getWriteQuorumSize();
}

private static ReadOptions readOptionsOrDefault(ReadOptions options) {
return options == null ? ReadOptions.DEFAULT : options;
}

private CompletableFuture<LedgerEntries> batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize,
boolean isRecoveryRead) {
int nettyMaxFrameSizeBytes = clientCtx.getConf().nettyMaxFrameSizeBytes;
Expand Down Expand Up @@ -1059,6 +1158,27 @@ private CompletableFuture<LedgerEntries> batchReadEntriesInternalAsync(long star
*/
@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
return readUnconfirmedAsync(firstEntry, lastEntry, ReadOptions.DEFAULT);
}

/**
* Read a sequence of entries asynchronously with options that apply to this request, allowing to read after the
* LastAddConfirmed range.
*
* @param firstEntry
* id of first entry of sequence
* @param lastEntry
* id of last entry of sequence
* @param options
* options for this read request
*
* @see #asyncReadEntries(long, long, ReadCallback, Object, ReadOptions)
* @see #asyncReadLastConfirmed(ReadLastConfirmedCallback, Object)
* @see #readUnconfirmedEntries(long, long, ReadOptions)
*/
@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry,
ReadOptions options) {
// Little sanity check
if (firstEntry < 0 || firstEntry > lastEntry) {
log.error()
Expand All @@ -1068,13 +1188,18 @@ public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, lo
return FutureUtils.exception(new BKIncorrectParameterException());
}

return readEntriesInternalAsync(firstEntry, lastEntry, false);
return readEntriesInternalAsync(firstEntry, lastEntry, false, options);
}

void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb,
Object ctx, boolean isRecoveryRead) {
asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, isRecoveryRead, ReadOptions.DEFAULT);
}

void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb,
Object ctx, boolean isRecoveryRead, ReadOptions options) {
if (!clientCtx.isClientClosed()) {
readEntriesInternalAsync(firstEntry, lastEntry, isRecoveryRead)
readEntriesInternalAsync(firstEntry, lastEntry, isRecoveryRead, options)
.whenCompleteAsync(new FutureEventListener<LedgerEntries>() {
@Override
public void onSuccess(LedgerEntries entries) {
Expand Down Expand Up @@ -1174,8 +1299,15 @@ public LedgerEntry readLastEntry()
CompletableFuture<LedgerEntries> readEntriesInternalAsync(long firstEntry,
long lastEntry,
boolean isRecoveryRead) {
return readEntriesInternalAsync(firstEntry, lastEntry, isRecoveryRead, ReadOptions.DEFAULT);
}

CompletableFuture<LedgerEntries> readEntriesInternalAsync(long firstEntry,
long lastEntry,
boolean isRecoveryRead,
ReadOptions options) {
PendingReadOp op = new PendingReadOp(this, clientCtx,
firstEntry, lastEntry, isRecoveryRead);
firstEntry, lastEntry, isRecoveryRead, readOptionsOrDefault(options));
if (!clientCtx.isClientClosed()) {
// Waiting on the first one.
// This is not very helpful if there are multiple ensembles or if bookie goes into unresponsive
Expand Down
Loading