Skip to content

Commit 8de6fcc

Browse files
committed
Merge remote-tracking branch 'upstream/develop' into ISSUE-9959
2 parents 37bb714 + 7fc5452 commit 8de6fcc

27 files changed

Lines changed: 1222 additions & 206 deletions

File tree

NOTICE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Apache RocketMQ
2-
Copyright 2016-2025 The Apache Software Foundation
2+
Copyright 2016-2026 The Apache Software Foundation
33

44
This product includes software developed at
55
The Apache Software Foundation (http://www.apache.org/).

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,15 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime,
501501
PopConsumerRecord ackRecord = new PopConsumerRecord(
502502
popTime, groupId, topicId, queueId, 0, invisibleTime, offset, null);
503503

504-
this.popConsumerStore.writeRecords(Collections.singletonList(ckRecord));
504+
// No need to generate new records when the group does not exist,
505+
// because these retry messages will not be consumed by anyone.
506+
if (brokerConfig.isPopReviveSkipIfGroupAbsent() &&
507+
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId)) {
508+
log.info("PopConsumerService change invisibility skip, time={}, " +
509+
"groupId={}, topicId={}, queueId={}, offset={}", popTime, groupId, topicId, queueId, offset);
510+
} else {
511+
this.popConsumerStore.writeRecords(Collections.singletonList(ckRecord));
512+
}
505513

506514
if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) {
507515
if (popConsumerCache.deleteRecords(Collections.singletonList(ackRecord)).isEmpty()) {
@@ -519,6 +527,13 @@ public CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageAsync(Po
519527
}
520528

521529
public CompletableFuture<Boolean> revive(PopConsumerRecord record) {
530+
531+
if (brokerConfig.isPopReviveSkipIfGroupAbsent() &&
532+
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(record.getGroupId())) {
533+
log.info("PopConsumerService skip revive message, record={}", record);
534+
return CompletableFuture.completedFuture(true);
535+
}
536+
522537
return this.getMessageAsync(record)
523538
.thenCompose(result -> {
524539
if (result == null) {

broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,18 @@ public void ackAsyncTest() {
324324
consumerService.shutdown();
325325
}
326326

327+
@Test
328+
public void reviveSkipIfGroupAbsent() {
329+
String groupName = "PopGroupAbsent";
330+
brokerController.getBrokerConfig().setPopReviveSkipIfGroupAbsent(true);
331+
PopConsumerRecord record = Mockito.mock(PopConsumerRecord.class);
332+
Mockito.when(record.getGroupId()).thenReturn(groupName);
333+
Mockito.when(brokerController.getSubscriptionGroupManager()
334+
.containsSubscriptionGroup(groupName)).thenReturn(false);
335+
CompletableFuture<Boolean> result = consumerService.revive(record);
336+
Assert.assertTrue(result.join());
337+
}
338+
327339
@Test
328340
public void reviveRetryTest() {
329341
Mockito.when(brokerController.getTopicConfigManager().selectTopicConfig(topicId)).thenReturn(null);
@@ -393,6 +405,8 @@ public void reviveRetryTest() {
393405
@Test
394406
public void reviveBackoffRetryTest() {
395407
Mockito.when(brokerController.getEscapeBridge()).thenReturn(Mockito.mock(EscapeBridge.class));
408+
Mockito.when(brokerController.getSubscriptionGroupManager()
409+
.containsSubscriptionGroup(anyString())).thenReturn(true);
396410
PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
397411

398412
consumerService.getPopConsumerStore().start();

broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717

1818
package org.apache.rocketmq.broker.subscription;
1919

20+
import java.io.IOException;
21+
import java.nio.file.Files;
22+
import java.nio.file.Path;
23+
import java.nio.file.Paths;
24+
import java.util.Comparator;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.UUID;
28+
import java.util.stream.Stream;
2029
import org.apache.rocketmq.broker.BrokerController;
2130
import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
2231
import org.apache.rocketmq.common.BrokerConfig;
@@ -34,15 +43,6 @@
3443
import org.mockito.Mockito;
3544
import org.mockito.junit.MockitoJUnitRunner;
3645

37-
import java.io.IOException;
38-
import java.nio.file.Files;
39-
import java.nio.file.Path;
40-
import java.nio.file.Paths;
41-
import java.util.Comparator;
42-
import java.util.HashMap;
43-
import java.util.Map;
44-
import java.util.UUID;
45-
4646
import static org.mockito.Mockito.when;
4747

4848
@RunWith(MockitoJUnitRunner.class)
@@ -78,24 +78,28 @@ public void destroy() {
7878
if (notToBeExecuted()) {
7979
return;
8080
}
81-
Path pathToBeDeleted = Paths.get(basePath);
82-
83-
try {
84-
Files.walk(pathToBeDeleted)
85-
.sorted(Comparator.reverseOrder())
86-
.forEach(path -> {
87-
try {
88-
Files.delete(path);
89-
} catch (IOException e) {
90-
// ignore
91-
}
92-
});
93-
} catch (IOException e) {
94-
// ignore
95-
}
81+
9682
if (rocksDBSubscriptionGroupManager != null) {
9783
rocksDBSubscriptionGroupManager.stop();
9884
}
85+
86+
Path root = Paths.get(basePath);
87+
if (Files.notExists(root)) {
88+
return;
89+
}
90+
91+
try (Stream<Path> walk = Files.walk(root)) {
92+
walk.sorted(Comparator.reverseOrder())
93+
.forEach(p -> {
94+
try {
95+
Files.deleteIfExists(p);
96+
} catch (IOException e) {
97+
// ignore
98+
}
99+
});
100+
} catch (IOException e) {
101+
// ignore
102+
}
99103
}
100104

101105

client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -286,21 +286,19 @@ private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
286286
}
287287
case CLUSTERING: {
288288
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
289-
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
290-
if (null == mqSet) {
289+
if (null == mqSet || mqSet.isEmpty()) {
291290
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
292291
this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
293292
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
294293
}
294+
break;
295295
}
296296

297-
if (null == cidAll) {
297+
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
298+
if (null == cidAll || cidAll.isEmpty()) {
298299
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
299-
}
300-
301-
if (mqSet != null && cidAll != null) {
302-
List<MessageQueue> mqAll = new ArrayList<>();
303-
mqAll.addAll(mqSet);
300+
} else {
301+
List<MessageQueue> mqAll = new ArrayList<>(mqSet);
304302

305303
Collections.sort(mqAll);
306304
Collections.sort(cidAll);

client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
2222
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo.QueueFilter;
2323
import org.apache.rocketmq.common.message.MessageQueue;
24+
import org.apache.rocketmq.common.utils.StartAndShutdown;
2425

25-
public class MQFaultStrategy {
26+
public class MQFaultStrategy implements StartAndShutdown {
2627
private LatencyFaultTolerance<String> latencyFaultTolerance;
2728
private volatile boolean sendLatencyFaultEnable;
2829
private volatile boolean startDetectorEnable;
@@ -130,6 +131,11 @@ public void startDetector() {
130131
this.latencyFaultTolerance.startDetector();
131132
}
132133

134+
@Override
135+
public void start() throws Exception {
136+
this.startDetector();
137+
}
138+
133139
public void shutdown() {
134140
this.latencyFaultTolerance.shutdown();
135141
}

common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ public class BrokerConfig extends BrokerIdentity {
251251
private int popReviveMaxReturnSizePerRead = 16 * 1024;
252252
private int popReviveConcurrency = 32;
253253
private int popReviveMaxAttemptTimes = 16;
254+
private boolean popReviveSkipIfGroupAbsent = true;
254255
// each message queue will have a corresponding retry queue
255256
private boolean useSeparateRetryQueue = false;
256257
private boolean realTimeNotifyConsumerChange = true;
@@ -699,6 +700,14 @@ public void setPopReviveMaxAttemptTimes(int popReviveMaxAttemptTimes) {
699700
this.popReviveMaxAttemptTimes = popReviveMaxAttemptTimes;
700701
}
701702

703+
public boolean isPopReviveSkipIfGroupAbsent() {
704+
return popReviveSkipIfGroupAbsent;
705+
}
706+
707+
public void setPopReviveSkipIfGroupAbsent(boolean popReviveSkipIfGroupAbsent) {
708+
this.popReviveSkipIfGroupAbsent = popReviveSkipIfGroupAbsent;
709+
}
710+
702711
public boolean isTraceOn() {
703712
return traceOn;
704713
}

proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ protected static void initConfiguration(CommandLineArgument commandLineArgument)
122122
System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath());
123123
}
124124
ConfigurationManager.initEnv();
125-
ConfigurationManager.intConfig();
125+
ConfigurationManager.initConfig();
126126
setConfigFromCommandLineArgument(commandLineArgument);
127127
log.info("Current configuration: " + ConfigurationManager.formatProxyConfig());
128128

proxy/src/main/java/org/apache/rocketmq/proxy/config/ConfigurationManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public static void initEnv() {
4040
}
4141
}
4242

43-
public static void intConfig() throws Exception {
43+
public static void initConfig() throws Exception {
4444
configuration = new Configuration();
4545
configuration.init();
4646
}

proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1226,10 +1226,6 @@ public void setMetricsExporterType(MetricsExporterType metricsExporterType) {
12261226
this.metricsExporterType = metricsExporterType;
12271227
}
12281228

1229-
public void setMetricsExporterType(int metricsExporterType) {
1230-
this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType);
1231-
}
1232-
12331229
public void setMetricsExporterType(String metricsExporterType) {
12341230
this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType);
12351231
}

0 commit comments

Comments
 (0)