diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java index a23faab7d62..046a7d95439 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java @@ -178,14 +178,14 @@ public static PlainAccessResource parse(RemotingCommand request, String remoteAd // Content SortedMap map = new TreeMap<>(); for (Map.Entry entry : request.getExtFields().entrySet()) { + if (request.getVersion() <= MQVersion.Version.V4_9_3.ordinal() && + MixAll.UNIQUE_MSG_QUERY_FLAG.equals(entry.getKey())) { + continue; + } if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) { map.put(entry.getKey(), entry.getValue()); } } - if (request.getVersion() <= MQVersion.Version.V4_9_3.ordinal() - && map.containsKey(MixAll.UNIQUE_MSG_QUERY_FLAG)) { - map.remove(MixAll.UNIQUE_MSG_QUERY_FLAG); - } accessResource.setContent(AclUtils.combineRequestContent(request, map)); return accessResource; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java index 9fffb1eda38..060b051ffe9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java @@ -336,6 +336,10 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { for (Pair selectorViewPair : messageStore.getMetricsView()) { providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2()); } + + for (Pair selectorViewPair : PopMetricsManager.getMetricsView()) { + providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2()); + } } private void initStatsMetrics() { @@ -494,6 +498,7 @@ private void initLagAndDlqMetrics() { private void initOtherMetrics() { RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder); messageStore.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder); + PopMetricsManager.initMetrics(brokerMeter, brokerController, BrokerMetricsManager::newAttributesBuilder); } public void shutdown() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java new file mode 100644 index 00000000000..41917ed5066 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.metrics; + +public class PopMetricsConstant { + public static final String HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME = "rocketmq_pop_buffer_scan_time_consume"; + public static final String COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL = "rocketmq_pop_revive_in_message_total"; + public static final String COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL = "rocketmq_pop_revive_out_message_total"; + public static final String COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL = "rocketmq_pop_revive_retry_messages_total"; + + public static final String GAUGE_POP_REVIVE_LAG = "rocketmq_pop_revive_lag"; + public static final String GAUGE_POP_REVIVE_LATENCY = "rocketmq_pop_revive_latency"; + public static final String GAUGE_POP_OFFSET_BUFFER_SIZE = "rocketmq_pop_offset_buffer_size"; + public static final String GAUGE_POP_CHECKPOINT_BUFFER_SIZE = "rocketmq_pop_checkpoint_buffer_size"; + + public static final String LABEL_REVIVE_MESSAGE_TYPE = "revive_message_type"; + public static final String LABEL_PUT_STATUS = "put_status"; + public static final String LABEL_QUEUE_ID = "queue_id"; +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java new file mode 100644 index 00000000000..463371d7e8b --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.metrics; + +import com.google.common.collect.Lists; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.View; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.processor.PopBufferMergeService; +import org.apache.rocketmq.broker.processor.PopReviveService; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.metrics.NopLongCounter; +import org.apache.rocketmq.common.metrics.NopLongHistogram; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.pop.AckMsg; +import org.apache.rocketmq.store.pop.PopCheckPoint; + +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC; +import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL; +import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL; +import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL; +import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_CHECKPOINT_BUFFER_SIZE; +import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_OFFSET_BUFFER_SIZE; +import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_REVIVE_LAG; +import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_REVIVE_LATENCY; +import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME; +import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_PUT_STATUS; +import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_QUEUE_ID; +import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_REVIVE_MESSAGE_TYPE; + +public class PopMetricsManager { + public static Supplier attributesBuilderSupplier; + + private static LongHistogram popBufferScanTimeConsume = new NopLongHistogram(); + private static LongCounter popRevivePutTotal = new NopLongCounter(); + private static LongCounter popReviveGetTotal = new NopLongCounter(); + private static LongCounter popReviveRetryMessageTotal = new NopLongCounter(); + + public static List> getMetricsView() { + List rpcCostTimeBuckets = Arrays.asList( + (double) Duration.ofMillis(1).toMillis(), + (double) Duration.ofMillis(10).toMillis(), + (double) Duration.ofMillis(100).toMillis(), + (double) Duration.ofSeconds(1).toMillis(), + (double) Duration.ofSeconds(2).toMillis(), + (double) Duration.ofSeconds(3).toMillis() + ); + InstrumentSelector popBufferScanTimeConsumeSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME) + .build(); + View popBufferScanTimeConsumeView = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) + .build(); + return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeView)); + } + + public static void initMetrics(Meter meter, BrokerController brokerController, + Supplier attributesBuilderSupplier) { + PopMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier; + + popBufferScanTimeConsume = meter.histogramBuilder(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME) + .setDescription("Time consuming of pop buffer scan") + .setUnit("milliseconds") + .ofLongs() + .build(); + popRevivePutTotal = meter.counterBuilder(COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL) + .setDescription("Total number of put message to revive topic") + .build(); + popReviveGetTotal = meter.counterBuilder(COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL) + .setDescription("Total number of get message from revive topic") + .build(); + popReviveRetryMessageTotal = meter.counterBuilder(COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL) + .setDescription("Total number of put message to pop retry topic") + .build(); + + meter.gaugeBuilder(GAUGE_POP_OFFSET_BUFFER_SIZE) + .setDescription("Time number of buffered offset") + .ofLongs() + .buildWithCallback(measurement -> calculatePopBufferOffsetSize(brokerController, measurement)); + meter.gaugeBuilder(GAUGE_POP_CHECKPOINT_BUFFER_SIZE) + .setDescription("The number of buffered checkpoint") + .ofLongs() + .buildWithCallback(measurement -> calculatePopBufferCkSize(brokerController, measurement)); + meter.gaugeBuilder(GAUGE_POP_REVIVE_LAG) + .setDescription("The processing lag of revive topic") + .setUnit("milliseconds") + .ofLongs() + .buildWithCallback(measurement -> calculatePopReviveLag(brokerController, measurement)); + meter.gaugeBuilder(GAUGE_POP_REVIVE_LATENCY) + .setDescription("The processing latency of revive topic") + .setUnit("milliseconds") + .ofLongs() + .buildWithCallback(measurement -> calculatePopReviveLatency(brokerController, measurement)); + } + + private static void calculatePopBufferOffsetSize(BrokerController brokerController, + ObservableLongMeasurement measurement) { + PopBufferMergeService popBufferMergeService = brokerController.getPopMessageProcessor().getPopBufferMergeService(); + measurement.record(popBufferMergeService.getOffsetTotalSize(), newAttributesBuilder().build()); + } + + private static void calculatePopBufferCkSize(BrokerController brokerController, + ObservableLongMeasurement measurement) { + PopBufferMergeService popBufferMergeService = brokerController.getPopMessageProcessor().getPopBufferMergeService(); + measurement.record(popBufferMergeService.getBufferedCKSize(), newAttributesBuilder().build()); + } + + private static void calculatePopReviveLatency(BrokerController brokerController, + ObservableLongMeasurement measurement) { + PopReviveService[] popReviveServices = brokerController.getAckMessageProcessor().getPopReviveServices(); + for (PopReviveService popReviveService : popReviveServices) { + measurement.record(popReviveService.getReviveBehindMillis(), newAttributesBuilder() + .put(LABEL_QUEUE_ID, popReviveService.getQueueId()) + .build()); + } + } + + private static void calculatePopReviveLag(BrokerController brokerController, + ObservableLongMeasurement measurement) { + PopReviveService[] popReviveServices = brokerController.getAckMessageProcessor().getPopReviveServices(); + for (PopReviveService popReviveService : popReviveServices) { + measurement.record(popReviveService.getReviveBehindMessages(), newAttributesBuilder() + .put(LABEL_QUEUE_ID, popReviveService.getQueueId()) + .build()); + } + } + + public static void incPopReviveAckPutCount(AckMsg ackMsg, PutMessageStatus status) { + incPopRevivePutCount(ackMsg.getConsumerGroup(), ackMsg.getTopic(), PopReviveMessageType.ACK, status, 1); + } + + public static void incPopReviveCkPutCount(PopCheckPoint checkPoint, PutMessageStatus status) { + incPopRevivePutCount(checkPoint.getCId(), checkPoint.getTopic(), PopReviveMessageType.CK, status, 1); + } + + public static void incPopRevivePutCount(String group, String topic, PopReviveMessageType messageType, + PutMessageStatus status, int num) { + Attributes attributes = newAttributesBuilder() + .put(LABEL_CONSUMER_GROUP, group) + .put(LABEL_TOPIC, topic) + .put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name()) + .put(LABEL_PUT_STATUS, status.name()) + .build(); + popRevivePutTotal.add(num, attributes); + } + + public static void incPopReviveAckGetCount(AckMsg ackMsg, int queueId) { + incPopReviveGetCount(ackMsg.getConsumerGroup(), ackMsg.getTopic(), PopReviveMessageType.ACK, queueId, 1); + } + + public static void incPopReviveCkGetCount(PopCheckPoint checkPoint, int queueId) { + incPopReviveGetCount(checkPoint.getCId(), checkPoint.getTopic(), PopReviveMessageType.CK, queueId, 1); + } + + public static void incPopReviveGetCount(String group, String topic, PopReviveMessageType messageType, int queueId, + int num) { + AttributesBuilder builder = newAttributesBuilder(); + Attributes attributes = builder + .put(LABEL_CONSUMER_GROUP, group) + .put(LABEL_TOPIC, topic) + .put(LABEL_QUEUE_ID, queueId) + .put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name()) + .build(); + popReviveGetTotal.add(num, attributes); + } + + public static void incPopReviveRetryMessageCount(PopCheckPoint checkPoint, PutMessageStatus status) { + AttributesBuilder builder = newAttributesBuilder(); + Attributes attributes = builder + .put(LABEL_CONSUMER_GROUP, checkPoint.getCId()) + .put(LABEL_TOPIC, checkPoint.getTopic()) + .put(LABEL_PUT_STATUS, status.name()) + .build(); + popReviveRetryMessageTotal.add(1, attributes); + } + + public static void recordPopBufferScanTimeConsume(long time) { + popBufferScanTimeConsume.record(time, newAttributesBuilder().build()); + } + + public static AttributesBuilder newAttributesBuilder() { + return attributesBuilderSupplier != null ? attributesBuilderSupplier.get() : Attributes.builder(); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java new file mode 100644 index 00000000000..3f6fe9c4750 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.metrics; + +public enum PopReviveMessageType { + CK, + ACK +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 2653de0f50f..1985c22d6e8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -20,6 +20,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.TopicConfig; @@ -58,6 +59,10 @@ public AckMessageProcessor(final BrokerController brokerController) { } } + public PopReviveService[] getPopReviveServices() { + return popReviveServices; + } + public void startPopReviveService() { for (PopReviveService popReviveService : popReviveServices) { popReviveService.start(); @@ -159,7 +164,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re } try { oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), - requestHeader.getTopic(), requestHeader.getQueueId()); + requestHeader.getTopic(), requestHeader.getQueueId()); if (requestHeader.getOffset() < oldOffset) { return response; } @@ -216,6 +221,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) { POP_LOGGER.error("put ack msg error:" + putMessageResult); } + PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus()); decInFlightMessageNum(requestHeader); return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 78f50c92b1c..17e1e86c94d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1426,10 +1426,11 @@ private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, return response; } - private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetAllProducerInfoRequestHeader requestHeader = - (GetAllProducerInfoRequestHeader) request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class); + (GetAllProducerInfoRequestHeader) request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class); ProducerTableInfo producerTable = this.brokerController.getProducerManager().getProducerTable(); if (producerTable != null) { @@ -1443,6 +1444,7 @@ private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx, RemotingCo response.setCode(ResponseCode.SYSTEM_ERROR); return response; } + private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -1692,13 +1694,13 @@ private Long searchOffsetByTimestamp(String topic, int queueId, long timestamp) /** * Reset consumer offset. * - * @param topic Required, not null. - * @param group Required, not null. - * @param queueId if target queue ID is negative, all message queues will be reset; - * otherwise, only the target queue would get reset. - * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; - * otherwise, binary search is performed to locate target offset. - * @param offset Target offset to reset to if target queue ID is properly provided. + * @param topic Required, not null. + * @param group Required, not null. + * @param queueId if target queue ID is negative, all message queues will be reset; + * otherwise, only the target queue would get reset. + * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; + * otherwise, binary search is performed to locate target offset. + * @param offset Target offset to reset to if target queue ID is properly provided. * @return Affected queues and their new offset */ private RemotingCommand resetOffsetInner(String topic, String group, int queueId, long timestamp, Long offset) { @@ -2260,8 +2262,8 @@ private HashMap prepareRuntimeInfo() { runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp())); if (this.brokerController.getMessageStoreConfig().isTimerWheelEnable()) { - runtimeInfo.put("timerReadBehind", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getReadBehind())); - runtimeInfo.put("timerOffsetBehind", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getOffsetBehind())); + runtimeInfo.put("timerReadBehind", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getDequeueBehind())); + runtimeInfo.put("timerOffsetBehind", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehindMessages())); runtimeInfo.put("timerCongestNum", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getAllCongestNum())); runtimeInfo.put("timerEnqueueTps", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getEnqueueTps())); runtimeInfo.put("timerDequeueTps", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getDequeueTps())); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java index 91e176f8c31..2ccdf07f6aa 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java @@ -20,6 +20,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; @@ -127,7 +128,8 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re return response; } - protected RemotingCommand processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader requestHeader, String[] extraInfo, RemotingCommand response, ChangeInvisibleTimeResponseHeader responseHeader) { + protected RemotingCommand processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader requestHeader, + String[] extraInfo, RemotingCommand response, ChangeInvisibleTimeResponseHeader responseHeader) { long popTime = ExtraInfoUtil.getPopTime(extraInfo); long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); @@ -194,6 +196,7 @@ private void ackOrigin(final ChangeInvisibleTimeRequestHeader requestHeader, Str && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) { POP_LOGGER.error("change Invisible, put ack msg fail: {}, {}", ackMsg, putMessageResult); } + PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus()); } private PutMessageResult appendCheckPoint(final ChangeInvisibleTimeRequestHeader requestHeader, int reviveQid, @@ -209,7 +212,7 @@ private PutMessageResult appendCheckPoint(final ChangeInvisibleTimeRequestHeader ck.setStartOffset(offset); ck.setCId(requestHeader.getConsumerGroup()); ck.setTopic(requestHeader.getTopic()); - ck.setQueueId((byte) queueId); + ck.setQueueId(queueId); ck.addDiff(0); ck.setBrokerName(brokerName); @@ -229,9 +232,12 @@ private PutMessageResult appendCheckPoint(final ChangeInvisibleTimeRequestHeader ck.getReviveTime(), putMessageResult); } - if (putMessageResult != null && putMessageResult.isOk()) { - this.brokerController.getBrokerStatsManager().incBrokerCkNums(1); - this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1); + if (putMessageResult != null) { + PopMetricsManager.incPopReviveCkPutCount(ck, putMessageResult.getPutMessageStatus()); + if (putMessageResult.isOk()) { + this.brokerController.getBrokerStatsManager().incBrokerCkNums(1); + this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1); + } } return putMessageResult; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index 4167438e981..4d6359c1dcb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -24,16 +24,17 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.config.BrokerRole; @@ -104,7 +105,7 @@ public void run() { this.waitForRunning(interval); - if (!this.serving && this.buffer.size() == 0 && totalSize() == 0) { + if (!this.serving && this.buffer.size() == 0 && getOffsetTotalSize() == 0) { this.serving = true; } } catch (Throwable e) { @@ -121,7 +122,7 @@ public void run() { if (!isShouldRunning()) { return; } - while (this.buffer.size() > 0 || totalSize() > 0) { + while (this.buffer.size() > 0 || getOffsetTotalSize() > 0) { scan(); } } @@ -304,6 +305,7 @@ private void scan() { eclipse, count, countCk, counter.get(), offsetBufferSize); } } + PopMetricsManager.recordPopBufferScanTimeConsume(eclipse); scanTimes++; if (scanTimes >= countOfMinute1) { @@ -312,7 +314,7 @@ private void scan() { } } - private int totalSize() { + public int getOffsetTotalSize() { int count = 0; Iterator>> iterator = this.commitOffsets.entrySet().iterator(); while (iterator.hasNext()) { @@ -323,6 +325,10 @@ private int totalSize() { return count; } + public int getBufferedCKSize() { + return this.counter.get(); + } + private void markBitCAS(AtomicInteger setBits, int index) { while (true) { int bits = setBits.get(); @@ -419,7 +425,7 @@ public void addCkMock(String group, String topic, int queueId, long startOffset, ck.setStartOffset(startOffset); ck.setCId(group); ck.setTopic(topic); - ck.setQueueId((byte) queueId); + ck.setQueueId(queueId); ck.setBrokerName(brokerName); PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, Long.MAX_VALUE, ck, nextBeginOffset, true); @@ -540,6 +546,7 @@ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean } MessageExtBrokerInner msgInner = popMessageProcessor.buildCkMsg(pointWrapper.getCk(), pointWrapper.getReviveQueueId()); PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); + PopMetricsManager.incPopReviveCkPutCount(pointWrapper.getCk(), putMessageResult.getPutMessageStatus()); if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT @@ -584,6 +591,7 @@ private boolean putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgI msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); + PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus()); if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 647d2e8a928..cd459532693 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -809,7 +809,7 @@ private void appendCheckPoint(final PopMessageRequestHeader requestHeader, ck.setStartOffset(offset); ck.setCId(requestHeader.getConsumerGroup()); ck.setTopic(topic); - ck.setQueueId((byte) queueId); + ck.setQueueId(queueId); ck.setBrokerName(brokerName); for (Long msgQueueOffset : getMessageTmpResult.getMessageQueueOffset()) { ck.addDiff((int) (msgQueueOffset - offset)); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index d0e9dbc36f2..f451c6047e2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; +import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.KeyBuilder; @@ -66,6 +67,7 @@ public class PopReviveService extends ServiceThread { private int queueId; private BrokerController brokerController; private String reviveTopic; + private long currentReviveMessageTimestamp = -1; private volatile boolean shouldRunPopRevive = false; private final NavigableMap> inflightReviveRequestMap = Collections.synchronizedNavigableMap(new TreeMap<>()); @@ -86,6 +88,10 @@ public String getServiceName() { return "PopReviveService_" + this.queueId; } + public int getQueueId() { + return queueId; + } + public void setShouldRunPopRevive(final boolean shouldRunPopRevive) { this.shouldRunPopRevive = shouldRunPopRevive; } @@ -120,6 +126,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId()); PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); + PopMetricsManager.incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus()); if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ", queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(), @@ -131,7 +138,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) return false; } this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(popCheckPoint); - this.brokerController.getBrokerStatsManager().incBrokerPutNums(1); + this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(), 1); this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic()); this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); if (brokerController.getPopMessageProcessor() != null) { @@ -197,11 +204,13 @@ private boolean reachTail(PullResult pullResult, long offset) { || pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL && offset == pullResult.getMaxOffset(); } - private CompletableFuture> getBizMessage(String topic, long offset, int queueId, String brokerName) { + private CompletableFuture> getBizMessage(String topic, long offset, int queueId, + String brokerName) { return this.brokerController.getEscapeBridge().getMessageAsync(topic, offset, queueId, brokerName, false); } - public PullResult getMessage(String group, String topic, int queueId, long offset, int nums, boolean deCompressBody) { + public PullResult getMessage(String group, String topic, int queueId, long offset, int nums, + boolean deCompressBody) { GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(group, topic, queueId, offset, nums, null); if (getMessageResult != null) { @@ -315,7 +324,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { List messageExts = getReviveMessage(offset, queueId); if (messageExts == null || messageExts.isEmpty()) { long old = endTime; - long timerDelay = brokerController.getMessageStore().getTimerMessageStore().getReadBehind(); + long timerDelay = brokerController.getMessageStore().getTimerMessageStore().getDequeueBehind(); long commitLogDelay = brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehind(); // move endTime if (endTime != 0 && System.currentTimeMillis() - endTime > 3 * PopAckConstants.SECOND && timerDelay <= 0 && commitLogDelay <= 0) { @@ -355,6 +364,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { continue; } map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point); + PopMetricsManager.incPopReviveCkGetCount(point, queueId); point.setReviveOffset(messageExt.getQueueOffset()); if (firstRt == 0) { firstRt = point.getReviveTime(); @@ -365,6 +375,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw); } AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class); + PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId); String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime(); PopCheckPoint point = map.get(mergeKey); if (point == null) { @@ -408,7 +419,7 @@ private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) { PopCheckPoint point = new PopCheckPoint(); point.setStartOffset(ackMsg.getStartOffset()); point.setPopTime(ackMsg.getPopTime()); - point.setQueueId((byte) ackMsg.getQueueId()); + point.setQueueId(ackMsg.getQueueId()); point.setCId(ackMsg.getConsumerGroup()); point.setTopic(ackMsg.getTopic()); point.setNum((byte) 0); @@ -555,6 +566,25 @@ private void rePutCK(PopCheckPoint oldCK, Pair pair) { brokerController.getMessageStore().putMessage(ckMsg); } + public long getReviveBehindMillis() { + if (currentReviveMessageTimestamp <= 0) { + return 0; + } + long maxOffset = brokerController.getMessageStore().getMaxOffsetInQueue(reviveTopic, queueId); + if (maxOffset - reviveOffset > 1) { + return Math.max(0, System.currentTimeMillis() - currentReviveMessageTimestamp); + } + return 0; + } + + public long getReviveBehindMessages() { + if (currentReviveMessageTimestamp <= 0) { + return 0; + } + long diff = brokerController.getMessageStore().getMaxOffsetInQueue(reviveTopic, queueId) - reviveOffset; + return Math.max(0, diff); + } + @Override public void run() { int slow = 1; @@ -586,7 +616,10 @@ public void run() { long delay = 0; if (sortList != null && !sortList.isEmpty()) { delay = (System.currentTimeMillis() - sortList.get(0).getReviveTime()) / 1000; + currentReviveMessageTimestamp = sortList.get(0).getReviveTime(); slow = 1; + } else { + currentReviveMessageTimestamp = System.currentTimeMillis(); } POP_LOGGER.info("reviveQueueId={},revive finish,old offset is {}, new offset is {}, ckDelay={} ", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java index dbc87a870b1..b2db356c8a4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java @@ -295,7 +295,7 @@ private void handlePutMessageResult(PutMessageResult putMessageResult, this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); - this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); + this.brokerController.getBrokerStatsManager().incBrokerPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum()); if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) { Attributes attributes = BrokerMetricsManager.newAttributesBuilder() diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 45517e1bbf3..6faa7525b58 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -429,7 +429,7 @@ private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); - this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); + this.brokerController.getBrokerStatsManager().incBrokerPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum()); this.brokerController.getBrokerStatsManager().incTopicPutLatency(msg.getTopic(), queueIdInt, (int) (this.brokerController.getMessageStore().now() - beginTimeMillis)); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index 372fb83ead4..26e757ab41a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -758,7 +758,7 @@ public void onSuccess(PutMessageResult result) { ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1); ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes()); - ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum()); + ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(this.topic, result.getAppendMessageResult().getMsgNum()); attributes = BrokerMetricsManager.newAttributesBuilder() .put(LABEL_TOPIC, topic) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java index 1a53b9468d5..acc7a3da74a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java @@ -100,7 +100,7 @@ public void testBasic() throws Exception { ck.setCId(group); ck.setTopic(topic); int queueId = 0; - ck.setQueueId((byte) queueId); + ck.setQueueId(queueId); int reviveQid = 0; long nextBeginOffset = 101L; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java index 3b509196bc1..4e83ac74908 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java @@ -53,7 +53,7 @@ public void testNum() { PopCheckPoint popCheckPoint = new PopCheckPoint(); popCheckPoint.setTopic(topic); popCheckPoint.setCId(group); - popCheckPoint.setQueueId((byte) 0); + popCheckPoint.setQueueId(0); popCheckPoint.setPopTime(System.currentTimeMillis()); counter.decrementInFlightMessageNum(popCheckPoint); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java index 79fe6d587b5..1c3a0cd459a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java @@ -90,7 +90,7 @@ public void before() { when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager); when(messageStore.getTimerMessageStore()).thenReturn(timerMessageStore); - when(timerMessageStore.getReadBehind()).thenReturn(0L); + when(timerMessageStore.getDequeueBehind()).thenReturn(0L); when(timerMessageStore.getEnqueueBehind()).thenReturn(0L); when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(new TopicConfig()); @@ -106,7 +106,7 @@ public void testWhenAckMoreThanCk() throws Throwable { long maxReviveOffset = 4; when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP, REVIVE_TOPIC, REVIVE_QUEUE_ID)) - .thenReturn(0L); + .thenReturn(0L); List reviveMessageExtList = new ArrayList<>(); long basePopTime = System.currentTimeMillis(); { @@ -208,7 +208,7 @@ public static PopCheckPoint buildPopCheckPoint(long startOffset, long popTime, l PopCheckPoint ck = new PopCheckPoint(); ck.setStartOffset(startOffset); ck.setPopTime(popTime); - ck.setQueueId((byte) 0); + ck.setQueueId(0); ck.setCId(GROUP); ck.setTopic(TOPIC); ck.setNum((byte) 1); @@ -249,14 +249,15 @@ public static MessageExtBrokerInner buildCkMsg(PopCheckPoint ck) { return msgInner; } - public static MessageExtBrokerInner buildAckMsg(AckMsg ackMsg, long deliverMs, long reviveOffset, long deliverTime) { + public static MessageExtBrokerInner buildAckMsg(AckMsg ackMsg, long deliverMs, long reviveOffset, + long deliverTime) { MessageExtBrokerInner messageExtBrokerInner = buildAckInnerMessage( - REVIVE_TOPIC, - ackMsg, - REVIVE_QUEUE_ID, - STORE_HOST, - deliverMs, - PopMessageProcessor.genAckUniqueId(ackMsg) + REVIVE_TOPIC, + ackMsg, + REVIVE_QUEUE_ID, + STORE_HOST, + deliverMs, + PopMessageProcessor.genAckUniqueId(ackMsg) ); messageExtBrokerInner.setQueueOffset(reviveOffset); messageExtBrokerInner.setDeliverTimeMs(deliverMs); @@ -264,7 +265,8 @@ public static MessageExtBrokerInner buildAckMsg(AckMsg ackMsg, long deliverMs, l return messageExtBrokerInner; } - public static MessageExtBrokerInner buildAckInnerMessage(String reviveTopic, AckMsg ackMsg, int reviveQid, SocketAddress host, long deliverMs, String ackUniqueId) { + public static MessageExtBrokerInner buildAckInnerMessage(String reviveTopic, AckMsg ackMsg, int reviveQid, + SocketAddress host, long deliverMs, String ackUniqueId) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(reviveTopic); msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset)); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 793778e0371..c6631cb5e1e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -1215,12 +1215,11 @@ private boolean isSetEqual(Set set1, Set set2) { return true; } - if (set1 == null || set2 == null || set1.size() != set2.size() - || set1.size() == 0 || set2.size() == 0) { + if (set1 == null || set2 == null || set1.size() != set2.size() || set1.size() == 0) { return false; } - Iterator iter = set2.iterator(); + Iterator iter = set2.iterator(); boolean isEqual = true; while (iter.hasNext()) { if (!set1.contains(iter.next())) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 3b825e52aa8..4f6fbafddf8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1297,7 +1297,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, } if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); - } else if (transactionListener != null) { + } else { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java index 16887b7ff7a..96dc1df1858 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java @@ -124,13 +124,14 @@ public int compareTo(TraceContext o) { @Override public String toString() { StringBuilder sb = new StringBuilder(1024); - sb.append(traceType).append("_").append(groupName) - .append("_").append(regionId).append("_").append(isSuccess).append("_"); + sb.append("TraceContext{").append(traceType).append("_").append(groupName).append("_") + .append(regionId).append("_").append(isSuccess).append("_"); if (traceBeans != null && traceBeans.size() > 0) { for (TraceBean bean : traceBeans) { - sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_"); + sb.append(bean.getMsgId()).append("_").append(bean.getTopic()).append("_"); } } - return "TraceContext{" + sb.toString() + '}'; + sb.append('}'); + return sb.toString(); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java b/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java index cec9fce79b0..4a6588e4124 100644 --- a/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java +++ b/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java @@ -18,45 +18,32 @@ public class FAQUrl { - public static final String APPLY_TOPIC_URL = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String APPLY_TOPIC_URL = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; - public static final String NAME_SERVER_ADDR_NOT_EXIST_URL = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String NAME_SERVER_ADDR_NOT_EXIST_URL = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; - public static final String GROUP_NAME_DUPLICATE_URL = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String GROUP_NAME_DUPLICATE_URL = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; - public static final String CLIENT_PARAMETER_CHECK_URL = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String CLIENT_PARAMETER_CHECK_URL = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; - public static final String SUBSCRIPTION_GROUP_NOT_EXIST = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String SUBSCRIPTION_GROUP_NOT_EXIST = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; - public static final String CLIENT_SERVICE_NOT_OK = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String CLIENT_SERVICE_NOT_OK = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; // FAQ: No route info of this topic, TopicABC - public static final String NO_TOPIC_ROUTE_INFO = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String NO_TOPIC_ROUTE_INFO = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; - public static final String LOAD_JSON_EXCEPTION = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String LOAD_JSON_EXCEPTION = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; - public static final String SAME_GROUP_DIFFERENT_TOPIC = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String SAME_GROUP_DIFFERENT_TOPIC = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; - public static final String MQLIST_NOT_EXIST = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String MQLIST_NOT_EXIST = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; - public static final String UNEXPECTED_EXCEPTION_URL = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String UNEXPECTED_EXCEPTION_URL = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; - public static final String SEND_MSG_FAILED = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String SEND_MSG_FAILED = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; - public static final String UNKNOWN_HOST_EXCEPTION = - "https://rocketmq.apache.org/docs/bestPractice/22FAQ"; + public static final String UNKNOWN_HOST_EXCEPTION = "https://rocketmq.apache.org/docs/bestPractice/06FAQ"; private static final String TIP_STRING_BEGIN = "\nSee "; private static final String TIP_STRING_END = " for further details."; diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java index b994276c9d3..1f1b4dd8907 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java @@ -21,12 +21,16 @@ public abstract class ConcurrentHashMapUtils { - private static final boolean IS_JDK8; + private static boolean isJdk8; static { // Java 8 // Java 9+: 9,11,17 - IS_JDK8 = System.getProperty("java.version").startsWith("1.8."); + try { + isJdk8 = System.getProperty("java.version").startsWith("1.8."); + } catch (Exception ignore) { + isJdk8 = true; + } } /** @@ -36,7 +40,7 @@ public abstract class ConcurrentHashMapUtils { * @see https://bugs.openjdk.java.net/browse/JDK-8161372 */ public static V computeIfAbsent(ConcurrentMap map, K key, Function func) { - if (IS_JDK8) { + if (isJdk8) { V v = map.get(key); if (null == v) { v = map.computeIfAbsent(key, func); diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java index b78c8546899..2c67e463e6f 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java @@ -54,7 +54,7 @@ public static void main(String[] args) throws MQClientException, InterruptedExce * */ // Uncomment the following line while debugging, namesrvAddr should be set to your local address -// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); + producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); /* * Launch the instance. diff --git a/namesrv/src/main/resources/rmq.namesrv.logback.xml b/namesrv/src/main/resources/rmq.namesrv.logback.xml index 6a9411b4731..826f3ca155d 100644 --- a/namesrv/src/main/resources/rmq.namesrv.logback.xml +++ b/namesrv/src/main/resources/rmq.namesrv.logback.xml @@ -88,38 +88,31 @@ - - + - - + - - + - - + - - + - - + - - + diff --git a/proxy/pom.xml b/proxy/pom.xml index f5373e9149d..dff54a22edc 100644 --- a/proxy/pom.xml +++ b/proxy/pom.xml @@ -53,6 +53,10 @@ org.apache.rocketmq rocketmq-client + + org.apache.rocketmq + rocketmq-acl + io.grpc grpc-netty-shaded diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 66239f0e8be..674eced9124 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -22,6 +22,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + +import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -31,6 +33,7 @@ import org.apache.rocketmq.client.consumer.PopResult; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.Message; @@ -64,6 +67,8 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen protected ThreadPoolExecutor producerProcessorExecutor; protected ThreadPoolExecutor consumerProcessorExecutor; + protected static final String ROCKETMQ_HOME = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, + System.getenv(MixAll.ROCKETMQ_HOME_ENV)); protected DefaultMessagingProcessor(ServiceManager serviceManager) { ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); @@ -103,7 +108,7 @@ public static DefaultMessagingProcessor createForLocalMode(BrokerController brok } public static DefaultMessagingProcessor createForClusterMode() { - return createForClusterMode(null); + return createForClusterMode(AclUtils.getAclRPCHook(ROCKETMQ_HOME + MixAll.ACL_CONF_TOOLS_FILE)); } public static DefaultMessagingProcessor createForClusterMode(RPCHook rpcHook) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 8dddb4e35b6..e94397409d0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -521,9 +521,10 @@ public void updateNameServerAddressList(List addrs) { } else if (addrs.size() != old.size()) { update = true; } else { - for (int i = 0; i < addrs.size() && !update; i++) { - if (!old.contains(addrs.get(i))) { + for (String addr : addrs) { + if (!old.contains(addr)) { update = true; + break; } } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java index c49546d622d..60cc4e3e227 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java @@ -26,11 +26,11 @@ public abstract class RemotingSerializable { private final static Charset CHARSET_UTF8 = StandardCharsets.UTF_8; public static byte[] encode(final Object obj) { - final String json = toJson(obj, false); - if (json != null) { - return json.getBytes(CHARSET_UTF8); + if (obj == null) { + return null; } - return null; + final String json = toJson(obj, false); + return json.getBytes(CHARSET_UTF8); } public static String toJson(final Object obj, boolean prettyFormat) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java index fc0ea00c2a8..b22906a5ca9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.Random; import java.util.Set; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -41,9 +40,8 @@ public static class MappingAllocator { Map brokerNumMap = new HashMap<>(); Map idToBroker = new HashMap<>(); //used for remapping - Map brokerNumMapBeforeRemapping = null; + Map brokerNumMapBeforeRemapping; int currentIndex = 0; - Random random = new Random(); List leastBrokers = new ArrayList<>(); private MappingAllocator(Map idToBroker, Map brokerNumMap, Map brokerNumMapBeforeRemapping) { this.idToBroker.putAll(idToBroker); @@ -151,7 +149,7 @@ public static Map.Entry checkNameEpochNumConsistence(String topic || brokerConfigMap.isEmpty()) { return null; } - //make sure it it not null + //make sure it is not null long maxEpoch = -1; int maxNum = -1; String scope = null; @@ -224,7 +222,7 @@ public static void makeSureLogicQueueMappingItemImmutable(List attributesBuilderSupplier; @@ -49,6 +60,15 @@ public class DefaultStoreMetricsManager { public static ObservableLongGauge dispatchBehind = new NopObservableLongGauge(); public static ObservableLongGauge messageReserveTime = new NopObservableLongGauge(); + public static ObservableLongGauge timerEnqueueLag = new NopObservableLongGauge(); + public static ObservableLongGauge timerEnqueueLatency = new NopObservableLongGauge(); + public static ObservableLongGauge timerDequeueLag = new NopObservableLongGauge(); + public static ObservableLongGauge timerDequeueLatency = new NopObservableLongGauge(); + public static ObservableLongGauge timingMessages = new NopObservableLongGauge(); + + public static LongCounter timerDequeueTotal = new NopLongCounter(); + public static LongCounter timerEnqueueTotal = new NopLongCounter(); + public static List> getMetricsView() { return Collections.emptyList(); } @@ -95,6 +115,72 @@ public static void init(Meter meter, Supplier attributesBuild } measurement.record(System.currentTimeMillis() - earliestMessageTime, newAttributesBuilder().build()); }); + + timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG) + .setDescription("Timer enqueue messages lag") + .ofLongs() + .buildWithCallback(measurement -> { + TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore(); + measurement.record(timerMessageStore.getEnqueueBehindMessages(), newAttributesBuilder().build()); + }); + + timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY) + .setDescription("Timer enqueue latency") + .setUnit("milliseconds") + .ofLongs() + .buildWithCallback(measurement -> { + TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore(); + measurement.record(timerMessageStore.getEnqueueBehindMillis(), newAttributesBuilder().build()); + }); + timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG) + .setDescription("Timer dequeue messages lag") + .ofLongs() + .buildWithCallback(measurement -> { + TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore(); + measurement.record(timerMessageStore.getDequeueBehindMessages(), newAttributesBuilder().build()); + }); + timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY) + .setDescription("Timer dequeue latency") + .setUnit("milliseconds") + .ofLongs() + .buildWithCallback(measurement -> { + TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore(); + measurement.record(timerMessageStore.getDequeueBehind(), newAttributesBuilder().build()); + }); + timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES) + .setDescription("Current message number in timing") + .ofLongs() + .buildWithCallback(measurement -> { + TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore(); + timerMessageStore.getTimerMetrics() + .getTimingCount() + .forEach((topic, metric) -> { + measurement.record( + metric.getCount().get(), + newAttributesBuilder().put(LABEL_TOPIC, topic).build() + ); + }); + }); + timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL) + .setDescription("Total number of timer dequeue") + .build(); + timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL) + .setDescription("Total number of timer enqueue") + .build(); + } + + public static void incTimerDequeueCount(String topic) { + timerDequeueTotal.add(1, newAttributesBuilder() + .put(LABEL_TOPIC, topic) + .build()); + } + + public static void incTimerEnqueueCount(String topic) { + AttributesBuilder attributesBuilder = newAttributesBuilder(); + if (topic != null) { + attributesBuilder.put(LABEL_TOPIC, topic); + } + timerEnqueueTotal.add(1, attributesBuilder.build()); } public static AttributesBuilder newAttributesBuilder() { diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java index a65e2d5560e..e041b66d9c5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java +++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java @@ -32,7 +32,7 @@ public class PopCheckPoint implements Comparable { @JSONField(name = "n") private byte num; @JSONField(name = "q") - private byte queueId; + private int queueId; @JSONField(name = "t") private String topic; @JSONField(name = "c") @@ -96,11 +96,11 @@ public void setNum(byte num) { this.num = num; } - public byte getQueueId() { + public int getQueueId() { return queueId; } - public void setQueueId(byte queueId) { + public void setQueueId(int queueId) { this.queueId = queueId; } diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java index d864dd50a4a..fb717550f40 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java @@ -43,7 +43,7 @@ public void record() { this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning; this.msgPutTotalTodayMorning = - this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal(); + this.defaultMessageStore.getBrokerStatsManager().getBrokerPutNumsWithoutSystemTopic(); this.msgGetTotalTodayMorning = this.defaultMessageStore.getBrokerStatsManager().getBrokerGetNumsWithoutSystemTopic(); @@ -84,7 +84,7 @@ public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) { } public long getMsgPutTotalTodayNow() { - return this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal(); + return this.defaultMessageStore.getBrokerStatsManager().getBrokerPutNumsWithoutSystemTopic(); } public long getMsgGetTotalTodayNow() { diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 132ddc3331c..2dd3fc5b52a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -76,6 +76,7 @@ public class BrokerStatsManager { public static final String BROKER_ACK_NUMS = "BROKER_ACK_NUMS"; public static final String BROKER_CK_NUMS = "BROKER_CK_NUMS"; public static final String BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC = "BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC"; + public static final String BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC = "BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC"; public static final String SNDBCK2DLQ_TIMES = "SNDBCK2DLQ_TIMES"; public static final String COMMERCIAL_OWNER = "Owner"; @@ -190,6 +191,8 @@ public void init() { this.statsTable.put(BROKER_CK_NUMS, new StatsItemSet(BROKER_CK_NUMS, this.scheduledExecutorService, log)); this.statsTable.put(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC, new StatsItemSet(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC, this.scheduledExecutorService, log)); + this.statsTable.put(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC, + new StatsItemSet(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC, this.scheduledExecutorService, log)); this.statsTable.put(Stats.GROUP_GET_FROM_DISK_NUMS, new StatsItemSet(Stats.GROUP_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log)); this.statsTable.put(Stats.GROUP_GET_FROM_DISK_SIZE, @@ -513,11 +516,12 @@ public void incBrokerPutNums() { this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1); } - public void incBrokerPutNums(final int incValue) { + public void incBrokerPutNums(final String topic, final int incValue) { this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue); + incBrokerPutNumsWithoutSystemTopic(topic, incValue); } - public void incBrokerGetNums(String topic, final int incValue) { + public void incBrokerGetNums(final String topic, final int incValue) { this.statsTable.get(Stats.BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue); this.incBrokerGetNumsWithoutSystemTopic(topic, incValue); } @@ -537,6 +541,13 @@ public void incBrokerGetNumsWithoutSystemTopic(final String topic, final int inc this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC).getAndCreateStatsItem(this.clusterName).getValue().add(incValue); } + public void incBrokerPutNumsWithoutSystemTopic(final String topic, final int incValue) { + if (TopicValidator.isSystemTopic(topic)) { + return; + } + this.statsTable.get(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC).getAndCreateStatsItem(this.clusterName).getValue().add(incValue); + } + public long getBrokerGetNumsWithoutSystemTopic() { final StatsItemSet statsItemSet = this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC); if (statsItemSet == null) { @@ -549,6 +560,18 @@ public long getBrokerGetNumsWithoutSystemTopic() { return statsItem.getValue().longValue(); } + public long getBrokerPutNumsWithoutSystemTopic() { + final StatsItemSet statsItemSet = this.statsTable.get(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC); + if (statsItemSet == null) { + return 0; + } + final StatsItem statsItem = statsItemSet.getStatsItem(this.clusterName); + if (statsItem == null) { + return 0; + } + return statsItem.getValue().longValue(); + } + public void incSendBackNums(final String group, final String topic) { final String statsKey = buildStatsKey(topic, group); this.statsTable.get(Stats.SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1); diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 89b93abd0ac..c6ab81df42c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -17,32 +17,6 @@ package org.apache.rocketmq.store.timer; import com.conversantmedia.util.concurrent.DisruptorBlockingQueue; -import java.util.function.Function; -import org.apache.commons.collections.CollectionUtils; -import org.apache.rocketmq.common.ServiceThread; -import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.common.TopicFilterType; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.message.MessageAccessor; -import org.apache.rocketmq.common.message.MessageClientIDSetter; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.store.ConsumeQueue; -import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.logfile.MappedFile; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; -import org.apache.rocketmq.store.MessageStore; -import org.apache.rocketmq.store.PutMessageResult; -import org.apache.rocketmq.store.SelectMappedBufferResult; -import org.apache.rocketmq.store.config.BrokerRole; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.store.stats.BrokerStatsManager; - import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -66,6 +40,32 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import org.apache.commons.collections.CollectionUtils; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.store.ConsumeQueue; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager; +import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.util.PerfCounter; public class TimerMessageStore { @@ -1050,7 +1050,7 @@ private int doPut(MessageExtBrokerInner message, boolean roll) throws Exception this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1); this.brokerStatsManager.incTopicPutSize(message.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); - this.brokerStatsManager.incBrokerPutNums(1); + this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1); } return PUT_OK; case SERVICE_NOT_AVAILABLE: @@ -1106,6 +1106,13 @@ private MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll return msgInner; } + private String getRealTopic(MessageExt msgExt) { + if (msgExt == null) { + return null; + } + return msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC); + } + private long formatTimeMs(long timeMs) { return timeMs / precisionMs * precisionMs; } @@ -1300,6 +1307,7 @@ public void run() { req.setLatch(latch); try { perfs.startTick("enqueue_put"); + DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg())); if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) { dequeuePutQueue.put(req); } else { @@ -1414,6 +1422,7 @@ public void run() { } try { perfs.startTick("dequeue_put"); + DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(tr.getMsg())); addMetric(tr.getMsg(), -1); MessageExtBrokerInner msg = convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic())); doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); @@ -1600,7 +1609,7 @@ public void run() { TimerMessageStore.LOGGER.info("[{}]Timer progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{} currReadOffset:{} offsetBehind:{} behindMaster:{} " + "enqPutQueue:{} deqGetQueue:{} deqPutQueue:{} allCongestNum:{} enqExpiredStoreTime:{}", storeConfig.getBrokerRole(), - format(commitReadTimeMs), format(currReadTimeMs), format(currWriteTimeMs), getReadBehind(), + format(commitReadTimeMs), format(currReadTimeMs), format(currWriteTimeMs), getDequeueBehind(), tmpQueueOffset, maxOffsetInQueue - tmpQueueOffset, timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset, enqueuePutQueue.size(), dequeueGetQueue.size(), dequeuePutQueue.size(), getAllCongestNum(), format(lastEnqueueButExpiredStoreTime)); } @@ -1636,22 +1645,34 @@ public boolean isReject(long deliverTimeMs) { return false; } - public long getEnqueueBehind() { + public long getEnqueueBehindMessages() { + long tmpQueueOffset = currQueueOffset; + ConsumeQueue cq = (ConsumeQueue) messageStore.getConsumeQueue(TIMER_TOPIC, 0); + long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue(); + return maxOffsetInQueue - tmpQueueOffset; + } + + public long getEnqueueBehindMillis() { if (System.currentTimeMillis() - lastEnqueueButExpiredTime < 2000) { return (System.currentTimeMillis() - lastEnqueueButExpiredStoreTime) / 1000; } return 0; } - public long getReadBehind() { - return (System.currentTimeMillis() - currReadTimeMs) / 1000; + public long getEnqueueBehind() { + return getEnqueueBehindMillis() / 1000; } - public long getOffsetBehind() { - long tmpQueueOffset = currQueueOffset; - ConsumeQueue cq = (ConsumeQueue) messageStore.getConsumeQueue(TIMER_TOPIC, 0); - long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue(); - return maxOffsetInQueue - tmpQueueOffset; + public long getDequeueBehindMessages() { + return timerWheel.getAllNum(currReadTimeMs); + } + + public long getDequeueBehindMillis() { + return System.currentTimeMillis() - currReadTimeMs; + } + + public long getDequeueBehind() { + return getDequeueBehindMillis() / 1000; } public float getEnqueueTps() { diff --git a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java index c32db16ddcc..a602da09395 100644 --- a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java @@ -199,4 +199,17 @@ public void testIncBrokerGetNumsWithoutSystemTopic() { .getValue().doubleValue()).isEqualTo(1L); assertThat(brokerStatsManager.getBrokerGetNumsWithoutSystemTopic()).isEqualTo(1L); } + + @Test + public void testIncBrokerPutNumsWithoutSystemTopic() { + brokerStatsManager.incBrokerPutNumsWithoutSystemTopic(TOPIC, 1); + assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC, CLUSTER_NAME) + .getValue().doubleValue()).isEqualTo(1L); + assertThat(brokerStatsManager.getBrokerPutNumsWithoutSystemTopic()).isEqualTo(1L); + + brokerStatsManager.incBrokerPutNumsWithoutSystemTopic(TopicValidator.RMQ_SYS_TRACE_TOPIC, 1); + assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC, CLUSTER_NAME) + .getValue().doubleValue()).isEqualTo(1L); + assertThat(brokerStatsManager.getBrokerPutNumsWithoutSystemTopic()).isEqualTo(1L); + } } diff --git a/tieredstore/README.md b/tieredstore/README.md index d58b5e0c601..9c8ea6b8aa3 100644 --- a/tieredstore/README.md +++ b/tieredstore/README.md @@ -57,8 +57,8 @@ Tiered storage provides some useful metrics, see [RIP-46](https://github.com/apa ## How to contribute -We need community participation to add more backend service providers for tiered storage. [PosixFileSegment](https://github.com/apache/rocketmq/blob/tiered_storage/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java), the implementation provided by default is just an example. People who want to contribute can follow it to implement their own providers, such as S3FileSegment, OSSFileSegment, and MinIOFileSegment. Here are some guidelines: +We need community participation to add more backend service providers for tiered storage. [PosixFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java), the implementation provided by default is just an example. People who want to contribute can follow it to implement their own providers, such as S3FileSegment, OSSFileSegment, and MinIOFileSegment. Here are some guidelines: -1. Extend [TieredFileSegment](https://github.com/apache/rocketmq/blob/tiered_storage/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java) and implement the methods of [TieredStoreProvider](https://github.com/apache/rocketmq/blob/tiered_storage/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java) interface. +1. Extend [TieredFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java) and implement the methods of [TieredStoreProvider](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java) interface. 2. Record metrics where appropriate. See `rocketmq_tiered_store_provider_rpc_latency`, `rocketmq_tiered_store_provider_upload_bytes`, and `rocketmq_tiered_store_provider_download_bytes` 3. No need to maintain your own cache and avoid polluting the page cache. It is already having the read-ahead cache. diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java index 1d7aeae380f..0b1194953a2 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java @@ -166,7 +166,7 @@ public void dispatch(DispatchRequest request) { if (result == AppendResult.SUCCESS) { Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder() .put(TieredStoreMetricsConstant.LABEL_TOPIC, request.getTopic()) - .put(TieredStoreMetricsConstant.LABEL_QUEUE, request.getQueueId()) + .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, request.getQueueId()) .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase()) .build(); TieredStoreMetricsManager.messagesDispatchTotal.add(1, attributes); @@ -271,7 +271,7 @@ protected void dispatchByMQContainer(TieredMessageQueueContainer container) { } Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder() .put(TieredStoreMetricsConstant.LABEL_TOPIC, mq.getTopic()) - .put(TieredStoreMetricsConstant.LABEL_QUEUE, mq.getQueueId()) + .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, mq.getQueueId()) .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase()) .build(); TieredStoreMetricsManager.messagesDispatchTotal.add(queueOffset - beforeOffset, attributes); @@ -290,7 +290,8 @@ protected void dispatchByMQContainer(TieredMessageQueueContainer container) { } } - public void handleAppendCommitLogResult(AppendResult result, TieredMessageQueueContainer container, long queueOffset, + public void handleAppendCommitLogResult(AppendResult result, TieredMessageQueueContainer container, + long queueOffset, long dispatchOffset, long newCommitLogOffset, int size, long tagCode, ByteBuffer message) { MessageQueue mq = container.getMessageQueue(); String topic = mq.getTopic(); @@ -449,7 +450,7 @@ public void buildCQAndIndexFile() { cqMetricsMap.forEach((messageQueue, count) -> { Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder() .put(TieredStoreMetricsConstant.LABEL_TOPIC, messageQueue.getTopic()) - .put(TieredStoreMetricsConstant.LABEL_QUEUE, messageQueue.getQueueId()) + .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, messageQueue.getQueueId()) .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase()) .build(); TieredStoreMetricsManager.messagesDispatchTotal.add(count, attributes); @@ -457,7 +458,7 @@ public void buildCQAndIndexFile() { ifMetricsMap.forEach((messageQueue, count) -> { Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder() .put(TieredStoreMetricsConstant.LABEL_TOPIC, messageQueue.getTopic()) - .put(TieredStoreMetricsConstant.LABEL_QUEUE, messageQueue.getQueueId()) + .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, messageQueue.getQueueId()) .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.INDEX.name().toLowerCase()) .build(); TieredStoreMetricsManager.messagesDispatchTotal.add(count, attributes); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java index 3029d5dd532..ad728151049 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java @@ -40,7 +40,7 @@ public class TieredStoreMetricsConstant { public static final String LABEL_TOPIC = "topic"; public static final String LABEL_GROUP = "group"; - public static final String LABEL_QUEUE = "queue"; + public static final String LABEL_QUEUE_ID = "queue_id"; public static final String LABEL_FILE_TYPE = "file_type"; // blob constants diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java index 69fe28047ab..0b0dfd63a53 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java @@ -70,7 +70,7 @@ import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.HISTOGRAM_PROVIDER_RPC_LATENCY; import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.HISTOGRAM_UPLOAD_BYTES; import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE; -import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_QUEUE; +import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_QUEUE_ID; import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC; import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.STORAGE_MEDIUM_BLOB; @@ -181,13 +181,13 @@ public static void init(Meter meter, Supplier attributesBuild Attributes commitLogAttributes = newAttributesBuilder() .put(LABEL_TOPIC, mq.getTopic()) - .put(LABEL_QUEUE, mq.getQueueId()) + .put(LABEL_QUEUE_ID, mq.getQueueId()) .put(LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase()) .build(); measurement.record(Math.max(maxOffset - container.getDispatchOffset(), 0), commitLogAttributes); Attributes consumeQueueAttributes = newAttributesBuilder() .put(LABEL_TOPIC, mq.getTopic()) - .put(LABEL_QUEUE, mq.getQueueId()) + .put(LABEL_QUEUE_ID, mq.getQueueId()) .put(LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase()) .build(); measurement.record(Math.max(maxOffset - container.getConsumeQueueMaxOffset(), 0), consumeQueueAttributes); @@ -209,7 +209,7 @@ public static void init(Meter meter, Supplier attributesBuild Attributes commitLogAttributes = newAttributesBuilder() .put(LABEL_TOPIC, mq.getTopic()) - .put(LABEL_QUEUE, mq.getQueueId()) + .put(LABEL_QUEUE_ID, mq.getQueueId()) .put(LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase()) .build(); long commitLogDispatchLatency = next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), container.getDispatchOffset()); @@ -221,7 +221,7 @@ public static void init(Meter meter, Supplier attributesBuild Attributes consumeQueueAttributes = newAttributesBuilder() .put(LABEL_TOPIC, mq.getTopic()) - .put(LABEL_QUEUE, mq.getQueueId()) + .put(LABEL_QUEUE_ID, mq.getQueueId()) .put(LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase()) .build(); long consumeQueueDispatchOffset = container.getConsumeQueueMaxOffset(); @@ -307,7 +307,7 @@ public static void init(Meter meter, Supplier attributesBuild MessageQueue mq = container.getMessageQueue(); Attributes attributes = newAttributesBuilder() .put(LABEL_TOPIC, mq.getTopic()) - .put(LABEL_QUEUE, mq.getQueueId()) + .put(LABEL_QUEUE_ID, mq.getQueueId()) .build(); measurement.record(System.currentTimeMillis() - timestamp, attributes); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index fc3e079fe73..6744487f6d8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -53,6 +53,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageConst; @@ -61,6 +62,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageRequestMode; import org.apache.rocketmq.common.namesrv.NamesrvUtil; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -1658,8 +1660,14 @@ public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, f TopicList topicList = this.mqClientInstance.getMQClientAPIImpl().getSystemTopicListFromBroker(brokerAddr, timeoutMillis); Iterator> iterator = topicConfigSerializeWrapper.getTopicConfigTable().entrySet().iterator(); while (iterator.hasNext()) { - String topic = iterator.next().getKey(); - if (topicList.getTopicList().contains(topic) || !specialTopic && (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX))) { + TopicConfig topicConfig = iterator.next().getValue(); + if (topicList.getTopicList().contains(topicConfig.getTopicName()) + || TopicValidator.isSystemTopic(topicConfig.getTopicName())) { + iterator.remove(); + } else if (!specialTopic && StringUtils.startsWithAny(topicConfig.getTopicName(), + MixAll.RETRY_GROUP_TOPIC_PREFIX, MixAll.DLQ_GROUP_TOPIC_PREFIX)) { + iterator.remove(); + } else if (!PermName.isValid(topicConfig.getPerm())) { iterator.remove(); } }