Skip to content

Commit

Permalink
Merge pull request #2464 from Sorieee/2021.x_sorie_rocketmq_group
Browse files Browse the repository at this point in the history
Support anonymous consumer group
  • Loading branch information
DanielLiu1123 authored Mar 21, 2022
2 parents 9857de3 + 154775d commit 2432d64
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ spring:
processor-out-0:
producer:
group: output_2

bindings:
producer-out-0:
destination: num
Expand All @@ -29,7 +28,6 @@ spring:
group: processor_group
consumer-in-0:
destination: square
group: consumer_group

logging:
level:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@

import com.alibaba.cloud.nacos.NacosConfigAutoConfiguration;
import com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration;
import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.cloud.nacos.NacosConfigProperties;
import com.alibaba.cloud.nacos.refresh.NacosRefreshHistory;
import com.alibaba.nacos.client.config.NacosConfigService;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health.Builder;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.util.ReflectionTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
Expand All @@ -52,6 +56,19 @@ public class NacosConfigEndpointTests {
@Autowired
private NacosRefreshHistory refreshHistory;

static {

try {
NacosConfigService mockedNacosConfigService = Mockito
.mock(NacosConfigService.class);
Mockito.when(mockedNacosConfigService.getServerStatus()).thenReturn("UP");
ReflectionTestUtils.setField(NacosConfigManager.class, "service",
mockedNacosConfigService);
}
catch (Exception ignore) {
ignore.printStackTrace();
}
}
@Test
public void contextLoads() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.alibaba.cloud.stream.binder.rocketmq;

import java.util.UUID;

import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler;
import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter;
Expand All @@ -28,6 +31,7 @@
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import org.apache.rocketmq.common.protocol.NamespaceUtil;

import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
Expand All @@ -48,6 +52,7 @@
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;


/**
* A {@link org.springframework.cloud.stream.binder.Binder} that uses RocketMQ as the
* underlying middleware.
Expand Down Expand Up @@ -113,11 +118,17 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
String group,
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties)
throws Exception {
// todo support anymous consumer
if (!StringUtils.hasLength(group)) {
boolean anonymous = !StringUtils.hasLength(group);
/***
* When using DLQ, at least the group property must be provided for proper naming of the DLQ destination
* According to https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference
*/
if (anonymous && NamespaceUtil.isDLQTopic(destination.getName())) {
throw new RuntimeException(
"'group must be configured for channel " + destination.getName());
"group must be configured for DLQ" + destination.getName());
}
group = anonymous ? nextDefaultConsumerGroup() : group;

RocketMQUtils.mergeRocketMQProperties(binderConfigurationProperties,
extendedConsumerProperties.getExtension());
extendedConsumerProperties.getExtension().setGroup(group);
Expand Down Expand Up @@ -173,6 +184,14 @@ protected MessageHandler getPolledConsumerErrorMessageHandler(
};
}

/**
* generate next default consumer group.
* @return next default consumer group name.
*/
private static String nextDefaultConsumerGroup() {
return RocketMQConst.DEFAULT_GROUP + UUID.randomUUID().toString();
}

/**
* Binders can return an {@link ErrorMessageStrategy} for building error messages;
* binder implementations typically might add extra headers to the error message.
Expand Down Expand Up @@ -203,5 +222,4 @@ public String getDefaultsPrefix() {
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class RocketMQConst extends MessageConst {
/**
* Default group for SCS RocketMQ Binder.
*/
public static final String DEFAULT_GROUP = "binder_default_group_name";
public static final String DEFAULT_GROUP = "anonymous";

/**
* user args for SCS RocketMQ Binder.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2013-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.cloud.stream.binder.rocketmq;

import javax.annotation.Resource;

import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.ExtendedBindingHandlerMappingsProviderConfiguration;
import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageProducer;

import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;


@SpringBootTest(classes = RocketMQMessageChannelBinderTest.TestConfig.class,
webEnvironment = NONE,
properties = {
"spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876",
"spring.cloud.stream.bindings.output.destination=TopicOrderTest",
"spring.cloud.stream.bindings.output.content-type=application/json",

"spring.cloud.stream.bindings.input1.destination=TopicOrderTest",
"spring.cloud.stream.bindings.input1.content-type=application/json",
"spring.cloud.stream.bindings.input1.group=test-group1",
"spring.cloud.stream.rocketmq.bindings.input1.consumer.push.orderly=true",
"spring.cloud.stream.bindings.input1.consumer.maxAttempts=1",
"spring.cloud.stream.bindings.input2.destination=TopicOrderTest",
"spring.cloud.stream.bindings.input2.content-type=application/json",
"spring.cloud.stream.bindings.input2.group=test-group2",
"spring.cloud.stream.rocketmq.bindings.input2.consumer.push.orderly=false",
"spring.cloud.stream.rocketmq.bindings.input2.consumer.subscription=tag1"
})
public class RocketMQMessageChannelBinderTest {
@Resource
RocketMQMessageChannelBinder binder;
@Test
public void createConsumerEndpoint() throws Exception {
TestConsumerDestination destination = new TestConsumerDestination("test");
MessageProducer consumerEndpoint = binder.createConsumerEndpoint(destination, "test",
new ExtendedConsumerProperties<>(new RocketMQConsumerProperties()));
Assertions.assertNotNull(consumerEndpoint);
}

@Test
public void createAnymousConsumerEndpoint() throws Exception {
TestConsumerDestination destination = new TestConsumerDestination("test");
MessageProducer consumerEndpoint = binder.createConsumerEndpoint(destination, null,
new ExtendedConsumerProperties<>(new RocketMQConsumerProperties()));
Assertions.assertNotNull(consumerEndpoint);
}

@Test
public void createDLQAnymousConsumerEndpoint() throws Exception {
TestConsumerDestination destination = new TestConsumerDestination("%DLQ%test");
Assertions.assertThrows(RuntimeException.class, () -> {
MessageProducer consumerEndpoint = binder.createConsumerEndpoint(destination, null,
new ExtendedConsumerProperties<>(new RocketMQConsumerProperties()));
});
}

@Configuration
@EnableAutoConfiguration
@ImportAutoConfiguration({ ExtendedBindingHandlerMappingsProviderConfiguration.class,
RocketMQBinderAutoConfiguration.class})
public static class TestConfig {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2013-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.cloud.stream.binder.rocketmq;

import org.springframework.cloud.stream.provisioning.ConsumerDestination;

public class TestConsumerDestination implements ConsumerDestination {
private String name;

public TestConsumerDestination(String name) {
this.name = name;
}

@Override
public String getName() {
return name;
}
}

0 comments on commit 2432d64

Please sign in to comment.