[receiver/k8sclusterreceiver] Fix k8s node and container cpu metrics not being reported properly (#8245)

* Added a feature gate that enables k8s node and container cpu metrics to be reported as double values

* Refactored testutils/metrics.go methods to support asserting int and double metrics

* Added tests for node cpu metrics being reported as double cpu units with the feature gate enabled

* patch2 code review changes

* patch3 code review changes

Co-authored-by: Alex Boten <[email protected]>
jvoravong and Alex Boten authored Mar 11, 2022
1 parent c119771 commit 8fcbd64
Showing 17 changed files with 322 additions and 81 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -34,6 +34,9 @@

- `zipkinexporter`: Set "error" tag value when status is set to error (#8187)
- `prometheusremotewriteexporter`: Correctly handle metric labels which collide after sanitization (#8378)
- `k8sclusterreceiver`: Add support to enable k8s node and container cpu metrics to be reported as double values (#8245)
- Use "--feature-gates=receiver.k8sclusterreceiver.reportCpuMetricsAsDouble" to enable reporting node and container
cpu metrics as double values.

### 🚀 New components 🚀

37 changes: 37 additions & 0 deletions receiver/k8sclusterreceiver/README.md
@@ -52,6 +52,43 @@ Example:
The full list of settings exposed for this receiver are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).
### Feature Gate Configurations
- `receiver.k8sclusterreceiver.reportCpuMetricsAsDouble`
- Description
- The k8s container and node cpu metrics reported by the k8sclusterreceiver are transitioning from integer
millicpu units to double cpu units to adhere to the opentelemetry cpu metric specifications. Please update any
monitoring this might affect: cpu metrics will become double values instead of integer values, and metric
values will be scaled down by 1000x. This feature gate controls whether the k8sclusterreceiver reports
container and node cpu metrics in double cpu units instead of integer millicpu units.
- Affected Metrics
- k8s.container.cpu_request
- k8s.container.cpu_limit
- k8s.node.allocatable_cpu
- Stages and Timeline
- Alpha
- In this stage the feature is disabled by default and must be enabled by the user.
- Target version
- v0.47.0
- Beta
- In this stage the feature is enabled by default but can be disabled by the user.
- Target version
- v0.50.0
- Generally Available
- In this stage the feature is permanently enabled and the feature gate is no longer available.
- Target version
- v0.53.0
- Usage
- Prefixing a feature gate identifier with - disables the gate; prefixing it with +, or using no prefix, enables the gate.
- Start the otelcol with the feature gate enabled:
- otelcol {other_arguments} --feature-gates=receiver.k8sclusterreceiver.reportCpuMetricsAsDouble
- Start the otelcol with the feature gate disabled:
- otelcol {other_arguments} --feature-gates=-receiver.k8sclusterreceiver.reportCpuMetricsAsDouble
- More Information
- [collector.go where the feature gate is registered](./internal/collection/collector.go)
- https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/8115
- https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/system-metrics.md#systemcpu---processor-metrics
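
The prefix rule and the 1000x scaling described above can be sketched in a few lines of Go. This is only an illustration, not the collector's implementation: `parseGateFlag` and `milliToCPU` are hypothetical helpers, and treating the flag value as a comma-separated list of identifiers is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// parseGateFlag is a hypothetical helper (not the collector's parser).
// It assumes the flag value is a comma-separated list of gate identifiers:
// a leading '-' disables a gate, a leading '+' or no prefix enables it.
func parseGateFlag(flag string) map[string]bool {
	gates := make(map[string]bool)
	for _, id := range strings.Split(flag, ",") {
		id = strings.TrimSpace(id)
		if id == "" {
			continue
		}
		switch id[0] {
		case '-':
			gates[id[1:]] = false
		case '+':
			gates[id[1:]] = true
		default:
			gates[id] = true
		}
	}
	return gates
}

// milliToCPU is a hypothetical helper that converts an integer millicpu
// value into double cpu units, i.e. the value scaled down by 1000x.
func milliToCPU(milli int64) float64 {
	return float64(milli) / 1000.0
}

func main() {
	const gateID = "receiver.k8sclusterreceiver.reportCpuMetricsAsDouble"

	// The '-' prefix disables the gate, so integer millicpu units are kept.
	gates := parseGateFlag("-" + gateID)
	fmt.Println(gates[gateID]) // false

	// With the gate enabled, a request of 250 millicpu is reported as 0.25 cpu.
	fmt.Println(milliToCPU(250)) // 0.25
}
```

Dashboards or alerts that compare these metrics against thresholds expressed in millicpu would need those thresholds divided by 1000 once the gate is enabled.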

### node_conditions_to_report

For example, with the config below the receiver will emit two metrics
29 changes: 29 additions & 0 deletions receiver/k8sclusterreceiver/internal/collection/collector.go
@@ -21,6 +21,7 @@ import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
quotav1 "github.com/openshift/api/quota/v1"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/service/featuregate"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
@@ -64,6 +65,9 @@ const (
k8sKindReplicationController = "ReplicationController"
k8sKindReplicaSet = "ReplicaSet"
k8sStatefulSet = "StatefulSet"

// ID for a temporary feature gate
reportCPUMetricsAsDoubleFeatureGateID = "receiver.k8sclusterreceiver.reportCpuMetricsAsDouble"
)

// DataCollector wraps around a metricsStore and a metadaStore exposing
@@ -78,8 +82,33 @@ type DataCollector struct {
allocatableTypesToReport []string
}

var reportCPUMetricsAsDoubleFeatureGate = featuregate.Gate{
ID: reportCPUMetricsAsDoubleFeatureGateID,
Enabled: false,
Description: "The k8s container and node cpu metrics being reported by the k8sclusterreceiver are transitioning " +
"from being reported as integer millicpu units to being reported as double cpu units to adhere to " +
"opentelemetry cpu metric specifications. You can control whether the k8sclusterreceiver reports container " +
"and node cpu metrics in double cpu units instead of integer millicpu units with the " +
"receiver.k8sclusterreceiver.reportCpuMetricsAsDouble feature gate. " +
"For more details see: " +
"https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/k8sclusterreceiver/README.md#feature-gate-configurations ",
}

func init() {
featuregate.Register(reportCPUMetricsAsDoubleFeatureGate)
}

// NewDataCollector returns a DataCollector.
func NewDataCollector(logger *zap.Logger, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector {
if featuregate.IsEnabled(reportCPUMetricsAsDoubleFeatureGateID) {
logger.Info("The receiver.k8sclusterreceiver.reportCpuMetricsAsDouble feature gate is enabled. This " +
"otel collector will report double cpu units, which is good for future support!")
} else {
logger.Info("WARNING - Breaking Change: " + reportCPUMetricsAsDoubleFeatureGate.Description)
logger.Info("The receiver.k8sclusterreceiver.reportCpuMetricsAsDouble feature gate is disabled. This " +
"otel collector will report integer cpu units, be aware this will not be supported in the future.")
}

return &DataCollector{
logger: logger,
metricsStore: &metricsStore{
17 changes: 12 additions & 5 deletions receiver/k8sclusterreceiver/internal/collection/containers.go
@@ -20,6 +20,7 @@ import (
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/model/semconv/v1.6.1"
"go.opentelemetry.io/collector/service/featuregate"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"

@@ -110,20 +111,26 @@ func getSpecMetricsForContainer(c corev1.Container) []*metricspb.Metric {
},
} {
for k, v := range t.rl {
val := v.Value()
val := utils.GetInt64TimeSeries(v.Value())
valType := metricspb.MetricDescriptor_GAUGE_INT64
if k == corev1.ResourceCPU {
val = v.MilliValue()
if featuregate.IsEnabled(reportCPUMetricsAsDoubleFeatureGateID) {
// cpu metrics must be of the double type to adhere to opentelemetry system.cpu metric specifications
valType = metricspb.MetricDescriptor_GAUGE_DOUBLE
val = utils.GetDoubleTimeSeries(float64(v.MilliValue()) / 1000.0)
} else {
val = utils.GetInt64TimeSeries(v.MilliValue())
}
}

metrics = append(metrics,
&metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: fmt.Sprintf("k8s.container.%s_%s", k, t.typ),
Description: t.description,
Type: metricspb.MetricDescriptor_GAUGE_INT64,
Type: valType,
},
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(val),
val,
},
},
)
@@ -44,7 +44,7 @@ func TestCronJobMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[0], "k8s.cronjob.active_jobs",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[0], "k8s.cronjob.active_jobs",
metricspb.MetricDescriptor_GAUGE_INT64, 2)
}

@@ -44,16 +44,16 @@ func TestDaemonsetMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, rm.metrics[0], "k8s.daemonset.current_scheduled_nodes",
testutils.AssertMetricsInt(t, rm.metrics[0], "k8s.daemonset.current_scheduled_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 3)

testutils.AssertMetrics(t, rm.metrics[1], "k8s.daemonset.desired_scheduled_nodes",
testutils.AssertMetricsInt(t, rm.metrics[1], "k8s.daemonset.desired_scheduled_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 5)

testutils.AssertMetrics(t, rm.metrics[2], "k8s.daemonset.misscheduled_nodes",
testutils.AssertMetricsInt(t, rm.metrics[2], "k8s.daemonset.misscheduled_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 1)

testutils.AssertMetrics(t, rm.metrics[3], "k8s.daemonset.ready_nodes",
testutils.AssertMetricsInt(t, rm.metrics[3], "k8s.daemonset.ready_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 2)
}

@@ -44,10 +44,10 @@ func TestDeploymentMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, rm.metrics[0], "k8s.deployment.desired",
testutils.AssertMetricsInt(t, rm.metrics[0], "k8s.deployment.desired",
metricspb.MetricDescriptor_GAUGE_INT64, 10)

testutils.AssertMetrics(t, rm.metrics[1], "k8s.deployment.available",
testutils.AssertMetricsInt(t, rm.metrics[1], "k8s.deployment.available",
metricspb.MetricDescriptor_GAUGE_INT64, 3)
}

8 changes: 4 additions & 4 deletions receiver/k8sclusterreceiver/internal/collection/hpa_test.go
@@ -44,16 +44,16 @@ func TestHPAMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, rm.metrics[0], "k8s.hpa.max_replicas",
testutils.AssertMetricsInt(t, rm.metrics[0], "k8s.hpa.max_replicas",
metricspb.MetricDescriptor_GAUGE_INT64, 10)

testutils.AssertMetrics(t, rm.metrics[1], "k8s.hpa.min_replicas",
testutils.AssertMetricsInt(t, rm.metrics[1], "k8s.hpa.min_replicas",
metricspb.MetricDescriptor_GAUGE_INT64, 2)

testutils.AssertMetrics(t, rm.metrics[2], "k8s.hpa.current_replicas",
testutils.AssertMetricsInt(t, rm.metrics[2], "k8s.hpa.current_replicas",
metricspb.MetricDescriptor_GAUGE_INT64, 5)

testutils.AssertMetrics(t, rm.metrics[3], "k8s.hpa.desired_replicas",
testutils.AssertMetricsInt(t, rm.metrics[3], "k8s.hpa.desired_replicas",
metricspb.MetricDescriptor_GAUGE_INT64, 7)
}

16 changes: 8 additions & 8 deletions receiver/k8sclusterreceiver/internal/collection/jobs_test.go
@@ -43,19 +43,19 @@ func TestJobMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[0], "k8s.job.active_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[0], "k8s.job.active_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 2)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[1], "k8s.job.failed_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[1], "k8s.job.failed_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 0)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[2], "k8s.job.successful_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[2], "k8s.job.successful_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 3)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[3], "k8s.job.desired_successful_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[3], "k8s.job.desired_successful_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 10)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[4], "k8s.job.max_parallel_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[4], "k8s.job.max_parallel_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 2)

// Test with nil values.
@@ -65,13 +65,13 @@ func TestJobMetrics(t *testing.T) {
require.Equal(t, 1, len(actualResourceMetrics))
require.Equal(t, 3, len(actualResourceMetrics[0].metrics))

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[0], "k8s.job.active_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[0], "k8s.job.active_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 2)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[1], "k8s.job.failed_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[1], "k8s.job.failed_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 0)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[2], "k8s.job.successful_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[2], "k8s.job.successful_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 3)
}

@@ -42,7 +42,7 @@ func TestNamespaceMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[0], "k8s.namespace.phase",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[0], "k8s.namespace.phase",
metricspb.MetricDescriptor_GAUGE_INT64, 0)
}

63 changes: 35 additions & 28 deletions receiver/k8sclusterreceiver/internal/collection/nodes.go
@@ -22,6 +22,7 @@ import (
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/iancoleman/strcase"
conventions "go.opentelemetry.io/collector/model/semconv/v1.6.1"
"go.opentelemetry.io/collector/service/featuregate"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"

@@ -61,27 +62,46 @@ func getMetricsForNode(node *corev1.Node, nodeConditionTypesToReport, allocatabl
},
})
}

// Adding 'node allocatable type' metrics
for _, nodeAllocatableTypeValue := range allocatableTypesToReport {
nodeAllocatableMetric := getNodeAllocatableMetric(nodeAllocatableTypeValue)
v1NodeAllocatableTypeValue := corev1.ResourceName(nodeAllocatableTypeValue)
metricValue, err := nodeAllocatableValue(node, v1NodeAllocatableTypeValue)

// metrics will be skipped if metric not present in node or value is not convertable to int64
if err != nil {
logger.Debug(err.Error())
valType := metricspb.MetricDescriptor_GAUGE_INT64
quantity, ok := node.Status.Allocatable[v1NodeAllocatableTypeValue]
if !ok {
logger.Debug(fmt.Errorf("allocatable type %v not found in node %v", nodeAllocatableTypeValue,
node.GetName()).Error())
continue
}
val := utils.GetInt64TimeSeries(quantity.Value())

if featuregate.IsEnabled(reportCPUMetricsAsDoubleFeatureGateID) {
// cpu metrics must be of the double type to adhere to opentelemetry system.cpu metric specifications
if v1NodeAllocatableTypeValue == corev1.ResourceCPU {
val = utils.GetDoubleTimeSeries(float64(quantity.MilliValue()) / 1000.0)
valType = metricspb.MetricDescriptor_GAUGE_DOUBLE
}
} else {
metrics = append(metrics, &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: nodeAllocatableMetric,
Description: allocatableDesciption[v1NodeAllocatableTypeValue.String()],
Type: metricspb.MetricDescriptor_GAUGE_INT64,
},
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(metricValue),
},
})
// metrics will be skipped if metric not present in node or value is not convertable to int64
valInt64, ok := quantity.AsInt64()
if !ok {
logger.Debug(fmt.Errorf("metric %s has value %v which is not convertable to int64",
v1NodeAllocatableTypeValue, quantity.String()).Error())
continue
}
val = utils.GetInt64TimeSeries(valInt64)
}
metrics = append(metrics, &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: nodeAllocatableMetric,
Description: allocatableDesciption[v1NodeAllocatableTypeValue.String()],
Type: valType,
},
Timeseries: []*metricspb.TimeSeries{
val,
},
})
}

return []*resourceMetrics{
@@ -117,19 +137,6 @@ var nodeConditionValues = map[corev1.ConditionStatus]int64{
corev1.ConditionUnknown: -1,
}

func nodeAllocatableValue(node *corev1.Node, allocatableType corev1.ResourceName) (int64, error) {
value, ok := node.Status.Allocatable[allocatableType]
if !ok {
return 0, fmt.Errorf("allocatable type %v not found in node %v", allocatableType, node.GetName())
}

val, ok := value.AsInt64()
if !ok {
return 0, fmt.Errorf("metric %s has value %v which is not convertable to int64", allocatableType, value)
}
return val, nil
}

func nodeConditionValue(node *corev1.Node, condType corev1.NodeConditionType) int64 {
status := corev1.ConditionUnknown
for _, c := range node.Status.Conditions {