Skip to content

Commit 2853384

Browse files
forfunsvongosling
andauthored
[ISSUE #218] Fix spring scopeTarget will repeat consumer instance (#210)
* change clientId algorithm * code format * develop * optimize on 2.0.5.EINSITANG * revert pom version * change note * change note * revert demo.rocketmq.myNameServer * remove clientInstaceName * remove unuse method * pass ci-check * remove pass annotation * correct variable word * optimize annotation * merge Co-authored-by: von gosling <vongosling@apache.org>
1 parent c7f230f commit 2853384

4 files changed

Lines changed: 79 additions & 17 deletions

File tree

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.rocketmq.spring.core.RocketMQTemplate;
2626
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
2727
import org.apache.rocketmq.spring.support.RocketMQUtil;
28+
import org.apache.rocketmq.spring.support.SpringBeanUtil;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
3031
import org.springframework.aop.framework.AopProxyUtils;
@@ -52,20 +53,20 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
5253
private RocketMQMessageConverter rocketMQMessageConverter;
5354

5455
public ExtProducerResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
55-
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
56+
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
5657
this.rocketMQMessageConverter = rocketMQMessageConverter;
5758
this.environment = environment;
5859
this.rocketMQProperties = rocketMQProperties;
5960
}
6061

6162
@Override
6263
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
63-
this.applicationContext = (ConfigurableApplicationContext)applicationContext;
64+
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
6465
}
6566

6667
@Override
6768
public void afterSingletonsInstantiated() {
68-
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);
69+
Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, ExtRocketMQTemplateConfiguration.class);
6970

7071
if (Objects.nonNull(beans)) {
7172
beans.forEach(this::registerTemplate);
@@ -80,7 +81,7 @@ private void registerTemplate(String beanName, Object bean) {
8081
}
8182

8283
ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
83-
GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
84+
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
8485
validate(annotation, genericApplicationContext);
8586

8687
DefaultMQProducer mqProducer = createProducer(annotation);
@@ -92,7 +93,7 @@ private void registerTemplate(String beanName, Object bean) {
9293
throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
9394
beanName), e);
9495
}
95-
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate)bean;
96+
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
9697
rocketMQTemplate.setProducer(mqProducer);
9798
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
9899
log.info("Set real producer to :{} {}", beanName, annotation.value());
@@ -130,7 +131,7 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota
130131
}
131132

132133
private void validate(ExtRocketMQTemplateConfiguration annotation,
133-
GenericApplicationContext genericApplicationContext) {
134+
GenericApplicationContext genericApplicationContext) {
134135
if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
135136
throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
136137
"please check the @ExtRocketMQTemplateConfiguration",

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717

1818
package org.apache.rocketmq.spring.autoconfigure;
1919

20-
import java.util.Collections;
21-
import java.util.Map;
22-
import java.util.Objects;
23-
import java.util.concurrent.atomic.AtomicLong;
2420
import org.apache.rocketmq.client.AccessChannel;
2521
import org.apache.rocketmq.spring.annotation.ConsumeMode;
2622
import org.apache.rocketmq.spring.annotation.MessageModel;
@@ -29,6 +25,7 @@
2925
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
3026
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
3127
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
28+
import org.apache.rocketmq.spring.support.SpringBeanUtil;
3229
import org.slf4j.Logger;
3330
import org.slf4j.LoggerFactory;
3431
import org.springframework.aop.framework.AopProxyUtils;
@@ -43,6 +40,11 @@
4340
import org.springframework.core.env.StandardEnvironment;
4441
import org.springframework.util.StringUtils;
4542

43+
import java.util.Collections;
44+
import java.util.Map;
45+
import java.util.Objects;
46+
import java.util.concurrent.atomic.AtomicLong;
47+
4648
@Configuration
4749
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
4850
private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);
@@ -58,7 +60,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
5860
private RocketMQMessageConverter rocketMQMessageConverter;
5961

6062
public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
61-
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
63+
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
6264
this.rocketMQMessageConverter = rocketMQMessageConverter;
6365
this.environment = environment;
6466
this.rocketMQProperties = rocketMQProperties;
@@ -71,7 +73,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
7173

7274
@Override
7375
public void afterSingletonsInstantiated() {
74-
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
76+
Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQMessageListener.
77+
class);
7578

7679
if (Objects.nonNull(beans)) {
7780
beans.forEach(this::registerContainer);
@@ -127,7 +130,7 @@ private void registerContainer(String beanName, Object bean) {
127130
}
128131

129132
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
130-
RocketMQMessageListener annotation) {
133+
RocketMQMessageListener annotation) {
131134
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
132135

133136
container.setRocketMQMessageListener(annotation);
@@ -145,13 +148,15 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String
145148
container.setSelectorExpression(tags);
146149
}
147150
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
151+
148152
if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
149153
container.setRocketMQListener((RocketMQListener) bean);
150154
} else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
151155
container.setRocketMQReplyListener((RocketMQReplyListener) bean);
152156
}
157+
153158
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
154-
container.setName(name); // REVIEW ME, use the same clientId or multiple?
159+
container.setName(name);
155160

156161
return container;
157162
}

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import java.util.concurrent.LinkedBlockingDeque;
2323
import java.util.concurrent.ThreadPoolExecutor;
2424
import java.util.concurrent.TimeUnit;
25+
2526
import org.apache.rocketmq.client.producer.TransactionMQProducer;
2627
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
2728
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
2829
import org.apache.rocketmq.spring.core.RocketMQTemplate;
2930
import org.apache.rocketmq.spring.support.RocketMQUtil;
31+
import org.apache.rocketmq.spring.support.SpringBeanUtil;
3032
import org.slf4j.Logger;
3133
import org.slf4j.LoggerFactory;
3234
import org.springframework.aop.framework.AopProxyUtils;
@@ -44,12 +46,14 @@ public class RocketMQTransactionConfiguration implements ApplicationContextAware
4446

4547
private ConfigurableApplicationContext applicationContext;
4648

47-
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
49+
@Override
50+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
4851
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
4952
}
5053

51-
@Override public void afterSingletonsInstantiated() {
52-
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class);
54+
@Override
55+
public void afterSingletonsInstantiated() {
56+
Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQTransactionListener.class);
5357

5458
if (Objects.nonNull(beans)) {
5559
beans.forEach(this::registerTransactionListener);
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.spring.support;
19+
20+
import org.springframework.aop.scope.ScopedProxyUtils;
21+
import org.springframework.context.ConfigurableApplicationContext;
22+
import org.springframework.lang.NonNull;
23+
24+
import java.lang.annotation.Annotation;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Set;
28+
29+
public class SpringBeanUtil {
30+
31+
/**
32+
* Override applicationContext.getBeansWithAnnotation method to make sure without same ProxyTarget beans
33+
*
34+
* @param applicationContext spring Application Context
35+
* @param clazz annotation class
36+
* @return beans map without proxyTarget bean
37+
*/
38+
public static Map<String, Object> getBeansWithAnnotation(@NonNull ConfigurableApplicationContext applicationContext, Class<? extends Annotation> clazz) {
39+
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(clazz);
40+
Map<String, Object> filterBeans = new HashMap<>(beans.size());
41+
// remove proxy target
42+
Set<Map.Entry<String, Object>> entrySet = beans.entrySet();
43+
entrySet.forEach((entry) -> {
44+
final String beanName = entry.getKey();
45+
if (!ScopedProxyUtils.isScopedTarget(beanName)) {
46+
filterBeans.put(beanName, entry.getValue());
47+
}
48+
});
49+
return filterBeans;
50+
}
51+
52+
}

0 commit comments

Comments
 (0)