diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/resources/application.yml index fa752afd91..5f28e5362c 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/resources/application.yml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/resources/application.yml @@ -18,7 +18,6 @@ spring: processor-out-0: producer: group: output_2 - bindings: producer-out-0: destination: num @@ -29,7 +28,6 @@ spring: group: processor_group consumer-in-0: destination: square - group: consumer_group logging: level: diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-config/src/test/java/com/alibaba/cloud/nacos/endpoint/NacosConfigEndpointTests.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-config/src/test/java/com/alibaba/cloud/nacos/endpoint/NacosConfigEndpointTests.java index c278ad1f44..bac5b958b9 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-config/src/test/java/com/alibaba/cloud/nacos/endpoint/NacosConfigEndpointTests.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-config/src/test/java/com/alibaba/cloud/nacos/endpoint/NacosConfigEndpointTests.java @@ -20,9 +20,12 @@ import com.alibaba.cloud.nacos.NacosConfigAutoConfiguration; import com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration; +import com.alibaba.cloud.nacos.NacosConfigManager; import com.alibaba.cloud.nacos.NacosConfigProperties; import com.alibaba.cloud.nacos.refresh.NacosRefreshHistory; +import com.alibaba.nacos.client.config.NacosConfigService; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.health.Health.Builder; @@ -30,6 +33,7 @@ import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Configuration; +import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE; @@ -52,6 +56,19 @@ public class NacosConfigEndpointTests { @Autowired private NacosRefreshHistory refreshHistory; + static { + + try { + NacosConfigService mockedNacosConfigService = Mockito + .mock(NacosConfigService.class); + Mockito.when(mockedNacosConfigService.getServerStatus()).thenReturn("UP"); + ReflectionTestUtils.setField(NacosConfigManager.class, "service", + mockedNacosConfigService); + } + catch (Exception ignore) { + ignore.printStackTrace(); + } + } @Test public void contextLoads() throws Exception { diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 7e1cea97e9..160c4f809d 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,6 +16,9 @@ package com.alibaba.cloud.stream.binder.rocketmq; +import java.util.UUID; + +import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst; import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler; import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter; @@ -28,6 +31,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; @@ -48,6 +52,7 @@ import org.springframework.messaging.MessagingException; import org.springframework.util.StringUtils; + /** * A {@link org.springframework.cloud.stream.binder.Binder} that uses RocketMQ as the * underlying middleware. @@ -113,11 +118,17 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination String group, ExtendedConsumerProperties extendedConsumerProperties) throws Exception { - // todo support anymous consumer - if (!StringUtils.hasLength(group)) { + boolean anonymous = !StringUtils.hasLength(group); + /*** + * When using DLQ, at least the group property must be provided for proper naming of the DLQ destination + * According to https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference + */ + if (anonymous && NamespaceUtil.isDLQTopic(destination.getName())) { throw new RuntimeException( - "'group must be configured for channel " + destination.getName()); + "group must be configured for DLQ" + destination.getName()); } + group = anonymous ? nextDefaultConsumerGroup() : group; + RocketMQUtils.mergeRocketMQProperties(binderConfigurationProperties, extendedConsumerProperties.getExtension()); extendedConsumerProperties.getExtension().setGroup(group); @@ -173,6 +184,14 @@ protected MessageHandler getPolledConsumerErrorMessageHandler( }; } + /** + * generate next default consumer group. + * @return next default consumer group name. + */ + private static String nextDefaultConsumerGroup() { + return RocketMQConst.DEFAULT_GROUP + UUID.randomUUID().toString(); + } + /** * Binders can return an {@link ErrorMessageStrategy} for building error messages; * binder implementations typically might add extra headers to the error message. @@ -203,5 +222,4 @@ public String getDefaultsPrefix() { public Class getExtendedPropertiesEntryClass() { return this.extendedBindingProperties.getExtendedPropertiesEntryClass(); } - } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/constant/RocketMQConst.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/constant/RocketMQConst.java index d0a3b88e01..d8e0b881f8 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/constant/RocketMQConst.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/constant/RocketMQConst.java @@ -31,7 +31,7 @@ public class RocketMQConst extends MessageConst { /** * Default group for SCS RocketMQ Binder. */ - public static final String DEFAULT_GROUP = "binder_default_group_name"; + public static final String DEFAULT_GROUP = "anonymous"; /** * user args for SCS RocketMQ Binder. diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinderTest.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinderTest.java new file mode 100644 index 0000000000..34da503493 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinderTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq; + +import javax.annotation.Resource; + +import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.ExtendedBindingHandlerMappingsProviderConfiguration; +import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.ImportAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.core.MessageProducer; + +import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE; + + +@SpringBootTest(classes = RocketMQMessageChannelBinderTest.TestConfig.class, + webEnvironment = NONE, + properties = { + "spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876", + "spring.cloud.stream.bindings.output.destination=TopicOrderTest", + "spring.cloud.stream.bindings.output.content-type=application/json", + + "spring.cloud.stream.bindings.input1.destination=TopicOrderTest", + "spring.cloud.stream.bindings.input1.content-type=application/json", + "spring.cloud.stream.bindings.input1.group=test-group1", + "spring.cloud.stream.rocketmq.bindings.input1.consumer.push.orderly=true", + "spring.cloud.stream.bindings.input1.consumer.maxAttempts=1", + "spring.cloud.stream.bindings.input2.destination=TopicOrderTest", + "spring.cloud.stream.bindings.input2.content-type=application/json", + "spring.cloud.stream.bindings.input2.group=test-group2", + "spring.cloud.stream.rocketmq.bindings.input2.consumer.push.orderly=false", + "spring.cloud.stream.rocketmq.bindings.input2.consumer.subscription=tag1" + }) +public class RocketMQMessageChannelBinderTest { + @Resource + RocketMQMessageChannelBinder binder; + @Test + public void createConsumerEndpoint() throws Exception { + TestConsumerDestination destination = new TestConsumerDestination("test"); + MessageProducer consumerEndpoint = binder.createConsumerEndpoint(destination, "test", + new ExtendedConsumerProperties<>(new RocketMQConsumerProperties())); + Assertions.assertNotNull(consumerEndpoint); + } + + @Test + public void createAnymousConsumerEndpoint() throws Exception { + TestConsumerDestination destination = new TestConsumerDestination("test"); + MessageProducer consumerEndpoint = binder.createConsumerEndpoint(destination, null, + new ExtendedConsumerProperties<>(new RocketMQConsumerProperties())); + Assertions.assertNotNull(consumerEndpoint); + } + + @Test + public void createDLQAnymousConsumerEndpoint() throws Exception { + TestConsumerDestination destination = new TestConsumerDestination("%DLQ%test"); + Assertions.assertThrows(RuntimeException.class, () -> { + MessageProducer consumerEndpoint = binder.createConsumerEndpoint(destination, null, + new ExtendedConsumerProperties<>(new RocketMQConsumerProperties())); + }); + } + + @Configuration + @EnableAutoConfiguration + @ImportAutoConfiguration({ ExtendedBindingHandlerMappingsProviderConfiguration.class, + RocketMQBinderAutoConfiguration.class}) + public static class TestConfig { + + } +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/TestConsumerDestination.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/TestConsumerDestination.java new file mode 100644 index 0000000000..9c68b9eac3 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/TestConsumerDestination.java @@ -0,0 +1,32 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq; + +import org.springframework.cloud.stream.provisioning.ConsumerDestination; + +public class TestConsumerDestination implements ConsumerDestination { + private String name; + + public TestConsumerDestination(String name) { + this.name = name; + } + + @Override + public String getName() { + return name; + } +}