From 1f2c183469b8363e613dd6880ff8cc4075197782 Mon Sep 17 00:00:00 2001 From: sujal shah Date: Thu, 13 Feb 2025 16:01:17 +0530 Subject: [PATCH] receiver/prometheusremotewrite: Add two fields timestamp and value. Convert sample timestamps from milliseconds to nanoseconds. Set datapoint values using SetDoubleValue based on incoming sample values. Update tests in translateV2 to verify that samples are correctly ingested and their timestamps properly converted. Fixes part of #37277. Signed-off-by: sujal shah --- .../prometheusremotewritereceiver/receiver.go | 28 +++++---- .../receiver_test.go | 59 ++++++++++++------- 2 files changed, 57 insertions(+), 30 deletions(-) diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 741d43929b396..57df6cec8f3c3 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -1,6 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 - +// receiver/prometheusremotewritereceiver/receiver.go package prometheusremotewritereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusremotewritereceiver" import ( @@ -269,15 +269,23 @@ func addHistogramDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev // addDatapoints adds the labels to the datapoints attributes. // TODO: We're still not handling several fields that make a datapoint complete, e.g. StartTimestamp, // Timestamp, Value, etc. -func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, _ writev2.TimeSeries) { - attributes := datapoints.AppendEmpty().Attributes() - - for _, l := range ls { - if l.Name == "instance" || l.Name == "job" || // Become resource attributes "service.name", "service.instance.id" and "service.namespace" - l.Name == labels.MetricName || // Becomes metric name - l.Name == "otel_scope_name" || l.Name == "otel_scope_version" { // Becomes scope name and version - continue +func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, ts writev2.TimeSeries) { + // Add samples from the timeseries + for _, sample := range ts.Samples { + dp := datapoints.AppendEmpty() + + // Set timestamp in nanoseconds (Prometheus uses milliseconds) + dp.SetTimestamp(pcommon.Timestamp(sample.Timestamp * 1e6)) + dp.SetDoubleValue(sample.Value) + + attributes := dp.Attributes() + for _, l := range ls { + if l.Name == "instance" || l.Name == "job" || // Become resource attributes + l.Name == labels.MetricName || // Becomes metric name + l.Name == "otel_scope_name" || l.Name == "otel_scope_version" { // Becomes scope name and version + continue + } + attributes.PutStr(l.Name, l.Value) } - attributes.PutStr(l.Name, l.Value) } } diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index bc9b234c2c775..d4572e0205123 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/storage/remote" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" @@ -149,17 +150,17 @@ func TestTranslateV2(t *testing.T) { Timeseries: []writev2.TimeSeries{ { Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, - LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 16}, // Same scope: scope_name: scope1. scope_version v1 + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 16}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, }, { Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, - LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 16}, // Same scope: scope_name: scope1. scope_version v1 + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 16}, Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, }, { Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, - LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 11, 12, 13, 14, 17, 18}, // Different scope: scope_name: scope2. scope_version v2 + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 11, 12, 13, 14, 17, 18}, Samples: []writev2.Sample{{Value: 3, Timestamp: 3}}, }, }, @@ -171,19 +172,30 @@ func TestTranslateV2(t *testing.T) { rmAttributes1.PutStr("service.namespace", "service-x") rmAttributes1.PutStr("service.name", "test") rmAttributes1.PutStr("service.instance.id", "107cn001") + sm1 := rm1.ScopeMetrics().AppendEmpty() sm1.Scope().SetName("scope1") sm1.Scope().SetVersion("v1") - sm1Attributes := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() - sm1Attributes.PutStr("d", "e") - sm2Attributes := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() - sm2Attributes.PutStr("d", "e") + + dp1 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp1.SetTimestamp(pcommon.Timestamp(1 * 1e6)) + dp1.SetDoubleValue(1.0) + dp1.Attributes().PutStr("d", "e") + + dp2 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp2.SetTimestamp(pcommon.Timestamp(2 * 1e6)) + dp2.SetDoubleValue(2.0) + dp2.Attributes().PutStr("d", "e") sm2 := rm1.ScopeMetrics().AppendEmpty() sm2.Scope().SetName("scope2") sm2.Scope().SetVersion("v2") - sm3Attributes := sm2.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() - sm3Attributes.PutStr("foo", "bar") + + dp3 := sm2.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp3.SetTimestamp(pcommon.Timestamp(3 * 1e6)) + dp3.SetDoubleValue(3.0) + dp3.Attributes().PutStr("foo", "bar") + return expected }(), expectedStats: remote.WriteResponseStats{}, @@ -224,23 +236,30 @@ func TestTranslateV2(t *testing.T) { rmAttributes1.PutStr("service.namespace", "service-x") rmAttributes1.PutStr("service.name", "test") rmAttributes1.PutStr("service.instance.id", "107cn001") + sm1 := rm1.ScopeMetrics().AppendEmpty() - sm1Attributes := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() - sm1Attributes.PutStr("d", "e") - sm1Attributes.PutStr("foo", "bar") - // Since we don't check "scope_name" and "scope_version", we end up with duplicated scope metrics for repeated series. - // TODO: Properly handle scope metrics. - sm2Attributes := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() - sm2Attributes.PutStr("d", "e") - sm2Attributes.PutStr("foo", "bar") + dp1 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp1.SetTimestamp(pcommon.Timestamp(1 * 1e6)) + dp1.SetDoubleValue(1.0) + dp1.Attributes().PutStr("d", "e") + dp1.Attributes().PutStr("foo", "bar") + + dp2 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp2.SetTimestamp(pcommon.Timestamp(2 * 1e6)) + dp2.SetDoubleValue(2.0) + dp2.Attributes().PutStr("d", "e") + dp2.Attributes().PutStr("foo", "bar") rm2 := expected.ResourceMetrics().AppendEmpty() rmAttributes2 := rm2.Resource().Attributes() rmAttributes2.PutStr("service.name", "foo") rmAttributes2.PutStr("service.instance.id", "bar") - mAttributes2 := rm2.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() - mAttributes2.PutStr("d", "e") - mAttributes2.PutStr("foo", "bar") + + dp3 := rm2.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp3.SetTimestamp(pcommon.Timestamp(2 * 1e6)) + dp3.SetDoubleValue(2.0) + dp3.Attributes().PutStr("d", "e") + dp3.Attributes().PutStr("foo", "bar") return expected }(),