From aed5529a5936b4ab0c87bc4c1bd31c4e38b893fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Tue, 29 May 2018 16:51:09 +0200 Subject: [PATCH] Unify Runner dynamic launching and reloading under RunnerList (#7172) This PR unifies all uses of dynamic runner handling under a new `RunnerList` structure. It can receive a list of configs as the desired state and do all the needed changes (start/stopping) to transition to it. I plan to use this as part of #7028 --- filebeat/tests/system/test_autodiscover.py | 4 +- filebeat/tests/system/test_deprecated.py | 2 +- filebeat/tests/system/test_reload_inputs.py | 10 +- filebeat/tests/system/test_reload_modules.py | 2 +- libbeat/autodiscover/autodiscover.go | 120 ++++++++------ libbeat/autodiscover/autodiscover_test.go | 5 + libbeat/cfgfile/list.go | 147 ++++++++++++++++ libbeat/cfgfile/list_test.go | 166 +++++++++++++++++++ libbeat/cfgfile/registry.go | 60 ------- libbeat/cfgfile/registry_test.go | 48 ------ libbeat/cfgfile/reload.go | 125 +++----------- metricbeat/tests/system/test_autodiscover.py | 8 +- metricbeat/tests/system/test_reload.py | 4 +- 13 files changed, 423 insertions(+), 278 deletions(-) create mode 100644 libbeat/cfgfile/list.go create mode 100644 libbeat/cfgfile/list_test.go delete mode 100644 libbeat/cfgfile/registry.go delete mode 100644 libbeat/cfgfile/registry_test.go diff --git a/filebeat/tests/system/test_autodiscover.py b/filebeat/tests/system/test_autodiscover.py index b2fcd4810a5d..a85a0bf60482 100644 --- a/filebeat/tests/system/test_autodiscover.py +++ b/filebeat/tests/system/test_autodiscover.py @@ -42,8 +42,8 @@ def test_docker(self): docker_client.images.pull('busybox') docker_client.containers.run('busybox', 'sleep 1') - self.wait_until(lambda: self.log_contains('Autodiscover starting runner: input')) - self.wait_until(lambda: self.log_contains('Autodiscover stopping runner: input')) + self.wait_until(lambda: self.log_contains('Starting runner: input')) + self.wait_until(lambda: self.log_contains('Stopping runner: input')) output = self.read_output_json() proc.check_kill_and_wait() diff --git a/filebeat/tests/system/test_deprecated.py b/filebeat/tests/system/test_deprecated.py index be764d8c84aa..16e75d736e70 100644 --- a/filebeat/tests/system/test_deprecated.py +++ b/filebeat/tests/system/test_deprecated.py @@ -102,7 +102,7 @@ def test_reload_config_prospector_deprecated(self): f.write(inputConfigTemplate.format(self.working_dir + "/logs/test2.log")) self.wait_until( - lambda: self.log_contains_count("New runner started") == 2, + lambda: self.log_contains_count("Starting runner") == 2, max_timeout=15) # Add new log line and see if it is picked up = new input is running diff --git a/filebeat/tests/system/test_reload_inputs.py b/filebeat/tests/system/test_reload_inputs.py index cee9c8742a16..e0757cfa2914 100644 --- a/filebeat/tests/system/test_reload_inputs.py +++ b/filebeat/tests/system/test_reload_inputs.py @@ -68,7 +68,7 @@ def test_start_stop(self): # Wait until input is stopped self.wait_until( - lambda: self.log_contains("Runner stopped:"), + lambda: self.log_contains("Stopping runner:"), max_timeout=15) with open(logfile, 'a') as f: @@ -114,7 +114,7 @@ def test_start_stop_replace(self): # Wait until input is stopped self.wait_until( - lambda: self.log_contains("Runner stopped:"), + lambda: self.log_contains("Stopping runner:"), max_timeout=15) with open(self.working_dir + "/configs/input.yml", 'w') as f: @@ -177,7 +177,7 @@ def test_reload_same_input(self): # Wait until input is stopped self.wait_until( - lambda: self.log_contains("Runner stopped:"), + lambda: self.log_contains("Stopping runner:"), max_timeout=15) # Update both log files, only 1 change should be picke dup @@ -285,7 +285,7 @@ def test_reload_same_config(self): # Wait until old runner is stopped self.wait_until( - lambda: self.log_contains("Runner stopped:"), + lambda: self.log_contains("Stopping runner:"), max_timeout=15) # Add new log line and see if it is picked up = new input is running @@ -325,7 +325,7 @@ def test_reload_add(self): f.write(inputConfigTemplate.format(self.working_dir + "/logs/test2.log")) self.wait_until( - lambda: self.log_contains_count("New runner started") == 2, + lambda: self.log_contains_count("Starting runner:") == 2, max_timeout=15) # Add new log line and see if it is picked up = new input is running diff --git a/filebeat/tests/system/test_reload_modules.py b/filebeat/tests/system/test_reload_modules.py index 2ecbc7a7f30a..3c97ed0b2730 100644 --- a/filebeat/tests/system/test_reload_modules.py +++ b/filebeat/tests/system/test_reload_modules.py @@ -151,7 +151,7 @@ def test_start_stop(self): # Wait until input is stopped self.wait_until( - lambda: self.log_contains("Runner stopped:"), + lambda: self.log_contains("Stopping runner:"), max_timeout=15) with open(logfile, 'a') as f: diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 8cdf27fcaf7e..4856eb15d35f 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -1,17 +1,22 @@ package autodiscover import ( + "time" + "github.com/elastic/beats/libbeat/autodiscover/meta" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" "github.com/elastic/beats/libbeat/logp" - - "github.com/mitchellh/hashstructure" ) -const debugK = "autodiscover" +const ( + debugK = "autodiscover" + + // If a config reload fails after a new event, a new reload will be run after this period + retryPeriod = 10 * time.Second +) // TODO autodiscover providers config reload @@ -38,7 +43,8 @@ type Autodiscover struct { defaultPipeline beat.Pipeline adapter Adapter providers []Provider - runners *cfgfile.Registry + configs map[uint64]*cfgfile.ConfigWithMeta + runners *cfgfile.RunnerList meta *meta.Map listener bus.Listener @@ -64,7 +70,8 @@ func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, confi bus: bus, defaultPipeline: pipeline, adapter: adapter, - runners: cfgfile.NewRegistry(), + configs: map[uint64]*cfgfile.ConfigWithMeta{}, + runners: cfgfile.NewRunnerList("autodiscover", adapter, pipeline), providers: providers, meta: meta.NewMap(), }, nil @@ -87,38 +94,59 @@ func (a *Autodiscover) Start() { } func (a *Autodiscover) worker() { - for event := range a.listener.Events() { - // This will happen on Stop: - if event == nil { - return + var updated, retry bool + + for { + select { + case event := <-a.listener.Events(): + // This will happen on Stop: + if event == nil { + return + } + + if _, ok := event["start"]; ok { + updated = a.handleStart(event) + } + if _, ok := event["stop"]; ok { + updated = a.handleStop(event) + } + + case <-time.After(retryPeriod): } - if _, ok := event["start"]; ok { - a.handleStart(event) - } - if _, ok := event["stop"]; ok { - a.handleStop(event) + if updated || retry { + if retry { + logp.Debug(debugK, "Reloading existing autodiscover configs after error") + } + + configs := make([]*cfgfile.ConfigWithMeta, 0, len(a.configs)) + for _, c := range a.configs { + configs = append(configs, c) + } + + err := a.runners.Reload(configs) + + // On error, make sure the next run also updates because some runners were not properly loaded + retry = err != nil + // reset updated status + updated = false } } } -func (a *Autodiscover) handleStart(event bus.Event) { +func (a *Autodiscover) handleStart(event bus.Event) bool { + var updated bool + configs, err := a.adapter.CreateConfig(event) if err != nil { logp.Debug(debugK, "Could not generate config from event %v: %v", event, err) - return + return false } logp.Debug(debugK, "Got a start event: %v, generated configs: %+v", event, configs) meta := getMeta(event) for _, config := range configs { - rawCfg := map[string]interface{}{} - if err := config.Unpack(rawCfg); err != nil { - logp.Debug(debugK, "Error unpacking config: %v", err) - continue - } - - hash, err := hashstructure.Hash(rawCfg, nil) + hash, err := cfgfile.HashConfig(config) if err != nil { logp.Debug(debugK, "Could not hash config %v: %v", config, err) continue @@ -138,34 +166,28 @@ func (a *Autodiscover) handleStart(event bus.Event) { continue } - runner, err := a.adapter.Create(a.defaultPipeline, config, &dynFields) - if err != nil { - logp.Debug(debugK, "Failed to create runner with config %v: %v", config, err) - continue + a.configs[hash] = &cfgfile.ConfigWithMeta{ + Config: config, + Meta: &dynFields, } - - logp.Info("Autodiscover starting runner: %s", runner) - a.runners.Add(hash, runner) - runner.Start() + updated = true } + + return updated } -func (a *Autodiscover) handleStop(event bus.Event) { +func (a *Autodiscover) handleStop(event bus.Event) bool { + var updated bool + configs, err := a.adapter.CreateConfig(event) if err != nil { logp.Debug(debugK, "Could not generate config from event %v: %v", event, err) - return + return false } logp.Debug(debugK, "Got a stop event: %v, generated configs: %+v", event, configs) for _, config := range configs { - rawCfg := map[string]interface{}{} - if err := config.Unpack(rawCfg); err != nil { - logp.Debug(debugK, "Error unpacking config: %v", err) - continue - } - - hash, err := hashstructure.Hash(rawCfg, nil) + hash, err := cfgfile.HashConfig(config) if err != nil { logp.Debug(debugK, "Could not hash config %v: %v", config, err) continue @@ -176,18 +198,15 @@ func (a *Autodiscover) handleStop(event bus.Event) { continue } - if runner := a.runners.Get(hash); runner != nil { - // Stop can block, we run it asyncrhonously to avoid blocking - // the whole events loop. The runner hash is removed in any case - // so an equivalent configuration can be added again while this - // one stops, and a duplicated event don't try to stop it twice. - logp.Info("Autodiscover stopping runner: %s", runner) - go runner.Stop() - a.runners.Remove(hash) + if a.runners.Has(hash) { + delete(a.configs, hash) + updated = true } else { logp.Debug(debugK, "Runner not found for stopping: %s", hash) } } + + return updated } func getMeta(event bus.Event) common.MapStr { @@ -220,9 +239,6 @@ func (a *Autodiscover) Stop() { } // Stop runners - for hash, runner := range a.runners.CopyList() { - runner.Stop() - a.meta.Remove(hash) - } + a.runners.Stop() logp.Info("Stopped autodiscover manager") } diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index a771d98d47ff..4c8236d06664 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -40,6 +40,11 @@ func (m *mockRunner) Clone() *mockRunner { stopped: m.stopped, } } +func (m *mockRunner) String() string { + m.mutex.Lock() + defer m.mutex.Unlock() + return "runner" +} type mockAdapter struct { mutex sync.Mutex diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go new file mode 100644 index 000000000000..d1eef775e250 --- /dev/null +++ b/libbeat/cfgfile/list.go @@ -0,0 +1,147 @@ +package cfgfile + +import ( + "sync" + + "github.com/joeshaw/multierror" + "github.com/mitchellh/hashstructure" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +// RunnerList implements a reloadable.List of Runners +type RunnerList struct { + runners map[uint64]Runner + mutex sync.RWMutex + factory RunnerFactory + pipeline beat.Pipeline + logger *logp.Logger +} + +// ConfigWithMeta holds a pair of common.Config and optional metadata for it +type ConfigWithMeta struct { + // Config to store + Config *common.Config + + // Meta data related to this config + Meta *common.MapStrPointer +} + +// NewRunnerList builds and returns a RunnerList +func NewRunnerList(name string, factory RunnerFactory, pipeline beat.Pipeline) *RunnerList { + return &RunnerList{ + runners: map[uint64]Runner{}, + factory: factory, + pipeline: pipeline, + logger: logp.NewLogger(name), + } +} + +// Reload the list of runners to match the given state +func (r *RunnerList) Reload(configs []*ConfigWithMeta) error { + r.mutex.Lock() + defer r.mutex.Unlock() + + var errs multierror.Errors + + startList := map[uint64]*ConfigWithMeta{} + stopList := r.copyRunnerList() + + r.logger.Debugf("Starting reload procedure, current runners: %d", len(stopList)) + + // diff current & desired state, create action lists + for _, config := range configs { + hash, err := HashConfig(config.Config) + if err != nil { + r.logger.Errorf("Unable to hash given config: %s", err) + errs = append(errs, errors.Wrap(err, "Unable to hash given config")) + continue + } + + if _, ok := stopList[hash]; ok { + delete(stopList, hash) + } else { + startList[hash] = config + } + } + + r.logger.Debugf("Start list: %d, Stop list: %d", len(startList), len(stopList)) + + // Stop removed runners + for hash, runner := range stopList { + r.logger.Debugf("Stopping runner: %s", runner) + delete(r.runners, hash) + go runner.Stop() + } + + // Start new runners + for hash, config := range startList { + runner, err := r.factory.Create(r.pipeline, config.Config, config.Meta) + if err != nil { + r.logger.Errorf("Error creating runner from config: %s", err) + errs = append(errs, errors.Wrap(err, "Error creating runner from config")) + continue + } + + r.logger.Debugf("Starting runner: %s", runner) + r.runners[hash] = runner + runner.Start() + } + + return errs.Err() +} + +// Stop all runners +func (r *RunnerList) Stop() { + r.mutex.Lock() + defer r.mutex.Unlock() + + if len(r.runners) == 0 { + return + } + + r.logger.Infof("Stopping %v runners ...", len(r.runners)) + + wg := sync.WaitGroup{} + for hash, runner := range r.copyRunnerList() { + wg.Add(1) + + delete(r.runners, hash) + + // Stop modules in parallel + go func(h uint64, run Runner) { + defer wg.Done() + r.logger.Debugf("Stopping runner: %s", run) + run.Stop() + r.logger.Debugf("Stopped runner: %s", run) + }(hash, runner) + } + + wg.Wait() +} + +// Has returns true if a runner with the given hash is running +func (r *RunnerList) Has(hash uint64) bool { + r.mutex.RLock() + defer r.mutex.RUnlock() + _, ok := r.runners[hash] + return ok +} + +// HashConfig hashes a given common.Config +func HashConfig(c *common.Config) (uint64, error) { + var config map[string]interface{} + c.Unpack(&config) + return hashstructure.Hash(config, nil) +} + +func (r *RunnerList) copyRunnerList() map[uint64]Runner { + list := make(map[uint64]Runner, len(r.runners)) + for k, v := range r.runners { + list[k] = v + } + return list +} diff --git a/libbeat/cfgfile/list_test.go b/libbeat/cfgfile/list_test.go new file mode 100644 index 000000000000..390340aaaf99 --- /dev/null +++ b/libbeat/cfgfile/list_test.go @@ -0,0 +1,166 @@ +package cfgfile + +import ( + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +type runner struct { + id int64 + started bool + stopped bool +} + +func (r *runner) Start() { r.started = true } +func (r *runner) Stop() { r.stopped = true } + +type runnerFactory struct{ runners []*runner } + +func (r *runnerFactory) Create(x beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (Runner, error) { + config := struct { + ID int64 `config:"id"` + }{} + + err := c.Unpack(&config) + if err != nil { + return nil, err + } + + // id < 0 is an invalid config + if config.ID < 0 { + return nil, errors.New("Invalid config") + } + + runner := &runner{id: config.ID} + r.runners = append(r.runners, runner) + return runner, err +} + +func TestNewConfigs(t *testing.T) { + factory := &runnerFactory{} + list := NewRunnerList("", factory, nil) + + list.Reload([]*ConfigWithMeta{ + createConfig(1), + createConfig(2), + createConfig(3), + }) + + assert.Equal(t, len(list.copyRunnerList()), 3) +} + +func TestReloadSameConfigs(t *testing.T) { + factory := &runnerFactory{} + list := NewRunnerList("", factory, nil) + + list.Reload([]*ConfigWithMeta{ + createConfig(1), + createConfig(2), + createConfig(3), + }) + + state := list.copyRunnerList() + assert.Equal(t, len(state), 3) + + list.Reload([]*ConfigWithMeta{ + createConfig(1), + createConfig(2), + createConfig(3), + }) + + // nothing changed + assert.Equal(t, state, list.copyRunnerList()) +} + +func TestReloadStopConfigs(t *testing.T) { + factory := &runnerFactory{} + list := NewRunnerList("", factory, nil) + + list.Reload([]*ConfigWithMeta{ + createConfig(1), + createConfig(2), + createConfig(3), + }) + + assert.Equal(t, len(list.copyRunnerList()), 3) + + list.Reload([]*ConfigWithMeta{ + createConfig(1), + createConfig(3), + }) + + assert.Equal(t, len(list.copyRunnerList()), 2) +} + +func TestReloadStartStopConfigs(t *testing.T) { + factory := &runnerFactory{} + list := NewRunnerList("", factory, nil) + + list.Reload([]*ConfigWithMeta{ + createConfig(1), + createConfig(2), + createConfig(3), + }) + + state := list.copyRunnerList() + assert.Equal(t, len(state), 3) + + list.Reload([]*ConfigWithMeta{ + createConfig(1), + createConfig(3), + createConfig(4), + }) + + assert.Equal(t, len(list.copyRunnerList()), 3) + assert.NotEqual(t, state, list.copyRunnerList()) +} + +func TestStopAll(t *testing.T) { + factory := &runnerFactory{} + list := NewRunnerList("", factory, nil) + + list.Reload([]*ConfigWithMeta{ + createConfig(1), + createConfig(2), + createConfig(3), + }) + + assert.Equal(t, len(list.copyRunnerList()), 3) + list.Stop() + assert.Equal(t, len(list.copyRunnerList()), 0) + + for _, r := range list.runners { + assert.False(t, r.(*runner).stopped) + } +} + +func TestHas(t *testing.T) { + factory := &runnerFactory{} + list := NewRunnerList("", factory, nil) + config := createConfig(1) + + hash, err := HashConfig(config.Config) + if err != nil { + t.Fatal(err) + } + + list.Reload([]*ConfigWithMeta{ + config, + }) + + assert.True(t, list.Has(hash)) + assert.False(t, list.Has(0)) +} + +func createConfig(id int64) *ConfigWithMeta { + c := common.NewConfig() + c.SetInt("id", -1, id) + return &ConfigWithMeta{ + Config: c, + } +} diff --git a/libbeat/cfgfile/registry.go b/libbeat/cfgfile/registry.go deleted file mode 100644 index 237136dc6302..000000000000 --- a/libbeat/cfgfile/registry.go +++ /dev/null @@ -1,60 +0,0 @@ -package cfgfile - -import "sync" - -// Registry holds a list of Runners mapped by their unique hashes -type Registry struct { - sync.RWMutex - List map[uint64]Runner -} - -// NewRegistry returns a new empty Registry -func NewRegistry() *Registry { - return &Registry{ - List: map[uint64]Runner{}, - } -} - -// Add the given Runner to the list, indexed by a hash -func (r *Registry) Add(hash uint64, m Runner) { - r.Lock() - defer r.Unlock() - r.List[hash] = m -} - -// Remove the Runner with the given hash from the list -func (r *Registry) Remove(hash uint64) { - r.Lock() - defer r.Unlock() - delete(r.List, hash) -} - -// Has returns true if there is a Runner with the given hash -func (r *Registry) Has(hash uint64) bool { - r.RLock() - defer r.RUnlock() - - _, ok := r.List[hash] - return ok -} - -// Get returns the Runner with the given hash, or nil if it doesn't exist -func (r *Registry) Get(hash uint64) Runner { - r.RLock() - defer r.RUnlock() - - return r.List[hash] -} - -// CopyList returns a static copy of the Runners map -func (r *Registry) CopyList() map[uint64]Runner { - r.RLock() - defer r.RUnlock() - - // Create a copy of the list - list := map[uint64]Runner{} - for k, v := range r.List { - list[k] = v - } - return list -} diff --git a/libbeat/cfgfile/registry_test.go b/libbeat/cfgfile/registry_test.go deleted file mode 100644 index 1f04ebd96d7e..000000000000 --- a/libbeat/cfgfile/registry_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package cfgfile - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -type runner struct{ id int } - -func (runner) Start() {} -func (runner) Stop() {} - -func TestRegistryHas(t *testing.T) { - registry := NewRegistry() - - registry.Add(1, runner{1}) - assert.True(t, registry.Has(1)) - assert.False(t, registry.Has(0)) -} - -func TestRegistryRemove(t *testing.T) { - registry := NewRegistry() - - registry.Add(1, runner{1}) - assert.True(t, registry.Has(1)) - - registry.Remove(1) - assert.False(t, registry.Has(1)) -} - -func TestRegistryGet(t *testing.T) { - registry := NewRegistry() - - registry.Add(1, runner{1}) - assert.Equal(t, registry.Get(1), runner{1}) -} - -func TestRegistryCopyList(t *testing.T) { - registry := NewRegistry() - - registry.Add(1, runner{1}) - registry.Add(2, runner{2}) - - list := registry.CopyList() - registry.Remove(1) - assert.Equal(t, len(list), 2) -} diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 9b549e402bdf..228b4b42cdcb 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -6,7 +6,6 @@ import ( "time" "github.com/joeshaw/multierror" - "github.com/mitchellh/hashstructure" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/beat" @@ -56,12 +55,12 @@ type Runner interface { // Reloader is used to register and reload modules type Reloader struct { - pipeline beat.Pipeline - registry *Registry - config DynamicConfig - path string - done chan struct{} - wg sync.WaitGroup + pipeline beat.Pipeline + runnerFactory RunnerFactory + config DynamicConfig + path string + done chan struct{} + wg sync.WaitGroup } // NewReloader creates new Reloader instance for the given config @@ -76,7 +75,6 @@ func NewReloader(pipeline beat.Pipeline, cfg *common.Config) *Reloader { return &Reloader{ pipeline: pipeline, - registry: NewRegistry(), config: config, path: path, done: make(chan struct{}), @@ -109,10 +107,10 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { // Initialize modules for _, c := range configs { // Only add configs to startList which are enabled - if !c.Enabled() { + if !c.Config.Enabled() { continue } - _, err := runnerFactory.Create(rl.pipeline, c, nil) + _, err := runnerFactory.Create(rl.pipeline, c.Config, c.Meta) if err != nil { return err } @@ -124,11 +122,13 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { func (rl *Reloader) Run(runnerFactory RunnerFactory) { logp.Info("Config reloader started") + list := NewRunnerList("reload", runnerFactory, rl.pipeline) + rl.wg.Add(1) defer rl.wg.Done() // Stop all running modules when method finishes - defer rl.stopRunners(rl.registry.CopyList()) + defer list.Stop() gw := NewGlobWatcher(rl.path) @@ -144,8 +144,8 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { case <-rl.done: logp.Info("Dynamic config reloader stopped") return - case <-time.After(rl.config.Reload.Period): + case <-time.After(rl.config.Reload.Period): debugf("Scan for new config files") configReloads.Add(1) @@ -167,49 +167,10 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { debugf("Number of module configs found: %v", len(configs)) - startList := map[uint64]Runner{} - stopList := rl.registry.CopyList() - - for _, c := range configs { - - // Only add configs to startList which are enabled - if !c.Enabled() { - continue - } - - rawCfg := map[string]interface{}{} - err := c.Unpack(rawCfg) - - if err != nil { - logp.Err("Unable to unpack config file due to error: %v", err) - continue - } - - hash, err := hashstructure.Hash(rawCfg, nil) - if err != nil { - // Make sure the next run also updates because some runners were not properly loaded - overwriteUpdate = true - debugf("Unable to generate hash for config file %v due to error: %v", c, err) - continue - } - - debugf("Remove module from stoplist: %v", hash) - delete(stopList, hash) - - // As module already exist, it must be removed from the stop list and not started - if !rl.registry.Has(hash) { - debugf("Add module to startlist: %v", hash) - runner, err := runnerFactory.Create(rl.pipeline, c, nil) - if err != nil { - logp.Err("Unable to create runner due to error: %v", err) - continue - } - startList[hash] = runner - } + if err := list.Reload(configs); err != nil { + // Make sure the next run also updates because some runners were not properly loaded + overwriteUpdate = true } - - rl.stopRunners(stopList) - rl.startRunners(startList) } // Path loading is enabled but not reloading. Loads files only once and then stops. @@ -224,22 +185,24 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { } } -func (rl *Reloader) loadConfigs(files []string) ([]*common.Config, error) { +func (rl *Reloader) loadConfigs(files []string) ([]*ConfigWithMeta, error) { // Load all config objects - configs := []*common.Config{} + result := []*ConfigWithMeta{} var errs multierror.Errors for _, file := range files { - c, err := LoadList(file) + configs, err := LoadList(file) if err != nil { errs = append(errs, err) logp.Err("Error loading config: %s", err) continue } - configs = append(configs, c...) + for _, c := range configs { + result = append(result, &ConfigWithMeta{Config: c}) + } } - return configs, errs.Err() + return result, errs.Err() } // Stop stops the reloader and waits for all modules to properly stop @@ -247,47 +210,3 @@ func (rl *Reloader) Stop() { close(rl.done) rl.wg.Wait() } - -func (rl *Reloader) startRunners(list map[uint64]Runner) { - if len(list) == 0 { - return - } - - logp.Info("Starting %v runners ...", len(list)) - for id, runner := range list { - runner.Start() - rl.registry.Add(id, runner) - - moduleStarts.Add(1) - moduleRunning.Add(1) - debugf("New runner started: %v", id) - } -} - -func (rl *Reloader) stopRunners(list map[uint64]Runner) { - if len(list) == 0 { - return - } - - logp.Info("Stopping %v runners ...", len(list)) - - wg := sync.WaitGroup{} - for hash, runner := range list { - wg.Add(1) - - // Stop modules in parallel - go func(h uint64, run Runner) { - defer func() { - moduleStops.Add(1) - moduleRunning.Add(-1) - debugf("Runner stopped: %v", h) - wg.Done() - }() - - run.Stop() - rl.registry.Remove(h) - }(hash, runner) - } - - wg.Wait() -} diff --git a/metricbeat/tests/system/test_autodiscover.py b/metricbeat/tests/system/test_autodiscover.py index 3773d7ae267a..9bc2226377cd 100644 --- a/metricbeat/tests/system/test_autodiscover.py +++ b/metricbeat/tests/system/test_autodiscover.py @@ -40,12 +40,12 @@ def test_docker(self): docker_client.images.pull('memcached:1.5.3') container = docker_client.containers.run('memcached:1.5.3', detach=True) - self.wait_until(lambda: self.log_contains('Autodiscover starting runner: memcached')) + self.wait_until(lambda: self.log_contains('Starting runner: memcached')) self.wait_until(lambda: self.output_count(lambda x: x >= 1)) container.stop() - self.wait_until(lambda: self.log_contains('Autodiscover stopping runner: memcached')) + self.wait_until(lambda: self.log_contains('Stopping runner: memcached')) output = self.read_output_json() proc.check_kill_and_wait() @@ -82,12 +82,12 @@ def test_docker_labels(self): } container = docker_client.containers.run('memcached:1.5.3', labels=labels, detach=True) - self.wait_until(lambda: self.log_contains('Autodiscover starting runner: memcached')) + self.wait_until(lambda: self.log_contains('Starting runner: memcached')) self.wait_until(lambda: self.output_count(lambda x: x >= 1)) container.stop() - self.wait_until(lambda: self.log_contains('Autodiscover stopping runner: memcached')) + self.wait_until(lambda: self.log_contains('Stopping runner: memcached')) output = self.read_output_json() proc.check_kill_and_wait() diff --git a/metricbeat/tests/system/test_reload.py b/metricbeat/tests/system/test_reload.py index d5734b5a4359..c448eb1230a6 100644 --- a/metricbeat/tests/system/test_reload.py +++ b/metricbeat/tests/system/test_reload.py @@ -68,7 +68,7 @@ def test_start_stop(self): # Wait until offset for new line is updated self.wait_until( - lambda: self.log_contains("Starting 1 runner"), + lambda: self.log_contains("Starting runner: system [metricsets=1]"), max_timeout=10) self.wait_until(lambda: self.output_lines() > 0) @@ -78,7 +78,7 @@ def test_start_stop(self): # Wait until offset for new line is updated self.wait_until( - lambda: self.log_contains("Runner stopped:"), + lambda: self.log_contains("Stopping runner: system [metricsets=1]"), max_timeout=10) lines = self.output_lines()