Skip to content

Commit aeff4ec

Browse files
author
wenbingshen
committed
fix writeLac memory leak
1 parent 3a5cf9d commit aeff4ec

2 files changed

Lines changed: 175 additions & 5 deletions

File tree

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.bookkeeper.client;
1919

20+
import io.netty.util.ReferenceCountUtil;
2021
import java.util.BitSet;
2122
import java.util.List;
2223
import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
@@ -59,7 +60,7 @@ class PendingWriteLacOp implements WriteLacCallback {
5960
this.cb = cb;
6061
this.ctx = ctx;
6162
this.lac = LedgerHandle.INVALID_ENTRY_ID;
62-
ackSet = lh.distributionSchedule.getAckSet();
63+
ackSet = lh.getDistributionSchedule().getAckSet();
6364
currentEnsemble = ensemble;
6465
}
6566

@@ -86,24 +87,26 @@ void initiate(ByteBufList toSend) {
8687
}
8788

8889
@Override
89-
public void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) {
90+
public synchronized void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) {
9091
int bookieIndex = (Integer) ctx;
9192

93+
// We got response.
94+
receivedResponseSet.clear(bookieIndex);
95+
9296
if (completed) {
97+
maybeRecycle();
9398
return;
9499
}
95100

96101
if (BKException.Code.OK != rc) {
97102
lastSeenError = rc;
98103
}
99104

100-
// We got response.
101-
receivedResponseSet.clear(bookieIndex);
102-
103105
if (rc == BKException.Code.OK) {
104106
if (ackSet.completeBookieAndCheck(bookieIndex) && !completed) {
105107
completed = true;
106108
cb.addLacComplete(rc, lh, ctx);
109+
maybeRecycle();
107110
return;
108111
}
109112
} else {
@@ -114,5 +117,15 @@ public void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) {
114117
completed = true;
115118
cb.addLacComplete(lastSeenError, lh, ctx);
116119
}
120+
121+
maybeRecycle();
117122
}
123+
124+
private void maybeRecycle() {
125+
if (receivedResponseSet.isEmpty() && toSend != null) {
126+
ReferenceCountUtil.release(toSend);
127+
toSend = null;
128+
}
129+
}
130+
118131
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.client;
20+
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
23+
import static org.junit.Assert.assertNotNull;
24+
import static org.junit.Assert.assertNull;
25+
import static org.junit.Assert.assertTrue;
26+
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.when;
28+
29+
import org.apache.bookkeeper.client.api.LedgerMetadata;
30+
import org.apache.bookkeeper.util.ByteBufList;
31+
import org.junit.Before;
32+
import org.junit.Test;
33+
34+
/**
35+
* Unit test of {@link PendingWriteLacOp}.
36+
*/
37+
public class PendingWriteLacOpTest implements AsyncCallback.AddLacCallback {
38+
39+
private LedgerHandle lh;
40+
private ClientContext mockClientContext;
41+
private ByteBufList toSend;
42+
private boolean callbackInvoked;
43+
44+
@Before
45+
public void setup() {
46+
lh = mock(LedgerHandle.class);
47+
48+
toSend = ByteBufList.get();
49+
callbackInvoked = false;
50+
}
51+
52+
@Test
53+
public void testWriteLacOp() {
54+
55+
// 332
56+
when(lh.getDistributionSchedule())
57+
.thenReturn(new RoundRobinDistributionSchedule(3, 2, 3));
58+
PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh, mockClientContext,
59+
lh.getCurrentEnsemble(), this, null);
60+
61+
LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
62+
when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3);
63+
when(ledgerMetadata.getAckQuorumSize()).thenReturn(2);
64+
when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
65+
66+
writeLacOp.setLac(1000);
67+
68+
assertEquals(1000, writeLacOp.lac);
69+
assertFalse(writeLacOp.completed);
70+
assertFalse(writeLacOp.receivedResponseSet.isEmpty());
71+
72+
writeLacOp.toSend = toSend;
73+
assertEquals(1, toSend.refCnt());
74+
75+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
76+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 1);
77+
78+
assertTrue(callbackInvoked);
79+
assertTrue(writeLacOp.completed);
80+
assertFalse(writeLacOp.receivedResponseSet.isEmpty());
81+
assertNotNull(writeLacOp.toSend);
82+
83+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 2);
84+
assertTrue(writeLacOp.receivedResponseSet.isEmpty());
85+
assertNull(writeLacOp.toSend);
86+
assertEquals(0, toSend.refCnt());
87+
88+
// 333
89+
callbackInvoked = false;
90+
toSend = ByteBufList.get();
91+
when(lh.getDistributionSchedule())
92+
.thenReturn(new RoundRobinDistributionSchedule(3, 3, 3));
93+
writeLacOp = new PendingWriteLacOp(lh, mockClientContext, lh.getCurrentEnsemble(), this, null);
94+
95+
ledgerMetadata = mock(LedgerMetadata.class);
96+
when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3);
97+
when(ledgerMetadata.getAckQuorumSize()).thenReturn(3);
98+
when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
99+
100+
writeLacOp.setLac(1000);
101+
102+
assertEquals(1000, writeLacOp.lac);
103+
assertFalse(writeLacOp.completed);
104+
assertFalse(writeLacOp.receivedResponseSet.isEmpty());
105+
106+
writeLacOp.toSend = toSend;
107+
assertEquals(1, toSend.refCnt());
108+
109+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
110+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 1);
111+
112+
assertFalse(callbackInvoked);
113+
assertFalse(writeLacOp.completed);
114+
assertFalse(writeLacOp.receivedResponseSet.isEmpty());
115+
assertNotNull(writeLacOp.toSend);
116+
117+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 2);
118+
assertTrue(writeLacOp.receivedResponseSet.isEmpty());
119+
assertNull(writeLacOp.toSend);
120+
assertEquals(0, toSend.refCnt());
121+
122+
// 111
123+
callbackInvoked = false;
124+
toSend = ByteBufList.get();
125+
when(lh.getDistributionSchedule())
126+
.thenReturn(new RoundRobinDistributionSchedule(1, 1, 1));
127+
writeLacOp = new PendingWriteLacOp(lh, mockClientContext, lh.getCurrentEnsemble(), this, null);
128+
129+
ledgerMetadata = mock(LedgerMetadata.class);
130+
when(ledgerMetadata.getWriteQuorumSize()).thenReturn(1);
131+
when(ledgerMetadata.getAckQuorumSize()).thenReturn(1);
132+
when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
133+
134+
writeLacOp.setLac(1000);
135+
136+
assertEquals(1000, writeLacOp.lac);
137+
assertFalse(writeLacOp.completed);
138+
assertFalse(writeLacOp.receivedResponseSet.isEmpty());
139+
140+
writeLacOp.toSend = toSend;
141+
assertEquals(1, toSend.refCnt());
142+
143+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
144+
145+
assertTrue(callbackInvoked);
146+
assertTrue(writeLacOp.completed);
147+
assertTrue(writeLacOp.receivedResponseSet.isEmpty());
148+
assertNull(writeLacOp.toSend);
149+
assertEquals(0, toSend.refCnt());
150+
151+
}
152+
153+
@Override
154+
public synchronized void addLacComplete(int rc, LedgerHandle lh, Object ctx) {
155+
callbackInvoked = true;
156+
}
157+
}

0 commit comments

Comments
 (0)