Unify Runner dynamic launching and reloading under RunnerList (#7172)
This PR unifies all dynamic runner handling under a new `RunnerList` structure. It receives a list of configs as the desired state and performs all the needed changes (starting/stopping runners) to transition to it.

I plan to use this as part of #7028
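Below is a minimal sketch (not part of this commit) of how a `RunnerList` is meant to be driven: the caller builds the complete desired set of configs and hands it to `Reload`, which diffs it against the currently running set and starts/stops runners accordingly. The `reconciler` type, its method names, and the `"example"` list name are illustrative assumptions; only the `cfgfile` calls themselves are taken from this change.

```go
package example

import (
	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/cfgfile"
	"github.com/elastic/beats/libbeat/common"
)

// reconciler owns a RunnerList and pushes the full desired state to it.
type reconciler struct {
	runners *cfgfile.RunnerList
}

// newReconciler builds the list once; factory and pipeline are whatever the
// caller already uses to create runners (the autodiscover Adapter and the
// beat's pipeline in this PR).
func newReconciler(factory cfgfile.RunnerFactory, pipeline beat.Pipeline) *reconciler {
	return &reconciler{runners: cfgfile.NewRunnerList("example", factory, pipeline)}
}

// apply hands the complete desired set of configs to Reload, which computes
// the diff against the running runners and performs the needed start/stop calls.
func (r *reconciler) apply(desired []*common.Config) error {
	configs := make([]*cfgfile.ConfigWithMeta, 0, len(desired))
	for _, c := range desired {
		configs = append(configs, &cfgfile.ConfigWithMeta{Config: c})
	}
	return r.runners.Reload(configs)
}

// shutdown stops every runner still registered in the list.
func (r *reconciler) shutdown() {
	r.runners.Stop()
}
```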
exekias authored and ruflin committed May 29, 2018
1 parent 81c8671 commit aed5529
Showing 13 changed files with 423 additions and 278 deletions.
4 changes: 2 additions & 2 deletions filebeat/tests/system/test_autodiscover.py
@@ -42,8 +42,8 @@ def test_docker(self):
docker_client.images.pull('busybox')
docker_client.containers.run('busybox', 'sleep 1')

self.wait_until(lambda: self.log_contains('Autodiscover starting runner: input'))
self.wait_until(lambda: self.log_contains('Autodiscover stopping runner: input'))
self.wait_until(lambda: self.log_contains('Starting runner: input'))
self.wait_until(lambda: self.log_contains('Stopping runner: input'))

output = self.read_output_json()
proc.check_kill_and_wait()
2 changes: 1 addition & 1 deletion filebeat/tests/system/test_deprecated.py
@@ -102,7 +102,7 @@ def test_reload_config_prospector_deprecated(self):
f.write(inputConfigTemplate.format(self.working_dir + "/logs/test2.log"))

self.wait_until(
lambda: self.log_contains_count("New runner started") == 2,
lambda: self.log_contains_count("Starting runner") == 2,
max_timeout=15)

# Add new log line and see if it is picked up = new input is running
10 changes: 5 additions & 5 deletions filebeat/tests/system/test_reload_inputs.py
@@ -68,7 +68,7 @@ def test_start_stop(self):

# Wait until input is stopped
self.wait_until(
lambda: self.log_contains("Runner stopped:"),
lambda: self.log_contains("Stopping runner:"),
max_timeout=15)

with open(logfile, 'a') as f:
@@ -114,7 +114,7 @@ def test_start_stop_replace(self):

# Wait until input is stopped
self.wait_until(
lambda: self.log_contains("Runner stopped:"),
lambda: self.log_contains("Stopping runner:"),
max_timeout=15)

with open(self.working_dir + "/configs/input.yml", 'w') as f:
@@ -177,7 +177,7 @@ def test_reload_same_input(self):

# Wait until input is stopped
self.wait_until(
lambda: self.log_contains("Runner stopped:"),
lambda: self.log_contains("Stopping runner:"),
max_timeout=15)

# Update both log files, only 1 change should be picked up
@@ -285,7 +285,7 @@ def test_reload_same_config(self):

# Wait until old runner is stopped
self.wait_until(
lambda: self.log_contains("Runner stopped:"),
lambda: self.log_contains("Stopping runner:"),
max_timeout=15)

# Add new log line and see if it is picked up = new input is running
@@ -325,7 +325,7 @@ def test_reload_add(self):
f.write(inputConfigTemplate.format(self.working_dir + "/logs/test2.log"))

self.wait_until(
lambda: self.log_contains_count("New runner started") == 2,
lambda: self.log_contains_count("Starting runner:") == 2,
max_timeout=15)

# Add new log line and see if it is picked up = new input is running
2 changes: 1 addition & 1 deletion filebeat/tests/system/test_reload_modules.py
@@ -151,7 +151,7 @@ def test_start_stop(self):

# Wait until input is stopped
self.wait_until(
lambda: self.log_contains("Runner stopped:"),
lambda: self.log_contains("Stopping runner:"),
max_timeout=15)

with open(logfile, 'a') as f:
120 changes: 68 additions & 52 deletions libbeat/autodiscover/autodiscover.go
@@ -1,17 +1,22 @@
package autodiscover

import (
"time"

"github.com/elastic/beats/libbeat/autodiscover/meta"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/logp"

"github.com/mitchellh/hashstructure"
)

const debugK = "autodiscover"
const (
debugK = "autodiscover"

// If a config reload fails after a new event, a new reload will be run after this period
retryPeriod = 10 * time.Second
)

// TODO autodiscover providers config reload

@@ -38,7 +43,8 @@ type Autodiscover struct {
defaultPipeline beat.Pipeline
adapter Adapter
providers []Provider
runners *cfgfile.Registry
configs map[uint64]*cfgfile.ConfigWithMeta
runners *cfgfile.RunnerList
meta *meta.Map

listener bus.Listener
@@ -64,7 +70,8 @@ func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, confi
bus: bus,
defaultPipeline: pipeline,
adapter: adapter,
runners: cfgfile.NewRegistry(),
configs: map[uint64]*cfgfile.ConfigWithMeta{},
runners: cfgfile.NewRunnerList("autodiscover", adapter, pipeline),
providers: providers,
meta: meta.NewMap(),
}, nil
@@ -87,38 +94,59 @@ func (a *Autodiscover) Start() {
}

func (a *Autodiscover) worker() {
for event := range a.listener.Events() {
// This will happen on Stop:
if event == nil {
return
var updated, retry bool

for {
select {
case event := <-a.listener.Events():
// This will happen on Stop:
if event == nil {
return
}

if _, ok := event["start"]; ok {
updated = a.handleStart(event)
}
if _, ok := event["stop"]; ok {
updated = a.handleStop(event)
}

case <-time.After(retryPeriod):
}

if _, ok := event["start"]; ok {
a.handleStart(event)
}
if _, ok := event["stop"]; ok {
a.handleStop(event)
if updated || retry {
if retry {
logp.Debug(debugK, "Reloading existing autodiscover configs after error")
}

configs := make([]*cfgfile.ConfigWithMeta, 0, len(a.configs))
for _, c := range a.configs {
configs = append(configs, c)
}

err := a.runners.Reload(configs)

// On error, make sure the next run also updates because some runners were not properly loaded
retry = err != nil
// reset updated status
updated = false
}
}
}

func (a *Autodiscover) handleStart(event bus.Event) {
func (a *Autodiscover) handleStart(event bus.Event) bool {
var updated bool

configs, err := a.adapter.CreateConfig(event)
if err != nil {
logp.Debug(debugK, "Could not generate config from event %v: %v", event, err)
return
return false
}
logp.Debug(debugK, "Got a start event: %v, generated configs: %+v", event, configs)

meta := getMeta(event)
for _, config := range configs {
rawCfg := map[string]interface{}{}
if err := config.Unpack(rawCfg); err != nil {
logp.Debug(debugK, "Error unpacking config: %v", err)
continue
}

hash, err := hashstructure.Hash(rawCfg, nil)
hash, err := cfgfile.HashConfig(config)
if err != nil {
logp.Debug(debugK, "Could not hash config %v: %v", config, err)
continue
@@ -138,34 +166,28 @@ func (a *Autodiscover) handleStart(event bus.Event) {
continue
}

runner, err := a.adapter.Create(a.defaultPipeline, config, &dynFields)
if err != nil {
logp.Debug(debugK, "Failed to create runner with config %v: %v", config, err)
continue
a.configs[hash] = &cfgfile.ConfigWithMeta{
Config: config,
Meta: &dynFields,
}

logp.Info("Autodiscover starting runner: %s", runner)
a.runners.Add(hash, runner)
runner.Start()
updated = true
}

return updated
}

func (a *Autodiscover) handleStop(event bus.Event) {
func (a *Autodiscover) handleStop(event bus.Event) bool {
var updated bool

configs, err := a.adapter.CreateConfig(event)
if err != nil {
logp.Debug(debugK, "Could not generate config from event %v: %v", event, err)
return
return false
}
logp.Debug(debugK, "Got a stop event: %v, generated configs: %+v", event, configs)

for _, config := range configs {
rawCfg := map[string]interface{}{}
if err := config.Unpack(rawCfg); err != nil {
logp.Debug(debugK, "Error unpacking config: %v", err)
continue
}

hash, err := hashstructure.Hash(rawCfg, nil)
hash, err := cfgfile.HashConfig(config)
if err != nil {
logp.Debug(debugK, "Could not hash config %v: %v", config, err)
continue
@@ -176,18 +198,15 @@ func (a *Autodiscover) handleStop(event bus.Event) {
continue
}

if runner := a.runners.Get(hash); runner != nil {
// Stop can block, we run it asynchronously to avoid blocking
// the whole events loop. The runner hash is removed in any case
// so an equivalent configuration can be added again while this
// one stops, and a duplicated event doesn't try to stop it twice.
logp.Info("Autodiscover stopping runner: %s", runner)
go runner.Stop()
a.runners.Remove(hash)
if a.runners.Has(hash) {
delete(a.configs, hash)
updated = true
} else {
logp.Debug(debugK, "Runner not found for stopping: %s", hash)
}
}

return updated
}

func getMeta(event bus.Event) common.MapStr {
@@ -220,9 +239,6 @@ func (a *Autodiscover) Stop() {
}

// Stop runners
for hash, runner := range a.runners.CopyList() {
runner.Stop()
a.meta.Remove(hash)
}
a.runners.Stop()
logp.Info("Stopped autodiscover manager")
}
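The new worker loop above follows a small retry pattern: an event only marks the desired state as dirty, the actual transition happens through a single `Reload` of the full config set, and a failed reload is retried on the next `retryPeriod` tick even if no new event arrives. A generic sketch of that pattern follows, with a placeholder `events` channel and `reload` callback standing in for the bus listener and `RunnerList.Reload`; it is an illustration of the idea, not the code from this commit.

```go
package example

import "time"

// retryPeriod mirrors the constant added in autodiscover.go: if applying the
// desired state fails, try again after this long even without a new event.
const retryPeriod = 10 * time.Second

// reconcileLoop sketches the worker pattern: events mark the desired state as
// updated; a failed reload schedules a retry on the next periodic wake-up.
func reconcileLoop(events <-chan struct{}, reload func() error) {
	var updated, retry bool

	for {
		select {
		case _, ok := <-events:
			if !ok {
				return // channel closed, stop the loop
			}
			updated = true
		case <-time.After(retryPeriod):
			// periodic wake-up: retry a previously failed reload, if any
		}

		if updated || retry {
			// On error, keep retry set so the next wake-up reloads again.
			retry = reload() != nil
			updated = false
		}
	}
}
```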
5 changes: 5 additions & 0 deletions libbeat/autodiscover/autodiscover_test.go
@@ -40,6 +40,11 @@ func (m *mockRunner) Clone() *mockRunner {
stopped: m.stopped,
}
}
func (m *mockRunner) String() string {
m.mutex.Lock()
defer m.mutex.Unlock()
return "runner"
}

type mockAdapter struct {
mutex sync.Mutex
(The remaining 7 changed files are not shown.)
