-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RocketMq retry-topic support SQL filtering messages #1750
base: develop
Are you sure you want to change the base?
Conversation
0935d59
to
f822209
Compare
Codecov ReportAttention: Patch coverage is
Flags with carried forward coverage won't be shown. Click here to find out more.
|
5dd3b97
to
f3082d4
Compare
...rmant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java
Outdated
Show resolved
Hide resolved
if (balance.getConsumerGroup() == null || !isGrayTagChanged(map, balance)) { | ||
return context; | ||
} | ||
for (Object subscriptionData : map.values()) { | ||
String topic = RocketMqReflectUtils.getTopic(subscriptionData); | ||
if (topic.contains(RETYPE)) { | ||
continue; | ||
} | ||
if (RocketMqSubscriptionDataUtils | ||
.isExpressionTypeInaccurate(RocketMqReflectUtils.getExpressionType(subscriptionData))) { | ||
continue; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isGrayTagChanged has the similar code with these code. You can traverse the map only once
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
private void updateRetrySubscriptionData(Object subscriptionData, Collection<Object> subscriptionDatas) { | ||
String originTopic = RocketMqReflectUtils.getTopic(subscriptionData); | ||
for (Object subData : subscriptionDatas) { | ||
String retryTopic = RocketMqReflectUtils.getTopic(subData); | ||
String sqlSubstr = RocketMqReflectUtils.getSubString(subscriptionData); | ||
if (retryTopic.contains(RETYPE)) { | ||
RocketMqReflectUtils.getTagsSet(subData).clear(); | ||
RocketMqReflectUtils.getCodeSet(subData).clear(); | ||
RocketMqReflectUtils.setSubscriptionDatae(subData, "setExpressionType", | ||
new Class[]{String.class}, new Object[]{"SQL92"}); | ||
RocketMqReflectUtils.setSubscriptionDatae(subData, "setSubVersion", | ||
new Class[]{long.class}, new Object[]{System.currentTimeMillis()}); | ||
String originSubData = RocketMqReflectUtils.getSubString(subData); | ||
RocketMqReflectUtils.setSubscriptionDatae(subData, "setSubString", | ||
new Class[]{String.class}, new Object[]{sqlSubstr}); | ||
LOGGER.warning(String.format(Locale.ENGLISH, "update retry topic [%s] SQL92 expression, " | ||
+ "originTopic: [%s], originSubStr: [%s], newSubStr: [%s]", retryTopic, originTopic, | ||
originSubData, sqlSubstr)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why traverse map.values() and determine whether the topic contains again here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
b7316e5
to
768cf5c
Compare
RocketMqConsumerGroupAutoCheck.setMqClientInstance(topic, consumerGroup, instance); | ||
RocketMqConsumerGroupAutoCheck.syncUpdateCacheGrayTags(); | ||
RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask(); | ||
private boolean isGrayTagChanged(ConcurrentMap<String, Object> map, RebalanceImpl balance, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not map,use specific name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
cd7a266
to
e0054b4
Compare
for (Object subData : retrySubscriptionDatas) { | ||
RocketMqReflectUtils.getTagsSet(subData).clear(); | ||
RocketMqReflectUtils.getCodeSet(subData).clear(); | ||
RocketMqReflectUtils.setSubscriptionDatae(subData, "setExpressionType", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename setSubscriptionDatae
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
Signed-off-by: chengyouling <[email protected]>
93f4e8e
to
06e8c45
Compare
What type of PR is this?
Bug.
What this PR does / why we need it?
Retry-topic synchronous adjustment change to using SQL92 filter message.
Which issue(s) this PR fixes?
Fixes #1748
Does this PR introduce a user-facing change?
No
Checklist