Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor RocketMQProduceFactory: reuse producer #2699

Merged
merged 7 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
package com.alibaba.cloud.stream.binder.rocketmq.integration.outbound;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
Expand All @@ -45,12 +49,16 @@
*/
public final class RocketMQProduceFactory {

private RocketMQProduceFactory() {
}

private final static Logger log = LoggerFactory
.getLogger(RocketMQProduceFactory.class);

private static final Map<String, DefaultMQProducer> PRODUCER_REUSABLE_MAP = new ConcurrentHashMap<>();

private static final String DEFAULT_GROUP_NAME = "DEFAULT_GROUP_NAME";

private RocketMQProduceFactory() {
}

/**
* init for the producer,including convert producer params.
* @param topic topic
Expand All @@ -59,11 +67,13 @@ private RocketMQProduceFactory() {
*/
public static DefaultMQProducer initRocketMQProducer(String topic,
RocketMQProducerProperties producerProperties) {
if (StringUtils.isEmpty(producerProperties.getGroup())) {
producerProperties.setGroup(DEFAULT_GROUP_NAME);
}
Assert.notNull(producerProperties.getGroup(),
"Property 'group' is required - producerGroup");
Assert.notNull(producerProperties.getNameServer(),
"Property 'nameServer' is required");

RPCHook rpcHook = null;
if (!StringUtils.isEmpty(producerProperties.getAccessKey())
&& !StringUtils.isEmpty(producerProperties.getSecretKey())) {
Expand Down Expand Up @@ -96,10 +106,15 @@ public static DefaultMQProducer initRocketMQProducer(String topic,
}
}
else {
producer = new DefaultMQProducer(producerProperties.getNamespace(),
String key = getKey(producerProperties);
if (PRODUCER_REUSABLE_MAP.containsKey(key)) {
return PRODUCER_REUSABLE_MAP.get(key);
}
producer = new ReusableMQProducer(producerProperties.getNamespace(),
producerProperties.getGroup(), rpcHook,
producerProperties.getEnableMsgTrace(),
producerProperties.getCustomizedTraceTopic());
producerProperties.getCustomizedTraceTopic(), key);
PRODUCER_REUSABLE_MAP.put(key, producer);
}

producer.setVipChannelEnabled(
Expand Down Expand Up @@ -134,4 +149,48 @@ public static DefaultMQProducer initRocketMQProducer(String topic,
return producer;
}

/**
* get the key from producerProperties.
* @param producerProperties producer properties
* @return key
*/
private static String getKey(RocketMQProducerProperties producerProperties) {
return producerProperties.getNameServer() + "," + producerProperties.getGroup()
+ producerProperties.getSendCallBack();
}

/**
* This is a special kind of MQProducer that can be reused among different threads.
* The start and shutdown method can be invoked multiple times, but the real start and
* shutdown logics will only be executed once.
*/
protected static class ReusableMQProducer extends DefaultMQProducer {

private final AtomicInteger atomicInteger = new AtomicInteger();

private final String key;

public ReusableMQProducer(String namespace, String group, RPCHook rpcHook,
boolean enableMsgTrace, String customizedTraceTopic, String key) {
super(namespace, group, rpcHook, enableMsgTrace, customizedTraceTopic);
this.key = key;
}

@Override
public void start() throws MQClientException {
if (atomicInteger.getAndIncrement() == 0) {
super.start();
}
}

@Override
public void shutdown() {
if (atomicInteger.decrementAndGet() == 0) {
PRODUCER_REUSABLE_MAP.remove(key);
super.shutdown();
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ public class RocketMQCommonProperties implements Serializable {
/**
* Consumers of the same role is required to have exactly same subscriptions and
* consumerGroup to correctly achieve load balance. It's required and needs to be
* globally unique. Producer group conceptually aggregates all producer instances of
* globally unique.
* <br>
* Producer group conceptually aggregates all producer instances of
* exactly same role, which is particularly important when transactional messages are
* involved. For non-transactional messages, it does not matter as long as it's unique
* per process. See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a>
* for further discussion.
* for further discussion. However, group for non-transactional messages can indicate
* whether the internal RocketMQProducer should be reused (Only the bindings that use
* the same group can be reused).
*/
private String group;

Expand Down