From e535809e488d962f481b2e81007527617bea2461 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Fri, 3 Nov 2017 15:56:47 +0100 Subject: [PATCH] Fix ML jobs setup for dynamic modules Modules from `modules.d` were ignored by both `setup` command and `--setup` flag. Fixes #5504 --- filebeat/beater/filebeat.go | 42 +++++++-- .../system/config/filebeat_modules.yml.j2 | 3 + filebeat/tests/system/test_modules.py | 86 ++++++++++++++++++- 3 files changed, 125 insertions(+), 6 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index bba19a3300d8..da3829f79369 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -4,9 +4,11 @@ import ( "flag" "fmt" + "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" @@ -99,10 +101,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { } // register `setup` callback for ML jobs - if !moduleRegistry.Empty() { - b.SetupMLCallback = func(b *beat.Beat) error { - return fb.loadModulesML(b) - } + b.SetupMLCallback = func(b *beat.Beat) error { + return fb.loadModulesML(b) } return fb, nil } @@ -127,6 +127,7 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error { func (fb *Filebeat) loadModulesML(b *beat.Beat) error { logp.Debug("machine-learning", "Setting up ML jobs for modules") + var errs multierror.Errors if b.Config.Output.Name() != "elasticsearch" { logp.Warn("Filebeat is unable to load the Xpack Machine Learning configurations for the" + @@ -139,8 +140,39 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat) error { if err != nil { return errors.Errorf("Error creating Elasticsearch client: %v", err) } + if err := fb.moduleRegistry.LoadML(esClient); err != nil { + errs = append(errs, err) + } + + // Add dynamic modules.d + if fb.config.ConfigModules.Enabled() { + config := cfgfile.DefaultDynamicConfig + fb.config.ConfigModules.Unpack(&config) + + modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled") + if err != nil { + return errors.Wrap(err, "initialization error") + } + + for _, file := range modulesManager.ListEnabled() { + confs, err := cfgfile.LoadList(file.Path) + if err != nil { + errs = append(errs, errors.Wrap(err, "error loading config file")) + continue + } + set, err := fileset.NewModuleRegistry(confs, "", false) + if err != nil { + errs = append(errs, err) + continue + } + + if err := set.LoadML(esClient); err != nil { + errs = append(errs, err) + } + } + } - return fb.moduleRegistry.LoadML(esClient) + return errs.Err() } // Run allows the beater to be run as a beat. diff --git a/filebeat/tests/system/config/filebeat_modules.yml.j2 b/filebeat/tests/system/config/filebeat_modules.yml.j2 index b62c9c293c98..05ed82f4d3a8 100644 --- a/filebeat/tests/system/config/filebeat_modules.yml.j2 +++ b/filebeat/tests/system/config/filebeat_modules.yml.j2 @@ -1,5 +1,8 @@ filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}} +filebeat.config.modules: + path: {{ beat.working_dir + '/modules.d/*.yml' }} + output.elasticsearch.hosts: ["{{ elasticsearch_url }}"] output.elasticsearch.index: {{ index_name }} diff --git a/filebeat/tests/system/test_modules.py b/filebeat/tests/system/test_modules.py index 582a80b444e8..ba2923b0ff06 100644 --- a/filebeat/tests/system/test_modules.py +++ b/filebeat/tests/system/test_modules.py @@ -208,7 +208,7 @@ def search_objects(): "integration test not available on 2.x") def test_setup_machine_learning_nginx(self): """ - Tests that setup works and loads nginx dashboards. + Tests that setup works and loads machine learning jobs using --modules flag. """ self.init() # generate a minimal configuration @@ -238,3 +238,87 @@ def test_setup_machine_learning_nginx(self): datafeeds = self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/") assert "filebeat-nginx-access-response_code" in (df["job_id"] for df in datafeeds["datafeeds"]) + + @unittest.skipIf(not INTEGRATION_TESTS or + os.getenv("TESTING_ENVIRONMENT") == "2x", + "integration test not available on 2.x") + def test_setup_machine_learning_nginx_enable(self): + """ + Tests that setup works and loads machine learning jobs for enabled modules. + """ + self.init() + # generate a minimal configuration + cfgfile = os.path.join(self.working_dir, "filebeat.yml") + self.render_config_template( + template_name="filebeat_modules", + output=cfgfile, + index_name=self.index_name, + elasticsearch_url=self.elasticsearch_url) + + # Enable nginx + os.mkdir(os.path.join(self.working_dir, "modules.d")) + with open(os.path.join(self.working_dir, "modules.d/nginx.yml"), "wb") as nginx: + nginx.write("- module: nginx") + + cmd = [ + self.filebeat, "-systemTest", + "-e", "-d", "*", + "-c", cfgfile, + "setup", "--machine-learning"] + + output = open(os.path.join(self.working_dir, "output.log"), "ab") + output.write(" ".join(cmd) + "\n") + subprocess.Popen(cmd, + stdin=None, + stdout=output, + stderr=output, + bufsize=0).wait() + + jobs = self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/") + assert "filebeat-nginx-access-response_code" in (job["job_id"] for job in jobs["jobs"]) + + datafeeds = self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/") + assert "filebeat-nginx-access-response_code" in (df["job_id"] for df in datafeeds["datafeeds"]) + + @unittest.skipIf(not INTEGRATION_TESTS or + os.getenv("TESTING_ENVIRONMENT") == "2x", + "integration test not available on 2.x") + def test_setup_flag_machine_learning_nginx_enable(self): + """ + Tests that setup works and loads machine learning jobs for enabled modules using --setup flag. + """ + self.init() + # generate a minimal configuration + cfgfile = os.path.join(self.working_dir, "filebeat.yml") + self.render_config_template( + template_name="filebeat_modules", + output=cfgfile, + index_name=self.index_name, + elasticsearch_url=self.elasticsearch_url) + + # Enable nginx + os.mkdir(os.path.join(self.working_dir, "modules.d")) + with open(os.path.join(self.working_dir, "modules.d/nginx.yml"), "wb") as nginx: + nginx.write("- module: nginx") + + cmd = [ + self.filebeat, "-systemTest", + "-e", "-d", "*", + "-c", cfgfile, + "--setup"] + + output = open(os.path.join(self.working_dir, "output.log"), "ab") + output.write(" ".join(cmd) + "\n") + beat = subprocess.Popen(cmd, + stdin=None, + stdout=output, + stderr=output, + bufsize=0) + + jobs = self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/") + assert "filebeat-nginx-access-response_code" in (job["job_id"] for job in jobs["jobs"]) + + datafeeds = self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/") + assert "filebeat-nginx-access-response_code" in (df["job_id"] for df in datafeeds["datafeeds"]) + + beat.kill()