From 5633d7abff791954ecd053c72e8f6162eb2aadae Mon Sep 17 00:00:00 2001 From: ruflin Date: Thu, 5 Apr 2018 14:25:53 +0200 Subject: [PATCH 1/2] Provide same data structure as X-Pack for ES node_stats This PR adds a flag `xpack.enabled` to the Elasticsearch Metricbeat module. If enabled, the data is transformed into the format that X-Pack Monitoring needs for the UI. It overwrites the index with `monitoring`. A config to enable the feature currently looks as follows: ``` metricbeat.modules: - module: elasticsearch metricsets: ["node_stats"] hosts: ["localhost:9200"] period: 1s xpack.enabled: true ``` * Data is sent to the index `.monitoring-es-6-mb-{date}`. This is hard coded. * To enable the feature, `xpack.enabled` has to be set in the module. * Whether a node is master is checked on each request, as it can change over time. The master request is not atomic, so this information could be inaccurate. The master detection also only works correctly if data is fetched from only 1 node. * The cluster_uuid is fetched on the first request and cached from then on, based on the assumption that the cluster_id for a node id does not change over time. * `limit_in_bytes` and `usage_in_bytes` are provided as strings by ES and not converted to int as they could overflow in ES. * An experimental message is logged when used. The feature is not documented yet on purpose. As the data goes into a separate index, it is expected that monitoring has already loaded the template. 
--- metricbeat/mb/event.go | 6 + .../node_stats/_meta/data-xpack.json | 148 ++++++++++++ .../_meta/test/node_stats_xpack.623.json | 171 ++++++++++++++ .../elasticsearch/node_stats/data_xpack.go | 223 ++++++++++++++++++ .../elasticsearch/node_stats/node_stats.go | 27 ++- 5 files changed, 571 insertions(+), 4 deletions(-) create mode 100644 metricbeat/module/elasticsearch/node_stats/_meta/data-xpack.json create mode 100644 metricbeat/module/elasticsearch/node_stats/_meta/test/node_stats_xpack.623.json create mode 100644 metricbeat/module/elasticsearch/node_stats/data_xpack.go diff --git a/metricbeat/mb/event.go b/metricbeat/mb/event.go index 34167cb19438..0eb73fd30cc4 100644 --- a/metricbeat/mb/event.go +++ b/metricbeat/mb/event.go @@ -18,6 +18,7 @@ type Event struct { ModuleFields common.MapStr // Fields that will be namespaced under [module]. MetricSetFields common.MapStr // Fields that will be namespaced under [module].[metricset]. + Index string // Index name prefix. If set overwrites the default prefix. Namespace string // Fully qualified namespace to use for MetricSetFields. Timestamp time.Time // Timestamp when the event data was collected. Error error // Error that occurred while collecting the event data. 
@@ -60,6 +61,11 @@ func (e *Event) BeatEvent(module, metricSet string, modifiers ...EventModifier) e.MetricSetFields = nil } + // Set index prefix to overwrite default + if e.Index != "" { + b.Meta = common.MapStr{"index": e.Index} + } + if e.Error != nil { b.Fields["error"] = common.MapStr{ "message": e.Error.Error(), diff --git a/metricbeat/module/elasticsearch/node_stats/_meta/data-xpack.json b/metricbeat/module/elasticsearch/node_stats/_meta/data-xpack.json new file mode 100644 index 000000000000..82f3df2c0fbc --- /dev/null +++ b/metricbeat/module/elasticsearch/node_stats/_meta/data-xpack.json @@ -0,0 +1,148 @@ +{ + "@timestamp": "2018-04-05T12:17:50.378Z", + "@metadata": { + "beat": "metricbeat", + "type": "doc", + "version": "7.0.0-alpha1" + }, + "cluster_uuid": "elasticsearch", + "interval_ms": 10000, + "type": "node_stats", + "source_node": { + "transport_address": "127.0.0.1:9300", + "ip": "127.0.0.1:9300", + "name": "0F564AX", + "uuid": "0F564AXWTwme40EvgjAyPg", + "host": "127.0.0.1" + }, + "node_stats": { + "node_id": "0F564AXWTwme40EvgjAyPg", + "mlockall": false, + "node_master": true + }, + "beat": { + "hostname": "ruflin", + "version": "7.0.0-alpha1", + "name": "ruflin" + }, + "indices": { + "fs": { + "summary": { + "free_in_bytes": 20373749760, + "available_in_bytes": 20111605760, + "total_in_bytes": 249779191808 + } + }, + "indices": { + "indexing": { + "throttle_time_in_millis": 0, + "index_total": 147, + "index_time_in_millis": 3635 + }, + "search": { + "query_total": 16, + "query_time_in_millis": 261 + }, + "query_cache": { + "hit_count": 0, + "miss_count": 0, + "evictions": 0, + "memory_size_in_bytes": 0 + }, + "fielddata": { + "evictions": 0, + "memory_size_in_bytes": 0 + }, + "segments": { + "index_writer_memory_in_bytes": 0, + "memory_in_bytes": 51216, + "terms_memory_in_bytes": 39654, + "term_vectors_memory_in_bytes": 0, + "version_map_memory_in_bytes": 0, + "stored_fields_memory_in_bytes": 4072, + "points_memory_in_bytes": 406, + 
"fixed_bit_set_memory_in_bytes": 0, + "count": 13, + "doc_values_memory_in_bytes": 1900, + "norms_memory_in_bytes": 5184 + }, + "request_cache": { + "memory_size_in_bytes": 0, + "evictions": 0, + "hit_count": 0, + "miss_count": 3 + }, + "docs": { + "count": 139 + }, + "store": { + "size_in_bytes": 333573 + } + }, + "os": { + "cpu": {} + }, + "process": { + "max_file_descriptors": 10240, + "cpu": {}, + "open_file_descriptors": 190 + }, + "jvm": { + "mem": { + "heap_used_in_bytes": 225025520, + "heap_used_percent": 21, + "heap_max_in_bytes": 1038876672 + }, + "gc": { + "collectors": { + "young": { + "collection_time_in_millis": 1444, + "collection_count": 8 + }, + "old": { + "collection_count": 8, + "collection_time_in_millis": 1444 + } + } + } + }, + "thread_pool": { + "bulk": { + "threads": 4, + "queue": 0, + "rejected": 0 + }, + "generic": { + "queue": 0, + "rejected": 0, + "threads": 4 + }, + "get": { + "threads": 4, + "queue": 0, + "rejected": 0 + }, + "index": { + "threads": 2, + "queue": 0, + "rejected": 0 + }, + "management": { + "queue": 0, + "rejected": 0, + "threads": 4 + }, + "search": { + "threads": 7, + "queue": 0, + "rejected": 0 + } + } + }, + "metricset": { + "name": "node_stats", + "module": "elasticsearch", + "host": "localhost:9200", + "rtt": 5047 + } +} diff --git a/metricbeat/module/elasticsearch/node_stats/_meta/test/node_stats_xpack.623.json b/metricbeat/module/elasticsearch/node_stats/_meta/test/node_stats_xpack.623.json new file mode 100644 index 000000000000..3446325a7718 --- /dev/null +++ b/metricbeat/module/elasticsearch/node_stats/_meta/test/node_stats_xpack.623.json @@ -0,0 +1,171 @@ +{ + "_index": ".monitoring-es-6-2018.04.05", + "_type": "doc", + "_id": "8Owbk2IBdPgvni_OoEeL", + "_score": 1, + "_source": { + "cluster_uuid": "xsXCfgZ-TKGZTwMAY7Pmsg", + "timestamp": "2018-04-05T00:01:24.088Z", + "interval_ms": 10000, + "type": "node_stats", + "source_node": { + "uuid": "GOTrV1s4RTy06Dg1DxkLhA", + "host": "172.25.132.135", + 
"transport_address": "172.25.132.135:19503", + "ip": "172.25.132.135", + "name": "instance-0000000001", + "timestamp": "2018-04-05T00:01:24.073Z" + }, + "node_stats": { + "node_id": "GOTrV1s4RTy06Dg1DxkLhA", + "node_master": true, + "mlockall": false, + "indices": { + "docs": { + "count": 631398 + }, + "store": { + "size_in_bytes": 412447617 + }, + "indexing": { + "index_total": 2587515, + "index_time_in_millis": 596592, + "throttle_time_in_millis": 0 + }, + "search": { + "query_total": 164387, + "query_time_in_millis": 21627 + }, + "query_cache": { + "memory_size_in_bytes": 0, + "hit_count": 0, + "miss_count": 29562, + "evictions": 0 + }, + "fielddata": { + "memory_size_in_bytes": 2544, + "evictions": 0 + }, + "segments": { + "count": 187, + "memory_in_bytes": 3587267, + "terms_memory_in_bytes": 2163417, + "stored_fields_memory_in_bytes": 126192, + "term_vectors_memory_in_bytes": 0, + "norms_memory_in_bytes": 170880, + "points_memory_in_bytes": 160182, + "doc_values_memory_in_bytes": 966596, + "index_writer_memory_in_bytes": 145155, + "version_map_memory_in_bytes": 5277, + "fixed_bit_set_memory_in_bytes": 4176 + }, + "request_cache": { + "memory_size_in_bytes": 4870, + "evictions": 0, + "hit_count": 51288, + "miss_count": 16 + } + }, + "os": { + "cpu": { + "load_average": { + "1m": 4.7, + "5m": 4.7, + "15m": 4.51 + } + }, + "cgroup": { + "cpuacct": { + "control_group": "/", + "usage_nanos": 14503393680842 + }, + "cpu": { + "control_group": "/", + "cfs_period_micros": 100000, + "cfs_quota_micros": 236945, + "stat": { + "number_of_elapsed_periods": 2707918, + "number_of_times_throttled": 14684, + "time_throttled_nanos": 1799832211936 + } + }, + "memory": { + "control_group": "/", + "limit_in_bytes": "1073741824", + "usage_in_bytes": "1069973504" + } + } + }, + "process": { + "open_file_descriptors": 716, + "max_file_descriptors": 1048576, + "cpu": { + "percent": 0 + } + }, + "jvm": { + "mem": { + "heap_used_in_bytes": 196405456, + "heap_used_percent": 47, + 
"heap_max_in_bytes": 415694848 + }, + "gc": { + "collectors": { + "young": { + "collection_count": 55024, + "collection_time_in_millis": 425431 + }, + "old": { + "collection_count": 109, + "collection_time_in_millis": 11415 + } + } + } + }, + "thread_pool": { + "bulk": { + "threads": 2, + "queue": 0, + "rejected": 0 + }, + "generic": { + "threads": 6, + "queue": 0, + "rejected": 0 + }, + "get": { + "threads": 2, + "queue": 0, + "rejected": 0 + }, + "index": { + "threads": 2, + "queue": 0, + "rejected": 0 + }, + "management": { + "threads": 4, + "queue": 0, + "rejected": 0 + }, + "search": { + "threads": 4, + "queue": 0, + "rejected": 0 + }, + "watcher": { + "threads": 10, + "queue": 0, + "rejected": 0 + } + }, + "fs": { + "total": { + "total_in_bytes": 27917287424, + "free_in_bytes": 27203026944, + "available_in_bytes": 27203026944 + } + } + } + } +} diff --git a/metricbeat/module/elasticsearch/node_stats/data_xpack.go b/metricbeat/module/elasticsearch/node_stats/data_xpack.go new file mode 100644 index 000000000000..d225ad53448e --- /dev/null +++ b/metricbeat/module/elasticsearch/node_stats/data_xpack.go @@ -0,0 +1,223 @@ +package node_stats + +import ( + "encoding/json" + + "time" + + "github.com/elastic/beats/libbeat/common" + s "github.com/elastic/beats/libbeat/common/schema" + c "github.com/elastic/beats/libbeat/common/schema/mapstriface" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/module/elasticsearch" +) + +var ( + sourceNodeXpack = s.Schema{ + "host": c.Str("host"), + "transport_address": c.Str("transport_address"), + "ip": c.Str("ip"), + "name": c.Str("name"), + } + + schemaXpack = s.Schema{ + "indices": c.Dict("indices", s.Schema{ + "docs": c.Dict("docs", s.Schema{ + "count": c.Int("count"), + }), + "store": c.Dict("store", s.Schema{ + "size_in_bytes": c.Int("size_in_bytes"), + }), + "indexing": c.Dict("indexing", s.Schema{ + "index_total": c.Int("index_total"), + 
"index_time_in_millis": c.Int("index_time_in_millis"), + "throttle_time_in_millis": c.Int("throttle_time_in_millis"), + }), + "search": c.Dict("search", s.Schema{ + "query_total": c.Int("query_total"), + "query_time_in_millis": c.Int("query_time_in_millis"), + }), + "query_cache": c.Dict("query_cache", s.Schema{ + "memory_size_in_bytes": c.Int("memory_size_in_bytes"), + "hit_count": c.Int("hit_count"), + "miss_count": c.Int("miss_count"), + "evictions": c.Int("evictions"), + }), + "fielddata": c.Dict("fielddata", s.Schema{ + "memory_size_in_bytes": c.Int("memory_size_in_bytes"), + "evictions": c.Int("evictions"), + }), + "segments": c.Dict("segments", s.Schema{ + "count": c.Int("count"), + "memory_in_bytes": c.Int("memory_in_bytes"), + "terms_memory_in_bytes": c.Int("terms_memory_in_bytes"), + "stored_fields_memory_in_bytes": c.Int("stored_fields_memory_in_bytes"), + "term_vectors_memory_in_bytes": c.Int("term_vectors_memory_in_bytes"), + "norms_memory_in_bytes": c.Int("norms_memory_in_bytes"), + "points_memory_in_bytes": c.Int("points_memory_in_bytes"), + "doc_values_memory_in_bytes": c.Int("doc_values_memory_in_bytes"), + "index_writer_memory_in_bytes": c.Int("index_writer_memory_in_bytes"), + "version_map_memory_in_bytes": c.Int("version_map_memory_in_bytes"), + "fixed_bit_set_memory_in_bytes": c.Int("fixed_bit_set_memory_in_bytes"), + }), + "request_cache": c.Dict("request_cache", s.Schema{ + "memory_size_in_bytes": c.Int("memory_size_in_bytes"), + "evictions": c.Int("evictions"), + "hit_count": c.Int("hit_count"), + "miss_count": c.Int("miss_count"), + }), + }), + "os": c.Dict("os", s.Schema{ + "cpu": c.Dict("cpu", s.Schema{ + "load_average": c.Dict("load_average", s.Schema{ + "1m": c.Float("1m"), + "5m": c.Float("5m"), + "15m": c.Float("15m"), + }), + }), + "cgroup": c.Dict("cgroup", s.Schema{ + "cpuacct": c.Dict("cpuacct", s.Schema{ + "control_group": c.Str("control_group"), + "usage_nanos": c.Int("usage_nanos"), + }), + "cpu": c.Dict("cpu", s.Schema{ + 
"control_group": c.Str("control_group"), + "cfs_period_micros": c.Int("cfs_period_micros"), + "cfs_quota_micros": c.Int("cfs_quota_micros"), + "stat": c.Dict("stat", s.Schema{ + "number_of_elapsed_periods": c.Int("number_of_elapsed_periods"), + "number_of_times_throttled": c.Int("number_of_times_throttled"), + "time_throttled_nanos": c.Int("time_throttled_nanos"), + }), + }), + "memory": c.Dict("memory", s.Schema{ + "control_group": c.Str("control_group"), + // The two following values are currently string. See https://github.com/elastic/elasticsearch/pull/26166 + "limit_in_bytes": c.Str("limit_in_bytes"), + "usage_in_bytes": c.Str("usage_in_bytes"), + }), + }), + }), + "process": c.Dict("process", s.Schema{ + "open_file_descriptors": c.Int("open_file_descriptors"), + "max_file_descriptors": c.Int("max_file_descriptors"), + "cpu": c.Dict("cpu", s.Schema{ + "percent": c.Int("percent"), + }), + }), + "jvm": c.Dict("jvm", s.Schema{ + "mem": c.Dict("mem", s.Schema{ + "heap_used_in_bytes": c.Int("heap_used_in_bytes"), + "heap_used_percent": c.Int("heap_used_percent"), + "heap_max_in_bytes": c.Int("heap_max_in_bytes"), + }), + "gc": c.Dict("gc", s.Schema{ + "collectors": c.Dict("collectors", s.Schema{ + "young": c.Dict("young", s.Schema{ + "collection_count": c.Int("collection_count"), + "collection_time_in_millis": c.Int("collection_time_in_millis"), + }), + "old": c.Dict("old", s.Schema{ + "collection_count": c.Int("collection_count"), + "collection_time_in_millis": c.Int("collection_time_in_millis"), + }), + }), + }), + }), + "thread_pool": c.Dict("thread_pool", s.Schema{ + "bulk": c.Dict("bulk", s.Schema{ + "threads": c.Int("threads"), + "queue": c.Int("queue"), + "rejected": c.Int("rejected"), + }), + "generic": c.Dict("generic", s.Schema{ + "threads": c.Int("threads"), + "queue": c.Int("queue"), + "rejected": c.Int("rejected"), + }), + "get": c.Dict("get", s.Schema{ + "threads": c.Int("threads"), + "queue": c.Int("queue"), + "rejected": c.Int("rejected"), + }), + 
"index": c.Dict("index", s.Schema{ + "threads": c.Int("threads"), + "queue": c.Int("queue"), + "rejected": c.Int("rejected"), + }), + "management": c.Dict("management", s.Schema{ + "threads": c.Int("threads"), + "queue": c.Int("queue"), + "rejected": c.Int("rejected"), + }), + "search": c.Dict("search", s.Schema{ + "threads": c.Int("threads"), + "queue": c.Int("queue"), + "rejected": c.Int("rejected"), + }), + "watcher": c.Dict("watcher", s.Schema{ + "threads": c.Int("threads"), + "queue": c.Int("queue"), + "rejected": c.Int("rejected"), + }), + }), + "fs": c.Dict("fs", s.Schema{ + "summary": c.Dict("total", s.Schema{ + "total_in_bytes": c.Int("total_in_bytes"), + "free_in_bytes": c.Int("free_in_bytes"), + "available_in_bytes": c.Int("available_in_bytes"), + }), + }), + } +) + +func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) { + nodesStruct := struct { + ClusterName string `json:"cluster_name"` + Nodes map[string]map[string]interface{} `json:"nodes"` + }{} + + json.Unmarshal(content, &nodesStruct) + + // Normally the nodesStruct should only contain one node. But if _local is removed + // from the path and Metricbeat is not installed on the same machine as the node + // it will provide the data for multiple nodes. This will mean the detection of the + // master node will not be accurate anymore as often in these cases a proxy is in front + // of ES and it's not known if the request will be routed to the same node as before. 
+ for nodeID, node := range nodesStruct.Nodes { + clusterID, err := elasticsearch.GetClusterID(m.http, m.HostData().SanitizedURI, nodeID) + if err != nil { + logp.Err("could not fetch cluster id: %s", err) + continue + } + + isMaster, _ := elasticsearch.IsMaster(m.http, m.HostData().SanitizedURI) + + event := mb.Event{} + // Build source_node object + sourceNode, _ := sourceNodeXpack.Apply(node) + sourceNode["uuid"] = nodeID + + nodeData, _ := schemaXpack.Apply(node) + nodeStats := common.MapStr{ + "node_master": isMaster, + "node_id": nodeID, + } + nodeStats.DeepUpdate(nodeData) + + event.RootFields = common.MapStr{ + "timestamp": time.Now(), + "cluster_uuid": clusterID, + "interval_ms": m.Module().Config().Period.Nanoseconds() / 1000 / 1000, + "type": "node_stats", + "source_node": sourceNode, + "node_stats": nodeStats, + } + + // Hard coded index prefix for monitoring, no detection done for ES version at the moment + // It has an additional mb in the name to make it clear the data is coming from metricbeat + event.Index = ".monitoring-es-6-mb" + r.Event(event) + } +} diff --git a/metricbeat/module/elasticsearch/node_stats/node_stats.go b/metricbeat/module/elasticsearch/node_stats/node_stats.go index f603d4f98571..c94bb8d441e5 100644 --- a/metricbeat/module/elasticsearch/node_stats/node_stats.go +++ b/metricbeat/module/elasticsearch/node_stats/node_stats.go @@ -29,20 +29,35 @@ var ( // MetricSet type defines all fields of the MetricSet type MetricSet struct { mb.BaseMetricSet - http *helper.HTTP + http *helper.HTTP + xpack bool } // New create a new instance of the MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The elasticsearch node_stats metricset is beta") + config := struct { + XPack bool `config:"xpack.enabled"` + }{ + XPack: false, + } + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + if config.XPack { + cfgwarn.Experimental("The experimental xpack.enabled flag in elasticsearch/node_stats 
metricset is enabled.") + } + http, err := helper.NewHTTP(base) if err != nil { return nil, err } return &MetricSet{ - base, - http, + BaseMetricSet: base, + http: http, + xpack: config.XPack, }, nil } @@ -54,5 +69,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { return } - eventsMapping(r, content) + if m.xpack { + eventsMappingXPack(r, m, content) + } else { + eventsMapping(r, content) + } } From 7b01883e12b96d1ad3f0e6aced5c333d8007e41e Mon Sep 17 00:00:00 2001 From: ruflin Date: Thu, 26 Apr 2018 16:42:34 +0200 Subject: [PATCH 2/2] remove deep update --- metricbeat/module/elasticsearch/node_stats/data_xpack.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/metricbeat/module/elasticsearch/node_stats/data_xpack.go b/metricbeat/module/elasticsearch/node_stats/data_xpack.go index d225ad53448e..a26564516bdc 100644 --- a/metricbeat/module/elasticsearch/node_stats/data_xpack.go +++ b/metricbeat/module/elasticsearch/node_stats/data_xpack.go @@ -200,11 +200,8 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) { sourceNode["uuid"] = nodeID nodeData, _ := schemaXpack.Apply(node) - nodeStats := common.MapStr{ - "node_master": isMaster, - "node_id": nodeID, - } - nodeStats.DeepUpdate(nodeData) + nodeData["node_master"] = isMaster + nodeData["node_id"] = nodeID event.RootFields = common.MapStr{ "timestamp": time.Now(), @@ -212,7 +209,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) { "interval_ms": m.Module().Config().Period.Nanoseconds() / 1000 / 1000, "type": "node_stats", "source_node": sourceNode, - "node_stats": nodeStats, + "node_stats": nodeData, } // Hard coded index prefix for monitoring, no detection done for ES version at the moment