Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick to 5.2: Fix docker hanging when container killed #3634

Merged
merged 1 commit into from
Feb 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ https://github.com/elastic/beats/compare/v5.2.0...master[Check the HEAD diff]

*Metricbeat*

- Fix go routine leak in docker module. {pull}3492[3492]
- Fix docker module hanging when a docker container is killed. {issue}3610[3610]
- Set timeout to period instead of 1s by default as documented.

*Packetbeat*

*Winlogbeat*
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/docs/modules/system.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ metricbeat.modules:
cpu_ticks: true
----

It is strongly recommended not to run docker metricsets with a period smaller than 3 seconds. A request to the docker
API already takes up to 2 seconds, so with a shorter period all requests would time out and no data would be reported.

[float]
=== Dashboard

Expand Down
5 changes: 5 additions & 0 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func newBaseModuleFromConfig(rawConfig *common.Config) (BaseModule, error) {
return baseModule, err
}

// If timeout is not set, timeout is set to the same value as period
if baseModule.config.Timeout == 0 {
baseModule.config.Timeout = baseModule.config.Period
}

baseModule.name = strings.ToLower(baseModule.config.Module)

err = mustNotContainDuplicates(baseModule.config.Hosts)
Expand Down
1 change: 0 additions & 1 deletion metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ func (c ModuleConfig) GoString() string { return c.String() }
var defaultModuleConfig = ModuleConfig{
Enabled: true,
Period: time.Second * 10,
Timeout: time.Second,
}

// DefaultModuleConfig returns a ModuleConfig with the default values populated.
Expand Down
6 changes: 3 additions & 3 deletions metricbeat/mb/mb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestModuleConfig(t *testing.T) {
MetricSets: []string{"test"},
Enabled: true,
Period: time.Second * 10,
Timeout: time.Second,
Timeout: 0,
},
},
{
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestModuleConfigDefaults(t *testing.T) {

assert.Equal(t, true, mc.Enabled)
assert.Equal(t, time.Second*10, mc.Period)
assert.Equal(t, time.Second, mc.Timeout)
assert.Equal(t, time.Second*0, mc.Timeout)
assert.Empty(t, mc.Hosts)
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func TestNewBaseModuleFromModuleConfigStruct(t *testing.T) {
assert.Equal(t, moduleName, baseModule.Config().Module)
assert.Equal(t, true, baseModule.Config().Enabled)
assert.Equal(t, time.Second*10, baseModule.Config().Period)
assert.Equal(t, time.Second, baseModule.Config().Timeout)
assert.Equal(t, time.Second*10, baseModule.Config().Timeout)
assert.Empty(t, baseModule.Config().Hosts)
}

Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/docker/cpu/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch returns a list of docker CPU stats.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
stats, err := docker.FetchStats(m.dockerClient)
stats, err := docker.FetchStats(m.dockerClient, m.Module().Config().Timeout)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/docker/diskio/diskio.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch creates list of events with diskio stats for all containers.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
stats, err := docker.FetchStats(m.dockerClient)
stats, err := docker.FetchStats(m.dockerClient, m.Module().Config().Timeout)
if err != nil {
return nil, err
}
Expand Down
21 changes: 13 additions & 8 deletions metricbeat/module/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"

"time"

"github.com/fsouza/go-dockerclient"
)

Expand Down Expand Up @@ -58,7 +60,7 @@ func NewDockerClient(endpoint string, config Config) (*docker.Client, error) {
}

// FetchStats returns a list of running containers with all related stats inside
func FetchStats(client *docker.Client) ([]Stat, error) {
func FetchStats(client *docker.Client, timeout time.Duration) ([]Stat, error) {
containers, err := client.ListContainers(docker.ListContainersOptions{})
if err != nil {
return nil, err
Expand All @@ -67,24 +69,27 @@ func FetchStats(client *docker.Client) ([]Stat, error) {
var wg sync.WaitGroup

containersList := make([]Stat, 0, len(containers))
queue := make(chan Stat, 1)
statsQueue := make(chan Stat, 1)
wg.Add(len(containers))

for _, container := range containers {
go func(container docker.APIContainers) {
defer wg.Done()
queue <- exportContainerStats(client, &container)
statsQueue <- exportContainerStats(client, &container, timeout)
}(container)
}

go func() {
wg.Wait()
close(queue)
close(statsQueue)
}()

// This will break after the queue has been drained and queue is closed.
for container := range queue {
containersList = append(containersList, container)
for stat := range statsQueue {
// If names is empty, there is no data inside
if len(stat.Container.Names) != 0 {
containersList = append(containersList, stat)
}
}

return containersList, err
Expand All @@ -95,7 +100,7 @@ func FetchStats(client *docker.Client) ([]Stat, error) {
// This is currently very inefficient as docker calculates the average for each request,
// means each request will take at least 2s: https://github.com/docker/docker/blob/master/cli/command/container/stats_helpers.go#L148
// Getting all stats at once is implemented here: https://github.com/docker/docker/pull/25361
func exportContainerStats(client *docker.Client, container *docker.APIContainers) Stat {
func exportContainerStats(client *docker.Client, container *docker.APIContainers, timeout time.Duration) Stat {
var wg sync.WaitGroup
var event Stat

Expand All @@ -105,7 +110,7 @@ func exportContainerStats(client *docker.Client, container *docker.APIContainers
ID: container.ID,
Stats: statsC,
Stream: false,
Timeout: -1,
Timeout: timeout,
}

wg.Add(2)
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/docker/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch creates a list of memory events for each container.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
stats, err := docker.FetchStats(m.dockerClient)
stats, err := docker.FetchStats(m.dockerClient, m.Module().Config().Timeout)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/docker/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch methods creates a list of network events for each container.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
stats, err := docker.FetchStats(m.dockerClient)
stats, err := docker.FetchStats(m.dockerClient, m.Module().Config().Timeout)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/module/system/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ metricbeat.modules:
cpu_ticks: true
----

It is strongly recommended not to run docker metricsets with a period smaller than 3 seconds. A request to the docker
API already takes up to 2 seconds, so with a shorter period all requests would time out and no data would be reported.

[float]
=== Dashboard

Expand Down
13 changes: 7 additions & 6 deletions metricbeat/tests/system/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def test_container_fields(self):
"name": "docker",
"metricsets": ["container"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s",
"period": "10s",
}])

proc = self.start_beat()
Expand All @@ -40,7 +40,7 @@ def test_cpu_fields(self):
"name": "docker",
"metricsets": ["cpu"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s"
"period": "10s"
}])

proc = self.start_beat()
Expand Down Expand Up @@ -70,7 +70,7 @@ def test_diskio_fields(self):
"name": "docker",
"metricsets": ["diskio"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s"
"period": "10s"
}])

proc = self.start_beat()
Expand All @@ -97,7 +97,7 @@ def test_info_fields(self):
"name": "docker",
"metricsets": ["info"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s"
"period": "10s"
}])

proc = self.start_beat()
Expand All @@ -122,7 +122,7 @@ def test_memory_fields(self):
"name": "docker",
"metricsets": ["memory"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s"
"period": "10s"
}])

proc = self.start_beat()
Expand All @@ -148,7 +148,7 @@ def test_network_fields(self):
"name": "docker",
"metricsets": ["network"],
"hosts": ["unix:///var/run/docker.sock"],
"period": "1s"
"period": "10s"
}])

proc = self.start_beat()
Expand All @@ -157,6 +157,7 @@ def test_network_fields(self):

# Ensure no errors or warnings exist in the log.
log = self.get_log()

self.assertNotRegexpMatches(log.replace("WARN EXPERIMENTAL", ""), "ERR|WARN")

output = self.read_output_json()
Expand Down