diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto index 72df7d5e1d4..549790b34bd 100644 --- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto +++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto @@ -115,6 +115,8 @@ message ReadRequest { optional int64 previousLAC = 4; // Used as a timeout (in milliseconds) for the long polling request optional int64 timeOut = 5; + // Bitmask of additional read flags from BookieProtocol.FLAG_*. + optional int32 readFlags = 6; } message AddRequest { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 90c8acf5af4..61c06f9234c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -57,6 +57,12 @@ void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, Object ctx, byte // TODO: Shouldn't this be async? 
ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException, BookieException; + + default ByteBuf readEntry(long ledgerId, long entryId, boolean noReadAhead) + throws IOException, NoLedgerException, BookieException { + return readEntry(ledgerId, entryId); + } + long readLastAddConfirmed(long ledgerId) throws IOException, BookieException; PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedgerException; @@ -127,4 +133,4 @@ public long getEntry() { } } -} \ No newline at end of file +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 362a20129a1..251c21a6bb6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -1158,6 +1158,12 @@ public CompletableFuture fenceLedger(long ledgerId, byte[] masterKey) public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException, BookieException { + return readEntry(ledgerId, entryId, false); + } + + @Override + public ByteBuf readEntry(long ledgerId, long entryId, boolean noReadAhead) + throws IOException, NoLedgerException, BookieException { long requestNanos = MathUtils.nowInNano(); boolean success = false; int entrySize = 0; @@ -1167,7 +1173,7 @@ public ByteBuf readEntry(long ledgerId, long entryId) .attr("entryId", entryId) .attr("ledgerId", ledgerId) .log("Reading entry"); - ByteBuf entry = handle.readEntry(entryId); + ByteBuf entry = handle.readEntry(entryId, noReadAhead); entrySize = entry.readableBytes(); bookieStats.getReadBytes().addCount(entrySize); success = true; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java index 3c84feb876d..05d01c13663 100644 --- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java @@ -78,6 +78,10 @@ static ByteBuf createLedgerFenceEntry(Long ledgerId) { abstract long addEntry(ByteBuf entry) throws IOException, BookieException; abstract ByteBuf readEntry(long entryId) throws IOException, BookieException; + ByteBuf readEntry(long entryId, boolean noReadAhead) throws IOException, BookieException { + return readEntry(entryId); + } + abstract long getLastAddConfirmed() throws IOException, BookieException; abstract boolean waitForLastAddConfirmedUpdate(long previousLAC, Watcher watcher) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java index 80b6dc5a902..d3f63cd3630 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java @@ -162,6 +162,11 @@ ByteBuf readEntry(long entryId) throws IOException, BookieException { return ledgerStorage.getEntry(ledgerId, entryId); } + @Override + ByteBuf readEntry(long entryId, boolean noReadAhead) throws IOException, BookieException { + return ledgerStorage.getEntry(ledgerId, entryId, noReadAhead); + } + @Override long getLastAddConfirmed() throws IOException, BookieException { return ledgerStorage.getLastAddConfirmed(ledgerId); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index 6eca6e00108..b7bba5a96fb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -144,6 +144,15 @@ void initialize(ServerConfiguration conf, */ 
ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieException; + /** + * Read an entry from storage with options that apply to this request. + * + * @param noReadAhead whether this request should avoid triggering read-ahead + */ + default ByteBuf getEntry(long ledgerId, long entryId, boolean noReadAhead) throws IOException, BookieException { + return getEntry(ledgerId, entryId); + } + /** * Get last add confirmed. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 69964c8f81f..e342df8392c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -361,6 +361,11 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE return getLedgerStorage(ledgerId).getEntry(ledgerId, entryId); } + @Override + public ByteBuf getEntry(long ledgerId, long entryId, boolean noReadAhead) throws IOException, BookieException { + return getLedgerStorage(ledgerId).getEntry(ledgerId, entryId, noReadAhead); + } + @Override public long getLastAddConfirmed(long ledgerId) throws IOException, BookieException { return getLedgerStorage(ledgerId).getLastAddConfirmed(ledgerId); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 708b63da62b..412722eec55 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -568,9 +568,14 @@ private void triggerFlushAndAddEntry(long ledgerId, long 
entryId, ByteBuf entry) @Override public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieException { + return getEntry(ledgerId, entryId, false); + } + + @Override + public ByteBuf getEntry(long ledgerId, long entryId, boolean noReadAhead) throws IOException, BookieException { long startTime = MathUtils.nowInNano(); try { - ByteBuf entry = doGetEntry(ledgerId, entryId); + ByteBuf entry = doGetEntry(ledgerId, entryId, noReadAhead); recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime); return entry; } catch (IOException e) { @@ -579,7 +584,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE } } - private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, BookieException { + private ByteBuf doGetEntry(long ledgerId, long entryId, boolean noReadAhead) throws IOException, BookieException { log.debug() .attr("ledgerId", ledgerId) .attr("entryId", entryId) @@ -658,8 +663,10 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, Book readCache.put(ledgerId, entryId, entry); // Try to read more entries - long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes(); - fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation); + if (!noReadAhead) { + long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes(); + fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation); + } return entry; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 1c37d3c54a0..330e1bfc492 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -72,6 +72,7 @@ import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; import 
org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadOptions; import org.apache.bookkeeper.client.api.WriteFlag; import org.apache.bookkeeper.client.api.WriteHandle; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; @@ -638,9 +639,26 @@ void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc */ public Enumeration readEntries(long firstEntry, long lastEntry) throws InterruptedException, BKException { + return readEntries(firstEntry, lastEntry, ReadOptions.DEFAULT); + } + + /** + * Read a sequence of entries synchronously with options that apply to this request. + * + * @param firstEntry + * id of first entry of sequence (included) + * @param lastEntry + * id of last entry of sequence (included) + * @param options + * options for this read request + * + * @see #asyncReadEntries(long, long, ReadCallback, Object, ReadOptions) + */ + public Enumeration readEntries(long firstEntry, long lastEntry, ReadOptions options) + throws InterruptedException, BKException { CompletableFuture> result = new CompletableFuture<>(); - asyncReadEntries(firstEntry, lastEntry, new SyncReadCallback(result), null); + asyncReadEntries(firstEntry, lastEntry, new SyncReadCallback(result), null, options); return SyncCallbackUtils.waitForResult(result); } @@ -681,9 +699,29 @@ public Enumeration batchReadEntries(long startEntry, int maxCount, */ public Enumeration readUnconfirmedEntries(long firstEntry, long lastEntry) throws InterruptedException, BKException { + return readUnconfirmedEntries(firstEntry, lastEntry, ReadOptions.DEFAULT); + } + + /** + * Read a sequence of entries synchronously with options that apply to this request, allowing to read after the + * LastAddConfirmed range. 
+ * + * @param firstEntry + * id of first entry of sequence (included) + * @param lastEntry + * id of last entry of sequence (included) + * @param options + * options for this read request + * + * @see #readEntries(long, long, ReadOptions) + * @see #asyncReadUnconfirmedEntries(long, long, ReadCallback, java.lang.Object, ReadOptions) + * @see #asyncReadLastConfirmed(ReadLastConfirmedCallback, java.lang.Object) + */ + public Enumeration readUnconfirmedEntries(long firstEntry, long lastEntry, ReadOptions options) + throws InterruptedException, BKException { CompletableFuture> result = new CompletableFuture<>(); - asyncReadUnconfirmedEntries(firstEntry, lastEntry, new SyncReadCallback(result), null); + asyncReadUnconfirmedEntries(firstEntry, lastEntry, new SyncReadCallback(result), null, options); return SyncCallbackUtils.waitForResult(result); } @@ -722,6 +760,24 @@ public Enumeration batchReadUnconfirmedEntries(long firstEntry, int * control object */ public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) { + asyncReadEntries(firstEntry, lastEntry, cb, ctx, ReadOptions.DEFAULT); + } + + /** + * Read a sequence of entries asynchronously with options that apply to this request. 
+ * + * @param firstEntry + * id of first entry of sequence + * @param lastEntry + * id of last entry of sequence + * @param cb + * object implementing read callback interface + * @param ctx + * control object + * @param options + * options for this read request + */ + public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx, ReadOptions options) { // Little sanity check if (firstEntry < 0 || firstEntry > lastEntry) { log.error() @@ -742,7 +798,7 @@ public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, O return; } - asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false); + asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false, options); } /** @@ -819,6 +875,30 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration seq, * @see #readUnconfirmedEntries(long, long) */ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) { + asyncReadUnconfirmedEntries(firstEntry, lastEntry, cb, ctx, ReadOptions.DEFAULT); + } + + /** + * Read a sequence of entries asynchronously with options that apply to this request, allowing to read after the + * LastAddConfirmed range. 
+ * + * @param firstEntry + * id of first entry of sequence + * @param lastEntry + * id of last entry of sequence + * @param cb + * object implementing read callback interface + * @param ctx + * control object + * @param options + * options for this read request + * + * @see #asyncReadEntries(long, long, ReadCallback, Object, ReadOptions) + * @see #asyncReadLastConfirmed(ReadLastConfirmedCallback, Object) + * @see #readUnconfirmedEntries(long, long, ReadOptions) + */ + public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx, + ReadOptions options) { // Little sanity check if (firstEntry < 0 || firstEntry > lastEntry) { log.error() @@ -829,7 +909,7 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal return; } - asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false); + asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false, options); } /** @@ -886,6 +966,21 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration seq, */ @Override public CompletableFuture readAsync(long firstEntry, long lastEntry) { + return readAsync(firstEntry, lastEntry, ReadOptions.DEFAULT); + } + + /** + * Read a sequence of entries asynchronously with options that apply to this request. 
+ * + * @param firstEntry + * id of first entry of sequence + * @param lastEntry + * id of last entry of sequence + * @param options + * options for this read request + */ + @Override + public CompletableFuture readAsync(long firstEntry, long lastEntry, ReadOptions options) { // Little sanity check if (firstEntry < 0 || firstEntry > lastEntry) { log.error() @@ -904,7 +999,7 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return FutureUtils.exception(new BKReadException()); } - return readEntriesInternalAsync(firstEntry, lastEntry, false); + return readEntriesInternalAsync(firstEntry, lastEntry, false, options); } /** @@ -975,6 +1070,10 @@ private boolean notSupportBatchRead() { return ledgerMetadata.getEnsembleSize() != ledgerMetadata.getWriteQuorumSize(); } + private static ReadOptions readOptionsOrDefault(ReadOptions options) { + return options == null ? ReadOptions.DEFAULT : options; + } + private CompletableFuture batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize, boolean isRecoveryRead) { int nettyMaxFrameSizeBytes = clientCtx.getConf().nettyMaxFrameSizeBytes; @@ -1059,6 +1158,27 @@ private CompletableFuture batchReadEntriesInternalAsync(long star */ @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { + return readUnconfirmedAsync(firstEntry, lastEntry, ReadOptions.DEFAULT); + } + + /** + * Read a sequence of entries asynchronously with options that apply to this request, allowing to read after the + * LastAddConfirmed range. 
+ * + * @param firstEntry + * id of first entry of sequence + * @param lastEntry + * id of last entry of sequence + * @param options + * options for this read request + * + * @see #asyncReadEntries(long, long, ReadCallback, Object, ReadOptions) + * @see #asyncReadLastConfirmed(ReadLastConfirmedCallback, Object) + * @see #readUnconfirmedEntries(long, long, ReadOptions) + */ + @Override + public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry, + ReadOptions options) { // Little sanity check if (firstEntry < 0 || firstEntry > lastEntry) { log.error() @@ -1068,13 +1188,18 @@ public CompletableFuture readUnconfirmedAsync(long firstEntry, lo return FutureUtils.exception(new BKIncorrectParameterException()); } - return readEntriesInternalAsync(firstEntry, lastEntry, false); + return readEntriesInternalAsync(firstEntry, lastEntry, false, options); } void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb, Object ctx, boolean isRecoveryRead) { + asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, isRecoveryRead, ReadOptions.DEFAULT); + } + + void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb, + Object ctx, boolean isRecoveryRead, ReadOptions options) { if (!clientCtx.isClientClosed()) { - readEntriesInternalAsync(firstEntry, lastEntry, isRecoveryRead) + readEntriesInternalAsync(firstEntry, lastEntry, isRecoveryRead, options) .whenCompleteAsync(new FutureEventListener() { @Override public void onSuccess(LedgerEntries entries) { @@ -1174,8 +1299,15 @@ public LedgerEntry readLastEntry() CompletableFuture readEntriesInternalAsync(long firstEntry, long lastEntry, boolean isRecoveryRead) { + return readEntriesInternalAsync(firstEntry, lastEntry, isRecoveryRead, ReadOptions.DEFAULT); + } + + CompletableFuture readEntriesInternalAsync(long firstEntry, + long lastEntry, + boolean isRecoveryRead, + ReadOptions options) { PendingReadOp op = new PendingReadOp(this, clientCtx, - firstEntry, lastEntry, 
isRecoveryRead); + firstEntry, lastEntry, isRecoveryRead, readOptionsOrDefault(options)); if (!clientCtx.isClientClosed()) { // Waiting on the first one. // This is not very helpful if there are multiple ensembles or if bookie goes into unresponsive diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 79cfa41bfa4..1cbd067f1e4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import lombok.CustomLog; +import org.apache.bookkeeper.client.api.ReadOptions; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.common.util.MathUtils; @@ -47,14 +48,25 @@ class PendingReadOp extends ReadOpBase implements ReadEntryCallback { protected boolean parallelRead = false; protected final LinkedList seq; + private final ReadOptions readOptions; PendingReadOp(LedgerHandle lh, ClientContext clientCtx, long startEntryId, long endEntryId, boolean isRecoveryRead) { + this(lh, clientCtx, startEntryId, endEntryId, isRecoveryRead, ReadOptions.DEFAULT); + } + + PendingReadOp(LedgerHandle lh, + ClientContext clientCtx, + long startEntryId, + long endEntryId, + boolean isRecoveryRead, + ReadOptions readOptions) { super(lh, clientCtx, startEntryId, endEntryId, isRecoveryRead); this.seq = new LinkedList<>(); + this.readOptions = readOptions; numPendingEntries = endEntryId - startEntryId + 1; } @@ -178,13 +190,19 @@ void sendReadTo(int bookieIndex, BookieId to, SingleLedgerEntryRequest entry) th lh.throttler.acquire(); } + int flags = isRecoveryRead + ? 
BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING + : BookieProtocol.FLAG_NONE; + if (readOptions.isReadAheadDisabled()) { + flags |= BookieProtocol.FLAG_NO_READ_AHEAD; + } + if (isRecoveryRead) { - int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); } else { clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, - this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); + this, new ReadContext(bookieIndex, to, entry), flags); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java index 8e2e633a35a..fedd935b41e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java @@ -45,6 +45,21 @@ public interface ReadHandle extends Handle { */ CompletableFuture readAsync(long firstEntry, long lastEntry); + /** + * Read a sequence of entries asynchronously with options that apply to this request. + * + * @param firstEntry + * id of first entry of sequence + * @param lastEntry + * id of last entry of sequence, inclusive + * @param options + * options for this read request + * @return an handle to the result of the operation + */ + default CompletableFuture readAsync(long firstEntry, long lastEntry, ReadOptions options) { + return readAsync(firstEntry, lastEntry); + } + /** * Read a sequence of entries asynchronously. * @@ -76,6 +91,23 @@ default LedgerEntries read(long firstEntry, long lastEntry) throws BKException, BKException.HANDLER); } + /** + * Read a sequence of entries synchronously with options that apply to this request. 
+ * + * @param firstEntry + * id of first entry of sequence + * @param lastEntry + * id of last entry of sequence, inclusive + * @param options + * options for this read request + * @return the result of the operation + */ + default LedgerEntries read(long firstEntry, long lastEntry, ReadOptions options) + throws BKException, InterruptedException { + return FutureUtils.result(readAsync(firstEntry, lastEntry, options), + BKException.HANDLER); + } + /** * * @param startEntry @@ -116,6 +148,23 @@ default LedgerEntries batchRead(long startEntry, int maxCount, long maxSize) */ CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry); + /** + * Read a sequence of entries asynchronously with options that apply to this request, allowing to read after the + * LastAddConfirmed range. + * + * @param firstEntry + * id of first entry of sequence + * @param lastEntry + * id of last entry of sequence, inclusive + * @param options + * options for this read request + * @return an handle to the result of the operation + */ + default CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry, + ReadOptions options) { + return readUnconfirmedAsync(firstEntry, lastEntry); + } + /** * Read a sequence of entries synchronously. * @@ -133,6 +182,26 @@ default LedgerEntries readUnconfirmed(long firstEntry, long lastEntry) BKException.HANDLER); } + /** + * Read a sequence of entries synchronously with options that apply to this request, allowing to read after the + * LastAddConfirmed range. 
+ * + * @param firstEntry + * id of first entry of sequence + * @param lastEntry + * id of last entry of sequence, inclusive + * @param options + * options for this read request + * @return the result of the operation + * + * @see #readUnconfirmedAsync(long, long, ReadOptions) + */ + default LedgerEntries readUnconfirmed(long firstEntry, long lastEntry, ReadOptions options) + throws BKException, InterruptedException { + return FutureUtils.result(readUnconfirmedAsync(firstEntry, lastEntry, options), + BKException.HANDLER); + } + /** * Obtains asynchronously the last confirmed write from a quorum of bookies. This * call obtains the last add confirmed each bookie has received for this ledger diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadOptions.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadOptions.java new file mode 100644 index 00000000000..0fcfa54d1a0 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadOptions.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.client.api; + +import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; + +/** + * Options that apply to a single read request. + */ +@Public +@Unstable +public final class ReadOptions { + + /** + * Default read options. Read-ahead behavior is unchanged. + */ + public static final ReadOptions DEFAULT = builder().build(); + + private final boolean disableReadAhead; + + private ReadOptions(boolean disableReadAhead) { + this.disableReadAhead = disableReadAhead; + } + + /** + * Whether this read should avoid triggering bookie-side read-ahead on cache miss. + * + *
<p>
This option does not bypass existing cache entries. + */ + public boolean isReadAheadDisabled() { + return disableReadAhead; + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private boolean disableReadAhead; + + private Builder() { + } + + public Builder disableReadAhead(boolean disableReadAhead) { + this.disableReadAhead = disableReadAhead; + return this; + } + + public ReadOptions build() { + return new ReadOptions(disableReadAhead); + } + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java index 6db3e143519..d385ddbb636 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java @@ -59,7 +59,8 @@ protected ReferenceCounted readData() throws Exception { long frameSize = 24 + 8 + 4; for (int i = 0; i < maxCount; i++) { try { - ByteBuf entry = requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId() + i); + ByteBuf entry = requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId() + i, + request.isNoReadAhead()); frameSize += entry.readableBytes() + 4; if (data == null) { data = ByteBufList.get(entry); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 6a93f8d2cc6..ea5854e7de5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -190,6 +190,7 @@ public static short getFlags(int packetHeader) { short FLAG_DO_FENCING = 0x0001; short FLAG_RECOVERY_ADD = 0x0002; short FLAG_HIGH_PRIORITY = 0x0004; + short 
FLAG_NO_READ_AHEAD = 0x0008; /** * A Bookie request object. @@ -245,6 +246,10 @@ boolean isHighPriority() { return (flags & FLAG_HIGH_PRIORITY) == FLAG_HIGH_PRIORITY; } + boolean isNoReadAhead() { + return (flags & FLAG_NO_READ_AHEAD) == FLAG_NO_READ_AHEAD; + } + @Override public String toString() { return String.format("Op(%d)[Ledger:%d,Entry:%d]", opCode, ledgerId, entryId); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index c68f7929646..6a6fe07f65d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -964,6 +964,10 @@ private void readEntryInternal(final long ledgerId, .setLedgerId(ledgerId) .setEntryId(entryId); + if (((short) flags & BookieProtocol.FLAG_NO_READ_AHEAD) == BookieProtocol.FLAG_NO_READ_AHEAD) { + readBuilder = readBuilder.setReadFlags(BookieProtocol.FLAG_NO_READ_AHEAD); + } + if (null != previousLAC) { readBuilder = readBuilder.setPreviousLAC(previousLAC); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 29cd159788b..486a9499127 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -137,7 +137,8 @@ protected void processPacket() { } protected ReferenceCounted readData() throws Exception { - return requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId()); + return requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId(), + request.isNoReadAhead()); } private void sendResponse(ReferenceCounted data, int errorCode, long startTimeNanos) { 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java index 118d5a66da6..a2d8fc1cf18 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java @@ -174,7 +174,8 @@ protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder, boolean readLACPiggyBack, Stopwatch startTimeSw) throws IOException, BookieException { - ByteBuf entryBody = requestProcessor.getBookie().readEntry(ledgerId, entryId); + ByteBuf entryBody = requestProcessor.getBookie().readEntry(ledgerId, entryId, + RequestUtils.isNoReadAhead(readRequest)); if (null != fenceResult) { handleReadResultForFenceRead(entryBody, readResponseBuilder, entryId, startTimeSw); return null; @@ -382,4 +383,3 @@ public String toString() { return RequestUtils.toSafeString(request); } } - diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java index 9b8533da0c6..e93c8ffdcb4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java @@ -40,6 +40,12 @@ public static boolean shouldPiggybackEntry(BookkeeperProtocol.ReadRequest readRe return hasFlag(readRequest, BookkeeperProtocol.ReadRequest.Flag.ENTRY_PIGGYBACK); } + public static boolean isNoReadAhead(BookkeeperProtocol.ReadRequest readRequest) { + return readRequest.hasReadFlags() + && (readRequest.getReadFlags() & BookieProtocol.FLAG_NO_READ_AHEAD) + == BookieProtocol.FLAG_NO_READ_AHEAD; + } + static boolean hasFlag(BookkeeperProtocol.ReadRequest request, BookkeeperProtocol.ReadRequest.Flag flag) { return request.hasFlag() && request.getFlag() == flag; } @@ -76,6 +82,9 @@ 
public static String toSafeString(BookkeeperProtocol.Request request) { if (readRequest.hasFlag()) { stringHelper.add("flag", readRequest.getFlag()); } + if (readRequest.hasReadFlags()) { + stringHelper.add("readFlags", readRequest.getReadFlags()); + } if (readRequest.hasPreviousLAC()) { stringHelper.add("previousLAC", readRequest.getPreviousLAC()); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java index 81ef7f9495c..079af1977b2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java @@ -47,6 +47,78 @@ public class DbLedgerStorageReadCacheTest { private static final Logger LOGGER = LoggerFactory.getLogger(DbLedgerStorageReadCacheTest.class); + @Test + public void readMissTriggersReadAheadByDefault() throws Exception { + TestDB testDB = new TestDB(); + try { + setup(testDB, 16L, 8, -1); + addEntries(testDB.getStorage(), 0, 1, 0, 4); + testDB.getStorage().flush(); + + SingleDirectoryDbLedgerStorage sdb = testDB.getStorage().getLedgerStorageList().get(0); + DbLedgerStorageStats stats = sdb.getDbLedgerStorageStats(); + long missesBefore = stats.getReadCacheMissCounter().get(); + long hitsBefore = stats.getReadCacheHitCounter().get(); + + assertAndReleaseEntry(testDB.getStorage().getEntry(0, 0), 0, 0); + assertEquals(missesBefore + 1, stats.getReadCacheMissCounter().get().longValue()); + + assertAndReleaseEntry(testDB.getStorage().getEntry(0, 1), 0, 1); + assertEquals(missesBefore + 1, stats.getReadCacheMissCounter().get().longValue()); + assertEquals(hitsBefore + 1, stats.getReadCacheHitCounter().get().longValue()); + } finally { + teardown(testDB.getStorage(), testDB.getTmpDir()); + } + } + + @Test + public void 
noReadAheadReadMissDoesNotPrefillFollowingEntries() throws Exception { + TestDB testDB = new TestDB(); + try { + setup(testDB, 16L, 8, -1); + addEntries(testDB.getStorage(), 0, 1, 0, 4); + testDB.getStorage().flush(); + + SingleDirectoryDbLedgerStorage sdb = testDB.getStorage().getLedgerStorageList().get(0); + DbLedgerStorageStats stats = sdb.getDbLedgerStorageStats(); + long missesBefore = stats.getReadCacheMissCounter().get(); + long hitsBefore = stats.getReadCacheHitCounter().get(); + + assertAndReleaseEntry(testDB.getStorage().getEntry(0, 0, true), 0, 0); + assertEquals(missesBefore + 1, stats.getReadCacheMissCounter().get().longValue()); + + assertAndReleaseEntry(testDB.getStorage().getEntry(0, 1), 0, 1); + assertEquals(missesBefore + 2, stats.getReadCacheMissCounter().get().longValue()); + assertEquals(hitsBefore, stats.getReadCacheHitCounter().get().longValue()); + } finally { + teardown(testDB.getStorage(), testDB.getTmpDir()); + } + } + + @Test + public void noReadAheadStillUsesCachedTargetEntry() throws Exception { + TestDB testDB = new TestDB(); + try { + setup(testDB, 16L, 8, -1); + addEntries(testDB.getStorage(), 0, 1, 0, 4); + testDB.getStorage().flush(); + + SingleDirectoryDbLedgerStorage sdb = testDB.getStorage().getLedgerStorageList().get(0); + DbLedgerStorageStats stats = sdb.getDbLedgerStorageStats(); + long missesBefore = stats.getReadCacheMissCounter().get(); + long hitsBefore = stats.getReadCacheHitCounter().get(); + + assertAndReleaseEntry(testDB.getStorage().getEntry(0, 0, true), 0, 0); + assertEquals(missesBefore + 1, stats.getReadCacheMissCounter().get().longValue()); + + assertAndReleaseEntry(testDB.getStorage().getEntry(0, 0, true), 0, 0); + assertEquals(missesBefore + 1, stats.getReadCacheMissCounter().get().longValue()); + assertEquals(hitsBefore + 1, stats.getReadCacheHitCounter().get().longValue()); + } finally { + teardown(testDB.getStorage(), testDB.getTmpDir()); + } + } + @Test public void chargeReadAheadCacheRegressionTest() { 
TestDB testDB = new TestDB(); @@ -247,6 +319,15 @@ private String get4KbMsg() { return buffer.toString(); } + private void assertAndReleaseEntry(ByteBuf entry, long ledgerId, long entryId) { + try { + assertEquals(ledgerId, entry.getLong(0)); + assertEquals(entryId, entry.getLong(8)); + } finally { + entry.release(); + } + } + private CacheResult readAheadCacheBatchBytesSize() { Long cacheMissCount; TestDB testDB = new TestDB(); @@ -365,4 +446,4 @@ public void setCacheHitCount(long cacheHitCount) { this.cacheHitCount = cacheHitCount; } } -} \ No newline at end of file +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingReadOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingReadOpTest.java new file mode 100644 index 00000000000..399fe23386c --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingReadOpTest.java @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.client; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.util.Collections; +import java.util.List; +import java.util.TreeMap; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadOptions; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieClient; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class PendingReadOpTest { + + @Test + public void testReadOptionsDisableReadAheadSetsBookieReadFlag() throws Exception { + BookieClient bookieClient = mock(BookieClient.class); + + newPendingReadOp(bookieClient, false, ReadOptions.builder().disableReadAhead(true).build()).initiate(); + + ArgumentCaptor flags = ArgumentCaptor.forClass(Integer.class); + verify(bookieClient).readEntry(any(BookieId.class), anyLong(), eq(0L), any(), any(), flags.capture()); + assertEquals(BookieProtocol.FLAG_NO_READ_AHEAD, flags.getValue() & BookieProtocol.FLAG_NO_READ_AHEAD); + } + + @Test + public void testDefaultReadOptionsDoNotSetNoReadAheadFlag() throws Exception { + BookieClient bookieClient = mock(BookieClient.class); + + newPendingReadOp(bookieClient, false, ReadOptions.DEFAULT).initiate(); + + ArgumentCaptor flags = ArgumentCaptor.forClass(Integer.class); + verify(bookieClient).readEntry(any(BookieId.class), anyLong(), eq(0L), any(), any(), flags.capture()); + assertEquals(0, flags.getValue() & BookieProtocol.FLAG_NO_READ_AHEAD); + } + + @Test + public void 
testRecoveryReadPreservesExistingFlagsWithNoReadAhead() throws Exception { + BookieClient bookieClient = mock(BookieClient.class); + + newPendingReadOp(bookieClient, true, ReadOptions.builder().disableReadAhead(true).build()).initiate(); + + ArgumentCaptor flags = ArgumentCaptor.forClass(Integer.class); + verify(bookieClient).readEntry(any(BookieId.class), anyLong(), eq(0L), any(), any(), + flags.capture(), isNull()); + assertEquals(BookieProtocol.FLAG_HIGH_PRIORITY, flags.getValue() & BookieProtocol.FLAG_HIGH_PRIORITY); + assertEquals(BookieProtocol.FLAG_DO_FENCING, flags.getValue() & BookieProtocol.FLAG_DO_FENCING); + assertEquals(BookieProtocol.FLAG_NO_READ_AHEAD, flags.getValue() & BookieProtocol.FLAG_NO_READ_AHEAD); + } + + private static PendingReadOp newPendingReadOp(BookieClient bookieClient, + boolean recoveryRead, + ReadOptions readOptions) throws Exception { + ClientContext clientContext = mock(ClientContext.class); + ClientConfiguration conf = new ClientConfiguration().setReorderReadSequenceEnabled(false); + doReturn(ClientInternalConf.fromConfig(conf)).when(clientContext).getConf(); + doReturn(bookieClient).when(clientContext).getBookieClient(); + + LedgerHandle ledgerHandle = mock(LedgerHandle.class); + LedgerMetadata metadata = mock(LedgerMetadata.class); + BookieId bookie = BookieId.parse("127.0.0.1:3181"); + List ensemble = Collections.singletonList(bookie); + TreeMap> ensembles = new TreeMap<>(); + ensembles.put(0L, ensemble); + RoundRobinDistributionSchedule schedule = new RoundRobinDistributionSchedule(1, 1, 1); + + doReturn(metadata).when(ledgerHandle).getLedgerMetadata(); + doReturn(1).when(metadata).getWriteQuorumSize(); + doReturn(1).when(metadata).getAckQuorumSize(); + doReturn(1).when(metadata).getEnsembleSize(); + doReturn(ensemble).when(metadata).getEnsembleAt(0L); + doReturn(ensembles).when(metadata).getAllEnsembles(); + doAnswer(invocation -> schedule.getWriteSet(invocation.getArgument(0))).when(ledgerHandle) + 
.getWriteSetForReadOperation(anyLong()); + + return new PendingReadOp(ledgerHandle, clientContext, 0, 0, recoveryRead, readOptions) + .parallelRead(true); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java index 3f897558384..51b48985366 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -87,8 +88,8 @@ public void setup() throws IOException, BookieException { ByteBuf buffer3 = ByteBufAllocator.DEFAULT.buffer(4); ByteBuf buffer4 = ByteBufAllocator.DEFAULT.buffer(4); - when(bookie.readEntry(anyLong(), anyLong())).thenReturn(buffer0).thenReturn(buffer1).thenReturn(buffer2) - .thenReturn(buffer3).thenReturn(buffer4); + when(bookie.readEntry(anyLong(), anyLong(), anyBoolean())) + .thenReturn(buffer0).thenReturn(buffer1).thenReturn(buffer2).thenReturn(buffer3).thenReturn(buffer4); } @Test @@ -221,4 +222,4 @@ public void testNonFenceRequest() throws Exception { assertEquals(BookieProtocol.BATCH_READ_ENTRY, response.getOpCode()); assertEquals(BookieProtocol.EOK, response.getErrorCode()); } -} \ No newline at end of file +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java index ba75e4b75a6..dc1f321c09e 100644 --- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java @@ -19,8 +19,11 @@ package org.apache.bookkeeper.proto; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_HIGH_PRIORITY; import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_NONE; +import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_NO_READ_AHEAD; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -133,6 +136,22 @@ public void testV2RequestDecoderThrowExceptionOnUnknownRequests() throws Excepti v2ReqEncoder.decode((ByteBuf) v3ReqEncoder.encode(v3Req, UnpooledByteBufAllocator.DEFAULT)); } + @Test + public void testV2ReadRequestPreservesNoReadAheadFlag() throws Exception { + RequestEnDeCoderPreV3 v2ReqEncoder = new RequestEnDeCoderPreV3(registry); + short flags = FLAG_HIGH_PRIORITY | FLAG_NO_READ_AHEAD; + BookieProtocol.ReadRequest req = BookieProtocol.ReadRequest.create( + BookieProtocol.CURRENT_PROTOCOL_VERSION, 1L, 2L, flags, null); + ByteBuf buf = (ByteBuf) v2ReqEncoder.encode(req, UnpooledByteBufAllocator.DEFAULT); + buf.readInt(); // Skip the frame size. 
+ + BookieProtocol.ReadRequest reqDecoded = (BookieProtocol.ReadRequest) v2ReqEncoder.decode(buf); + assertEquals(flags, reqDecoded.getFlags()); + assertTrue(reqDecoded.isHighPriority()); + assertTrue(reqDecoded.isNoReadAhead()); + reqDecoded.recycle(); + } + @Test public void testV2BatchReadRequest() throws Exception { RequestEnDeCoderPreV3 v2ReqEncoder = new RequestEnDeCoderPreV3(registry); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java index 33a4fdc8295..aa72c7cf088 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.proto; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -99,7 +100,7 @@ public void testWatchIsCancelledOnTimeout() throws Exception { when(bookie.waitForLastAddConfirmedUpdate(anyLong(), anyLong(), any())) .thenReturn(true); - when(bookie.readEntry(anyLong(), anyLong())).thenReturn(Unpooled.buffer()); + when(bookie.readEntry(anyLong(), anyLong(), anyBoolean())).thenReturn(Unpooled.buffer()); when(bookie.readLastAddConfirmed(anyLong())).thenReturn(Long.valueOf(1)); CompletableFuture cancelFuture = new CompletableFuture<>(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java index bd308c6b3f6..fa44f187db6 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java +++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java @@ -202,6 +202,46 @@ public void testNonFenceRequest() throws Exception { assertEquals(BookieProtocol.EOK, response.getErrorCode()); } + @Test + public void testNoReadAheadFlagIsPassedToBookie() throws Exception { + ChannelPromise promise = new DefaultChannelPromise(channel); + CountDownLatch latch = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + promise.setSuccess(); + latch.countDown(); + return promise; + }).when(channel).writeAndFlush(any(Response.class)); + + long ledgerId = System.currentTimeMillis(); + ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, + 1, BookieProtocol.FLAG_NO_READ_AHEAD, new byte[]{}); + ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true); + processor.run(); + + latch.await(); + verify(bookie, times(1)).readEntry(ledgerId, 1, true); + } + + @Test + public void testDefaultReadDoesNotPassNoReadAheadToBookie() throws Exception { + ChannelPromise promise = new DefaultChannelPromise(channel); + CountDownLatch latch = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + promise.setSuccess(); + latch.countDown(); + return promise; + }).when(channel).writeAndFlush(any(Response.class)); + + long ledgerId = System.currentTimeMillis(); + ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, + 1, BookieProtocol.FLAG_NONE, new byte[]{}); + ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true); + processor.run(); + + latch.await(); + verify(bookie, times(1)).readEntry(ledgerId, 1, false); + } + /** * Test that when throttleReadResponses=true and the caller is not in the Netty event loop, * the read thread is not blocked by the write. 
onReadRequestFinish() should only be called diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3Test.java new file mode 100644 index 00000000000..3d5c8a7a059 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3Test.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.proto; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.base.Stopwatch; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader; +import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType; +import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion; +import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest; +import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse; +import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; +import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test {@link ReadEntryProcessorV3}. 
+ */ +public class ReadEntryProcessorV3Test { + + private Bookie bookie; + private BookieRequestHandler requestHandler; + private BookieRequestProcessor requestProcessor; + + @Before + public void setup() throws Exception { + Channel channel = mock(Channel.class); + requestHandler = mock(BookieRequestHandler.class); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(channel); + when(requestHandler.ctx()).thenReturn(ctx); + + bookie = mock(Bookie.class); + requestProcessor = mock(BookieRequestProcessor.class); + when(requestProcessor.getBookie()).thenReturn(bookie); + when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE)); + } + + @Test + public void testNoReadAheadReadFlagIsPassedToBookie() throws Exception { + long ledgerId = 1L; + long entryId = 2L; + when(bookie.readEntry(ledgerId, entryId, true)).thenReturn(Unpooled.wrappedBuffer(new byte[] { 1 })); + when(bookie.readLastAddConfirmed(ledgerId)).thenReturn(entryId); + + ReadEntryProcessorV3 processor = newProcessor(ReadRequest.newBuilder() + .setLedgerId(ledgerId) + .setEntryId(entryId) + .setReadFlags(BookieProtocol.FLAG_NO_READ_AHEAD) + .build()); + + ReadResponse response = processor.readEntry(ReadResponse.newBuilder() + .setLedgerId(ledgerId) + .setEntryId(entryId), entryId, Stopwatch.createStarted()); + + assertEquals(StatusCode.EOK, response.getStatus()); + verify(bookie).readEntry(ledgerId, entryId, true); + } + + @Test + public void testDefaultReadDoesNotPassNoReadAheadToBookie() throws Exception { + long ledgerId = 1L; + long entryId = 2L; + when(bookie.readEntry(ledgerId, entryId, false)).thenReturn(Unpooled.wrappedBuffer(new byte[] { 1 })); + when(bookie.readLastAddConfirmed(ledgerId)).thenReturn(entryId); + + ReadEntryProcessorV3 processor = newProcessor(ReadRequest.newBuilder() + .setLedgerId(ledgerId) + .setEntryId(entryId) + .build()); + + ReadResponse response = processor.readEntry(ReadResponse.newBuilder() + 
.setLedgerId(ledgerId) + .setEntryId(entryId), entryId, Stopwatch.createStarted()); + + assertEquals(StatusCode.EOK, response.getStatus()); + verify(bookie).readEntry(ledgerId, entryId, false); + } + + private ReadEntryProcessorV3 newProcessor(ReadRequest readRequest) throws BookieException { + Request request = Request.newBuilder() + .setHeader(BKPacketHeader.newBuilder() + .setTxnId(System.currentTimeMillis()) + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.READ_ENTRY) + .build()) + .setReadRequest(readRequest) + .build(); + return new ReadEntryProcessorV3(request, requestHandler, requestProcessor, null); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java index 46304023433..b9361cb24c3 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java @@ -109,6 +109,15 @@ public void testFlagsV3() { .build(); assertFalse(RequestUtils.hasFlag(read, ReadRequest.Flag.FENCE_LEDGER)); assertFalse(RequestUtils.hasFlag(read, ReadRequest.Flag.ENTRY_PIGGYBACK)); + assertFalse(RequestUtils.isNoReadAhead(read)); + + read = ReadRequest.newBuilder() + .setLedgerId(10).setEntryId(1) + .setFlag(ReadRequest.Flag.FENCE_LEDGER) + .setReadFlags(BookieProtocol.FLAG_NO_READ_AHEAD) + .build(); + assertTrue(RequestUtils.hasFlag(read, ReadRequest.Flag.FENCE_LEDGER)); + assertTrue(RequestUtils.isNoReadAhead(read)); AddRequest add = AddRequest.newBuilder() .setLedgerId(10).setEntryId(1) @@ -196,11 +205,13 @@ public void testToString() { readRequest = ReadRequest.newBuilder().setLedgerId(10).setEntryId(23).setPreviousLAC(2).setTimeOut(100) .setMasterKey(ByteString.copyFrom("masterKey".getBytes())).setFlag(ReadRequest.Flag.ENTRY_PIGGYBACK) + 
.setReadFlags(BookieProtocol.FLAG_NO_READ_AHEAD) .build(); request = Request.newBuilder().setHeader(header).setReadRequest(readRequest).build(); toString = RequestUtils.toSafeString(request); assertFalse("ReadRequest's safeString should have filtered out masterKey", toString.contains("masterKey")); assertTrue("ReadRequest's safeString shouldn contain flag", toString.contains("flag")); + assertTrue("ReadRequest's safeString should contain readFlags", toString.contains("readFlags")); assertTrue("ReadRequest's safeString shouldn contain previousLAC", toString.contains("previousLAC")); assertTrue("ReadRequest's safeString shouldn contain timeOut", toString.contains("timeOut"));