diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index c595178d193..134887f9ad4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -52,6 +52,24 @@ import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_SUC; import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_TIMEOUT; +/** + * Pop-mode long polling service that suspends Pop requests and wakes them up when new messages arrive. + *
+ * Core responsibilities: + *
+ * The cache structure is as follows: {
+ * groupId@topicId@queueId: {
+ * active: ConcurrentSkipListMap
Popped messages are stored here by + * {@link PopConsumerService#popAsync}. The background {@link #run()} thread + * periodically scans the cache and processes expired records: + *
Each {@code groupId@topicId@queueId} entry is backed by a
+ * {@link ConsumerRecords} instance containing two
+ * {@link ConcurrentSkipListMap}s — one for active records and one for
+ * records staged for removal.
+ */
public class PopConsumerCache extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -46,6 +81,14 @@ public class PopConsumerCache extends ServiceThread {
private final Consumer Used by {@link #writeRecords} to add popped messages,
+ * {@link #deleteRecords} to remove acked messages, and
+ * {@link #cleanupRecords} to process expired records.
+ */
private final ConcurrentMap Each record is inserted into the {@link ConsumerRecords} for its
+ * {@code groupId@topicId@queueId}. If no entry exists for that key, a
+ * new one is created. The cache size estimate is incremented.
+ *
+ * @param consumerRecordList the popped records to cache
+ */
public void writeRecords(List Uses two {@link ConcurrentSkipListMap}s to separate active and
+ * expiring records for safe two-phase cleanup:
+ * Populated by {@link #stageExpiredRecords} and drained by
+ * {@link PopConsumerCache#cleanupRecords}. Sorted by offset
+ * so that {@link #getMinOffset} can include these records in
+ * the minimum offset computation.
+ */
private final ConcurrentSkipListMap Records are added via {@link #write} when messages are popped,
+ * removed via {@link #delete} when an ack arrives, and moved to
+ * {@link #removeTreeMap} via {@link #stageExpiredRecords} when
+ * the visibility timeout or stay-buffer time expires.
+ *
+ * Sorted by offset for efficient minimum-offset queries
+ * ({@link #getMinOffset}).
+ */
private final ConcurrentSkipListMap Used by the KVStore-based ack path ({@code popConsumerKVServiceEnable=true}).
+ * When a message is popped, a record is written here. When the consumer acks
+ * the message or the visibility timeout expires, the record is deleted or
+ * revived. The default implementation is {@code PopConsumerRocksdbStore}.
+ *
+ * This interface supports three operations:
+ * The visibility timeout ({@code popTime + invisibleTime}) determines when
+ * a popped-but-unacked message becomes eligible for revival. Set by the
+ * consumer (default 60s via {@code DefaultMQPushConsumer#setPopInvisibleTime}).
+ * Can be changed by proxy with config.
+ * Can be extended via {@code ChangeInvisibleTime}.
+ */
@JSONField(ordinal = 5)
private long invisibleTime;
@@ -67,9 +76,31 @@ public int getCode() {
@JSONField(ordinal = 7)
private int attemptTimes;
+ /**
+ * Client-generated idempotency key for FIFO ordered consumption.
+ *
+ * Possible values:
+ * When {@code true}, the reconsume count is not incremented on
+ * revive, so the message will not be prematurely sent to the DLQ due to
+ * repeated visibility timeout extensions. Set via
+ * {@code ChangeInvisibleTimeRequestHeader#isSuspend}.
+ */
@JSONField(ordinal = 9)
private boolean suspend;
@@ -102,7 +133,19 @@ public long getVisibilityTimeout() {
}
/**
- * Key: timestamp(8) + groupId + topicId + queueId + offset
+ * Build the RocksDB key for this record.
+ *
+ * Format:
+ * The {@code visibilityTimeout} is placed first so that records are ordered
+ * by expiration time in RocksDB's SST files. This allows
+ * {@code PopConsumerRocksdbStore#scanExpiredRecords} to use a bounded iterator
+ * to scan only the relevant time window without a full table scan.
+ *
+ * NACK(changeInvisibleTime) will create a new record, and the old one will be deleted.
*/
@JSONField(serialize = false)
public byte[] getKeyBytes() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
index dc68f9d9fe5..fcd5826853e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
@@ -38,6 +38,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * RocksDB-backed implementation of {@link PopConsumerKVStore} for the
+ * KVStore-based Pop ack path.
+ *
+ * Stores Pop consumer records in a dedicated {@code "popState"} column
+ * family. Each record is keyed by {@code visibilityTimeout|groupId@topicId@queueId@offset}
+ * so that {@link #scanExpiredRecords} can efficiently scan only expired
+ * records within a time window without a full table scan.
+ *
+ * Write and delete operations use synchronous flush and WAL for
+ * durability — Pop visibility state is the sole source of truth in the
+ * KVStore path and must survive crashes.
+ */
public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements PopConsumerKVStore {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -55,31 +68,71 @@ public PopConsumerRocksdbStore(String filePath, long blockCacheSize, long writeB
this.writeBufferSize = writeBufferSize;
}
- // https://www.cnblogs.com/renjc/p/rocksdb-class-db.html
- // https://github.com/johnzeng/rocksdb-doc-cn/blob/master/doc/RocksDB-Tuning-Guide.md
+ /**
+ * Configure RocksDB options for Pop consumer record storage.
+ *
+ * Unlike the parent class defaults, write and delete options enable
+ * WAL and synchronous flush — Pop visibility state is the sole source
+ * of truth and must survive crashes. Compaction is configured to be
+ * aggressive so that expired-then-deleted records are purged promptly,
+ * reclaiming disk space.
+ *
+ * @see rocksdb-class-db
+ * @see RocksDB-Tuning-Guide
+ */
protected void initOptions() {
+ // durability-first: enable WAL and sync flush for pop state recovery
this.options = RocksDBOptionsFactory.createDBOptions();
this.writeOptions = new WriteOptions();
+ // fsync every write to disk
this.writeOptions.setSync(true);
+ // enable WAL
this.writeOptions.setDisableWAL(false);
+ // allow writing throttling under pressure
this.writeOptions.setNoSlowdown(false);
+ // delete must be durable too — otherwise ack can be lost and message revived incorrectly
this.deleteOptions = new WriteOptions();
this.deleteOptions.setSync(true);
this.deleteOptions.setDisableWAL(false);
this.deleteOptions.setNoSlowdown(false);
+ // aggressive compaction to purge expired pop records and reclaim space
this.compactRangeOptions = new CompactRangeOptions();
+ // force compact bottom level
this.compactRangeOptions.setBottommostLevelCompaction(
CompactRangeOptions.BottommostLevelCompaction.kForce);
+ // allow compaction to pause writes
this.compactRangeOptions.setAllowWriteStall(true);
+ // manual compaction runs in parallel with auto-compaction.
+ // Appropriate here because expired Pop records generate tombstones continuously,
+ // and cleanup should not starve RocksDB's normal background work
this.compactRangeOptions.setExclusiveManualCompaction(false);
+ // Allows compaction to move data across levels
this.compactRangeOptions.setChangeLevel(true);
+ // -1 delegates level selection to RocksDB's internal heuristics
this.compactRangeOptions.setTargetLevel(-1);
+ // Splits the compaction work into at most 4 parallel sub-tasks
this.compactRangeOptions.setMaxSubcompactions(4);
}
+ /**
+ * Initialise the RocksDB instance with a dedicated column family for Pop state.
+ *
+ * Two column families are created:
+ * Called by {@link AbstractRocksDBStorage#start()} before the storage
+ * is marked as loaded. Returns {@code false} if any step fails, preventing
+ * all subsequent read/write operations via {@link #hold()}.
+ *
+ * @return {@code true} if the database was opened successfully
+ */
@Override
protected boolean postLoad() {
try {
@@ -111,6 +164,16 @@ public String getFilePath() {
return this.dbPath;
}
+ /**
+ * Batch-write consumer records to RocksDB via a single {@link WriteBatch}.
+ * Key: (popTime + invisibleTime) + groupId + topicId + queueId + offset
+ * value: PopConsumerRecord.toJsonBytes
+ *
+ * Each record is serialized with its visibility-timeout-prefixed key
+ * so that {@link #scanExpiredRecords} can efficiently scan by time range.
+ *
+ * @param consumerRecordList the records to persist
+ */
@Override
public void writeRecords(List Deletion uses the same durability guarantees as writes ({@code sync=true},
+ * WAL enabled)
+ *
+ * @param consumerRecordList the records to remove
+ */
@Override
public void deleteRecords(List Because each record's key is prefixed with {@code visibilityTimeout},
+ * this method uses a RocksDB iterator bounded by {@code [lower, upper)} to
+ * efficiently scan only the relevant time window without a full table scan.
+ *
+ * @param lower inclusive lower bound of the visibility timeout (ms)
+ * @param upper exclusive upper bound of the visibility timeout (ms)
+ * @param maxCount maximum number of records to return
+ * @return up to {@code maxCount} expired records, or an empty list
+ */
@Override
- // https://github.com/facebook/rocksdb/issues/10300
public List If messages were found:
+ * The consumer offset is then committed:
+ * For FIFO consumers, the offset is read from the regular consumer offset.
+ * For non-FIFO consumers, a separate pull offset is used (compatibility with
+ * pull consumer switchover).
+ *
+ * If no offset is stored (first pop), it is initialized via
+ * {@code PopMessageProcessor#getInitOffset} based on {@code initMode}
+ * (beginning or end of the queue).
+ *
+ * If a reset offset exists (offset reset command issued), the cache is
+ * cleared, FIFO lock unlock, and the reset offset takes effect
+ * immediately.
+ *
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param initMode consume init mode (min/max)
+ * @param fifo whether this is a FIFO ordered consumption
+ * @return the consume offset to start popping from
+ */
public long getPopOffset(String groupId, String topicId, int queueId, int initMode, boolean fifo) {
// For FIFO messages, the pull offset is not used.
@@ -213,6 +267,7 @@ public long getPopOffset(String groupId, String topicId, int queueId, int initMo
this.brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId, queueId) :
this.brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId);
+ // init offset
if (offset < 0L) {
try {
offset = this.brokerController.getPopMessageProcessor()
@@ -223,6 +278,8 @@ public long getPopOffset(String groupId, String topicId, int queueId, int initMo
throw new RuntimeException(e);
}
}
+
+ // get reset offset
Long resetOffset =
this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId, groupId, queueId);
if (resetOffset != null) {
@@ -231,9 +288,29 @@ public long getPopOffset(String groupId, String topicId, int queueId, int initMo
this.brokerController.getConsumerOffsetManager()
.commitOffset("ResetPopOffset", groupId, topicId, queueId, resetOffset);
}
+
return resetOffset != null ? resetOffset : offset;
}
+ /**
+ * Fetch messages from the store with automatic offset correction.
+ * No external callers, except unit tests.
+ *
+ * If the stored offset is behind the actual consume queue offset
+ * ({@code OFFSET_TOO_SMALL}, {@code OFFSET_OVERFLOW_BADLY},
+ * {@code OFFSET_FOUND_NULL}), the offset is corrected and a retry is
+ * issued with the corrected offset. This prevents duplicate messages
+ * when the Pop buffer offset has not yet been committed.
+ *
+ * @param clientHost the client address
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param offset the consume offset to start from
+ * @param batchSize max number of messages
+ * @param filter message filter
+ * @return a future completing with the fetch result
+ */
public CompletableFuture Chained via {@link CompletableFuture#thenCompose} from
+ * {@link #getMessageFromTopicAsync}. When the batch is already full
+ * ({@code remain <= 0}), the pending count is added to the context and
+ * the chain stops. Otherwise, messages are fetched from the store and
+ * the result is merged into the context via {@link #handleGetMessageResult}.
+ *
+ * Early termination can occur inside this method when:
+ * Each queue is visited once. For each queue the
+ * {@link #getMessageAsync(CompletableFuture, String, String, String, int, int, MessageFilter, PopConsumerRecord.RetryType)}
+ * method is chained via {@link CompletableFuture#thenCompose}. The chain carries
+ * the accumulated result through all queues, stopping early when the batch is
+ * filled, the queue is blocked, or the inflight threshold is reached.
+ *
+ * Queue iteration order respects {@code priorityOrderAsc} and uses
+ * {@code requestCount} as a round-robin offset for load balancing.
+ *
+ * @param future the accumulator future
+ * @param clientHost the client address
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param requestCount round-robin counter for queue selection
+ * @param batchSize max number of messages to return
+ * @param filter message filter expression
+ * @param retryType whether this is a retry topic V1/V2
+ * @return a future completing with the pop result context
+ */
protected CompletableFuture This method coordinates the full Pop lifecycle:
+ * The deletion is a two-step fallback:
+ * memo: Notify polling request when receive orderly ack
+ *
+ * @param popTime the original pop time of the message
+ * @param invisibleTime the original visibility timeout
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param offset the acked offset
+ * @return a future that completes with {@code true} on success
+ */
public CompletableFuture refer: ChangeInvisibleTimeProcessor.appendCheckPointThenAckOrigin
+ * This is the KVStore equivalent of {@code ChangeInvisibleTimeProcessor#appendCheckPointThenAckOrigin}.
+ *
+ * A new record with the updated timeout is written to the KVStore, and the
+ * old record (identified by the original {@code popTime + invisibleTime}) is
+ * deleted from the cache and KVStore.
+ *
+ * If the new and old records have the same visibility timeout (e.g. the
+ * consumer extended by the same duration it already had), the delete one is
+ * skipped because the write one already overwrites the old record in RocksDB.
+ *
+ * @param popTime the original pop time
+ * @param invisibleTime the original visibility timeout
+ * @param changedPopTime the new pop time (typically current time)
+ * @param changedInvisibleTime the new visibility timeout
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param offset the message offset
+ * @param suspend whether to suspend (nack without incrementing reconsume count)
+ */
public void changeInvisibilityDuration(long popTime, long invisibleTime, long changedPopTime,
long changedInvisibleTime, String groupId, String topicId,
int queueId, long offset, boolean suspend) {
@@ -511,6 +722,7 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch
// No need to generate new records when the group does not exist,
// because these retry messages will not be consumed by anyone.
+ // default value of popReviveSkipIfGroupAbsent is true
boolean skipWrite = brokerConfig.isPopReviveSkipIfGroupAbsent() &&
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId);
@@ -528,19 +740,41 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch
}
// If the new CK has the same key as the old CK (same visibilityTimeout),
- // the write already overwrites the old record in RocksDB, skip delete
+ // the write one already overwrites the old record in RocksDB, skip delete
// to avoid removing the newly written record.
if (skipWrite || ckRecord.getVisibilityTimeout() != ackRecord.getVisibilityTimeout()) {
this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord));
}
}
+ /**
+ * Read the original message from storage for revival.
+ * No external callers, except unit tests.
+ *
+ * Used by {@link #revive(PopConsumerRecord)} when a visibility timeout
+ * expires. Delegates to {@link org.apache.rocketmq.broker.EscapeBridge}
+ * which can read from either the local store or a remote broker's store.
+ *
+ * @param consumerRecord the expired record
+ * @return a triple of (message, info, needRetry)
+ */
// Use broker escape bridge to support remote read
public CompletableFuture Skips the record if the consumer group no longer exists.
+ * Otherwise, reads the original message,
+ * and re-publishes it via {@link #reviveRetry}.
+ *
+ * @param record the expired record to revive
+ * @return a future completing with {@code true} on success
+ */
public CompletableFuture This is the core revival loop called by {@link #run()}:
+ * Each iteration:
+ * Handles both single ({@link RequestCode#ACK_MESSAGE}) and batch
+ * ({@link RequestCode#BATCH_ACK_MESSAGE}) acks. Each ack is processed
+ * through one of two paths:
+ * Orderly ack is handled separately by {@link #ackOrderly} /
+ * {@link #ackOrderlyNew}, which update the consumer order info and advance
+ * the consumer offset while notifying any long-polling waiters.
+ *
+ * This class also owns and manages the {@link PopReviveService} instances
+ * for the file-based revive path.
+ */
public class AckMessageProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -121,13 +143,36 @@ public boolean rejectRequest() {
return false;
}
+ /**
+ * Process an ack request (single or batch).
+ *
+ * Routes to one of two paths based on {@code popConsumerKVServiceEnable}:
+ * Orderly acks ({@code rqId == POP_ORDER_REVIVE_QUEUE}) are handled by
+ * {@link #ackOrderly} / {@link #ackOrderlyNew} instead.
+ *
+ * @param channel the Netty channel of the requesting client
+ * @param request the incoming request
+ * @param brokerAllowSuspend whether the broker may suspend the request
+ * @return the response to send back to the client
+ * @throws RemotingCommandException if the request cannot be decoded
+ */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
+ // init context params
AckMessageRequestHeader requestHeader;
BatchAckMessageRequestBody reqBody = null;
final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
response.setOpaque(request.getOpaque());
+
if (request.getCode() == RequestCode.ACK_MESSAGE) {
+ // decode and validate request
requestHeader = (AckMessageRequestHeader) request.decodeCommandCustomHeader(AckMessageRequestHeader.class);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
@@ -167,12 +212,15 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
response.setRemark(errorInfo);
return response;
}
+
+ // append ack, default mode is file based merge, call appendAck
if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
appendAckNew(requestHeader, null, response, channel, null);
} else {
appendAck(requestHeader, null, response, channel, null);
}
} else if (request.getCode() == RequestCode.BATCH_ACK_MESSAGE) {
+ // decode and validate request
if (request.getBody() != null) {
reqBody = BatchAckMessageRequestBody.decode(request.getBody(), BatchAckMessageRequestBody.class);
}
@@ -180,7 +228,10 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
response.setCode(ResponseCode.NO_MESSAGE);
return response;
}
+
+ // process each ack
for (BatchAck bAck : reqBody.getAcks()) {
+ // default value of popConsumerKVServiceEnable is false
if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
appendAckNew(null, bAck, response, channel, reqBody.getBrokerName());
} else {
@@ -188,6 +239,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
}
}
} else {
+ // unsupported request, logging and return
POP_LOGGER.error("AckMessageProcessor failed to process RequestCode: {}, consumer: {} ", request.getCode(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(String.format("AckMessageProcessor failed to process RequestCode: %d", request.getCode()));
@@ -196,8 +248,31 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
return response;
}
+ /**
+ * Append an ack (single or batch) in the file-based path.
+ *
+ * For single ack: parses the extra info from the request header,
+ * routes orderly acks to {@link #ackOrderly}, or creates a single {@link AckMsg}.
+ *
+ * For batch ack: expands the {@link BitSet} from the
+ * {@link BatchAck} into individual offsets, routes orderly acks individually,
+ * and packs the remaining offsets into a {@link BatchAckMsg}.
+ *
+ * The ack is first offered to {@link PopBufferMergeService#addAk}.
+ * If the buffer merge is not available, the ack is serialized as JSON and
+ * written to the revive topic with tag {@link PopAckConstants#ACK_TAG}
+ * or {@link PopAckConstants#BATCH_ACK_TAG}.
+ *
+ * @param requestHeader the single-ack request header (null for batch)
+ * @param batchAck the batch ack body (null for single)
+ * @param response the response to modify on error
+ * @param channel the Netty channel
+ * @param brokerName the broker name
+ * @throws RemotingCommandException if offset validation fails
+ */
private void appendAck(final AckMessageRequestHeader requestHeader, final BatchAck batchAck,
final RemotingCommand response, final Channel channel, String brokerName) throws RemotingCommandException {
+ // init context params
String[] extraInfo;
String consumeGroup, topic;
int qId, rqId;
@@ -205,8 +280,11 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
long popTime, invisibleTime;
AckMsg ackMsg;
int ackCount = 0;
+
+ // ack orderly or set context params
if (batchAck == null) {
// single ack
+ // set context params
extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
brokerName = ExtraInfoUtil.getBrokerName(extraInfo);
consumeGroup = requestHeader.getConsumerGroup();
@@ -218,15 +296,18 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
popTime = ExtraInfoUtil.getPopTime(extraInfo);
invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
+ // ack orderly if revive queue
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response);
return;
}
+ // set ackMsg and ackCount
ackMsg = new AckMsg();
ackCount = 1;
} else {
// batch ack
+ // set context params
consumeGroup = batchAck.getConsumerGroup();
topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), batchAck.getRetry());
qId = batchAck.getQueueId();
@@ -236,6 +317,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
popTime = batchAck.getPopTime();
invisibleTime = batchAck.getInvisibleTime();
+ // offset check
long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, qId);
long maxOffset;
try {
@@ -248,6 +330,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
return;
}
+ // ack orderly or add offset to batchAckMsg
BatchAckMsg batchAckMsg = new BatchAckMsg();
BitSet bitSet = batchAck.getBitSet();
for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
@@ -264,10 +347,13 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
batchAckMsg.getAckOffsetList().add(offset);
}
}
+
+ // skip if empty or is revive queue
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) {
return;
}
+ // set ackMsg and ackCount
ackMsg = batchAckMsg;
ackCount = batchAckMsg.getAckOffsetList().size();
}
@@ -275,6 +361,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup, topic, ackCount);
+ // set ackMsg
ackMsg.setConsumerGroup(consumeGroup);
ackMsg.setTopic(topic);
ackMsg.setQueueId(qId);
@@ -283,11 +370,13 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
ackMsg.setPopTime(popTime);
ackMsg.setBrokerName(brokerName);
+ // add ackMsg
if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) {
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
return;
}
+ // create revive message by ackMsg, if add ackMsg failed
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(StandardCharsets.UTF_8));
@@ -305,7 +394,9 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
msgInner.setDeliverTimeMs(popTime + invisibleTime);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+
+ // store revive message
+ if (brokerController.getBrokerConfig().isAppendAckAsync()) { // default is false
int finalAckCount = ackCount;
this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
handlePutMessageResult(putMessageResult, ackMsg, topic, consumeGroup, popTime, qId, finalAckCount);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index 5ff132ca237..5de0c3eac78 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -27,6 +27,7 @@
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.MemoryConsumerOrderInfoManager;
import org.apache.rocketmq.broker.pop.PopConsumerLockService;
+import org.apache.rocketmq.broker.pop.PopConsumerService;
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.TopicConfig;
@@ -52,6 +53,23 @@
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
+/**
+ * Processes the nack {@code ChangeInvisibleTime} request from consumers.
+ *
+ * When a consumer needs more time to process a message (or wants to
+ * suspend/nack it), this processor updates the message's visibility
+ * timeout. The implementation varies by the ack mode:
+ * For orderly consumption, the next visible time is updated directly in
+ * the {@link ConsumerOrderInfoManager} without writing to the revive topic.
+ */
public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
@@ -76,8 +94,12 @@ public boolean rejectRequest() {
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
+ // process request async
CompletableFuture Routes to the appropriate handler based on message type:
+ * Called after the new checkpoint has been written successfully. This method
+ * writes an {@link PopAckConstants#ACK_TAG} message that matches the
+ * original checkpoint's merge key. When {@link PopReviveService} processes this
+ * ack, it sets the corresponding bit in the old CK's bitMap, causing
+ * the old CK to be treated as fully acked and skipped during revive.
+ *
+ * If {@link PopBufferMergeService#addAk} accepts the ack (buffer
+ * merge enabled), it is merged in memory without writing to the store.
+ *
+ * @param requestHeader the original request header
+ * @param extraInfo the extra info from the original pop request
+ * @return a future that completes with {@code true} on success
+ */
private CompletableFuture This is the core of the file-based non-orderly ChangeInvisibleTime path:
+ * buffer checkpoint in memory then enqueue them into system revive queue then wait to be acked.
+ *
+ * Two in-memory data structures drive the merge logic:
+ * The background {@link #scan()} thread periodically evaluates each buffered CK:
+ * This service is enabled by {@code enablePopBufferMerge} and only runs on
+ * a master or a slave acting as master. When {@code enablePopBatchAck} is set,
+ * multiple ack offsets are packed into a single {@link BatchAckMsg}.
+ */
public class PopBufferMergeService extends ServiceThread {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
+ /**
+ * In-memory map of check points.
+ * Key: topic + group + queueId + startOffset + popTime + brokerName
+ * Value: check point wrapper
+ * use cases:
+ * - scan: iterate buffer
+ * - addAckMsg: get check point from buffer and mark ack state of Check Point
+ */
ConcurrentHashMap For each {@code topic@cid@queueId} queue, the method peeks the head (oldest)
+ * wrapper and checks whether it is ready to commit:
+ * If the head is ready, it is committed and removed. Processing continues
+ * to the next wrapper in the same queue. If the head is not ready, the loop
+ * breaks — this ensures strict FIFO order and prevents consumer offset
+ * regression.
+ *
+ * Called at the end of {@link #scan()} after the buffer has been processed.
+ *
+ * @return the total number of remaining wrappers across all queues (for logging)
+ */
private int scanCommitOffset() {
Iterator Three types of entries are removed:
+ * For each entry in {@link #buffer}:
+ * After processing the buffer, calls {@link #scanCommitOffset()} to commit offsets
+ * for finished checkpoints in FIFO order.
+ *
+ * If the scan duration exceeds {@code popCkStayBufferTimeOut - 1000ms}, the service
+ * temporarily stops accepting new CKs ({@link #serving} = false) to avoid backlog.
+ */
private void scan() {
long startTime = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
@@ -244,7 +352,6 @@ private void scan() {
continue;
}
-
// just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)
if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)
|| isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {
@@ -259,6 +366,7 @@ private void scan() {
PopCheckPoint point = pointWrapper.getCk();
long now = System.currentTimeMillis();
+ // check whether check point is timeout
boolean removeCk = !this.serving;
// ck will be timeout
if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
@@ -275,17 +383,18 @@ private void scan() {
}
// double check
- if (isCkDone(pointWrapper)) {
+ if (isCkDone(pointWrapper)) { // all checkpoint are acked, do nothing
continue;
- } else if (pointWrapper.isJustOffset()) {
+ } else if (pointWrapper.isJustOffset()) { // store checkpoint
// just offset should be in store.
if (pointWrapper.getReviveQueueOffset() < 0) {
putCkToStore(pointWrapper, this.brokerController.getBrokerConfig().isAppendCkAsync());
countCk++;
}
continue;
- } else if (removeCk) {
+ } else if (removeCk) { // store checkpoint if needed
// put buffer ak to store
+ // revive queue offset < 0 means checkpoint was not stored
if (pointWrapper.getReviveQueueOffset() < 0) {
putCkToStore(pointWrapper, this.brokerController.getBrokerConfig().isAppendCkAsync());
countCk++;
@@ -295,11 +404,13 @@ private void scan() {
continue;
}
- if (brokerController.getBrokerConfig().isEnablePopBatchAck()) {
+ // store checkpoint
+ if (brokerController.getBrokerConfig().isEnablePopBatchAck()) { // default is false
List Uses a CAS (compare-and-swap) loop to ensure thread safety without locking.
+ * If the bit is already set, this method returns immediately (no-op).
+ *
+ * @param setBits the atomic bitmask to update
+ * @param index the bit position (0-based)
+ */
private void markBitCAS(AtomicInteger setBits, int index) {
while (true) {
int bits = setBits.get();
@@ -384,6 +508,22 @@ private void markBitCAS(AtomicInteger setBits, int index) {
}
}
+ /**
+ * Commit the consumer offset for the checkpoint's {@code topic@cid@queueId}.
+ *
+ * Called from {@link #scanCommitOffset()} after the checkpoint is confirmed
+ * as finished (all acks received or CK stored). The offset is advanced to
+ * {@link PopCheckPointWrapper#nextBeginOffset}, which is the offset of the
+ * first message after this batch.
+ *
+ * The operation is guarded by {@link PopMessageProcessor.QueueLockManager}
+ * to prevent concurrent offset updates on the same queue.
+ *
+ * @param wrapper the finished checkpoint wrapper
+ * @return {@code true} if the offset was committed or no commit is needed
+ * ({@code nextBeginOffset < 0}); {@code false} if the lock could
+ * not be acquired (caller should retry later)
+ */
private boolean commitOffset(final PopCheckPointWrapper wrapper) {
if (wrapper.getNextBeginOffset() < 0) {
return true;
@@ -413,8 +553,25 @@ private boolean commitOffset(final PopCheckPointWrapper wrapper) {
return true;
}
+ /**
+ * Enqueue the checkpoint wrapper into the per-{@code topic@cid@queueId} offset queue
+ * for sequential offset committing.
+ *
+ * The queue is maintained in FIFO order. The {@link #scanCommitOffset()} method
+ * drains the queue from the head, ensuring that offsets are committed in the same
+ * order as the checkpoints were created, which prevents consumer offset regression.
+ *
+ * The {@link QueueWithTime#time} is also updated to the CK's pop time so that
+ * {@link #scanGarbage()} can identify and remove stale entries after 5 minutes of
+ * inactivity.
+ *
+ * @param pointWrapper the checkpoint wrapper to enqueue
+ * @return true if the element was added to the queue successfully
+ */
private boolean putOffsetQueue(PopCheckPointWrapper pointWrapper) {
QueueWithTime The ack is not written to the revive topic immediately. Instead, a flag is
+ * set in {@link PopCheckPointWrapper#bits} via {@link #markBitCAS}.
+ * The pending ack will later be flushed to storage by {@link #scan()} when the
+ * checkpoint is evicted (timeout / buffer full / service stopping).
+ *
+ * Rejection conditions (return false):
+ * Every sub-message has a corresponding bit in
+ * {@link PopCheckPointWrapper#bits}. This method returns {@code true} when
+ * all bits are set, meaning the CK can be removed from the buffer without
+ * writing any ack to the revive topic (clean completion).
+ *
+ * @param pointWrapper the checkpoint wrapper to check
+ * @return {@code true} if every sub-message has been acked
+ */
private boolean isCkDone(PopCheckPointWrapper pointWrapper) {
byte num = pointWrapper.getCk().getNum();
for (byte i = 0; i < num; i++) {
@@ -807,6 +1053,18 @@ private boolean isCkDone(PopCheckPointWrapper pointWrapper) {
return true;
}
+ /**
+ * Check whether all acked sub-messages have been fully persisted.
+ *
+ * Uses XOR: {@code bits ^ toStoreBits}. A bit is set in the result when
+ * the corresponding sub-message has been acked ({@code bits}) but not yet
+ * persisted ({@code toStoreBits}). Returns {@code true} only when every
+ * acked message has also been persisted, meaning the checkpoint is ready
+ * for final cleanup.
+ *
+ * @param pointWrapper the checkpoint wrapper to check
+ * @return {@code true} if no ack remains to be persisted
+ */
private boolean isCkDoneForFinish(PopCheckPointWrapper pointWrapper) {
byte num = pointWrapper.getCk().getNum();
int bits = pointWrapper.getBits().get() ^ pointWrapper.getToStoreBits().get();
@@ -842,17 +1100,46 @@ public LinkedBlockingDeque Three-state indicator:
+ * When {@code true}:
+ * This is the core processor for the Pop consumption mode. It handles:
+ * This class also owns the {@link PopLongPollingService},
+ * {@link PopBufferMergeService}, and {@link QueueLockManager} instances
+ * used by the file-based ack path.
+ */
public class PopMessageProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -222,13 +241,36 @@ public void notifyMessageArriving(final String topic, final int queueId, final S
topic, queueId, cid, false, null, 0L, null, null);
}
+ /**
+ * Process a PopMessage request.
+ *
+ * This method handles the full Pop lifecycle:
+ * Queues are visited sequentially (respecting {@code priorityOrderAsc}).
+ * For each queue a {@link #popMsgFromQueue} call is chained via
+ * {@code CompletableFuture#thenCompose}. The chained future carries the
+ * remaining number of messages still needed ({@code restNum}).
+ *
+ * Early termination can occur inside {@link #popMsgFromQueue} when:
+ * This method is called as a step in a {@link CompletableFuture} chain
+ * (see {@link #popMsgFromTopic}). The {@code restNum} argument is the
+ * number of messages still needed — when it drops to {@code 0} or below,
+ * subsequent calls in the chain may short-circuit early.
+ *
+ * The method has several early-termination paths (all return
+ * immediately with the current {@code restNum}):
+ * Otherwise, it asynchronously fetches messages from the store, handles
+ * offset correction, updates order-consume tracking / checkpoint data, and
+ * merges the results into {@code getMessageResult}.
+ *
+ * @param topic topic name
+ * @param attemptId attempt id for idempotent consumption
+ * @param isRetry whether this is a retry topic
+ * @param getMessageResult accumulator for messages popped so far
+ * @param requestHeader pop request parameters
+ * @param queueId target queue id
+ * @param restNum number of messages still needed before the batch
+ * size is satisfied
+ * @param reviveQid revive queue id for checkpoint
+ * @param channel netty channel of the requesting client
+ * @param popTime pop invocation timestamp
+ * @param messageFilter expression filter applied to each message
+ * @param startOffsetInfo buffer for offset tracing info
+ * @param msgOffsetInfo buffer for per-message offset tracing info
+ * @param orderCountInfo buffer for order-consume count info
+ * @return a future completing with the remaining number of messages needed
+ */
private CompletableFuture There is only one public method for business: run Each revive queue has its own dedicated {@code PopReviveService} instance.
+ * The service periodically:
+ * This is the file-based revive path (CK + Ack messages are stored in
+ * the system revive topic). It is complemented by the KVStore-based path in
+ * {@code PopConsumerService} which handles the {@code PopConsumerKVStore} flow.
+ */
public class PopReviveService extends ServiceThread {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final int[] ckRewriteIntervalsInSeconds = new int[] { 10, 20, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200 };
@@ -74,6 +93,25 @@ public class PopReviveService extends ServiceThread {
private long currentReviveMessageTimestamp = -1;
private volatile boolean shouldRunPopRevive = false;
+ /**
+ * Tracks checkpoints that are currently being revived.
+ *
+ * Key — the checkpoint being processed.
+ * Value — a pair of (startTime, completed), where:
+ * The map is sorted by {@link PopCheckPoint#compareTo} (by startOffset).
+ * This ordering is used to drain completed entries from the head, ensuring
+ * the revive topic offset is committed strictly in sequence.
+ *
+ * Concurrency is limited to at most 3 entries at a time (see
+ * {@link #mergeAndRevive}). If an entry stays incomplete for over 30
+ * seconds, it is considered hung and is skipped via {@link #rePutCK}.
+ */
private final NavigableMap Constructs a new {@link MessageExtBrokerInner} from the original
+ * message, increments the reconsume count (unless suspended), sets the
+ * first-pop time and origin group properties, and writes it to the
+ * appropriate retry topic (V1 or V2 depending on configuration).
+ *
+ * If the retry topic does not exist, it is created automatically
+ * via {@link #addRetryTopicIfNotExist}.
+ *
+ * @param popCheckPoint the checkpoint that triggered the revive
+ * @param messageExt the original message to re-publish
+ * @return {@code true} if the message was written successfully
+ */
private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) {
+ // convert checkpoint to inner message
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
if (!popCheckPoint.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
msgInner.setTopic(KeyBuilder.buildPopRetryTopic(popCheckPoint.getTopic(), popCheckPoint.getCId(), brokerController.getBrokerConfig().isEnableRetryTopicV2()));
@@ -133,9 +187,15 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
}
msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, popCheckPoint.getCId());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+
+ // set topic and queueId
addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), messageExt));
+
+ // store message
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+
+ // logging and metric
brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus());
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg, ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
@@ -205,6 +265,17 @@ private int getRetryQueueId(String retryTopic, MessageExt messageExt) {
return oriQueueId;
}
+ /**
+ * Pull a batch of messages from the revive topic at the given offset.
+ *
+ * If the offset becomes illegal (e.g. the revive topic was truncated),
+ * the revive offset is corrected to {@code nextBeginOffset - 1} so that
+ * the next scan starts from a valid position.
+ *
+ * @param offset the queue offset to start reading from
+ * @param queueId the revive queue id
+ * @return a list of decoded messages, or {@code null} if at the tail
+ */
protected List This method reads messages from the revive topic starting from the
+ * current offset. Each message is classified by its tag:
+ * AckMsg that arrive after their checkpoint has already been processed
+ * ({@code enableSkipLongAwaitingAck}) are handled by creating a mock CK
+ * via {@link #mockCkForAck} so that the revive offset can still be
+ * committed correctly.
+ *
+ * The scan stops when any of:
+ * When an ack arrives long after its CK has been consumed (e.g. network
+ * delay), the CK is no longer in the scan map. If {@code enableSkipLongAwaitingAck}
+ * is enabled, this method creates a synthetic CK so that the revive offset
+ * can still be advanced correctly in {@link #mergeAndRevive}.
+ *
+ * @param messageExt the revive topic message that carried the ack
+ * @param ackMsg the decoded ack
+ * @param mergeKey the merge key for the CK lookup
+ * @param mockPointMap map to collect the mock CKs
+ * @return {@code true} if a mock CK was created
+ */
private boolean mockCkForAck(MessageExt messageExt, AckMsg ackMsg, String mergeKey, HashMap The mock CK has {@code num = 0} and empty bitMap, meaning no actual
+ * messages to revive. Its only purpose is to carry the {@code reviveOffset}
+ * so that the revive consumer offset can be committed past this ack.
+ *
+ * @param ackMsg the ack message
+ * @param reviveOffset the queue offset of the ack message in the revive topic
+ * @return a mock checkpoint with no sub-messages
+ */
private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {
PopCheckPoint point = new PopCheckPoint();
point.setStartOffset(ackMsg.getStartOffset());
@@ -496,7 +634,26 @@ private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {
return point;
}
+ /**
+ * Process collected checkpoints and revive all un-acked sub-messages.
+ *
+ * Checkpoints are sorted by revive offset. For each one:
+ * After processing, the revive topic offset is advanced past all
+ * processed checkpoints.
+ *
+ * @param consumeReviveObj the container with collected CKs and scan state
+ * @throws Throwable if any revive operation fails
+ */
protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwable {
+ // sort checkpoints and init newOffset
ArrayList For each sub-message whose bit is not set in the bitMap, the original
+ * message is fetched via {@link #getBizMessage} and re-published to the
+ * retry topic via {@link #reviveRetry}. All revive attempts run
+ * concurrently via {@link CompletableFuture#allOf}.
+ *
+ * After all attempts complete:
+ * When a sub-message cannot be revived (e.g. the original message is
+ * temporarily unavailable), the CK is re-published with:
+ * If {@code rePutTimes} exceeds the backoff table length and
+ * {@code skipWhenCKRePutReachMaxTimes} is set, the CK is dropped.
+ *
+ * @param oldCK the original checkpoint that failed to revive
+ * @param pair the failed offset and result (object1 = offset, object2 = result)
+ */
private void rePutCK(PopCheckPoint oldCK, Pair Each iteration:
+ * Called before every read/write operation. Returns {@code true} if the
+ * database is fully loaded, the handle is non-null, and the instance has not
+ * been closed (e.g. due to a scheduled reload). Subclasses may override
+ * {@link #release()} to pair with this call (e.g. for reference counting).
+ *
+ * @return {@code true} if the database is ready for operations
+ */
public boolean hold() {
if (!this.loaded || this.db == null || this.closed) {
LOGGER.error("hold rocksdb Failed. {}", this.dbPath);
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java b/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java
index cc96770b22a..474179d52f8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java
@@ -19,15 +19,33 @@
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+/**
+ * Bit-level utility methods, primarily used by Pop-mode ack tracking.
+ *
+ * An {@code int} bitmask is used to track the ack state of up to 32 sub-messages
+ * within a single Pop checkpoint (see {@code PopCheckPoint}).
+ */
public class DataConverter {
public static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+ /**
+ * Convert a {@code long} to an 8-byte array (big-endian).
+ */
public static byte[] Long2Byte(Long v) {
ByteBuffer tmp = ByteBuffer.allocate(8);
tmp.putLong(v);
return tmp.array();
}
+ /**
+ * Set or clear the bit at {@code index} in an int bitmask.
+ * Uses {@code 1L} (long literal) to avoid signed-int overflow when {@code index == 31}.
+ *
+ * @param value the original bitmask
+ * @param index the bit position (0-based, 0..31)
+ * @param flag {@code true} to set, {@code false} to clear
+ * @return the updated bitmask
+ */
public static int setBit(int value, int index, boolean flag) {
if (flag) {
return (int) (value | (1L << index));
@@ -36,6 +54,13 @@ public static int setBit(int value, int index, boolean flag) {
}
}
+ /**
+ * Test whether the bit at {@code index} is set in an int bitmask.
+ *
+ * @param value the bitmask
+ * @param index the bit position (0-based, 0..31)
+ * @return {@code true} if the bit is 1
+ */
public static boolean getBit(int value, int index) {
return (value & (1L << index)) != 0;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index 1b38a19ae6a..94e4f4ffc8b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -55,6 +55,14 @@
public class ProxyStartup {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ /**
+ * proxy components container, manager components with method start/shutdown/...
+ * - gRPC thread pool executor
+ * - message processor (wrap broker controller)
+ * - grpc server
+ * - remoting protocol server
+ * - ...
+ */
private static final ProxyStartAndShutdown PROXY_START_AND_SHUTDOWN = new ProxyStartAndShutdown();
private static class ProxyStartAndShutdown extends AbstractStartAndShutdown {
@@ -73,8 +81,10 @@ public static void main(String[] args) {
// init thread pool monitor for proxy.
initThreadPoolMonitor();
+ // init business thread pool for grpc server
ThreadPoolExecutor executor = createServerExecutor();
+ // create message processor, wrap broker controller in local mode
MessagingProcessor messagingProcessor = createMessagingProcessor();
// tls cert update
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 5a1a5859305..7def5314972 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -262,7 +262,7 @@ public class ProxyConfig implements ConfigFile {
private String remotingAccessAddr = "";
private int remotingListenPort = 8080;
- // related to proxy's send strategy in cluster mode.
+ // related to proxy's sending strategy in cluster mode.
private boolean sendLatencyEnable = false;
private boolean startDetectorEnable = false;
private int detectTimeout = 200;
@@ -270,9 +270,38 @@ public class ProxyConfig implements ConfigFile {
private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER;
private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. send message(and send message v2)
+ * 2. send batch message
+ * 3. consume send message back
+ * 4. end transaction
+ * 5. recall message
+ */
private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. pull message
+ * 2. lite pull message
+ * 3. pop message
+ */
private int remotingPullMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. update consumer offset
+ * 2. ack message
+ * 3. change message invisible time
+ * 4. get consumer connection list
+ */
private int remotingUpdateOffsetThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. unregister client
+ * 2. check client config
+ * 3. get consumer list by group
+ * 4. get min/max offset, query consume offset, search offset by timestamp
+ * 5. lock/unlock batch mq
+ */
private int remotingDefaultThreadPoolNums = 4 * PROCESSOR_NUMBER;
private int remotingHeartbeatThreadPoolQueueCapacity = 50000;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
index 3429ad54e27..12508d32108 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
@@ -73,6 +73,15 @@
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseWriter;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+/**
+ * RocketMQ gRPC protocol implementation
+ *
+ * When auto-renew is enabled ({@code enableProxyAutoRenew}), the proxy
+ * periodically extends the invisible time of delivered but unacked messages
+ * so that they are not revived while the consumer is still processing them.
+ *
+ * This method extracts the {@code PROPERTY_POP_CK} from each popped
+ * message, wraps it into a {@link MessageReceiptHandle}, and registers it
+ * via {@link MessagingProcessor#addReceiptHandle}. The returned
+ * {@link Runnable} is executed after the response has been written to the
+ * client stream.
+ *
+ * @param ctx the proxy context
+ * @param request the original receive-message request
+ * @param group consumer group
+ * @param topic topic name
+ * @param popResult the pop result returned from the broker
+ * @param writer the response stream writer
+ * @return a runnable to execute after the response write, or {@code null}
+ * if no messages were found
+ */
private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request,
String group, String topic, PopResult popResult, ReceiveMessageResponseStreamWriter writer
) {
+ // check result status
if (!PopStatus.FOUND.equals(popResult.getPopStatus())) {
return null;
}
+ // get socket channel
GrpcClientChannel clientChannel = grpcChannelManager.getChannel(ctx.getClientID());
if (clientChannel == null) {
GrpcProxyException e = new GrpcProxyException(Code.MESSAGE_NOT_FOUND,
@@ -207,6 +246,7 @@ private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request
writer.processThrowableWhenWriteMessage(e, ctx, request, messageExt));
throw e;
}
+
return () -> {
List Owns a {@link DefaultReceiptHandleManager} and wires its
+ * {@link RenewEvent} listener to {@link MessagingProcessor#changeInvisibleTime}.
+ * When a receipt handle is about to expire, the manager fires a {@code RENEW}
+ * event which this processor translates into a
+ * {@code ChangeInvisibleTime} call.
+ *
+ * When the renewal limit is reached, a {@code STOP_RENEW} event fires
+ * which nacks the message via {@code changeInvisibleTime} with the group's
+ * retry policy delay.
+ */
public class ReceiptHandleProcessor extends AbstractProcessor {
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected DefaultReceiptHandleManager receiptHandleManager;
+ /**
+ * Wire the receipt handle manager to the messaging processor.
+ *
+ * pass StateEventListener to DefaultReceiptHandleManager
+ * so that when DefaultReceiptHandleManager find the message is expired,
+ * call StateEventListener to change the invisible time of the message.
+ *
+ * Creates an event listener that translates all {@link RenewEvent}
+ * types ({@code RENEW}, {@code STOP_RENEW}, {@code CLEAR_GROUP}) into
+ * {@link MessagingProcessor#changeInvisibleTime} calls, which update
+ * the message's visibility timeout on the broker.
+ *
+ * @param messagingProcessor the core messaging processor
+ * @param serviceManager the service manager providing metadata and consumer services
+ */
public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) {
super(messagingProcessor, serviceManager);
+
+ // create event listener
StateEventListener When auto-renew is enabled, popped messages are registered here with their
+ * {@code PROPERTY_POP_CK} data. A periodic {@link #scheduledExecutorService} scans
+ * all registered handles and extends the invisible time for messages that are
+ * about to expire. When the total renewal duration exceeds
+ * {@code renewMaxTimeMillis}, the message is nack'd and returned to the broker.
+ *
+ * Handles are grouped by {@link ReceiptHandleGroupKey} (channel + consumer group)
+ * and cleaned up automatically when a gRPC client disconnects.
+ */
public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implements ReceiptHandleManager {
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected final MetadataService metadataService;
@@ -77,6 +89,8 @@ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerMana
this.consumerManager = consumerManager;
this.eventListener = eventListener;
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+
+ // by default, minThreadNum is 2, maxThreadNum is 4
this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
proxyConfig.getRenewThreadPoolNums(),
proxyConfig.getRenewMaxThreadPoolNums(),
@@ -84,6 +98,8 @@ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerMana
"RenewalWorkerThread",
proxyConfig.getRenewThreadPoolQueueCapacity()
);
+
+ // by default, minThreadNum is 2, maxThreadNum is 4
this.returnHandleGroupWorkerService = ThreadPoolMonitor.createAndMonitor(
proxyConfig.getReturnHandleGroupThreadPoolNums(),
proxyConfig.getReturnHandleGroupThreadPoolNums() * 2,
@@ -91,6 +107,8 @@ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerMana
"ReturnHandleGroupWorkerThread",
proxyConfig.getRenewThreadPoolQueueCapacity()
);
+
+ // clear receipt by group when consumer unregister
consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
@@ -115,11 +133,15 @@ public void shutdown() {
}
});
+
this.receiptHandleGroupMap = new ConcurrentHashMap<>();
this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
+
+ // add periodic scan task
this.appendStartAndShutdown(new StartAndShutdown() {
@Override
public void start() throws Exception {
+ // by default, interval is 5000ms
scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0,
ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
}
@@ -154,6 +176,23 @@ protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) {
return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null;
}
+ /**
+ * Periodic scan of all receipt handle groups, called by the
+ * {@link #scheduledExecutorService} at a fixed interval.
+ *
+ * For each group:
+ * The scan runs synchronously in the scheduler thread; the actual
+ * renewal work is dispatched asynchronously to the worker pool.
+ */
protected void scheduleRenewTask() {
Stopwatch stopwatch = Stopwatch.createStarted();
try {
@@ -191,15 +230,39 @@ protected void renewMessage(ProxyContext context, ReceiptHandleGroupKey key, Rec
}
}
+ /**
+ * Renew a single message's visibility timeout, or stop if the renewal
+ * limit has been reached.
+ *
+ * Decision logic:
+ * The method:
+ * Disk flush and HA replication run in parallel via
+ * {@link CompletableFuture#thenCombine}. If either fails, the combined
+ * result is updated with the failure status — both must succeed for
+ * the overall result to be {@code PUT_OK}.
+ *
+ * @param putMessageResult the append result to update
+ * @param messageExt the original message (needed by flush)
+ * @param needAckNums number of slave acks required (0/1 = no HA)
+ * @param needHandleHA whether HA replication is configured
+ * @return a future completing with the merged result
+ */
private CompletableFuture The difference between getData is:
+ * getMessage add process: setInCache
+ *
+ * Finds the mapped file containing the offset and selects a buffer
+ * for the given size. The returned buffer includes cache-status metadata
+ * for cold-data flow control.
+ *
+ * @param offset physical offset in the commit log
+ * @param size number of bytes to read
+ * @return the mapped buffer, or {@code null} if the file is unavailable
+ */
public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index aee767dae2f..9bfbfca6961 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -211,6 +211,13 @@ public class DefaultMessageStore implements MessageStore {
private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+ /**
+ * BatchDispatchRequest queue
+ * offer by ConcurrentReputMessageService.createBatchDispatchRequest()
+ * poll by MainBatchDispatchRequestService.pollBatchDispatchRequest()
+ *
+ * if enableBuildConsumeQueueConcurrently is false, It is useless
+ */
private final ConcurrentLinkedQueue Before writing, any registered {@link PutMessageHook} instances are
+ * invoked — a non-null result from a hook short-circuits the process.
+ * Inner-batch message flags are validated
+ * then the actual write is delegated to {@link CommitLog#asyncPutMessage}.
+ *
+ * @param msg the message to write
+ * @return a future that completes with the put result
+ */
@Override
public CompletableFuture The method:
+ * The lookup strategy:
+ * When a batch of messages is popped, the queue offsets of the messages may not
+ * be contiguous (e.g. batch messages, ConsumeQueue compaction, filter mismatch gaps).
+ * This list records {@code actualQueueOffset - startOffset} for each message in the
+ * batch, so that the system can correctly map an ack offset back to its index within
+ * the checkpoint via {@link #indexOfAck}, and reconstruct the original offset via
+ * {@link #ackOffsetByIndex}.
+ *
+ * When this field is null or empty (old-version CK), offsets are assumed to be
+ * {@code startOffset + index}.
+ */
@JSONField(name = "d")
private List The index is used to look up the corresponding bit in the {@link #bitMap}
+ * (or in {@code PopCheckPointWrapper.bits}) and to retrieve the original
+ * queue offset via {@link #ackOffsetByIndex}.
+ *
+ * @param ackOffset the queue offset being acked
+ * @return the sub-message index (0-based), or -1 if the offset is not found
+ * in this checkpoint
+ */
public int indexOfAck(long ackOffset) {
if (ackOffset < startOffset) {
return -1;
}
- // old version of checkpoint
+ // old version of checkpoint, this will not happen in 5.*
if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
if (ackOffset - startOffset < num) {
@@ -184,8 +233,16 @@ public int indexOfAck(long ackOffset) {
return queueOffsetDiff.indexOf((int) (ackOffset - startOffset));
}
+ /**
+ * get original queue offset by index.
+ * the method name is miss-leading, it should be getQueueOffsetByIndex.
+ * queueOffset = startOffset + queueOffsetDiff[index]
+ *
+ * @param index sub-message index within this checkpoint (0-based)
+ * @return the original queue offset in the consume queue
+ */
public long ackOffsetByIndex(byte index) {
- // old version of checkpoint
+ // old version of checkpoint, this will not happen in 5.*
if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
return startOffset + index;
}
+ *
+ */
protected static class ConsumerRecords {
private final String groupId;
private final String topicId;
private final int queueId;
private final BrokerConfig brokerConfig;
+ /**
+ * Staged records awaiting cleanup (revival or KVStore write).
+ *
+ *
+ *
+ */
public interface PopConsumerKVStore {
/**
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
index d10b584ef69..73c85311614 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
@@ -58,6 +58,15 @@ public int getCode() {
@JSONField(ordinal = 4)
private int retryFlag;
+ /**
+ * Message visibility timeout in milliseconds.
+ *
+ *
+ *
+ */
@JSONField(ordinal = 8)
private String attemptId;
+ /**
+ * Whether the consumer has suspended (nacked) this message.
+ *
+ *
+ * visibilityTimeout(8B) + groupId + '@' + topicId + '@' + queueId(4B) + '@' + offset(8B)
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param context the pop context to update
+ * @param result the result from the message store
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param retryType whether this is a retry topic V1/V2
+ * @param offset the original consume offset used for this fetch
+ * @return the updated pop context
+ */
public PopConsumerContext handleGetMessageResult(PopConsumerContext context, GetMessageResult result,
String topicId, int queueId, PopConsumerRecord.RetryType retryType, long offset) {
@@ -205,6 +236,29 @@ public PopConsumerContext handleGetMessageResult(PopConsumerContext context, Get
return context;
}
+ /**
+ * Retrieve the starting consume offset for a pop request.
+ * should be private, no external callers.
+ *
+ *
+ *
+ *
+ * @param future the accumulator future carrying the pop context
+ * @param clientHost the client address
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param batchSize max number of messages still needed
+ * @param filter message filter
+ * @param retryType whether this is a retry topic V1/V2
+ * @return a future completing with the pop context updated with results
+ */
protected CompletableFuture
+ *
+ *
+ * @param clientHost the client address
+ * @param popTime the pop invocation timestamp
+ * @param invisibleTime the message visibility timeout
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id (-1 for all queues)
+ * @param batchSize max number of messages to return
+ * @param fifo whether this is a FIFO ordered consumption
+ * @param attemptId attempt id for idempotent consumption
+ * @param initMode consume init mode (min/max)
+ * @param filter message filter expression
+ * @return a future that completes with the pop result context
+ */
public CompletableFuture
+ *
+ *
+ *
+ *
+ *
+ * @param currentTime tracks the last scanned visibility timeout (for incremental progress)
+ * @param maxCount maximum number of records to process per batch(load from config: 16 * 1024)
+ * @return the number of consumed (revived) records
+ */
public long revive(AtomicLong currentTime, int maxCount) {
Stopwatch stopwatch = Stopwatch.createStarted();
long upperTime = System.currentTimeMillis() - 50L;
+
+ // scan expired records between [currentTime-3s, now-50ms)]
List
+ *
+ */
@Override
public void run() {
this.consumerRunning.set(true);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 34a790efca7..7e044c38db3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -26,6 +26,7 @@
import org.apache.rocketmq.broker.lite.LiteMetadataUtil;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.pop.PopConsumerLockService;
+import org.apache.rocketmq.broker.pop.PopConsumerService;
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.PopAckConstants;
@@ -54,6 +55,27 @@
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
+/**
+ * Processes consumer ack messages in Pop consumption mode.
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param channel the Netty channel
+ * @param request the incoming request
+ * @param brokerAllowSuspend whether the broker may suspend
+ * @return a future that completes with the response
+ * @throws RemotingCommandException if the request cannot be decoded
+ */
public CompletableFuture
+ *
+ *
+ * @param requestHeader the original request header
+ * @param reviveQid the revive queue to write to
+ * @param queueId the original queue id
+ * @param offset the message offset being extended
+ * @param popTime the new pop time (current time)
+ * @param extraInfo the extra info from the original pop request
+ * @return a future that completes with {@code true} on success
+ */
private CompletableFuture
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ */
private void scanGarbage() {
Iterator
+ *
+ *
+ *
+ *
+ *
+ * @param reviveQid revive queue id (used only for logging)
+ * @param ackMsg the ack message from the consumer
+ * @return true if the ack was merged successfully
+ */
public boolean addAk(int reviveQid, AckMsg ackMsg) {
+ // validate env
if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {
return false;
}
if (!serving) {
return false;
}
+
try {
+ // get and validate checkpoint
PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName());
if (pointWrapper == null) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -568,7 +768,8 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) {
return false;
}
- if (ackMsg instanceof BatchAckMsg) {
+ // merge ackMsg with checkpoint
+ if (ackMsg instanceof BatchAckMsg) { // merge batch ackMsg
for (Long ackOffset : ((BatchAckMsg) ackMsg).getAckOffsetList()) {
int indexOfAck = point.indexOfAck(ackOffset);
if (indexOfAck > -1) {
@@ -577,7 +778,7 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) {
POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);
}
}
- } else {
+ } else { // merge ackMsg
int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
if (indexOfAck > -1) {
markBitCAS(pointWrapper.getBits(), indexOfAck);
@@ -587,6 +788,7 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) {
}
}
+ // logging
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", reviveQid, pointWrapper, ackMsg);
}
@@ -608,6 +810,12 @@ public void clearOffsetQueue(String lockKey) {
this.commitOffsets.remove(lockKey);
}
+ /**
+ * write message(checkpoint) to revive topic, then update pointWrapper related info.
+ *
+ * @param pointWrapper checkpoint
+ * @param runInCurrent async or sync
+ */
private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean runInCurrent) {
if (pointWrapper.getReviveQueueOffset() >= 0) {
return;
@@ -617,6 +825,7 @@ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean
// Indicates that ck message is storing
pointWrapper.setReviveQueueOffset(Long.MAX_VALUE);
+ // default value of isAppendCkAsync is false
if (brokerController.getBrokerConfig().isAppendCkAsync() && runInCurrent) {
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
handleCkMessagePutResult(putMessageResult, pointWrapper);
@@ -655,7 +864,21 @@ private void handleCkMessagePutResult(PutMessageResult putMessageResult, final P
}
}
+ /**
+ * Persist message which created by checkpoint to the revive topic.
+ *
+ *
+ *
+ *
+ * @param pointWrapper the checkpoint wrapper containing the original CK
+ * @param msgIndex the sub-message index within the CK batch to ack
+ * @param count atomic counter incremented on successful persistence
+ */
private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgIndex, AtomicInteger count) {
+ // build ackMsg and Message by checkpoint
PopCheckPoint point = pointWrapper.getCk();
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
final AckMsg ackMsg = new AckMsg();
@@ -679,7 +902,8 @@ private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgInde
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+ // store message then change store status of the checkpoint
+ if (brokerController.getBrokerConfig().isAppendAckAsync()) { // default value is false
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
handleAckPutMessageResult(ackMsg, putMessageResult, pointWrapper, count, msgIndex);
}).exceptionally(throwable -> {
@@ -687,11 +911,22 @@ private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgInde
return null;
});
} else {
+ // store message
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ // change store status of the checkpoint
handleAckPutMessageResult(ackMsg, putMessageResult, pointWrapper, count, msgIndex);
}
}
+ /**
+ * update store status of checkpoint if revive message stored successfully.
+ *
+ * @param ackMsg the ack message that was persisted
+ * @param putMessageResult the result returned by the store
+ * @param pointWrapper the checkpoint wrapper being processed
+ * @param count atomic counter incremented on success
+ * @param msgIndex the sub-message index that was persisted
+ */
private void handleAckPutMessageResult(AckMsg ackMsg, PutMessageResult putMessageResult,
PopCheckPointWrapper pointWrapper, AtomicInteger count, byte msgIndex) {
brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
@@ -797,6 +1032,17 @@ private boolean cancelCkTimer(final PopCheckPointWrapper pointWrapper) {
return true;
}
+ /**
+ * Check whether all sub-messages in the checkpoint have been acked.
+ *
+ *
+ *
+ */
private volatile long reviveQueueOffset;
private final PopCheckPoint ck;
- // bit for concurrent
+ // store ack states of messages, one byte for each message
private final AtomicInteger bits;
- // bit for stored buffer ak
+ // bits for stored buffer ak, one byte for each message
private final AtomicInteger toStoreBits;
+ // nextOffset of original topic
private final long nextBeginOffset;
+ // topic@group@queueId
private final String lockKey;
+ // topic + group + queueId + startOffset + popTime + brokerName
private final String mergeKey;
+ /**
+ * Whether this checkpoint should be written to the revive topic directly.
+ *
+ *
+ *
+ *
+ * @see PopBufferMergeService#addCkJustOffset
+ * @see PopBufferMergeService#addCkMock
+ */
private final boolean justOffset;
+ // whether check point has stored in revive queue
private volatile boolean ckStored = false;
public PopCheckPointWrapper(int reviveQueueId, long reviveQueueOffset, PopCheckPoint point,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index c32e1b5ae23..81c6aeb1411 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -33,6 +33,7 @@
import org.apache.rocketmq.broker.longpolling.PopRequest;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.broker.pop.PopConsumerContext;
+import org.apache.rocketmq.broker.pop.PopConsumerService;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
@@ -99,6 +100,24 @@
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
+/**
+ * Processes PopMessage requests from consumers.
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param ctx the Netty channel handler context
+ * @param request the incoming PopMessage request
+ * @return the response, or {@code null} if the response is sent asynchronously
+ * (zero-copy path or long-polling suspension)
+ * @throws RemotingCommandException if the request cannot be decoded
+ */
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
+ // init request and response
final long beginTimeMills = this.brokerController.getMessageStore().now();
Channel channel = ctx.channel();
+
RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
response.setOpaque(request.getOpaque());
@@ -240,6 +282,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
}
final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
+ // validation
// Pop mode only supports consumption in cluster load balancing mode
brokerController.getConsumerManager().compensateBasicConsumerInfo(
requestHeader.getConsumerGroup(), ConsumeType.CONSUME_POP, MessageModel.CLUSTERING);
@@ -319,6 +362,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
return response;
}
+ // init filter
BrokerConfig brokerConfig = brokerController.getBrokerConfig();
SubscriptionData subscriptionData = null;
ExpressionMessageFilter messageFilter = null;
@@ -382,6 +426,9 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
ExpressionMessageFilter finalMessageFilter = messageFilter;
SubscriptionData finalSubscriptionData = subscriptionData;
+ // There are two type of ack mode:
+ // 1. ack by KV service
+ // 2. ack by file merge service, default mode
if (brokerConfig.isPopConsumerKVServiceEnable()) {
CompletableFuture
+ *
+ *
+ * @param topicConfig topic configuration; {@code null} skips all queues
+ * @param isRetry whether the topic is a retry topic
+ * @param getMessageResult accumulator for the messages popped so far
+ * @param requestHeader pop request parameters
+ * @param reviveQid revive queue id
+ * @param channel netty channel of the requesting client
+ * @param popTime pop timestamp
+ * @param messageFilter expression filter applied to each message
+ * @param startOffsetInfo buffer for offset tracing info
+ * @param msgOffsetInfo buffer for per-message offset tracing info
+ * @param orderCountInfo buffer for order-consume count info
+ * @param randomQ random queue offset for round-robin load balancing
+ * @param getMessageFuture future that carries the remaining message count
+ * @return a future completing with the remaining number of messages needed
+ */
private CompletableFuture
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param consumeReviveObj the mutable container that receives the collected
+ * CKs and the computed {@code endTime}
+ */
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
+ // init context parameters
HashMap
+ *
+ *
+ *
+ *
+ *
+ * @param popCheckPoint the checkpoint whose un-acked messages should be revived
+ */
private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
+ // env check and init
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip retry, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
return;
}
inflightReviveRequestMap.put(popCheckPoint, new Pair<>(System.currentTimeMillis(), false));
List
+ *
+ *
+ *
+ *
+ */
@Override
public void run() {
int slow = 1;
while (!this.isStopped()) {
try {
+ // env check
if (System.currentTimeMillis() < brokerController.getShouldStartTime()) {
POP_LOGGER.info("PopReviveService Ready to run after {}", brokerController.getShouldStartTime());
this.waitForRunning(1000);
@@ -676,6 +903,8 @@ public void run() {
}
POP_LOGGER.info("start revive topic={}, reviveQueueId={}", reviveTopic, queueId);
+
+ // consume revive message
ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();
consumeReviveMessage(consumeReviveObj);
@@ -684,8 +913,10 @@ public void run() {
continue;
}
+ // merge checkpoint and ackMsg then revive
mergeAndRevive(consumeReviveObj);
+ // wait and logging
ArrayList
+ *
+ */
public class GrpcMessagingApplication extends MessagingServiceGrpc.MessagingServiceImplBase implements StartAndShutdown {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -168,6 +177,16 @@ protected Status convertExceptionToStatus(Throwable t) {
return ResponseBuilder.getInstance().buildStatus(t);
}
+ /**
+ * submit grpc task to related thread pool.
+ *
+ * @param executor thread pool
+ * @param context context
+ * @param request grpc request
+ * @param runnable process task
+ * @param responseObserver grpc response observer
+ * @param statusResponseCreator error response creator
+ */
protected
+ *
+ */
@Override
public StreamObserver> sendMessage(ProxyContext ctx, QueueSelector queueSelector,
String producerGroup, int sysFlag, List
> future = new CompletableFuture<>();
@@ -96,6 +104,7 @@ public CompletableFuture
> sendMessage(ProxyContext ctx, QueueSe
SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId());
AddressableMessageQueue finalMessageQueue = messageQueue;
+ // call SendMessageProcessor of broker
future = this.serviceManager.getMessageService().sendMessage(
ctx,
messageQueue,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index bc3730aed9a..555b78f1906 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -29,17 +29,50 @@
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
+/**
+ * Bridges receipt handle renewal events to the messaging processor.
+ *
+ *
> sendMessage(
ProxyContext ctx,
AddressableMessageQueue messageQueue,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index f9dfd825337..9f168a3128b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -60,6 +60,18 @@
import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+/**
+ * Manages receipt handles for gRPC proxy auto-renewal of message visibility timeouts.
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param context the proxy context
+ * @param key the receipt handle group key
+ * @param messageReceiptHandle the handle to renew
+ * @return a future completing with the updated handle (or {@code null} if
+ * renewal is stopped)
+ */
protected CompletableFuture
+ *
+ *
+ * @param msg the message to write
+ * @return a future that completes with the append result
+ */
public CompletableFuture
+ *
+ *
+ * @param group consumer group
+ * @param topic topic name
+ * @param queueId queue id
+ * @param offset starting offset in the consume queue
+ * @param maxMsgNums maximum number of messages to return
+ * @param maxTotalMsgSize maximum total message body size
+ * @param messageFilter message filter (maybe null)
+ * @return the pull result with status, messages, and next offset
+ */
@Override
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums, final int maxTotalMsgSize, final MessageFilter messageFilter) {
@@ -876,6 +925,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
return null;
}
+ // try to get from compaction store
Optionalfalse).
+ *
+ *
+ *
+ * @param offset physical offset to find
+ * @param returnFirstOnNotFound if {@code true}, returns the first mapped
+ * file when the offset is outside the range
+ * @return the mapped file, or {@code null} if not found and
+ * {@code returnFirstOnNotFound} is {@code false}
*/
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
MappedFile firstMappedFile = this.getFirstMappedFile();
MappedFile lastMappedFile = this.getLastMappedFile();
if (firstMappedFile != null && lastMappedFile != null) {
+ // offset is not in range of [firstOffset, lastOffset]
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
@@ -705,6 +718,7 @@ public MappedFile findMappedFileByOffset(final long offset, final boolean return
this.mappedFileSize,
this.mappedFiles.size());
} else {
+ // get file by index
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
@@ -717,6 +731,7 @@ public MappedFile findMappedFileByOffset(final long offset, final boolean return
return targetFile;
}
+ // iterate to find file
for (MappedFile tmpMappedFile : this.mappedFiles) {
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
index 5c38cfe92a9..b96dfd98882 100644
--- a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
@@ -19,6 +19,12 @@
import java.nio.ByteBuffer;
import org.apache.rocketmq.store.logfile.MappedFile;
+/**
+ * result while select mapped file
+ * - mapped file
+ * - offset and size
+ * - whether it is in memory
+ */
public class SelectMappedBufferResult {
private final long startOffset;
diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
index 803ebc68957..e4ed5c085e8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -21,24 +21,62 @@
import java.util.ArrayList;
import java.util.List;
+/**
+ * state check info for multi-messages pop from consume queue
+ */
public class PopCheckPoint implements Comparable