From 9ce24963bfd9112d5b4addefd91d12e52784e19d Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Sun, 10 May 2026 23:39:23 +0800 Subject: [PATCH] Fix batch read retry on digest mismatch Batch reads marked the request as complete before verifying the digest of each returned entry. If one entry in the response failed digest verification, the operation attempted to retry on another replica, but the request had already been completed, so the retry response could be ignored and the batch read could hang or fail. Verify all entries in the batch before completing the request. On digest mismatch, leave the request incomplete, discard the whole response, and retry the same batch on the next replica. Only create LedgerEntryImpl instances after the full batch has passed digest verification, so no partially verified entries are retained. Add tests for retrying after a corrupt batch response and for the case where an earlier entry verifies successfully but a later entry fails digest verification. --- .../bookkeeper/client/BatchedReadOp.java | 42 ++++-- .../bookkeeper/client/TestBatchedRead.java | 136 ++++++++++++++++++ 2 files changed, 164 insertions(+), 14 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java index ebba79faea8..2fdb70195c3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -112,6 +112,11 @@ public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBu heardFromHosts.add(rctx.to); heardFromHostsBitSet.set(rctx.bookieIndex, true); + /* + * Retain the response while this read op handles it. complete() returns true only when it + * transfers the buffers into request.entries. For digest failures, duplicate responses, or + * other incomplete paths, complete() returns false and this retained reference is released here. + */ bufList.retain(); // if entry has completed don't handle twice if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) { @@ -160,32 +165,41 @@ boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) { if (isComplete()) { return false; } - if (!complete.getAndSet(true)) { + + /* + * Verify the whole batch before creating LedgerEntryImpl instances. If any entry fails + * digest verification, no partial entries are retained and readEntriesComplete() releases + * the retained ByteBufList after this method returns false. + */ + for (int i = 0; i < bufList.size(); i++) { + ByteBuf buffer = bufList.getBuffer(i); + try { + lh.macManager.verifyDigestAndReturnData(eId + i, buffer); + } catch (BKException.BKDigestMatchException e) { + clientCtx.getClientStats().getReadOpDmCounter().inc(); + logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", + BKException.Code.DigestMatchException); + return false; + } + } + + if (complete.compareAndSet(false, true)) { + rc = BKException.Code.OK; for (int i = 0; i < bufList.size(); i++) { ByteBuf buffer = bufList.getBuffer(i); - ByteBuf content; - try { - content = lh.macManager.verifyDigestAndReturnData(eId + i, buffer); - } catch (BKException.BKDigestMatchException e) { - clientCtx.getClientStats().getReadOpDmCounter().inc(); - logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", - BKException.Code.DigestMatchException); - return false; - } - rc = BKException.Code.OK; /* * The length is a long and it is the last field of the metadata of an entry. * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length. */ - LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i); + LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i); entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8)); - entryImpl.setEntryBuf(content); + entryImpl.setEntryBuf(buffer); entries.add(entryImpl); } writeSet.recycle(); return true; } else { - writeSet.recycle(); + // Another response completed the request first; readEntriesComplete() releases bufList. return false; } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java index 1bb95ed0478..7e11d2f70a1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java @@ -21,22 +21,32 @@ package org.apache.bookkeeper.client; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.TestBookieImpl; import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.awaitility.Awaitility; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -289,4 +299,130 @@ public void testReadFailureWithFailedBookies() throws Exception { lh.close(); newBk.close(); } + + @Test + public void testDigestMismatchRetriesNextReplicaAndCompletes() throws Exception { + ClientConfiguration conf = new ClientConfiguration(baseClientConf) + .setUseV2WireProtocol(true) + .setReorderReadSequenceEnabled(false) + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + + try (BookKeeperTestClient bk = new BookKeeperTestClient(conf)) { + byte[] data = "batch-digest-data".getBytes(StandardCharsets.UTF_8); + LedgerHandle writer = bk.createLedger(3, 3, 2, digestType, passwd); + writer.addEntry(data); + long ledgerId = writer.getId(); + BookieId corruptReplica = writer.getLedgerMetadata().getAllEnsembles().get(0L).get(0); + writer.close(); + + ServerConfiguration corruptConf = killBookie(corruptReplica); + startAndAddBookie(corruptConf, new CorruptReadBookie(corruptConf)); + + LedgerHandle reader = bk.openLedger(ledgerId, digestType, passwd); + BatchedReadOp readOp = new BatchedReadOp(reader, bk.getClientCtx(), 0, 1, 1024, false); + readOp.submit(); + + Iterator entries = readOp.future().get().iterator(); + assertTrue(entries.hasNext()); + LedgerEntry entry = entries.next(); + assertArrayEquals(data, entry.getEntryBytes()); + entry.close(); + assertFalse(entries.hasNext()); + reader.close(); + } + } + + @Test + public void testDigestMismatchAfterPartialVerificationDoesNotRetainEntries() throws Exception { + ClientConfiguration conf = new ClientConfiguration(baseClientConf) + .setUseV2WireProtocol(true) + .setReorderReadSequenceEnabled(false) + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + + try (BookKeeperTestClient bk = new BookKeeperTestClient(conf)) { + byte[] entry0 = "batch-digest-entry-0".getBytes(StandardCharsets.UTF_8); + byte[] entry1 = "batch-digest-entry-1".getBytes(StandardCharsets.UTF_8); + LedgerHandle writer = bk.createLedger(3, 3, 2, digestType, passwd); + writer.addEntry(entry0); + writer.addEntry(entry1); + long ledgerId = writer.getId(); + List ensemble = writer.getLedgerMetadata().getAllEnsembles().get(0L); + BookieId corruptReplica = ensemble.get(0); + BookieId retryReplica = ensemble.get(1); + writer.close(); + + CountDownLatch corruptReadLatch = new CountDownLatch(1); + ServerConfiguration corruptConf = killBookie(corruptReplica); + startAndAddBookie(corruptConf, new CorruptReadBookie(corruptConf, 1L, corruptReadLatch)); + + CountDownLatch retryLatch = new CountDownLatch(1); + sleepBookie(retryReplica, retryLatch); + + LedgerHandle reader = null; + try { + reader = bk.openLedger(ledgerId, digestType, passwd); + BatchedReadOp readOp = new BatchedReadOp(reader, bk.getClientCtx(), 0, 2, 2048, false); + readOp.submit(); + + assertTrue("corrupt replica did not read the corrupted entry", + corruptReadLatch.await(10, TimeUnit.SECONDS)); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertNotNull(readOp.request); + BatchedReadOp.SequenceReadRequest request = + (BatchedReadOp.SequenceReadRequest) readOp.request; + assertTrue(request.nextReplicaIndexToReadFrom >= 2); + }); + assertTrue("digest mismatch must not retain partially verified entries", + readOp.request.entries.isEmpty()); + + retryLatch.countDown(); + Iterator entries = readOp.future().get(10, TimeUnit.SECONDS).iterator(); + assertTrue(entries.hasNext()); + LedgerEntry first = entries.next(); + assertArrayEquals(entry0, first.getEntryBytes()); + first.close(); + assertTrue(entries.hasNext()); + LedgerEntry second = entries.next(); + assertArrayEquals(entry1, second.getEntryBytes()); + second.close(); + assertFalse(entries.hasNext()); + } finally { + retryLatch.countDown(); + if (reader != null) { + reader.close(); + } + } + } + } + + static class CorruptReadBookie extends TestBookieImpl { + private final long corruptEntryId; + private final CountDownLatch corruptReadLatch; + + CorruptReadBookie(ServerConfiguration conf) throws Exception { + this(conf, -1L, null); + } + + CorruptReadBookie(ServerConfiguration conf, long corruptEntryId, CountDownLatch corruptReadLatch) + throws Exception { + super(conf); + this.corruptEntryId = corruptEntryId; + this.corruptReadLatch = corruptReadLatch; + } + + @Override + public ByteBuf readEntry(long ledgerId, long entryId) + throws IOException, Bookie.NoLedgerException, BookieException { + ByteBuf localBuf = super.readEntry(ledgerId, entryId); + if (corruptEntryId < 0 || corruptEntryId == entryId) { + for (int i = 0; i < localBuf.capacity(); i++) { + localBuf.setByte(i, 0); + } + if (corruptReadLatch != null) { + corruptReadLatch.countDown(); + } + } + return localBuf; + } + } }