Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of driver/docker: Fix container CPU stats collection into release/1.9.x #24793

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
3 changes: 3 additions & 0 deletions .changelog/24768.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
driver/docker: Fix container CPU stats collection where previous CPU stats were missing and causing incorrect calculations
```
70 changes: 47 additions & 23 deletions drivers/docker/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package docker
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -89,13 +90,18 @@ func (h *taskHandle) Stats(ctx context.Context, interval time.Duration, compute
return recvCh, nil
}

// collectStats starts collecting resource usage stats of a docker container
// collectStats starts collecting resource usage stats of a Docker container
// and does this until the context or the tasks handler done channel is closed.
func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, interval time.Duration, compute cpustats.Compute) {
defer destCh.close()

// retry tracks the number of retries the collection has been through since
// the last successful Docker API call. This is used to calculate the
// backoff time for the collection ticker.
var retry uint64

ticker, cancel := helper.NewSafeTicker(interval)
defer cancel()
var stats *containerapi.Stats

for {
select {
Expand All @@ -104,30 +110,48 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
case <-h.doneCh:
return
case <-ticker.C:
// we need to use the streaming stats API here because our calculation for
// CPU usage depends on having the values from the previous read, which are
// not available in one-shot. This streaming stats can be reused over time,
// but require synchronization, which restricts the interval for the metrics.
statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true)
if err != nil && err != io.EOF {
h.logger.Debug("error collecting stats from container", "error", err)
return
stats, err := h.collectDockerStats(ctx)
switch err {
case nil:
resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
destCh.send(resourceUsage)
ticker.Reset(interval)
retry = 0
default:
h.logger.Error("error collecting stats from container", "error", err)
ticker.Reset(helper.Backoff(statsCollectorBackoffBaseline, statsCollectorBackoffLimit, retry))
retry++
}
}
}
}

err = json.NewDecoder(statsReader.Body).Decode(&stats)
statsReader.Body.Close()
if err != nil && err != io.EOF {
h.logger.Error("error decoding stats data from container", "error", err)
return
}
// collectDockerStats performs the stats collection from the Docker API. It is
// split into its own function for the purpose of aiding testing.
func (h *taskHandle) collectDockerStats(ctx context.Context) (*containerapi.Stats, error) {

if stats == nil {
h.logger.Error("error decoding stats data: stats were nil")
return
}
var stats *containerapi.Stats

resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
destCh.send(resourceUsage)
}
statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, false)
if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to collect stats: %w", err)
}

// Ensure the body is not nil to avoid potential panics. The statsReader
// itself cannot be nil, so there is no need to check this.
if statsReader.Body == nil {
return nil, errors.New("error decoding stats data: no reader body")
}

err = json.NewDecoder(statsReader.Body).Decode(&stats)
_ = statsReader.Body.Close()
if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to decode Docker response: %w", err)
}

if stats == nil {
return nil, errors.New("error decoding stats data: stats were nil")
}

return stats, nil
}
59 changes: 59 additions & 0 deletions drivers/docker/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
package docker

import (
"context"
"runtime"
"sync"
"testing"
"time"

containerapi "github.com/docker/docker/api/types/container"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/lib/cpustats"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/drivers/docker/util"
"github.com/shoenig/test/must"
)
Expand Down Expand Up @@ -112,3 +115,59 @@ func TestDriver_DockerUsageSender(t *testing.T) {
destCh.close()
destCh.send(res)
}

// Test_taskHandle_collectDockerStats performs an end-to-end check that a
// single stats collection from a running container returns all the
// datapoints used by the CPU/memory usage calculations.
func Test_taskHandle_collectDockerStats(t *testing.T) {
	ci.Parallel(t)
	testutil.DockerCompatible(t)

	// Start a Docker container and wait for it to be running, so we can
	// guarantee stats generation.
	driverCfg, dockerTaskConfig, _ := dockerTask(t)

	must.NoError(t, driverCfg.EncodeConcreteDriverConfig(dockerTaskConfig))

	_, driverHarness, handle, cleanup := dockerSetup(t, driverCfg, nil)
	defer cleanup()
	must.NoError(t, driverHarness.WaitUntilStarted(driverCfg.ID, 5*time.Second))

	// Generate a context, so the test doesn't hang on Docker problems and
	// execute a single collection of the stats.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	dockerStats, err := handle.collectDockerStats(ctx)
	must.NoError(t, err)
	must.NotNil(t, dockerStats)

	// Ensure the stats we use for calculating CPU percentages within
	// DockerStatsToTaskResourceUsage are present and non-zero. Both the
	// current and previous (precpu) readings are required by that
	// calculation.
	must.NonZero(t, dockerStats.CPUStats.CPUUsage.TotalUsage)
	must.NonZero(t, dockerStats.PreCPUStats.CPUUsage.TotalUsage)

	// System usage is only populated on Linux machines. GitHub Actions Windows
	// runners do not have UsageInKernelmode or UsageInUsermode populated and
	// these datapoints are not used by the Windows stats usage function. Also
	// wrap the Linux specific memory stats.
	if runtime.GOOS == "linux" {
		must.NonZero(t, dockerStats.CPUStats.SystemUsage)
		must.NonZero(t, dockerStats.CPUStats.CPUUsage.UsageInKernelmode)
		must.NonZero(t, dockerStats.CPUStats.CPUUsage.UsageInUsermode)

		must.NonZero(t, dockerStats.PreCPUStats.SystemUsage)
		must.NonZero(t, dockerStats.PreCPUStats.CPUUsage.UsageInKernelmode)
		must.NonZero(t, dockerStats.PreCPUStats.CPUUsage.UsageInUsermode)

		must.NonZero(t, dockerStats.MemoryStats.Usage)
		must.MapContainsKey(t, dockerStats.MemoryStats.Stats, "file_mapped")
	}

	// Test Windows specific memory stats are collected as and when expected.
	if runtime.GOOS == "windows" {
		must.NonZero(t, dockerStats.MemoryStats.PrivateWorkingSet)
		must.NonZero(t, dockerStats.MemoryStats.Commit)
		must.NonZero(t, dockerStats.MemoryStats.CommitPeak)
	}
}
Loading