diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingUtil.java b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingUtil.java index 5bfcd8a99..bb8ddc2d2 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingUtil.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingUtil.java @@ -17,6 +17,7 @@ package com.automq.rocketmq.proxy.remoting; +import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; @@ -37,8 +38,8 @@ public static RemotingCommand codeNotSupportedResponse(RemotingCommand request) /** * Generates a version not supported response command. */ - public static RemotingCommand versionNotSupportedResponse(RemotingCommand request) { - String error = "client version " + request.getVersion() + " not supported"; + public static RemotingCommand clientNotSupportedResponse(RemotingCommand request) { + String error = request.getLanguage().name() + " client version " + MQVersion.getVersionDesc(request.getVersion()) + " not supported, please use JAVA client v4.9.5 or later."; return RemotingCommand.createResponseCommand(ResponseCode.VERSION_NOT_SUPPORTED, error); } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/CommonRemotingBehavior.java b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/CommonRemotingBehavior.java index 848af2683..b8a66ac6e 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/CommonRemotingBehavior.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/CommonRemotingBehavior.java @@ -21,8 +21,10 @@ import com.automq.rocketmq.proxy.model.ProxyContextExt; import com.automq.rocketmq.proxy.remoting.RemotingUtil; import java.util.Optional; +import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; @@ -56,6 +58,17 @@ default String dstBrokerName(RemotingCommand request) { } } + default Optional checkClientVersion(RemotingCommand request) { + if (request.getLanguage() != LanguageCode.JAVA) { + return Optional.of(RemotingUtil.clientNotSupportedResponse(request)); + } + + if (request.getVersion() < MQVersion.Version.V4_9_5.ordinal()) { + return Optional.of(RemotingUtil.clientNotSupportedResponse(request)); + } + return Optional.empty(); + } + /** * Check the version of the request. *

@@ -65,7 +78,7 @@ default String dstBrokerName(RemotingCommand request) { * @param request The remoting request. * @return The error response or null if the version is supported. */ - default Optional checkVersion(RemotingCommand request) { + default Optional checkRequiredField(RemotingCommand request) { if (requestNeedBrokerName(request.getCode())) { String brokerName; if (request.getCode() == RequestCode.SEND_MESSAGE_V2) { @@ -74,7 +87,7 @@ default Optional checkVersion(RemotingCommand request) { brokerName = request.getExtFields().get(BROKER_NAME_FIELD); } if (brokerName == null) { - return Optional.of(RemotingUtil.versionNotSupportedResponse(request)); + return Optional.of(RemotingUtil.clientNotSupportedResponse(request)); } } return Optional.empty(); @@ -82,6 +95,7 @@ default Optional checkVersion(RemotingCommand request) { default boolean requestNeedBrokerName(int requestCode) { return requestCode == RequestCode.SEND_MESSAGE_V2 - || requestCode == RequestCode.PULL_MESSAGE; + || requestCode == RequestCode.PULL_MESSAGE + || requestCode == RequestCode.CONSUMER_SEND_MSG_BACK; } } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendConsumerManagerActivity.java b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendConsumerManagerActivity.java index 18bbd33a8..46c07c1d2 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendConsumerManagerActivity.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendConsumerManagerActivity.java @@ -19,6 +19,7 @@ import com.automq.rocketmq.proxy.remoting.RemotingUtil; import io.netty.channel.ChannelHandlerContext; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.message.MessageQueue; @@ -50,6 +51,11 @@ public ExtendConsumerManagerActivity(RequestPipeline requestPipeline, @Override protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { + Optional response = checkClientVersion(request); + if (response.isPresent()) { + return response.get(); + } + switch (request.getCode()) { case RequestCode.GET_CONSUMER_CONNECTION_LIST, RequestCode.UNLOCK_BATCH_MQ, diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java index 94a9ccdfa..32e334d6f 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendPullMessageActivity.java @@ -66,7 +66,7 @@ public ExtendPullMessageActivity(RequestPipeline requestPipeline, @Override protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { - Optional response = checkVersion(request); + Optional response = checkRequiredField(request); if (response.isPresent()) { return response.get(); } @@ -110,7 +110,7 @@ private RemotingCommand pullMessage(ChannelHandlerContext ctx, RemotingCommand r PullMessageResponseHeader.class); final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); - // Check the topic existence. + // Check the topic type. TopicMessageType type = messagingProcessor.getMetadataService().getTopicMessageType(context, requestHeader.getTopic()); if (type == TopicMessageType.UNSPECIFIED) { response.setCode(ResponseCode.TOPIC_NOT_EXIST); diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java index ced023cfb..4f1c42394 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/ExtendSendMessageActivity.java @@ -54,7 +54,7 @@ public ExtendSendMessageActivity(RequestPipeline requestPipeline, @Override protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { - Optional response = checkVersion(request); + Optional response = checkRequiredField(request); if (response.isPresent()) { return response.get(); } @@ -114,7 +114,6 @@ protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand } message.getProperties().put(MessageConst.PROPERTY_BORN_HOST, bornHost); - // TODO: Do we need handle more properties here? messagingProcessor.sendMessage(context, new SendMessageQueueSelector(dstBrokerName, requestHeader), // For v4 remoting protocol, we honor the producer group in the request header. diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/remoting/activity/CommonRemotingBehaviorTest.java b/proxy/src/test/java/com/automq/rocketmq/proxy/remoting/activity/CommonRemotingBehaviorTest.java index f3e44afbd..fa66fad20 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/remoting/activity/CommonRemotingBehaviorTest.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/remoting/activity/CommonRemotingBehaviorTest.java @@ -57,22 +57,22 @@ void dstBrokerName() { void checkVersion() { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, null); request.addExtField("PLACE_HOLDER_KEY", "PLACE_HOLDER_VALUE"); - assertTrue(commonRemotingBehavior.checkVersion(request).isPresent()); - assertEquals(commonRemotingBehavior.checkVersion(request).get().getCode(), ResponseCode.VERSION_NOT_SUPPORTED); + assertTrue(commonRemotingBehavior.checkRequiredField(request).isPresent()); + assertEquals(commonRemotingBehavior.checkRequiredField(request).get().getCode(), ResponseCode.VERSION_NOT_SUPPORTED); request.addExtField(BROKER_NAME_FIELD, "bname"); - assertTrue(commonRemotingBehavior.checkVersion(request).isPresent()); - assertEquals(commonRemotingBehavior.checkVersion(request).get().getCode(), ResponseCode.VERSION_NOT_SUPPORTED); + assertTrue(commonRemotingBehavior.checkRequiredField(request).isPresent()); + assertEquals(commonRemotingBehavior.checkRequiredField(request).get().getCode(), ResponseCode.VERSION_NOT_SUPPORTED); request.addExtField(BROKER_NAME_FIELD_FOR_SEND_MESSAGE_V2, "bname"); - assertTrue(commonRemotingBehavior.checkVersion(request).isEmpty()); + assertTrue(commonRemotingBehavior.checkRequiredField(request).isEmpty()); request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); request.addExtField(BROKER_NAME_FIELD_FOR_SEND_MESSAGE_V2, "bname"); - assertTrue(commonRemotingBehavior.checkVersion(request).isPresent()); - assertEquals(commonRemotingBehavior.checkVersion(request).get().getCode(), ResponseCode.VERSION_NOT_SUPPORTED); + assertTrue(commonRemotingBehavior.checkRequiredField(request).isPresent()); + assertEquals(commonRemotingBehavior.checkRequiredField(request).get().getCode(), ResponseCode.VERSION_NOT_SUPPORTED); request.addExtField(BROKER_NAME_FIELD, "bname"); - assertTrue(commonRemotingBehavior.checkVersion(request).isEmpty()); + assertTrue(commonRemotingBehavior.checkRequiredField(request).isEmpty()); } } \ No newline at end of file