Skip to content

Commit

Permalink
[receiver/dockerstatsreceiver] refactor docker client (#4702)
Browse files Browse the repository at this point in the history
Moving docker client into new shared internal/docker

**Description:** The client in the docker stats receiver has useful functionality for use in other components, such as the docker observer.

**Link to tracking Issue:** #4446
  • Loading branch information
Mark Stumpf authored Sep 21, 2021
1 parent ce2ec25 commit 95f5018
Show file tree
Hide file tree
Showing 19 changed files with 1,725 additions and 101 deletions.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ extension/httpforwarder/ @open-telemetry/collector-c
extension/observer/ @open-telemetry/collector-contrib-approvers @asuresh4 @jrcamp

internal/aws/ @open-telemetry/collector-contrib-approvers @anuraaga @mxiamxia
internal/docker/ @open-telemetry/collector-contrib-approvers @mstumpfx @rmfitzpatrick

internal/k8sconfig/ @open-telemetry/collector-contrib-approvers @pmcollins @asuresh4
internal/splunk/ @open-telemetry/collector-contrib-approvers @pmcollins @asuresh4
internal/stanza/ @open-telemetry/collector-contrib-approvers @djaglowski
Expand Down
3 changes: 3 additions & 0 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/xray v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/interval v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet v0.36.0 // indirect
Expand Down Expand Up @@ -462,6 +463,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/stanz

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/interval => ../../internal/interval

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker => ../../internal/docker

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/alibabacloudlogserviceexporter => ../../exporter/alibabacloudlogserviceexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsprometheusremotewriteexporter => ../../exporter/awsprometheusremotewriteexporter
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/proxy v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/xray v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/interval v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.36.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet v0.36.0 // indirect
Expand Down Expand Up @@ -462,6 +463,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/stanz

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/interval => ./internal/interval

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker => ./internal/docker

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/alibabacloudlogserviceexporter => ./exporter/alibabacloudlogserviceexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsprometheusremotewriteexporter => ./exporter/awsprometheusremotewriteexporter
Expand Down
1 change: 1 addition & 0 deletions internal/docker/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
73 changes: 73 additions & 0 deletions internal/docker/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package docker

import (
"errors"
"fmt"
"time"
)

type Config struct {
// The URL of the docker server. Default is "unix:///var/run/docker.sock"
Endpoint string `mapstructure:"endpoint"`

// The maximum amount of time to wait for docker API responses. Default is 5s
Timeout time.Duration `mapstructure:"timeout"`

// A list of filters whose matching images are to be excluded. Supports literals, globs, and regex.
ExcludedImages []string `mapstructure:"excluded_images"`

// Docker client API version.
DockerAPIVersion float64 `mapstructure:"api_version"`
}

// NewConfig creates a new config to be used when creating
// a docker client
func NewConfig(endpoint string, timeout time.Duration, excludedImages []string, apiVersion float64) (*Config, error) {
cfg := &Config{
Endpoint: endpoint,
Timeout: timeout,
ExcludedImages: excludedImages,
DockerAPIVersion: apiVersion,
}

err := cfg.validate()
return cfg, err
}

// NewDefaultConfig creates a new config with default values
// to be used when creating a docker client
func NewDefaultConfig() *Config {
cfg := &Config{
Endpoint: "unix:///var/run/docker.sock",
Timeout: 5 * time.Second,
DockerAPIVersion: minimalRequiredDockerAPIVersion,
}

return cfg
}

// validate asserts that an endpoint field is set
// on the config struct
func (config Config) validate() error {
if config.Endpoint == "" {
return errors.New("config.Endpoint must be specified")
}
if config.DockerAPIVersion < minimalRequiredDockerAPIVersion {
return fmt.Errorf("Docker API version must be at least %v", minimalRequiredDockerAPIVersion)
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package dockerstatsreceiver
package docker

import (
"context"
Expand All @@ -26,30 +26,35 @@ import (
dtypes "github.com/docker/docker/api/types"
dfilters "github.com/docker/docker/api/types/filters"
docker "github.com/docker/docker/client"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"
)

const (
defaultDockerAPIVersion = 1.22
minimalRequiredDockerAPIVersion = 1.22
userAgent = "OpenTelemetry-Collector Docker Stats Receiver/v0.0.1"
)

// dockerClient provides the core metric gathering functionality from the Docker Daemon.
// Container is client.ContainerInspect() response container
// stats and translated environment string map for potential labels.
type Container struct {
*dtypes.ContainerJSON
EnvMap map[string]string
}

// Client provides the core metric gathering functionality from the Docker Daemon.
// It retrieves container information in two forms to produce metric data: dtypes.ContainerJSON
// from client.ContainerInspect() for container information (id, name, hostname, labels, and env)
// and dtypes.StatsJSON from client.ContainerStats() for metric values.
type dockerClient struct {
type Client struct {
client *docker.Client
config *Config
containers map[string]DockerContainer
containers map[string]Container
containersLock sync.Mutex
excludedImageMatcher *StringMatcher
logger *zap.Logger
}

func newDockerClient(config *Config, logger *zap.Logger) (*dockerClient, error) {
func NewDockerClient(config *Config, logger *zap.Logger) (*Client, error) {
client, err := docker.NewClientWithOpts(
docker.WithHost(config.Endpoint),
docker.WithVersion(fmt.Sprintf("v%v", config.DockerAPIVersion)),
Expand All @@ -64,23 +69,23 @@ func newDockerClient(config *Config, logger *zap.Logger) (*dockerClient, error)
return nil, fmt.Errorf("could not determine docker client excluded images: %w", err)
}

dc := &dockerClient{
dc := &Client{
client: client,
config: config,
logger: logger,
containers: make(map[string]DockerContainer),
containers: make(map[string]Container),
containersLock: sync.Mutex{},
excludedImageMatcher: excludedImageMatcher,
}

return dc, nil
}

// Provides a slice of DockerContainers to use for individual FetchContainerStats calls.
func (dc *dockerClient) Containers() []DockerContainer {
// Containers provides a slice of Container to use for individual FetchContainerStats calls.
func (dc *Client) Containers() []Container {
dc.containersLock.Lock()
defer dc.containersLock.Unlock()
containers := make([]DockerContainer, 0, len(dc.containers))
containers := make([]Container, 0, len(dc.containers))
for _, container := range dc.containers {
containers = append(containers, container)
}
Expand All @@ -90,7 +95,7 @@ func (dc *dockerClient) Containers() []DockerContainer {
// LoadContainerList will load the initial running container maps for
// inspection and establishing which containers warrant stat gathering calls
// by the receiver.
func (dc *dockerClient) LoadContainerList(ctx context.Context) error {
func (dc *Client) LoadContainerList(ctx context.Context) error {
// Build initial container maps before starting loop
filters := dfilters.NewArgs()
filters.Add("status", "running")
Expand Down Expand Up @@ -127,12 +132,31 @@ func (dc *dockerClient) LoadContainerList(ctx context.Context) error {
return nil
}

// FetchContainerStatsAndConvertToMetrics will query the desired container stats and send
// converted metrics to the results channel, since this is intended to be run in a goroutine.
func (dc *dockerClient) FetchContainerStatsAndConvertToMetrics(
// FetchContainerStatsAsJSON will query the desired container stats
// and return them as StatsJSON
func (dc *Client) FetchContainerStatsAsJSON(
ctx context.Context,
container Container,
) (*dtypes.StatsJSON, error) {
containerStats, err := dc.FetchContainerStats(ctx, container)
if err != nil {
return nil, err
}

statsJSON, err := dc.toStatsJSON(containerStats, &container)
if err != nil {
return nil, err
}

return statsJSON, nil
}

// FetchContainerStats will query the desired container stats
// and return them as ContainerStats
func (dc *Client) FetchContainerStats(
ctx context.Context,
container DockerContainer,
) (pdata.Metrics, error) {
container Container,
) (dtypes.ContainerStats, error) {
dc.logger.Debug("Fetching container stats.", zap.String("id", container.ID))
statsCtx, cancel := context.WithTimeout(ctx, dc.config.Timeout)
containerStats, err := dc.client.ContainerStats(statsCtx, container.ID, false)
Expand All @@ -151,30 +175,14 @@ func (dc *dockerClient) FetchContainerStatsAndConvertToMetrics(
zap.Error(err),
)
}

return pdata.NewMetrics(), err
}

statsJSON, err := dc.toStatsJSON(containerStats, &container)
if err != nil {
return pdata.NewMetrics(), err
}

md, err := ContainerStatsToMetrics(pdata.NewTimestampFromTime(time.Now()), statsJSON, &container, dc.config)
if err != nil {
dc.logger.Error(
"Could not convert docker containerStats for container id",
zap.String("id", container.ID),
zap.Error(err),
)
return pdata.NewMetrics(), err
}
return md, nil
return containerStats, err
}

func (dc *dockerClient) toStatsJSON(
func (dc *Client) toStatsJSON(
containerStats dtypes.ContainerStats,
container *DockerContainer,
container *Container,
) (*dtypes.StatsJSON, error) {
var statsJSON dtypes.StatsJSON
err := json.NewDecoder(containerStats.Body).Decode(&statsJSON)
Expand All @@ -195,7 +203,7 @@ func (dc *dockerClient) toStatsJSON(
return &statsJSON, nil
}

func (dc *dockerClient) ContainerEventLoop(ctx context.Context) {
func (dc *Client) ContainerEventLoop(ctx context.Context) {
filters := dfilters.NewArgs([]dfilters.KeyValuePair{
{Key: "type", Value: "container"},
{Key: "event", Value: "destroy"},
Expand Down Expand Up @@ -263,7 +271,7 @@ EVENT_LOOP:

// Queries inspect api and returns *ContainerJSON and true when container should be queried for stats,
// nil and false otherwise.
func (dc *dockerClient) inspectedContainerIsOfInterest(ctx context.Context, cid string) (*dtypes.ContainerJSON, bool) {
func (dc *Client) inspectedContainerIsOfInterest(ctx context.Context, cid string) (*dtypes.ContainerJSON, bool) {
inspectCtx, cancel := context.WithTimeout(ctx, dc.config.Timeout)
container, err := dc.client.ContainerInspect(inspectCtx, cid)
defer cancel()
Expand All @@ -279,7 +287,7 @@ func (dc *dockerClient) inspectedContainerIsOfInterest(ctx context.Context, cid
return nil, false
}

func (dc *dockerClient) persistContainer(containerJSON *dtypes.ContainerJSON) {
func (dc *Client) persistContainer(containerJSON *dtypes.ContainerJSON) {
if containerJSON == nil {
return
}
Expand All @@ -294,24 +302,24 @@ func (dc *dockerClient) persistContainer(containerJSON *dtypes.ContainerJSON) {
dc.logger.Debug("Monitoring Docker container", zap.String("id", cid))
dc.containersLock.Lock()
defer dc.containersLock.Unlock()
dc.containers[cid] = DockerContainer{
dc.containers[cid] = Container{
ContainerJSON: containerJSON,
EnvMap: containerEnvToMap(containerJSON.Config.Env),
EnvMap: ContainerEnvToMap(containerJSON.Config.Env),
}
}

func (dc *dockerClient) removeContainer(cid string) {
func (dc *Client) removeContainer(cid string) {
dc.containersLock.Lock()
defer dc.containersLock.Unlock()
delete(dc.containers, cid)
dc.logger.Debug("Removed container from stores.", zap.String("id", cid))
}

func (dc *dockerClient) shouldBeExcluded(image string) bool {
func (dc *Client) shouldBeExcluded(image string) bool {
return dc.excludedImageMatcher != nil && dc.excludedImageMatcher.Matches(image)
}

func containerEnvToMap(env []string) map[string]string {
func ContainerEnvToMap(env []string) map[string]string {
out := make(map[string]string, len(env))
for _, v := range env {
parts := strings.Split(v, "=")
Expand Down
Loading

0 comments on commit 95f5018

Please sign in to comment.