From 243d64cc3bb233e7b4f4fae3b5197025d38435cc Mon Sep 17 00:00:00 2001 From: silenceqi Date: Sat, 11 Jan 2025 17:22:28 +0800 Subject: [PATCH] feat: add cluster-level shard state metrics (#78) --- modules/elastic/api/manage.go | 70 ++++++++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/modules/elastic/api/manage.go b/modules/elastic/api/manage.go index b33c74b9..2d0d03e1 100644 --- a/modules/elastic/api/manage.go +++ b/modules/elastic/api/manage.go @@ -534,7 +534,7 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http key := h.GetParameter(req, "key") var metricType string switch key { - case v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.IndexLatencyMetricKey, v1.SearchLatencyMetricKey, CircuitBreakerMetricKey: + case v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.IndexLatencyMetricKey, v1.SearchLatencyMetricKey, CircuitBreakerMetricKey,ShardStateMetricKey: metricType = v1.MetricTypeNodeStats case ClusterDocumentsMetricKey, ClusterStorageMetricKey, @@ -570,7 +570,73 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http defer cancel() if util.StringInArray([]string{v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.IndexLatencyMetricKey, v1.SearchLatencyMetricKey}, key) { metrics, err = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, key) - }else{ + } else if key == ShardStateMetricKey { + clusterUUID, err := h.getClusterUUID(id) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + query := util.MapStr{ + "size": 0, + "query": util.MapStr{ + "bool": util.MapStr{ + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.cluster_id": util.MapStr{ + "value": id, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.cluster_uuid": util.MapStr{ + "value": clusterUUID, + }, + }, + }, + }, + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.category": util.MapStr{ + "value": "elasticsearch", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.name": util.MapStr{ + "value": "shard_stats", + }, + }, + }, + }, + "filter": []util.MapStr{ + { + "range": util.MapStr{ + "timestamp": util.MapStr{ + "gte": min, + "lte": max, + }, + }, + }, + }, + }, + }, + } + shardStateMetric, err := getNodeShardStateMetric(ctx, query, bucketSize) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + metrics = map[string]*common.MetricItem{ + ShardStateMetricKey: shardStateMetric, + } + } else { metrics, err = h.GetClusterMetrics(ctx, id, bucketSize, min, max, key) } if err != nil {