[receiver/k8sclusterreceiver] Fix k8s node and container cpu metrics not being reported properly #8245

Merged
merged 6 commits into from
Mar 11, 2022
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -32,6 +32,9 @@

- `zipkinexporter`: Set "error" tag value when status is set to error (#8187)
- `prometheusremotewriteexporter`: Correctly handle metric labels which collide after sanitization (#8378)
- `k8sclusterreceiver`: Add support to enable k8s node and container cpu metrics to be reported as double values (#8245)
- Use "--feature-gates=receiver.k8sclusterreceiver.reportCpuMetricsAsDouble" to enable reporting node and container
cpu metrics as double values.

### 🚀 New components 🚀

37 changes: 37 additions & 0 deletions receiver/k8sclusterreceiver/README.md
Expand Up @@ -52,6 +52,43 @@ Example:
The full list of settings exposed for this receiver are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).

### Feature Gate Configurations
- `receiver.k8sclusterreceiver.reportCpuMetricsAsDouble`
- Description
- The k8s container and node cpu metrics reported by the k8sclusterreceiver are transitioning from integer
millicpu units to double cpu units to adhere to the opentelemetry cpu metric specifications. Update any
monitoring this might affect: the affected metrics change from integer to double values, and their values
are scaled down by 1000x. You can control whether the k8sclusterreceiver reports container and node cpu
metrics in double cpu units instead of integer millicpu units with this feature gate.
- Affected Metrics
- k8s.container.cpu_request
- k8s.container.cpu_limit
- k8s.node.allocatable_cpu
- Stages and Timeline
- Alpha
- In this stage the feature is disabled by default and must be enabled by the user.
- Target version
- v0.47.0
- Beta
- In this stage the feature is enabled by default but can be disabled by the user.
- Target version
- v0.50.0
- Generally Available
- In this stage the feature is permanently enabled and the feature gate is no longer available.
- Target version
- v0.53.0
- Usage
- Prefixing a feature gate identifier with - disables the gate; prefixing it with +, or using no prefix, enables the gate.
- Start the otelcol with the feature gate enabled:
- otelcol {other_arguments} --feature-gates=receiver.k8sclusterreceiver.reportCpuMetricsAsDouble
- Start the otelcol with the feature gate disabled:
- otelcol {other_arguments} --feature-gates=-receiver.k8sclusterreceiver.reportCpuMetricsAsDouble
- More Information
- [collector.go where the feature gate is registered](./internal/collection/collector.go)
- https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/8115
- https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/system-metrics.md#systemcpu---processor-metrics
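
The prefix rule described under Usage can be sketched in Go as follows. This is an illustrative sketch, not the collector's own flag parser: `parseFeatureGates` is a hypothetical helper, and treating the flag value as a comma-separated list is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// parseFeatureGates is a hypothetical helper (not the collector's parser).
// It splits a comma-separated --feature-gates value (an assumption) and
// applies the README's prefix rule: "-" disables a gate, while "+" or no
// prefix enables it.
func parseFeatureGates(flagValue string) map[string]bool {
	gates := make(map[string]bool)
	for _, item := range strings.Split(flagValue, ",") {
		item = strings.TrimSpace(item)
		if item == "" {
			continue // ignore empty entries such as a trailing comma
		}
		switch item[0] {
		case '-':
			gates[item[1:]] = false
		case '+':
			gates[item[1:]] = true
		default:
			gates[item] = true
		}
	}
	return gates
}

func main() {
	const id = "receiver.k8sclusterreceiver.reportCpuMetricsAsDouble"
	for _, value := range []string{id, "+" + id, "-" + id} {
		fmt.Printf("%s enabled=%v\n", value, parseFeatureGates(value)[id])
	}
}
```

Returning a map keeps the sketch small; a real parser would likely also reject identifiers that were never registered.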

### node_conditions_to_report

For example, with the config below the receiver will emit two metrics
29 changes: 29 additions & 0 deletions receiver/k8sclusterreceiver/internal/collection/collector.go
Expand Up @@ -21,6 +21,7 @@ import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
quotav1 "github.com/openshift/api/quota/v1"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/service/featuregate"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
Expand Down Expand Up @@ -64,6 +65,9 @@ const (
k8sKindReplicationController = "ReplicationController"
k8sKindReplicaSet = "ReplicaSet"
k8sStatefulSet = "StatefulSet"

// ID for a temporary feature gate
reportCPUMetricsAsDoubleFeatureGateID = "receiver.k8sclusterreceiver.reportCpuMetricsAsDouble"
)

// DataCollector wraps around a metricsStore and a metadaStore exposing
Expand All @@ -78,8 +82,33 @@ type DataCollector struct {
allocatableTypesToReport []string
}

var reportCPUMetricsAsDoubleFeatureGate = featuregate.Gate{
ID: reportCPUMetricsAsDoubleFeatureGateID,
Enabled: false,
Description: "The k8s container and node cpu metrics being reported by the k8sclusterreceiver are transitioning " +
"from being reported as integer millicpu units to being reported as double cpu units to adhere to " +
"opentelemetry cpu metric specifications. You can control whether the k8sclusterreceiver reports container " +
"and node cpu metrics in double cpu units instead of integer millicpu units with the " +
"receiver.k8sclusterreceiver.reportCpuMetricsAsDouble feature gate. " +
"For more details see: " +
"https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/k8sclusterreceiver/README.md#feature-gate-configurations ",
}

func init() {
featuregate.Register(reportCPUMetricsAsDoubleFeatureGate)
}

// NewDataCollector returns a DataCollector.
func NewDataCollector(logger *zap.Logger, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector {
if featuregate.IsEnabled(reportCPUMetricsAsDoubleFeatureGateID) {
logger.Info("The receiver.k8sclusterreceiver.reportCpuMetricsAsDouble feature gate is enabled. This " +
"otel collector will report double cpu units, which is good for future support!")
} else {
logger.Info("WARNING - Breaking Change: " + reportCPUMetricsAsDoubleFeatureGate.Description)
logger.Info("The receiver.k8sclusterreceiver.reportCpuMetricsAsDouble feature gate is disabled. This " +
"otel collector will report integer cpu units, be aware this will not be supported in the future.")
}

return &DataCollector{
logger: logger,
metricsStore: &metricsStore{
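
The register-then-query pattern used in this hunk can be sketched with the standard library alone. The `gate` and `registry` types and the `startupMessage` helper below are hypothetical stand-ins written for illustration; they are not the API of `go.opentelemetry.io/collector/service/featuregate`.

```go
package main

import (
	"fmt"
	"sync"
)

// gate is a hypothetical, simplified feature gate record.
type gate struct {
	ID          string
	Enabled     bool // default state at registration time
	Description string
}

// registry is an illustrative in-memory gate store (an assumption, not the
// collector's featuregate package).
type registry struct {
	mu    sync.RWMutex
	gates map[string]gate
}

func newRegistry() *registry {
	return &registry{gates: make(map[string]gate)}
}

// register stores a gate with its default state and rejects duplicate IDs.
func (r *registry) register(g gate) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.gates[g.ID]; exists {
		return fmt.Errorf("feature gate %q is already registered", g.ID)
	}
	r.gates[g.ID] = g
	return nil
}

// isEnabled reports a gate's state; unknown gates count as disabled.
func (r *registry) isEnabled(id string) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.gates[id].Enabled
}

// startupMessage mirrors the two branches taken in NewDataCollector.
func startupMessage(r *registry, id string) string {
	if r.isEnabled(id) {
		return "reporting cpu metrics in double cpu units"
	}
	return "reporting cpu metrics in integer millicpu units (will not be supported in the future)"
}

func main() {
	const id = "receiver.k8sclusterreceiver.reportCpuMetricsAsDouble"
	r := newRegistry()
	if err := r.register(gate{ID: id, Enabled: false}); err != nil {
		panic(err)
	}
	fmt.Println(startupMessage(r, id))
}
```

Registering the gate with `Enabled: false` corresponds to the Alpha stage described in the README, where the user has to opt in.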
17 changes: 12 additions & 5 deletions receiver/k8sclusterreceiver/internal/collection/containers.go
Expand Up @@ -20,6 +20,7 @@ import (
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/model/semconv/v1.6.1"
"go.opentelemetry.io/collector/service/featuregate"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -110,20 +111,26 @@ func getSpecMetricsForContainer(c corev1.Container) []*metricspb.Metric {
},
} {
for k, v := range t.rl {
val := v.Value()
val := utils.GetInt64TimeSeries(v.Value())
valType := metricspb.MetricDescriptor_GAUGE_INT64
if k == corev1.ResourceCPU {
val = v.MilliValue()
if featuregate.IsEnabled(reportCPUMetricsAsDoubleFeatureGateID) {
// cpu metrics must be of the double type to adhere to opentelemetry system.cpu metric specifications
valType = metricspb.MetricDescriptor_GAUGE_DOUBLE
val = utils.GetDoubleTimeSeries(float64(v.MilliValue()) / 1000.0)
} else {
val = utils.GetInt64TimeSeries(v.MilliValue())
}
}

metrics = append(metrics,
&metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: fmt.Sprintf("k8s.container.%s_%s", k, t.typ),
Description: t.description,
Type: metricspb.MetricDescriptor_GAUGE_INT64,
Type: valType,
},
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(val),
val,
},
},
)
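
The unit change in this hunk comes down to one division. Below is a minimal sketch of it, assuming a hypothetical `point` type in place of the OpenCensus time series and a plain boolean in place of the feature gate lookup.

```go
package main

import "fmt"

// point is a hypothetical tagged value: IsDouble says which field is meaningful.
type point struct {
	IsDouble bool
	Int      int64
	Double   float64
}

// cpuPoint takes a CPU quantity in millicpu. With asDouble set it returns the
// value in cpu units as a double (millicpu / 1000); otherwise it keeps the
// legacy integer millicpu value.
func cpuPoint(milli int64, asDouble bool) point {
	if asDouble {
		return point{IsDouble: true, Double: float64(milli) / 1000.0}
	}
	return point{Int: milli}
}

func main() {
	legacy := cpuPoint(250, false)
	gated := cpuPoint(250, true)
	fmt.Printf("gate disabled: int=%d\n", legacy.Int)
	fmt.Printf("gate enabled:  double=%g\n", gated.Double)
}
```

A request of 250 millicpu is therefore reported as the integer 250 with the gate disabled and as the double 0.25 with it enabled, which is the 1000x scale-down the README warns about.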
Expand Up @@ -44,7 +44,7 @@ func TestCronJobMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[0], "k8s.cronjob.active_jobs",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[0], "k8s.cronjob.active_jobs",
metricspb.MetricDescriptor_GAUGE_INT64, 2)
}

Expand Up @@ -44,16 +44,16 @@ func TestDaemonsetMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, rm.metrics[0], "k8s.daemonset.current_scheduled_nodes",
testutils.AssertMetricsInt(t, rm.metrics[0], "k8s.daemonset.current_scheduled_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 3)

testutils.AssertMetrics(t, rm.metrics[1], "k8s.daemonset.desired_scheduled_nodes",
testutils.AssertMetricsInt(t, rm.metrics[1], "k8s.daemonset.desired_scheduled_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 5)

testutils.AssertMetrics(t, rm.metrics[2], "k8s.daemonset.misscheduled_nodes",
testutils.AssertMetricsInt(t, rm.metrics[2], "k8s.daemonset.misscheduled_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 1)

testutils.AssertMetrics(t, rm.metrics[3], "k8s.daemonset.ready_nodes",
testutils.AssertMetricsInt(t, rm.metrics[3], "k8s.daemonset.ready_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 2)
}

Expand Up @@ -44,10 +44,10 @@ func TestDeploymentMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, rm.metrics[0], "k8s.deployment.desired",
testutils.AssertMetricsInt(t, rm.metrics[0], "k8s.deployment.desired",
metricspb.MetricDescriptor_GAUGE_INT64, 10)

testutils.AssertMetrics(t, rm.metrics[1], "k8s.deployment.available",
testutils.AssertMetricsInt(t, rm.metrics[1], "k8s.deployment.available",
metricspb.MetricDescriptor_GAUGE_INT64, 3)
}

8 changes: 4 additions & 4 deletions receiver/k8sclusterreceiver/internal/collection/hpa_test.go
Expand Up @@ -44,16 +44,16 @@ func TestHPAMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, rm.metrics[0], "k8s.hpa.max_replicas",
testutils.AssertMetricsInt(t, rm.metrics[0], "k8s.hpa.max_replicas",
metricspb.MetricDescriptor_GAUGE_INT64, 10)

testutils.AssertMetrics(t, rm.metrics[1], "k8s.hpa.min_replicas",
testutils.AssertMetricsInt(t, rm.metrics[1], "k8s.hpa.min_replicas",
metricspb.MetricDescriptor_GAUGE_INT64, 2)

testutils.AssertMetrics(t, rm.metrics[2], "k8s.hpa.current_replicas",
testutils.AssertMetricsInt(t, rm.metrics[2], "k8s.hpa.current_replicas",
metricspb.MetricDescriptor_GAUGE_INT64, 5)

testutils.AssertMetrics(t, rm.metrics[3], "k8s.hpa.desired_replicas",
testutils.AssertMetricsInt(t, rm.metrics[3], "k8s.hpa.desired_replicas",
metricspb.MetricDescriptor_GAUGE_INT64, 7)
}

16 changes: 8 additions & 8 deletions receiver/k8sclusterreceiver/internal/collection/jobs_test.go
Expand Up @@ -43,19 +43,19 @@ func TestJobMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[0], "k8s.job.active_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[0], "k8s.job.active_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 2)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[1], "k8s.job.failed_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[1], "k8s.job.failed_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 0)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[2], "k8s.job.successful_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[2], "k8s.job.successful_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 3)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[3], "k8s.job.desired_successful_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[3], "k8s.job.desired_successful_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 10)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[4], "k8s.job.max_parallel_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[4], "k8s.job.max_parallel_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 2)

// Test with nil values.
Expand All @@ -65,13 +65,13 @@ func TestJobMetrics(t *testing.T) {
require.Equal(t, 1, len(actualResourceMetrics))
require.Equal(t, 3, len(actualResourceMetrics[0].metrics))

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[0], "k8s.job.active_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[0], "k8s.job.active_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 2)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[1], "k8s.job.failed_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[1], "k8s.job.failed_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 0)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[2], "k8s.job.successful_pods",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[2], "k8s.job.successful_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 3)
}

Expand Up @@ -42,7 +42,7 @@ func TestNamespaceMetrics(t *testing.T) {
},
)

testutils.AssertMetrics(t, actualResourceMetrics[0].metrics[0], "k8s.namespace.phase",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[0], "k8s.namespace.phase",
metricspb.MetricDescriptor_GAUGE_INT64, 0)
}

63 changes: 35 additions & 28 deletions receiver/k8sclusterreceiver/internal/collection/nodes.go
Expand Up @@ -22,6 +22,7 @@ import (
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/iancoleman/strcase"
conventions "go.opentelemetry.io/collector/model/semconv/v1.6.1"
"go.opentelemetry.io/collector/service/featuregate"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -61,27 +62,46 @@ func getMetricsForNode(node *corev1.Node, nodeConditionTypesToReport, allocatabl
},
})
}

// Adding 'node allocatable type' metrics
for _, nodeAllocatableTypeValue := range allocatableTypesToReport {
nodeAllocatableMetric := getNodeAllocatableMetric(nodeAllocatableTypeValue)
v1NodeAllocatableTypeValue := corev1.ResourceName(nodeAllocatableTypeValue)
metricValue, err := nodeAllocatableValue(node, v1NodeAllocatableTypeValue)

// metrics will be skipped if metric not present in node or value is not convertable to int64
if err != nil {
logger.Debug(err.Error())
valType := metricspb.MetricDescriptor_GAUGE_INT64
quantity, ok := node.Status.Allocatable[v1NodeAllocatableTypeValue]
if !ok {
logger.Debug(fmt.Errorf("allocatable type %v not found in node %v", nodeAllocatableTypeValue,
node.GetName()).Error())
continue
}
val := utils.GetInt64TimeSeries(quantity.Value())

if featuregate.IsEnabled(reportCPUMetricsAsDoubleFeatureGateID) {
// cpu metrics must be of the double type to adhere to opentelemetry system.cpu metric specifications
if v1NodeAllocatableTypeValue == corev1.ResourceCPU {
val = utils.GetDoubleTimeSeries(float64(quantity.MilliValue()) / 1000.0)
valType = metricspb.MetricDescriptor_GAUGE_DOUBLE
}
} else {
metrics = append(metrics, &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: nodeAllocatableMetric,
Description: allocatableDesciption[v1NodeAllocatableTypeValue.String()],
Type: metricspb.MetricDescriptor_GAUGE_INT64,
},
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(metricValue),
},
})
// metrics will be skipped if metric not present in node or value is not convertable to int64
valInt64, ok := quantity.AsInt64()
if !ok {
logger.Debug(fmt.Errorf("metric %s has value %v which is not convertable to int64",
v1NodeAllocatableTypeValue, node.GetName()).Error())
continue
}
val = utils.GetInt64TimeSeries(valInt64)
}
metrics = append(metrics, &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: nodeAllocatableMetric,
Description: allocatableDesciption[v1NodeAllocatableTypeValue.String()],
Type: valType,
},
Timeseries: []*metricspb.TimeSeries{
val,
},
})
}

return []*resourceMetrics{
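
The control flow of the rewritten allocatable loop can be modeled without the Kubernetes libraries. In this sketch, quantities are plain millis-of-a-unit integers and `wholeUnits` and `ceilUnits` are hypothetical stand-ins for the integer conversion and the rounded-up value. These are simplifying assumptions and do not reproduce `resource.Quantity` exactly. The metric name is built by plain concatenation for illustration.

```go
package main

import "fmt"

// sample is a hypothetical output record for one allocatable metric.
type sample struct {
	Name     string
	IsDouble bool
	Int      int64
	Double   float64
}

// wholeUnits models an integer conversion that succeeds only for whole amounts.
func wholeUnits(milli int64) (int64, bool) {
	if milli%1000 != 0 {
		return 0, false
	}
	return milli / 1000, true
}

// ceilUnits models a value rounded up to the next whole unit (non-negative input assumed).
func ceilUnits(milli int64) int64 {
	return (milli + 999) / 1000
}

// allocatableSamples mirrors the loop: missing types are skipped, cpu becomes
// a double when the gate is on, and the legacy path skips values that do not
// convert to an integer.
func allocatableSamples(allocatableMilli map[string]int64, types []string, cpuAsDouble bool) []sample {
	var out []sample
	for _, typ := range types {
		milli, found := allocatableMilli[typ]
		if !found {
			continue // allocatable type not present on the node
		}
		s := sample{Name: "k8s.node.allocatable_" + typ, Int: ceilUnits(milli)}
		if cpuAsDouble {
			if typ == "cpu" {
				s = sample{Name: s.Name, IsDouble: true, Double: float64(milli) / 1000.0}
			}
		} else {
			whole, ok := wholeUnits(milli)
			if !ok {
				continue // not convertible to an integer, so the metric is skipped
			}
			s.Int = whole
		}
		out = append(out, s)
	}
	return out
}

func main() {
	node := map[string]int64{"cpu": 3800, "pods": 110000}
	types := []string{"cpu", "pods", "memory"}
	fmt.Println(len(allocatableSamples(node, types, false)), "metrics with the gate disabled")
	fmt.Println(len(allocatableSamples(node, types, true)), "metrics with the gate enabled")
}
```

Under these assumptions a node with 3.8 allocatable cpu yields no cpu metric on the legacy path, because the fractional value does not convert to an integer, while the gated path reports it as a double.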
Expand Down Expand Up @@ -117,19 +137,6 @@ var nodeConditionValues = map[corev1.ConditionStatus]int64{
corev1.ConditionUnknown: -1,
}

func nodeAllocatableValue(node *corev1.Node, allocatableType corev1.ResourceName) (int64, error) {
value, ok := node.Status.Allocatable[allocatableType]
if !ok {
return 0, fmt.Errorf("allocatable type %v not found in node %v", allocatableType, node.GetName())
}

val, ok := value.AsInt64()
if !ok {
return 0, fmt.Errorf("metric %s has value %v which is not convertable to int64", allocatableType, value)
}
return val, nil
}

func nodeConditionValue(node *corev1.Node, condType corev1.NodeConditionType) int64 {
status := corev1.ConditionUnknown
for _, c := range node.Status.Conditions {