diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java index 5733aa40bac..0af2ac616c4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java @@ -27,6 +27,8 @@ public class BrokerMetricsConstant { public static final String COUNTER_THROUGHPUT_IN_TOTAL = "rocketmq_throughput_in_total"; public static final String COUNTER_THROUGHPUT_OUT_TOTAL = "rocketmq_throughput_out_total"; public static final String HISTOGRAM_MESSAGE_SIZE = "rocketmq_message_size"; + public static final String HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME = "rocketmq_topic_create_execution_time"; + public static final String HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME = "rocketmq_consumer_group_create_execution_time"; public static final String GAUGE_PRODUCER_CONNECTIONS = "rocketmq_producer_connections"; public static final String GAUGE_CONSUMER_CONNECTIONS = "rocketmq_consumer_connections"; @@ -52,6 +54,7 @@ public class BrokerMetricsConstant { public static final String LABEL_PROCESSOR = "processor"; public static final String LABEL_TOPIC = "topic"; + public static final String LABEL_INVOCATION_STATUS = "invocation_status"; public static final String LABEL_IS_RETRY = "is_retry"; public static final String LABEL_IS_SYSTEM = "is_system"; public static final String LABEL_CONSUMER_GROUP = "consumer_group"; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java index fc7e97bda95..0050a0dcd4c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java @@ -64,6 +64,7 @@ import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant; import org.slf4j.bridge.SLF4JBridgeHandler; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -92,6 +93,8 @@ import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_FINISH_MSG_LATENCY; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP; @@ -135,6 +138,8 @@ public class BrokerMetricsManager { public static LongCounter throughputInTotal = new NopLongCounter(); public static LongCounter throughputOutTotal = new NopLongCounter(); public static LongHistogram messageSize = new NopLongHistogram(); + public static LongHistogram topicCreateExecuteTime = new NopLongHistogram(); + public static LongHistogram consumerGroupCreateExecuteTime = new NopLongHistogram(); // client connection metrics public static ObservableLongGauge producerConnection = new NopObservableLongGauge(); @@ -381,6 +386,14 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { 1d * 12 * 60 * 60, //12h 1d * 24 * 60 * 60 //24h ); + + List createTimeBuckets = Arrays.asList( + (double) Duration.ofMillis(10).toMillis(), //10ms + (double) Duration.ofMillis(100).toMillis(), //100ms + (double) Duration.ofSeconds(1).toMillis(), //1s + (double) Duration.ofSeconds(3).toMillis(), //3s + (double) Duration.ofSeconds(5).toMillis() //5s + ); InstrumentSelector messageSizeSelector = InstrumentSelector.builder() .setType(InstrumentType.HISTOGRAM) .setName(HISTOGRAM_MESSAGE_SIZE) @@ -401,6 +414,24 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { SdkMeterProviderUtil.setCardinalityLimit(commitLatencyViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); providerBuilder.registerView(commitLatencySelector, commitLatencyViewBuilder.build()); + InstrumentSelector createTopicTimeSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME) + .build(); + InstrumentSelector createSubGroupTimeSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME) + .build(); + ViewBuilder createTopicTimeViewBuilder = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets)); + ViewBuilder createSubGroupTimeViewBuilder = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets)); + // To config the cardinalityLimit for openTelemetry metrics exporting. + SdkMeterProviderUtil.setCardinalityLimit(createTopicTimeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(createTopicTimeSelector, createTopicTimeViewBuilder.build()); + SdkMeterProviderUtil.setCardinalityLimit(createSubGroupTimeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(createSubGroupTimeSelector, createSubGroupTimeViewBuilder.build()); + for (Pair selectorViewPair : RemotingMetricsManager.getMetricsView()) { ViewBuilder viewBuilder = selectorViewPair.getObject2(); SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); @@ -482,6 +513,18 @@ private void initRequestMetrics() { .setDescription("Incoming messages size") .ofLongs() .build(); + + topicCreateExecuteTime = brokerMeter.histogramBuilder(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME) + .setDescription("The distribution of create topic time") + .ofLongs() + .setUnit("milliseconds") + .build(); + + consumerGroupCreateExecuteTime = brokerMeter.histogramBuilder(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME) + .setDescription("The distribution of create subscription time") + .ofLongs() + .setUnit("milliseconds") + .build(); } private void initConnectionMetrics() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/InvocationStatus.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/InvocationStatus.java new file mode 100644 index 00000000000..c7501e53d96 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/InvocationStatus.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.metrics; + +public enum InvocationStatus { + SUCCESS("success"), + FAILURE("failure"); + + private final String name; + + InvocationStatus(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} \ No newline at end of file diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a1a6f5bf6ce..40a7a461e89 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import io.opentelemetry.api.common.Attributes; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.AccessValidator; @@ -58,6 +59,8 @@ import org.apache.rocketmq.broker.controller.ReplicasManager; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; +import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; +import org.apache.rocketmq.broker.metrics.InvocationStatus; import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; @@ -212,7 +215,8 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint; import org.apache.rocketmq.store.timer.TimerMessageStore; import org.apache.rocketmq.store.util.LibC; - +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; public class AdminBrokerProcessor implements NettyRequestProcessor { @@ -465,45 +469,46 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext String topic = requestHeader.getTopic(); - TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic); - if (!result.isValid()) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(result.getRemark()); - return response; - } - if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) { - if (TopicValidator.isSystemTopic(topic)) { + long executionTime; + try { + TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic); + if (!result.isValid()) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The topic[" + topic + "] is conflict with system topic."); + response.setRemark(result.getRemark()); return response; } - } - - TopicConfig topicConfig = new TopicConfig(topic); - topicConfig.setReadQueueNums(requestHeader.getReadQueueNums()); - topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums()); - topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum()); - topicConfig.setPerm(requestHeader.getPerm()); - topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); - topicConfig.setOrder(requestHeader.getOrder()); - String attributesModification = requestHeader.getAttributes(); - topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification)); + if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) { + if (TopicValidator.isSystemTopic(topic)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The topic[" + topic + "] is conflict with system topic."); + return response; + } + } - if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED - && !brokerController.getBrokerConfig().isEnableMixedMessageType()) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("MIXED message type is not supported."); - return response; - } + TopicConfig topicConfig = new TopicConfig(topic); + topicConfig.setReadQueueNums(requestHeader.getReadQueueNums()); + topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums()); + topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum()); + topicConfig.setPerm(requestHeader.getPerm()); + topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); + topicConfig.setOrder(requestHeader.getOrder()); + String attributesModification = requestHeader.getAttributes(); + topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification)); + + if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED + && !brokerController.getBrokerConfig().isEnableMixedMessageType()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("MIXED message type is not supported."); + return response; + } - if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) { - LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}", - requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - response.setCode(ResponseCode.SUCCESS); - return response; - } + if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) { + LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}", + requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + response.setCode(ResponseCode.SUCCESS); + return response; + } - try { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { this.brokerController.registerSingleTopicAll(topicConfig); @@ -517,7 +522,16 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext response.setRemark(e.getMessage()); return response; } - long executionTime = System.currentTimeMillis() - startTime; + finally { + executionTime = System.currentTimeMillis() - startTime; + InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ? + InvocationStatus.SUCCESS : InvocationStatus.FAILURE; + Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + .put(LABEL_INVOCATION_STATUS, status.getName()) + .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic)) + .build(); + BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes); + } LOGGER.info("executionTime of create topic:{} is {} ms" , topic, executionTime); return response; } @@ -1468,6 +1482,12 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c response.setRemark(null); long executionTime = System.currentTimeMillis() - startTime; LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms" ,config.getGroupName() ,executionTime); + InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ? + InvocationStatus.SUCCESS : InvocationStatus.FAILURE; + Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + .put(LABEL_INVOCATION_STATUS, status.getName()) + .build(); + BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes); return response; }