Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #6785] Isolate the remoteChannel by group #6786

Merged
merged 1 commit into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {

protected ThreadPoolExecutor threadPoolExecutor;
protected ConsumerManager consumerManager;
protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
protected final Map<String /* group @ channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the "@" symbol conflict with the Group character?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a problem, as long as clients from different groups can be distinguished here.

protected String localProxyId;

public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
Expand Down Expand Up @@ -188,7 +188,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeCo
}

RemoteChannel decodedChannel = RemoteChannel.decode(data.getChannelData());
RemoteChannel channel = remoteChannelMap.computeIfAbsent(decodedChannel.id().asLongText(), key -> decodedChannel);
RemoteChannel channel = remoteChannelMap.computeIfAbsent(data.getGroup() + "@" + decodedChannel.id().asLongText(), key -> decodedChannel);
channel.setExtendAttribute(decodedChannel.getChannelExtendAttribute());
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
channel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
Expand All @@ -50,8 +53,6 @@
import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
Expand All @@ -74,7 +75,9 @@

import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -215,64 +218,106 @@ public void testSyncGrpcV2Channel() throws Exception {
@Test
public void testSyncRemotingChannel() throws Exception {
String consumerGroup = "consumerGroup";
String consumerGroup2 = "consumerGroup2";
Channel channel = createMockChannel();
Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
subscriptionDataSet.add(FilterAPI.buildSubscriptionData("topic", "tagSub"));
Set<SubscriptionData> subscriptionDataSet2 = new HashSet<>();
subscriptionDataSet2.add(FilterAPI.buildSubscriptionData("topic2", "tagSub2"));
RemotingProxyOutClient remotingProxyOutClient = mock(RemotingProxyOutClient.class);
RemotingChannel remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, createMockChannel(), clientId, subscriptionDataSet);
RemotingChannel remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, subscriptionDataSet);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
remotingChannel,
clientId,
LanguageCode.JAVA,
4
);

ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
SendResult sendResult = new SendResult();
sendResult.setSendStatus(SendStatus.SEND_OK);
doReturn(CompletableFuture.completedFuture(sendResult)).when(this.mqClientAPIExt)
.sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());

HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory);
heartbeatSyncer.onConsumerRegister(
consumerGroup,
clientChannelInfo,
ConsumeType.CONSUME_PASSIVELY,
MessageModel.CLUSTERING,
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
subscriptionDataSet
RemotingChannel remotingChannel2 = new RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, subscriptionDataSet2);
ClientChannelInfo clientChannelInfo2 = new ClientChannelInfo(
remotingChannel2,
clientId,
LanguageCode.JAVA,
4
);

await().atMost(Duration.ofSeconds(3)).until(() -> !messageArgumentCaptor.getAllValues().isEmpty());
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
verify(consumerManager, never()).registerConsumer(anyString(), any(), any(), any(), any(), any(), anyBoolean());

String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr();
// change local serve addr, to simulate other proxy receive messages
heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());

heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
assertEquals(2, syncChannelInfoArgumentCaptor.getAllValues().size());
List<ClientChannelInfo> channelInfoList = syncChannelInfoArgumentCaptor.getAllValues();
assertSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel());
assertEquals(subscriptionDataSet, RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
assertEquals(subscriptionDataSet, RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));

// start test sync client unregister
// reset localServeAddr
ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr);
heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo);
await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2);

ArgumentCaptor<ClientChannelInfo> syncUnRegisterChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory);
SendResult okSendResult = new SendResult();
okSendResult.setSendStatus(SendStatus.SEND_OK);
{
ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt)
.sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());

heartbeatSyncer.onConsumerRegister(
consumerGroup,
clientChannelInfo,
ConsumeType.CONSUME_PASSIVELY,
MessageModel.CLUSTERING,
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
subscriptionDataSet
);
heartbeatSyncer.onConsumerRegister(
consumerGroup2,
clientChannelInfo2,
ConsumeType.CONSUME_PASSIVELY,
MessageModel.CLUSTERING,
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
subscriptionDataSet2
);

await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2);
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()), null);
verify(consumerManager, never()).registerConsumer(anyString(), any(), any(), any(), any(), any(), anyBoolean());

// change local serve addr, to simulate other proxy receive messages
heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());

heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()), null);
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()), null);
/*
data in syncChannelInfoArgumentCaptor will be like:
1st, data of group1
2nd, data of group2
3rd, data of group1
4th, data of group2
*/
assertEquals(4, syncChannelInfoArgumentCaptor.getAllValues().size());
List<ClientChannelInfo> channelInfoList = syncChannelInfoArgumentCaptor.getAllValues();
assertSame(channelInfoList.get(0).getChannel(), channelInfoList.get(2).getChannel());
assertNotSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel());
Set<Set<SubscriptionData>> checkSubscriptionDatas = new HashSet<>();
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet));
assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet2));
}

// change local serve addr, to simulate other proxy receive messages
heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null);
assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
{
// start test sync client unregister
// reset localServeAddr
ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt)
.sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());
heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo);
heartbeatSyncer.onConsumerUnRegister(consumerGroup2, clientChannelInfo2);
await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2);

ArgumentCaptor<ClientChannelInfo> syncUnRegisterChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());

// change local serve addr, to simulate other proxy receive messages
heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()), null);
List<ClientChannelInfo> channelInfoList = syncUnRegisterChannelInfoArgumentCaptor.getAllValues();
assertNotSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel());
Set<Set<SubscriptionData>> checkSubscriptionDatas = new HashSet<>();
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet));
assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet2));
}
}

private MessageExt convertFromMessage(Message message) {
Expand All @@ -282,6 +327,10 @@ private MessageExt convertFromMessage(Message message) {
return messageExt;
}

private List<MessageExt> convertFromMessage(List<Message> message) {
return message.stream().map(this::convertFromMessage).collect(Collectors.toList());
}

private Channel createMockChannel() {
return new MockChannel(RandomStringUtils.randomAlphabetic(10));
}
Expand Down