Skip to content

Commit

Permalink
[ISSUE apache#6785] Isolate the remoteChannel by group
Browse files Browse the repository at this point in the history
  • Loading branch information
xdkxlk committed May 22, 2023
1 parent 70480a1 commit bde54e3
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {

protected ThreadPoolExecutor threadPoolExecutor;
protected ConsumerManager consumerManager;
protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
protected final Map<String /* group @ channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
protected String localProxyId;

public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
Expand Down Expand Up @@ -188,7 +188,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeCo
}

RemoteChannel decodedChannel = RemoteChannel.decode(data.getChannelData());
RemoteChannel channel = remoteChannelMap.computeIfAbsent(decodedChannel.id().asLongText(), key -> decodedChannel);
RemoteChannel channel = remoteChannelMap.computeIfAbsent(data.getGroup() + "@" + decodedChannel.id().asLongText(), key -> decodedChannel);
channel.setExtendAttribute(decodedChannel.getChannelExtendAttribute());
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
channel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
Expand All @@ -50,8 +53,6 @@
import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
Expand All @@ -74,7 +75,9 @@

import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -215,64 +218,106 @@ public void testSyncGrpcV2Channel() throws Exception {
@Test
public void testSyncRemotingChannel() throws Exception {
String consumerGroup = "consumerGroup";
String consumerGroup2 = "consumerGroup2";
Channel channel = createMockChannel();
Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
subscriptionDataSet.add(FilterAPI.buildSubscriptionData("topic", "tagSub"));
Set<SubscriptionData> subscriptionDataSet2 = new HashSet<>();
subscriptionDataSet2.add(FilterAPI.buildSubscriptionData("topic2", "tagSub2"));
RemotingProxyOutClient remotingProxyOutClient = mock(RemotingProxyOutClient.class);
RemotingChannel remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, createMockChannel(), clientId, subscriptionDataSet);
RemotingChannel remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, subscriptionDataSet);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
remotingChannel,
clientId,
LanguageCode.JAVA,
4
);

ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
SendResult sendResult = new SendResult();
sendResult.setSendStatus(SendStatus.SEND_OK);
doReturn(CompletableFuture.completedFuture(sendResult)).when(this.mqClientAPIExt)
.sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());

HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory);
heartbeatSyncer.onConsumerRegister(
consumerGroup,
clientChannelInfo,
ConsumeType.CONSUME_PASSIVELY,
MessageModel.CLUSTERING,
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
subscriptionDataSet
RemotingChannel remotingChannel2 = new RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, subscriptionDataSet2);
ClientChannelInfo clientChannelInfo2 = new ClientChannelInfo(
remotingChannel2,
clientId,
LanguageCode.JAVA,
4
);

await().atMost(Duration.ofSeconds(3)).until(() -> !messageArgumentCaptor.getAllValues().isEmpty());
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
verify(consumerManager, never()).registerConsumer(anyString(), any(), any(), any(), any(), any(), anyBoolean());

String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr();
// change local serve addr, to simulate other proxy receive messages
heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());

heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
assertEquals(2, syncChannelInfoArgumentCaptor.getAllValues().size());
List<ClientChannelInfo> channelInfoList = syncChannelInfoArgumentCaptor.getAllValues();
assertSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel());
assertEquals(subscriptionDataSet, RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
assertEquals(subscriptionDataSet, RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));

// start test sync client unregister
// reset localServeAddr
ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr);
heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo);
await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2);

ArgumentCaptor<ClientChannelInfo> syncUnRegisterChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory);
SendResult okSendResult = new SendResult();
okSendResult.setSendStatus(SendStatus.SEND_OK);
{
ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt)
.sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());

heartbeatSyncer.onConsumerRegister(
consumerGroup,
clientChannelInfo,
ConsumeType.CONSUME_PASSIVELY,
MessageModel.CLUSTERING,
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
subscriptionDataSet
);
heartbeatSyncer.onConsumerRegister(
consumerGroup2,
clientChannelInfo2,
ConsumeType.CONSUME_PASSIVELY,
MessageModel.CLUSTERING,
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
subscriptionDataSet2
);

await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2);
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()), null);
verify(consumerManager, never()).registerConsumer(anyString(), any(), any(), any(), any(), any(), anyBoolean());

// change local serve addr, to simulate other proxy receive messages
heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());

heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()), null);
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()), null);
/*
data in syncChannelInfoArgumentCaptor will be like:
1st, data of group1
2nd, data of group2
3rd, data of group1
4th, data of group2
*/
assertEquals(4, syncChannelInfoArgumentCaptor.getAllValues().size());
List<ClientChannelInfo> channelInfoList = syncChannelInfoArgumentCaptor.getAllValues();
assertSame(channelInfoList.get(0).getChannel(), channelInfoList.get(2).getChannel());
assertNotSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel());
Set<Set<SubscriptionData>> checkSubscriptionDatas = new HashSet<>();
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet));
assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet2));
}

// change local serve addr, to simulate other proxy receive messages
heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null);
assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
{
// start test sync client unregister
// reset localServeAddr
ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt)
.sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());
heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo);
heartbeatSyncer.onConsumerUnRegister(consumerGroup2, clientChannelInfo2);
await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2);

ArgumentCaptor<ClientChannelInfo> syncUnRegisterChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());

// change local serve addr, to simulate other proxy receive messages
heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()), null);
List<ClientChannelInfo> channelInfoList = syncUnRegisterChannelInfoArgumentCaptor.getAllValues();
assertNotSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel());
Set<Set<SubscriptionData>> checkSubscriptionDatas = new HashSet<>();
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet));
assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet2));
}
}

private MessageExt convertFromMessage(Message message) {
Expand All @@ -282,6 +327,10 @@ private MessageExt convertFromMessage(Message message) {
return messageExt;
}

private List<MessageExt> convertFromMessage(List<Message> message) {
return message.stream().map(this::convertFromMessage).collect(Collectors.toList());
}

private Channel createMockChannel() {
return new MockChannel(RandomStringUtils.randomAlphabetic(10));
}
Expand Down

0 comments on commit bde54e3

Please sign in to comment.