From bf67b139176ebf394d9a70ad05261b629402f045 Mon Sep 17 00:00:00 2001
From: Mirco Hacker
Date: Mon, 23 Jan 2023 17:21:06 +0100
Subject: [PATCH] [exporter/awsemfexporter] Add config option to retain the
 initial value of delta metrics (#16218)

---
 ...fexporter_retein_intitial_delta_value.yaml |  16 +
 exporter/awsemfexporter/README.md             |  37 +-
 exporter/awsemfexporter/config.go             |   5 +
 exporter/awsemfexporter/datapoint.go          |  36 +-
 exporter/awsemfexporter/datapoint_test.go     | 357 +++++++++---------
 exporter/awsemfexporter/factory.go            |  15 +-
 exporter/awsemfexporter/metric_translator.go  |  11 +-
 7 files changed, 267 insertions(+), 210 deletions(-)
 create mode 100644 .chloggen/awsemfexporter_retein_intitial_delta_value.yaml

diff --git a/.chloggen/awsemfexporter_retein_intitial_delta_value.yaml b/.chloggen/awsemfexporter_retein_intitial_delta_value.yaml
new file mode 100644
index 000000000000..62468b40cd45
--- /dev/null
+++ b/.chloggen/awsemfexporter_retein_intitial_delta_value.yaml
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: awsemfexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: The AWS EMF exporter now supports the additional configuration flag `retain_initial_value_of_delta_metric`. With this flag active, the first value of a metric is not discarded but is instead sent to AWS.
+
+# One or more tracking issues related to the change
+issues: [16218]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
diff --git a/exporter/awsemfexporter/README.md b/exporter/awsemfexporter/README.md
index 9e94c0e46040..a920facbc116 100644
--- a/exporter/awsemfexporter/README.md
+++ b/exporter/awsemfexporter/README.md
@@ -18,25 +18,26 @@ Convert OpenTelemetry ```Int64DataPoints```, ```DoubleDataPoints```, ```SummaryD
 The following exporter configuration parameters are supported.
 
-| Name | Description | Default |
-|:---------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------- |
-| `log_group_name` | Customized log group name which supports `{ClusterName}` and `{TaskId}` placeholders. One valid example is `/aws/metrics/{ClusterName}`. It will search for `ClusterName` (or `aws.ecs.cluster.name`) resource attribute in the metrics data and replace with the actual cluster name. If none of them are found in the resource attribute map, `{ClusterName}` will be replaced by `undefined`. Similar way, for the `{TaskId}`, it searches for `TaskId` (or `aws.ecs.task.id`) key in the resource attribute map. For `{NodeName}`, it searches for `NodeName` (or `k8s.node.name`) |"/metrics/default"|
-| `log_stream_name` | Customized log stream name which supports `{TaskId}`, `{ClusterName}`, `{NodeName}`, `{ContainerInstanceId}`, and `{TaskDefinitionFamily}` placeholders. One valid example is `{TaskId}`. It will search for `TaskId` (or `aws.ecs.task.id`) resource attribute in the metrics data and replace with the actual task id. If none of them are found in the resource attribute map, `{TaskId}` will be replaced by `undefined`. Similarly, for the `{TaskDefinitionFamily}`, it searches for `TaskDefinitionFamily` (or `aws.ecs.task.family`). For the `{ClusterName}`, it searches for `ClusterName` (or `aws.ecs.cluster.name`). For `{NodeName}`, it searches for `NodeName` (or `k8s.node.name`). For `{ContainerInstanceId}`, it searches for `ContainerInstanceId` (or `aws.ecs.container.instance.id`). (Note: ContainerInstanceId (or `aws.ecs.container.instance.id`) only works for AWS ECS EC2 launch type. |"otel-stream"|
-| `log_retention` | LogRetention is the option to set the log retention policy for only newly created CloudWatch Log Groups. Defaults to Never Expire if not specified or set to 0. Possible values for retention in days are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653. |"Never Expire"|
-| `namespace` | Customized CloudWatch metrics namespace | "default" |
-| `endpoint` | Optionally override the default CloudWatch service endpoint. | |
-| `no_verify_ssl` | Enable or disable TLS certificate verification. | false |
-| `proxy_address` | Upload Structured Logs to AWS CloudWatch through a proxy. | |
-| `region` | Send Structured Logs to AWS CloudWatch in a specific region. If this field is not present in config, environment variable "AWS_REGION" can then be used to set region. | determined by metadata |
-| `role_arn` | IAM role to upload segments to a different account. | |
-| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 |
-| `dimension_rollup_option` | DimensionRollupOption is the option for metrics dimension rollup. Three options are available: `NoDimensionRollup`, `SingleDimensionRollupOnly` and `ZeroAndSingleDimensionRollup` |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|
-| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |
-| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "cloudwatch" or "stdout" | `cloudwatch` |
+| Name | Description | Default |
+|:---------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------|
+| `log_group_name` | Customized log group name which supports `{ClusterName}` and `{TaskId}` placeholders. One valid example is `/aws/metrics/{ClusterName}`. It will search for `ClusterName` (or `aws.ecs.cluster.name`) resource attribute in the metrics data and replace with the actual cluster name. If none of them are found in the resource attribute map, `{ClusterName}` will be replaced by `undefined`. Similarly, for the `{TaskId}`, it searches for `TaskId` (or `aws.ecs.task.id`) key in the resource attribute map. For `{NodeName}`, it searches for `NodeName` (or `k8s.node.name`). | "/metrics/default" |
+| `log_stream_name` | Customized log stream name which supports `{TaskId}`, `{ClusterName}`, `{NodeName}`, `{ContainerInstanceId}`, and `{TaskDefinitionFamily}` placeholders. One valid example is `{TaskId}`. It will search for `TaskId` (or `aws.ecs.task.id`) resource attribute in the metrics data and replace with the actual task id. If none of them are found in the resource attribute map, `{TaskId}` will be replaced by `undefined`. Similarly, for the `{TaskDefinitionFamily}`, it searches for `TaskDefinitionFamily` (or `aws.ecs.task.family`). For the `{ClusterName}`, it searches for `ClusterName` (or `aws.ecs.cluster.name`). For `{NodeName}`, it searches for `NodeName` (or `k8s.node.name`). For `{ContainerInstanceId}`, it searches for `ContainerInstanceId` (or `aws.ecs.container.instance.id`). (Note: ContainerInstanceId (or `aws.ecs.container.instance.id`) only works for the AWS ECS EC2 launch type.) | "otel-stream" |
+| `log_retention` | LogRetention is the option to set the log retention policy for only newly created CloudWatch Log Groups. Defaults to Never Expire if not specified or set to 0. Possible values for retention in days are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653. | "Never Expire" |
+| `namespace` | Customized CloudWatch metrics namespace | "default" |
+| `endpoint` | Optionally override the default CloudWatch service endpoint. | |
+| `no_verify_ssl` | Enable or disable TLS certificate verification. | false |
+| `proxy_address` | Upload Structured Logs to AWS CloudWatch through a proxy. | |
+| `region` | Send Structured Logs to AWS CloudWatch in a specific region. If this field is not present in config, environment variable "AWS_REGION" can then be used to set region. | determined by metadata |
+| `role_arn` | IAM role to upload segments to a different account. | |
+| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 |
+| `dimension_rollup_option` | DimensionRollupOption is the option for metrics dimension rollup. Three options are available: `NoDimensionRollup`, `SingleDimensionRollupOnly` and `ZeroAndSingleDimensionRollup` | "ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup) |
+| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config option: `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |
+| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available: "cloudwatch" or "stdout". | `cloudwatch` |
 | `detailed_metrics` | Retain detailed datapoint values in exported metrics (e.g instead of exporting a quantile as a statistical value, preserve the quantile's population) | `false` |
-| `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}``` | [ ] |
-| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |
-| [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors. | [ ]|
+| `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}``` | [ ] |
+| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |
+| [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors. | [ ] |
+| `retain_initial_value_of_delta_metric` | This option specifies how the first value of a metric is handled. AWS EMF expects metric values to contain only deltas to the previous value. In the default case, the first received value is therefore not sent to AWS but only used as a baseline for follow-up changes to this metric. This is fine for high-throughput metrics with stable labels (e.g. `requests{code=200}`), where it does not matter if the first value is discarded. However, when your metric describes infrequent events or events with high label cardinality, the exporter in its default configuration would still drop the first occurrence of this metric. With this configuration value set to `true`, the first value of all metrics will instead be sent to AWS. | false |
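For reference, a minimal collector configuration fragment exercising the new option could look like the following sketch (the `region` value is illustrative and not part of this patch; the other keys are documented in the table above):

```yaml
exporters:
  awsemf:
    region: us-west-2              # illustrative value
    namespace: default
    log_group_name: "/metrics/default"
    log_stream_name: otel-stream
    retain_initial_value_of_delta_metric: true
```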
 
 ### metric_declaration
diff --git a/exporter/awsemfexporter/config.go b/exporter/awsemfexporter/config.go
index 57b41671fc57..24ff3b8ebc1f 100644
--- a/exporter/awsemfexporter/config.go
+++ b/exporter/awsemfexporter/config.go
@@ -43,6 +43,11 @@ type Config struct {
 	// Namespace is a container for CloudWatch metrics.
 	// Metrics in different namespaces are isolated from each other.
 	Namespace string `mapstructure:"namespace"`
+	// RetainInitialValueOfDeltaMetric is the flag to signal that the initial value of a metric is a valid datapoint.
+	// The default behavior is that the first occurrence of a metric is set as the baseline for the calculation of
+	// the delta to the next occurrence. With this flag set to true, the exporter will instead use this first value as
+	// the initial delta value. This is especially useful when handling low-frequency metrics.
+	RetainInitialValueOfDeltaMetric bool `mapstructure:"retain_initial_value_of_delta_metric"`
 	// DimensionRollupOption is the option for metrics dimension rollup. Three options are available, default option is "ZeroAndSingleDimensionRollup".
 	// "ZeroAndSingleDimensionRollup" - Enable both zero dimension rollup and single dimension rollup
 	// "SingleDimensionRollupOnly" - Enable single dimension rollup
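The behavioral difference the new field introduces is easiest to see in isolation. The following standalone Go sketch (illustrative only; `emitDeltas` is a hypothetical helper, not exporter code) mimics how a cumulative series is turned into deltas with and without retaining the initial value:

```go
package main

import "fmt"

// emitDeltas mimics the exporter's delta handling for one metric series:
// by default the first observation only seeds the baseline; with
// retainInitial set, it is emitted as the initial delta value.
func emitDeltas(samples []float64, retainInitial bool) []float64 {
	var deltas []float64
	baselineSet := false
	baseline := 0.0
	for _, v := range samples {
		if !baselineSet {
			baselineSet = true
			baseline = v
			if retainInitial {
				deltas = append(deltas, v) // first value retained as-is
			}
			continue
		}
		deltas = append(deltas, v-baseline)
		baseline = v
	}
	return deltas
}

func main() {
	series := []float64{5, 8, 9}
	fmt.Println(emitDeltas(series, false)) // [3 1]   (first value dropped)
	fmt.Println(emitDeltas(series, true))  // [5 3 1] (first value retained)
}
```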
diff --git a/exporter/awsemfexporter/datapoint.go b/exporter/awsemfexporter/datapoint.go
index 94e07c334fc0..7276eb3e2b90 100644
--- a/exporter/awsemfexporter/datapoint.go
+++ b/exporter/awsemfexporter/datapoint.go
@@ -75,11 +75,12 @@ type dataPoints interface {
 // deltaMetricMetadata contains the metadata required to perform rate/delta calculation
 type deltaMetricMetadata struct {
-	adjustToDelta bool
-	metricName    string
-	namespace     string
-	logGroup      string
-	logStream     string
+	adjustToDelta              bool
+	retainInitialValueForDelta bool
+	metricName                 string
+	namespace                  string
+	logGroup                   string
+	logStream                  string
 }
 
 // numberDataPointSlice is a wrapper for pmetric.NumberDataPointSlice
@@ -127,6 +128,13 @@ func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationS
 		var deltaVal interface{}
 		mKey := aws.NewKey(dps.deltaMetricMetadata, labels)
 		deltaVal, retained = deltaMetricCalculator.Calculate(mKey, metricVal, metric.Timestamp().AsTime())
+
+		// If a delta to the previous data point could not be computed, use the current metric value instead
+		if !retained && dps.retainInitialValueForDelta {
+			retained = true
+			deltaVal = metricVal
+		}
+
 		if !retained {
 			return nil, retained
 		}
@@ -175,6 +183,13 @@ func (dps summaryDataPointSlice) CalculateDeltaDatapoints(i int, instrumentation
 		var delta interface{}
 		mKey := aws.NewKey(dps.deltaMetricMetadata, labels)
 		delta, retained = summaryMetricCalculator.Calculate(mKey, summaryMetricEntry{sum, count}, metric.Timestamp().AsTime())
+
+		// If a delta to the previous data point could not be computed, use the current metric value instead
+		if !retained && dps.retainInitialValueForDelta {
+			retained = true
+			delta = summaryMetricEntry{sum, count}
+		}
+
 		if !retained {
 			return datapoints, retained
 		}
@@ -229,11 +244,12 @@ func createLabels(attributes pcommon.Map, instrLibName string) map[string]string
 // getDataPoints retrieves data points from OT Metric.
 func getDataPoints(pmd pmetric.Metric, metadata cWMetricMetadata, logger *zap.Logger) dataPoints {
 	metricMetadata := deltaMetricMetadata{
-		adjustToDelta: false,
-		metricName:    pmd.Name(),
-		namespace:     metadata.namespace,
-		logGroup:      metadata.logGroup,
-		logStream:     metadata.logStream,
+		adjustToDelta:              false,
+		retainInitialValueForDelta: metadata.retainInitialValueForDelta,
+		metricName:                 pmd.Name(),
+		namespace:                  metadata.namespace,
+		logGroup:                   metadata.logGroup,
+		logStream:                  metadata.logStream,
 	}
 
 	var dps dataPoints
diff --git a/exporter/awsemfexporter/datapoint_test.go b/exporter/awsemfexporter/datapoint_test.go
index 822760de7b25..a3042e05af7c 100644
--- a/exporter/awsemfexporter/datapoint_test.go
+++ b/exporter/awsemfexporter/datapoint_test.go
@@ -141,13 +141,14 @@ func generateOtelTestMetrics(generatedOtelMetrics ...pmetric.Metrics) pmetric.Me
 	}
 	return finalOtelMetrics
 }
-func generateDeltaMetricMetadata(adjustToDelta bool, metricName string) deltaMetricMetadata {
+func generateDeltaMetricMetadata(adjustToDelta bool, metricName string, retainInitialValueForDelta bool) deltaMetricMetadata {
 	return deltaMetricMetadata{
-		adjustToDelta: adjustToDelta,
-		metricName:    metricName,
-		logGroup:      "log-group",
-		logStream:     "log-stream",
-		namespace:     "namespace",
+		adjustToDelta:              adjustToDelta,
+		metricName:                 metricName,
+		logGroup:                   "log-group",
+		logStream:                  "log-stream",
+		namespace:                  "namespace",
+		retainInitialValueForDelta: retainInitialValueForDelta,
 	}
 }
 
@@ -157,118 +158,128 @@ func setupDataPointCache() {
 }
 
 func TestCalculateDeltaDatapoints_NumberDataPointSlice(t *testing.T) {
-	setupDataPointCache()
-
-	testCases := []struct {
-		name              string
-		adjustToDelta     bool
-		metricName        string
-		metricValue       interface{}
-		expectedDatapoint dataPoint
-	}{
-		{
-			name:          "Float data type with 1st delta calculation",
-			adjustToDelta: true,
-			metricValue:   0.4,
-			metricName:    "double",
-			expectedDatapoint: dataPoint{
-				name:   "double",
-				value:  0.4,
-				labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+	for _, retainInitialValueOfDeltaMetric := range []bool{true, false} {
+		setupDataPointCache()
+
+		testCases := []struct {
+			name              string
+			adjustToDelta     bool
+			metricName        string
+			metricValue       interface{}
+			expectedDatapoint dataPoint
+			expectedRetained  bool
+		}{
+			{
+				name:          fmt.Sprintf("Float data type with 1st delta calculation retainInitialValueOfDeltaMetric=%t", retainInitialValueOfDeltaMetric),
+				adjustToDelta: true,
+				metricValue:   0.4,
+				metricName:    "double",
+				expectedDatapoint: dataPoint{
+					name:   "double",
+					value:  0.4,
+					labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+				},
+				expectedRetained: retainInitialValueOfDeltaMetric,
 			},
-		},
-		{
-			name:          "Float data type with 2nd delta calculation",
-			adjustToDelta: true,
-			metricName:    "double",
-			metricValue:   0.8,
-			expectedDatapoint: dataPoint{
-				name:   "double",
-				value:  0.4,
-				labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+			{
+				name:          "Float data type with 2nd delta calculation",
+				adjustToDelta: true,
+				metricName:    "double",
+				metricValue:   0.8,
+				expectedDatapoint: dataPoint{
+					name:   "double",
+					value:  0.4,
+					labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+				},
+				expectedRetained: true,
 			},
-		},
-		{
-			name:          "Double data type without delta calculation",
-			adjustToDelta: false,
-			metricName:    "double",
-			metricValue:   0.5,
-			expectedDatapoint: dataPoint{
-				name:   "double",
-				value:  0.5,
-				labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+			{
+				name:          "Double data type without delta calculation",
+				adjustToDelta: false,
+				metricName:    "double",
+				metricValue:   0.5,
+				expectedDatapoint: dataPoint{
+					name:   "double",
+					value:  0.5,
+					labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+				},
+				expectedRetained: true,
 			},
-		},
-		{
-			name:          "Int data type with 1st delta calculation",
-			adjustToDelta: true,
-			metricName:    "int",
-			metricValue:   int64(-17),
-			expectedDatapoint: dataPoint{
-				name:   "int",
-				value:  float64(0),
-				labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+			{
+				name:          "Int data type with 1st delta calculation",
+				adjustToDelta: true,
+				metricName:    "int",
+				metricValue:   int64(-17),
+				expectedDatapoint: dataPoint{
+					name:   "int",
+					value:  float64(-17),
+					labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+				},
+				expectedRetained: retainInitialValueOfDeltaMetric,
 			},
-		},
-		{
-			name:          "Int data type with 2nd delta calculation",
-			adjustToDelta: true,
-			metricName:    "int",
-			metricValue:   int64(1),
-			expectedDatapoint: dataPoint{
-				name:   "int",
-				value:  float64(18),
-				labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+			{
+				name:          "Int data type with 2nd delta calculation",
+				adjustToDelta: true,
+				metricName:    "int",
+				metricValue:   int64(1),
+				expectedDatapoint: dataPoint{
+					name:   "int",
+					value:  float64(18),
+					labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+				},
+				expectedRetained: true,
 			},
-		},
-		{
-			name:          "Int data type without delta calculation",
-			adjustToDelta: false,
-			metricName:    "int",
-			metricValue:   int64(10),
-			expectedDatapoint: dataPoint{
-				name:   "int",
-				value:  float64(10),
-				labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+			{
+				name:          "Int data type without delta calculation",
+				adjustToDelta: false,
+				metricName:    "int",
+				metricValue:   int64(10),
+				expectedDatapoint: dataPoint{
+					name:   "int",
+					value:  float64(10),
+					labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
+				},
+				expectedRetained: true,
 			},
-		},
-	}
+		}
 
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			// Given the number datapoint (including Sum and Gauge OTEL metric type) with data type as int or double
-			numberDPS := pmetric.NewNumberDataPointSlice()
-			numberDP := numberDPS.AppendEmpty()
+		for _, tc := range testCases {
+			t.Run(tc.name, func(t *testing.T) {
+				// Given the number datapoint (including Sum and Gauge OTEL metric type) with data type as int or double
+				numberDPS := pmetric.NewNumberDataPointSlice()
+				numberDP := numberDPS.AppendEmpty()
 
-			numberDP.Attributes().PutStr("label1", "value1")
+				numberDP.Attributes().PutStr("label1", "value1")
 
-			switch v := tc.metricValue.(type) {
-			case int64:
-				numberDP.SetIntValue(v)
-			case float64:
-				numberDP.SetDoubleValue(v)
-			}
+				switch v := tc.metricValue.(type) {
+				case int64:
+					numberDP.SetIntValue(v)
+				case float64:
+					numberDP.SetDoubleValue(v)
+				}
 
-			deltaMetricMetadata := generateDeltaMetricMetadata(tc.adjustToDelta, tc.metricName)
-			numberDatapointSlice := numberDataPointSlice{deltaMetricMetadata, numberDPS}
+				deltaMetricMetadata := generateDeltaMetricMetadata(tc.adjustToDelta, tc.metricName, retainInitialValueOfDeltaMetric)
+				numberDatapointSlice := numberDataPointSlice{deltaMetricMetadata, numberDPS}
 
-			// When calculate the delta datapoints for number datapoint
-			dps, retained := numberDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false)
+				// When calculating the delta datapoints for the number datapoint
+				dps, retained := numberDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false)
 
-			// Then asserting the delta is within 0.002 with the following datapoint
-			assert.Equal(t, 1, numberDatapointSlice.Len())
-			if retained {
-				assert.Equal(t, tc.expectedDatapoint.name, dps[0].name)
-				assert.Equal(t, tc.expectedDatapoint.labels, dps[0].labels)
-				assert.InDelta(t, tc.expectedDatapoint.value, dps[0].value, 0.002)
-			}
-		})
+				assert.Equal(t, 1, numberDatapointSlice.Len())
+				assert.Equal(t, tc.expectedRetained, retained)
+				if retained {
+					assert.Equal(t, tc.expectedDatapoint.name, dps[0].name)
+					assert.Equal(t, tc.expectedDatapoint.labels, dps[0].labels)
+					// Asserting the delta is within 0.002 of the expected datapoint value
+					assert.InDelta(t, tc.expectedDatapoint.value, dps[0].value, 0.002)
+				}
+			})
+		}
 	}
 }
 
 func TestCalculateDeltaDatapoints_HistogramDataPointSlice(t *testing.T) {
-	deltaMetricMetadata := generateDeltaMetricMetadata(false, "foo")
+	deltaMetricMetadata := generateDeltaMetricMetadata(false, "foo", false)
 
 	testCases := []struct {
 		name string
@@ -348,79 +359,85 @@ func TestCalculateDeltaDatapoints_HistogramDataPointSlice(t *testing.T) {
 }
 
 func TestCalculateDeltaDatapoints_SummaryDataPointSlice(t *testing.T) {
-	deltaMetricMetadata := generateDeltaMetricMetadata(true, "foo")
-
-	testCases := []struct {
-		name               string
-		summaryMetricValue map[string]interface{}
-		expectedDatapoint  []dataPoint
-	}{
-		{
-			name:               "Detailed summary with 1st delta sum count calculation",
-			summaryMetricValue: map[string]interface{}{"sum": float64(17.3), "count": uint64(17), "firstQuantile": float64(1), "secondQuantile": float64(5)},
-			expectedDatapoint: []dataPoint{
-				{name: fmt.Sprint("foo", summarySumSuffix), value: float64(17.3), labels: map[string]string{"label1": "value1"}},
-				{name: fmt.Sprint("foo", summaryCountSuffix), value: uint64(17), labels: map[string]string{"label1": "value1"}},
-				{name: "foo", value: float64(1), labels: map[string]string{"label1": "value1", "quantile": "0"}},
-				{name: "foo", value: float64(5), labels: map[string]string{"label1": "value1", "quantile": "100"}},
+	for _, retainInitialValueOfDeltaMetric := range []bool{true, false} {
+		deltaMetricMetadata := generateDeltaMetricMetadata(true, "foo", retainInitialValueOfDeltaMetric)
+
+		testCases := []struct {
+			name               string
+			summaryMetricValue map[string]interface{}
+			expectedDatapoint  []dataPoint
+			expectedRetained   bool
+		}{
+			{
+				name:               fmt.Sprintf("Detailed summary with 1st delta sum count calculation retainInitialValueOfDeltaMetric=%t", retainInitialValueOfDeltaMetric),
+				summaryMetricValue: map[string]interface{}{"sum": float64(17.3), "count": uint64(17), "firstQuantile": float64(1), "secondQuantile": float64(5)},
+				expectedDatapoint: []dataPoint{
+					{name: fmt.Sprint("foo", summarySumSuffix), value: float64(17.3), labels: map[string]string{"label1": "value1"}},
+					{name: fmt.Sprint("foo", summaryCountSuffix), value: uint64(17), labels: map[string]string{"label1": "value1"}},
+					{name: "foo", value: float64(1), labels: map[string]string{"label1": "value1", "quantile": "0"}},
+					{name: "foo", value: float64(5), labels: map[string]string{"label1": "value1", "quantile": "100"}},
+				},
+				expectedRetained: retainInitialValueOfDeltaMetric,
 			},
-		},
-		{
-			name:               "Detailed summary with 2nd delta sum count calculation",
-			summaryMetricValue: map[string]interface{}{"sum": float64(100), "count": uint64(25), "firstQuantile": float64(1), "secondQuantile": float64(5)},
-			expectedDatapoint: []dataPoint{
-				{name: fmt.Sprint("foo", summarySumSuffix), value: float64(82.7), labels: map[string]string{"label1": "value1"}},
-				{name: fmt.Sprint("foo", summaryCountSuffix), value: uint64(8), labels: map[string]string{"label1": "value1"}},
-				{name: "foo", value: float64(1), labels: map[string]string{"label1": "value1", "quantile": "0"}},
-				{name: "foo", value: float64(5), labels: map[string]string{"label1": "value1", "quantile": "100"}},
+			{
+				name:               "Detailed summary with 2nd delta sum count calculation",
+				summaryMetricValue: map[string]interface{}{"sum": float64(100), "count": uint64(25), "firstQuantile": float64(1), "secondQuantile": float64(5)},
+				expectedDatapoint: []dataPoint{
+					{name: fmt.Sprint("foo", summarySumSuffix), value: float64(82.7), labels: map[string]string{"label1": "value1"}},
+					{name: fmt.Sprint("foo", summaryCountSuffix), value: uint64(8), labels: map[string]string{"label1": "value1"}},
+					{name: "foo", value: float64(1), labels: map[string]string{"label1": "value1", "quantile": "0"}},
+					{name: "foo", value: float64(5), labels: map[string]string{"label1": "value1", "quantile": "100"}},
+				},
+				expectedRetained: true,
 			},
-		},
-		{
-			name:               "Detailed summary with 3rd delta sum count calculation",
-			summaryMetricValue: map[string]interface{}{"sum": float64(120), "count": uint64(26), "firstQuantile": float64(1), "secondQuantile": float64(5)},
-			expectedDatapoint: []dataPoint{
-				{name: fmt.Sprint("foo", summarySumSuffix), value: float64(20), labels: map[string]string{"label1": "value1"}},
-				{name: fmt.Sprint("foo", summaryCountSuffix), value: uint64(1), labels: map[string]string{"label1": "value1"}},
-				{name: "foo", value: float64(1), labels: map[string]string{"label1": "value1", "quantile": "0"}},
-				{name: "foo", value: float64(5), labels: map[string]string{"label1": "value1", "quantile": "100"}},
+			{
+				name:               "Detailed summary with 3rd delta sum count calculation",
+				summaryMetricValue: map[string]interface{}{"sum": float64(120), "count": uint64(26), "firstQuantile": float64(1), "secondQuantile": float64(5)},
+				expectedDatapoint: []dataPoint{
+					{name: fmt.Sprint("foo", summarySumSuffix), value: float64(20), labels: map[string]string{"label1": "value1"}},
+					{name: fmt.Sprint("foo", summaryCountSuffix), value: uint64(1), labels: map[string]string{"label1": "value1"}},
+					{name: "foo", value: float64(1), labels: map[string]string{"label1": "value1", "quantile": "0"}},
+					{name: "foo", value: float64(5), labels: map[string]string{"label1": "value1", "quantile": "100"}},
+				},
+				expectedRetained: true,
 			},
-		},
-	}
+		}
 
-	for i, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			// Given the summary datapoints with quantile 0, quantile 100, sum and count
-			summaryDPS := pmetric.NewSummaryDataPointSlice()
-			summaryDP := summaryDPS.AppendEmpty()
-			summaryDP.SetSum(tc.summaryMetricValue["sum"].(float64))
-			summaryDP.SetCount(tc.summaryMetricValue["count"].(uint64))
-			summaryDP.Attributes().PutStr("label1", "value1")
-
-			summaryDP.QuantileValues().EnsureCapacity(2)
-			firstQuantileValue := summaryDP.QuantileValues().AppendEmpty()
-			firstQuantileValue.SetQuantile(0)
-			firstQuantileValue.SetValue(tc.summaryMetricValue["firstQuantile"].(float64))
-			secondQuantileValue := summaryDP.QuantileValues().AppendEmpty()
-			secondQuantileValue.SetQuantile(100)
-			secondQuantileValue.SetValue(tc.summaryMetricValue["secondQuantile"].(float64))
-
-			summaryDatapointSlice := summaryDataPointSlice{deltaMetricMetadata, summaryDPS}
-
-			// When calculate the delta datapoints for sum and count in summary
-			dps, retained := summaryDatapointSlice.CalculateDeltaDatapoints(0, "", true)
+		for _, tc := range testCases {
+			t.Run(tc.name, func(t *testing.T) {
+				// Given the summary datapoints with quantile 0, quantile 100, sum and count
+				summaryDPS := pmetric.NewSummaryDataPointSlice()
+				summaryDP := summaryDPS.AppendEmpty()
+				summaryDP.SetSum(tc.summaryMetricValue["sum"].(float64))
+				summaryDP.SetCount(tc.summaryMetricValue["count"].(uint64))
+				summaryDP.Attributes().PutStr("label1", "value1")
+
+				summaryDP.QuantileValues().EnsureCapacity(2)
+				firstQuantileValue := summaryDP.QuantileValues().AppendEmpty()
+				firstQuantileValue.SetQuantile(0)
+				firstQuantileValue.SetValue(tc.summaryMetricValue["firstQuantile"].(float64))
+				secondQuantileValue := summaryDP.QuantileValues().AppendEmpty()
+				secondQuantileValue.SetQuantile(100)
+				secondQuantileValue.SetValue(tc.summaryMetricValue["secondQuantile"].(float64))
+
+				summaryDatapointSlice := summaryDataPointSlice{deltaMetricMetadata, summaryDPS}
+
+				// When calculating the delta datapoints for sum and count in the summary
+				dps, retained := summaryDatapointSlice.CalculateDeltaDatapoints(0, "", true)
+
+				// Then receiving the following datapoint with an expected length
+				assert.Equal(t, tc.expectedRetained, retained)
+				assert.Equal(t, 1, summaryDatapointSlice.Len())
+				if retained {
+					assert.Len(t, dps, 4)
+					for i, dp := range dps {
+						assert.Equal(t, tc.expectedDatapoint[i].labels, dp.labels)
+						assert.InDelta(t, tc.expectedDatapoint[i].value, dp.value, 0.002)
+					}
+				}
+			})
+		}
 
-			// Then receiving the following datapoint with an expected length
-			assert.Equal(t, i > 0, retained)
-			assert.Equal(t, 1, summaryDatapointSlice.Len())
-			if retained {
-				assert.Len(t, dps, 4)
-				for i, dp := range dps {
-					assert.Equal(t, tc.expectedDatapoint[i].labels, dp.labels)
-					assert.InDelta(t, tc.expectedDatapoint[i].value, dp.value, 0.002)
-				}
-
-			}
-		})
 	}
 }
 
@@ -449,8 +466,8 @@ func TestCreateLabels(t *testing.T) {
 func TestGetDataPoints(t *testing.T) {
 	logger := zap.NewNop()
 
-	normalDeltraMetricMetadata := generateDeltaMetricMetadata(false, "foo")
-	cumulativeDeltaMetricMetadata := generateDeltaMetricMetadata(true, "foo")
+	normalDeltraMetricMetadata := generateDeltaMetricMetadata(false, "foo", false)
+	cumulativeDeltaMetricMetadata := generateDeltaMetricMetadata(true, "foo", false)
 
 	testCases := []struct {
 		name string
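As a cross-check of the summary test expectations above: the delta of a summary metric is computed independently for its sum and count fields. A small illustrative sketch of that arithmetic follows (the `summaryMetricEntry` shape mirrors, but does not reuse, the one in datapoint.go):

```go
package main

import "fmt"

// summaryMetricEntry mirrors the (sum, count) pair the exporter tracks per summary metric.
type summaryMetricEntry struct {
	sum   float64
	count uint64
}

func main() {
	// Observations taken from the test cases above.
	obs := []summaryMetricEntry{{17.3, 17}, {100, 25}, {120, 26}}

	// With retain_initial_value_of_delta_metric=true, the first observation
	// {17.3, 17} is emitted as-is; by default it only seeds the baseline.
	prev := obs[0]
	for _, o := range obs[1:] {
		// Deltas: sum 82.7, count 8 for the 2nd observation; sum 20, count 1 for the 3rd.
		fmt.Printf("delta sum=%.1f count=%d\n", o.sum-prev.sum, o.count-prev.count)
		prev = o
	}
}
```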
diff --git a/exporter/awsemfexporter/factory.go b/exporter/awsemfexporter/factory.go
index 552f6e707d2a..e73eab57e592 100644
--- a/exporter/awsemfexporter/factory.go
+++ b/exporter/awsemfexporter/factory.go
@@ -42,13 +42,14 @@ func NewFactory() exporter.Factory {
 // CreateDefaultConfig creates the default configuration for exporter.
 func createDefaultConfig() component.Config {
 	return &Config{
-		AWSSessionSettings:    awsutil.CreateDefaultSessionConfig(),
-		LogGroupName:          "",
-		LogStreamName:         "",
-		Namespace:             "",
-		DimensionRollupOption: "ZeroAndSingleDimensionRollup",
-		OutputDestination:     "cloudwatch",
-		logger:                zap.NewNop(),
+		AWSSessionSettings:              awsutil.CreateDefaultSessionConfig(),
+		LogGroupName:                    "",
+		LogStreamName:                   "",
+		Namespace:                       "",
+		DimensionRollupOption:           "ZeroAndSingleDimensionRollup",
+		RetainInitialValueOfDeltaMetric: false,
+		OutputDestination:               "cloudwatch",
+		logger:                          zap.NewNop(),
 	}
 }
diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go
index f0b5b6622da1..af705d607add 100644
--- a/exporter/awsemfexporter/metric_translator.go
+++ b/exporter/awsemfexporter/metric_translator.go
@@ -68,11 +68,12 @@ type cWMetricStats struct {
 }
 
 type groupedMetricMetadata struct {
-	namespace      string
-	timestampMs    int64
-	logGroup       string
-	logStream      string
-	metricDataType pmetric.MetricType
+	namespace                  string
+	timestampMs                int64
+	logGroup                   string
+	logStream                  string
+	metricDataType             pmetric.MetricType
+	retainInitialValueForDelta bool
 }
 
 // cWMetricMetadata represents the metadata associated with a given CloudWatch metric