[receiver/kubeletstats] Migrate kubeletstatsreceiver to the new Metrics Builder (#9744)

* Updated to use v2 generated metrics

* Fix lint errors

* Update changelog

* Moved Node and Pod resources back to accumulator

* replaced resource labels with ResourceOptions

* Fix lint

* Fix import spacing

* Update receiver/kubeletstatsreceiver/internal/metadata/metrics.go

Co-authored-by: Dmitrii Anoshin <[email protected]>

* Refactored scraper to emit using metrics builder

* Add default metrics

* Added multiple metrics builders

* Fix lint

* Regenerated v2 metrics

* rename metric builders

* Updated to set new start time.

* Update CHANGELOG.md

* Update CHANGELOG.md

Co-authored-by: Dmitrii Anoshin <[email protected]>
TylerHelmuth and dmitryax authored May 13, 2022
1 parent 948318e commit 85000d9
Showing 26 changed files with 3,240 additions and 973 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@
 
 ### 💡 Enhancements 💡
 
+- `kubeletstatsreceiver`: Update receiver to use new Metrics Builder. All emitted metrics remain the same. (#9744)
+
 ### 🧰 Bug fixes 🧰
 
 ## v0.51.0
4 changes: 4 additions & 0 deletions receiver/kubeletstatsreceiver/config.go
@@ -26,6 +26,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
 	kube "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
 )
 
 var _ config.Receiver = (*Config)(nil)
@@ -48,6 +49,9 @@ type Config struct {
 
 	// Configuration of the Kubernetes API client.
 	K8sAPIConfig *k8sconfig.APIConfig `mapstructure:"k8s_api_config"`
+
+	// Metrics allows customizing scraped metrics representation.
+	Metrics metadata.MetricsSettings `mapstructure:"metrics"`
 }
 
 func (cfg *Config) Validate() error {
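With the new `Metrics` field wired into `Config`, per-metric settings become part of the receiver's YAML. A minimal sketch of where the `metrics` block sits in a collector configuration — the interval value is illustrative, the metric name comes from the receiver's documentation table, and only the shape of the `metrics` block comes from this change:

```yaml
receivers:
  kubeletstats:
    collection_interval: 20s  # illustrative value
    auth_type: "tls"          # matches the factory default later in this diff
    metrics:
      k8s.volume.inodes.used:
        enabled: false        # turn off one default metric
```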
7 changes: 7 additions & 0 deletions receiver/kubeletstatsreceiver/config_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
 	kube "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
 )
 
 func TestLoadConfig(t *testing.T) {
@@ -59,6 +60,7 @@ func TestLoadConfig(t *testing.T) {
 			kubelet.PodMetricGroup,
 			kubelet.NodeMetricGroup,
 		},
+		Metrics: metadata.DefaultMetricsSettings(),
 	}, defaultCfg)
 
 	tlsCfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "tls")].(*Config)
@@ -86,6 +88,7 @@ func TestLoadConfig(t *testing.T) {
 			kubelet.PodMetricGroup,
 			kubelet.NodeMetricGroup,
 		},
+		Metrics: metadata.DefaultMetricsSettings(),
 	}, tlsCfg)
 
 	saCfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "sa")].(*Config)
@@ -105,6 +108,7 @@ func TestLoadConfig(t *testing.T) {
 			kubelet.PodMetricGroup,
 			kubelet.NodeMetricGroup,
 		},
+		Metrics: metadata.DefaultMetricsSettings(),
 	}, saCfg)
 
 	metadataCfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "metadata")].(*Config)
@@ -127,6 +131,7 @@ func TestLoadConfig(t *testing.T) {
 			kubelet.PodMetricGroup,
 			kubelet.NodeMetricGroup,
 		},
+		Metrics: metadata.DefaultMetricsSettings(),
 	}, metadataCfg)
 
 	metricGroupsCfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "metric_groups")].(*Config)
@@ -145,6 +150,7 @@ func TestLoadConfig(t *testing.T) {
 			kubelet.NodeMetricGroup,
 			kubelet.VolumeMetricGroup,
 		},
+		Metrics: metadata.DefaultMetricsSettings(),
 	}, metricGroupsCfg)
 
 	metadataWithK8sAPICfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "metadata_with_k8s_api")].(*Config)
@@ -167,6 +173,7 @@ func TestLoadConfig(t *testing.T) {
 			kubelet.NodeMetricGroup,
 		},
 		K8sAPIConfig: &k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeKubeConfig},
+		Metrics:      metadata.DefaultMetricsSettings(),
 	}, metadataWithK8sAPICfg)
 }
 
2 changes: 1 addition & 1 deletion receiver/kubeletstatsreceiver/doc.go
@@ -15,6 +15,6 @@
 //go:build !windows
 // +build !windows
 
-//go:generate mdatagen metadata.yaml
+//go:generate mdatagen --experimental-gen metadata.yaml
 
 package kubeletstatsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver"
9 changes: 8 additions & 1 deletion receiver/kubeletstatsreceiver/documentation.md
@@ -51,7 +51,14 @@ These are the metrics available for this scraper.
 | **k8s.volume.inodes.free** | The free inodes in the filesystem. | 1 | Gauge(Int) | <ul> </ul> |
 | **k8s.volume.inodes.used** | The inodes used by the filesystem. This may not equal inodes - free because filesystem may share inodes with other filesystems. | 1 | Gauge(Int) | <ul> </ul> |
 
-**Highlighted metrics** are emitted by default.
+**Highlighted metrics** are emitted by default. Other metrics are optional and not emitted by default.
+Any metric can be enabled or disabled with the following scraper configuration:
+
+```yaml
+metrics:
+  <metric_name>:
+    enabled: <true|false>
+```
 
 ## Resource attributes
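As a concrete instance of the configuration snippet added above — a sketch using one of the highlighted volume metrics from the table — disabling a single default metric looks like:

```yaml
metrics:
  k8s.volume.inodes.used:
    enabled: false
```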
4 changes: 3 additions & 1 deletion receiver/kubeletstatsreceiver/factory.go
@@ -27,6 +27,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
 	kube "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
 )
 
 const (
@@ -59,6 +60,7 @@ func createDefaultConfig() config.Receiver {
 				AuthType: k8sconfig.AuthTypeTLS,
 			},
 		},
+		Metrics: metadata.DefaultMetricsSettings(),
 	}
 }
 
@@ -78,7 +80,7 @@ func createMetricsReceiver(
 		return nil, err
 	}
 
-	scrp, err := newKubletScraper(rest, set, rOptions)
+	scrp, err := newKubletScraper(rest, set, rOptions, cfg.Metrics)
 	if err != nil {
 		return nil, err
 	}
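For orientation: `metadata.MetricsSettings` and `DefaultMetricsSettings()` live in the mdatagen-generated `internal/metadata` package, which this page does not show. A trimmed sketch of their assumed shape — struct contents and field names here are illustrative, abbreviated to two metrics:

```go
package metadata

// MetricSettings carries the per-metric toggle read from the receiver config.
type MetricSettings struct {
	Enabled bool `mapstructure:"enabled"`
}

// MetricsSettings holds one entry per metric defined in metadata.yaml; the
// mapstructure keys are the metric names listed in documentation.md.
// Field names below are illustrative.
type MetricsSettings struct {
	K8sNodeCPUTime        MetricSettings `mapstructure:"k8s.node.cpu.time"`
	K8sNodeCPUUtilization MetricSettings `mapstructure:"k8s.node.cpu.utilization"`
	// ... remaining metrics elided
}

// DefaultMetricsSettings enables the metrics emitted by default.
func DefaultMetricsSettings() MetricsSettings {
	return MetricsSettings{
		K8sNodeCPUTime:        MetricSettings{Enabled: true},
		K8sNodeCPUUtilization: MetricSettings{Enabled: true},
	}
}
```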
84 changes: 32 additions & 52 deletions receiver/kubeletstatsreceiver/internal/kubelet/accumulator.go
@@ -49,66 +49,53 @@ type metricDataAccumulator struct {
 	logger                *zap.Logger
 	metricGroupsToCollect map[MetricGroup]bool
 	time                  time.Time
+	mbs                   *metadata.MetricsBuilders
 }
 
-const (
-	scopeName = "otelcol/kubeletstatsreceiver"
-)
-
 func (a *metricDataAccumulator) nodeStats(s stats.NodeStats) {
 	if !a.metricGroupsToCollect[NodeMetricGroup] {
 		return
 	}
 
-	md := pmetric.NewMetrics()
-	rm := md.ResourceMetrics().AppendEmpty()
-	fillNodeResource(rm.Resource(), s)
-
-	ilm := rm.ScopeMetrics().AppendEmpty()
-	ilm.Scope().SetName(scopeName)
-
-	startTime := pcommon.NewTimestampFromTime(s.StartTime.Time)
 	currentTime := pcommon.NewTimestampFromTime(a.time)
-	addCPUMetrics(ilm.Metrics(), metadata.NodeCPUMetrics, s.CPU, startTime, currentTime)
-	addMemoryMetrics(ilm.Metrics(), metadata.NodeMemoryMetrics, s.Memory, currentTime)
-	addFilesystemMetrics(ilm.Metrics(), metadata.NodeFilesystemMetrics, s.Fs, currentTime)
-	addNetworkMetrics(ilm.Metrics(), metadata.NodeNetworkMetrics, s.Network, startTime, currentTime)
+	addCPUMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeCPUMetrics, s.CPU, currentTime)
+	addMemoryMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeMemoryMetrics, s.Memory, currentTime)
+	addFilesystemMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeFilesystemMetrics, s.Fs, currentTime)
+	addNetworkMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeNetworkMetrics, s.Network, currentTime)
 	// todo s.Runtime.ImageFs
 
-	a.m = append(a.m, md)
+	a.m = append(a.m, a.mbs.NodeMetricsBuilder.Emit(
+		metadata.WithStartTimeOverride(pcommon.NewTimestampFromTime(s.StartTime.Time)),
+		metadata.WithK8sNodeName(s.NodeName),
+	))
 }
 
 func (a *metricDataAccumulator) podStats(s stats.PodStats) {
 	if !a.metricGroupsToCollect[PodMetricGroup] {
 		return
 	}
 
-	md := pmetric.NewMetrics()
-	rm := md.ResourceMetrics().AppendEmpty()
-	fillPodResource(rm.Resource(), s)
-
-	ilm := rm.ScopeMetrics().AppendEmpty()
-	ilm.Scope().SetName(scopeName)
-
-	startTime := pcommon.NewTimestampFromTime(s.StartTime.Time)
 	currentTime := pcommon.NewTimestampFromTime(a.time)
-	addCPUMetrics(ilm.Metrics(), metadata.PodCPUMetrics, s.CPU, startTime, currentTime)
-	addMemoryMetrics(ilm.Metrics(), metadata.PodMemoryMetrics, s.Memory, currentTime)
-	addFilesystemMetrics(ilm.Metrics(), metadata.PodFilesystemMetrics, s.EphemeralStorage, currentTime)
-	addNetworkMetrics(ilm.Metrics(), metadata.PodNetworkMetrics, s.Network, startTime, currentTime)
-
-	a.m = append(a.m, md)
+	addCPUMetrics(a.mbs.PodMetricsBuilder, metadata.PodCPUMetrics, s.CPU, currentTime)
+	addMemoryMetrics(a.mbs.PodMetricsBuilder, metadata.PodMemoryMetrics, s.Memory, currentTime)
+	addFilesystemMetrics(a.mbs.PodMetricsBuilder, metadata.PodFilesystemMetrics, s.EphemeralStorage, currentTime)
+	addNetworkMetrics(a.mbs.PodMetricsBuilder, metadata.PodNetworkMetrics, s.Network, currentTime)
+
+	a.m = append(a.m, a.mbs.PodMetricsBuilder.Emit(
+		metadata.WithStartTimeOverride(pcommon.NewTimestampFromTime(s.StartTime.Time)),
+		metadata.WithK8sPodUID(s.PodRef.UID),
+		metadata.WithK8sPodName(s.PodRef.Name),
+		metadata.WithK8sNamespaceName(s.PodRef.Namespace),
+	))
 }
 
 func (a *metricDataAccumulator) containerStats(sPod stats.PodStats, s stats.ContainerStats) {
 	if !a.metricGroupsToCollect[ContainerMetricGroup] {
 		return
 	}
 
-	md := pmetric.NewMetrics()
-	rm := md.ResourceMetrics().AppendEmpty()
-
-	if err := fillContainerResource(rm.Resource(), sPod, s, a.metadata); err != nil {
+	ro, err := getContainerResourceOptions(sPod, s, a.metadata)
+	if err != nil {
 		a.logger.Warn(
 			"failed to fetch container metrics",
 			zap.String("pod", sPod.PodRef.Name),
@@ -117,26 +104,21 @@ func (a *metricDataAccumulator) containerStats(sPod stats.PodStats, s stats.Cont
 		return
 	}
 
-	ilm := rm.ScopeMetrics().AppendEmpty()
-	ilm.Scope().SetName(scopeName)
-
-	startTime := pcommon.NewTimestampFromTime(s.StartTime.Time)
 	currentTime := pcommon.NewTimestampFromTime(a.time)
-	addCPUMetrics(ilm.Metrics(), metadata.ContainerCPUMetrics, s.CPU, startTime, currentTime)
-	addMemoryMetrics(ilm.Metrics(), metadata.ContainerMemoryMetrics, s.Memory, currentTime)
-	addFilesystemMetrics(ilm.Metrics(), metadata.ContainerFilesystemMetrics, s.Rootfs, currentTime)
-	a.m = append(a.m, md)
+	addCPUMetrics(a.mbs.ContainerMetricsBuilder, metadata.ContainerCPUMetrics, s.CPU, currentTime)
+	addMemoryMetrics(a.mbs.ContainerMetricsBuilder, metadata.ContainerMemoryMetrics, s.Memory, currentTime)
+	addFilesystemMetrics(a.mbs.ContainerMetricsBuilder, metadata.ContainerFilesystemMetrics, s.Rootfs, currentTime)
+
+	a.m = append(a.m, a.mbs.ContainerMetricsBuilder.Emit(ro...))
 }
 
 func (a *metricDataAccumulator) volumeStats(sPod stats.PodStats, s stats.VolumeStats) {
 	if !a.metricGroupsToCollect[VolumeMetricGroup] {
 		return
 	}
 
-	md := pmetric.NewMetrics()
-	rm := md.ResourceMetrics().AppendEmpty()
-
-	if err := fillVolumeResource(rm.Resource(), sPod, s, a.metadata); err != nil {
+	ro, err := getVolumeResourceOptions(sPod, s, a.metadata)
+	if err != nil {
 		a.logger.Warn(
 			"Failed to gather additional volume metadata. Skipping metric collection.",
 			zap.String("pod", sPod.PodRef.Name),
@@ -145,10 +127,8 @@ func (a *metricDataAccumulator) volumeStats(sPod stats.PodStats, s stats.VolumeS
 		return
 	}
 
-	ilm := rm.ScopeMetrics().AppendEmpty()
-	ilm.Scope().SetName(scopeName)
-
 	currentTime := pcommon.NewTimestampFromTime(a.time)
-	addVolumeMetrics(ilm.Metrics(), metadata.K8sVolumeMetrics, s, currentTime)
-	a.m = append(a.m, md)
+	addVolumeMetrics(a.mbs.OtherMetricsBuilder, metadata.K8sVolumeMetrics, s, currentTime)
+
+	a.m = append(a.m, a.mbs.OtherMetricsBuilder.Emit(ro...))
 }
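The accumulator now follows a record-then-emit flow instead of assembling `pmetric` structures by hand. A self-contained sketch of that flow — `NewMetricsBuilder`, `Emit`, `WithStartTimeOverride`, and `WithK8sNodeName` all appear in this diff, while the record function name is an illustrative stand-in for whatever mdatagen generates from metadata.yaml (note `internal/metadata` is only importable from within this module):

```go
package kubeletstatssketch

import (
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
)

func emitNodeMetrics(cpuSeconds float64, start time.Time) pmetric.Metrics {
	// One builder per resource type, so each Emit yields one ResourceMetrics.
	mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings())

	now := pcommon.NewTimestampFromTime(time.Now())
	// Hypothetical generated record function; real names derive from metadata.yaml.
	mb.RecordK8sNodeCPUTimeDataPoint(now, cpuSeconds)

	// Emit flushes everything recorded so far into a fresh pmetric.Metrics,
	// attaching the start time and resource attributes passed as options.
	return mb.Emit(
		metadata.WithStartTimeOverride(pcommon.NewTimestampFromTime(start)),
		metadata.WithK8sNodeName("example-node"),
	)
}
```

Keeping a separate builder per resource type (the `metadata.MetricsBuilders` bundle above) is what lets each `Emit` carry its own resource attributes; a single shared builder would flush node, pod, and container points into one resource.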
21 changes: 17 additions & 4 deletions receiver/kubeletstatsreceiver/internal/kubelet/accumulator_test.go
@@ -26,6 +26,8 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
 )
 
 // TestMetadataErrorCases walks through the error cases of collecting
@@ -39,7 +41,7 @@ func TestMetadataErrorCases(t *testing.T) {
 		numMDs      int
 		numLogs     int
 		logMessages []string
-		detailedPVCLabelsSetterOverride func(volCacheID, volumeClaim, namespace string, labels map[string]string) error
+		detailedPVCLabelsSetterOverride func(volCacheID, volumeClaim, namespace string) ([]metadata.ResourceMetricsOption, error)
 	}{
 		{
 			name: "Fails to get container metadata",
@@ -176,9 +178,9 @@
 					},
 				},
 			}, nil),
-			detailedPVCLabelsSetterOverride: func(volCacheID, volumeClaim, namespace string, labels map[string]string) error {
+			detailedPVCLabelsSetterOverride: func(volCacheID, volumeClaim, namespace string) ([]metadata.ResourceMetricsOption, error) {
 				// Mock failure cases.
-				return errors.New("")
+				return nil, errors.New("")
 			},
 			testScenario: func(acc metricDataAccumulator) {
 				podStats := stats.PodStats{
@@ -205,11 +207,16 @@
 			observedLogger, logs := observer.New(zapcore.WarnLevel)
 			logger := zap.New(observedLogger)
 
-			tt.metadata.DetailedPVCLabelsSetter = tt.detailedPVCLabelsSetterOverride
+			tt.metadata.DetailedPVCResourceGetter = tt.detailedPVCLabelsSetterOverride
 			acc := metricDataAccumulator{
 				metadata:              tt.metadata,
 				logger:                logger,
 				metricGroupsToCollect: tt.metricGroupsToCollect,
+				mbs: &metadata.MetricsBuilders{
+					NodeMetricsBuilder:  metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
+					PodMetricsBuilder:   metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
+					OtherMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
+				},
 			}
 
 			tt.testScenario(acc)
@@ -231,6 +238,12 @@
 			ContainerMetricGroup: true,
 			VolumeMetricGroup:    true,
 		},
+		mbs: &metadata.MetricsBuilders{
+			NodeMetricsBuilder:      metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
+			PodMetricsBuilder:       metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
+			ContainerMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
+			OtherMetricsBuilder:     metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
+		},
 	}
 	assert.NotPanics(t, func() {
 		acc.nodeStats(stats.NodeStats{})
@@ -15,9 +15,7 @@
 package kubelet // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"
 
 const (
-	labelPersistentVolumeClaimName = "k8s.persistentvolumeclaim.name"
-	labelVolumeName                = "k8s.volume.name"
-	labelVolumeType                = "k8s.volume.type"
+	labelVolumeType = "k8s.volume.type"
 
 	// Volume types.
 	labelValuePersistentVolumeClaim = "persistentVolumeClaim"
15 changes: 7 additions & 8 deletions receiver/kubeletstatsreceiver/internal/kubelet/cpu.go
@@ -16,32 +16,31 @@ package kubelet // import "github.com/open-telemetry/opentelemetry-collector-con
 
 import (
 	"go.opentelemetry.io/collector/pdata/pcommon"
-	"go.opentelemetry.io/collector/pdata/pmetric"
 	stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
 )
 
-func addCPUMetrics(dest pmetric.MetricSlice, cpuMetrics metadata.CPUMetrics, s *stats.CPUStats, startTime pcommon.Timestamp, currentTime pcommon.Timestamp) {
+func addCPUMetrics(mb *metadata.MetricsBuilder, cpuMetrics metadata.CPUMetrics, s *stats.CPUStats, currentTime pcommon.Timestamp) {
 	if s == nil {
 		return
 	}
-	addCPUUsageMetric(dest, cpuMetrics.Utilization, s, currentTime)
-	addCPUTimeMetric(dest, cpuMetrics.Time, s, startTime, currentTime)
+	addCPUUsageMetric(mb, cpuMetrics.Utilization, s, currentTime)
+	addCPUTimeMetric(mb, cpuMetrics.Time, s, currentTime)
 }
 
-func addCPUUsageMetric(dest pmetric.MetricSlice, metricInt metadata.MetricIntf, s *stats.CPUStats, currentTime pcommon.Timestamp) {
+func addCPUUsageMetric(mb *metadata.MetricsBuilder, recordDataPoint metadata.RecordDoubleDataPointFunc, s *stats.CPUStats, currentTime pcommon.Timestamp) {
 	if s.UsageNanoCores == nil {
 		return
 	}
 	value := float64(*s.UsageNanoCores) / 1_000_000_000
-	fillDoubleGauge(dest.AppendEmpty(), metricInt, value, currentTime)
+	recordDataPoint(mb, currentTime, value)
 }
 
-func addCPUTimeMetric(dest pmetric.MetricSlice, metricInt metadata.MetricIntf, s *stats.CPUStats, startTime pcommon.Timestamp, currentTime pcommon.Timestamp) {
+func addCPUTimeMetric(mb *metadata.MetricsBuilder, recordDataPoint metadata.RecordDoubleDataPointFunc, s *stats.CPUStats, currentTime pcommon.Timestamp) {
 	if s.UsageCoreNanoSeconds == nil {
 		return
 	}
 	value := float64(*s.UsageCoreNanoSeconds) / 1_000_000_000
-	fillDoubleSum(dest.AppendEmpty(), metricInt, value, startTime, currentTime)
+	recordDataPoint(mb, currentTime, value)
 }
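The helpers now accept a record function rather than a destination `pmetric.MetricSlice`. The declarations implied by the call sites above — a sketch inferred from their use in this file, not copied from the generated code:

```go
package metadata

import "go.opentelemetry.io/collector/pdata/pcommon"

// RecordDoubleDataPointFunc matches the generated per-metric record
// functions, so helpers like addCPUUsageMetric stay metric-agnostic.
type RecordDoubleDataPointFunc func(*MetricsBuilder, pcommon.Timestamp, float64)

// CPUMetrics bundles the record functions for one resource kind; callers
// pass metadata.NodeCPUMetrics, metadata.PodCPUMetrics, and so on.
type CPUMetrics struct {
	Time        RecordDoubleDataPointFunc
	Utilization RecordDoubleDataPointFunc
}
```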