diff --git a/.chloggen/elasticsearch-docs.yaml b/.chloggen/elasticsearch-docs.yaml new file mode 100644 index 000000000000..36078981a341 --- /dev/null +++ b/.chloggen/elasticsearch-docs.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add document count metrics on index level + +# One or more tracking issues related to the change +issues: [14635] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/receiver/elasticsearchreceiver/documentation.md b/receiver/elasticsearchreceiver/documentation.md index 2c9f7611b231..4b3545ca80c7 100644 --- a/receiver/elasticsearchreceiver/documentation.md +++ b/receiver/elasticsearchreceiver/documentation.md @@ -25,6 +25,7 @@ These are the metrics available for this scraper. | elasticsearch.index.cache.evictions | The number of evictions from the cache for an index. | {evictions} | Sum(Int) | | | elasticsearch.index.cache.memory.usage | The size in bytes of the cache for an index. | By | Sum(Int) | | | elasticsearch.index.cache.size | The number of elements of the query cache for an index. | 1 | Sum(Int) | | +| elasticsearch.index.documents | The number of documents for an index. | {documents} | Sum(Int) | | | **elasticsearch.index.operations.completed** | The number of operations completed for an index. | {operations} | Sum(Int) | | | elasticsearch.index.operations.merge.docs_count | The total number of documents in merge operations for an index. | {documents} | Sum(Int) | | | elasticsearch.index.operations.merge.size | The total size of merged segments for an index. | By | Sum(Int) | | diff --git a/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go b/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go index af847a5d99f5..a28571425cb4 100644 --- a/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go +++ b/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go @@ -54,6 +54,7 @@ type MetricsSettings struct { ElasticsearchIndexCacheEvictions MetricSettings `mapstructure:"elasticsearch.index.cache.evictions"` ElasticsearchIndexCacheMemoryUsage MetricSettings `mapstructure:"elasticsearch.index.cache.memory.usage"` ElasticsearchIndexCacheSize MetricSettings `mapstructure:"elasticsearch.index.cache.size"` + ElasticsearchIndexDocuments MetricSettings `mapstructure:"elasticsearch.index.documents"` ElasticsearchIndexOperationsCompleted MetricSettings `mapstructure:"elasticsearch.index.operations.completed"` ElasticsearchIndexOperationsMergeDocsCount MetricSettings `mapstructure:"elasticsearch.index.operations.merge.docs_count"` ElasticsearchIndexOperationsMergeSize MetricSettings `mapstructure:"elasticsearch.index.operations.merge.size"` @@ -176,6 +177,9 @@ func DefaultMetricsSettings() MetricsSettings { ElasticsearchIndexCacheSize: MetricSettings{ Enabled: false, }, + ElasticsearchIndexDocuments: MetricSettings{ + Enabled: false, + }, ElasticsearchIndexOperationsCompleted: MetricSettings{ Enabled: true, }, @@ -1842,6 +1846,60 @@ func newMetricElasticsearchIndexCacheSize(settings MetricSettings) metricElastic return m } +type metricElasticsearchIndexDocuments struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills elasticsearch.index.documents metric with initial data. +func (m *metricElasticsearchIndexDocuments) init() { + m.data.SetName("elasticsearch.index.documents") + m.data.SetDescription("The number of documents for an index.") + m.data.SetUnit("{documents}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(false) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + m.data.Sum().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricElasticsearchIndexDocuments) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, documentStateAttributeValue string, indexAggregationTypeAttributeValue string) { + if !m.settings.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) + dp.Attributes().PutStr("state", documentStateAttributeValue) + dp.Attributes().PutStr("aggregation", indexAggregationTypeAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricElasticsearchIndexDocuments) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricElasticsearchIndexDocuments) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricElasticsearchIndexDocuments(settings MetricSettings) metricElasticsearchIndexDocuments { + m := metricElasticsearchIndexDocuments{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + type metricElasticsearchIndexOperationsCompleted struct { data pmetric.Metric // data buffer for generated metric. settings MetricSettings // metric settings provided by user. @@ -5321,6 +5379,7 @@ type MetricsBuilder struct { metricElasticsearchIndexCacheEvictions metricElasticsearchIndexCacheEvictions metricElasticsearchIndexCacheMemoryUsage metricElasticsearchIndexCacheMemoryUsage metricElasticsearchIndexCacheSize metricElasticsearchIndexCacheSize + metricElasticsearchIndexDocuments metricElasticsearchIndexDocuments metricElasticsearchIndexOperationsCompleted metricElasticsearchIndexOperationsCompleted metricElasticsearchIndexOperationsMergeDocsCount metricElasticsearchIndexOperationsMergeDocsCount metricElasticsearchIndexOperationsMergeSize metricElasticsearchIndexOperationsMergeSize @@ -5422,6 +5481,7 @@ func NewMetricsBuilder(settings MetricsSettings, buildInfo component.BuildInfo, metricElasticsearchIndexCacheEvictions: newMetricElasticsearchIndexCacheEvictions(settings.ElasticsearchIndexCacheEvictions), metricElasticsearchIndexCacheMemoryUsage: newMetricElasticsearchIndexCacheMemoryUsage(settings.ElasticsearchIndexCacheMemoryUsage), metricElasticsearchIndexCacheSize: newMetricElasticsearchIndexCacheSize(settings.ElasticsearchIndexCacheSize), + metricElasticsearchIndexDocuments: newMetricElasticsearchIndexDocuments(settings.ElasticsearchIndexDocuments), metricElasticsearchIndexOperationsCompleted: newMetricElasticsearchIndexOperationsCompleted(settings.ElasticsearchIndexOperationsCompleted), metricElasticsearchIndexOperationsMergeDocsCount: newMetricElasticsearchIndexOperationsMergeDocsCount(settings.ElasticsearchIndexOperationsMergeDocsCount), metricElasticsearchIndexOperationsMergeSize: newMetricElasticsearchIndexOperationsMergeSize(settings.ElasticsearchIndexOperationsMergeSize), @@ -5579,6 +5639,7 @@ func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { mb.metricElasticsearchIndexCacheEvictions.emit(ils.Metrics()) mb.metricElasticsearchIndexCacheMemoryUsage.emit(ils.Metrics()) mb.metricElasticsearchIndexCacheSize.emit(ils.Metrics()) + mb.metricElasticsearchIndexDocuments.emit(ils.Metrics()) mb.metricElasticsearchIndexOperationsCompleted.emit(ils.Metrics()) mb.metricElasticsearchIndexOperationsMergeDocsCount.emit(ils.Metrics()) mb.metricElasticsearchIndexOperationsMergeSize.emit(ils.Metrics()) @@ -5750,6 +5811,11 @@ func (mb *MetricsBuilder) RecordElasticsearchIndexCacheSizeDataPoint(ts pcommon. mb.metricElasticsearchIndexCacheSize.recordDataPoint(mb.startTime, ts, val, indexAggregationTypeAttributeValue.String()) } +// RecordElasticsearchIndexDocumentsDataPoint adds a data point to elasticsearch.index.documents metric. +func (mb *MetricsBuilder) RecordElasticsearchIndexDocumentsDataPoint(ts pcommon.Timestamp, val int64, documentStateAttributeValue AttributeDocumentState, indexAggregationTypeAttributeValue AttributeIndexAggregationType) { + mb.metricElasticsearchIndexDocuments.recordDataPoint(mb.startTime, ts, val, documentStateAttributeValue.String(), indexAggregationTypeAttributeValue.String()) +} + // RecordElasticsearchIndexOperationsCompletedDataPoint adds a data point to elasticsearch.index.operations.completed metric. func (mb *MetricsBuilder) RecordElasticsearchIndexOperationsCompletedDataPoint(ts pcommon.Timestamp, val int64, operationAttributeValue AttributeOperation, indexAggregationTypeAttributeValue AttributeIndexAggregationType) { mb.metricElasticsearchIndexOperationsCompleted.recordDataPoint(mb.startTime, ts, val, operationAttributeValue.String(), indexAggregationTypeAttributeValue.String()) diff --git a/receiver/elasticsearchreceiver/metadata.yaml b/receiver/elasticsearchreceiver/metadata.yaml index 70ffc26c68f5..c8b8c0e66a9b 100644 --- a/receiver/elasticsearchreceiver/metadata.yaml +++ b/receiver/elasticsearchreceiver/metadata.yaml @@ -905,3 +905,12 @@ metrics: value_type: int attributes: [cache_name, index_aggregation_type] enabled: false + elasticsearch.index.documents: + description: The number of documents for an index. + unit: "{documents}" + sum: + monotonic: false + aggregation: cumulative + value_type: int + attributes: [document_state, index_aggregation_type] + enabled: false diff --git a/receiver/elasticsearchreceiver/scraper.go b/receiver/elasticsearchreceiver/scraper.go index 77715c209c07..fc52a7fdef73 100644 --- a/receiver/elasticsearchreceiver/scraper.go +++ b/receiver/elasticsearchreceiver/scraper.go @@ -354,7 +354,7 @@ func (r *elasticsearchScraper) scrapeIndicesMetrics(ctx context.Context, now pco indexStats, err := r.client.IndexStats(ctx, r.cfg.Indices) if err != nil { - errs.AddPartial(22, err) + errs.AddPartial(24, err) return } @@ -492,5 +492,12 @@ func (r *elasticsearchScraper) scrapeOneIndexMetrics(now pcommon.Timestamp, name now, stats.Total.QueryCache.Evictions, metadata.AttributeCacheNameQuery, metadata.AttributeIndexAggregationTypeTotal, ) + r.mb.RecordElasticsearchIndexDocumentsDataPoint( + now, stats.Primaries.DocumentStats.ActiveCount, metadata.AttributeDocumentStateActive, metadata.AttributeIndexAggregationTypePrimaryShards, + ) + r.mb.RecordElasticsearchIndexDocumentsDataPoint( + now, stats.Total.DocumentStats.ActiveCount, metadata.AttributeDocumentStateActive, metadata.AttributeIndexAggregationTypeTotal, + ) + r.mb.EmitForResource(metadata.WithElasticsearchIndexName(name), metadata.WithElasticsearchClusterName(r.clusterName)) } diff --git a/receiver/elasticsearchreceiver/scraper_test.go b/receiver/elasticsearchreceiver/scraper_test.go index 7c0fc234d35a..c0fde98fb0d8 100644 --- a/receiver/elasticsearchreceiver/scraper_test.go +++ b/receiver/elasticsearchreceiver/scraper_test.go @@ -59,6 +59,7 @@ func TestScraper(t *testing.T) { config.Metrics.ElasticsearchIndexCacheMemoryUsage.Enabled = true config.Metrics.ElasticsearchIndexCacheSize.Enabled = true config.Metrics.ElasticsearchIndexCacheEvictions.Enabled = true + config.Metrics.ElasticsearchIndexDocuments.Enabled = true sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), config) diff --git a/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json b/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json index 0db2fa33d217..267b0936742a 100644 --- a/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json +++ b/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json @@ -3194,6 +3194,55 @@ ] }, "unit": "{evictions}" + }, + { + "description": "The number of documents for an index.", + "name": "elasticsearch.index.documents", + "sum": { + "aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE", + "isMonotonic": false, + "dataPoints": [ + { + "asInt": "40", + "attributes": [ + { + "key": "state", + "value": { + "stringValue": "active" + } + }, + { + "key": "aggregation", + "value": { + "stringValue": "total" + } + } + ], + "startTimeUnixNano": "1661811689941624000", + "timeUnixNano": "1661811689943245000" + }, + { + "asInt": "40", + "attributes": [ + { + "key": "state", + "value": { + "stringValue": "active" + } + }, + { + "key": "aggregation", + "value": { + "stringValue": "primary_shards" + } + } + ], + "startTimeUnixNano": "1661811689941624000", + "timeUnixNano": "1661811689943245000" + } + ] + }, + "unit": "{documents}" } ], "scope": { @@ -3869,6 +3918,55 @@ ] }, "unit": "{evictions}" + }, + { + "description": "The number of documents for an index.", + "name": "elasticsearch.index.documents", + "sum": { + "aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE", + "isMonotonic": false, + "dataPoints": [ + { + "asInt": "40", + "attributes": [ + { + "key": "state", + "value": { + "stringValue": "active" + } + }, + { + "key": "aggregation", + "value": { + "stringValue": "total" + } + } + ], + "startTimeUnixNano": "1661811689941624000", + "timeUnixNano": "1661811689943245000" + }, + { + "asInt": "40", + "attributes": [ + { + "key": "state", + "value": { + "stringValue": "active" + } + }, + { + "key": "aggregation", + "value": { + "stringValue": "primary_shards" + } + } + ], + "startTimeUnixNano": "1661811689941624000", + "timeUnixNano": "1661811689943245000" + } + ] + }, + "unit": "{documents}" } ], "scope": {