Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrated Google Cloud Spanner receiver to scraper approach. #5868

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions receiver/googlecloudspannerreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,20 @@ func createDefaultConfig() config.Receiver {

func createMetricsReceiver(
_ context.Context,
params component.ReceiverCreateSettings,
settings component.ReceiverCreateSettings,
baseCfg config.Receiver,
consumer consumer.Metrics,
) (component.MetricsReceiver, error) {

rCfg := baseCfg.(*Config)
logger := params.Logger
return newGoogleCloudSpannerReceiver(logger, rCfg, consumer)
r := newGoogleCloudSpannerReceiver(settings.Logger, rCfg)

scraper, err := scraperhelper.NewScraper(typeStr, r.Scrape, scraperhelper.WithStart(r.Start),
scraperhelper.WithShutdown(r.Shutdown))
if err != nil {
return nil, err
}

return scraperhelper.NewScraperControllerReceiver(&rCfg.ScraperControllerSettings, settings, consumer,
scraperhelper.AddScraper(scraper))
}
2 changes: 1 addition & 1 deletion receiver/googlecloudspannerreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ func TestCreateMetricsReceiver(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, receiver, "failed to create metrics receiver")

_, err = factory.CreateTracesReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), receiverConfig, nil)
_, err = factory.CreateMetricsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), receiverConfig, nil)
require.Error(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package metadata
import (
"sort"
"strings"

"go.opentelemetry.io/collector/model/pdata"
)

type LabelValueMetadata interface {
Expand All @@ -26,6 +28,7 @@ type LabelValueMetadata interface {
type LabelValue interface {
LabelValueMetadata
Value() interface{}
SetValueTo(attributes pdata.AttributeMap)
}

type queryLabelValueMetadata struct {
Expand Down Expand Up @@ -133,6 +136,10 @@ func (value stringLabelValue) Value() interface{} {
return value.value
}

func (value stringLabelValue) SetValueTo(attributes pdata.AttributeMap) {
attributes.InsertString(value.name, value.value)
}

func newStringLabelValue(metadata StringLabelValueMetadata, valueHolder interface{}) stringLabelValue {
return stringLabelValue{
StringLabelValueMetadata: metadata,
Expand All @@ -150,6 +157,10 @@ func (value int64LabelValue) Value() interface{} {
return value.value
}

func (value int64LabelValue) SetValueTo(attributes pdata.AttributeMap) {
attributes.InsertInt(value.name, value.value)
}

func newInt64LabelValue(metadata Int64LabelValueMetadata, valueHolder interface{}) int64LabelValue {
return int64LabelValue{
Int64LabelValueMetadata: metadata,
Expand All @@ -167,6 +178,10 @@ func (value boolLabelValue) Value() interface{} {
return value.value
}

func (value boolLabelValue) SetValueTo(attributes pdata.AttributeMap) {
attributes.InsertBool(value.name, value.value)
}

func newBoolLabelValue(metadata BoolLabelValueMetadata, valueHolder interface{}) boolLabelValue {
return boolLabelValue{
BoolLabelValueMetadata: metadata,
Expand All @@ -184,6 +199,10 @@ func (value stringSliceLabelValue) Value() interface{} {
return value.value
}

func (value stringSliceLabelValue) SetValueTo(attributes pdata.AttributeMap) {
attributes.InsertString(value.name, value.value)
}

func newStringSliceLabelValue(metadata StringSliceLabelValueMetadata, valueHolder interface{}) stringSliceLabelValue {
value := *valueHolder.(*[]string)

Expand All @@ -207,6 +226,10 @@ func (value byteSliceLabelValue) Value() interface{} {
return value.value
}

func (value byteSliceLabelValue) SetValueTo(attributes pdata.AttributeMap) {
attributes.InsertString(value.name, value.value)
}

func newByteSliceLabelValue(metadata ByteSliceLabelValueMetadata, valueHolder interface{}) byteSliceLabelValue {
return byteSliceLabelValue{
ByteSliceLabelValueMetadata: metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/model/pdata"
)

func TestStringLabelValueMetadata(t *testing.T) {
Expand Down Expand Up @@ -108,14 +109,22 @@ func TestStringLabelValue(t *testing.T) {
},
}

labelValue :=
stringLabelValue{
StringLabelValueMetadata: metadata,
value: stringValue,
}
labelValue := stringLabelValue{
StringLabelValueMetadata: metadata,
value: stringValue,
}

assert.Equal(t, metadata, labelValue.StringLabelValueMetadata)
assert.Equal(t, stringValue, labelValue.Value())

attributes := pdata.NewAttributeMap()

labelValue.SetValueTo(attributes)

attributeValue, exists := attributes.Get(labelName)

assert.True(t, exists)
assert.Equal(t, stringValue, attributeValue.StringVal())
}

func TestInt64LabelValue(t *testing.T) {
Expand All @@ -126,14 +135,22 @@ func TestInt64LabelValue(t *testing.T) {
},
}

labelValue :=
int64LabelValue{
Int64LabelValueMetadata: metadata,
value: int64Value,
}
labelValue := int64LabelValue{
Int64LabelValueMetadata: metadata,
value: int64Value,
}

assert.Equal(t, metadata, labelValue.Int64LabelValueMetadata)
assert.Equal(t, int64Value, labelValue.Value())

attributes := pdata.NewAttributeMap()

labelValue.SetValueTo(attributes)

attributeValue, exists := attributes.Get(labelName)

assert.True(t, exists)
assert.Equal(t, int64Value, attributeValue.IntVal())
}

func TestBoolLabelValue(t *testing.T) {
Expand All @@ -144,14 +161,22 @@ func TestBoolLabelValue(t *testing.T) {
},
}

labelValue :=
boolLabelValue{
BoolLabelValueMetadata: metadata,
value: boolValue,
}
labelValue := boolLabelValue{
BoolLabelValueMetadata: metadata,
value: boolValue,
}

assert.Equal(t, metadata, labelValue.BoolLabelValueMetadata)
assert.Equal(t, boolValue, labelValue.Value())

attributes := pdata.NewAttributeMap()

labelValue.SetValueTo(attributes)

attributeValue, exists := attributes.Get(labelName)

assert.True(t, exists)
assert.Equal(t, boolValue, attributeValue.BoolVal())
}

func TestStringSliceLabelValue(t *testing.T) {
Expand All @@ -162,14 +187,22 @@ func TestStringSliceLabelValue(t *testing.T) {
},
}

labelValue :=
stringSliceLabelValue{
StringSliceLabelValueMetadata: metadata,
value: stringValue,
}
labelValue := stringSliceLabelValue{
StringSliceLabelValueMetadata: metadata,
value: stringValue,
}

assert.Equal(t, metadata, labelValue.StringSliceLabelValueMetadata)
assert.Equal(t, stringValue, labelValue.Value())

attributes := pdata.NewAttributeMap()

labelValue.SetValueTo(attributes)

attributeValue, exists := attributes.Get(labelName)

assert.True(t, exists)
assert.Equal(t, stringValue, attributeValue.StringVal())
}

func TestByteSliceLabelValue(t *testing.T) {
Expand All @@ -180,14 +213,22 @@ func TestByteSliceLabelValue(t *testing.T) {
},
}

labelValue :=
byteSliceLabelValue{
ByteSliceLabelValueMetadata: metadata,
value: stringValue,
}
labelValue := byteSliceLabelValue{
ByteSliceLabelValueMetadata: metadata,
value: stringValue,
}

assert.Equal(t, metadata, labelValue.ByteSliceLabelValueMetadata)
assert.Equal(t, stringValue, labelValue.Value())

attributes := pdata.NewAttributeMap()

labelValue.SetValueTo(attributes)

attributeValue, exists := attributes.Get(labelName)

assert.True(t, exists)
assert.Equal(t, stringValue, attributeValue.StringVal())
}

func TestNewQueryLabelValueMetadata(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metadata

import (
"sort"

"go.opentelemetry.io/collector/model/pdata"
)

const instrumentationLibraryName = "otelcol/googlecloudspannermetrics"

type MetricsBuilder struct {
// TODO It is empty now but there will be fields soon
}

func (b *MetricsBuilder) Build(dataPoints []*MetricsDataPoint) pdata.Metrics {
var metrics pdata.Metrics

groupedDataPoints := group(dataPoints)

if len(groupedDataPoints) > 0 {
metrics = pdata.NewMetrics()
rms := metrics.ResourceMetrics()
rm := rms.AppendEmpty()

ilms := rm.InstrumentationLibraryMetrics()
ilm := ilms.AppendEmpty()
ilm.InstrumentationLibrary().SetName(instrumentationLibraryName)

for key, points := range groupedDataPoints {
metric := ilm.Metrics().AppendEmpty()
metric.SetName(key.MetricName)
metric.SetUnit(key.MetricUnit)
metric.SetDataType(key.MetricDataType.MetricDataType())

var dataPointSlice pdata.NumberDataPointSlice

switch key.MetricDataType.MetricDataType() {
case pdata.MetricDataTypeGauge:
dataPointSlice = metric.Gauge().DataPoints()
case pdata.MetricDataTypeSum:
metric.Sum().SetAggregationTemporality(key.MetricDataType.AggregationTemporality())
metric.Sum().SetIsMonotonic(key.MetricDataType.IsMonotonic())
dataPointSlice = metric.Sum().DataPoints()
}

for _, point := range points {
point.CopyTo(dataPointSlice.AppendEmpty())
}
}
}

return metrics
}

func group(dataPoints []*MetricsDataPoint) map[MetricsDataPointKey][]*MetricsDataPoint {
if len(dataPoints) == 0 {
return nil
}

groupedDataPoints := make(map[MetricsDataPointKey][]*MetricsDataPoint)

for _, dataPoint := range dataPoints {
groupingKey := dataPoint.GroupingKey()
groupedDataPoints[groupingKey] = append(groupedDataPoints[groupingKey], dataPoint)
}

// TODO Apply cardinality filtering somewhere here for each group of dataPoints and rename method to groupAndFilter()

// Sorting points by metric name and timestamp
for _, points := range groupedDataPoints {
sort.Slice(points, func(i, j int) bool {
return points[i].timestamp.Before(points[j].timestamp)
})

}

return groupedDataPoints
}
Loading