Skip to content

Commit 5aab2f0

Browse files
authored
[fix][client] Fix race condition between isDuplicate() and flushAsync() method in PersistentAcknowledgmentsGroupingTracker due to incorrect use Netty Recycler (#25208)
1 parent 4add84c commit 5aab2f0

5 files changed

Lines changed: 74 additions & 24 deletions

File tree

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@
136136
import org.apache.pulsar.common.util.FutureUtil;
137137
import org.apache.pulsar.common.util.SafeCollectionUtils;
138138
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
139-
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
139+
import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
140140
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
141141
import org.slf4j.Logger;
142142
import org.slf4j.LoggerFactory;
@@ -3187,7 +3187,7 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
31873187
} else {
31883188
if (Commands.peerSupportsMultiMessageAcknowledgment(
31893189
getClientCnx().getRemoteEndpointProtocolVersion())) {
3190-
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
3190+
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
31913191
new ArrayList<>(chunkMsgIds.length);
31923192
for (MessageIdImpl cMsgId : chunkMsgIds) {
31933193
if (cMsgId != null && chunkMsgIds.length > 1) {
@@ -3225,7 +3225,7 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
32253225
}
32263226

32273227
private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
3228-
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries,
3228+
List<Triple<Long, Long, ConcurrentBitSet>> entries,
32293229
long requestID) {
32303230
BaseCommand cmd = newMultiMessageAckCommon(entries);
32313231
cmd.getAck()
@@ -3244,7 +3244,7 @@ protected BaseCommand initialValue() throws Exception {
32443244
}
32453245
};
32463246

3247-
private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
3247+
private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSet>> entries) {
32483248
BaseCommand cmd = LOCAL_BASE_COMMAND.get()
32493249
.clear()
32503250
.setType(BaseCommand.Type.ACK);
@@ -3253,7 +3253,7 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
32533253
for (int i = 0; i < entriesCount; i++) {
32543254
long ledgerId = entries.get(i).getLeft();
32553255
long entryId = entries.get(i).getMiddle();
3256-
ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
3256+
ConcurrentBitSet bitSet = entries.get(i).getRight();
32573257
MessageIdData msgId = ack.addMessageId()
32583258
.setLedgerId(ledgerId)
32593259
.setEntryId(entryId);
@@ -3262,7 +3262,6 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
32623262
for (int j = 0; j < ackSet.length; j++) {
32633263
msgId.addAckSet(ackSet[j]);
32643264
}
3265-
bitSet.recycle();
32663265
}
32673266
}
32683267

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import org.apache.pulsar.common.protocol.Commands;
5252
import org.apache.pulsar.common.util.FutureUtil;
5353
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
54-
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
54+
import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
5555
import org.jspecify.annotations.Nullable;
5656

5757
/**
@@ -83,7 +83,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
8383
*/
8484
private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks;
8585
@VisibleForTesting
86-
final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks;
86+
final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSet> pendingIndividualBatchIndexAcks;
8787

8888
private final ScheduledFuture<?> scheduledTask;
8989
private final boolean batchIndexAckEnabled;
@@ -133,7 +133,7 @@ public boolean isDuplicate(MessageId messageId) {
133133
return true;
134134
}
135135
if (messageIdAdv.getBatchIndex() >= 0) {
136-
ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.get(key);
136+
ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.get(key);
137137
return bitSet != null && !bitSet.get(messageIdAdv.getBatchIndex());
138138
}
139139
return false;
@@ -327,21 +327,22 @@ private CompletableFuture<Void> doCumulativeAck(MessageIdAdv messageId, Map<Stri
327327

328328
@VisibleForTesting
329329
CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) {
330-
ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
330+
ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
331331
MessageIdAdvUtils.discardBatch(msgId), __ -> {
332332
final BitSet ackSet = msgId.getAckSet();
333-
final ConcurrentBitSetRecyclable value;
333+
final ConcurrentBitSet value;
334334
if (ackSet != null) {
335335
synchronized (ackSet) {
336336
if (!ackSet.isEmpty()) {
337-
value = ConcurrentBitSetRecyclable.create(ackSet);
337+
value = new ConcurrentBitSet();
338+
value.or(ackSet);
338339
} else {
339-
value = ConcurrentBitSetRecyclable.create();
340+
value = new ConcurrentBitSet();
340341
value.set(0, msgId.getBatchSize());
341342
}
342343
}
343344
} else {
344-
value = ConcurrentBitSetRecyclable.create();
345+
value = new ConcurrentBitSet();
345346
value.set(0, msgId.getBatchSize());
346347
}
347348
return value;
@@ -445,7 +446,7 @@ private void flushAsync(ClientCnx cnx) {
445446
}
446447

447448
// Flush all individual acks
448-
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
449+
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
449450
new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size());
450451
if (!pendingIndividualAcks.isEmpty()) {
451452
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
@@ -487,7 +488,7 @@ private void flushAsync(ClientCnx cnx) {
487488
}
488489

489490
while (true) {
490-
Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry =
491+
Map.Entry<MessageIdAdv, ConcurrentBitSet> entry =
491492
pendingIndividualBatchIndexAcks.pollFirstEntry();
492493
if (entry == null) {
493494
// The entry has been removed in a different thread
@@ -539,7 +540,7 @@ private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, Message
539540
// cumulative ack chunk by the last messageId
540541
if (chunkMsgIds != null && ackType != AckType.Cumulative) {
541542
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
542-
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
543+
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
543544
for (MessageIdImpl cMsgId : chunkMsgIds) {
544545
if (cMsgId != null && chunkMsgIds.length > 1) {
545546
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
@@ -568,7 +569,7 @@ private CompletableFuture<Void> newMessageAckCommandAndWrite(
568569
long entryId, BitSetRecyclable ackSet, AckType ackType,
569570
Map<String, Long> properties, boolean flush,
570571
TimedCompletableFuture<Void> timedCompletableFuture,
571-
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) {
572+
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck) {
572573
if (consumer.isAckReceiptEnabled()) {
573574
final long requestId = consumer.getClient().newRequestId();
574575
final ByteBuf cmd;

pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@
3131
import io.netty.channel.ChannelHandlerContext;
3232
import io.netty.channel.EventLoopGroup;
3333
import io.netty.channel.nio.NioEventLoopGroup;
34+
import java.util.ArrayList;
3435
import java.util.BitSet;
3536
import java.util.Collections;
37+
import java.util.List;
3638
import java.util.concurrent.CompletableFuture;
3739
import java.util.concurrent.ConcurrentHashMap;
3840
import java.util.concurrent.TimeUnit;
@@ -405,6 +407,54 @@ public void testDoIndividualBatchAckAsync() {
405407
tracker.close();
406408
}
407409

410+
@Test
411+
public void testDoIndividualBatchAckNeverAffectIsDuplicate() throws Exception {
412+
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
413+
conf.setMaxAcknowledgmentGroupSize(1);
414+
PersistentAcknowledgmentsGroupingTracker tracker =
415+
new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
416+
417+
BatchMessageIdImpl batchMessageId0 = new BatchMessageIdImpl(5, 1, 0, 0, 10, null);
418+
BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(5, 1, 0, 1, 10, null);
419+
420+
int loops = 10000;
421+
int addAcknowledgmentThreadCount = 10;
422+
List<Thread> addAcknowledgmentThreads = new ArrayList<>(addAcknowledgmentThreadCount);
423+
for (int i = 0; i < addAcknowledgmentThreadCount; i++) {
424+
Thread addAcknowledgmentThread = new Thread(() -> {
425+
for (int j = 0; j < loops; j++) {
426+
tracker.addAcknowledgment(batchMessageId0, AckType.Individual, Collections.emptyMap());
427+
}
428+
}, "doIndividualBatchAck-thread-" + i);
429+
addAcknowledgmentThread.start();
430+
addAcknowledgmentThreads.add(addAcknowledgmentThread);
431+
}
432+
433+
int isDuplicateThreadCount = 10;
434+
AtomicBoolean assertResult = new AtomicBoolean();
435+
List<Thread> isDuplicateThreads = new ArrayList<>(isDuplicateThreadCount);
436+
for (int i = 0; i < isDuplicateThreadCount; i++) {
437+
Thread isDuplicateThread = new Thread(() -> {
438+
for (int j = 0; j < loops; j++) {
439+
boolean duplicate = tracker.isDuplicate(batchMessageId1);
440+
assertResult.set(assertResult.get() || duplicate);
441+
}
442+
}, "isDuplicate-thread-" + i);
443+
isDuplicateThread.start();
444+
isDuplicateThreads.add(isDuplicateThread);
445+
}
446+
447+
for (Thread addAcknowledgmentThread : addAcknowledgmentThreads) {
448+
addAcknowledgmentThread.join();
449+
}
450+
451+
for (Thread isDuplicateThread : isDuplicateThreads) {
452+
isDuplicateThread.join();
453+
}
454+
455+
assertFalse(assertResult.get());
456+
}
457+
408458
public class ClientCnxTest extends ClientCnx {
409459

410460
public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {

pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
import org.apache.pulsar.common.schema.SchemaInfo;
113113
import org.apache.pulsar.common.schema.SchemaType;
114114
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
115-
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
115+
import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
116116

117117
@UtilityClass
118118
@Slf4j
@@ -1035,7 +1035,7 @@ public static ByteBuf newLookupErrorResponse(ServerError error, String errorMsg,
10351035
}
10361036

10371037
public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
1038-
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
1038+
List<Triple<Long, Long, ConcurrentBitSet>> entries) {
10391039
BaseCommand cmd = newMultiMessageAckCommon(entries);
10401040
cmd.getAck()
10411041
.setConsumerId(consumerId)
@@ -1045,14 +1045,14 @@ public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID
10451045
return serializeWithSize(cmd);
10461046
}
10471047

1048-
private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
1048+
private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSet>> entries) {
10491049
BaseCommand cmd = localCmd(Type.ACK);
10501050
CommandAck ack = cmd.setAck();
10511051
int entriesCount = entries.size();
10521052
for (int i = 0; i < entriesCount; i++) {
10531053
long ledgerId = entries.get(i).getLeft();
10541054
long entryId = entries.get(i).getMiddle();
1055-
ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
1055+
ConcurrentBitSet bitSet = entries.get(i).getRight();
10561056
MessageIdData msgId = ack.addMessageId()
10571057
.setLedgerId(ledgerId)
10581058
.setEntryId(entryId);
@@ -1061,15 +1061,14 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
10611061
for (int j = 0; j < ackSet.length; j++) {
10621062
msgId.addAckSet(ackSet[j]);
10631063
}
1064-
bitSet.recycle();
10651064
}
10661065
}
10671066

10681067
return cmd;
10691068
}
10701069

10711070
public static ByteBuf newMultiMessageAck(long consumerId,
1072-
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries,
1071+
List<Triple<Long, Long, ConcurrentBitSet>> entries,
10731072
long requestId) {
10741073
BaseCommand cmd = newMultiMessageAckCommon(entries);
10751074
cmd.getAck()

pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
/**
2828
* Safe multithreaded version of {@code BitSet} and leverage netty recycler.
2929
*/
30+
@Deprecated
3031
@EqualsAndHashCode(callSuper = true)
3132
public class ConcurrentBitSetRecyclable extends ConcurrentBitSet {
3233

0 commit comments

Comments
 (0)