Skip to content

Commit

Permalink
[ISSUE #8211] Add two metrics rocketmq_topic_create_execution_time an…
Browse files Browse the repository at this point in the history
…d rocketmq_consumer_group_create_execution_time (#8212)

* Add tow metric createTopicTime and createSubscriptionTime in broker

* roll back BrokerConfig.java

* Add metric view of createTopicTime and createSubscriptionTime in broker

* Add two metric rocketmq_active_topic_number and rocketmq_active_subscription_number

Signed-off-by: 黄梓淇 <[email protected]>
  • Loading branch information
Stephanie0002 authored May 31, 2024
1 parent 7a5ea90 commit 4027139
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class BrokerMetricsConstant {
public static final String COUNTER_THROUGHPUT_IN_TOTAL = "rocketmq_throughput_in_total";
public static final String COUNTER_THROUGHPUT_OUT_TOTAL = "rocketmq_throughput_out_total";
public static final String HISTOGRAM_MESSAGE_SIZE = "rocketmq_message_size";
public static final String HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME = "rocketmq_topic_create_execution_time";
public static final String HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME = "rocketmq_consumer_group_create_execution_time";

public static final String GAUGE_PRODUCER_CONNECTIONS = "rocketmq_producer_connections";
public static final String GAUGE_CONSUMER_CONNECTIONS = "rocketmq_consumer_connections";
Expand All @@ -52,6 +54,7 @@ public class BrokerMetricsConstant {
public static final String LABEL_PROCESSOR = "processor";

public static final String LABEL_TOPIC = "topic";
public static final String LABEL_INVOCATION_STATUS = "invocation_status";
public static final String LABEL_IS_RETRY = "is_retry";
public static final String LABEL_IS_SYSTEM = "is_system";
public static final String LABEL_CONSUMER_GROUP = "consumer_group";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
import org.slf4j.bridge.SLF4JBridgeHandler;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -92,6 +93,8 @@
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_FINISH_MSG_LATENCY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
Expand Down Expand Up @@ -135,6 +138,8 @@ public class BrokerMetricsManager {
public static LongCounter throughputInTotal = new NopLongCounter();
public static LongCounter throughputOutTotal = new NopLongCounter();
public static LongHistogram messageSize = new NopLongHistogram();
public static LongHistogram topicCreateExecuteTime = new NopLongHistogram();
public static LongHistogram consumerGroupCreateExecuteTime = new NopLongHistogram();

// client connection metrics
public static ObservableLongGauge producerConnection = new NopObservableLongGauge();
Expand Down Expand Up @@ -381,6 +386,14 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
1d * 12 * 60 * 60, //12h
1d * 24 * 60 * 60 //24h
);

List<Double> createTimeBuckets = Arrays.asList(
(double) Duration.ofMillis(10).toMillis(), //10ms
(double) Duration.ofMillis(100).toMillis(), //100ms
(double) Duration.ofSeconds(1).toMillis(), //1s
(double) Duration.ofSeconds(3).toMillis(), //3s
(double) Duration.ofSeconds(5).toMillis() //5s
);
InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_MESSAGE_SIZE)
Expand All @@ -401,6 +414,24 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
SdkMeterProviderUtil.setCardinalityLimit(commitLatencyViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
providerBuilder.registerView(commitLatencySelector, commitLatencyViewBuilder.build());

InstrumentSelector createTopicTimeSelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
.build();
InstrumentSelector createSubGroupTimeSelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
.build();
ViewBuilder createTopicTimeViewBuilder = View.builder()
.setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
ViewBuilder createSubGroupTimeViewBuilder = View.builder()
.setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
// To config the cardinalityLimit for openTelemetry metrics exporting.
SdkMeterProviderUtil.setCardinalityLimit(createTopicTimeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
providerBuilder.registerView(createTopicTimeSelector, createTopicTimeViewBuilder.build());
SdkMeterProviderUtil.setCardinalityLimit(createSubGroupTimeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
providerBuilder.registerView(createSubGroupTimeSelector, createSubGroupTimeViewBuilder.build());

for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
ViewBuilder viewBuilder = selectorViewPair.getObject2();
SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
Expand Down Expand Up @@ -482,6 +513,18 @@ private void initRequestMetrics() {
.setDescription("Incoming messages size")
.ofLongs()
.build();

topicCreateExecuteTime = brokerMeter.histogramBuilder(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
.setDescription("The distribution of create topic time")
.ofLongs()
.setUnit("milliseconds")
.build();

consumerGroupCreateExecuteTime = brokerMeter.histogramBuilder(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
.setDescription("The distribution of create subscription time")
.ofLongs()
.setUnit("milliseconds")
.build();
}

private void initConnectionMetrics() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.broker.metrics;

public enum InvocationStatus {
SUCCESS("success"),
FAILURE("failure");

private final String name;

InvocationStatus(String name) {
this.name = name;
}

public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import io.opentelemetry.api.common.Attributes;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
Expand All @@ -58,6 +59,8 @@
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.metrics.InvocationStatus;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
Expand Down Expand Up @@ -212,7 +215,8 @@
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.LibC;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;

public class AdminBrokerProcessor implements NettyRequestProcessor {
Expand Down Expand Up @@ -465,45 +469,46 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext

String topic = requestHeader.getTopic();

TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic);
if (!result.isValid()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(result.getRemark());
return response;
}
if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) {
if (TopicValidator.isSystemTopic(topic)) {
long executionTime;
try {
TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic);
if (!result.isValid()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The topic[" + topic + "] is conflict with system topic.");
response.setRemark(result.getRemark());
return response;
}
}

TopicConfig topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
topicConfig.setOrder(requestHeader.getOrder());
String attributesModification = requestHeader.getAttributes();
topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));
if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) {
if (TopicValidator.isSystemTopic(topic)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The topic[" + topic + "] is conflict with system topic.");
return response;
}
}

if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
&& !brokerController.getBrokerConfig().isEnableMixedMessageType()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("MIXED message type is not supported.");
return response;
}
TopicConfig topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
topicConfig.setOrder(requestHeader.getOrder());
String attributesModification = requestHeader.getAttributes();
topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));

if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
&& !brokerController.getBrokerConfig().isEnableMixedMessageType()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("MIXED message type is not supported.");
return response;
}

if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) {
LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}",
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
response.setCode(ResponseCode.SUCCESS);
return response;
}
if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) {
LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}",
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
response.setCode(ResponseCode.SUCCESS);
return response;
}

try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
this.brokerController.registerSingleTopicAll(topicConfig);
Expand All @@ -517,7 +522,16 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
response.setRemark(e.getMessage());
return response;
}
long executionTime = System.currentTimeMillis() - startTime;
finally {
executionTime = System.currentTimeMillis() - startTime;
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
.build();
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
}
LOGGER.info("executionTime of create topic:{} is {} ms" , topic, executionTime);
return response;
}
Expand Down Expand Up @@ -1468,6 +1482,12 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c
response.setRemark(null);
long executionTime = System.currentTimeMillis() - startTime;
LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms" ,config.getGroupName() ,executionTime);
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.build();
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes);
return response;
}

Expand Down

0 comments on commit 4027139

Please sign in to comment.