From 40105734dc1bd618db5781691cfb0c3e7a2cd6a2 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Mon, 8 Jul 2024 14:08:03 -0700 Subject: [PATCH 01/11] Add logic to batch metrics by attribute types --- receiver/datadogreceiver/batcher.go | 119 ++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 receiver/datadogreceiver/batcher.go diff --git a/receiver/datadogreceiver/batcher.go b/receiver/datadogreceiver/batcher.go new file mode 100644 index 000000000000..ca1a956066f2 --- /dev/null +++ b/receiver/datadogreceiver/batcher.go @@ -0,0 +1,119 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +type Batcher struct { + pmetric.Metrics + + resourceMetrics map[identity.Resource]pmetric.ResourceMetrics + scopeMetrics map[identity.Scope]pmetric.ScopeMetrics + metrics map[identity.Metric]pmetric.Metric +} + +func newBatcher() Batcher { + return Batcher{ + resourceMetrics: make(map[identity.Resource]pmetric.ResourceMetrics), + scopeMetrics: make(map[identity.Scope]pmetric.ScopeMetrics), + metrics: make(map[identity.Metric]pmetric.Metric), + } +} + +// Dimensions stores the properties of the series that are needed in order +// to unique identify the series. This is needed in order to batch metrics by +// resource, scope, and datapoint attributes +type Dimensions struct { + name string + metricType pmetric.MetricType + resourceAttrs pcommon.Map + scopeAttrs pcommon.Map + dpAttrs pcommon.Map + buildInfo string +} + +var metricTypeMap = map[string]pmetric.MetricType{ + "count": pmetric.MetricTypeSum, + "gauge": pmetric.MetricTypeGauge, + "rate": pmetric.MetricTypeSum, + "service_check": pmetric.MetricTypeGauge, + "sketch": pmetric.MetricTypeExponentialHistogram, +} + +func parseSeriesProperties(name string, metricType string, tags []string, host string, version string, stringPool *StringPool) Dimensions { + resourceAttrs, scopeAttrs, dpAttrs := tagsToAttributes(tags, host, stringPool) + return Dimensions{ + name: name, + metricType: metricTypeMap[metricType], + buildInfo: version, + resourceAttrs: resourceAttrs, + scopeAttrs: scopeAttrs, + dpAttrs: dpAttrs, + } +} + +func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) { + resource := getResource(dim.resourceAttrs) + resourceID := identity.OfResource(resource) + resourceMetrics, ok := b.resourceMetrics[resourceID] + if !ok { + resourceMetrics = b.Metrics.ResourceMetrics().AppendEmpty() + resource.MoveTo(resourceMetrics.Resource()) + b.resourceMetrics[resourceID] = resourceMetrics + } + + scope := getScope(dim.scopeAttrs, dim.buildInfo) + scopeID := identity.OfScope(resourceID, scope) + scopeMetrics, ok := b.scopeMetrics[scopeID] + if !ok { + scopeMetrics = resourceMetrics.ScopeMetrics().AppendEmpty() + scope.MoveTo(scopeMetrics.Scope()) + b.scopeMetrics[scopeID] = scopeMetrics + } + + m := getMetric(dim) + metricID := identity.OfMetric(scopeID, m) + metric, ok := b.metrics[metricID] + if !ok { + metric = scopeMetrics.Metrics().AppendEmpty() + m.MoveTo(metric) + b.metrics[metricID] = metric + } + + return metric, metricID +} + +func getResource(attrs pcommon.Map) pcommon.Resource { + resource := pcommon.NewResource() + attrs.CopyTo(resource.Attributes()) // TODO(jesus.vazquez) review this copy + return resource +} + +func getScope(attrs pcommon.Map, version string) pcommon.InstrumentationScope { + scope := pcommon.NewInstrumentationScope() + scope.SetName("otelcol/datadogreceiver") + scope.SetVersion(version) + attrs.CopyTo(scope.Attributes()) + return scope +} + +func getMetric(dim Dimensions) pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName(dim.name) + metric.Type() + switch dim.metricType { + case pmetric.MetricTypeSum: + metric.SetEmptySum() + case pmetric.MetricTypeGauge: + metric.SetEmptyGauge() + case pmetric.MetricTypeExponentialHistogram: + metric.SetEmptyExponentialHistogram() + } + return metric +} From 83dd84e9580265b6d523d7bc81263d67df6b78a2 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Mon, 8 Jul 2024 14:11:05 -0700 Subject: [PATCH 02/11] Add support for V1 series endpoint Co-authored by: Jesus Vazquez : --- .../datadogreceiver/metrics_translator.go | 100 +++++++++++++++++- receiver/datadogreceiver/receiver.go | 33 +++++- 2 files changed, 127 insertions(+), 6 deletions(-) diff --git a/receiver/datadogreceiver/metrics_translator.go b/receiver/datadogreceiver/metrics_translator.go index 25bcda2a0896..d616912e5143 100644 --- a/receiver/datadogreceiver/metrics_translator.go +++ b/receiver/datadogreceiver/metrics_translator.go @@ -3,6 +3,8 @@ package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver" import ( + datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" + "go.opentelemetry.io/collector/pdata/pmetric" "sync" "go.opentelemetry.io/collector/component" @@ -13,12 +15,104 @@ import ( type MetricsTranslator struct { sync.RWMutex - buildInfo component.BuildInfo - lastTs map[identity.Stream]pcommon.Timestamp + buildInfo component.BuildInfo + lastTs map[identity.Stream]pcommon.Timestamp + stringPool *StringPool } func newMetricsTranslator() *MetricsTranslator { return &MetricsTranslator{ - lastTs: make(map[identity.Stream]pcommon.Timestamp), + lastTs: make(map[identity.Stream]pcommon.Timestamp), + stringPool: newStringPool(), } } + +func (mt *MetricsTranslator) streamHasTimestamp(stream identity.Stream) (pcommon.Timestamp, bool) { + mt.RLock() + defer mt.RUnlock() + ts, ok := mt.lastTs[stream] + return ts, ok +} + +func (mt *MetricsTranslator) updateLastTsForStream(stream identity.Stream, ts pcommon.Timestamp) { + mt.Lock() + defer mt.Unlock() + mt.lastTs[stream] = ts +} + +const ( + TypeGauge string = "gauge" + TypeRate string = "rate" + TypeCount string = "count" +) + +type SeriesList struct { + Series []datadogV1.Series `json:"series"` +} + +func translateMetricsV1(series SeriesList, mt *MetricsTranslator) pmetric.Metrics { + bt := newBatcher() + bt.Metrics = pmetric.NewMetrics() + + for _, serie := range series.Series { + var dps pmetric.NumberDataPointSlice + + dimensions := parseSeriesProperties(serie.Metric, serie.GetType(), serie.GetTags(), serie.GetHost(), mt.buildInfo.Version, mt.stringPool) + metric, metricID := bt.Lookup(dimensions) + + switch serie.GetType() { + case TypeCount: + metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + metric.Sum().SetIsMonotonic(false) // See https://docs.datadoghq.com/metrics/types/?tab=count#definition + dps = metric.Sum().DataPoints() + case TypeGauge: + dps = metric.Gauge().DataPoints() + case TypeRate: + metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + dps = metric.Sum().DataPoints() + default: + // Type is unset/unspecified + continue + } + + dps.EnsureCapacity(len(serie.Points)) + + var dp pmetric.NumberDataPoint + var ts uint64 + var value float64 + // The Datadog API returns a slice of slices of points [][]*float64 which is a bit awkward to work with. + // It looks like this: + // points := [][]*float64{ + // {×tamp1, &value1}, + // {×tamp2, &value2}, + // } + // We need to flatten this to a slice of *float64 to work with it. And we know that in that slice, the first + // element is the timestamp and the second is the value. + for _, points := range serie.Points { + if len(points) != 2 { + continue // The datapoint is missing a timestamp and/or value, so this point should be skipped + } + ts = uint64(*points[0]) + value = *points[1] + + dp = dps.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(ts * 1_000_000_000)) // OTel uses nanoseconds, while Datadog uses seconds + + if *serie.Type == TypeRate { + if serie.Interval.IsSet() { + value *= float64(serie.GetInterval()) + } + } + dp.SetDoubleValue(value) + dimensions.dpAttrs.CopyTo(dp.Attributes()) + + stream := identity.OfStream(metricID, dp) + ts, ok := mt.streamHasTimestamp(stream) + if ok { + dp.SetStartTimestamp(ts) + } + mt.updateLastTsForStream(stream, dp.Timestamp()) + } + } + return bt.Metrics +} diff --git a/receiver/datadogreceiver/receiver.go b/receiver/datadogreceiver/receiver.go index 780dc1a68fbf..7fa8e9587652 100644 --- a/receiver/datadogreceiver/receiver.go +++ b/receiver/datadogreceiver/receiver.go @@ -5,8 +5,10 @@ package datadogreceiver // import "github.com/open-telemetry/opentelemetry-colle import ( "context" + "encoding/json" "errors" "fmt" + "io" "net/http" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" @@ -143,9 +145,34 @@ func (ddr *datadogReceiver) handleV1Series(w http.ResponseWriter, req *http.Requ ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", *metricsCount, err) }(&metricsCount) - err = fmt.Errorf("series v1 endpoint not implemented") - http.Error(w, err.Error(), http.StatusMethodNotAllowed) - ddr.params.Logger.Warn("metrics consumer errored out", zap.Error(err)) + buf := getBuffer() + defer putBuffer(buf) + if _, err = io.Copy(buf, req.Body); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + ddr.params.Logger.Error(err.Error()) + return + } + + seriesList := SeriesList{} + err = json.Unmarshal(buf.Bytes(), &seriesList) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + ddr.params.Logger.Error(err.Error()) + return + } + + metrics := translateMetricsV1(seriesList, ddr.metricsTranslator) + metricsCount = metrics.DataPointCount() + + err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, metrics) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + ddr.params.Logger.Error("metrics consumer errored out", zap.Error(err)) + return + } + + w.WriteHeader(http.StatusAccepted) + _, err = w.Write([]byte("OK")) } // handleV2Series handles the v2 series endpoint https://docs.datadoghq.com/api/latest/metrics/#submit-metrics From d5f7724ccac33ab7a62832e40f390ad2c086df77 Mon Sep 17 00:00:00 2001 From: Federico Torres Date: Mon, 8 Jul 2024 14:30:24 -0700 Subject: [PATCH 03/11] Add test helper functions --- receiver/datadogreceiver/testutil.go | 64 ++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 receiver/datadogreceiver/testutil.go diff --git a/receiver/datadogreceiver/testutil.go b/receiver/datadogreceiver/testutil.go new file mode 100644 index 000000000000..8b31d5effdf0 --- /dev/null +++ b/receiver/datadogreceiver/testutil.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver" + +import ( + "testing" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func createMetricsTranslator() *MetricsTranslator { + mt := newMetricsTranslator() + mt.buildInfo = component.BuildInfo{ + Command: "otelcol", + Description: "OpenTelemetry Collector", + Version: "latest", + } + return mt +} + +func requireResourceMetrics(t *testing.T, result pmetric.Metrics, expectedAttrs pcommon.Map, expectedLen int) { + require.Equal(t, expectedLen, result.ResourceMetrics().Len()) + require.Equal(t, expectedAttrs, result.ResourceMetrics().At(0).Resource().Attributes()) +} + +func requireScopeMetrics(t *testing.T, result pmetric.Metrics, expectedScopeMetricsLen, expectedMetricsLen int) { + require.Equal(t, expectedScopeMetricsLen, result.ResourceMetrics().At(0).ScopeMetrics().Len()) + require.Equal(t, expectedMetricsLen, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) +} + +func requireScope(t *testing.T, result pmetric.Metrics, expectedAttrs pcommon.Map, expectedName, expectedVersion string) { + require.Equal(t, expectedName, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Name()) + require.Equal(t, expectedVersion, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Version()) + require.Equal(t, expectedAttrs, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes()) +} + +func requireMetricAndDataPointCounts(t *testing.T, result pmetric.Metrics, expectedMetricCount, expectedDpCount int) { + require.Equal(t, expectedMetricCount, result.MetricCount()) + require.Equal(t, expectedDpCount, result.DataPointCount()) +} + +func requireSum(t *testing.T, metric pmetric.Metric, expectedName string, expectedAggregationTemporality pmetric.AggregationTemporality, expectedDpsLen int) { + require.Equal(t, expectedName, metric.Name()) + require.Equal(t, pmetric.MetricTypeSum, metric.Type()) + require.Equal(t, expectedAggregationTemporality, metric.Sum().AggregationTemporality()) + require.Equal(t, expectedDpsLen, metric.Sum().DataPoints().Len()) +} + +func requireGauge(t *testing.T, metric pmetric.Metric, expectedName string, expectedDpsLen int) { + require.Equal(t, expectedName, metric.Name()) + require.Equal(t, pmetric.MetricTypeGauge, metric.Type()) + require.Equal(t, expectedDpsLen, metric.Gauge().DataPoints().Len()) +} + +func requireDp(t *testing.T, dp pmetric.NumberDataPoint, expectedAttrs pcommon.Map, expectedTime int64, expectedValue float64) { + require.Equal(t, expectedTime, dp.Timestamp().AsTime().Unix()) + require.Equal(t, expectedValue, dp.DoubleValue()) + require.Equal(t, expectedAttrs, dp.Attributes()) +} From 0fedbe2be82e8c12f44e67404f294af2204cc035 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Mon, 8 Jul 2024 14:49:22 -0700 Subject: [PATCH 04/11] Add tests for V1 series endpoint Co-authored by: Jesus Vazquez : --- .../metrics_translator_test.go | 160 ++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 receiver/datadogreceiver/metrics_translator_test.go diff --git a/receiver/datadogreceiver/metrics_translator_test.go b/receiver/datadogreceiver/metrics_translator_test.go new file mode 100644 index 000000000000..f9c37797aa7b --- /dev/null +++ b/receiver/datadogreceiver/metrics_translator_test.go @@ -0,0 +1,160 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package datadogreceiver + +import ( + "testing" + + "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func strPtr(s string) *string { return &s } +func float64Ptr(f float64) *float64 { return &f } + +type testPoint struct { + Ts int64 + Value float64 +} + +func testPointsToDatadogPoints(points []testPoint) [][]*float64 { + datadogPoints := make([][]*float64, len(points)) + for i, point := range points { + datadogPoints[i] = []*float64{float64Ptr(float64(point.Ts)), float64Ptr(point.Value)} + } + return datadogPoints + +} + +func TestTranslateMetricsV1(t *testing.T) { + tests := []struct { + name string + + series SeriesList + expect func(t *testing.T, result pmetric.Metrics) + }{ + { + name: "Count metric", + series: SeriesList{ + Series: []datadogV1.Series{ + { + Metric: "TestCount", + Host: strPtr("Host1"), + Type: strPtr(TypeCount), + Tags: []string{"env:tag1", "version:tag2"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + 1636629071, 0.5, + }, + { + 1636629081, 1.0, + }, + }), + }, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + expectedResourceAttrs, expectedScopeAttrs, expectedDpAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) + requireResourceMetrics(t, result, expectedResourceAttrs, 1) + requireScopeMetrics(t, result, 1, 1) + requireScope(t, result, expectedScopeAttrs, "otelcol/datadogreceiver", component.NewDefaultBuildInfo().Version) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + requireSum(t, metric, "TestCount", pmetric.AggregationTemporalityDelta, 2) + + dp := metric.Sum().DataPoints().At(0) + requireDp(t, dp, expectedDpAttrs, 1636629071, 0.5) + + dp = metric.Sum().DataPoints().At(1) + requireDp(t, dp, expectedDpAttrs, 1636629081, 1.0) + }, + }, + { + name: "Gauge metric", + series: SeriesList{ + Series: []datadogV1.Series{ + { + Metric: "TestGauge", + Host: strPtr("Host1"), + Type: strPtr(TypeGauge), + Tags: []string{"env:tag1", "version:tag2"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + Ts: 1636629071, + Value: 2, + }, + { + Ts: 1636629081, + Value: 3, + }, + }), + }, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + expectedResourceAttrs, expectedScopeAttrs, expectedDpAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) + requireResourceMetrics(t, result, expectedResourceAttrs, 1) + requireScopeMetrics(t, result, 1, 1) + requireScope(t, result, expectedScopeAttrs, "otelcol/datadogreceiver", component.NewDefaultBuildInfo().Version) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + requireGauge(t, metric, "TestGauge", 2) + + dp := metric.Gauge().DataPoints().At(0) + requireDp(t, dp, expectedDpAttrs, 1636629071, 2) + + dp = metric.Gauge().DataPoints().At(1) + requireDp(t, dp, expectedDpAttrs, 1636629081, 3) + }, + }, + { + name: "Rate metric", + series: SeriesList{ + Series: []datadogV1.Series{ + { + Metric: "TestRate", + Host: strPtr("Host1"), + Type: strPtr(TypeRate), + Tags: []string{"env:tag1", "version:tag2"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + Ts: 1636629071, + Value: 2, + }, + { + Ts: 1636629081, + Value: 3, + }, + }), + }, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + expectedResourceAttrs, expectedScopeAttrs, expectedDpAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "Host1", newStringPool()) + requireResourceMetrics(t, result, expectedResourceAttrs, 1) + requireScopeMetrics(t, result, 1, 1) + requireScope(t, result, expectedScopeAttrs, "otelcol/datadogreceiver", component.NewDefaultBuildInfo().Version) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + requireSum(t, metric, "TestRate", pmetric.AggregationTemporalityDelta, 2) + + dp := metric.Sum().DataPoints().At(0) + requireDp(t, dp, expectedDpAttrs, 1636629071, 2) + + dp = metric.Sum().DataPoints().At(1) + requireDp(t, dp, expectedDpAttrs, 1636629081, 3) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := createMetricsTranslator() + result := translateMetricsV1(tt.series, mt) + + tt.expect(t, result) + }) + } +} From e42f62f6f7d754a367fdd93c7c58a2159352f56c Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Mon, 8 Jul 2024 14:50:44 -0700 Subject: [PATCH 05/11] Add tests for batching by attributes --- receiver/datadogreceiver/batcher_test.go | 289 +++++++++++++++++++++++ 1 file changed, 289 insertions(+) create mode 100644 receiver/datadogreceiver/batcher_test.go diff --git a/receiver/datadogreceiver/batcher_test.go b/receiver/datadogreceiver/batcher_test.go new file mode 100644 index 000000000000..e06b3572edf2 --- /dev/null +++ b/receiver/datadogreceiver/batcher_test.go @@ -0,0 +1,289 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package datadogreceiver + +import ( + "testing" + + "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func TestMetricBatcher(t *testing.T) { + tests := []struct { + name string + series SeriesList + expect func(t *testing.T, result pmetric.Metrics) + }{ + { + name: "Same metric, same tags, different hosts", + series: SeriesList{ + Series: []datadogV1.Series{ + { + Metric: "TestCount1", + Host: strPtr("Host1"), + Type: strPtr(TypeCount), + Tags: []string{"env:tag1", "service:test1", "version:tag1"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + 1636629071, 0.5, + }, + }), + }, + { + Metric: "TestCount1", + Host: strPtr("Host2"), + Type: strPtr(TypeCount), + Tags: []string{"env:tag1", "service:test1", "version:tag1"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + 1636629071, 0.5, + }, + }), + }, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + // Different hosts should result in different ResourceMetrics + require.Equal(t, 2, result.ResourceMetrics().Len()) + resource1 := result.ResourceMetrics().At(0) + resource2 := result.ResourceMetrics().At(1) + v, exists := resource1.Resource().Attributes().Get("host.name") + require.True(t, exists) + require.Equal(t, "Host1", v.AsString()) + v, exists = resource2.Resource().Attributes().Get("host.name") + require.True(t, exists) + require.Equal(t, "Host2", v.AsString()) + + require.Equal(t, 1, resource1.ScopeMetrics().Len()) + require.Equal(t, 1, resource2.ScopeMetrics().Len()) + + require.Equal(t, 1, resource1.ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, 1, resource2.ScopeMetrics().At(0).Metrics().Len()) + + require.Equal(t, "TestCount1", resource1.ScopeMetrics().At(0).Metrics().At(0).Name()) + require.Equal(t, "TestCount1", resource2.ScopeMetrics().At(0).Metrics().At(0).Name()) + }, + }, + { + name: "Same host, different metric names", + series: SeriesList{ + Series: []datadogV1.Series{ + { + Metric: "TestCount1", + Host: strPtr("Host1"), + Type: strPtr(TypeCount), + Tags: []string{"env:tag1", "service:test1", "version:tag1"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + 1636629071, 0.5, + }, + }), + }, + { + Metric: "TestCount2", + Host: strPtr("Host1"), + Type: strPtr(TypeCount), + Tags: []string{"env:tag1", "service:test1", "version:tag1"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + 1636629071, 0.5, + }, + }), + }, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + // The different metrics will fall under the same ResourceMetric and ScopeMetric + // and there will be separate metrics under the ScopeMetric.Metrics() + require.Equal(t, 1, result.ResourceMetrics().Len()) + resource := result.ResourceMetrics().At(0) + + v, exists := resource.Resource().Attributes().Get("host.name") + require.True(t, exists) + require.Equal(t, "Host1", v.AsString()) + + require.Equal(t, 1, resource.ScopeMetrics().Len()) + + require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, "TestCount1", resource.ScopeMetrics().At(0).Metrics().At(0).Name()) + require.Equal(t, "TestCount2", resource.ScopeMetrics().At(0).Metrics().At(1).Name()) + }, + }, + { + name: "Same host, same metric name, single tag diff", + series: SeriesList{ + Series: []datadogV1.Series{ + { + Metric: "TestCount1", + Host: strPtr("Host1"), + Type: strPtr(TypeCount), + Tags: []string{"env:dev", "version:tag1"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + 1636629071, 0.5, + }, + }), + }, + { + Metric: "TestCount1", + Host: strPtr("Host1"), + Type: strPtr(TypeCount), + Tags: []string{"env:prod", "version:tag1"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + 1636629071, 0.5, + }, + }), + }, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + // Differences in attribute values should result in different resourceMetrics + require.Equal(t, 2, result.ResourceMetrics().Len()) + resource1 := result.ResourceMetrics().At(0) + resource2 := result.ResourceMetrics().At(1) + v, exists := resource1.Resource().Attributes().Get("host.name") + require.True(t, exists) + require.Equal(t, "Host1", v.AsString()) + v, exists = resource2.Resource().Attributes().Get("host.name") + require.True(t, exists) + require.Equal(t, "Host1", v.AsString()) + v, exists = resource1.Resource().Attributes().Get("deployment.environment") + require.True(t, exists) + require.Equal(t, "dev", v.AsString()) + v, exists = resource2.Resource().Attributes().Get("deployment.environment") + require.True(t, exists) + require.Equal(t, "prod", v.AsString()) + + require.Equal(t, 1, resource1.ScopeMetrics().Len()) + require.Equal(t, 1, resource1.ScopeMetrics().Len()) + + require.Equal(t, 1, resource1.ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, 1, resource2.ScopeMetrics().At(0).Metrics().Len()) + + require.Equal(t, 1, resource1.ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, 1, resource2.ScopeMetrics().At(0).Metrics().Len()) + + require.Equal(t, "TestCount1", resource1.ScopeMetrics().At(0).Metrics().At(0).Name()) + require.Equal(t, "TestCount1", resource2.ScopeMetrics().At(0).Metrics().At(0).Name()) + }, + }, + { + name: "Same host, same metric name, same tags, different type", + series: SeriesList{ + Series: []datadogV1.Series{ + { + Metric: "TestMetric", + Host: strPtr("Host1"), + Type: strPtr(TypeCount), + Tags: []string{"env:dev", "version:tag1"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + 1636629071, 0.5, + }, + }), + }, + { + Metric: "TestMetric", + Host: strPtr("Host1"), + Type: strPtr(TypeGauge), + Tags: []string{"env:dev", "version:tag1"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + 1636629071, 0.5, + }, + }), + }, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + // The different metrics will fall under the same ResourceMetric and ScopeMetric + // and there will be separate metrics under the ScopeMetric.Metrics() due to the different + // data types + require.Equal(t, 1, result.ResourceMetrics().Len()) + resource := result.ResourceMetrics().At(0) + + v, exists := resource.Resource().Attributes().Get("host.name") + require.True(t, exists) + require.Equal(t, "Host1", v.AsString()) + + require.Equal(t, 1, resource.ScopeMetrics().Len()) + + require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().Len()) + + require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(0).Name()) + require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(1).Name()) + + require.Equal(t, pmetric.MetricTypeSum, resource.ScopeMetrics().At(0).Metrics().At(0).Type()) + require.Equal(t, pmetric.MetricTypeGauge, resource.ScopeMetrics().At(0).Metrics().At(1).Type()) + }, + }, + { + name: "Same host, same metric name, same tags, diff datapoints", + series: SeriesList{ + Series: []datadogV1.Series{ + { + Metric: "TestMetric", + Host: strPtr("Host1"), + Type: strPtr(TypeCount), + Tags: []string{"env:dev", "version:tag1"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + 1636629071, 0.5, + }, + }), + }, + { + Metric: "TestMetric", + Host: strPtr("Host1"), + Type: strPtr(TypeCount), + Tags: []string{"env:dev", "version:tag1"}, + Points: testPointsToDatadogPoints([]testPoint{ + { + 1636629081, 1.0, // Different timestamp and value + }, + }), + }, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + // Same host, tags, and metric name but two different datapoints + // should result in a single resourceMetric, scopeMetric, and metric + // but two different datapoints under that metric + require.Equal(t, 1, result.ResourceMetrics().Len()) + resource := result.ResourceMetrics().At(0) + + v, exists := resource.Resource().Attributes().Get("host.name") + require.True(t, exists) + require.Equal(t, "Host1", v.AsString()) + + require.Equal(t, 1, resource.ScopeMetrics().Len()) + + require.Equal(t, 1, resource.ScopeMetrics().At(0).Metrics().Len()) + + require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(0).Name()) + + require.Equal(t, pmetric.MetricTypeSum, resource.ScopeMetrics().At(0).Metrics().At(0).Type()) + require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len()) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := newMetricsTranslator() + mt.buildInfo = component.BuildInfo{ + Command: "otelcol", + Description: "OpenTelemetry Collector", + Version: "latest", + } + result := translateMetricsV1(tt.series, mt) + + tt.expect(t, result) + }) + } +} From ee4e3e43d53405b1a343c88cba3326c283279aa9 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Mon, 8 Jul 2024 14:50:58 -0700 Subject: [PATCH 06/11] Update vendoring --- receiver/datadogreceiver/go.mod | 5 +++++ receiver/datadogreceiver/go.sum | 8 ++++++++ testbed/go.mod | 2 ++ testbed/go.sum | 4 ++++ 4 files changed, 19 insertions(+) diff --git a/receiver/datadogreceiver/go.mod b/receiver/datadogreceiver/go.mod index 60cc8cbfbf04..60725aa1daa3 100644 --- a/receiver/datadogreceiver/go.mod +++ b/receiver/datadogreceiver/go.mod @@ -4,6 +4,7 @@ go 1.21.0 require ( github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.4 + github.com/DataDog/datadog-api-client-go/v2 v2.27.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.105.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.105.0 github.com/stretchr/testify v1.9.0 @@ -18,11 +19,13 @@ require ( go.opentelemetry.io/otel/metric v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/goleak v1.3.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 google.golang.org/protobuf v1.34.2 ) require ( + github.com/DataDog/zstd v1.5.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -31,6 +34,7 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect + github.com/goccy/go-json v0.10.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect @@ -74,6 +78,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.27.0 // indirect + golang.org/x/oauth2 v0.21.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/receiver/datadogreceiver/go.sum b/receiver/datadogreceiver/go.sum index 7c8906604713..88bc7ce9bcdc 100644 --- a/receiver/datadogreceiver/go.sum +++ b/receiver/datadogreceiver/go.sum @@ -1,5 +1,9 @@ github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.4 h1:nOSyRWX5tkxkGm0n9F7ZZP3Mw3VozNcBZJgeBj7leTY= github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.4/go.mod h1:gHkSUTn6H6UEZQHY3XWBIGNjfI3Tdi0IxlrxIFBWDwU= +github.com/DataDog/datadog-api-client-go/v2 v2.27.0 h1:AGZj41frjnjMufQHQbJH2fzmifOs20wpmVDtIBCv33E= +github.com/DataDog/datadog-api-client-go/v2 v2.27.0/go.mod h1:QKOu6vscsh87fMY1lHfLEmNSunyXImj8BUaUWJXOehc= +github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= +github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -18,6 +22,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsMBaUOKXq6HSv655U1c= github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -175,6 +181,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= +golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/testbed/go.mod b/testbed/go.mod index 3ce8f642ebd3..497a62246175 100644 --- a/testbed/go.mod +++ b/testbed/go.mod @@ -81,6 +81,8 @@ require ( github.com/Code-Hex/go-generics-cache v1.5.1 // indirect github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.4 // indirect github.com/DataDog/datadog-agent/pkg/trace/exportable v0.0.0-20201016145401-4646cf596b02 // indirect + github.com/DataDog/datadog-api-client-go/v2 v2.27.0 // indirect + github.com/DataDog/zstd v1.5.2 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/alecthomas/participle/v2 v2.1.1 // indirect github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect diff --git a/testbed/go.sum b/testbed/go.sum index 7fa5e53829a4..0c10e17473a0 100644 --- a/testbed/go.sum +++ b/testbed/go.sum @@ -66,8 +66,12 @@ github.com/DataDog/datadog-agent/pkg/trace/exportable v0.0.0-20201016145401-4646 github.com/DataDog/datadog-agent/pkg/util/log v0.0.0-20201009091607-ce4e57cdf8f4/go.mod h1:cRy7lwapA3jcjnX74kU6NFkXaRGQyB0l/QZA0IwYGEQ= github.com/DataDog/datadog-agent/pkg/util/log v0.0.0-20201009092105-58e18918b2db/go.mod h1:cRy7lwapA3jcjnX74kU6NFkXaRGQyB0l/QZA0IwYGEQ= github.com/DataDog/datadog-agent/pkg/util/winutil v0.0.0-20201009092105-58e18918b2db/go.mod h1:EtS4X73GXAyrpVddkLQ4SewSQX+zv284e8iIkVBXgtk= +github.com/DataDog/datadog-api-client-go/v2 v2.27.0 h1:AGZj41frjnjMufQHQbJH2fzmifOs20wpmVDtIBCv33E= +github.com/DataDog/datadog-api-client-go/v2 v2.27.0/go.mod h1:QKOu6vscsh87fMY1lHfLEmNSunyXImj8BUaUWJXOehc= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/datadog-go v3.5.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= +github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= From c8f8665cf25bd9f8b3b070be3e860950f4685389 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Mon, 8 Jul 2024 16:56:29 -0700 Subject: [PATCH 07/11] Add end-to-end test for v1 series Co-authored by: Federico Torres : --- receiver/datadogreceiver/receiver_test.go | 66 +++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/receiver/datadogreceiver/receiver_test.go b/receiver/datadogreceiver/receiver_test.go index 445ecad2ef66..d8785b5e932c 100644 --- a/receiver/datadogreceiver/receiver_test.go +++ b/receiver/datadogreceiver/receiver_test.go @@ -4,9 +4,13 @@ package datadogreceiver import ( + "bytes" "context" "errors" "fmt" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" "io" "net/http" "strings" @@ -102,3 +106,65 @@ func TestDatadogServer(t *testing.T) { }) } } + +func TestDatadogMetricsV1_EndToEnd(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:0" // Using a randomly assigned address + sink := new(consumertest.MetricsSink) + + dd, err := newDataDogReceiver( + cfg, + receivertest.NewNopCreateSettings(), + ) + require.NoError(t, err, "Must not error when creating receiver") + dd.(*datadogReceiver).nextMetricsConsumer = sink + + require.NoError(t, dd.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, dd.Shutdown(context.Background())) + }() + + metricsPayloadV1 := []byte(`{ + "series": [ + { + "metric": "system.load.1", + "host": "testHost", + "type": "count", + "points": [[1636629071,0.7]], + "source_type_name": "kubernetes", + "tags": ["environment:test"] + } + ] + }`) + + req, err := http.NewRequest( + http.MethodPost, + fmt.Sprintf("http://%s/api/v1/series", dd.(*datadogReceiver).address), + io.NopCloser(bytes.NewReader(metricsPayloadV1)), + ) + require.NoError(t, err, "Must not error when creating request") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err, "Must not error performing request") + + body, err := io.ReadAll(resp.Body) + require.NoError(t, multierr.Combine(err, resp.Body.Close()), "Must not error when reading body") + require.Equal(t, string(body), "OK", "Expected response to be 'OK', got %s", string(body)) + require.Equal(t, http.StatusAccepted, resp.StatusCode) + + mds := sink.AllMetrics() + require.Len(t, mds, 1) + got := mds[0] + require.Equal(t, 1, got.ResourceMetrics().Len()) + metrics := got.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + assert.Equal(t, 1, metrics.Len()) + metric := metrics.At(0) + assert.Equal(t, pmetric.MetricTypeSum, metric.Type()) + assert.Equal(t, "system.load.1", metric.Name()) + assert.Equal(t, pmetric.AggregationTemporalityDelta, metric.Sum().AggregationTemporality()) + assert.Equal(t, false, metric.Sum().IsMonotonic()) + assert.Equal(t, pcommon.Timestamp(1636629071*1_000_000_000), metric.Sum().DataPoints().At(0).Timestamp()) + assert.Equal(t, 0.7, metric.Sum().DataPoints().At(0).DoubleValue()) + expectedEnvironment, _ := metric.Sum().DataPoints().At(0).Attributes().Get("environment") + assert.Equal(t, "test", expectedEnvironment.AsString()) +} From 9f9d651ef360ea72eb6d7021a6e067ee48709b66 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Tue, 9 Jul 2024 12:51:43 -0700 Subject: [PATCH 08/11] Refactoring of batching and v1 series translation code --- receiver/datadogreceiver/batcher.go | 24 +++++++++---------- receiver/datadogreceiver/batcher_test.go | 2 +- .../datadogreceiver/metrics_translator.go | 18 +++++++------- .../metrics_translator_test.go | 2 +- receiver/datadogreceiver/receiver.go | 2 +- receiver/datadogreceiver/receiver_test.go | 9 +++---- receiver/datadogreceiver/testutil.go | 5 ---- 7 files changed, 29 insertions(+), 33 deletions(-) diff --git a/receiver/datadogreceiver/batcher.go b/receiver/datadogreceiver/batcher.go index ca1a956066f2..ee79a1a8d944 100644 --- a/receiver/datadogreceiver/batcher.go +++ b/receiver/datadogreceiver/batcher.go @@ -20,6 +20,7 @@ type Batcher struct { func newBatcher() Batcher { return Batcher{ + Metrics: pmetric.NewMetrics(), resourceMetrics: make(map[identity.Resource]pmetric.ResourceMetrics), scopeMetrics: make(map[identity.Scope]pmetric.ScopeMetrics), metrics: make(map[identity.Metric]pmetric.Metric), @@ -59,7 +60,7 @@ func parseSeriesProperties(name string, metricType string, tags []string, host s } func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) { - resource := getResource(dim.resourceAttrs) + resource := dim.Resource() resourceID := identity.OfResource(resource) resourceMetrics, ok := b.resourceMetrics[resourceID] if !ok { @@ -68,7 +69,7 @@ func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) { b.resourceMetrics[resourceID] = resourceMetrics } - scope := getScope(dim.scopeAttrs, dim.buildInfo) + scope := dim.Scope() scopeID := identity.OfScope(resourceID, scope) scopeMetrics, ok := b.scopeMetrics[scopeID] if !ok { @@ -77,7 +78,7 @@ func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) { b.scopeMetrics[scopeID] = scopeMetrics } - m := getMetric(dim) + m := dim.Metric() metricID := identity.OfMetric(scopeID, m) metric, ok := b.metrics[metricID] if !ok { @@ -89,25 +90,24 @@ func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) { return metric, metricID } -func getResource(attrs pcommon.Map) pcommon.Resource { +func (d Dimensions) Resource() pcommon.Resource { resource := pcommon.NewResource() - attrs.CopyTo(resource.Attributes()) // TODO(jesus.vazquez) review this copy + d.resourceAttrs.CopyTo(resource.Attributes()) // TODO(jesus.vazquez) review this copy return resource } -func getScope(attrs pcommon.Map, version string) pcommon.InstrumentationScope { +func (d Dimensions) Scope() pcommon.InstrumentationScope { scope := pcommon.NewInstrumentationScope() scope.SetName("otelcol/datadogreceiver") - scope.SetVersion(version) - attrs.CopyTo(scope.Attributes()) + scope.SetVersion(d.buildInfo) + d.scopeAttrs.CopyTo(scope.Attributes()) return scope } -func getMetric(dim Dimensions) pmetric.Metric { +func (d Dimensions) Metric() pmetric.Metric { metric := pmetric.NewMetric() - metric.SetName(dim.name) - metric.Type() - switch dim.metricType { + metric.SetName(d.name) + switch d.metricType { case pmetric.MetricTypeSum: metric.SetEmptySum() case pmetric.MetricTypeGauge: diff --git a/receiver/datadogreceiver/batcher_test.go b/receiver/datadogreceiver/batcher_test.go index e06b3572edf2..f9f21fc48391 100644 --- a/receiver/datadogreceiver/batcher_test.go +++ b/receiver/datadogreceiver/batcher_test.go @@ -281,7 +281,7 @@ func TestMetricBatcher(t *testing.T) { Description: "OpenTelemetry Collector", Version: "latest", } - result := translateMetricsV1(tt.series, mt) + result := mt.translateMetricsV1(tt.series) tt.expect(t, result) }) diff --git a/receiver/datadogreceiver/metrics_translator.go b/receiver/datadogreceiver/metrics_translator.go index d616912e5143..beb56bd744a8 100644 --- a/receiver/datadogreceiver/metrics_translator.go +++ b/receiver/datadogreceiver/metrics_translator.go @@ -2,13 +2,15 @@ // SPDX-License-Identifier: Apache-2.0 package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver" + import ( - datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" - "go.opentelemetry.io/collector/pdata/pmetric" "sync" + "time" + datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" ) @@ -50,9 +52,8 @@ type SeriesList struct { Series []datadogV1.Series `json:"series"` } -func translateMetricsV1(series SeriesList, mt *MetricsTranslator) pmetric.Metrics { +func (mt *MetricsTranslator) translateMetricsV1(series SeriesList) pmetric.Metrics { bt := newBatcher() - bt.Metrics = pmetric.NewMetrics() for _, serie := range series.Series { var dps pmetric.NumberDataPointSlice @@ -78,7 +79,7 @@ func translateMetricsV1(series SeriesList, mt *MetricsTranslator) pmetric.Metric dps.EnsureCapacity(len(serie.Points)) var dp pmetric.NumberDataPoint - var ts uint64 + var ts int64 var value float64 // The Datadog API returns a slice of slices of points [][]*float64 which is a bit awkward to work with. // It looks like this: @@ -92,11 +93,11 @@ func translateMetricsV1(series SeriesList, mt *MetricsTranslator) pmetric.Metric if len(points) != 2 { continue // The datapoint is missing a timestamp and/or value, so this point should be skipped } - ts = uint64(*points[0]) + ts = int64(*points[0]) value = *points[1] dp = dps.AppendEmpty() - dp.SetTimestamp(pcommon.Timestamp(ts * 1_000_000_000)) // OTel uses nanoseconds, while Datadog uses seconds + dp.SetTimestamp(pcommon.Timestamp(ts * time.Second.Nanoseconds())) // OTel uses nanoseconds, while Datadog uses seconds if *serie.Type == TypeRate { if serie.Interval.IsSet() { @@ -107,8 +108,7 @@ func translateMetricsV1(series SeriesList, mt *MetricsTranslator) pmetric.Metric dimensions.dpAttrs.CopyTo(dp.Attributes()) stream := identity.OfStream(metricID, dp) - ts, ok := mt.streamHasTimestamp(stream) - if ok { + if ts, ok := mt.streamHasTimestamp(stream); ok { dp.SetStartTimestamp(ts) } mt.updateLastTsForStream(stream, dp.Timestamp()) diff --git a/receiver/datadogreceiver/metrics_translator_test.go b/receiver/datadogreceiver/metrics_translator_test.go index f9c37797aa7b..6f55cff0d1d9 100644 --- a/receiver/datadogreceiver/metrics_translator_test.go +++ b/receiver/datadogreceiver/metrics_translator_test.go @@ -152,7 +152,7 @@ func TestTranslateMetricsV1(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mt := createMetricsTranslator() - result := translateMetricsV1(tt.series, mt) + result := mt.translateMetricsV1(tt.series) tt.expect(t, result) }) diff --git a/receiver/datadogreceiver/receiver.go b/receiver/datadogreceiver/receiver.go index 7fa8e9587652..035f5334d018 100644 --- a/receiver/datadogreceiver/receiver.go +++ b/receiver/datadogreceiver/receiver.go @@ -161,7 +161,7 @@ func (ddr *datadogReceiver) handleV1Series(w http.ResponseWriter, req *http.Requ return } - metrics := translateMetricsV1(seriesList, ddr.metricsTranslator) + metrics := ddr.metricsTranslator.translateMetricsV1(seriesList) metricsCount = metrics.DataPointCount() err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, metrics) diff --git a/receiver/datadogreceiver/receiver_test.go b/receiver/datadogreceiver/receiver_test.go index d8785b5e932c..2203dc544b77 100644 --- a/receiver/datadogreceiver/receiver_test.go +++ b/receiver/datadogreceiver/receiver_test.go @@ -8,14 +8,15 @@ import ( "context" "errors" "fmt" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.uber.org/multierr" "io" "net/http" "strings" "testing" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -114,7 +115,7 @@ func TestDatadogMetricsV1_EndToEnd(t *testing.T) { dd, err := newDataDogReceiver( cfg, - receivertest.NewNopCreateSettings(), + receivertest.NewNopSettings(), ) require.NoError(t, err, "Must not error when creating receiver") dd.(*datadogReceiver).nextMetricsConsumer = sink diff --git a/receiver/datadogreceiver/testutil.go b/receiver/datadogreceiver/testutil.go index 8b31d5effdf0..5f947b500e2a 100644 --- a/receiver/datadogreceiver/testutil.go +++ b/receiver/datadogreceiver/testutil.go @@ -39,11 +39,6 @@ func requireScope(t *testing.T, result pmetric.Metrics, expectedAttrs pcommon.Ma require.Equal(t, expectedAttrs, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes()) } -func requireMetricAndDataPointCounts(t *testing.T, result pmetric.Metrics, expectedMetricCount, expectedDpCount int) { - require.Equal(t, expectedMetricCount, result.MetricCount()) - require.Equal(t, expectedDpCount, result.DataPointCount()) -} - func requireSum(t *testing.T, metric pmetric.Metric, expectedName string, expectedAggregationTemporality pmetric.AggregationTemporality, expectedDpsLen int) { require.Equal(t, expectedName, metric.Name()) require.Equal(t, pmetric.MetricTypeSum, metric.Type()) From 73442657272d389407a2b6e7105e9a19f1867859 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Wed, 10 Jul 2024 07:53:50 -0700 Subject: [PATCH 09/11] Fix linting --- receiver/datadogreceiver/receiver_test.go | 7 +++---- receiver/datadogreceiver/testutil.go | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/receiver/datadogreceiver/receiver_test.go b/receiver/datadogreceiver/receiver_test.go index 2203dc544b77..f39377d1f07d 100644 --- a/receiver/datadogreceiver/receiver_test.go +++ b/receiver/datadogreceiver/receiver_test.go @@ -13,15 +13,14 @@ import ( "strings" "testing" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.uber.org/multierr" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/multierr" ) func TestDatadogTracesReceiver_Lifecycle(t *testing.T) { diff --git a/receiver/datadogreceiver/testutil.go b/receiver/datadogreceiver/testutil.go index 5f947b500e2a..b5415d7042c1 100644 --- a/receiver/datadogreceiver/testutil.go +++ b/receiver/datadogreceiver/testutil.go @@ -6,10 +6,9 @@ package datadogreceiver // import "github.com/open-telemetry/opentelemetry-colle import ( "testing" - "go.opentelemetry.io/collector/pdata/pcommon" - "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" ) From 57c35ad4d5e273b72224d4b161bd871e5f7fa340 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Mon, 15 Jul 2024 07:49:30 -0700 Subject: [PATCH 10/11] Ignore err on writing header for V1 series --- receiver/datadogreceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/datadogreceiver/receiver.go b/receiver/datadogreceiver/receiver.go index 035f5334d018..f69b42d900ce 100644 --- a/receiver/datadogreceiver/receiver.go +++ b/receiver/datadogreceiver/receiver.go @@ -172,7 +172,7 @@ func (ddr *datadogReceiver) handleV1Series(w http.ResponseWriter, req *http.Requ } w.WriteHeader(http.StatusAccepted) - _, err = w.Write([]byte("OK")) + _, _ = w.Write([]byte("OK")) } // handleV2Series handles the v2 series endpoint https://docs.datadoghq.com/api/latest/metrics/#submit-metrics From bb6240c2dded9977902eb408677ca05711f09b0f Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Mon, 15 Jul 2024 09:47:06 -0700 Subject: [PATCH 11/11] Updating go.mod --- receiver/datadogreceiver/go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/receiver/datadogreceiver/go.mod b/receiver/datadogreceiver/go.mod index 60725aa1daa3..8cd19d02d321 100644 --- a/receiver/datadogreceiver/go.mod +++ b/receiver/datadogreceiver/go.mod @@ -76,7 +76,6 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect go.opentelemetry.io/otel/sdk v1.28.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect golang.org/x/sys v0.22.0 // indirect