Skip to content

Commit

Permalink
RocketMq retry-topic support SQL filtering messages
Browse files Browse the repository at this point in the history
Signed-off-by: chengyouling <[email protected]>
  • Loading branch information
chengyouling committed Feb 25, 2025
1 parent 311a1db commit f7a255f
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.sermant.mq.grayscale.rocketmq.interceptor;

import io.sermant.core.common.LoggerFactory;
import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.utils.StringUtils;
import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck;
Expand All @@ -26,7 +27,12 @@
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;

/**
* TAG/SQL92 query message statement interceptor
Expand All @@ -35,44 +41,83 @@
* @since 2024-05-27
**/
public class RocketMqSchedulerRebuildSubscriptionInterceptor extends RocketMqAbstractInterceptor {
private final Object lock = new Object();
private static final Logger LOGGER = LoggerFactory.getLogger();

private static final String RETRY_TOPIC_FLAG = "%RETRY%";

@Override
public ExecuteContext doAfter(ExecuteContext context) throws Exception {
ConcurrentMap<String, Object> map = (ConcurrentMap<String, Object>) context.getResult();
ConcurrentMap<String, Object> subscriptionInner = (ConcurrentMap<String, Object>) context.getResult();
RebalanceImpl balance = (RebalanceImpl) context.getObject();
if (balance.getConsumerGroup() == null) {
List<Object> retrySubscriptionDatas = new ArrayList<>();
if (balance.getConsumerGroup() == null
|| !isGrayTagChanged(subscriptionInner, balance, retrySubscriptionDatas)) {

Check failure on line 54 in sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java

View workflow job for this annotation

GitHub Actions / Checkstyle

[Checkstyle Check] reported by reviewdog 🐶 '||' has incorrect indentation level 12, expected level should be 16. Raw Output: /home/runner/work/Sermant/Sermant/./sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java:54:13: error: '||' has incorrect indentation level 12, expected level should be 16. (com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck)
return context;
}
for (Object subscriptionData : map.values()) {
for (Object subscriptionData : subscriptionInner.values()) {
String topic = RocketMqReflectUtils.getTopic(subscriptionData);
if (topic.contains(RETRY_TOPIC_FLAG)) {
continue;
}
if (RocketMqSubscriptionDataUtils
.isExpressionTypeInaccurate(RocketMqReflectUtils.getExpressionType(subscriptionData))) {
continue;
}
buildSql92SubscriptionData(subscriptionData, balance);
buildSql92SubscriptionData(subscriptionData, balance, topic);

// update %RETRY%+GROUP substring with sql92
updateRetrySubscriptionData(subscriptionData, retrySubscriptionDatas);
}
return context;
}

private void buildSql92SubscriptionData(Object subscriptionData, RebalanceImpl balance) {
synchronized (lock) {
String topic = RocketMqReflectUtils.getTopic(subscriptionData);
if (!RocketMqSubscriptionDataUtils.getGrayTagChangeFlag(topic, balance)) {
return;
}
String consumerGroup = balance.getConsumerGroup();
MQClientInstance instance = balance.getmQClientFactory();
if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) {
RocketMqConsumerGroupAutoCheck.setMqClientInstance(topic, consumerGroup, instance);
RocketMqConsumerGroupAutoCheck.syncUpdateCacheGrayTags();
RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask();
private boolean isGrayTagChanged(ConcurrentMap<String, Object> subscriptionInner, RebalanceImpl balance,
List<Object> retrySubscriptionDatas) {
String topic = "";
for (Object subscriptionData : subscriptionInner.values()) {
String tempTopic = RocketMqReflectUtils.getTopic(subscriptionData);
if (!tempTopic.contains(RETRY_TOPIC_FLAG)) {
topic = tempTopic;
} else {
retrySubscriptionDatas.add(subscriptionData);
}
String namesrvAddr = balance.getmQClientFactory().getClientConfig().getNamesrvAddr();
resetsSql92SubscriptionData(topic, consumerGroup, subscriptionData, namesrvAddr);
}
return RocketMqSubscriptionDataUtils.getGrayTagChangeFlag(topic, balance);
}

private void updateRetrySubscriptionData(Object subscriptionData, Collection<Object> retrySubscriptionDatas) {
for (Object subData : retrySubscriptionDatas) {
RocketMqReflectUtils.getTagsSet(subData).clear();
RocketMqReflectUtils.getCodeSet(subData).clear();
RocketMqReflectUtils.setSubscriptionDatae(subData, "setExpressionType",
new Class[]{String.class}, new Object[]{"SQL92"});
RocketMqReflectUtils.setSubscriptionDatae(subData, "setSubVersion",
new Class[]{long.class}, new Object[]{System.currentTimeMillis()});
String originSubData = RocketMqReflectUtils.getSubString(subData);
String sqlSubstr = RocketMqReflectUtils.getSubString(subscriptionData);
RocketMqReflectUtils.setSubscriptionDatae(subData, "setSubString",
new Class[]{String.class}, new Object[]{sqlSubstr});
String originTopic = RocketMqReflectUtils.getTopic(subscriptionData);
String retryTopic = RocketMqReflectUtils.getTopic(subData);
LOGGER.warning(String.format(Locale.ENGLISH, "update retry topic [%s] SQL92 expression, "
+ "originTopic: [%s], originSubStr: [%s], newSubStr: [%s]", retryTopic, originTopic,
originSubData, sqlSubstr));
}
}

// update change flag when finished build substr
RocketMqSubscriptionDataUtils.resetTagChangeMap(namesrvAddr, topic, consumerGroup, false);
private void buildSql92SubscriptionData(Object subscriptionData, RebalanceImpl balance, String topic) {
String consumerGroup = balance.getConsumerGroup();
MQClientInstance instance = balance.getmQClientFactory();
if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) {
RocketMqConsumerGroupAutoCheck.setMqClientInstance(topic, consumerGroup, instance);
RocketMqConsumerGroupAutoCheck.syncUpdateCacheGrayTags();
RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask();
}
String namesrvAddr = balance.getmQClientFactory().getClientConfig().getNamesrvAddr();
resetsSql92SubscriptionData(topic, consumerGroup, subscriptionData, namesrvAddr);

// update change flag when finished build substr
RocketMqSubscriptionDataUtils.resetTagChangeMap(namesrvAddr, topic, consumerGroup, false);
}

private void resetsSql92SubscriptionData(String topic, String consumerGroup, Object subscriptionData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,16 @@ public void testDoAfter() throws Exception {
ExecuteContext context = ExecuteContext.forMemberMethod(rebalanced, null, null, null, null);
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic("TOPIC_TEST");
SubscriptionData retrySubscriptionData = new SubscriptionData();
retrySubscriptionData.setTopic("%RETRY%consumerGroup");
ConcurrentMap<String, SubscriptionData> map = new ConcurrentHashMap<>();
map.put("test", subscriptionData);
map.put("testRetry", retrySubscriptionData);
context.afterMethod(map, null);
RocketMqSchedulerRebuildSubscriptionInterceptor interceptor
= new RocketMqSchedulerRebuildSubscriptionInterceptor();
interceptor.doAfter(context);
Assert.assertEquals("(x_lane_canary in ('gray'))", subscriptionData.getSubString());
Assert.assertEquals("(x_lane_canary in ('gray'))", retrySubscriptionData.getSubString());
}
}

0 comments on commit f7a255f

Please sign in to comment.