Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure buildCWMetric logic #1

Merged
merged 9 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions exporter/awsemfexporter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ github.com/go-openapi/validate v0.19.8/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85n
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.3.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-playground/validator/v10 v10.4.0/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
Expand Down Expand Up @@ -541,6 +542,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.14.5/go.mod h1:UJ0EZAp832vCd54Wev9N1BMKEyvcZ5+IM0AwDrnlkEc=
github.com/grpc-ecosystem/grpc-gateway v1.14.6/go.mod h1:zdiPV4Yse/1gnckTHtghG4GkDEdKCRJduHpTxT3/jcw=
github.com/grpc-ecosystem/grpc-gateway v1.14.8 h1:hXClj+iFpmLM8i3lkO6i4Psli4P2qObQuQReiII26U8=
github.com/grpc-ecosystem/grpc-gateway v1.14.8/go.mod h1:NZE8t6vs6TnwLL/ITkaK8W3ecMLGAbh2jXTclvpiwYo=
github.com/grpc-ecosystem/grpc-gateway v1.15.0 h1:ntPNC9TD/6l2XDenJZe6T5lSMg95thpV9sGAqHX4WU8=
github.com/grpc-ecosystem/grpc-gateway v1.15.0/go.mod h1:vO11I9oWA+KsxmfFQPhLnnIb1VDE24M+pdxZFiuZcA8=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
Expand Down Expand Up @@ -1070,6 +1073,7 @@ github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqri
github.com/uber/jaeger-client-go v2.23.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U=
github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ=
github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
Expand Down Expand Up @@ -1121,6 +1125,7 @@ go.opentelemetry.io/collector v0.11.1-0.20201006165100-07236c11fb27/go.mod h1:mK
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down Expand Up @@ -1480,6 +1485,7 @@ google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.32.0 h1:zWTV+LMdc3kaiJMSTOFz2UgSBgx8RNQoTGiZu3fR9S0=
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc/examples v0.0.0-20200728065043-dfc0c05b2da9 h1:f+/+gfZ/tfaHBXXiv1gWRmCej6wlX3mLY4bnLpI99wk=
Expand Down
260 changes: 108 additions & 152 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,50 @@ type CWMetricStats struct {
Sum float64
}

// Wrapper interface for:
// - pdata.IntDataPointSlice
// - pdata.DoubleDataPointSlice
// - pdata.IntHistogramDataPointSlice
// - pdata.DoubleHistogramDataPointSlice
type DataPoints interface {
Len() int
At(int) DataPoint
}

// Wrapper interface for:
// - pdata.IntDataPoint
// - pdata.DoubleDataPoint
// - pdata.IntHistogramDataPoint
// - pdata.DoubleHistogramDataPoint
type DataPoint interface {
IsNil() bool
LabelsMap() pdata.StringMap
}

// Define wrapper interfaces such that At(i) returns a `DataPoint`
type IntDataPointSlice struct {
pdata.IntDataPointSlice
}
type DoubleDataPointSlice struct {
pdata.DoubleDataPointSlice
}
type DoubleHistogramDataPointSlice struct {
pdata.DoubleHistogramDataPointSlice
}
kohrapha marked this conversation as resolved.
Show resolved Hide resolved

func (dps IntDataPointSlice) At(i int) DataPoint {
return dps.IntDataPointSlice.At(i)
}
func (dps DoubleDataPointSlice) At(i int) DataPoint {
return dps.DoubleDataPointSlice.At(i)
}
func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint {
return dps.DoubleHistogramDataPointSlice.At(i)
}

// TranslateOtToCWMetric converts OT metrics to CloudWatch Metric format
func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, dimensionRollupOption string, namespace string) ([]*CWMetrics, int) {
var cwMetricLists []*CWMetrics
var cwMetricList []*CWMetrics
totalDroppedMetrics := 0

if len(namespace) == 0 && !rm.Resource().IsNil() {
Expand Down Expand Up @@ -115,11 +156,11 @@ func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, dimensionRollupOption stri
totalDroppedMetrics++
continue
}
cwMetricList := getMeasurements(&metric, namespace, OTLib, dimensionRollupOption)
cwMetricLists = append(cwMetricLists, cwMetricList...)
cwMetrics := getCWMetrics(&metric, namespace, OTLib, dimensionRollupOption)
cwMetricList = append(cwMetricList, cwMetrics...)
}
}
return cwMetricLists, totalDroppedMetrics
return cwMetricList, totalDroppedMetrics
}

func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics) []*LogEvent {
Expand Down Expand Up @@ -148,203 +189,118 @@ func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics) []*LogEvent {
return ples
}

func getMeasurements(metric *pdata.Metric, namespace string, OTLib string, dimensionRollupOption string) []*CWMetrics {
// Translates OTLP Metric to list of CW Metrics
func getCWMetrics(metric *pdata.Metric, namespace string, OTLib string, dimensionRollupOption string) []*CWMetrics {
var result []*CWMetrics
var dps DataPoints

// metric measure data from OT
metricMeasure := make(map[string]string)
// metric measure slice could include multiple metric measures
metricSlice := []map[string]string{}
metricMeasure["Name"] = metric.Name()
metricMeasure["Unit"] = metric.Unit()
metricSlice = append(metricSlice, metricMeasure)
// metric measure slice could include multiple metric measures
metricSlice := []map[string]string{metricMeasure}

// Retrieve data points
switch metric.DataType() {
case pdata.MetricDataTypeIntGauge:
dps := metric.IntGauge().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = IntDataPointSlice{metric.IntGauge().DataPoints()}
case pdata.MetricDataTypeDoubleGauge:
dps := metric.DoubleGauge().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = DoubleDataPointSlice{metric.DoubleGauge().DataPoints()}
case pdata.MetricDataTypeIntSum:
dps := metric.IntSum().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = IntDataPointSlice{metric.IntSum().DataPoints()}
case pdata.MetricDataTypeDoubleSum:
dps := metric.DoubleSum().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = DoubleDataPointSlice{metric.DoubleSum().DataPoints()}
case pdata.MetricDataTypeDoubleHistogram:
dps := metric.DoubleHistogram().DataPoints()
if dps.Len() == 0 {
return result
dps = DoubleHistogramDataPointSlice{metric.DoubleHistogram().DataPoints()}
}

if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromHistogram(dp, metric, namespace, metricSlice, OTLib, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
cwMetric := buildCWMetric(dp, metric, namespace, metricSlice, OTLib, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
return result
}

func buildCWMetricFromDP(dp interface{}, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, OTLib string, dimensionRollupOption string) *CWMetrics {
// fields contains metric and dimensions key/value pairs
fieldsPairs := make(map[string]interface{})
var dimensionArray [][]string
// Dimensions Slice
var dimensionSlice []string
var dimensionKV pdata.StringMap
switch metric := dp.(type) {
case pdata.IntDataPoint:
dimensionKV = metric.LabelsMap()
case pdata.DoubleDataPoint:
dimensionKV = metric.LabelsMap()
// Build CWMetric from DataPoint
func buildCWMetric(dp DataPoint, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, OTLib string, dimensionRollupOption string) *CWMetrics {
dimensions, fields := createDimensions(dp, OTLib, dimensionRollupOption)
cwMeasurement := &CwMeasurement{
Namespace: namespace,
Dimensions: dimensions,
Metrics: metricSlice,
}

dimensionKV.ForEach(func(k string, v pdata.StringValue) {
fieldsPairs[k] = v.Value()
dimensionSlice = append(dimensionSlice, k)
})
// add OTLib as an additional dimension
fieldsPairs[OtlibDimensionKey] = OTLib
dimensionArray = append(dimensionArray, append(dimensionSlice, OtlibDimensionKey))

metricList := []CwMeasurement{*cwMeasurement}
timestamp := time.Now().UnixNano() / int64(time.Millisecond)

// Extract metric
var metricVal interface{}
switch metric := dp.(type) {
case pdata.IntDataPoint:
// Put a fake but identical metric value here in order to add metric name into fieldsPairs
// Put a fake but identical metric value here in order to add metric name into fields
// since calculateRate() needs metric name as one of metric identifiers
fieldsPairs[pmd.Name()] = int64(FakeMetricValue)
metricVal = calculateRate(fieldsPairs, metric.Value(), timestamp)
fields[pmd.Name()] = int64(FakeMetricValue)
metricVal = calculateRate(fields, metric.Value(), timestamp)
case pdata.DoubleDataPoint:
fieldsPairs[pmd.Name()] = float64(FakeMetricValue)
metricVal = calculateRate(fieldsPairs, metric.Value(), timestamp)
fields[pmd.Name()] = float64(FakeMetricValue)
metricVal = calculateRate(fields, metric.Value(), timestamp)
case pdata.DoubleHistogramDataPoint:
bucketBounds := metric.ExplicitBounds()
metricVal = &CWMetricStats{
Min: bucketBounds[0],
Max: bucketBounds[len(bucketBounds)-1],
Count: metric.Count(),
Sum: metric.Sum(),
}
}
if metricVal == nil {
return nil
}
fieldsPairs[pmd.Name()] = metricVal
fields[pmd.Name()] = metricVal

// EMF dimension attr takes list of list on dimensions. Including single/zero dimension rollup
rollupDimensionArray := dimensionRollup(dimensionRollupOption, dimensionSlice)
if len(rollupDimensionArray) > 0 {
dimensionArray = append(dimensionArray, rollupDimensionArray...)
}

cwMeasurement := &CwMeasurement{
Namespace: namespace,
Dimensions: dimensionArray,
Metrics: metricSlice,
}
metricList := make([]CwMeasurement, 1)
metricList[0] = *cwMeasurement
cwMetric := &CWMetrics{
Measurements: metricList,
Timestamp: timestamp,
Fields: fieldsPairs,
Fields: fields,
}
return cwMetric
}

func buildCWMetricFromHistogram(metric pdata.DoubleHistogramDataPoint, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, OTLib string, dimensionRollupOption string) *CWMetrics {
// Create dimensions from DataPoint labels, where dimensions is a 2D array of dimension names,
// and initialize fields with dimension key/value pairs
func createDimensions(dp DataPoint, OTLib string, dimensionRollupOption string) (dimensions [][]string, fields map[string]interface{}) {
// fields contains metric and dimensions key/value pairs
fieldsPairs := make(map[string]interface{})
var dimensionArray [][]string
// Dimensions Slice
var dimensionSlice []string
dimensionKV := metric.LabelsMap()
fields = make(map[string]interface{})
dimensionKV := dp.LabelsMap()

dimensionSlice := make([]string, dimensionKV.Len(), dimensionKV.Len()+1)
idx := 0
dimensionKV.ForEach(func(k string, v pdata.StringValue) {
fieldsPairs[k] = v.Value()
dimensionSlice = append(dimensionSlice, k)
fields[k] = v.Value()
dimensionSlice[idx] = k
idx++
})
// add OTLib as an additional dimension
fieldsPairs[OtlibDimensionKey] = OTLib
dimensionArray = append(dimensionArray, append(dimensionSlice, OtlibDimensionKey))

timestamp := time.Now().UnixNano() / int64(time.Millisecond)

bucketBounds := metric.ExplicitBounds()
metricStats := &CWMetricStats{
Min: bucketBounds[0],
Max: bucketBounds[len(bucketBounds)-1],
Count: metric.Count(),
Sum: metric.Sum(),
}
fieldsPairs[pmd.Name()] = metricStats
// Add OTLib as an additional dimension
fields[OtlibDimensionKey] = OTLib
dimensions = append(dimensions, append(dimensionSlice, OtlibDimensionKey))

// EMF dimension attr takes list of list on dimensions. Including single/zero dimension rollup
rollupDimensionArray := dimensionRollup(dimensionRollupOption, dimensionSlice)
if len(rollupDimensionArray) > 0 {
dimensionArray = append(dimensionArray, rollupDimensionArray...)
dimensions = append(dimensions, rollupDimensionArray...)
}

cwMeasurement := &CwMeasurement{
Namespace: namespace,
Dimensions: dimensionArray,
Metrics: metricSlice,
}
metricList := make([]CwMeasurement, 1)
metricList[0] = *cwMeasurement
cwMetric := &CWMetrics{
Measurements: metricList,
Timestamp: timestamp,
Fields: fieldsPairs,
}
return cwMetric
return
}

// rate is calculated by valDelta / timeDelta
Expand Down
Loading