diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java index 0b9f4295c26..bddb57f1501 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java @@ -51,5 +51,6 @@ RemotingCommand handle(final GetMessageResult getMessageResult, final boolean brokerAllowSuspend, final MessageFilter messageFilter, final RemotingCommand response, - final TopicQueueMappingContext mappingContext); + final TopicQueueMappingContext mappingContext, + final long beginTimeMills); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java index 913e1a96c43..43b66b4c516 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java @@ -84,7 +84,8 @@ public RemotingCommand handle(final GetMessageResult getMessageResult, final boolean brokerAllowSuspend, final MessageFilter messageFilter, RemotingCommand response, - TopicQueueMappingContext mappingContext) { + TopicQueueMappingContext mappingContext, + long beginTimeMills) { PullMessageProcessor processor = brokerController.getPullMessageProcessor(); final String clientAddress = RemotingHelper.parseChannelRemoteAddr(channel); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); @@ -137,8 +138,6 @@ public RemotingCommand handle(final GetMessageResult getMessageResult, } if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { - - final long beginTimeMills = this.brokerController.getMessageStore().now(); final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java index a72759883c6..55552003d80 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java @@ -81,6 +81,7 @@ public boolean rejectRequest() { private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException { + final long beginTimeMills = this.brokerController.getMessageStore().now(); RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class); final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader(); final PeekMessageRequestHeader requestHeader = @@ -188,7 +189,6 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), getMessageResult.getMessageCount()); if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { - final long beginTimeMills = this.brokerController.getMessageStore().now(); final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 105e11643f4..59ff2e0fd52 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -196,6 +196,7 @@ public boolean notifyMessageArriving(final String topic, final String cid, final @Override public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final long beginTimeMills = this.brokerController.getMessageStore().now(); request.addExtFieldIfNotExist(BORN_TIME, String.valueOf(System.currentTimeMillis())); if (Objects.equals(request.getExtFields().get(BORN_TIME), "0")) { request.addExtField(BORN_TIME, String.valueOf(System.currentTimeMillis())); @@ -435,7 +436,6 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC switch (finalResponse.getCode()) { case ResponseCode.SUCCESS: if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { - final long beginTimeMills = this.brokerController.getMessageStore().now(); final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index ea9c327e98a..d53454f215d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -300,6 +300,7 @@ public boolean rejectRequest() { private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend, boolean brokerAllowFlowCtrSuspend) throws RemotingCommandException { + final long beginTimeMills = this.brokerController.getMessageStore().now(); RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); final PullMessageRequestHeader requestHeader = @@ -555,7 +556,8 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re brokerAllowSuspend, messageFilter, finalResponse, - mappingContext + mappingContext, + beginTimeMills ); }) .thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result)); @@ -574,7 +576,8 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re brokerAllowSuspend, messageFilter, response, - mappingContext + mappingContext, + beginTimeMills ); } return null;