Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
187 commits
Select commit Hold shift + click to select a range
7d2c4a3
docs: add comments to class GrpcMessagingApplication
winglechen Oct 14, 2025
2da77dc
Merge branch 'apache:develop' into comment
winglechen Oct 20, 2025
906a88b
Merge branch 'apache:develop' into comment
winglechen Oct 27, 2025
11597bb
Merge branch 'apache:develop' into comment
winglechen Nov 5, 2025
034db71
Merge branch 'apache:develop' into comment
winglechen Nov 12, 2025
512bc80
comment: add comments to ProxyStartup
winglechen Nov 14, 2025
a0cac85
comment: add comments to remoting related thread pool config of Proxy…
winglechen Nov 16, 2025
1ecc3a0
Merge branch 'apache:develop' into comment
winglechen Nov 16, 2025
156ec22
comment: add comments to method sendMessage of SendMessageActivity of…
winglechen Nov 16, 2025
7517165
comment: add comments to method sendMessage of ProducerProcessor
winglechen Nov 16, 2025
d567caf
comment: add comments to method sendMessage of MessageService
winglechen Nov 16, 2025
2098c73
Merge branch 'apache:develop' into comment
winglechen Nov 21, 2025
89243b1
Merge branch 'apache:develop' into comment
winglechen Nov 25, 2025
b4311c4
Merge branch 'apache:develop' into comment
winglechen Dec 3, 2025
f700590
Merge branch 'apache:develop' into comment
winglechen Dec 11, 2025
d758885
Merge branch 'apache:develop' into comment
winglechen Dec 17, 2025
076b6de
Merge branch 'apache:develop' into comment
winglechen Feb 2, 2026
c5cec8d
Merge branch 'apache:develop' into comment
winglechen Feb 7, 2026
6babbfa
Merge branch 'apache:develop' into comment
winglechen Feb 28, 2026
d96f2fa
Merge branch 'apache:develop' into comment
winglechen Apr 1, 2026
dd9c8f8
Merge branch 'apache:develop' into comment
winglechen May 4, 2026
30aa8b5
comment: add comments of ack mode to BrokerConfig.popConsumerKVServic…
winglechen May 6, 2026
9f55d70
Merge branch 'apache:develop' into comment
winglechen May 11, 2026
a75c7ba
comment: add class comments to SelectMappedBufferResult
winglechen May 14, 2026
eadaec6
Merge branch 'comment' of github.com:wolforest/rocketmq-comment into …
winglechen May 14, 2026
477cec2
comment: add class comments to GetMessageResult
winglechen May 14, 2026
3e81bb1
comment: add comments to method getMessage
winglechen May 16, 2026
c869be8
comment: add comments to method receiveMessage of ReceiveMessageActivity
winglechen May 16, 2026
ae075b3
comment: add comments to the fields startOffsetInfo, msgOffsetInfo, a…
winglechen May 17, 2026
031348e
comment: add comments of ack mode to method processRequest of PopMess…
winglechen May 17, 2026
7e490e9
comment: add process flow comments to method processRequest of PopMes…
winglechen May 18, 2026
f491213
comment: add class comments to PopLongPollingService
winglechen May 18, 2026
c8ddf65
Merge branch 'apache:develop' into comment
winglechen May 18, 2026
80f3e62
comment: add version related comments to method processRequest of Pop…
winglechen May 18, 2026
92bffea
comment: add method comments to method popMsgFromTopic of PopMessageP…
winglechen May 18, 2026
c43e83e
comment: add process flow comments to method popMsgFromQueue of PopMe…
winglechen May 18, 2026
af61749
comment: add method comments to method popMsgFromQueue of PopMessageP…
winglechen May 18, 2026
314879f
comment: add comments to method getPopOffset of PopMessageProcessor
winglechen May 18, 2026
174e0ff
comment: add comments to method getInitOffset of PopMessageProcessor
winglechen May 18, 2026
c8cb465
comment: add process comments to method getInitOffset of PopMessagePr…
winglechen May 18, 2026
0cab192
comment: add class comments to PopBufferMergeService
winglechen May 19, 2026
30d5e43
Merge branch 'apache:develop' into comment
winglechen May 20, 2026
4f612a9
comment: add class comments and attribute comments to PopCheckPoint
winglechen May 21, 2026
baef876
comment: add attribute comments to PopCheckPointWrapper
winglechen May 23, 2026
dac6f4e
comment: add method comments to addCkMock of PopBufferMergeService
winglechen May 23, 2026
ffea2f4
comment: add method comments to addCk of PopBufferMergeService
winglechen May 23, 2026
31a456d
comment: add method comments to putCkToStore of PopBufferMergeService
winglechen May 23, 2026
81a3a14
comment: add attribute comments to buffer and commitOffsets of PopBuf…
winglechen May 23, 2026
e6a279f
comment: add method comments to putOffsetQueue of PopBufferMergeService
winglechen May 23, 2026
6a247b5
comment: add process comments to method run of PopBufferMergeService
winglechen May 23, 2026
f5a0fc6
comment: add method comments to scanGarbage of PopBufferMergeService
winglechen May 23, 2026
a3ea07b
comment: add method comments to scan of PopBufferMergeService
winglechen May 23, 2026
ed37ae3
Merge branch 'apache:develop' into comment
winglechen May 23, 2026
848f23f
comment: add process comments to method addAk of PopBufferMergeService
winglechen May 26, 2026
00252be
comment: add method comments to addAk of PopBufferMergeService
winglechen May 26, 2026
801a041
comment: add attribute comments to queueOffsetDiff of PopCheckPoint
winglechen May 26, 2026
cd92c92
comment: add notes about queueOffsetDiff null scenario of PopCheckPoint
winglechen May 26, 2026
4576158
Merge branch 'apache:develop' into comment
winglechen May 26, 2026
e3287d4
comment: update attribute comments for bits and toStoreBits of PopChe…
winglechen May 26, 2026
107d6fb
comment: add class and method comments to DataConverter
winglechen May 26, 2026
404019b
comment: add method comments to markBitCAS of PopBufferMergeService
winglechen May 26, 2026
f49cbe4
comment: add method comments to indexOfAck of PopCheckPoint
winglechen May 26, 2026
b98be60
comment: add method comments to ackOffsetByIndex of PopCheckPoint
winglechen May 26, 2026
1049c19
comment: add inline comments to putAckToStore and scan of PopBufferMe…
winglechen May 26, 2026
6c561f5
comment: add method comments to putAckToStore of PopBufferMergeService
winglechen May 26, 2026
ca47354
comment: add method comments to handleAckPutMessageResult of PopBuffe…
winglechen May 26, 2026
2eb0e90
comment: add attribute comments to justOffset of PopCheckPointWrapper
winglechen May 26, 2026
6c554ae
comment: update attribute comments to reviveQueueOffset of PopCheckPo…
winglechen May 26, 2026
d3d9587
comment: add inline comment for reviveQueueOffset check in scan
winglechen May 26, 2026
a56679b
comment: fix inline comments in scan
winglechen May 26, 2026
fdfc788
comment: update inline comment for isCkDone check in scan
winglechen May 26, 2026
d138c0c
comment: add method comments to isCkDone of PopBufferMergeService
winglechen May 26, 2026
1246674
comment: add method comments to isCkDoneForFinish of PopBufferMergeSe…
winglechen May 26, 2026
0f4a4cb
comment: add method comments to scanCommitOffset of PopBufferMergeSer…
winglechen May 26, 2026
fbd9a5d
comment: add naming note to scanCommitOffset
winglechen May 26, 2026
57b5089
comment: add method comments to commitOffset of PopBufferMergeService
winglechen May 26, 2026
5278b4b
comment: add inline comment for scanCommitOffset call in scan
winglechen May 26, 2026
224815b
comment: add inline comments for store and remove in scan
winglechen May 27, 2026
26c908d
comment: add in-line comments to method scanGarbage of PopBufferMerge…
winglechen May 27, 2026
05e2f4d
comment: add class comments to PopReviveService
winglechen May 27, 2026
352e569
comment: add note about public methods to PopReviveService
winglechen May 27, 2026
3ebb734
comment: add in-line comments to method run of PopReviveService
winglechen May 27, 2026
cd64fa1
comment: add method comments to run of PopReviveService
winglechen May 27, 2026
72f7c36
comment: add method comments to consumeReviveMessage of PopReviveService
winglechen May 27, 2026
e85b23a
comment: add in-line comments to method consumeReviveMessage of PopRe…
winglechen May 27, 2026
6947ab3
comment: add method comments to getReviveMessage of PopReviveService
winglechen May 27, 2026
4874a13
comment: add in-line comments to method consumeReviveMessage of PopRe…
winglechen May 27, 2026
a024659
comment: add method comments to mockCkForAck and createMockCkForAck o…
winglechen May 27, 2026
0e682f3
comment: add method comments to mergeAndRevive of PopReviveService
winglechen May 27, 2026
8fbe422
comment: add in-line comments to method mergeAndRevive of PopReviveSe…
winglechen May 27, 2026
f163d0d
comment: add method comments to rePutCK of PopReviveService
winglechen May 27, 2026
fe4ff18
comment: add method comments to reviveMsgFromCk of PopReviveService
winglechen May 27, 2026
feb456b
Merge branch 'apache:develop' into comment
winglechen May 27, 2026
4891cff
comment: add in-line comments to method reviveMsgFromCk of PopReviveS…
winglechen May 27, 2026
10a7fe8
comment: add concurrency control comments to method mergeAndRevive of…
winglechen May 27, 2026
5d121c1
Merge branch 'comment' of github.com:wolforest/rocketmq-comment into …
winglechen May 27, 2026
6145444
comment: add attribute comments to inflightReviveRequestMap of PopRev…
winglechen May 27, 2026
f24cae0
comment: add in-line comments to method reviveRetry of PopReviveService
winglechen May 27, 2026
241951d
comment: add method comments to reviveRetry of PopReviveService
winglechen May 27, 2026
d51909c
comment: add class comments to AckMessageProcessor
winglechen May 27, 2026
7a35bb4
comment: add class comments to ChangeInvisibleTimeProcessor
winglechen May 27, 2026
3efe3ba
comment: add note about default mode in PopMessageProcessor
winglechen May 28, 2026
76495cf
comment: add in-line comments to method processRequest of AckMessageP…
winglechen May 28, 2026
0485831
comment: add method comments to processRequest of AckMessageProcessor
winglechen May 28, 2026
7927a88
comment: add in-line comments to method processRequest of ChangeInvis…
winglechen May 28, 2026
0d5ebc9
comment: add in-line comments to method processRequestAsync of Change…
winglechen May 28, 2026
7b762e0
comment: add in-line comments to method appendCheckPointThenAckOrigin…
winglechen May 28, 2026
328195d
comment: add in-line comments to method ackOrigin of ChangeInvisibleT…
winglechen May 28, 2026
aa04f66
comment: update inline comment for appendCheckPointThenAckOrigin
winglechen May 28, 2026
b72eb4e
comment: add method comments to processRequestAsync of ChangeInvisibl…
winglechen May 28, 2026
37c4ae2
comment: add method comments to appendCheckPointThenAckOrigin of Chan…
winglechen May 28, 2026
92cf992
comment: add method comments to ackOrigin of ChangeInvisibleTimeProce…
winglechen May 28, 2026
97d68c8
comment: update inline comment for ack mode in AckMessageProcessor
winglechen May 28, 2026
fe753cb
comment: add in-line comments to method appendAck of AckMessageProcessor
winglechen May 28, 2026
f92c4e8
comment: add method comments to appendAck of AckMessageProcessor
winglechen May 28, 2026
d5f4348
comment: add class comments to PopMessageProcessor
winglechen May 28, 2026
72698c1
comment: add method comments to processRequest of PopMessageProcessor
winglechen May 28, 2026
24d2877
comment: add inline comments for kv path in PopMessageProcessor
winglechen May 28, 2026
318426e
comment: update inline comment for long polling in PopMessageProcessor
winglechen May 28, 2026
ed4aa41
comment: add in-line comments to method popAsync of PopConsumerService
winglechen May 28, 2026
de70be6
comment: add method comments to popAsync of PopConsumerService
winglechen May 28, 2026
31b092b
comment: add method comments to getMessageFromTopicAsync of PopConsum…
winglechen May 28, 2026
353f71b
comment: add in-line comments to method getMessageFromTopicAsync of P…
winglechen May 28, 2026
d2dc764
comment: add method comments to getMessageAsync of PopConsumerService
winglechen May 28, 2026
7737ed7
comment: add in-line comments to method getPopOffset of PopConsumerSe…
winglechen May 29, 2026
487dc96
comment: add method comments to getPopOffset of PopConsumerService
winglechen May 29, 2026
84b5585
comment: add method comments to handleGetMessageResult of PopConsumer…
winglechen May 29, 2026
19c6149
Merge branch 'apache:develop' into comment
winglechen May 29, 2026
d6a82d6
comment: add class comments to PopConsumerCache
winglechen May 29, 2026
ba20e32
comment: add interface comments to PopConsumerKVStore
winglechen May 29, 2026
9b9c92a
comment: add attribute comments to attemptId of PopConsumerRecord
winglechen May 29, 2026
4003b67
comment: add attribute comments to suspend of PopConsumerRecord
winglechen May 29, 2026
f185ded
comment: add class and attribute comments to ConsumerRecords of PopCo…
winglechen May 30, 2026
9623e49
comment: add attribute comments to consumerRecordTable of PopConsumer…
winglechen May 30, 2026
85dd752
comment: add method comments to writeRecords of PopConsumerCache
winglechen May 30, 2026
d7078e1
comment: add cache structure to PopConsumerCache
winglechen May 30, 2026
60fd9fa
comment: add method comments to hold of AbstractRocksDBStorage
winglechen May 30, 2026
0534da3
comment: add in-line comments to method initOptions of PopConsumerRoc…
winglechen May 30, 2026
b7c9988
comment: add method and inline comments to initOptions of PopConsumer…
winglechen May 30, 2026
0852219
comment: add method comments to postLoad of PopConsumerRocksdbStore
winglechen May 30, 2026
8a7d4d3
comment: add method comments to writeRecords of PopConsumerRocksdbStore
winglechen May 30, 2026
7571b11
comment: add method comments to deleteRecords of PopConsumerRocksdbStore
winglechen May 30, 2026
cc63393
comment: add method comments to scanExpiredRecords of PopConsumerRock…
winglechen May 30, 2026
627c1f9
comment: add method comments to ackAsync of PopConsumerService
winglechen May 31, 2026
0bc9ab2
comment: add method comments to changeInvisibilityDuration of PopCons…
winglechen May 31, 2026
af6ec7b
comment: add in-line comments to attribute invisibleTime
winglechen May 31, 2026
2434070
comment: optimize getKeyBytes comment in PopConsumerRecord
winglechen Jun 1, 2026
68c2ec5
comment: update writeRecords comment to match getKeyBytes
winglechen Jun 1, 2026
8ecba02
comment: add attribute comments to invisibleTime of PopConsumerRecord
winglechen Jun 1, 2026
3fb081f
comment: add method comments to getMessageAsync overloads of PopConsu…
winglechen Jun 1, 2026
2b4d321
Merge branch 'apache:develop' into comment
winglechen Jun 1, 2026
057eb18
comment: add external caller info to methods of PopConsumerService
winglechen Jun 1, 2026
fb481d9
Merge branch 'comment' of github.com:wolforest/rocketmq-comment into …
winglechen Jun 1, 2026
099cc8d
Merge branch 'apache:develop' into comment
winglechen Jun 1, 2026
39eab9f
comment: add method comments to run of PopConsumerService
winglechen Jun 1, 2026
30932d0
comment: add method comments to revive overloads of PopConsumerService
winglechen Jun 1, 2026
ce2758d
comment: update revive maxCount comment
winglechen Jun 1, 2026
d0c1ea0
comment: add in-line method revive of PopConsumerService
winglechen Jun 1, 2026
dd18745
comment: add in-line method reviveRetry of PopConsumerService
winglechen Jun 1, 2026
e4bdb2b
comment: add scan time comments to method revive of PopConsumerService
winglechen Jun 1, 2026
397eb37
comment: add in-line method scanExpiredRecords of PopConsumerService
winglechen Jun 1, 2026
1bf22dc
comment: add class comments to PopConsumerRocksdbStore
winglechen Jun 1, 2026
80e6f98
comment: add attribute comments to batchDispatchRequestQueue of Defau…
winglechen Jun 1, 2026
5327c63
comment: add in-line comments to method asyncPutMessage of DefaultMes…
winglechen Jun 1, 2026
0294b85
comment: add method comments to asyncPutMessage of DefaultMessageStore
winglechen Jun 1, 2026
5cf43ac
Merge branch 'apache:develop' into comment
winglechen Jun 1, 2026
9eefc30
comment: add in-line comments to method asyncPutMessage of CommitLog
winglechen Jun 1, 2026
5a39553
comment: add method comments to asyncPutMessage of CommitLog
winglechen Jun 1, 2026
6a79a9b
comment: add method comments to handleDiskFlushAndHA of CommitLog
winglechen Jun 1, 2026
8d6c189
comment: add in-line comments to method getMessage of DefaultMessageS…
winglechen Jun 2, 2026
75995e3
comment: add method comments to getMessage of DefaultMessageStore
winglechen Jun 2, 2026
2ccd095
comment: add method comments to getMessage of CommitLog
winglechen Jun 2, 2026
57b3beb
comment: update cache-status comment in log
winglechen Jun 2, 2026
a637850
comment: add in-line comments to method findMappedFileByOffset Mapped…
winglechen Jun 2, 2026
63ed93b
comment: optimize findMappedFileByOffset comment in MappedFileQueue
winglechen Jun 2, 2026
366ce8f
comment: add method comments to handleAutoRenew of ReceiveMessageActi…
winglechen Jun 2, 2026
c16179c
comment: delete useless comments of PopMessageProcessor
winglechen Jun 2, 2026
791f374
comment: add class comments to DefaultReceiptHandleManager
winglechen Jun 2, 2026
a0769db
Merge branch 'apache:develop' into comment
winglechen Jun 2, 2026
62a3575
comment: add class comments to ReceiptHandleProcessor
winglechen Jun 2, 2026
d0230d1
comment: add constructor comments to ReceiptHandleProcessor
winglechen Jun 2, 2026
6112186
comment: add inline comments to constructor of ReceiptHandleProcessor
winglechen Jun 2, 2026
6fd2dbe
comment: add inline comments to constructor of DefaultReceiptHandleMa…
winglechen Jun 2, 2026
2540bb5
comment: add method comments to scheduleRenewTask of DefaultReceiptHa…
winglechen Jun 2, 2026
fe4271b
comment: add method comments to startRenewMessage of DefaultReceiptHa…
winglechen Jun 2, 2026
06cdb22
comment: add method comments to queryRoute of RouteActivity
winglechen Jun 5, 2026
359179e
Merge branch 'apache:develop' into comment
winglechen Jun 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,24 @@
import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_SUC;
import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_TIMEOUT;

/**
* Pop-mode long polling service that suspends Pop requests and wakes them up when new messages arrive.
* <p>
* Core responsibilities:
* <ul>
* <li>Suspend Pop requests — when the broker has no messages to return immediately, registers requests
* into the {@code pollingMap} keyed by {@code topic@cid@queueId} and waits</li>
* <li>Wake up on new message arrival — {@link #notifyMessageArriving} is triggered by the message arriving
* listener; it fetches matching Pop requests from the pollingMap, applies Tag filtering, and re-submits
* them to the PopMessageProcessor to return results to the client</li>
* <li>Timeout scanning — the background thread periodically scans the waiting queues and wakes up
* timed-out requests with an empty result</li>
* <li>Retry topic bridging — {@link #notifyMessageArrivingFromRetry} translates a new message on the retry
* topic into a wake-up notification on the original topic</li>
* <li>Resource cleanup — periodically removes stale polling entries for deleted topics or
* offline consumer groups</li>
* </ul>
*/
public class PopLongPollingService extends ServiceThread {

private static final Logger POP_LOGGER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,41 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* In-memory cache for un-acked Pop consumer records, used when
* {@code enablePopBufferMerge} is enabled in the KVStore path.
*
* <p>
* The cache structure is as follows: {
* groupId@topicId@queueId: {
* active: ConcurrentSkipListMap<offset, PopConsumerRecord>,
* removed: ConcurrentSkipListMap<offset, PopConsumerRecord>
* }
* }
* active(recordTreeMap): in-flight records
* removed(removedTreeMap): records to be removed
* </p>
*
* <p>Popped messages are stored here by
* {@link PopConsumerService#popAsync}. The background {@link #run()} thread
* periodically scans the cache and processes expired records:
* <ul>
* <li><b>Visibility timeout expired</b> — the record is passed to the
* {@code reviveConsumer} (which calls
* {@link PopConsumerService#revive}) to re-publish the message to
* the retry topic</li>
* <li><b>Consumer offline</b> (lock timeout) — all records for that
* {code groupId, topicId} pair are flushed to
* {@link PopConsumerKVStore} without revival</li>
* <li><b>Consumer acked</b> — the record is removed via
* {@link #deleteRecords} when a matching ack arrives</li>
* </ul>
*
* <p>Each {@code groupId@topicId@queueId} entry is backed by a
* {@link ConsumerRecords} instance containing two
* {@link ConcurrentSkipListMap}s — one for active records and one for
* records staged for removal.
*/
public class PopConsumerCache extends ServiceThread {

private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
Expand All @@ -46,6 +81,14 @@ public class PopConsumerCache extends ServiceThread {
private final Consumer<PopConsumerRecord> reviveConsumer;

private final AtomicInteger estimateCacheSize;
/**
* Maps {@code consumerGroupId@topicId@queueId} to the buffered records for that
* consumer-queue.
*
* <p>Used by {@link #writeRecords} to add popped messages,
* {@link #deleteRecords} to remove acked messages, and
* {@link #cleanupRecords} to process expired records.
*/
private final ConcurrentMap<String, ConsumerRecords> consumerRecordTable;

public PopConsumerCache(BrokerController brokerController, PopConsumerKVStore consumerRecordStore,
Expand Down Expand Up @@ -89,9 +132,20 @@ public long getPopInFlightMessageCount(String groupId, String topicId, int queue
return consumerRecords != null ? consumerRecords.getInFlightRecordCount() : 0L;
}

/**
* Write popped records into the cache.
*
* <p>Each record is inserted into the {@link ConsumerRecords} for its
* {@code groupId@topicId@queueId}. If no entry exists for that key, a
* new one is created. The cache size estimate is incremented.
*
* @param consumerRecordList the popped records to cache
*/
public void writeRecords(List<PopConsumerRecord> consumerRecordList) {
this.estimateCacheSize.addAndGet(consumerRecordList.size());
consumerRecordList.forEach(consumerRecord -> {
// consumerRecords is the recordMap in cache
// it contains two maps of PopConsumerRecord
ConsumerRecords consumerRecords = ConcurrentHashMapUtils.computeIfAbsent(consumerRecordTable,
this.getKey(consumerRecord), k -> new ConsumerRecords(brokerController.getBrokerConfig(),
consumerRecord.getGroupId(), consumerRecord.getTopicId(), consumerRecord.getQueueId()));
Expand Down Expand Up @@ -205,13 +259,46 @@ public void run() {
}
}

/**
* Records for one {@code consumerGroupId@topicId@queueId} in the Pop cache.
*
* <p>Uses two {@link ConcurrentSkipListMap}s to separate active and
* expiring records for safe two-phase cleanup:
* <ol>
* <li>{@link #stageExpiredRecords} moves timed-out records from
* {@link #recordTreeMap} to {@link #removeTreeMap}</li>
* <li>{@link PopConsumerCache#cleanupRecords} drains
* {@link #removeTreeMap} — true-expired records are revived,
* approaching-expired records are written to the KVStore</li>
* </ol>
*/
protected static class ConsumerRecords {

private final String groupId;
private final String topicId;
private final int queueId;
private final BrokerConfig brokerConfig;
/**
* Staged records awaiting cleanup (revival or KVStore write).
*
* <p>Populated by {@link #stageExpiredRecords} and drained by
* {@link PopConsumerCache#cleanupRecords}. Sorted by offset
* so that {@link #getMinOffset} can include these records in
* the minimum offset computation.
*/
private final ConcurrentSkipListMap<Long /* offset */, PopConsumerRecord> removeTreeMap;
/**
* Active (in-flight) records that have been popped but not yet
* acked by the consumer.
*
* <p>Records are added via {@link #write} when messages are popped,
* removed via {@link #delete} when an ack arrives, and moved to
* {@link #removeTreeMap} via {@link #stageExpiredRecords} when
* the visibility timeout or stay-buffer time expires.
*
* <p>Sorted by offset for efficient minimum-offset queries
* ({@link #getMinOffset}).
*/
private final ConcurrentSkipListMap<Long /* offset */, PopConsumerRecord> recordTreeMap;

public ConsumerRecords(BrokerConfig brokerConfig, String groupId, String topicId, int queueId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,22 @@

import java.util.List;

/**
* Persistent key-value store for un-acked Pop consumer records.
*
* <p>Used by the KVStore-based ack path ({@code popConsumerKVServiceEnable=true}).
* When a message is popped, a record is written here. When the consumer acks
* the message or the visibility timeout expires, the record is deleted or
* revived. The default implementation is {@code PopConsumerRocksdbStore}.
*
* <p>This interface supports three operations:
* <ul>
* <li>{@link #writeRecords} — persist popped records</li>
* <li>{@link #deleteRecords} — remove acked records</li>
* <li>{@link #scanExpiredRecords} — find records whose visibility timeout
* has elapsed for revival</li>
* </ul>
*/
public interface PopConsumerKVStore {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ public int getCode() {
@JSONField(ordinal = 4)
private int retryFlag;

/**
* Message visibility timeout in milliseconds.
*
* <p>The visibility timeout ({@code popTime + invisibleTime}) determines when
* a popped-but-unacked message becomes eligible for revival. Set by the
* consumer (default 60s via {@code DefaultMQPushConsumer#setPopInvisibleTime}).
* Can be changed by proxy with config.
* Can be extended via {@code ChangeInvisibleTime}.
*/
@JSONField(ordinal = 5)
private long invisibleTime;

Expand All @@ -67,9 +76,31 @@ public int getCode() {
@JSONField(ordinal = 7)
private int attemptTimes;

/**
* Client-generated idempotency key for FIFO ordered consumption.
*
* <p>Possible values:
* <ul>
* <li>Client request — a unique id from {@code PopMessageRequestHeader#getAttemptId},
* used by {@code ConsumerOrderInfoManager} to block subsequent pops on the same
* queue until the current batch is acked</li>
* <li>{@code null} — for ack records ({@code PopConsumerService#ackAsync}) and
* change-invisibility records, where FIFO ordering is not applicable</li>
* <li>Copied from the original record — for revive retry records, the attemptId is
* inherited from the expired record</li>
* </ul>
*/
@JSONField(ordinal = 8)
private String attemptId;

/**
* Whether the consumer has suspended (nacked) this message.
*
* <p>When {@code true}, the reconsume count is <b>not</b> incremented on
* revive, so the message will not be prematurely sent to the DLQ due to
* repeated visibility timeout extensions. Set via
* {@code ChangeInvisibleTimeRequestHeader#isSuspend}.
*/
@JSONField(ordinal = 9)
private boolean suspend;

Expand Down Expand Up @@ -102,7 +133,19 @@ public long getVisibilityTimeout() {
}

/**
* Key: timestamp(8) + groupId + topicId + queueId + offset
* Build the RocksDB key for this record.
*
* <p>Format:
* <pre>
* visibilityTimeout(8B) + groupId + '@' + topicId + '@' + queueId(4B) + '@' + offset(8B)
* </pre>
*
* <p>The {@code visibilityTimeout} is placed first so that records are ordered
* by expiration time in RocksDB's SST files. This allows
* {@code PopConsumerRocksdbStore#scanExpiredRecords} to use a bounded iterator
* to scan only the relevant time window without a full table scan.
*
* <p>NACK(changeInvisibleTime) will create a new record, and the old one will be deleted.
*/
@JSONField(serialize = false)
public byte[] getKeyBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* RocksDB-backed implementation of {@link PopConsumerKVStore} for the
* KVStore-based Pop ack path.
*
* <p>Stores Pop consumer records in a dedicated {@code "popState"} column
* family. Each record is keyed by {@code visibilityTimeout|groupId@topicId@queueId@offset}
* so that {@link #scanExpiredRecords} can efficiently scan only expired
* records within a time window without a full table scan.
*
* <p>Write and delete operations use synchronous flush and WAL for
* durability — Pop visibility state is the sole source of truth in the
* KVStore path and must survive crashes.
*/
public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements PopConsumerKVStore {

private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
Expand All @@ -55,31 +68,71 @@ public PopConsumerRocksdbStore(String filePath, long blockCacheSize, long writeB
this.writeBufferSize = writeBufferSize;
}

// https://www.cnblogs.com/renjc/p/rocksdb-class-db.html
// https://github.com/johnzeng/rocksdb-doc-cn/blob/master/doc/RocksDB-Tuning-Guide.md
/**
* Configure RocksDB options for Pop consumer record storage.
*
* <p>Unlike the parent class defaults, write and delete options enable
* WAL and synchronous flush — Pop visibility state is the sole source
* of truth and must survive crashes. Compaction is configured to be
* aggressive so that expired-then-deleted records are purged promptly,
* reclaiming disk space.
*
* @see <a href="https://www.cnblogs.com/renjc/p/rocksdb-class-db.html">rocksdb-class-db</a>
* @see <a href="https://github.com/johnzeng/rocksdb-doc-cn/blob/master/doc/RocksDB-Tuning-Guide.md">RocksDB-Tuning-Guide</a>
*/
protected void initOptions() {
// durability-first: enable WAL and sync flush for pop state recovery
this.options = RocksDBOptionsFactory.createDBOptions();

this.writeOptions = new WriteOptions();
// fsync every write to disk
this.writeOptions.setSync(true);
// enable WAL
this.writeOptions.setDisableWAL(false);
// allow writing throttling under pressure
this.writeOptions.setNoSlowdown(false);

// delete must be durable too — otherwise ack can be lost and message revived incorrectly
this.deleteOptions = new WriteOptions();
this.deleteOptions.setSync(true);
this.deleteOptions.setDisableWAL(false);
this.deleteOptions.setNoSlowdown(false);

// aggressive compaction to purge expired pop records and reclaim space
this.compactRangeOptions = new CompactRangeOptions();
// force compact bottom level
this.compactRangeOptions.setBottommostLevelCompaction(
CompactRangeOptions.BottommostLevelCompaction.kForce);
// allow compaction to pause writes
this.compactRangeOptions.setAllowWriteStall(true);
// manual compaction runs in parallel with auto-compaction.
// Appropriate here because expired Pop records generate tombstones continuously,
// and cleanup should not starve RocksDB's normal background work
this.compactRangeOptions.setExclusiveManualCompaction(false);
// Allows compaction to move data across levels
this.compactRangeOptions.setChangeLevel(true);
// -1 delegates level selection to RocksDB's internal heuristics
this.compactRangeOptions.setTargetLevel(-1);
// Splits the compaction work into at most 4 parallel sub-tasks
this.compactRangeOptions.setMaxSubcompactions(4);
}

/**
* Initialise the RocksDB instance with a dedicated column family for Pop state.
*
* <p>Two column families are created:
* <ol>
* <li>{@code default} — unused, required by RocksDB</li>
* <li>{@code "popState"} — stores Pop consumer records keyed by
* {@code visibilityTimeout|groupId@topicId@queueId@offset}</li>
* </ol>
*
* <p>Called by {@link AbstractRocksDBStorage#start()} before the storage
* is marked as loaded. Returns {@code false} if any step fails, preventing
* all subsequent read/write operations via {@link #hold()}.
*
* @return {@code true} if the database was opened successfully
*/
@Override
protected boolean postLoad() {
try {
Expand Down Expand Up @@ -111,6 +164,16 @@ public String getFilePath() {
return this.dbPath;
}

/**
* Batch-write consumer records to RocksDB via a single {@link WriteBatch}.
* Key: (popTime + invisibleTime) + groupId + topicId + queueId + offset
* value: PopConsumerRecord.toJsonBytes
*
* <p>Each record is serialized with its visibility-timeout-prefixed key
* so that {@link #scanExpiredRecords} can efficiently scan by time range.
*
* @param consumerRecordList the records to persist
*/
@Override
public void writeRecords(List<PopConsumerRecord> consumerRecordList) {
if (!consumerRecordList.isEmpty()) {
Expand All @@ -125,6 +188,14 @@ public void writeRecords(List<PopConsumerRecord> consumerRecordList) {
}
}

/**
* Batch-delete consumer records from RocksDB via a single {@link WriteBatch}.
*
* <p>Deletion uses the same durability guarantees as writes ({@code sync=true},
* WAL enabled)
*
* @param consumerRecordList the records to remove
*/
@Override
public void deleteRecords(List<PopConsumerRecord> consumerRecordList) {
if (!consumerRecordList.isEmpty()) {
Expand All @@ -139,8 +210,19 @@ public void deleteRecords(List<PopConsumerRecord> consumerRecordList) {
}
}

/**
* Scan and return expired consumer records within a visibility-timeout range.
*
* <p>Because each record's key is prefixed with {@code visibilityTimeout},
* this method uses a RocksDB iterator bounded by {@code [lower, upper)} to
* efficiently scan only the relevant time window without a full table scan.
*
* @param lower inclusive lower bound of the visibility timeout (ms)
* @param upper exclusive upper bound of the visibility timeout (ms)
* @param maxCount maximum number of records to return
* @return up to {@code maxCount} expired records, or an empty list
*/
@Override
// https://github.com/facebook/rocksdb/issues/10300
public List<PopConsumerRecord> scanExpiredRecords(long lower, long upper, int maxCount) {
// In RocksDB, we can use SstPartitionerFixedPrefixFactory in cfOptions
// and new ColumnFamilyOptions().useFixedLengthPrefixExtractor() to
Expand All @@ -153,6 +235,7 @@ public List<PopConsumerRecord> scanExpiredRecords(long lower, long upper, int ma
RocksIterator iterator = db.newIterator(this.columnFamilyHandle, scanOptions)) {
iterator.seek(ByteBuffer.allocate(Long.BYTES).putLong(lower).array());
while (iterator.isValid() && consumerRecordList.size() < maxCount) {
// decode json bytes to PopConsumerRecord
consumerRecordList.add(PopConsumerRecord.decode(iterator.value()));
iterator.next();
}
Expand Down
Loading