Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure buildCWMetric logic #1

Merged
merged 9 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 109 additions & 158 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,50 @@ type CWMetricStats struct {
Sum float64
}

// Wrapper interface for:
// - pdata.IntDataPointSlice
// - pdata.DoubleDataPointSlice
// - pdata.IntHistogramDataPointSlice
// - pdata.DoubleHistogramDataPointSlice
type DataPoints interface {
Len() int
At(int) DataPoint
}

// Wrapper interface for:
// - pdata.IntDataPoint
// - pdata.DoubleDataPoint
// - pdata.IntHistogramDataPoint
// - pdata.DoubleHistogramDataPoint
type DataPoint interface {
IsNil() bool
LabelsMap() pdata.StringMap
}

// Define wrapper interfaces such that At(i) returns a `DataPoint`
type IntDataPointSlice struct {
pdata.IntDataPointSlice
}
type DoubleDataPointSlice struct {
pdata.DoubleDataPointSlice
}
type DoubleHistogramDataPointSlice struct {
pdata.DoubleHistogramDataPointSlice
}
kohrapha marked this conversation as resolved.
Show resolved Hide resolved

func (dps IntDataPointSlice) At(i int) DataPoint {
return dps.IntDataPointSlice.At(i)
}
func (dps DoubleDataPointSlice) At(i int) DataPoint {
return dps.DoubleDataPointSlice.At(i)
}
func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint {
return dps.DoubleHistogramDataPointSlice.At(i)
}

// TranslateOtToCWMetric converts OT metrics to CloudWatch Metric format
func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, dimensionRollupOption string, namespace string) ([]*CWMetrics, int) {
var cwMetricLists []*CWMetrics
var cwMetricList []*CWMetrics
totalDroppedMetrics := 0
var instrumentationLibName string

Expand Down Expand Up @@ -117,11 +158,11 @@ func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, dimensionRollupOption stri
totalDroppedMetrics++
continue
}
cwMetricList := getMeasurements(&metric, namespace, instrumentationLibName, dimensionRollupOption)
cwMetricLists = append(cwMetricLists, cwMetricList...)
cwMetrics := getCWMetrics(&metric, namespace, instrumentationLibName, dimensionRollupOption)
cwMetricList = append(cwMetricList, cwMetrics...)
}
}
return cwMetricLists, totalDroppedMetrics
return cwMetricList, totalDroppedMetrics
}

func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics) []*LogEvent {
Expand Down Expand Up @@ -150,217 +191,127 @@ func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics) []*LogEvent {
return ples
}

func getMeasurements(metric *pdata.Metric, namespace string, instrumentationLibName string, dimensionRollupOption string) []*CWMetrics {
// Translates OTLP Metric to list of CW Metrics
func getCWMetrics(metric *pdata.Metric, namespace string, instrumentationLibName string, dimensionRollupOption string) []*CWMetrics {
var result []*CWMetrics
var dps DataPoints

// metric measure data from OT
metricMeasure := make(map[string]string)
// metric measure slice could include multiple metric measures
metricSlice := []map[string]string{}
metricMeasure["Name"] = metric.Name()
metricMeasure["Unit"] = metric.Unit()
metricSlice = append(metricSlice, metricMeasure)
// metric measure slice could include multiple metric measures
metricSlice := []map[string]string{metricMeasure}

// Retrieve data points
switch metric.DataType() {
case pdata.MetricDataTypeIntGauge:
dps := metric.IntGauge().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = IntDataPointSlice{metric.IntGauge().DataPoints()}
case pdata.MetricDataTypeDoubleGauge:
dps := metric.DoubleGauge().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = DoubleDataPointSlice{metric.DoubleGauge().DataPoints()}
case pdata.MetricDataTypeIntSum:
dps := metric.IntSum().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = IntDataPointSlice{metric.IntSum().DataPoints()}
case pdata.MetricDataTypeDoubleSum:
dps := metric.DoubleSum().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = DoubleDataPointSlice{metric.DoubleSum().DataPoints()}
case pdata.MetricDataTypeDoubleHistogram:
dps := metric.DoubleHistogram().DataPoints()
if dps.Len() == 0 {
return result
dps = DoubleHistogramDataPointSlice{metric.DoubleHistogram().DataPoints()}
}

if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromHistogram(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
cwMetric := buildCWMetric(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
return result
}

func buildCWMetricFromDP(dp interface{}, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, instrumentationLibName string, dimensionRollupOption string) *CWMetrics {
// fields contains metric and dimensions key/value pairs
fieldsPairs := make(map[string]interface{})
var dimensionArray [][]string
// Dimensions Slice
var dimensionSlice []string
var dimensionKV pdata.StringMap
switch metric := dp.(type) {
case pdata.IntDataPoint:
dimensionKV = metric.LabelsMap()
case pdata.DoubleDataPoint:
dimensionKV = metric.LabelsMap()
}

dimensionKV.ForEach(func(k string, v pdata.StringValue) {
fieldsPairs[k] = v.Value()
dimensionSlice = append(dimensionSlice, k)
})
// add OTel instrumentation lib name as an additional dimension if it is defined
if instrumentationLibName != noInstrumentationLibraryName {
fieldsPairs[OTellibDimensionKey] = instrumentationLibName
dimensionArray = append(dimensionArray, append(dimensionSlice, OTellibDimensionKey))
} else {
dimensionArray = append(dimensionArray, dimensionSlice)
// Build CWMetric from DataPoint
func buildCWMetric(dp DataPoint, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, instrumentationLibName string, dimensionRollupOption string) *CWMetrics {
dimensions, fields := createDimensions(dp, instrumentationLibName, dimensionRollupOption)
cwMeasurement := &CwMeasurement{
Namespace: namespace,
Dimensions: dimensions,
Metrics: metricSlice,
}

metricList := []CwMeasurement{*cwMeasurement}
timestamp := time.Now().UnixNano() / int64(time.Millisecond)

// Extract metric
var metricVal interface{}
switch metric := dp.(type) {
case pdata.IntDataPoint:
// Put a fake but identical metric value here in order to add metric name into fieldsPairs
// Put a fake but identical metric value here in order to add metric name into fields
// since calculateRate() needs metric name as one of metric identifiers
fieldsPairs[pmd.Name()] = int64(FakeMetricValue)
fields[pmd.Name()] = int64(FakeMetricValue)
metricVal = metric.Value()
if needsCalculateRate(pmd) {
metricVal = calculateRate(fieldsPairs, metric.Value(), timestamp)
metricVal = calculateRate(fields, metric.Value(), timestamp)
}
case pdata.DoubleDataPoint:
fieldsPairs[pmd.Name()] = float64(FakeMetricValue)
fields[pmd.Name()] = float64(FakeMetricValue)
metricVal = metric.Value()
if needsCalculateRate(pmd) {
metricVal = calculateRate(fieldsPairs, metric.Value(), timestamp)
metricVal = calculateRate(fields, metric.Value(), timestamp)
}
case pdata.DoubleHistogramDataPoint:
bucketBounds := metric.ExplicitBounds()
metricVal = &CWMetricStats{
Min: bucketBounds[0],
Max: bucketBounds[len(bucketBounds)-1],
Count: metric.Count(),
Sum: metric.Sum(),
}
}
if metricVal == nil {
return nil
}
fieldsPairs[pmd.Name()] = metricVal
fields[pmd.Name()] = metricVal

// EMF dimension attr takes list of list on dimensions. Including single/zero dimension rollup
rollupDimensionArray := dimensionRollup(dimensionRollupOption, dimensionSlice, instrumentationLibName)
if len(rollupDimensionArray) > 0 {
dimensionArray = append(dimensionArray, rollupDimensionArray...)
}

cwMeasurement := &CwMeasurement{
Namespace: namespace,
Dimensions: dimensionArray,
Metrics: metricSlice,
}
metricList := make([]CwMeasurement, 1)
metricList[0] = *cwMeasurement
cwMetric := &CWMetrics{
Measurements: metricList,
Timestamp: timestamp,
Fields: fieldsPairs,
Fields: fields,
}
return cwMetric
}

func buildCWMetricFromHistogram(metric pdata.DoubleHistogramDataPoint, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, instrumentationLibName string, dimensionRollupOption string) *CWMetrics {
// Create dimensions from DataPoint labels, where dimensions is a 2D array of dimension names,
// and initialize fields with dimension key/value pairs
func createDimensions(dp DataPoint, instrumentationLibName string, dimensionRollupOption string) (dimensions [][]string, fields map[string]interface{}) {
// fields contains metric and dimensions key/value pairs
fieldsPairs := make(map[string]interface{})
var dimensionArray [][]string
// Dimensions Slice
var dimensionSlice []string
dimensionKV := metric.LabelsMap()
fields = make(map[string]interface{})
dimensionKV := dp.LabelsMap()

dimensionSlice := make([]string, dimensionKV.Len(), dimensionKV.Len()+1)
idx := 0
dimensionKV.ForEach(func(k string, v pdata.StringValue) {
fieldsPairs[k] = v.Value()
dimensionSlice = append(dimensionSlice, k)
fields[k] = v.Value()
dimensionSlice[idx] = k
idx++
})
// add OTel instrumentation lib name as an additional dimension if it is defined
// Add OTel instrumentation lib name as an additional dimension if it is defined
if instrumentationLibName != noInstrumentationLibraryName {
fieldsPairs[OTellibDimensionKey] = instrumentationLibName
dimensionArray = append(dimensionArray, append(dimensionSlice, OTellibDimensionKey))
fields[OTellibDimensionKey] = instrumentationLibName
dimensions = append(dimensions, append(dimensionSlice, OTellibDimensionKey))
} else {
dimensionArray = append(dimensionArray, dimensionSlice)
}

timestamp := time.Now().UnixNano() / int64(time.Millisecond)

bucketBounds := metric.ExplicitBounds()
metricStats := &CWMetricStats{
Min: bucketBounds[0],
Max: bucketBounds[len(bucketBounds)-1],
Count: metric.Count(),
Sum: metric.Sum(),
dimensions = append(dimensions, dimensionSlice)
}
fieldsPairs[pmd.Name()] = metricStats

// EMF dimension attr takes list of list on dimensions. Including single/zero dimension rollup
rollupDimensionArray := dimensionRollup(dimensionRollupOption, dimensionSlice, instrumentationLibName)
if len(rollupDimensionArray) > 0 {
dimensionArray = append(dimensionArray, rollupDimensionArray...)
dimensions = append(dimensions, rollupDimensionArray...)
}

cwMeasurement := &CwMeasurement{
Namespace: namespace,
Dimensions: dimensionArray,
Metrics: metricSlice,
}
metricList := make([]CwMeasurement, 1)
metricList[0] = *cwMeasurement
cwMetric := &CWMetrics{
Measurements: metricList,
Timestamp: timestamp,
Fields: fieldsPairs,
}
return cwMetric
return
}

// rate is calculated by valDelta / timeDelta
Expand Down
Loading