From 3c53078a397a902c1e8a87f5f7ce06012ba19bf6 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 22 Feb 2024 11:22:53 +0100 Subject: [PATCH 1/9] Update OTLP translation code Signed-off-by: Arve Knudsen --- go.mod | 2 +- go.sum | 4 +- .../prometheusremotewrite/helper.go | 80 +++++++++---------- .../prometheusremotewrite/histograms.go | 17 +++- .../prometheusremotewrite/metrics_to_prw.go | 16 ++-- .../number_data_points.go | 34 ++++---- .../otlp_to_openmetrics_metadata.go | 66 +++++++++++++++ vendor/modules.txt | 4 +- 8 files changed, 149 insertions(+), 74 deletions(-) create mode 100644 vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/otlp_to_openmetrics_metadata.go diff --git a/go.mod b/go.mod index 943f1eb4fe7..eacfe5a4096 100644 --- a/go.mod +++ b/go.mod @@ -256,7 +256,7 @@ require ( ) // Using a fork of Prometheus with Mimir-specific changes. -replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240213153929-d32b69f03433 +replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240222102121-edcd29643919 // Replace memberlist with our fork which includes some fixes that haven't been // merged upstream yet: diff --git a/go.sum b/go.sum index 5c49d04d768..9d596d88ca6 100644 --- a/go.sum +++ b/go.sum @@ -505,8 +505,8 @@ github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPft github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/grafana/mimir-prometheus v0.0.0-20240213153929-d32b69f03433 h1:mW1Cet0WSipL2fBpfWnUyIlS7cZqvg2/5j6IP6lbogA= -github.com/grafana/mimir-prometheus v0.0.0-20240213153929-d32b69f03433/go.mod h1:IMaHPfxuCuOjlEoyUE0AG7FGZLEwCg6WHXHLNQwPrJQ= +github.com/grafana/mimir-prometheus v0.0.0-20240222102121-edcd29643919 h1:UpQVzSLCyhGoohqdL7xyHbVPHYhGC91hzliz7OZGY6Y= +github.com/grafana/mimir-prometheus v0.0.0-20240222102121-edcd29643919/go.mod h1:IMaHPfxuCuOjlEoyUE0AG7FGZLEwCg6WHXHLNQwPrJQ= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo= diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/helper.go index 09b39d29d9d..817cbaba7de 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -28,7 +28,6 @@ import ( ) const ( - nameStr = "__name__" sumStr = "_sum" countStr = "_count" bucketStr = "_bucket" @@ -72,15 +71,14 @@ func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // tsMap will be unmodified if either labels or sample is nil, but can still be modified if the exemplar is nil. 
func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, labels []prompb.Label, datatype string) string { - if sample == nil || labels == nil || tsMap == nil { + // This shouldn't happen return "" } - sig := timeSeriesSignature(datatype, &labels) - ts, ok := tsMap[sig] - - if ok { + sig := timeSeriesSignature(datatype, labels) + ts := tsMap[sig] + if ts != nil { ts.Samples = append(ts.Samples, *sample) } else { newTs := &prompb.TimeSeries{ @@ -97,7 +95,7 @@ func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, label // we only add exemplars if samples are presents // tsMap is unmodified if either of its parameters is nil and samples are nil. func addExemplars(tsMap map[string]*prompb.TimeSeries, exemplars []prompb.Exemplar, bucketBoundsData []bucketBoundsData) { - if tsMap == nil || bucketBoundsData == nil || exemplars == nil { + if len(tsMap) == 0 || len(bucketBoundsData) == 0 || len(exemplars) == 0 { return } @@ -113,14 +111,10 @@ func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBound sig := bucketBound.sig bound := bucketBound.bound - _, ok := tsMap[sig] - if ok { - if tsMap[sig].Samples != nil { - if exemplar.Value <= bound { - tsMap[sig].Exemplars = append(tsMap[sig].Exemplars, exemplar) - return - } - } + ts := tsMap[sig] + if ts != nil && len(ts.Samples) > 0 && exemplar.Value <= bound { + ts.Exemplars = append(ts.Exemplars, exemplar) + return } } } @@ -131,10 +125,10 @@ func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBound // // the label slice should not contain duplicate label names; this method sorts the slice by label name before creating // the signature. -func timeSeriesSignature(datatype string, labels *[]prompb.Label) string { +func timeSeriesSignature(datatype string, labels []prompb.Label) string { length := len(datatype) - for _, lb := range *labels { + for _, lb := range labels { length += 2 + len(lb.GetName()) + len(lb.GetValue()) } @@ -142,9 +136,9 @@ func timeSeriesSignature(datatype string, labels *[]prompb.Label) string { b.Grow(length) b.WriteString(datatype) - sort.Sort(ByLabelName(*labels)) + sort.Sort(ByLabelName(labels)) - for _, lb := range *labels { + for _, lb := range labels { b.WriteString("-") b.WriteString(lb.GetName()) b.WriteString("-") @@ -154,9 +148,9 @@ func timeSeriesSignature(datatype string, labels *[]prompb.Label) string { return b.String() } -// createAttributes creates a slice of Cortex Label with OTLP attributes and pairs of string values. -// Unpaired string value is ignored. String pairs overwrites OTLP labels if collision happens, and the overwrite is -// logged. Resultant label names are sanitized. +// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values. +// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen, and overwrites are +// logged. Resulting label names are sanitized. 
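For illustration, here is a minimal standalone sketch of the tsMap key format that timeSeriesSignature produces: the metric type followed by sorted name/value pairs. The label type is a stand-in for prompb.Label so the snippet compiles on its own.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// label is a stand-in for prompb.Label so this sketch compiles on its own.
type label struct {
	Name, Value string
}

// signature mirrors timeSeriesSignature from the patch: the metric type
// followed by "-name-value" pairs, sorted by label name, so that the same
// label set always maps to the same tsMap key.
func signature(datatype string, labels []label) string {
	sort.Slice(labels, func(i, j int) bool { return labels[i].Name < labels[j].Name })

	var b strings.Builder
	b.WriteString(datatype)
	for _, lb := range labels {
		b.WriteString("-")
		b.WriteString(lb.Name)
		b.WriteString("-")
		b.WriteString(lb.Value)
	}
	return b.String()
}

func main() {
	lbls := []label{{"le", "0.5"}, {"__name__", "http_request_duration_seconds_bucket"}}
	// Prints: Histogram-__name__-http_request_duration_seconds_bucket-le-0.5
	fmt.Println(signature("Histogram", lbls))
}
```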
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, extras ...string) []prompb.Label { serviceName, haveServiceName := resource.Attributes().Get(conventions.AttributeServiceName) instance, haveInstanceID := resource.Attributes().Get(conventions.AttributeServiceInstanceID) @@ -186,8 +180,8 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa for _, label := range labels { var finalKey = prometheustranslator.NormalizeLabel(label.Name) - if existingLabel, alreadyExists := l[finalKey]; alreadyExists { - l[finalKey] = existingLabel + ";" + label.Value + if existingValue, alreadyExists := l[finalKey]; alreadyExists { + l[finalKey] = existingValue + ";" + label.Value } else { l[finalKey] = label.Value } @@ -257,10 +251,8 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool { // addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. It // ignore extra buckets if len(ExplicitBounds) > len(BucketCounts) -func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries) { +func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries, baseName string) { timestamp := convertTimeStamp(pt.Timestamp()) - // sum, count, and buckets of the histogram should append suffix to baseName - baseName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels) createLabels := func(nameSuffix string, extras ...string) []prompb.Label { @@ -272,7 +264,8 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon labels = append(labels, prompb.Label{Name: extras[extrasIdx], Value: extras[extrasIdx+1]}) } - labels = append(labels, prompb.Label{Name: nameStr, Value: baseName + nameSuffix}) + // sum, count, and buckets of the histogram should append suffix to baseName + labels = append(labels, prompb.Label{Name: model.MetricNameLabel, Value: baseName + nameSuffix}) return labels } @@ -349,7 +342,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon startTimestamp := pt.StartTimestamp() if settings.ExportCreatedMetric && startTimestamp != 0 { labels := createLabels(createdSuffix) - addCreatedTimeSeriesIfNeeded(tsMap, labels, startTimestamp, metric.Type().String()) + addCreatedTimeSeriesIfNeeded(tsMap, labels, startTimestamp, pt.Timestamp(), metric.Type().String()) } } @@ -359,13 +352,12 @@ type exemplarType interface { } func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar { - var promExemplars []prompb.Exemplar - + promExemplars := make([]prompb.Exemplar, 0, pt.Exemplars().Len()) for i := 0; i < pt.Exemplars().Len(); i++ { exemplar := pt.Exemplars().At(i) exemplarRunes := 0 - promExemplar := &prompb.Exemplar{ + promExemplar := prompb.Exemplar{ Value: exemplar.DoubleValue(), Timestamp: timestamp.FromTime(exemplar.Timestamp().AsTime()), } @@ -387,9 +379,10 @@ func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar { } promExemplar.Labels = append(promExemplar.Labels, promLabel) } - var labelsFromAttributes []prompb.Label - exemplar.FilteredAttributes().Range(func(key string, value pcommon.Value) bool { + attrs := exemplar.FilteredAttributes() + labelsFromAttributes := 
make([]prompb.Label, 0, attrs.Len()) + attrs.Range(func(key string, value pcommon.Value) bool { val := value.AsString() exemplarRunes += utf8.RuneCountInString(key) + utf8.RuneCountInString(val) promLabel := prompb.Label{ @@ -407,7 +400,7 @@ func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar { promExemplar.Labels = append(promExemplar.Labels, labelsFromAttributes...) } - promExemplars = append(promExemplars, *promExemplar) + promExemplars = append(promExemplars, promExemplar) } return promExemplars @@ -457,10 +450,8 @@ func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp { // addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples. func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, - tsMap map[string]*prompb.TimeSeries) { + tsMap map[string]*prompb.TimeSeries, baseName string) { timestamp := convertTimeStamp(pt.Timestamp()) - // sum and count of the summary should append suffix to baseName - baseName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels) createLabels := func(name string, extras ...string) []prompb.Label { @@ -472,7 +463,7 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res labels = append(labels, prompb.Label{Name: extras[extrasIdx], Value: extras[extrasIdx+1]}) } - labels = append(labels, prompb.Label{Name: nameStr, Value: name}) + labels = append(labels, prompb.Label{Name: model.MetricNameLabel, Value: name}) return labels } @@ -485,6 +476,7 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res if pt.Flags().NoRecordedValue() { sum.Value = math.Float64frombits(value.StaleNaN) } + // sum and count of the summary should append suffix to baseName sumlabels := createLabels(baseName + sumStr) addSample(tsMap, sum, sumlabels, metric.Type().String()) @@ -518,7 +510,7 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res startTimestamp := pt.StartTimestamp() if settings.ExportCreatedMetric && startTimestamp != 0 { createdLabels := createLabels(baseName + createdSuffix) - addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String()) + addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, pt.Timestamp(), metric.Type().String()) } } @@ -528,15 +520,17 @@ func addCreatedTimeSeriesIfNeeded( series map[string]*prompb.TimeSeries, labels []prompb.Label, startTimestamp pcommon.Timestamp, + timestamp pcommon.Timestamp, metricType string, ) { - sig := timeSeriesSignature(metricType, &labels) + sig := timeSeriesSignature(metricType, labels) if _, ok := series[sig]; !ok { series[sig] = &prompb.TimeSeries{ Labels: labels, Samples: []prompb.Sample{ { // convert ns to ms - Value: float64(convertTimeStamp(startTimestamp)), + Value: float64(convertTimeStamp(startTimestamp)), + Timestamp: convertTimeStamp(timestamp), }, }, } @@ -570,7 +564,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta if len(settings.Namespace) > 0 { name = settings.Namespace + "_" + name } - labels := createAttributes(resource, attributes, settings.ExternalLabels, nameStr, name) + labels := createAttributes(resource, attributes, settings.ExternalLabels, model.MetricNameLabel, name) sample := &prompb.Sample{ Value: float64(1), // convert ns to ms diff --git 
a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/histograms.go index 0a528f60741..14cea32c374 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/histograms.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/histograms.go @@ -29,12 +29,13 @@ func addSingleExponentialHistogramDataPoint( resource, pt.Attributes(), settings.ExternalLabels, - model.MetricNameLabel, metric, + model.MetricNameLabel, + metric, ) sig := timeSeriesSignature( pmetric.MetricTypeExponentialHistogram.String(), - &labels, + labels, ) ts, ok := series[sig] if !ok { @@ -76,7 +77,17 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom nSpans, nDeltas := convertBucketsLayout(p.Negative(), scaleDown) h := prompb.Histogram{ - Schema: scale, + // The counter reset detection must be compatible with Prometheus to + // safely set ResetHint to NO. This is not ensured currently. + // Sending a sample that triggers counter reset but with ResetHint==NO + // would lead to Prometheus panic as it does not double check the hint. + // Thus we're explicitly saying UNKNOWN here, which is always safe. + // TODO: using created time stamp should be accurate, but we + // need to know here if it was used for the detection. + // Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303 + // Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232 + ResetHint: prompb.Histogram_UNKNOWN, + Schema: scale, ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: p.ZeroCount()}, // TODO use zero_threshold, if set, see diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index 6e8f42fae06..fb141034ad0 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -23,9 +23,10 @@ type Settings struct { DisableTargetInfo bool ExportCreatedMetric bool AddMetricSuffixes bool + SendMetadata bool } -// FromMetrics converts pmetric.Metrics to prometheus remote write format. +// FromMetrics converts pmetric.Metrics to Prometheus remote write format. func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*prompb.TimeSeries, errs error) { tsMap = make(map[string]*prompb.TimeSeries) @@ -51,6 +52,8 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp continue } + promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) + // handle individual metric based on type //exhaustive:enforce switch metric.Type() { @@ -60,7 +63,7 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp errs = multierr.Append(errs, fmt.Errorf("empty data points. 
%s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) } case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() @@ -68,7 +71,7 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) } case pmetric.MetricTypeHistogram: dataPoints := metric.Histogram().DataPoints() @@ -76,19 +79,18 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) } case pmetric.MetricTypeExponentialHistogram: dataPoints := metric.ExponentialHistogram().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } - name := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) for x := 0; x < dataPoints.Len(); x++ { errs = multierr.Append( errs, addSingleExponentialHistogramDataPoint( - name, + promName, dataPoints.At(x), resource, settings, @@ -102,7 +104,7 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) } default: errs = multierr.Append(errs, errors.New("unsupported metric type")) diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go index 85684ad6de4..b5bd8765fe8 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go @@ -13,11 +13,9 @@ import ( "github.com/prometheus/prometheus/prompb" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" - - prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" ) -// addSingleSumNumberDataPoint converts the Gauge metric data point to a +// addSingleGaugeNumberDataPoint converts the Gauge metric data point to a // Prometheus time series with samples and labels. The result is stored in the // series map. 
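The refactor above computes the Prometheus-compliant name once per metric (promName) and threads it through every addSingle* call instead of rebuilding it per data point. Below is a hedged sketch of how BuildCompliantName is invoked; the printed names are examples only, since the exact unit/suffix mapping lives in the translator package.

```go
package main

import (
	"fmt"

	prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	m := pmetric.NewMetric()
	m.SetName("http.server.duration")
	m.SetUnit("s")
	m.SetEmptyHistogram()

	// With suffixes enabled, the translator appends unit (and, for monotonic
	// sums, "_total") suffixes; with them disabled, only character
	// sanitization and the optional namespace prefix are applied.
	fmt.Println(prometheustranslator.BuildCompliantName(m, "", true))      // e.g. http_server_duration_seconds
	fmt.Println(prometheustranslator.BuildCompliantName(m, "myns", false)) // e.g. myns_http_server_duration
}
```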
func addSingleGaugeNumberDataPoint( @@ -26,13 +24,14 @@ func addSingleGaugeNumberDataPoint( metric pmetric.Metric, settings Settings, series map[string]*prompb.TimeSeries, + name string, ) { - name := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) labels := createAttributes( resource, pt.Attributes(), settings.ExternalLabels, - model.MetricNameLabel, name, + model.MetricNameLabel, + name, ) sample := &prompb.Sample{ // convert ns to ms @@ -59,8 +58,8 @@ func addSingleSumNumberDataPoint( metric pmetric.Metric, settings Settings, series map[string]*prompb.TimeSeries, + name string, ) { - name := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) labels := createAttributes( resource, pt.Attributes(), @@ -82,7 +81,7 @@ func addSingleSumNumberDataPoint( } sig := addSample(series, sample, labels, metric.Type().String()) - if ts, ok := series[sig]; sig != "" && ok { + if ts := series[sig]; sig != "" && ts != nil { exemplars := getPromExemplars[pmetric.NumberDataPoint](pt) ts.Exemplars = append(ts.Exemplars, exemplars...) } @@ -90,15 +89,18 @@ func addSingleSumNumberDataPoint( // add _created time series if needed if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() { startTimestamp := pt.StartTimestamp() - if startTimestamp != 0 { - createdLabels := createAttributes( - resource, - pt.Attributes(), - settings.ExternalLabels, - nameStr, - name+createdSuffix, - ) - addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, metric.Type().String()) + if startTimestamp == 0 { + return + } + + createdLabels := make([]prompb.Label, len(labels)) + copy(createdLabels, labels) + for i, l := range createdLabels { + if l.Name == model.MetricNameLabel { + createdLabels[i].Value = name + createdSuffix + break + } } + addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, pt.Timestamp(), metric.Type().String()) } } diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/otlp_to_openmetrics_metadata.go b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/otlp_to_openmetrics_metadata.go new file mode 100644 index 00000000000..e43797212e3 --- /dev/null +++ b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/otlp_to_openmetrics_metadata.go @@ -0,0 +1,66 @@ +// DO NOT EDIT. COPIED AS-IS. 
SEE ../README.md + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewrite // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" + +import ( + "github.com/prometheus/prometheus/prompb" + "go.opentelemetry.io/collector/pdata/pmetric" + + prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" +) + +func otelMetricTypeToPromMetricType(otelMetric pmetric.Metric) prompb.MetricMetadata_MetricType { + switch otelMetric.Type() { + case pmetric.MetricTypeGauge: + return prompb.MetricMetadata_GAUGE + case pmetric.MetricTypeSum: + metricType := prompb.MetricMetadata_GAUGE + if otelMetric.Sum().IsMonotonic() { + metricType = prompb.MetricMetadata_COUNTER + } + return metricType + case pmetric.MetricTypeHistogram: + return prompb.MetricMetadata_HISTOGRAM + case pmetric.MetricTypeSummary: + return prompb.MetricMetadata_SUMMARY + case pmetric.MetricTypeExponentialHistogram: + return prompb.MetricMetadata_HISTOGRAM + } + return prompb.MetricMetadata_UNKNOWN +} + +func OtelMetricsToMetadata(md pmetric.Metrics, addMetricSuffixes bool) []*prompb.MetricMetadata { + resourceMetricsSlice := md.ResourceMetrics() + + metadataLength := 0 + for i := 0; i < resourceMetricsSlice.Len(); i++ { + scopeMetricsSlice := resourceMetricsSlice.At(i).ScopeMetrics() + for j := 0; j < scopeMetricsSlice.Len(); j++ { + metadataLength += scopeMetricsSlice.At(j).Metrics().Len() + } + } + + var metadata = make([]*prompb.MetricMetadata, 0, metadataLength) + for i := 0; i < resourceMetricsSlice.Len(); i++ { + resourceMetrics := resourceMetricsSlice.At(i) + scopeMetricsSlice := resourceMetrics.ScopeMetrics() + + for j := 0; j < scopeMetricsSlice.Len(); j++ { + scopeMetrics := scopeMetricsSlice.At(j) + for k := 0; k < scopeMetrics.Metrics().Len(); k++ { + metric := scopeMetrics.Metrics().At(k) + entry := prompb.MetricMetadata{ + Type: otelMetricTypeToPromMetricType(metric), + MetricFamilyName: prometheustranslator.BuildCompliantName(metric, "", addMetricSuffixes), + Help: metric.Description(), + } + metadata = append(metadata, &entry) + } + } + } + + return metadata +} diff --git a/vendor/modules.txt b/vendor/modules.txt index b68a8eec07a..9b9a913aeb6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -906,7 +906,7 @@ github.com/prometheus/exporter-toolkit/web github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240213153929-d32b69f03433 +# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240222102121-edcd29643919 ## explicit; go 1.20 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -1522,7 +1522,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 sigs.k8s.io/yaml/goyaml.v3 -# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240213153929-d32b69f03433 +# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240222102121-edcd29643919 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6 From a837fe2955f91b9d99e112a7fa3eb94a983e0f55 Mon Sep 17 
00:00:00 2001 From: Arve Knudsen Date: Mon, 20 Nov 2023 14:59:19 +0100 Subject: [PATCH 2/9] distributor.OTLPHandler: Convert directly to Mimir format Signed-off-by: Arve Knudsen --- pkg/distributor/otel.go | 98 +--- pkg/distributor/otlp/helper.go | 578 +++++++++++++++++++++ pkg/distributor/otlp/histograms.go | 196 +++++++ pkg/distributor/otlp/metrics_to_prw.go | 120 +++++ pkg/distributor/otlp/number_data_points.go | 102 ++++ pkg/distributor/push_test.go | 37 ++ 6 files changed, 1037 insertions(+), 94 deletions(-) create mode 100644 pkg/distributor/otlp/helper.go create mode 100644 pkg/distributor/otlp/histograms.go create mode 100644 pkg/distributor/otlp/metrics_to_prw.go create mode 100644 pkg/distributor/otlp/number_data_points.go diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index 32cf2f9bfcf..605421461c0 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ -21,12 +21,12 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" - "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.uber.org/multierr" + "github.com/grafana/mimir/pkg/distributor/otlp" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/spanlogger" @@ -262,7 +262,7 @@ func otelMetricsToMetadata(addSuffixes bool, md pmetric.Metrics) []*mimirpb.Metr } func otelMetricsToTimeseries(tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) { - tsMap, errs := prometheusremotewrite.FromMetrics(md, prometheusremotewrite.Settings{ + ts, errs := otlp.FromMetrics(md, otlp.Settings{ AddMetricSuffixes: addSuffixes, }) if errs != nil { @@ -274,104 +274,14 @@ func otelMetricsToTimeseries(tenantID string, addSuffixes bool, discardedDueToOt parseErrs = parseErrs[:maxErrMsgLen] } - if len(tsMap) == 0 { + if len(ts) == 0 { return nil, errors.New(parseErrs) } level.Warn(logger).Log("msg", "OTLP parse error", "err", parseErrs) } - mimirTs := mimirpb.PreallocTimeseriesSliceFromPool() - for _, promTs := range tsMap { - mimirTs = append(mimirTs, promToMimirTimeseries(promTs)) - } - - return mimirTs, nil -} - -func promToMimirTimeseries(promTs *prompb.TimeSeries) mimirpb.PreallocTimeseries { - labels := make([]mimirpb.LabelAdapter, 0, len(promTs.Labels)) - for _, label := range promTs.Labels { - labels = append(labels, mimirpb.LabelAdapter{ - Name: label.Name, - Value: label.Value, - }) - } - - samples := make([]mimirpb.Sample, 0, len(promTs.Samples)) - for _, sample := range promTs.Samples { - samples = append(samples, mimirpb.Sample{ - TimestampMs: sample.Timestamp, - Value: sample.Value, - }) - } - - histograms := make([]mimirpb.Histogram, 0, len(promTs.Histograms)) - for idx := range promTs.Histograms { - histograms = append(histograms, promToMimirHistogram(&promTs.Histograms[idx])) - } - - exemplars := make([]mimirpb.Exemplar, 0, len(promTs.Exemplars)) - for _, exemplar := range promTs.Exemplars { - labels := make([]mimirpb.LabelAdapter, 0, len(exemplar.Labels)) - for _, label := range exemplar.Labels { - labels = append(labels, mimirpb.LabelAdapter{ - Name: label.Name, - Value: label.Value, - }) - } - - exemplars = append(exemplars, 
mimirpb.Exemplar{ - Labels: labels, - Value: exemplar.Value, - TimestampMs: exemplar.Timestamp, - }) - } - - ts := mimirpb.TimeseriesFromPool() - ts.Labels = labels - ts.Samples = samples - ts.Histograms = histograms - ts.Exemplars = exemplars - - return mimirpb.PreallocTimeseries{TimeSeries: ts} -} - -func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram { - pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans)) - for _, span := range h.PositiveSpans { - pSpans = append( - pSpans, mimirpb.BucketSpan{ - Offset: span.Offset, - Length: span.Length, - }, - ) - } - nSpans := make([]mimirpb.BucketSpan, 0, len(h.NegativeSpans)) - for _, span := range h.NegativeSpans { - nSpans = append( - nSpans, mimirpb.BucketSpan{ - Offset: span.Offset, - Length: span.Length, - }, - ) - } - - return mimirpb.Histogram{ - Count: &mimirpb.Histogram_CountInt{CountInt: h.GetCountInt()}, - Sum: h.Sum, - Schema: h.Schema, - ZeroThreshold: h.ZeroThreshold, - ZeroCount: &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()}, - NegativeSpans: nSpans, - NegativeDeltas: h.NegativeDeltas, - NegativeCounts: h.NegativeCounts, - PositiveSpans: pSpans, - PositiveDeltas: h.PositiveDeltas, - PositiveCounts: h.PositiveCounts, - Timestamp: h.Timestamp, - ResetHint: mimirpb.Histogram_ResetHint(h.ResetHint), - } + return ts, nil } // TimeseriesToOTLPRequest is used in tests. diff --git a/pkg/distributor/otlp/helper.go b/pkg/distributor/otlp/helper.go new file mode 100644 index 00000000000..a09af75536e --- /dev/null +++ b/pkg/distributor/otlp/helper.go @@ -0,0 +1,578 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package otlp + +import ( + "encoding/hex" + "fmt" + "log" + "math" + "sort" + "strconv" + "strings" + "time" + "unicode/utf8" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/model/value" + prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + conventions "go.opentelemetry.io/collector/semconv/v1.6.1" + + "github.com/grafana/mimir/pkg/mimirpb" +) + +const ( + nameStr = "__name__" + sumStr = "_sum" + countStr = "_count" + bucketStr = "_bucket" + leStr = "le" + quantileStr = "quantile" + pInfStr = "+Inf" + createdSuffix = "_created" + // maxExemplarRunes is the maximum number of UTF-8 exemplar characters + // according to the prometheus specification + // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars + maxExemplarRunes = 128 + // Trace and Span id keys are defined as part of the spec: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification%2Fmetrics%2Fdatamodel.md#exemplars-2 + traceIDKey = "trace_id" + spanIDKey = "span_id" + infoType = "info" + targetMetricName = "target_info" +) + +type bucketBoundsData struct { + sig string + bound float64 +} + +// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds +type byBucketBoundsData []bucketBoundsData + +func (m byBucketBoundsData) Len() int { return len(m) } +func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound } +func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] } + +// ByLabelName enables the usage of sort.Sort() with a slice of labels +type ByLabelName []mimirpb.LabelAdapter + +func (a ByLabelName) Len() int { return len(a) } +func (a ByLabelName) Less(i, j int) bool 
{ return a[i].Name < a[j].Name } +func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// addSample finds a TimeSeries in tsMap that corresponds to the label set labels, and add sample to the TimeSeries; it +// creates a new TimeSeries in the map if not found and returns the time series signature. +// tsMap will be unmodified if either labels or sample is nil, but can still be modified if the exemplar is nil. +func addSample(tsMap map[string]*mimirpb.TimeSeries, sample *mimirpb.Sample, labels []mimirpb.LabelAdapter, + datatype string) string { + + if sample == nil || labels == nil || tsMap == nil { + return "" + } + + sig := timeSeriesSignature(datatype, labels) + ts, ok := tsMap[sig] + + if ok { + ts.Samples = append(ts.Samples, *sample) + } else { + newTs := mimirpb.TimeseriesFromPool() + newTs.Labels = labels + newTs.Samples = append(newTs.Samples, *sample) + tsMap[sig] = newTs + } + + return sig +} + +// addExemplars finds a bucket bound that corresponds to the exemplars value and add the exemplar to the specific sig; +// we only add exemplars if samples are presents +// tsMap is unmodified if either of its parameters is nil and samples are nil. +func addExemplars(tsMap map[string]*mimirpb.TimeSeries, exemplars []mimirpb.Exemplar, bucketBoundsData []bucketBoundsData) { + if len(tsMap) == 0 || len(bucketBoundsData) == 0 || len(exemplars) == 0 { + return + } + + sort.Sort(byBucketBoundsData(bucketBoundsData)) + + for _, exemplar := range exemplars { + addExemplar(tsMap, bucketBoundsData, exemplar) + } +} + +func addExemplar(tsMap map[string]*mimirpb.TimeSeries, bucketBounds []bucketBoundsData, exemplar mimirpb.Exemplar) { + for _, bucketBound := range bucketBounds { + sig := bucketBound.sig + bound := bucketBound.bound + + ts := tsMap[sig] + if ts != nil && len(ts.Samples) > 0 && exemplar.Value <= bound { + ts.Exemplars = append(ts.Exemplars, exemplar) + return + } + } +} + +// timeSeries return a string signature in the form of: +// +// TYPE-label1-value1- ... -labelN-valueN +// +// the label slice should not contain duplicate label names; this method sorts the slice by label name before creating +// the signature. +func timeSeriesSignature(datatype string, labels []mimirpb.LabelAdapter) string { + length := len(datatype) + + for _, lb := range labels { + length += 2 + len(lb.Name) + len(lb.Value) + } + + b := strings.Builder{} + b.Grow(length) + b.WriteString(datatype) + + sort.Sort(ByLabelName(labels)) + + for _, lb := range labels { + b.WriteString("-") + b.WriteString(lb.Name) + b.WriteString("-") + b.WriteString(lb.Value) + } + + return b.String() +} + +// createAttributes creates a slice of Cortex Label with OTLP attributes and pairs of string values. +// Unpaired string value is ignored. String pairs overwrites OTLP labels if collision happens, and the overwrite is +// logged. Resultant label names are sanitized. 
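createAttributes (documented above and defined next) sanitizes attribute keys with prometheustranslator.NormalizeLabel and joins values with ";" when two keys collide after sanitization. Here is a minimal sketch of that merge, assuming NormalizeLabel maps '.' to '_' as the translator's sanitizer does.

```go
package main

import (
	"fmt"
	"sort"

	prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
)

func main() {
	// Two OTLP attribute keys that sanitize to the same Prometheus label name.
	attrs := []struct{ Key, Value string }{
		{"host.name", "node-1"},
		{"host_name", "node-1b"},
	}
	// createAttributes sorts attributes by key first so collisions merge deterministically.
	sort.SliceStable(attrs, func(i, j int) bool { return attrs[i].Key < attrs[j].Key })

	merged := map[string]string{}
	for _, a := range attrs {
		key := prometheustranslator.NormalizeLabel(a.Key) // both keys become "host_name"
		if existing, ok := merged[key]; ok {
			// Colliding values are joined with ";", matching createAttributes.
			merged[key] = existing + ";" + a.Value
		} else {
			merged[key] = a.Value
		}
	}
	fmt.Println(merged) // map[host_name:node-1;node-1b]
}
```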
+func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, extras ...string) []mimirpb.LabelAdapter { + serviceName, haveServiceName := resource.Attributes().Get(conventions.AttributeServiceName) + instance, haveInstanceID := resource.Attributes().Get(conventions.AttributeServiceInstanceID) + + // Calculate the maximum possible number of labels we could return so we can preallocate l + maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 + + if haveServiceName { + maxLabelCount++ + } + + if haveInstanceID { + maxLabelCount++ + } + + // map ensures no duplicate label name + l := make(map[string]string, maxLabelCount) + + // Ensure attributes are sorted by key for consistent merging of keys which + // collide when sanitized. + labels := make([]mimirpb.LabelAdapter, 0, attributes.Len()) + attributes.Range(func(key string, value pcommon.Value) bool { + labels = append(labels, mimirpb.LabelAdapter{Name: key, Value: value.AsString()}) + return true + }) + sort.Stable(ByLabelName(labels)) + + for _, label := range labels { + var finalKey = prometheustranslator.NormalizeLabel(label.Name) + if existingLabel, alreadyExists := l[finalKey]; alreadyExists { + l[finalKey] = existingLabel + ";" + label.Value + } else { + l[finalKey] = label.Value + } + } + + // Map service.name + service.namespace to job + if haveServiceName { + val := serviceName.AsString() + if serviceNamespace, ok := resource.Attributes().Get(conventions.AttributeServiceNamespace); ok { + val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val) + } + l[model.JobLabel] = val + } + // Map service.instance.id to instance + if haveInstanceID { + l[model.InstanceLabel] = instance.AsString() + } + for key, value := range externalLabels { + // External labels have already been sanitized + if _, alreadyExists := l[key]; alreadyExists { + // Skip external labels if they are overridden by metric attributes + continue + } + l[key] = value + } + + for i := 0; i < len(extras); i += 2 { + if i+1 >= len(extras) { + break + } + _, found := l[extras[i]] + if found { + log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.") + } + // internal labels should be maintained + name := extras[i] + if !(len(name) > 4 && name[:2] == "__" && name[len(name)-2:] == "__") { + name = prometheustranslator.NormalizeLabel(name) + } + l[name] = extras[i+1] + } + + s := make([]mimirpb.LabelAdapter, 0, len(l)) + for k, v := range l { + s = append(s, mimirpb.LabelAdapter{Name: k, Value: v}) + } + + return s +} + +// isValidAggregationTemporality checks whether an OTel metric has a valid +// aggregation temporality for conversion to a Mimir metric. +func isValidAggregationTemporality(metric pmetric.Metric) bool { + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary: + return true + case pmetric.MetricTypeSum: + return metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + case pmetric.MetricTypeHistogram: + return metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + case pmetric.MetricTypeExponentialHistogram: + return metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + } + return false +} + +// addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. 
It +// ignore extra buckets if len(ExplicitBounds) > len(BucketCounts) +func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*mimirpb.TimeSeries) { + timestamp := convertTimeStamp(pt.Timestamp()) + // sum, count, and buckets of the histogram should append suffix to baseName + baseName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) + baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels) + + createLabels := func(nameSuffix string, extras ...string) []mimirpb.LabelAdapter { + extraLabelCount := len(extras) / 2 + labels := make([]mimirpb.LabelAdapter, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name + copy(labels, baseLabels) + + for extrasIdx := 0; extrasIdx < extraLabelCount; extrasIdx++ { + labels = append(labels, mimirpb.LabelAdapter{Name: extras[extrasIdx], Value: extras[extrasIdx+1]}) + } + + labels = append(labels, mimirpb.LabelAdapter{Name: nameStr, Value: baseName + nameSuffix}) + + return labels + } + + // If the sum is unset, it indicates the _sum metric point should be + // omitted + if pt.HasSum() { + // treat sum as a sample in an individual TimeSeries + sum := &mimirpb.Sample{ + Value: pt.Sum(), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + sum.Value = math.Float64frombits(value.StaleNaN) + } + + sumlabels := createLabels(sumStr) + addSample(tsMap, sum, sumlabels, metric.Type().String()) + + } + + // treat count as a sample in an individual TimeSeries + count := &mimirpb.Sample{ + Value: float64(pt.Count()), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + count.Value = math.Float64frombits(value.StaleNaN) + } + + countlabels := createLabels(countStr) + addSample(tsMap, count, countlabels, metric.Type().String()) + + // cumulative count for conversion to cumulative histogram + var cumulativeCount uint64 + + exemplars := getMimirExemplars[pmetric.HistogramDataPoint](pt) + + var bucketBounds []bucketBoundsData + + // process each bound, based on histograms proto definition, # of buckets = # of explicit bounds + 1 + for i := 0; i < pt.ExplicitBounds().Len() && i < pt.BucketCounts().Len(); i++ { + bound := pt.ExplicitBounds().At(i) + cumulativeCount += pt.BucketCounts().At(i) + bucket := &mimirpb.Sample{ + Value: float64(cumulativeCount), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + bucket.Value = math.Float64frombits(value.StaleNaN) + } + boundStr := strconv.FormatFloat(bound, 'f', -1, 64) + labels := createLabels(bucketStr, leStr, boundStr) + sig := addSample(tsMap, bucket, labels, metric.Type().String()) + + bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: bound}) + } + // add le=+Inf bucket + infBucket := &mimirpb.Sample{ + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + infBucket.Value = math.Float64frombits(value.StaleNaN) + } else { + infBucket.Value = float64(pt.Count()) + } + infLabels := createLabels(bucketStr, leStr, pInfStr) + sig := addSample(tsMap, infBucket, infLabels, metric.Type().String()) + + bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: math.Inf(1)}) + addExemplars(tsMap, exemplars, bucketBounds) + + // add _created time series if needed + startTimestamp := pt.StartTimestamp() + if settings.ExportCreatedMetric && startTimestamp != 0 { + labels := createLabels(createdSuffix) + addCreatedTimeSeriesIfNeeded(tsMap, labels, startTimestamp, 
metric.Type().String()) + } +} + +type exemplarType interface { + pmetric.ExponentialHistogramDataPoint | pmetric.HistogramDataPoint | pmetric.NumberDataPoint + Exemplars() pmetric.ExemplarSlice +} + +func getMimirExemplars[T exemplarType](pt T) []mimirpb.Exemplar { + var mimirExemplars []mimirpb.Exemplar + + for i := 0; i < pt.Exemplars().Len(); i++ { + exemplar := pt.Exemplars().At(i) + exemplarRunes := 0 + + mimirExemplar := mimirpb.Exemplar{ + Value: exemplar.DoubleValue(), + TimestampMs: timestamp.FromTime(exemplar.Timestamp().AsTime()), + } + if traceID := exemplar.TraceID(); !traceID.IsEmpty() { + val := hex.EncodeToString(traceID[:]) + exemplarRunes += utf8.RuneCountInString(traceIDKey) + utf8.RuneCountInString(val) + mimirLabel := mimirpb.LabelAdapter{ + Name: traceIDKey, + Value: val, + } + mimirExemplar.Labels = append(mimirExemplar.Labels, mimirLabel) + } + if spanID := exemplar.SpanID(); !spanID.IsEmpty() { + val := hex.EncodeToString(spanID[:]) + exemplarRunes += utf8.RuneCountInString(spanIDKey) + utf8.RuneCountInString(val) + mimirLabel := mimirpb.LabelAdapter{ + Name: spanIDKey, + Value: val, + } + mimirExemplar.Labels = append(mimirExemplar.Labels, mimirLabel) + } + + attrs := exemplar.FilteredAttributes() + labelsFromAttributes := make([]mimirpb.LabelAdapter, 0, attrs.Len()) + attrs.Range(func(key string, value pcommon.Value) bool { + val := value.AsString() + exemplarRunes += utf8.RuneCountInString(key) + utf8.RuneCountInString(val) + mimirLabel := mimirpb.LabelAdapter{ + Name: key, + Value: val, + } + + labelsFromAttributes = append(labelsFromAttributes, mimirLabel) + + return true + }) + if exemplarRunes <= maxExemplarRunes { + // only append filtered attributes if it does not cause exemplar + // labels to exceed the max number of runes + mimirExemplar.Labels = append(mimirExemplar.Labels, labelsFromAttributes...) + } + + mimirExemplars = append(mimirExemplars, mimirExemplar) + } + + return mimirExemplars +} + +// mostRecentTimestampInMetric returns the latest timestamp in a batch of metrics +func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp { + var ts pcommon.Timestamp + // handle individual metric based on type + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge: + dataPoints := metric.Gauge().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts = maxTimestamp(ts, dataPoints.At(x).Timestamp()) + } + case pmetric.MetricTypeSum: + dataPoints := metric.Sum().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts = maxTimestamp(ts, dataPoints.At(x).Timestamp()) + } + case pmetric.MetricTypeHistogram: + dataPoints := metric.Histogram().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts = maxTimestamp(ts, dataPoints.At(x).Timestamp()) + } + case pmetric.MetricTypeExponentialHistogram: + dataPoints := metric.ExponentialHistogram().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts = maxTimestamp(ts, dataPoints.At(x).Timestamp()) + } + case pmetric.MetricTypeSummary: + dataPoints := metric.Summary().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts = maxTimestamp(ts, dataPoints.At(x).Timestamp()) + } + } + return ts +} + +func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp { + if a > b { + return a + } + return b +} + +// addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples. 
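getMimirExemplars above enforces the 128-rune OpenMetrics limit: trace_id and span_id labels are always kept, and the filtered attributes are only attached if the combined rune count stays within budget. The snippet below is a simplified sketch of just the budget check; the real code tracks the count incrementally while building the label slices.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// maxExemplarRunes matches the OpenMetrics limit quoted in the constants above.
const maxExemplarRunes = 128

// withinExemplarBudget reports whether a candidate exemplar label set fits the
// 128-rune budget. In the patch, trace_id/span_id are always kept and only the
// filtered attributes are dropped when the combined count exceeds the limit.
func withinExemplarBudget(labels map[string]string) bool {
	runes := 0
	for k, v := range labels {
		runes += utf8.RuneCountInString(k) + utf8.RuneCountInString(v)
	}
	return runes <= maxExemplarRunes
}

func main() {
	fmt.Println(withinExemplarBudget(map[string]string{
		"trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
		"span_id":  "00f067aa0ba902b7",
	})) // true: 63 runes, well under the limit
}
```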
+func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, + tsMap map[string]*mimirpb.TimeSeries) { + timestamp := convertTimeStamp(pt.Timestamp()) + // sum and count of the summary should append suffix to baseName + baseName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) + baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels) + + createLabels := func(name string, extras ...string) []mimirpb.LabelAdapter { + extraLabelCount := len(extras) / 2 + labels := make([]mimirpb.LabelAdapter, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name + copy(labels, baseLabels) + + for extrasIdx := 0; extrasIdx < extraLabelCount; extrasIdx++ { + labels = append(labels, mimirpb.LabelAdapter{Name: extras[extrasIdx], Value: extras[extrasIdx+1]}) + } + + labels = append(labels, mimirpb.LabelAdapter{Name: nameStr, Value: name}) + + return labels + } + + // treat sum as a sample in an individual TimeSeries + sum := &mimirpb.Sample{ + Value: pt.Sum(), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + sum.Value = math.Float64frombits(value.StaleNaN) + } + sumlabels := createLabels(baseName + sumStr) + addSample(tsMap, sum, sumlabels, metric.Type().String()) + + // treat count as a sample in an individual TimeSeries + count := &mimirpb.Sample{ + Value: float64(pt.Count()), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + count.Value = math.Float64frombits(value.StaleNaN) + } + countlabels := createLabels(baseName + countStr) + addSample(tsMap, count, countlabels, metric.Type().String()) + + // process each percentile/quantile + for i := 0; i < pt.QuantileValues().Len(); i++ { + qt := pt.QuantileValues().At(i) + quantile := &mimirpb.Sample{ + Value: qt.Value(), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + quantile.Value = math.Float64frombits(value.StaleNaN) + } + percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) + qtlabels := createLabels(baseName, quantileStr, percentileStr) + addSample(tsMap, quantile, qtlabels, metric.Type().String()) + } + + // add _created time series if needed + startTimestamp := pt.StartTimestamp() + if settings.ExportCreatedMetric && startTimestamp != 0 { + createdLabels := createLabels(baseName + createdSuffix) + addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String()) + } +} + +// addCreatedTimeSeriesIfNeeded adds {name}_created time series with a single +// sample. If the series exists, then new samples won't be added. 
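The _created series described next stores the data point's start time as the sample value, converted from nanoseconds to milliseconds the same way convertTimeStamp does (patch 1/9 additionally stamps that sample with the data point's own timestamp). A small sketch of the conversion:

```go
package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
	// Start timestamp of the OTLP data point, in nanoseconds since the epoch.
	start := pcommon.NewTimestampFromTime(time.Date(2024, 2, 22, 10, 0, 0, 0, time.UTC))

	// The _created series carries the start time as the sample *value*,
	// converted from ns to ms exactly like convertTimeStamp does.
	createdValue := float64(start.AsTime().UnixNano() / int64(time.Millisecond))
	fmt.Printf("%.0f\n", createdValue) // 1708596000000
}
```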
+func addCreatedTimeSeriesIfNeeded( + series map[string]*mimirpb.TimeSeries, + labels []mimirpb.LabelAdapter, + startTimestamp pcommon.Timestamp, + metricType string, +) { + sig := timeSeriesSignature(metricType, labels) + if _, ok := series[sig]; !ok { + series[sig] = &mimirpb.TimeSeries{ + Labels: labels, + Samples: []mimirpb.Sample{ + { // convert ns to ms + Value: float64(convertTimeStamp(startTimestamp)), + }, + }, + } + } +} + +// addResourceTargetInfo converts the resource to the target info metric +func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, tsMap map[string]*mimirpb.TimeSeries) { + if settings.DisableTargetInfo { + return + } + // Use resource attributes (other than those used for job+instance) as the + // metric labels for the target info metric + attributes := pcommon.NewMap() + resource.Attributes().CopyTo(attributes) + attributes.RemoveIf(func(k string, _ pcommon.Value) bool { + switch k { + case conventions.AttributeServiceName, conventions.AttributeServiceNamespace, conventions.AttributeServiceInstanceID: + // Remove resource attributes used for job + instance + return true + default: + return false + } + }) + if attributes.Len() == 0 { + // If we only have job + instance, then target_info isn't useful, so don't add it. + return + } + // create parameters for addSample + name := targetMetricName + if len(settings.Namespace) > 0 { + name = settings.Namespace + "_" + name + } + labels := createAttributes(resource, attributes, settings.ExternalLabels, nameStr, name) + sample := &mimirpb.Sample{ + Value: float64(1), + // convert ns to ms + TimestampMs: convertTimeStamp(timestamp), + } + addSample(tsMap, sample, labels, infoType) +} + +// convertTimeStamp converts OTLP timestamp in ns to timestamp in ms +func convertTimeStamp(timestamp pcommon.Timestamp) int64 { + return timestamp.AsTime().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +} diff --git a/pkg/distributor/otlp/histograms.go b/pkg/distributor/otlp/histograms.go new file mode 100644 index 00000000000..80394b01d1a --- /dev/null +++ b/pkg/distributor/otlp/histograms.go @@ -0,0 +1,196 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package otlp + +import ( + "fmt" + "math" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/value" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/grafana/mimir/pkg/mimirpb" +) + +const defaultZeroThreshold = 1e-128 + +func addSingleExponentialHistogramDataPoint( + metric string, + pt pmetric.ExponentialHistogramDataPoint, + resource pcommon.Resource, + settings Settings, + series map[string]*mimirpb.TimeSeries, +) error { + labels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + model.MetricNameLabel, metric, + ) + + sig := timeSeriesSignature( + pmetric.MetricTypeExponentialHistogram.String(), + labels, + ) + ts, ok := series[sig] + if !ok { + ts = &mimirpb.TimeSeries{ + Labels: labels, + } + series[sig] = ts + } + + histogram, err := exponentialToNativeHistogram(pt) + if err != nil { + return err + } + ts.Histograms = append(ts.Histograms, histogram) + + exemplars := getMimirExemplars[pmetric.ExponentialHistogramDataPoint](pt) + ts.Exemplars = append(ts.Exemplars, exemplars...) + + return nil +} + +// exponentialToNativeHistogram translates OTel Exponential Histogram data point +// to Prometheus Native Histogram. 
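exponentialToNativeHistogram, defined next, rejects scales below -4 and clamps scales above 8, since Prometheus native histogram schemas span [-4, 8]; the difference becomes scaleDown, and 2^scaleDown adjacent OTel buckets are later merged into one. An illustrative stand-in for that scale handling:

```go
package main

import "fmt"

// clampScale is an illustrative stand-in for the scale handling in
// exponentialToNativeHistogram: Prometheus native histogram schemas span
// [-4, 8], so larger OTel scales are reduced and 2^scaleDown adjacent OTel
// buckets are later merged into a single Prometheus bucket.
func clampScale(scale int32) (schema, scaleDown int32, err error) {
	if scale < -4 {
		return 0, 0, fmt.Errorf("cannot convert: scale must be >= -4, was %d", scale)
	}
	if scale > 8 {
		return 8, scale - 8, nil
	}
	return scale, 0, nil
}

func main() {
	schema, scaleDown, _ := clampScale(10)
	fmt.Println(schema, scaleDown) // 8 2: every 2^2 = 4 OTel buckets collapse into one
}
```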
+func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (mimirpb.Histogram, error) { + scale := p.Scale() + if scale < -4 { + return mimirpb.Histogram{}, + fmt.Errorf("cannot convert exponential to native histogram."+ + " Scale must be >= -4, was %d", scale) + } + + var scaleDown int32 + if scale > 8 { + scaleDown = scale - 8 + scale = 8 + } + + pSpans, pDeltas := convertBucketsLayout(p.Positive(), scaleDown) + nSpans, nDeltas := convertBucketsLayout(p.Negative(), scaleDown) + + h := mimirpb.Histogram{ + Schema: scale, + + ZeroCount: &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: p.ZeroCount()}, + // TODO use zero_threshold, if set, see + // https://github.com/open-telemetry/opentelemetry-proto/pull/441 + ZeroThreshold: defaultZeroThreshold, + + PositiveSpans: pSpans, + PositiveDeltas: pDeltas, + NegativeSpans: nSpans, + NegativeDeltas: nDeltas, + + Timestamp: convertTimeStamp(p.Timestamp()), + } + + if p.Flags().NoRecordedValue() { + h.Sum = math.Float64frombits(value.StaleNaN) + h.Count = &mimirpb.Histogram_CountInt{CountInt: value.StaleNaN} + } else { + if p.HasSum() { + h.Sum = p.Sum() + } + h.Count = &mimirpb.Histogram_CountInt{CountInt: p.Count()} + } + return h, nil +} + +// convertBucketsLayout translates OTel Exponential Histogram dense buckets +// representation to Prometheus Native Histogram sparse bucket representation. +// +// The translation logic is taken from the client_golang `histogram.go#makeBuckets` +// function, see `makeBuckets` https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go +// The bucket indexes conversion was adjusted, since OTel exp. histogram bucket +// index 0 corresponds to the range (1, base] while Prometheus bucket index 0 +// to the range (base 1]. +// +// scaleDown is the factor by which the buckets are scaled down. In other words 2^scaleDown buckets will be merged into one. +func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, scaleDown int32) ([]mimirpb.BucketSpan, []int64) { + bucketCounts := buckets.BucketCounts() + if bucketCounts.Len() == 0 { + return nil, nil + } + + var ( + spans []mimirpb.BucketSpan + deltas []int64 + count int64 + prevCount int64 + ) + + appendDelta := func(count int64) { + spans[len(spans)-1].Length++ + deltas = append(deltas, count-prevCount) + prevCount = count + } + + // Let the compiler figure out that this is const during this function by + // moving it into a local variable. + numBuckets := bucketCounts.Len() + + // The offset is scaled and adjusted by 1 as described above. + bucketIdx := buckets.Offset()>>scaleDown + 1 + spans = append(spans, mimirpb.BucketSpan{ + Offset: bucketIdx, + Length: 0, + }) + + for i := 0; i < numBuckets; i++ { + // The offset is scaled and adjusted by 1 as described above. + nextBucketIdx := (int32(i)+buckets.Offset())>>scaleDown + 1 + if bucketIdx == nextBucketIdx { // We have not collected enough buckets to merge yet. + count += int64(bucketCounts.At(i)) + continue + } + if count == 0 { + count = int64(bucketCounts.At(i)) + continue + } + + gap := nextBucketIdx - bucketIdx - 1 + if gap > 2 { + // We have to create a new span, because we have found a gap + // of more than two buckets. The constant 2 is copied from the logic in + // https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296 + spans = append(spans, mimirpb.BucketSpan{ + Offset: gap, + Length: 0, + }) + } else { + // We have found a small gap (or no gap at all). 
+ // Insert empty buckets as needed. + for j := int32(0); j < gap; j++ { + appendDelta(0) + } + } + appendDelta(count) + count = int64(bucketCounts.At(i)) + bucketIdx = nextBucketIdx + } + // Need to use the last item's index. The offset is scaled and adjusted by 1 as described above. + gap := (int32(numBuckets)+buckets.Offset()-1)>>scaleDown + 1 - bucketIdx + if gap > 2 { + // We have to create a new span, because we have found a gap + // of more than two buckets. The constant 2 is copied from the logic in + // https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296 + spans = append(spans, mimirpb.BucketSpan{ + Offset: gap, + Length: 0, + }) + } else { + // We have found a small gap (or no gap at all). + // Insert empty buckets as needed. + for j := int32(0); j < gap; j++ { + appendDelta(0) + } + } + appendDelta(count) + + return spans, deltas +} diff --git a/pkg/distributor/otlp/metrics_to_prw.go b/pkg/distributor/otlp/metrics_to_prw.go new file mode 100644 index 00000000000..c3c4d4f1fd6 --- /dev/null +++ b/pkg/distributor/otlp/metrics_to_prw.go @@ -0,0 +1,120 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package otlp + +import ( + "errors" + "fmt" + + prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" + + "github.com/grafana/mimir/pkg/mimirpb" +) + +type Settings struct { + Namespace string + ExternalLabels map[string]string + DisableTargetInfo bool + ExportCreatedMetric bool + AddMetricSuffixes bool +} + +// FromMetrics converts pmetric.Metrics to Mimir time series. +func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimeseries, error) { + tsMap := map[string]*mimirpb.TimeSeries{} + var errs error + resourceMetricsSlice := md.ResourceMetrics() + for i := 0; i < resourceMetricsSlice.Len(); i++ { + resourceMetrics := resourceMetricsSlice.At(i) + resource := resourceMetrics.Resource() + scopeMetricsSlice := resourceMetrics.ScopeMetrics() + // keep track of the most recent timestamp in the ResourceMetrics for + // use with the "target" info metric + var mostRecentTimestamp pcommon.Timestamp + for j := 0; j < scopeMetricsSlice.Len(); j++ { + metricSlice := scopeMetricsSlice.At(j).Metrics() + + // TODO: decide if instrumentation library information should be exported as labels + for k := 0; k < metricSlice.Len(); k++ { + metric := metricSlice.At(k) + mostRecentTimestamp = maxTimestamp(mostRecentTimestamp, mostRecentTimestampInMetric(metric)) + + if !isValidAggregationTemporality(metric) { + errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name())) + continue + } + + // handle individual metric based on type + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge: + dataPoints := metric.Gauge().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + } + for x := 0; x < dataPoints.Len(); x++ { + addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + } + case pmetric.MetricTypeSum: + dataPoints := metric.Sum().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. 
%s is dropped", metric.Name())) + } + for x := 0; x < dataPoints.Len(); x++ { + addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + } + case pmetric.MetricTypeHistogram: + dataPoints := metric.Histogram().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + } + for x := 0; x < dataPoints.Len(); x++ { + addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + } + case pmetric.MetricTypeExponentialHistogram: + dataPoints := metric.ExponentialHistogram().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + } + name := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) + for x := 0; x < dataPoints.Len(); x++ { + errs = multierr.Append( + errs, + addSingleExponentialHistogramDataPoint( + name, + dataPoints.At(x), + resource, + settings, + tsMap, + ), + ) + } + case pmetric.MetricTypeSummary: + dataPoints := metric.Summary().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + } + for x := 0; x < dataPoints.Len(); x++ { + addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + } + default: + errs = multierr.Append(errs, errors.New("unsupported metric type")) + } + } + } + addResourceTargetInfo(resource, settings, mostRecentTimestamp, tsMap) + } + + mimirTs := mimirpb.PreallocTimeseriesSliceFromPool() + if cap(mimirTs) < len(tsMap) { + mimirpb.ReuseSlice(mimirTs) + mimirTs = make([]mimirpb.PreallocTimeseries, 0, len(tsMap)) + } + for _, ts := range tsMap { + mimirTs = append(mimirTs, mimirpb.PreallocTimeseries{TimeSeries: ts}) + } + return mimirTs, errs +} diff --git a/pkg/distributor/otlp/number_data_points.go b/pkg/distributor/otlp/number_data_points.go new file mode 100644 index 00000000000..a13d27d09e7 --- /dev/null +++ b/pkg/distributor/otlp/number_data_points.go @@ -0,0 +1,102 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package otlp + +import ( + "math" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/value" + prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/grafana/mimir/pkg/mimirpb" +) + +// addSingleSumNumberDataPoint converts the Gauge metric data point to a +// Prometheus time series with samples and labels. The result is stored in the +// series map. 
+func addSingleGaugeNumberDataPoint( + pt pmetric.NumberDataPoint, + resource pcommon.Resource, + metric pmetric.Metric, + settings Settings, + series map[string]*mimirpb.TimeSeries, +) { + name := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) + labels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + model.MetricNameLabel, + name, + ) + sample := &mimirpb.Sample{ + // convert ns to ms + TimestampMs: convertTimeStamp(pt.Timestamp()), + } + switch pt.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + sample.Value = float64(pt.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + sample.Value = pt.DoubleValue() + } + if pt.Flags().NoRecordedValue() { + sample.Value = math.Float64frombits(value.StaleNaN) + } + addSample(series, sample, labels, metric.Type().String()) +} + +// addSingleSumNumberDataPoint converts the Sum metric data point to a Prometheus +// time series with samples, labels and exemplars. The result is stored in the +// series map. +func addSingleSumNumberDataPoint( + pt pmetric.NumberDataPoint, + resource pcommon.Resource, + metric pmetric.Metric, + settings Settings, + series map[string]*mimirpb.TimeSeries, +) { + name := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) + labels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + model.MetricNameLabel, name, + ) + sample := &mimirpb.Sample{ + // convert ns to ms + TimestampMs: convertTimeStamp(pt.Timestamp()), + } + switch pt.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + sample.Value = float64(pt.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + sample.Value = pt.DoubleValue() + } + if pt.Flags().NoRecordedValue() { + sample.Value = math.Float64frombits(value.StaleNaN) + } + sig := addSample(series, sample, labels, metric.Type().String()) + + if ts, ok := series[sig]; sig != "" && ok { + exemplars := getMimirExemplars[pmetric.NumberDataPoint](pt) + ts.Exemplars = append(ts.Exemplars, exemplars...) 
+ } + + // add _created time series if needed + if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() { + startTimestamp := pt.StartTimestamp() + if startTimestamp != 0 { + createdLabels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nameStr, + name+createdSuffix, + ) + addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, metric.Type().String()) + } + } +} diff --git a/pkg/distributor/push_test.go b/pkg/distributor/push_test.go index 43840faeaa8..6d1e263fbf8 100644 --- a/pkg/distributor/push_test.go +++ b/pkg/distributor/push_test.go @@ -680,6 +680,43 @@ func createMimirWriteRequestProtobuf(t *testing.T, skipLabelNameValidation bool) return inoutBytes } +func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram { + pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans)) + for _, span := range h.PositiveSpans { + pSpans = append( + pSpans, mimirpb.BucketSpan{ + Offset: span.Offset, + Length: span.Length, + }, + ) + } + nSpans := make([]mimirpb.BucketSpan, 0, len(h.NegativeSpans)) + for _, span := range h.NegativeSpans { + nSpans = append( + nSpans, mimirpb.BucketSpan{ + Offset: span.Offset, + Length: span.Length, + }, + ) + } + + return mimirpb.Histogram{ + Count: &mimirpb.Histogram_CountInt{CountInt: h.GetCountInt()}, + Sum: h.Sum, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()}, + NegativeSpans: nSpans, + NegativeDeltas: h.NegativeDeltas, + NegativeCounts: h.NegativeCounts, + PositiveSpans: pSpans, + PositiveDeltas: h.PositiveDeltas, + PositiveCounts: h.PositiveCounts, + Timestamp: h.Timestamp, + ResetHint: mimirpb.Histogram_ResetHint(h.ResetHint), + } +} + func createMimirWriteRequestProtobufWithNonSupportedLabelNames(t *testing.T, skipLabelNameValidation bool) []byte { t.Helper() ts := mimirpb.PreallocTimeseries{ From c761b8361d1f51d6c7fec6a5cdf09a1ecf6b307b Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 1 Dec 2023 16:03:59 +0100 Subject: [PATCH 3/9] Optimize OTel->Prometheus conversion Signed-off-by: Arve Knudsen --- pkg/distributor/otlp/helper.go | 7 +++---- pkg/distributor/otlp/metrics_to_prw.go | 8 ++++++-- pkg/distributor/otlp/number_data_points.go | 7 +++---- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pkg/distributor/otlp/helper.go b/pkg/distributor/otlp/helper.go index a09af75536e..a3810cef386 100644 --- a/pkg/distributor/otlp/helper.go +++ b/pkg/distributor/otlp/helper.go @@ -69,7 +69,6 @@ func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // tsMap will be unmodified if either labels or sample is nil, but can still be modified if the exemplar is nil. func addSample(tsMap map[string]*mimirpb.TimeSeries, sample *mimirpb.Sample, labels []mimirpb.LabelAdapter, datatype string) string { - if sample == nil || labels == nil || tsMap == nil { return "" } @@ -146,9 +145,9 @@ func timeSeriesSignature(datatype string, labels []mimirpb.LabelAdapter) string return b.String() } -// createAttributes creates a slice of Cortex Label with OTLP attributes and pairs of string values. -// Unpaired string value is ignored. String pairs overwrites OTLP labels if collision happens, and the overwrite is -// logged. Resultant label names are sanitized. +// createAttributes creates a slice of Mimir labels with OTLP attributes and pairs of string values. +// Unpaired string values are ignored. String pairs overwrite OTLP labels if collision happens, and the overwrite is +// logged. 
Resulting label names are sanitized. func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, extras ...string) []mimirpb.LabelAdapter { serviceName, haveServiceName := resource.Attributes().Get(conventions.AttributeServiceName) instance, haveInstanceID := resource.Attributes().Get(conventions.AttributeServiceInstanceID) diff --git a/pkg/distributor/otlp/metrics_to_prw.go b/pkg/distributor/otlp/metrics_to_prw.go index c3c4d4f1fd6..5c8141f085e 100644 --- a/pkg/distributor/otlp/metrics_to_prw.go +++ b/pkg/distributor/otlp/metrics_to_prw.go @@ -47,6 +47,8 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes continue } + promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) + // handle individual metric based on type //exhaustive:enforce switch metric.Type() { @@ -56,7 +58,9 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + // For each data point, createAttributes is called for its attributes + // + addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) } case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() @@ -64,7 +68,7 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) } case pmetric.MetricTypeHistogram: dataPoints := metric.Histogram().DataPoints() diff --git a/pkg/distributor/otlp/number_data_points.go b/pkg/distributor/otlp/number_data_points.go index a13d27d09e7..56d301f45a6 100644 --- a/pkg/distributor/otlp/number_data_points.go +++ b/pkg/distributor/otlp/number_data_points.go @@ -7,14 +7,13 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/value" - prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/grafana/mimir/pkg/mimirpb" ) -// addSingleSumNumberDataPoint converts the Gauge metric data point to a +// addSingleGaugeNumberDataPoint converts the Gauge metric data point to a // Prometheus time series with samples and labels. The result is stored in the // series map. 
func addSingleGaugeNumberDataPoint( @@ -23,8 +22,8 @@ func addSingleGaugeNumberDataPoint( metric pmetric.Metric, settings Settings, series map[string]*mimirpb.TimeSeries, + name string, ) { - name := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) labels := createAttributes( resource, pt.Attributes(), @@ -57,8 +56,8 @@ func addSingleSumNumberDataPoint( metric pmetric.Metric, settings Settings, series map[string]*mimirpb.TimeSeries, + name string, ) { - name := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) labels := createAttributes( resource, pt.Attributes(), From 38967e06822034e161a9caaa26397f80354dd19e Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 1 Dec 2023 16:54:47 +0100 Subject: [PATCH 4/9] Optimize OTel->Prometheus conversion Signed-off-by: Arve Knudsen --- pkg/distributor/otlp/helper.go | 25 ++++++++++++---------- pkg/distributor/otlp/metrics_to_prw.go | 12 +++++++++-- pkg/distributor/otlp/number_data_points.go | 10 +-------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/pkg/distributor/otlp/helper.go b/pkg/distributor/otlp/helper.go index a3810cef386..be0c0ba26a2 100644 --- a/pkg/distributor/otlp/helper.go +++ b/pkg/distributor/otlp/helper.go @@ -152,6 +152,18 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa serviceName, haveServiceName := resource.Attributes().Get(conventions.AttributeServiceName) instance, haveInstanceID := resource.Attributes().Get(conventions.AttributeServiceInstanceID) + // TODO: Make a hash of all the attribute pairs + serviceName + instance, + // and try to return corresponding entry. Otherwise make new, and enter in map. + + // Ensure attributes are sorted by key for consistent merging of keys which + // collide when sanitized. + labels := make([]mimirpb.LabelAdapter, 0, attributes.Len()) + attributes.Range(func(key string, value pcommon.Value) bool { + labels = append(labels, mimirpb.LabelAdapter{Name: key, Value: value.AsString()}) + return true + }) + sort.Stable(ByLabelName(labels)) + // Calculate the maximum possible number of labels we could return so we can preallocate l maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 @@ -166,19 +178,10 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa // map ensures no duplicate label name l := make(map[string]string, maxLabelCount) - // Ensure attributes are sorted by key for consistent merging of keys which - // collide when sanitized. 
- labels := make([]mimirpb.LabelAdapter, 0, attributes.Len()) - attributes.Range(func(key string, value pcommon.Value) bool { - labels = append(labels, mimirpb.LabelAdapter{Name: key, Value: value.AsString()}) - return true - }) - sort.Stable(ByLabelName(labels)) - for _, label := range labels { var finalKey = prometheustranslator.NormalizeLabel(label.Name) - if existingLabel, alreadyExists := l[finalKey]; alreadyExists { - l[finalKey] = existingLabel + ";" + label.Value + if existingValue, alreadyExists := l[finalKey]; alreadyExists { + l[finalKey] = existingValue + ";" + label.Value } else { l[finalKey] = label.Value } diff --git a/pkg/distributor/otlp/metrics_to_prw.go b/pkg/distributor/otlp/metrics_to_prw.go index 5c8141f085e..d615b3d132b 100644 --- a/pkg/distributor/otlp/metrics_to_prw.go +++ b/pkg/distributor/otlp/metrics_to_prw.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" + "github.com/prometheus/common/model" prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -59,8 +60,15 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes } for x := 0; x < dataPoints.Len(); x++ { // For each data point, createAttributes is called for its attributes - // - addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) + pt := dataPoints.At(x) + labels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + model.MetricNameLabel, + promName, + ) + addSingleGaugeNumberDataPoint(pt, metric, tsMap, labels, promName) } case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() diff --git a/pkg/distributor/otlp/number_data_points.go b/pkg/distributor/otlp/number_data_points.go index 56d301f45a6..9c33f988c1b 100644 --- a/pkg/distributor/otlp/number_data_points.go +++ b/pkg/distributor/otlp/number_data_points.go @@ -18,19 +18,11 @@ import ( // series map. 
func addSingleGaugeNumberDataPoint( pt pmetric.NumberDataPoint, - resource pcommon.Resource, metric pmetric.Metric, - settings Settings, series map[string]*mimirpb.TimeSeries, + labels []mimirpb.LabelAdapter, name string, ) { - labels := createAttributes( - resource, - pt.Attributes(), - settings.ExternalLabels, - model.MetricNameLabel, - name, - ) sample := &mimirpb.Sample{ // convert ns to ms TimestampMs: convertTimeStamp(pt.Timestamp()), From 0475a17d6d37ff85b789be05ce8a48ea3d894903 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 4 Dec 2023 17:31:24 +0100 Subject: [PATCH 5/9] Use xxhash to compute time series signature Signed-off-by: Arve Knudsen --- pkg/distributor/otlp/helper.go | 54 +++++++++------------- pkg/distributor/otlp/histograms.go | 5 +- pkg/distributor/otlp/metrics_to_prw.go | 15 ++---- pkg/distributor/otlp/number_data_points.go | 16 +++++-- 4 files changed, 40 insertions(+), 50 deletions(-) diff --git a/pkg/distributor/otlp/helper.go b/pkg/distributor/otlp/helper.go index be0c0ba26a2..6ff7ca8d74a 100644 --- a/pkg/distributor/otlp/helper.go +++ b/pkg/distributor/otlp/helper.go @@ -9,10 +9,10 @@ import ( "math" "sort" "strconv" - "strings" "time" "unicode/utf8" + "github.com/cespare/xxhash/v2" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/value" @@ -46,7 +46,7 @@ const ( ) type bucketBoundsData struct { - sig string + sig uint64 bound float64 } @@ -67,15 +67,14 @@ func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // addSample finds a TimeSeries in tsMap that corresponds to the label set labels, and add sample to the TimeSeries; it // creates a new TimeSeries in the map if not found and returns the time series signature. // tsMap will be unmodified if either labels or sample is nil, but can still be modified if the exemplar is nil. -func addSample(tsMap map[string]*mimirpb.TimeSeries, sample *mimirpb.Sample, labels []mimirpb.LabelAdapter, - datatype string) string { +func addSample(tsMap map[uint64]*mimirpb.TimeSeries, sample *mimirpb.Sample, labels []mimirpb.LabelAdapter, + datatype string) uint64 { if sample == nil || labels == nil || tsMap == nil { - return "" + return 0 } sig := timeSeriesSignature(datatype, labels) ts, ok := tsMap[sig] - if ok { ts.Samples = append(ts.Samples, *sample) } else { @@ -91,7 +90,7 @@ func addSample(tsMap map[string]*mimirpb.TimeSeries, sample *mimirpb.Sample, lab // addExemplars finds a bucket bound that corresponds to the exemplars value and add the exemplar to the specific sig; // we only add exemplars if samples are presents // tsMap is unmodified if either of its parameters is nil and samples are nil. 
-func addExemplars(tsMap map[string]*mimirpb.TimeSeries, exemplars []mimirpb.Exemplar, bucketBoundsData []bucketBoundsData) { +func addExemplars(tsMap map[uint64]*mimirpb.TimeSeries, exemplars []mimirpb.Exemplar, bucketBoundsData []bucketBoundsData) { if len(tsMap) == 0 || len(bucketBoundsData) == 0 || len(exemplars) == 0 { return } @@ -103,7 +102,7 @@ func addExemplars(tsMap map[string]*mimirpb.TimeSeries, exemplars []mimirpb.Exem } } -func addExemplar(tsMap map[string]*mimirpb.TimeSeries, bucketBounds []bucketBoundsData, exemplar mimirpb.Exemplar) { +func addExemplar(tsMap map[uint64]*mimirpb.TimeSeries, bucketBounds []bucketBoundsData, exemplar mimirpb.Exemplar) { for _, bucketBound := range bucketBounds { sig := bucketBound.sig bound := bucketBound.bound @@ -122,29 +121,24 @@ func addExemplar(tsMap map[string]*mimirpb.TimeSeries, bucketBounds []bucketBoun // // the label slice should not contain duplicate label names; this method sorts the slice by label name before creating // the signature. -func timeSeriesSignature(datatype string, labels []mimirpb.LabelAdapter) string { - length := len(datatype) - - for _, lb := range labels { - length += 2 + len(lb.Name) + len(lb.Value) - } - - b := strings.Builder{} - b.Grow(length) - b.WriteString(datatype) - +func timeSeriesSignature(datatype string, labels []mimirpb.LabelAdapter) uint64 { sort.Sort(ByLabelName(labels)) + h := xxhash.New() + h.WriteString(datatype) + h.Write(seps) for _, lb := range labels { - b.WriteString("-") - b.WriteString(lb.Name) - b.WriteString("-") - b.WriteString(lb.Value) + h.WriteString(lb.Name) + h.Write(seps) + h.WriteString(lb.Value) + h.Write(seps) } - return b.String() + return h.Sum64() } +var seps = []byte{'\xff'} + // createAttributes creates a slice of Mimir labels with OTLP attributes and pairs of string values. // Unpaired string values are ignored. String pairs overwrite OTLP labels if collision happens, and the overwrite is // logged. Resulting label names are sanitized. @@ -152,9 +146,6 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa serviceName, haveServiceName := resource.Attributes().Get(conventions.AttributeServiceName) instance, haveInstanceID := resource.Attributes().Get(conventions.AttributeServiceInstanceID) - // TODO: Make a hash of all the attribute pairs + serviceName + instance, - // and try to return corresponding entry. Otherwise make new, and enter in map. - // Ensure attributes are sorted by key for consistent merging of keys which // collide when sanitized. labels := make([]mimirpb.LabelAdapter, 0, attributes.Len()) @@ -162,7 +153,6 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa labels = append(labels, mimirpb.LabelAdapter{Name: key, Value: value.AsString()}) return true }) - sort.Stable(ByLabelName(labels)) // Calculate the maximum possible number of labels we could return so we can preallocate l maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 @@ -251,7 +241,7 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool { // addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. 
It // ignore extra buckets if len(ExplicitBounds) > len(BucketCounts) -func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*mimirpb.TimeSeries) { +func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[uint64]*mimirpb.TimeSeries) { timestamp := convertTimeStamp(pt.Timestamp()) // sum, count, and buckets of the histogram should append suffix to baseName baseName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) @@ -452,7 +442,7 @@ func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp { // addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples. func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, - tsMap map[string]*mimirpb.TimeSeries) { + tsMap map[uint64]*mimirpb.TimeSeries) { timestamp := convertTimeStamp(pt.Timestamp()) // sum and count of the summary should append suffix to baseName baseName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) @@ -520,7 +510,7 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res // addCreatedTimeSeriesIfNeeded adds {name}_created time series with a single // sample. If the series exists, then new samples won't be added. func addCreatedTimeSeriesIfNeeded( - series map[string]*mimirpb.TimeSeries, + series map[uint64]*mimirpb.TimeSeries, labels []mimirpb.LabelAdapter, startTimestamp pcommon.Timestamp, metricType string, @@ -539,7 +529,7 @@ func addCreatedTimeSeriesIfNeeded( } // addResourceTargetInfo converts the resource to the target info metric -func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, tsMap map[string]*mimirpb.TimeSeries) { +func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, tsMap map[uint64]*mimirpb.TimeSeries) { if settings.DisableTargetInfo { return } diff --git a/pkg/distributor/otlp/histograms.go b/pkg/distributor/otlp/histograms.go index 80394b01d1a..93eed546dd7 100644 --- a/pkg/distributor/otlp/histograms.go +++ b/pkg/distributor/otlp/histograms.go @@ -21,13 +21,14 @@ func addSingleExponentialHistogramDataPoint( pt pmetric.ExponentialHistogramDataPoint, resource pcommon.Resource, settings Settings, - series map[string]*mimirpb.TimeSeries, + series map[uint64]*mimirpb.TimeSeries, ) error { labels := createAttributes( resource, pt.Attributes(), settings.ExternalLabels, - model.MetricNameLabel, metric, + model.MetricNameLabel, + metric, ) sig := timeSeriesSignature( diff --git a/pkg/distributor/otlp/metrics_to_prw.go b/pkg/distributor/otlp/metrics_to_prw.go index d615b3d132b..87fec96b31f 100644 --- a/pkg/distributor/otlp/metrics_to_prw.go +++ b/pkg/distributor/otlp/metrics_to_prw.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" - "github.com/prometheus/common/model" prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -25,7 +24,7 @@ type Settings struct { // FromMetrics converts pmetric.Metrics to Mimir time series. 
func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimeseries, error) { - tsMap := map[string]*mimirpb.TimeSeries{} + tsMap := map[uint64]*mimirpb.TimeSeries{} var errs error resourceMetricsSlice := md.ResourceMetrics() for i := 0; i < resourceMetricsSlice.Len(); i++ { @@ -50,7 +49,7 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) - // handle individual metric based on type + // handle individual metrics based on type //exhaustive:enforce switch metric.Type() { case pmetric.MetricTypeGauge: @@ -60,15 +59,7 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes } for x := 0; x < dataPoints.Len(); x++ { // For each data point, createAttributes is called for its attributes - pt := dataPoints.At(x) - labels := createAttributes( - resource, - pt.Attributes(), - settings.ExternalLabels, - model.MetricNameLabel, - promName, - ) - addSingleGaugeNumberDataPoint(pt, metric, tsMap, labels, promName) + addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) } case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() diff --git a/pkg/distributor/otlp/number_data_points.go b/pkg/distributor/otlp/number_data_points.go index 9c33f988c1b..a6b9f31ace2 100644 --- a/pkg/distributor/otlp/number_data_points.go +++ b/pkg/distributor/otlp/number_data_points.go @@ -18,11 +18,19 @@ import ( // series map. func addSingleGaugeNumberDataPoint( pt pmetric.NumberDataPoint, + resource pcommon.Resource, metric pmetric.Metric, - series map[string]*mimirpb.TimeSeries, - labels []mimirpb.LabelAdapter, + settings Settings, + series map[uint64]*mimirpb.TimeSeries, name string, ) { + labels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + model.MetricNameLabel, + name, + ) sample := &mimirpb.Sample{ // convert ns to ms TimestampMs: convertTimeStamp(pt.Timestamp()), @@ -47,7 +55,7 @@ func addSingleSumNumberDataPoint( resource pcommon.Resource, metric pmetric.Metric, settings Settings, - series map[string]*mimirpb.TimeSeries, + series map[uint64]*mimirpb.TimeSeries, name string, ) { labels := createAttributes( @@ -71,7 +79,7 @@ func addSingleSumNumberDataPoint( } sig := addSample(series, sample, labels, metric.Type().String()) - if ts, ok := series[sig]; sig != "" && ok { + if ts, ok := series[sig]; sig != 0 && ok { exemplars := getMimirExemplars[pmetric.NumberDataPoint](pt) ts.Exemplars = append(ts.Exemplars, exemplars...) } From bae4bfbf9a8ba1cbf5e21aa649e84a92a37d04f6 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 5 Dec 2023 08:14:19 +0100 Subject: [PATCH 6/9] Don't recompute metric name Signed-off-by: Arve Knudsen --- pkg/distributor/otlp/helper.go | 10 ++++------ pkg/distributor/otlp/metrics_to_prw.go | 7 +++---- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/pkg/distributor/otlp/helper.go b/pkg/distributor/otlp/helper.go index 6ff7ca8d74a..c5507df8845 100644 --- a/pkg/distributor/otlp/helper.go +++ b/pkg/distributor/otlp/helper.go @@ -241,10 +241,8 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool { // addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. 
It // ignore extra buckets if len(ExplicitBounds) > len(BucketCounts) -func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[uint64]*mimirpb.TimeSeries) { +func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[uint64]*mimirpb.TimeSeries, baseName string) { timestamp := convertTimeStamp(pt.Timestamp()) - // sum, count, and buckets of the histogram should append suffix to baseName - baseName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels) createLabels := func(nameSuffix string, extras ...string) []mimirpb.LabelAdapter { @@ -256,6 +254,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon labels = append(labels, mimirpb.LabelAdapter{Name: extras[extrasIdx], Value: extras[extrasIdx+1]}) } + // sum, count, and buckets of the histogram should append suffix to baseName labels = append(labels, mimirpb.LabelAdapter{Name: nameStr, Value: baseName + nameSuffix}) return labels @@ -442,10 +441,8 @@ func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp { // addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples. func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, - tsMap map[uint64]*mimirpb.TimeSeries) { + tsMap map[uint64]*mimirpb.TimeSeries, baseName string) { timestamp := convertTimeStamp(pt.Timestamp()) - // sum and count of the summary should append suffix to baseName - baseName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels) createLabels := func(name string, extras ...string) []mimirpb.LabelAdapter { @@ -470,6 +467,7 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res if pt.Flags().NoRecordedValue() { sum.Value = math.Float64frombits(value.StaleNaN) } + // sum and count of the summary should append suffix to baseName sumlabels := createLabels(baseName + sumStr) addSample(tsMap, sum, sumlabels, metric.Type().String()) diff --git a/pkg/distributor/otlp/metrics_to_prw.go b/pkg/distributor/otlp/metrics_to_prw.go index 87fec96b31f..6117bc5fa3a 100644 --- a/pkg/distributor/otlp/metrics_to_prw.go +++ b/pkg/distributor/otlp/metrics_to_prw.go @@ -75,19 +75,18 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) } case pmetric.MetricTypeExponentialHistogram: dataPoints := metric.ExponentialHistogram().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. 
%s is dropped", metric.Name())) } - name := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) for x := 0; x < dataPoints.Len(); x++ { errs = multierr.Append( errs, addSingleExponentialHistogramDataPoint( - name, + promName, dataPoints.At(x), resource, settings, @@ -101,7 +100,7 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, settings, tsMap) + addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) } default: errs = multierr.Append(errs, errors.New("unsupported metric type")) From af962da99f9519a43bbc21243438b1f256019d93 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 5 Dec 2023 08:38:48 +0100 Subject: [PATCH 7/9] Distributor: Optimize by reading/gzip decompressing using pooled buffers Signed-off-by: Arve Knudsen --- tools/trafficdump/model.go | 3 ++- tools/trafficdump/processor.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tools/trafficdump/model.go b/tools/trafficdump/model.go index fb2028c9a82..e834459aaa0 100644 --- a/tools/trafficdump/model.go +++ b/tools/trafficdump/model.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "strconv" + "sync" "time" "github.com/prometheus/prometheus/model/labels" @@ -45,7 +46,7 @@ type request struct { PushRequest any `json:"push,omitempty"` - cleanup func() + cleanup func(*sync.Pool) } type requestURL struct { diff --git a/tools/trafficdump/processor.go b/tools/trafficdump/processor.go index a12953f213d..2600bfc4559 100644 --- a/tools/trafficdump/processor.go +++ b/tools/trafficdump/processor.go @@ -59,7 +59,7 @@ func (p *processor) run() { func (p *processor) print(req *request, resp *response) { if req != nil && req.cleanup != nil { - defer req.cleanup() + defer req.cleanup(&bufferPool) } if req != nil && req.ignored { From d23f769617b1b751bab2322099c3d3c3a64a331e Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 7 Dec 2023 11:03:15 +0100 Subject: [PATCH 8/9] Copy improvements from opentelemetry-collector-contrib Signed-off-by: Arve Knudsen --- go.mod | 4 +-- pkg/distributor/otlp/helper.go | 42 +++++++++++----------- pkg/distributor/otlp/histograms.go | 12 ++++++- pkg/distributor/otlp/metrics_to_prw.go | 2 +- pkg/distributor/otlp/number_data_points.go | 26 ++++++++------ 5 files changed, 51 insertions(+), 35 deletions(-) diff --git a/go.mod b/go.mod index eacfe5a4096..36d75aa2eaa 100644 --- a/go.mod +++ b/go.mod @@ -136,7 +136,7 @@ require ( github.com/bits-and-blooms/bitset v1.8.0 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash v1.1.0 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -235,7 +235,7 @@ require ( go.mongodb.org/mongo-driver v1.13.1 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.1 // indirect - go.opentelemetry.io/collector/semconv v0.93.0 // indirect + go.opentelemetry.io/collector/semconv v0.93.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect go.opentelemetry.io/otel/metric v1.22.0 // indirect go.uber.org/zap v1.21.0 // indirect diff --git 
a/pkg/distributor/otlp/helper.go b/pkg/distributor/otlp/helper.go index c5507df8845..e2ee8337544 100644 --- a/pkg/distributor/otlp/helper.go +++ b/pkg/distributor/otlp/helper.go @@ -25,7 +25,6 @@ import ( ) const ( - nameStr = "__name__" sumStr = "_sum" countStr = "_count" bucketStr = "_bucket" @@ -70,12 +69,13 @@ func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func addSample(tsMap map[uint64]*mimirpb.TimeSeries, sample *mimirpb.Sample, labels []mimirpb.LabelAdapter, datatype string) uint64 { if sample == nil || labels == nil || tsMap == nil { + // This shouldn't happen return 0 } sig := timeSeriesSignature(datatype, labels) - ts, ok := tsMap[sig] - if ok { + ts := tsMap[sig] + if ts != nil { ts.Samples = append(ts.Samples, *sample) } else { newTs := mimirpb.TimeseriesFromPool() @@ -140,20 +140,12 @@ func timeSeriesSignature(datatype string, labels []mimirpb.LabelAdapter) uint64 var seps = []byte{'\xff'} // createAttributes creates a slice of Mimir labels with OTLP attributes and pairs of string values. -// Unpaired string values are ignored. String pairs overwrite OTLP labels if collision happens, and the overwrite is +// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen, and overwrites are // logged. Resulting label names are sanitized. func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, extras ...string) []mimirpb.LabelAdapter { serviceName, haveServiceName := resource.Attributes().Get(conventions.AttributeServiceName) instance, haveInstanceID := resource.Attributes().Get(conventions.AttributeServiceInstanceID) - // Ensure attributes are sorted by key for consistent merging of keys which - // collide when sanitized. - labels := make([]mimirpb.LabelAdapter, 0, attributes.Len()) - attributes.Range(func(key string, value pcommon.Value) bool { - labels = append(labels, mimirpb.LabelAdapter{Name: key, Value: value.AsString()}) - return true - }) - // Calculate the maximum possible number of labels we could return so we can preallocate l maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 @@ -168,6 +160,15 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa // map ensures no duplicate label name l := make(map[string]string, maxLabelCount) + // Ensure attributes are sorted by key for consistent merging of keys which + // collide when sanitized. 
+ labels := make([]mimirpb.LabelAdapter, 0, attributes.Len()) + attributes.Range(func(key string, value pcommon.Value) bool { + labels = append(labels, mimirpb.LabelAdapter{Name: key, Value: value.AsString()}) + return true + }) + sort.Stable(ByLabelName(labels)) + for _, label := range labels { var finalKey = prometheustranslator.NormalizeLabel(label.Name) if existingValue, alreadyExists := l[finalKey]; alreadyExists { @@ -255,7 +256,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon } // sum, count, and buckets of the histogram should append suffix to baseName - labels = append(labels, mimirpb.LabelAdapter{Name: nameStr, Value: baseName + nameSuffix}) + labels = append(labels, mimirpb.LabelAdapter{Name: model.MetricNameLabel, Value: baseName + nameSuffix}) return labels } @@ -332,7 +333,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon startTimestamp := pt.StartTimestamp() if settings.ExportCreatedMetric && startTimestamp != 0 { labels := createLabels(createdSuffix) - addCreatedTimeSeriesIfNeeded(tsMap, labels, startTimestamp, metric.Type().String()) + addCreatedTimeSeriesIfNeeded(tsMap, labels, startTimestamp, pt.Timestamp(), metric.Type().String()) } } @@ -342,8 +343,7 @@ type exemplarType interface { } func getMimirExemplars[T exemplarType](pt T) []mimirpb.Exemplar { - var mimirExemplars []mimirpb.Exemplar - + mimirExemplars := make([]mimirpb.Exemplar, 0, pt.Exemplars().Len()) for i := 0; i < pt.Exemplars().Len(); i++ { exemplar := pt.Exemplars().At(i) exemplarRunes := 0 @@ -454,7 +454,7 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res labels = append(labels, mimirpb.LabelAdapter{Name: extras[extrasIdx], Value: extras[extrasIdx+1]}) } - labels = append(labels, mimirpb.LabelAdapter{Name: nameStr, Value: name}) + labels = append(labels, mimirpb.LabelAdapter{Name: model.MetricNameLabel, Value: name}) return labels } @@ -501,7 +501,7 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res startTimestamp := pt.StartTimestamp() if settings.ExportCreatedMetric && startTimestamp != 0 { createdLabels := createLabels(baseName + createdSuffix) - addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String()) + addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, pt.Timestamp(), metric.Type().String()) } } @@ -511,6 +511,7 @@ func addCreatedTimeSeriesIfNeeded( series map[uint64]*mimirpb.TimeSeries, labels []mimirpb.LabelAdapter, startTimestamp pcommon.Timestamp, + timestamp pcommon.Timestamp, metricType string, ) { sig := timeSeriesSignature(metricType, labels) @@ -519,7 +520,8 @@ func addCreatedTimeSeriesIfNeeded( Labels: labels, Samples: []mimirpb.Sample{ { // convert ns to ms - Value: float64(convertTimeStamp(startTimestamp)), + Value: float64(convertTimeStamp(startTimestamp)), + TimestampMs: convertTimeStamp(timestamp), }, }, } @@ -553,7 +555,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta if len(settings.Namespace) > 0 { name = settings.Namespace + "_" + name } - labels := createAttributes(resource, attributes, settings.ExternalLabels, nameStr, name) + labels := createAttributes(resource, attributes, settings.ExternalLabels, model.MetricNameLabel, name) sample := &mimirpb.Sample{ Value: float64(1), // convert ns to ms diff --git a/pkg/distributor/otlp/histograms.go b/pkg/distributor/otlp/histograms.go index 93eed546dd7..9b241f48c73 100644 --- a/pkg/distributor/otlp/histograms.go +++ 
b/pkg/distributor/otlp/histograms.go @@ -75,7 +75,17 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (mimi nSpans, nDeltas := convertBucketsLayout(p.Negative(), scaleDown) h := mimirpb.Histogram{ - Schema: scale, + // The counter reset detection must be compatible with Prometheus to + // safely set ResetHint to NO. This is not ensured currently. + // Sending a sample that triggers counter reset but with ResetHint==NO + // would lead to Prometheus panic as it does not double check the hint. + // Thus we're explicitly saying UNKNOWN here, which is always safe. + // TODO: using created time stamp should be accurate, but we + // need to know here if it was used for the detection. + // Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303 + // Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232 + ResetHint: mimirpb.Histogram_UNKNOWN, + Schema: scale, ZeroCount: &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: p.ZeroCount()}, // TODO use zero_threshold, if set, see diff --git a/pkg/distributor/otlp/metrics_to_prw.go b/pkg/distributor/otlp/metrics_to_prw.go index 6117bc5fa3a..d33e9e7b048 100644 --- a/pkg/distributor/otlp/metrics_to_prw.go +++ b/pkg/distributor/otlp/metrics_to_prw.go @@ -20,6 +20,7 @@ type Settings struct { DisableTargetInfo bool ExportCreatedMetric bool AddMetricSuffixes bool + SendMetadata bool } // FromMetrics converts pmetric.Metrics to Mimir time series. @@ -58,7 +59,6 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - // For each data point, createAttributes is called for its attributes addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) } case pmetric.MetricTypeSum: diff --git a/pkg/distributor/otlp/number_data_points.go b/pkg/distributor/otlp/number_data_points.go index a6b9f31ace2..b66b6677bd4 100644 --- a/pkg/distributor/otlp/number_data_points.go +++ b/pkg/distributor/otlp/number_data_points.go @@ -62,7 +62,8 @@ func addSingleSumNumberDataPoint( resource, pt.Attributes(), settings.ExternalLabels, - model.MetricNameLabel, name, + model.MetricNameLabel, + name, ) sample := &mimirpb.Sample{ // convert ns to ms @@ -79,7 +80,7 @@ func addSingleSumNumberDataPoint( } sig := addSample(series, sample, labels, metric.Type().String()) - if ts, ok := series[sig]; sig != 0 && ok { + if ts := series[sig]; sig != 0 && ts != nil { exemplars := getMimirExemplars[pmetric.NumberDataPoint](pt) ts.Exemplars = append(ts.Exemplars, exemplars...) 
} @@ -87,15 +88,18 @@ func addSingleSumNumberDataPoint( // add _created time series if needed if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() { startTimestamp := pt.StartTimestamp() - if startTimestamp != 0 { - createdLabels := createAttributes( - resource, - pt.Attributes(), - settings.ExternalLabels, - nameStr, - name+createdSuffix, - ) - addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, metric.Type().String()) + if startTimestamp == 0 { + return + } + + createdLabels := make([]mimirpb.LabelAdapter, len(labels)) + copy(createdLabels, labels) + for i, l := range createdLabels { + if l.Name == model.MetricNameLabel { + createdLabels[i].Value = name + createdSuffix + break + } } + addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, pt.Timestamp(), metric.Type().String()) } } From f39b0212196d37d8a4a3e88d1b8a6e14ec53b97a Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 22 Feb 2024 11:05:40 +0100 Subject: [PATCH 9/9] Handle errors Signed-off-by: Arve Knudsen --- pkg/distributor/otlp/helper.go | 105 +++++++++++++++------ pkg/distributor/otlp/histograms.go | 5 +- pkg/distributor/otlp/metrics_to_prw.go | 20 +++- pkg/distributor/otlp/number_data_points.go | 20 ++-- pkg/distributor/push.go | 4 +- pkg/util/requestbuffers.go | 3 +- pkg/util/requestbuffers_test.go | 6 +- 7 files changed, 116 insertions(+), 47 deletions(-) diff --git a/pkg/distributor/otlp/helper.go b/pkg/distributor/otlp/helper.go index e2ee8337544..f3391fa60eb 100644 --- a/pkg/distributor/otlp/helper.go +++ b/pkg/distributor/otlp/helper.go @@ -67,13 +67,16 @@ func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // creates a new TimeSeries in the map if not found and returns the time series signature. // tsMap will be unmodified if either labels or sample is nil, but can still be modified if the exemplar is nil. func addSample(tsMap map[uint64]*mimirpb.TimeSeries, sample *mimirpb.Sample, labels []mimirpb.LabelAdapter, - datatype string) uint64 { - if sample == nil || labels == nil || tsMap == nil { + datatype string) (uint64, error) { + if sample == nil || len(labels) == 0 || len(tsMap) == 0 { // This shouldn't happen - return 0 + return 0, fmt.Errorf("invalid parameter") } - sig := timeSeriesSignature(datatype, labels) + sig, err := timeSeriesSignature(datatype, labels) + if err != nil { + return 0, err + } ts := tsMap[sig] if ts != nil { ts.Samples = append(ts.Samples, *sample) @@ -84,7 +87,7 @@ func addSample(tsMap map[uint64]*mimirpb.TimeSeries, sample *mimirpb.Sample, lab tsMap[sig] = newTs } - return sig + return sig, nil } // addExemplars finds a bucket bound that corresponds to the exemplars value and add the exemplar to the specific sig; @@ -121,20 +124,32 @@ func addExemplar(tsMap map[uint64]*mimirpb.TimeSeries, bucketBounds []bucketBoun // // the label slice should not contain duplicate label names; this method sorts the slice by label name before creating // the signature. 
-func timeSeriesSignature(datatype string, labels []mimirpb.LabelAdapter) uint64 { +func timeSeriesSignature(datatype string, labels []mimirpb.LabelAdapter) (uint64, error) { sort.Sort(ByLabelName(labels)) h := xxhash.New() - h.WriteString(datatype) - h.Write(seps) + if _, err := h.WriteString(datatype); err != nil { + return 0, err + } + if _, err := h.Write(seps); err != nil { + return 0, err + } for _, lb := range labels { - h.WriteString(lb.Name) - h.Write(seps) - h.WriteString(lb.Value) - h.Write(seps) + if _, err := h.WriteString(lb.Name); err != nil { + return 0, err + } + if _, err := h.Write(seps); err != nil { + return 0, err + } + if _, err := h.WriteString(lb.Value); err != nil { + return 0, err + } + if _, err := h.Write(seps); err != nil { + return 0, err + } } - return h.Sum64() + return h.Sum64(), nil } var seps = []byte{'\xff'} @@ -242,7 +257,7 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool { // addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. It // ignore extra buckets if len(ExplicitBounds) > len(BucketCounts) -func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[uint64]*mimirpb.TimeSeries, baseName string) { +func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[uint64]*mimirpb.TimeSeries, baseName string) error { timestamp := convertTimeStamp(pt.Timestamp()) baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels) @@ -274,7 +289,9 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon } sumlabels := createLabels(sumStr) - addSample(tsMap, sum, sumlabels, metric.Type().String()) + if _, err := addSample(tsMap, sum, sumlabels, metric.Type().String()); err != nil { + return err + } } @@ -288,7 +305,9 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon } countlabels := createLabels(countStr) - addSample(tsMap, count, countlabels, metric.Type().String()) + if _, err := addSample(tsMap, count, countlabels, metric.Type().String()); err != nil { + return err + } // cumulative count for conversion to cumulative histogram var cumulativeCount uint64 @@ -310,7 +329,10 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon } boundStr := strconv.FormatFloat(bound, 'f', -1, 64) labels := createLabels(bucketStr, leStr, boundStr) - sig := addSample(tsMap, bucket, labels, metric.Type().String()) + sig, err := addSample(tsMap, bucket, labels, metric.Type().String()) + if err != nil { + return err + } bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: bound}) } @@ -324,7 +346,10 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon infBucket.Value = float64(pt.Count()) } infLabels := createLabels(bucketStr, leStr, pInfStr) - sig := addSample(tsMap, infBucket, infLabels, metric.Type().String()) + sig, err := addSample(tsMap, infBucket, infLabels, metric.Type().String()) + if err != nil { + return err + } bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: math.Inf(1)}) addExemplars(tsMap, exemplars, bucketBounds) @@ -333,8 +358,12 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon startTimestamp := pt.StartTimestamp() if settings.ExportCreatedMetric && startTimestamp != 0 { labels := createLabels(createdSuffix) - 
addCreatedTimeSeriesIfNeeded(tsMap, labels, startTimestamp, pt.Timestamp(), metric.Type().String()) + if err := addCreatedTimeSeriesIfNeeded(tsMap, labels, startTimestamp, pt.Timestamp(), metric.Type().String()); err != nil { + return err + } } + + return nil } type exemplarType interface { @@ -441,7 +470,7 @@ func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp { // addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples. func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, - tsMap map[uint64]*mimirpb.TimeSeries, baseName string) { + tsMap map[uint64]*mimirpb.TimeSeries, baseName string) error { timestamp := convertTimeStamp(pt.Timestamp()) baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels) @@ -469,7 +498,9 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res } // sum and count of the summary should append suffix to baseName sumlabels := createLabels(baseName + sumStr) - addSample(tsMap, sum, sumlabels, metric.Type().String()) + if _, err := addSample(tsMap, sum, sumlabels, metric.Type().String()); err != nil { + return err + } // treat count as a sample in an individual TimeSeries count := &mimirpb.Sample{ @@ -480,7 +511,9 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res count.Value = math.Float64frombits(value.StaleNaN) } countlabels := createLabels(baseName + countStr) - addSample(tsMap, count, countlabels, metric.Type().String()) + if _, err := addSample(tsMap, count, countlabels, metric.Type().String()); err != nil { + return err + } // process each percentile/quantile for i := 0; i < pt.QuantileValues().Len(); i++ { @@ -494,15 +527,21 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res } percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) qtlabels := createLabels(baseName, quantileStr, percentileStr) - addSample(tsMap, quantile, qtlabels, metric.Type().String()) + if _, err := addSample(tsMap, quantile, qtlabels, metric.Type().String()); err != nil { + return err + } } // add _created time series if needed startTimestamp := pt.StartTimestamp() if settings.ExportCreatedMetric && startTimestamp != 0 { createdLabels := createLabels(baseName + createdSuffix) - addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, pt.Timestamp(), metric.Type().String()) + if err := addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, pt.Timestamp(), metric.Type().String()); err != nil { + return err + } } + + return nil } // addCreatedTimeSeriesIfNeeded adds {name}_created time series with a single @@ -513,8 +552,11 @@ func addCreatedTimeSeriesIfNeeded( startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp, metricType string, -) { - sig := timeSeriesSignature(metricType, labels) +) error { + sig, err := timeSeriesSignature(metricType, labels) + if err != nil { + return err + } if _, ok := series[sig]; !ok { series[sig] = &mimirpb.TimeSeries{ Labels: labels, @@ -526,12 +568,14 @@ func addCreatedTimeSeriesIfNeeded( }, } } + + return nil } // addResourceTargetInfo converts the resource to the target info metric -func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, tsMap map[uint64]*mimirpb.TimeSeries) { +func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, tsMap map[uint64]*mimirpb.TimeSeries) error { if settings.DisableTargetInfo { - 
return + return nil } // Use resource attributes (other than those used for job+instance) as the // metric labels for the target info metric @@ -548,7 +592,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta }) if attributes.Len() == 0 { // If we only have job + instance, then target_info isn't useful, so don't add it. - return + return nil } // create parameters for addSample name := targetMetricName @@ -561,7 +605,8 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta // convert ns to ms TimestampMs: convertTimeStamp(timestamp), } - addSample(tsMap, sample, labels, infoType) + _, err := addSample(tsMap, sample, labels, infoType) + return err } // convertTimeStamp converts OTLP timestamp in ns to timestamp in ms diff --git a/pkg/distributor/otlp/histograms.go b/pkg/distributor/otlp/histograms.go index 9b241f48c73..d9dbeded9b4 100644 --- a/pkg/distributor/otlp/histograms.go +++ b/pkg/distributor/otlp/histograms.go @@ -31,10 +31,13 @@ func addSingleExponentialHistogramDataPoint( metric, ) - sig := timeSeriesSignature( + sig, err := timeSeriesSignature( pmetric.MetricTypeExponentialHistogram.String(), labels, ) + if err != nil { + return err + } ts, ok := series[sig] if !ok { ts = &mimirpb.TimeSeries{ diff --git a/pkg/distributor/otlp/metrics_to_prw.go b/pkg/distributor/otlp/metrics_to_prw.go index d33e9e7b048..1813bf7a1f8 100644 --- a/pkg/distributor/otlp/metrics_to_prw.go +++ b/pkg/distributor/otlp/metrics_to_prw.go @@ -59,7 +59,9 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) + if err := addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName); err != nil { + errs = multierr.Append(errs, err) + } } case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() @@ -67,7 +69,9 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) + if err := addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName); err != nil { + errs = multierr.Append(errs, err) + } } case pmetric.MetricTypeHistogram: dataPoints := metric.Histogram().DataPoints() @@ -75,7 +79,9 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) } for x := 0; x < dataPoints.Len(); x++ { - addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) + if err := addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName); err != nil { + errs = multierr.Append(errs, err) + } } case pmetric.MetricTypeExponentialHistogram: dataPoints := metric.ExponentialHistogram().DataPoints() @@ -100,14 +106,18 @@ func FromMetrics(md pmetric.Metrics, settings Settings) ([]mimirpb.PreallocTimes errs = multierr.Append(errs, fmt.Errorf("empty data points. 
 						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
 					}
 					for x := 0; x < dataPoints.Len(); x++ {
-						addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName)
+						if err := addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName); err != nil {
+							errs = multierr.Append(errs, err)
+						}
 					}
 				default:
 					errs = multierr.Append(errs, errors.New("unsupported metric type"))
 				}
 			}
 		}
-		addResourceTargetInfo(resource, settings, mostRecentTimestamp, tsMap)
+		if err := addResourceTargetInfo(resource, settings, mostRecentTimestamp, tsMap); err != nil {
+			errs = multierr.Append(errs, err)
+		}
 	}
 
 	mimirTs := mimirpb.PreallocTimeseriesSliceFromPool()
diff --git a/pkg/distributor/otlp/number_data_points.go b/pkg/distributor/otlp/number_data_points.go
index b66b6677bd4..73cca595896 100644
--- a/pkg/distributor/otlp/number_data_points.go
+++ b/pkg/distributor/otlp/number_data_points.go
@@ -23,7 +23,7 @@ func addSingleGaugeNumberDataPoint(
 	settings Settings,
 	series map[uint64]*mimirpb.TimeSeries,
 	name string,
-) {
+) error {
 	labels := createAttributes(
 		resource,
 		pt.Attributes(),
@@ -44,7 +44,8 @@ func addSingleGaugeNumberDataPoint(
 	if pt.Flags().NoRecordedValue() {
 		sample.Value = math.Float64frombits(value.StaleNaN)
 	}
-	addSample(series, sample, labels, metric.Type().String())
+	_, err := addSample(series, sample, labels, metric.Type().String())
+	return err
 }
 
 // addSingleSumNumberDataPoint converts the Sum metric data point to a Prometheus
@@ -57,7 +58,7 @@ func addSingleSumNumberDataPoint(
 	settings Settings,
 	series map[uint64]*mimirpb.TimeSeries,
 	name string,
-) {
+) error {
 	labels := createAttributes(
 		resource,
 		pt.Attributes(),
@@ -78,7 +79,10 @@ func addSingleSumNumberDataPoint(
 	if pt.Flags().NoRecordedValue() {
 		sample.Value = math.Float64frombits(value.StaleNaN)
 	}
-	sig := addSample(series, sample, labels, metric.Type().String())
+	sig, err := addSample(series, sample, labels, metric.Type().String())
+	if err != nil {
+		return err
+	}
 
 	if ts := series[sig]; sig != 0 && ts != nil {
 		exemplars := getMimirExemplars[pmetric.NumberDataPoint](pt)
@@ -89,7 +93,7 @@ func addSingleSumNumberDataPoint(
 	if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() {
 		startTimestamp := pt.StartTimestamp()
 		if startTimestamp == 0 {
-			return
+			return nil
 		}
 
 		createdLabels := make([]mimirpb.LabelAdapter, len(labels))
@@ -100,6 +104,10 @@ func addSingleSumNumberDataPoint(
 				break
 			}
 		}
-		addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, pt.Timestamp(), metric.Type().String())
+		if err := addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, pt.Timestamp(), metric.Type().String()); err != nil {
+			return err
+		}
 	}
+
+	return nil
 }
diff --git a/pkg/distributor/push.go b/pkg/distributor/push.go
index 440ad1cb536..6d99d8abedd 100644
--- a/pkg/distributor/push.go
+++ b/pkg/distributor/push.go
@@ -135,7 +135,7 @@ func handler(
 				err = httpgrpc.Errorf(http.StatusBadRequest, err.Error())
 			}
-			rb.CleanUp()
+			rb.CleanUp(nil)
 			return nil, nil, err
 		}
 
@@ -147,7 +147,7 @@ func handler(
 		cleanup := func() {
 			mimirpb.ReuseSlice(req.Timeseries)
-			rb.CleanUp()
+			rb.CleanUp(nil)
 		}
 		return &req.WriteRequest, cleanup, nil
 	}
diff --git a/pkg/util/requestbuffers.go b/pkg/util/requestbuffers.go
index 0fc613c5208..20c90907600 100644
--- a/pkg/util/requestbuffers.go
+++ b/pkg/util/requestbuffers.go
@@ -3,6 +3,7 @@ package util
 
 import (
 	"bytes"
+	"sync"
 )
 
 // Pool is an abstraction of sync.Pool, for testability.
@@ -49,7 +50,7 @@ func (rb *RequestBuffers) Get(size int) *bytes.Buffer {
 }
 
 // CleanUp releases buffers back to the pool.
-func (rb *RequestBuffers) CleanUp() {
+func (rb *RequestBuffers) CleanUp(*sync.Pool) {
 	for i, b := range rb.buffers {
 		// Make sure the backing array doesn't retain a reference
 		rb.buffers[i] = nil
diff --git a/pkg/util/requestbuffers_test.go b/pkg/util/requestbuffers_test.go
index b5f8ea6d643..62a9414c714 100644
--- a/pkg/util/requestbuffers_test.go
+++ b/pkg/util/requestbuffers_test.go
@@ -11,7 +11,9 @@ import (
 
 func TestRequestBuffers(t *testing.T) {
 	rb := NewRequestBuffers(&fakePool{})
-	t.Cleanup(rb.CleanUp)
+	t.Cleanup(func() {
+		rb.CleanUp(nil)
+	})
 
 	b := rb.Get(1024)
 	require.NotNil(t, b)
@@ -21,7 +23,7 @@ func TestRequestBuffers(t *testing.T) {
 	_, err := b.Write([]byte("test"))
 	require.NoError(t, err)
 
-	rb.CleanUp()
+	rb.CleanUp(nil)
 	assert.Nil(t, rb.buffersBacking[0])
 
 	b1 := rb.Get(2048)