diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java index 509aae6525..47533e75ab 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java @@ -17,12 +17,16 @@ package com.alibaba.cloud.stream.binder.rocketmq.integration.outbound; import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.hook.CheckForbiddenHook; import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -45,12 +49,16 @@ */ public final class RocketMQProduceFactory { - private RocketMQProduceFactory() { - } - private final static Logger log = LoggerFactory .getLogger(RocketMQProduceFactory.class); + private static final Map PRODUCER_REUSABLE_MAP = new ConcurrentHashMap<>(); + + private static final String DEFAULT_GROUP_NAME = "DEFAULT_GROUP_NAME"; + + private RocketMQProduceFactory() { + } + /** * init for the producer,including convert producer params. * @param topic topic @@ -59,11 +67,13 @@ private RocketMQProduceFactory() { */ public static DefaultMQProducer initRocketMQProducer(String topic, RocketMQProducerProperties producerProperties) { + if (StringUtils.isEmpty(producerProperties.getGroup())) { + producerProperties.setGroup(DEFAULT_GROUP_NAME); + } Assert.notNull(producerProperties.getGroup(), "Property 'group' is required - producerGroup"); Assert.notNull(producerProperties.getNameServer(), "Property 'nameServer' is required"); - RPCHook rpcHook = null; if (!StringUtils.isEmpty(producerProperties.getAccessKey()) && !StringUtils.isEmpty(producerProperties.getSecretKey())) { @@ -96,10 +106,15 @@ public static DefaultMQProducer initRocketMQProducer(String topic, } } else { - producer = new DefaultMQProducer(producerProperties.getNamespace(), + String key = getKey(producerProperties); + if (PRODUCER_REUSABLE_MAP.containsKey(key)) { + return PRODUCER_REUSABLE_MAP.get(key); + } + producer = new ReusableMQProducer(producerProperties.getNamespace(), producerProperties.getGroup(), rpcHook, producerProperties.getEnableMsgTrace(), - producerProperties.getCustomizedTraceTopic()); + producerProperties.getCustomizedTraceTopic(), key); + PRODUCER_REUSABLE_MAP.put(key, producer); } producer.setVipChannelEnabled( @@ -134,4 +149,48 @@ public static DefaultMQProducer initRocketMQProducer(String topic, return producer; } + /** + * get the key from producerProperties. + * @param producerProperties producer properties + * @return key + */ + private static String getKey(RocketMQProducerProperties producerProperties) { + return producerProperties.getNameServer() + "," + producerProperties.getGroup() + + producerProperties.getSendCallBack(); + } + + /** + * This is a special kind of MQProducer that can be reused among different threads. + * The start and shutdown method can be invoked multiple times, but the real start and + * shutdown logics will only be executed once. + */ + protected static class ReusableMQProducer extends DefaultMQProducer { + + private final AtomicInteger atomicInteger = new AtomicInteger(); + + private final String key; + + public ReusableMQProducer(String namespace, String group, RPCHook rpcHook, + boolean enableMsgTrace, String customizedTraceTopic, String key) { + super(namespace, group, rpcHook, enableMsgTrace, customizedTraceTopic); + this.key = key; + } + + @Override + public void start() throws MQClientException { + if (atomicInteger.getAndIncrement() == 0) { + super.start(); + } + } + + @Override + public void shutdown() { + if (atomicInteger.decrementAndGet() == 0) { + PRODUCER_REUSABLE_MAP.remove(key); + super.shutdown(); + } + } + + } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java index 425c8e2c26..3f1146fb60 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java @@ -46,11 +46,15 @@ public class RocketMQCommonProperties implements Serializable { /** * Consumers of the same role is required to have exactly same subscriptions and * consumerGroup to correctly achieve load balance. It's required and needs to be - * globally unique. Producer group conceptually aggregates all producer instances of + * globally unique. + *
+ * Producer group conceptually aggregates all producer instances of * exactly same role, which is particularly important when transactional messages are * involved. For non-transactional messages, it does not matter as long as it's unique * per process. See here - * for further discussion. + * for further discussion. However, group for non-transactional messages can indicate + * whether the internal RocketMQProducer should be reused (Only the bindings that use + * the same group can be reused). */ private String group;