From fc199fce6c26793e9a8bc9b6ac5b60420a6a5f12 Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Fri, 13 Feb 2026 15:35:58 +0800 Subject: [PATCH] fix writeLac memory leak --- .../bookkeeper/client/PendingWriteLacOp.java | 25 ++- .../client/PendingWriteLacOpTest.java | 157 ++++++++++++++++++ 2 files changed, 176 insertions(+), 6 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingWriteLacOpTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java index f9a5397daf0..b94971b973e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java @@ -17,6 +17,7 @@ */ package org.apache.bookkeeper.client; +import io.netty.util.ReferenceCountUtil; import java.util.BitSet; import java.util.List; import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback; @@ -59,11 +60,11 @@ class PendingWriteLacOp implements WriteLacCallback { this.cb = cb; this.ctx = ctx; this.lac = LedgerHandle.INVALID_ENTRY_ID; - ackSet = lh.distributionSchedule.getAckSet(); + ackSet = lh.getDistributionSchedule().getAckSet(); currentEnsemble = ensemble; } - void setLac(long lac) { + synchronized void setLac(long lac) { this.lac = lac; this.receivedResponseSet = new BitSet( @@ -86,10 +87,14 @@ void initiate(ByteBufList toSend) { } @Override - public void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) { + public synchronized void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) { int bookieIndex = (Integer) ctx; + // We got response. + receivedResponseSet.clear(bookieIndex); + if (completed) { + maybeRecycle(); return; } @@ -97,13 +102,11 @@ public void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) { lastSeenError = rc; } - // We got response. - receivedResponseSet.clear(bookieIndex); - if (rc == BKException.Code.OK) { if (ackSet.completeBookieAndCheck(bookieIndex) && !completed) { completed = true; cb.addLacComplete(rc, lh, ctx); + maybeRecycle(); return; } } else { @@ -114,5 +117,15 @@ public void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) { completed = true; cb.addLacComplete(lastSeenError, lh, ctx); } + + maybeRecycle(); } + + private void maybeRecycle() { + if (receivedResponseSet.isEmpty() && toSend != null) { + ReferenceCountUtil.release(toSend); + toSend = null; + } + } + } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingWriteLacOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingWriteLacOpTest.java new file mode 100644 index 00000000000..b84f12d40d4 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingWriteLacOpTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.util.ByteBufList; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test of {@link PendingWriteLacOp}. + */ +public class PendingWriteLacOpTest implements AsyncCallback.AddLacCallback { + + private LedgerHandle lh; + private ClientContext mockClientContext; + private ByteBufList toSend; + private boolean callbackInvoked; + + @Before + public void setup() { + lh = mock(LedgerHandle.class); + + toSend = ByteBufList.get(); + callbackInvoked = false; + } + + @Test + public void testWriteLacOp() { + + // 332 + when(lh.getDistributionSchedule()) + .thenReturn(new RoundRobinDistributionSchedule(3, 2, 3)); + PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh, mockClientContext, + lh.getCurrentEnsemble(), this, null); + + LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class); + when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3); + when(ledgerMetadata.getAckQuorumSize()).thenReturn(2); + when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata); + + writeLacOp.setLac(1000); + + assertEquals(1000, writeLacOp.lac); + assertFalse(writeLacOp.completed); + assertFalse(writeLacOp.receivedResponseSet.isEmpty()); + + writeLacOp.toSend = toSend; + assertEquals(1, toSend.refCnt()); + + writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0); + writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 1); + + assertTrue(callbackInvoked); + assertTrue(writeLacOp.completed); + assertFalse(writeLacOp.receivedResponseSet.isEmpty()); + assertNotNull(writeLacOp.toSend); + + writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 2); + assertTrue(writeLacOp.receivedResponseSet.isEmpty()); + assertNull(writeLacOp.toSend); + assertEquals(0, toSend.refCnt()); + + // 333 + callbackInvoked = false; + toSend = ByteBufList.get(); + when(lh.getDistributionSchedule()) + .thenReturn(new RoundRobinDistributionSchedule(3, 3, 3)); + writeLacOp = new PendingWriteLacOp(lh, mockClientContext, lh.getCurrentEnsemble(), this, null); + + ledgerMetadata = mock(LedgerMetadata.class); + when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3); + when(ledgerMetadata.getAckQuorumSize()).thenReturn(3); + when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata); + + writeLacOp.setLac(1000); + + assertEquals(1000, writeLacOp.lac); + assertFalse(writeLacOp.completed); + assertFalse(writeLacOp.receivedResponseSet.isEmpty()); + + writeLacOp.toSend = toSend; + assertEquals(1, toSend.refCnt()); + + writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0); + writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 1); + + assertFalse(callbackInvoked); + assertFalse(writeLacOp.completed); + assertFalse(writeLacOp.receivedResponseSet.isEmpty()); + assertNotNull(writeLacOp.toSend); + + writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 2); + assertTrue(writeLacOp.receivedResponseSet.isEmpty()); + assertNull(writeLacOp.toSend); + assertEquals(0, toSend.refCnt()); + + // 111 + callbackInvoked = false; + toSend = ByteBufList.get(); + when(lh.getDistributionSchedule()) + .thenReturn(new RoundRobinDistributionSchedule(1, 1, 1)); + writeLacOp = new PendingWriteLacOp(lh, mockClientContext, lh.getCurrentEnsemble(), this, null); + + ledgerMetadata = mock(LedgerMetadata.class); + when(ledgerMetadata.getWriteQuorumSize()).thenReturn(1); + when(ledgerMetadata.getAckQuorumSize()).thenReturn(1); + when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata); + + writeLacOp.setLac(1000); + + assertEquals(1000, writeLacOp.lac); + assertFalse(writeLacOp.completed); + assertFalse(writeLacOp.receivedResponseSet.isEmpty()); + + writeLacOp.toSend = toSend; + assertEquals(1, toSend.refCnt()); + + writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0); + + assertTrue(callbackInvoked); + assertTrue(writeLacOp.completed); + assertTrue(writeLacOp.receivedResponseSet.isEmpty()); + assertNull(writeLacOp.toSend); + assertEquals(0, toSend.refCnt()); + + } + + @Override + public synchronized void addLacComplete(int rc, LedgerHandle lh, Object ctx) { + callbackInvoked = true; + } +}