diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 0df7b4fc21a9..b441c3e80082 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -35,3 +35,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only. by `make` and `mage`. Example: `export PYTHON_EXE=python2.7`. {pull}11212[11212] - Prometheus helper for metricbeat contains now `Namespace` field for `prometheus.MetricsMappings` {pull}11424[11424] - Update Jinja2 version to 2.10.1. {pull}11817[11817] +- Reduce idxmgmt.Supporter interface and rework export commands to reuse logic. {pull}11777[11777] \ No newline at end of file diff --git a/libbeat/cmd/export/config.go b/libbeat/cmd/export/config.go index 86c59ccb8916..df9816a00e3c 100644 --- a/libbeat/cmd/export/config.go +++ b/libbeat/cmd/export/config.go @@ -18,7 +18,6 @@ package export import ( - "fmt" "os" "github.com/spf13/cobra" @@ -40,26 +39,20 @@ func GenExportConfigCmd(settings instance.Settings) *cobra.Command { } func exportConfig(settings instance.Settings) error { - b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version) - if err != nil { - return fmt.Errorf("error initializing beat: %s", err) - } - settings.DisableConfigResolver = true - - err = b.InitWithSettings(settings) + b, err := instance.NewInitializedBeat(settings) if err != nil { - return fmt.Errorf("error initializing beat: %s", err) + fatalfInitCmd(err) } var config map[string]interface{} err = b.RawConfig.Unpack(&config) if err != nil { - return fmt.Errorf("error unpacking config, error: %s", err) + fatalf("Error unpacking config: %+v.", err) } res, err := yaml.Marshal(config) if err != nil { - return fmt.Errorf("Error converting config to YAML format, error: %s", err) + fatalf("Error converting config to YAML format: %+v.", err) } os.Stdout.Write(res) diff --git a/libbeat/cmd/export/dashboard.go b/libbeat/cmd/export/dashboard.go index 129a6ac6f713..552fbd6c6e68 100644 --- a/libbeat/cmd/export/dashboard.go +++ b/libbeat/cmd/export/dashboard.go @@ -19,7 +19,6 @@ package export import ( "fmt" - "os" "path/filepath" "github.com/spf13/cobra" @@ -40,15 +39,9 @@ func GenDashboardCmd(settings instance.Settings) *cobra.Command { yml, _ := cmd.Flags().GetString("yml") decode, _ := cmd.Flags().GetBool("decode") - b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version) + b, err := instance.NewInitializedBeat(settings) if err != nil { - fmt.Fprintf(os.Stderr, "Error creating beat: %s\n", err) - os.Exit(1) - } - err = b.InitWithSettings(settings) - if err != nil { - fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) - os.Exit(1) + fatalfInitCmd(err) } // Use empty config to use default configs if not set @@ -58,16 +51,14 @@ func GenDashboardCmd(settings instance.Settings) *cobra.Command { client, err := kibana.NewKibanaClient(b.Config.Kibana) if err != nil { - fmt.Fprintf(os.Stderr, "Error creating Kibana client: %+v\n", err) - os.Exit(1) + fatalf("Error creating Kibana client: %+v.\n", err) } // Export dashboards from yml file if yml != "" { results, info, err := dashboards.ExportAllFromYml(client, yml) if err != nil { - fmt.Fprintf(os.Stderr, "Error getting dashboards from yml: %+v\n", err) - os.Exit(1) + fatalf("Error exporting dashboards from yml: %+v.\n", err) } for i, r := range results { if decode { @@ -76,9 +67,8 @@ func GenDashboardCmd(settings instance.Settings) *cobra.Command { err = dashboards.SaveToFile(r, info.Dashboards[i].File, filepath.Dir(yml), client.GetVersion()) if err != nil { - fmt.Fprintf(os.Stderr, "Error saving dashboard '%s' to file '%s' : %+v\n", + fatalf("Error saving dashboard '%s' to file '%s' : %+v.\n", info.Dashboards[i].ID, info.Dashboards[i].File, err) - os.Exit(1) } } return @@ -88,8 +78,7 @@ func GenDashboardCmd(settings instance.Settings) *cobra.Command { if dashboard != "" { result, err := dashboards.Export(client, dashboard) if err != nil { - fmt.Fprintf(os.Stderr, "Error getting dashboard: %+v\n", err) - os.Exit(1) + fatalf("Error exporting dashboard: %+v.\n", err) } if decode { diff --git a/libbeat/cmd/export/export.go b/libbeat/cmd/export/export.go new file mode 100644 index 000000000000..d7acacf97539 --- /dev/null +++ b/libbeat/cmd/export/export.go @@ -0,0 +1,112 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package export + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/idxmgmt" + "github.com/elastic/beats/libbeat/version" +) + +type stdoutClient struct { + ver common.Version + f *os.File +} + +type fileClient struct { + ver common.Version + dir string +} + +func newIdxmgmtClient(dir string, ver string) idxmgmt.FileClient { + if dir == "" { + c, err := newStdoutClient(ver) + if err != nil { + fatalf("Error creating stdout writer: %+v.", err) + } + return c + } + c, err := newFileClient(dir, ver) + if err != nil { + fatalf("Error creating directory: %+v.", err) + } + return c +} + +func newStdoutClient(ver string) (*stdoutClient, error) { + if ver == "" { + ver = version.GetDefaultVersion() + } + v, err := common.NewVersion(ver) + if err != nil { + return nil, err + } + return &stdoutClient{ver: *v, f: os.Stdout}, nil +} + +func newFileClient(dir string, ver string) (*fileClient, error) { + if ver == "" { + ver = version.GetDefaultVersion() + } + path, err := filepath.Abs(dir) + if err != nil { + return nil, err + } + err = os.MkdirAll(path, os.ModePerm) + if err != nil { + return nil, err + } + return &fileClient{ver: *common.MustNewVersion(ver), dir: path}, nil +} + +func (c *stdoutClient) GetVersion() common.Version { + return c.ver +} + +func (c *stdoutClient) Write(_ string, body string) error { + _, err := c.f.WriteString(body) + return err +} + +func (c *fileClient) GetVersion() common.Version { + return c.ver +} + +func (c *fileClient) Write(name string, body string) error { + f, err := os.Create(filepath.Join(c.dir, fmt.Sprintf("%s.json", name))) + defer f.Close() + if err != nil { + return err + } + _, err = f.WriteString(body) + return err +} + +func fatalf(msg string, vs ...interface{}) { + fmt.Fprintf(os.Stderr, msg, vs...) + fmt.Fprintln(os.Stderr) + os.Exit(1) +} + +func fatalfInitCmd(err error) { + fatalf("Failed to initialize 'export' command: %+v.", err) +} diff --git a/libbeat/cmd/export/ilm_policy.go b/libbeat/cmd/export/ilm_policy.go index 21363f0a7c6d..83981dd3dc71 100644 --- a/libbeat/cmd/export/ilm_policy.go +++ b/libbeat/cmd/export/ilm_policy.go @@ -18,14 +18,10 @@ package export import ( - "fmt" - "os" - "github.com/spf13/cobra" "github.com/elastic/beats/libbeat/cmd/instance" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/libbeat/idxmgmt" ) // GenGetILMPolicyCmd is the command used to export the ilm policy. @@ -34,30 +30,24 @@ func GenGetILMPolicyCmd(settings instance.Settings) *cobra.Command { Use: "ilm-policy", Short: "Export ILM policy", Run: func(cmd *cobra.Command, args []string) { - b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version) - if err != nil { - fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) - os.Exit(1) - } - err = b.InitWithSettings(settings) - if err != nil { - fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) - os.Exit(1) - } + version, _ := cmd.Flags().GetString("es.version") + dir, _ := cmd.Flags().GetString("dir") - ilmFactory := settings.ILM - if ilmFactory == nil { - ilmFactory = ilm.DefaultSupport - } - - ilm, err := ilmFactory(nil, b.Info, b.RawConfig) + b, err := instance.NewInitializedBeat(settings) if err != nil { - fmt.Fprintf(os.Stderr, "Error initializing ILM support: %s\n", err) + fatalfInitCmd(err) } - fmt.Println(common.MapStr(ilm.Policy().Body).StringToPrint()) + clientHandler := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version)) + idxManager := b.IdxSupporter.Manager(clientHandler, idxmgmt.BeatsAssets(b.Fields)) + if err := idxManager.Setup(idxmgmt.LoadModeDisabled, idxmgmt.LoadModeEnabled); err != nil { + fatalf("Error exporting ilm-policy: %+v.", err) + } }, } + genTemplateConfigCmd.Flags().String("es.version", settings.Version, "Elasticsearch version") + genTemplateConfigCmd.Flags().String("dir", "", "Specify directory for printing policy files. By default policies are printed to stdout.") + return genTemplateConfigCmd } diff --git a/libbeat/cmd/export/index_pattern.go b/libbeat/cmd/export/index_pattern.go index 3e0f688ffcd6..5fc4e7f45601 100644 --- a/libbeat/cmd/export/index_pattern.go +++ b/libbeat/cmd/export/index_pattern.go @@ -18,7 +18,6 @@ package export import ( - "log" "os" "github.com/spf13/cobra" @@ -36,13 +35,9 @@ func GenIndexPatternConfigCmd(settings instance.Settings) *cobra.Command { Run: func(cmd *cobra.Command, args []string) { version, _ := cmd.Flags().GetString("es.version") - b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version) + b, err := instance.NewInitializedBeat(settings) if err != nil { - fatalf("Error initializing beat: %+v", err) - } - err = b.InitWithSettings(settings) - if err != nil { - fatalf("Error initializing beat: %+v", err) + fatalfInitCmd(err) } if version == "" { @@ -52,21 +47,21 @@ func GenIndexPatternConfigCmd(settings instance.Settings) *cobra.Command { // Index pattern generation v, err := common.NewVersion(version) if err != nil { - fatalf("Error creating version: %+v", err) + fatalf("Error creating version: %+v.", err) } indexPattern, err := kibana.NewGenerator(b.Info.IndexPrefix, b.Info.Beat, b.Fields, settings.Version, *v, b.Config.Migration.Enabled()) if err != nil { - log.Fatal(err) + fatalf("Error creating Kibana Generator: %+v.", err) } pattern, err := indexPattern.Generate() if err != nil { - log.Fatalf("ERROR: %s", err) + fatalf("Error generating Index Pattern: %+v.", err) } _, err = os.Stdout.WriteString(pattern.StringToPrint() + "\n") if err != nil { - fatalf("Error writing index pattern: %+v", err) + fatalf("Error writing Index Pattern: %+v.", err) } }, } diff --git a/libbeat/cmd/export/template.go b/libbeat/cmd/export/template.go index 0aafa20506f7..5b74ff57ac5d 100644 --- a/libbeat/cmd/export/template.go +++ b/libbeat/cmd/export/template.go @@ -18,94 +18,50 @@ package export import ( - "fmt" - "os" - "github.com/spf13/cobra" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cmd/instance" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/idxmgmt" + "github.com/elastic/beats/libbeat/idxmgmt/ilm" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/paths" - "github.com/elastic/beats/libbeat/template" ) +// GenTemplateConfigCmd is the command used to export the elasticsearch template. func GenTemplateConfigCmd(settings instance.Settings) *cobra.Command { genTemplateConfigCmd := &cobra.Command{ Use: "template", Short: "Export index template to stdout", Run: func(cmd *cobra.Command, args []string) { version, _ := cmd.Flags().GetString("es.version") - index, _ := cmd.Flags().GetString("index") + dir, _ := cmd.Flags().GetString("dir") noILM, _ := cmd.Flags().GetBool("noilm") - b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version) - if err != nil { - fatalf("Error initializing beat: %+v", err) - } - err = b.InitWithSettings(settings) - if err != nil { - fatalf("Error initializing beat: %+v", err) + if noILM { + settings.ILM = ilmNoopSupport } - if version == "" { - version = b.Info.Version - } - esVersion, err := common.NewVersion(version) + b, err := instance.NewInitializedBeat(settings) if err != nil { - fatalf("Invalid Elasticsearch version: %+v", err) + fatalfInitCmd(err) } - imFactory := settings.IndexManagement - if imFactory == nil { - imFactory = idxmgmt.MakeDefaultSupport(settings.ILM) - } - indexManager, err := imFactory(logp.NewLogger("index-management"), b.Info, b.RawConfig) - if err != nil { - fatalf("Error initializing the index manager: %+v", err) - } - - tmplCfg, err := indexManager.TemplateConfig(!noILM) - if err != nil { - fatalf("Template error detected: %+v", err) - } - if tmplCfg.Enabled == false { - tmplCfg = template.DefaultConfig() - } - - tmpl, err := template.New(b.Info.Version, index, *esVersion, tmplCfg, b.Config.Migration.Enabled()) - if err != nil { - fatalf("Error generating template: %+v", err) - } - - var templateString common.MapStr - if tmplCfg.Fields != "" { - fieldsPath := paths.Resolve(paths.Config, tmplCfg.Fields) - templateString, err = tmpl.LoadFile(fieldsPath) - } else { - templateString, err = tmpl.LoadBytes(b.Fields) - } - if err != nil { - fatalf("Error generating template: %+v", err) - } - - _, err = os.Stdout.WriteString(templateString.StringToPrint() + "\n") - if err != nil { - fatalf("Error writing template: %+v", err) + clientHandler := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version)) + idxManager := b.IdxSupporter.Manager(clientHandler, idxmgmt.BeatsAssets(b.Fields)) + if err := idxManager.Setup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeDisabled); err != nil { + fatalf("Error exporting template: %+v.", err) } }, } genTemplateConfigCmd.Flags().String("es.version", settings.Version, "Elasticsearch version") - genTemplateConfigCmd.Flags().String("index", settings.IndexPrefix, "Base index name") genTemplateConfigCmd.Flags().Bool("noilm", false, "Generate template with ILM disabled") + genTemplateConfigCmd.Flags().String("dir", "", "Specify directory for printing template files. By default templates are printed to stdout.") return genTemplateConfigCmd } -func fatalf(msg string, vs ...interface{}) { - fmt.Fprintf(os.Stderr, msg, vs...) - fmt.Fprintln(os.Stderr) - os.Exit(1) +func ilmNoopSupport(_ *logp.Logger, info beat.Info, config *common.Config) (ilm.Supporter, error) { + return ilm.NoopSupport(info, config) } diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index d5f6da1739bb..e8ac6270d640 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -76,12 +76,11 @@ import ( type Beat struct { beat.Beat - Config beatConfig - RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data. - - keystore keystore.Keystore - index idxmgmt.Supporter + Config beatConfig + RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data. + IdxSupporter idxmgmt.Supporter + keystore keystore.Keystore processing processing.Supporter } @@ -186,6 +185,18 @@ func Run(settings Settings, bt beat.Creator) error { }()) } +// NewInitializedBeat creates a new beat where all information and initialization is derived from settings +func NewInitializedBeat(settings Settings) (*Beat, error) { + b, err := NewBeat(settings.Name, settings.IndexPrefix, settings.Version) + if err != nil { + return nil, err + } + if err := b.InitWithSettings(settings); err != nil { + return nil, err + } + return b, nil +} + // NewBeat creates a new beat instance func NewBeat(name, indexPrefix, v string) (*Beat, error) { if v == "" { @@ -468,7 +479,7 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er } esConfig := outCfg.Config() - if b.index.Enabled() { + if b.IdxSupporter.Enabled() { esClient, err := elasticsearch.NewConnectedClient(esConfig) if err != nil { return err @@ -476,8 +487,16 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er // prepare index by loading templates, lifecycle policies and write aliases - m := b.index.Manager(esClient, idxmgmt.BeatsAssets(b.Fields)) - err = m.Setup(setup.Template, setup.ILMPolicy) + m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields)) + var tmplLoadMode, ilmLoadMode = idxmgmt.LoadModeUnset, idxmgmt.LoadModeUnset + if setup.Template { + tmplLoadMode = idxmgmt.LoadModeForce + } + if setup.ILMPolicy { + ilmLoadMode = idxmgmt.LoadModeForce + } + + err = m.Setup(tmplLoadMode, ilmLoadMode) if err != nil { return err } @@ -615,7 +634,7 @@ func (b *Beat) configure(settings Settings) error { if imFactory == nil { imFactory = idxmgmt.MakeDefaultSupport(settings.ILM) } - b.index, err = imFactory(nil, b.Beat.Info, b.RawConfig) + b.IdxSupporter, err = imFactory(nil, b.Beat.Info, b.RawConfig) if err != nil { return err } @@ -762,7 +781,7 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error { // policy as a callback with the elasticsearch output. It is important the // registration happens before the publisher is created. func (b *Beat) registerESIndexManagement() error { - if b.Config.Output.Name() != "elasticsearch" || !b.index.Enabled() { + if b.Config.Output.Name() != "elasticsearch" || !b.IdxSupporter.Enabled() { return nil } @@ -775,8 +794,8 @@ func (b *Beat) registerESIndexManagement() error { func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback { return func(esClient *elasticsearch.Client) error { - m := b.index.Manager(esClient, idxmgmt.BeatsAssets(b.Fields)) - return m.Setup(false, false) + m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields)) + return m.Setup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeEnabled) } } @@ -800,7 +819,7 @@ func (b *Beat) createOutput(stats outputs.Observer, cfg common.ConfigNamespace) return outputs.Group{}, nil } - return outputs.Load(b.index, b.Info, stats, cfg.Name(), cfg.Config()) + return outputs.Load(b.IdxSupporter, b.Info, stats, cfg.Name(), cfg.Config()) } func (b *Beat) registerClusterUUIDFetching() error { diff --git a/libbeat/cmd/keystore.go b/libbeat/cmd/keystore.go index 706efd5b6ca1..b2cffe2b7ebf 100644 --- a/libbeat/cmd/keystore.go +++ b/libbeat/cmd/keystore.go @@ -36,16 +36,11 @@ import ( ) func getKeystore(settings instance.Settings) (keystore.Keystore, error) { - b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version) - + b, err := instance.NewInitializedBeat(settings) if err != nil { return nil, fmt.Errorf("error initializing beat: %s", err) } - if err = b.InitWithSettings(settings); err != nil { - return nil, fmt.Errorf("error initializing beat: %s", err) - } - return b.Keystore(), nil } diff --git a/libbeat/cmd/modules.go b/libbeat/cmd/modules.go index 609e40a5e564..4bd6bff137ea 100644 --- a/libbeat/cmd/modules.go +++ b/libbeat/cmd/modules.go @@ -49,27 +49,23 @@ func GenModulesCmd(name, version string, modulesFactory modulesManagerFactory) * Use: "modules", Short: "Manage configured modules", } + settings := instance.Settings{Name: name, Version: version} - modulesCmd.AddCommand(genListModulesCmd(name, version, modulesFactory)) - modulesCmd.AddCommand(genEnableModulesCmd(name, version, modulesFactory)) - modulesCmd.AddCommand(genDisableModulesCmd(name, version, modulesFactory)) + modulesCmd.AddCommand(genListModulesCmd(settings, modulesFactory)) + modulesCmd.AddCommand(genEnableModulesCmd(settings, modulesFactory)) + modulesCmd.AddCommand(genDisableModulesCmd(settings, modulesFactory)) return &modulesCmd } // Instantiate a modules manager or die trying -func getModules(name, version string, modulesFactory modulesManagerFactory) ModulesManager { - b, err := instance.NewBeat(name, "", version) +func getModules(settings instance.Settings, modulesFactory modulesManagerFactory) ModulesManager { + b, err := instance.NewInitializedBeat(settings) if err != nil { fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) os.Exit(1) } - if err = b.Init(); err != nil { - fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) - os.Exit(1) - } - manager, err := modulesFactory(&b.Beat) if err != nil { fmt.Fprintf(os.Stderr, "Error in modules manager: %s\n", err) @@ -79,12 +75,12 @@ func getModules(name, version string, modulesFactory modulesManagerFactory) Modu return manager } -func genListModulesCmd(name, version string, modulesFactory modulesManagerFactory) *cobra.Command { +func genListModulesCmd(settings instance.Settings, modulesFactory modulesManagerFactory) *cobra.Command { return &cobra.Command{ Use: "list", Short: "List modules", Run: func(cmd *cobra.Command, args []string) { - modules := getModules(name, version, modulesFactory) + modules := getModules(settings, modulesFactory) fmt.Println("Enabled:") for _, module := range modules.ListEnabled() { @@ -99,12 +95,12 @@ func genListModulesCmd(name, version string, modulesFactory modulesManagerFactor } } -func genEnableModulesCmd(name, version string, modulesFactory modulesManagerFactory) *cobra.Command { +func genEnableModulesCmd(settings instance.Settings, modulesFactory modulesManagerFactory) *cobra.Command { return &cobra.Command{ Use: "enable MODULE...", Short: "Enable one or more given modules", Run: func(cmd *cobra.Command, args []string) { - modules := getModules(name, version, modulesFactory) + modules := getModules(settings, modulesFactory) for _, module := range args { if !modules.Exists(module) { @@ -128,12 +124,12 @@ func genEnableModulesCmd(name, version string, modulesFactory modulesManagerFact } } -func genDisableModulesCmd(name, version string, modulesFactory modulesManagerFactory) *cobra.Command { +func genDisableModulesCmd(settings instance.Settings, modulesFactory modulesManagerFactory) *cobra.Command { return &cobra.Command{ Use: "disable MODULE...", Short: "Disable one or more given modules", Run: func(cmd *cobra.Command, args []string) { - modules := getModules(name, version, modulesFactory) + modules := getModules(settings, modulesFactory) for _, module := range args { if !modules.Exists(module) { diff --git a/libbeat/cmd/test/output.go b/libbeat/cmd/test/output.go index 55046b49e352..6d4761d65a82 100644 --- a/libbeat/cmd/test/output.go +++ b/libbeat/cmd/test/output.go @@ -34,13 +34,7 @@ func GenTestOutputCmd(settings instance.Settings) *cobra.Command { Use: "output", Short: "Test " + settings.Name + " can connect to the output by using the current settings", Run: func(cmd *cobra.Command, args []string) { - b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version) - if err != nil { - fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) - os.Exit(1) - } - - err = b.InitWithSettings(settings) + b, err := instance.NewInitializedBeat(settings) if err != nil { fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) os.Exit(1) diff --git a/libbeat/idxmgmt/client_handler.go b/libbeat/idxmgmt/client_handler.go new file mode 100644 index 000000000000..c4665ce0ecbb --- /dev/null +++ b/libbeat/idxmgmt/client_handler.go @@ -0,0 +1,66 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package idxmgmt + +import ( + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/libbeat/template" +) + +// ClientHandler defines the interface between a remote service and the Manager for ILM and templates. +type ClientHandler interface { + ilm.ClientHandler + template.Loader +} + +type clientHandler struct { + ilm.ClientHandler + template.Loader +} + +// ESClient defines the minimal interface required for the index manager to +// prepare an index. +type ESClient interface { + Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) + GetVersion() common.Version +} + +// FileClient defines the minimal interface required for the Loader to +// prepare a policy and write alias. +type FileClient interface { + GetVersion() common.Version + Write(name string, body string) error +} + +// NewClientHandler initializes and returns a new instance of ClientHandler +func NewClientHandler(ilm ilm.ClientHandler, template template.Loader) ClientHandler { + return &clientHandler{ilm, template} +} + +// NewESClientHandler returns a new ESLoader instance, +// initialized with an ilm and template client handler based on the passed in client. +func NewESClientHandler(c ESClient) ClientHandler { + return NewClientHandler(ilm.NewESClientHandler(c), template.NewESLoader(c)) +} + +// NewFileClientHandler returns a new ESLoader instance, +// initialized with an ilm and template client handler based on the passed in client. +func NewFileClientHandler(c FileClient) ClientHandler { + return NewClientHandler(ilm.NewFileClientHandler(c), template.NewFileLoader(c)) +} diff --git a/libbeat/idxmgmt/idxmgmt.go b/libbeat/idxmgmt/idxmgmt.go index adda46d62c34..9181ae8c8187 100644 --- a/libbeat/idxmgmt/idxmgmt.go +++ b/libbeat/idxmgmt/idxmgmt.go @@ -43,12 +43,6 @@ type Supporter interface { // ILM, or aliases. Enabled() bool - // ILM provides access to the configured ILM support. - ILM() ilm.Supporter - - // TemplateConfig returns the template configuration used by the index supporter. - TemplateConfig(withILM bool) (template.TemplateConfig, error) - // BuildSelector create an index selector. // The defaultIndex string is interpreted as format string. It is used // as default index if the configuration provided does not define an index or @@ -57,7 +51,7 @@ type Supporter interface { // Manager creates a new manager that can be used to execute the required steps // for initializing an index, ILM policies, and write aliases. - Manager(client ESClient, assets Asseter) Manager + Manager(client ClientHandler, assets Asseter) Manager } // Asseter provides access to beats assets required to load the template. @@ -65,17 +59,32 @@ type Asseter interface { Fields(name string) []byte } -// ESClient defines the minimal interface required for the index manager to -// prepare an index. -type ESClient interface { - Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) - GetVersion() common.Version -} - // Manager is used to initialize indices, ILM policies, and aliases within the // Elastic Stack. type Manager interface { - Setup(forceTemplate, forcePolicy bool) error + Setup(template, ilm LoadMode) error +} + +// LoadMode defines the mode to be used for loading idxmgmt related information. +// It will be used in combination with idxmgmt configuration settings. +type LoadMode uint8 + +//go:generate stringer -linecomment -type LoadMode +const ( + // LoadModeUnset indicates that no specific mode is set. + // Instead the decision about loading data will be derived from the config or their respective default values. + LoadModeUnset LoadMode = iota //unset + // LoadModeEnabled indicates loading if not already available + LoadModeEnabled //enabled + // LoadModeForce indicates loading in any case. + LoadModeForce //force + // LoadModeDisabled indicates no loading + LoadModeDisabled //disabled +) + +// Enabled returns whether or not the LoadMode should be considered enabled +func (m *LoadMode) Enabled() bool { + return m == nil || *m != LoadModeDisabled } // DefaultSupport initializes the default index management support used by most Beats. diff --git a/libbeat/idxmgmt/idxmgmt_test.go b/libbeat/idxmgmt/idxmgmt_test.go index 339e8510be92..c5a72ca05bb1 100644 --- a/libbeat/idxmgmt/idxmgmt_test.go +++ b/libbeat/idxmgmt/idxmgmt_test.go @@ -86,81 +86,6 @@ func TestDefaultSupport_Enabled(t *testing.T) { } } -func TestDefaultSupport_TemplateConfig(t *testing.T) { - ilmTemplateSettings := func(alias, policy string) []onCall { - return []onCall{ - onMode().Return(ilm.ModeEnabled), - onAlias().Return(ilm.Alias{Name: alias}), - onPolicy().Return(ilm.Policy{Name: policy}), - } - } - - cloneCfg := func(c template.TemplateConfig) template.TemplateConfig { - if c.AppendFields != nil { - tmp := make(mapping.Fields, len(c.AppendFields)) - copy(tmp, c.AppendFields) - c.AppendFields = tmp - } - - if c.Settings.Index != nil { - c.Settings.Index = (map[string]interface{})(common.MapStr(c.Settings.Index).Clone()) - } - if c.Settings.Index != nil { - c.Settings.Source = (map[string]interface{})(common.MapStr(c.Settings.Source).Clone()) - } - return c - } - - cfgWith := func(s template.TemplateConfig, mods ...map[string]interface{}) template.TemplateConfig { - for _, mod := range mods { - cfg := common.MustNewConfigFrom(mod) - s = cloneCfg(s) - err := cfg.Unpack(&s) - if err != nil { - panic(err) - } - } - return s - } - - cases := map[string]struct { - ilmCalls []onCall - cfg map[string]interface{} - want template.TemplateConfig - fail bool - }{ - "default template config": { - want: template.DefaultConfig(), - }, - "default template with ilm": { - ilmCalls: ilmTemplateSettings("alias", "test-9.9.9"), - want: cfgWith(template.DefaultConfig(), map[string]interface{}{ - "name": "alias", - "pattern": "alias-*", - "settings.index.lifecycle.name": "test-9.9.9", - "settings.index.lifecycle.rollover_alias": "alias", - }), - }, - } - for name, test := range cases { - t.Run(name, func(t *testing.T) { - info := beat.Info{Beat: "test", Version: "9.9.9"} - factory := MakeDefaultSupport(makeMockILMSupport(test.ilmCalls...)) - im, err := factory(nil, info, common.MustNewConfigFrom(test.cfg)) - require.NoError(t, err) - withILM := len(test.ilmCalls) > 0 - - tmpl, err := im.TemplateConfig(withILM) - if test.fail { - assert.Error(t, err) - } else { - require.NoError(t, err) - assert.Equal(t, test.want, tmpl) - } - }) - } -} - func TestDefaultSupport_BuildSelector(t *testing.T) { type nameFunc func(time.Time) string @@ -271,3 +196,184 @@ func TestDefaultSupport_BuildSelector(t *testing.T) { }) } } + +func TestDefaultSupport_TemplateHandling(t *testing.T) { + cloneCfg := func(c template.TemplateConfig) template.TemplateConfig { + if c.AppendFields != nil { + tmp := make(mapping.Fields, len(c.AppendFields)) + copy(tmp, c.AppendFields) + c.AppendFields = tmp + } + + if c.Settings.Index != nil { + c.Settings.Index = (map[string]interface{})(common.MapStr(c.Settings.Index).Clone()) + } + if c.Settings.Index != nil { + c.Settings.Source = (map[string]interface{})(common.MapStr(c.Settings.Source).Clone()) + } + return c + } + + cfgWith := func(s template.TemplateConfig, mods ...map[string]interface{}) *template.TemplateConfig { + for _, mod := range mods { + cfg := common.MustNewConfigFrom(mod) + s = cloneCfg(s) + err := cfg.Unpack(&s) + if err != nil { + panic(err) + } + } + return &s + } + defaultCfg := template.DefaultConfig() + + cases := map[string]struct { + cfg common.MapStr + loadTemplate, loadILM LoadMode + + err bool + tmplCfg *template.TemplateConfig + alias, policy string + }{ + "template default, ilm default": { + tmplCfg: cfgWith(template.DefaultConfig(), map[string]interface{}{ + "overwrite": "true", + "name": "test-9.9.9", + "pattern": "test-9.9.9-*", + "settings.index.lifecycle.name": "test-9.9.9", + "settings.index.lifecycle.rollover_alias": "test-9.9.9", + }), + alias: "test-9.9.9", + policy: "test-9.9.9", + }, + "template default, ilm default with alias and policy changed": { + cfg: common.MapStr{ + "setup.ilm.rollover_alias": "mocktest", + "setup.ilm.policy_name": "policy-keep", + }, + tmplCfg: cfgWith(template.DefaultConfig(), map[string]interface{}{ + "overwrite": "true", + "name": "mocktest", + "pattern": "mocktest-*", + "settings.index.lifecycle.name": "policy-keep", + "settings.index.lifecycle.rollover_alias": "mocktest", + }), + alias: "mocktest", + policy: "policy-keep", + }, + "template default, ilm disabled": { + cfg: common.MapStr{ + "setup.ilm.enabled": false, + }, + loadTemplate: LoadModeEnabled, + tmplCfg: &defaultCfg, + }, + "template loadMode disabled, ilm disabled": { + cfg: common.MapStr{ + "setup.ilm.enabled": false, + }, + loadTemplate: LoadModeDisabled, + }, + "template disabled, ilm default": { + cfg: common.MapStr{ + "setup.template.enabled": false, + }, + alias: "test-9.9.9", + policy: "test-9.9.9", + }, + "template loadmode disabled, ilm loadMode enabled": { + loadTemplate: LoadModeDisabled, + loadILM: LoadModeEnabled, + alias: "test-9.9.9", + policy: "test-9.9.9", + }, + "template default, ilm loadMode disabled": { + loadILM: LoadModeDisabled, + tmplCfg: cfgWith(template.DefaultConfig(), map[string]interface{}{ + "name": "test-9.9.9", + "pattern": "test-9.9.9-*", + "settings.index.lifecycle.name": "test-9.9.9", + "settings.index.lifecycle.rollover_alias": "test-9.9.9", + }), + }, + "template loadmode disabled, ilm loadmode disabled": { + loadTemplate: LoadModeDisabled, + loadILM: LoadModeDisabled, + }, + } + for name, test := range cases { + t.Run(name, func(t *testing.T) { + info := beat.Info{Beat: "test", Version: "9.9.9"} + factory := MakeDefaultSupport(nil) + im, err := factory(nil, info, common.MustNewConfigFrom(test.cfg)) + require.NoError(t, err) + + clientHandler := newMockClientHandler() + manager := im.Manager(clientHandler, BeatsAssets([]byte("testbeat fields"))) + err = manager.Setup(test.loadTemplate, test.loadILM) + if test.err { + assert.Error(t, err) + } else { + require.NoError(t, err) + if test.tmplCfg == nil { + assert.Nil(t, clientHandler.tl.tmplCfg) + } else { + assert.Equal(t, test.tmplCfg, clientHandler.tl.tmplCfg) + } + assert.Equal(t, test.alias, clientHandler.il.alias) + assert.Equal(t, test.policy, clientHandler.il.policy) + } + }) + } +} + +func newMockClientHandler() *mockClientHandler { + tl := mockTemplateLoader{} + il := mockILMClientHandler{} + return &mockClientHandler{&il, &tl, &tl, &il} +} + +type mockClientHandler struct { + ilm.ClientHandler + template.Loader + + tl *mockTemplateLoader + il *mockILMClientHandler +} + +type mockTemplateLoader struct { + tmplCfg *template.TemplateConfig + force bool +} + +func (l *mockTemplateLoader) Load(config template.TemplateConfig, _ beat.Info, fields []byte, migration bool) error { + l.force = config.Overwrite + l.tmplCfg = &config + return nil +} + +type mockILMClientHandler struct { + alias, policy string +} + +func (ch *mockILMClientHandler) CheckILMEnabled(m ilm.Mode) (bool, error) { + return m == ilm.ModeEnabled || m == ilm.ModeAuto, nil +} + +func (ch *mockILMClientHandler) HasAlias(name string) (bool, error) { + return ch.alias == name, nil +} + +func (ch *mockILMClientHandler) CreateAlias(alias ilm.Alias) error { + ch.alias = alias.Name + return nil +} + +func (ch *mockILMClientHandler) HasILMPolicy(name string) (bool, error) { + return ch.policy == name, nil +} + +func (ch *mockILMClientHandler) CreateILMPolicy(policy ilm.Policy) error { + ch.policy = policy.Name + return nil +} diff --git a/libbeat/idxmgmt/ilm/eshandler.go b/libbeat/idxmgmt/ilm/client_handler.go similarity index 53% rename from libbeat/idxmgmt/ilm/eshandler.go rename to libbeat/idxmgmt/ilm/client_handler.go index de493721bf71..1cc8456c21d5 100644 --- a/libbeat/idxmgmt/ilm/eshandler.go +++ b/libbeat/idxmgmt/ilm/client_handler.go @@ -26,14 +26,45 @@ import ( "github.com/elastic/beats/libbeat/common" ) -type esClientHandler struct { +// ClientHandler defines the interface between a remote service and the Manager. +type ClientHandler interface { + CheckILMEnabled(Mode) (bool, error) + + HasAlias(name string) (bool, error) + CreateAlias(alias Alias) error + + HasILMPolicy(name string) (bool, error) + CreateILMPolicy(policy Policy) error +} + +// ESClientHandler implements the Loader interface for talking to ES. +type ESClientHandler struct { client ESClient } -var ( - esMinILMVersion = common.MustNewVersion("6.6.0") - esMinDefaultILMVesion = common.MustNewVersion("7.0.0") -) +// ESClient defines the minimal interface required for the Loader to +// prepare a policy and write alias. +type ESClient interface { + GetVersion() common.Version + Request( + method, path string, + pipeline string, + params map[string]string, + body interface{}, + ) (int, []byte, error) +} + +// FileClientHandler implements the Loader interface for writing to a file. +type FileClientHandler struct { + client FileClient +} + +// FileClient defines the minimal interface required for the Loader to +// prepare a policy and write alias. +type FileClient interface { + GetVersion() common.Version + Write(name string, body string) error +} const ( // esFeaturesPath is used to query Elasticsearch for availability of licensed @@ -45,33 +76,28 @@ const ( esAliasPath = "/_alias" ) -// ESClientHandler creates a new APIHandler executing ILM, and alias queries -// against Elasticsearch. -func ESClientHandler(client ESClient) APIHandler { - if client == nil { - return nil - } - return &esClientHandler{client} +var ( + esMinILMVersion = common.MustNewVersion("6.6.0") + esMinDefaultILMVersion = common.MustNewVersion("7.0.0") +) + +// NewESClientHandler initializes and returns an ESClientHandler, +func NewESClientHandler(c ESClient) *ESClientHandler { + return &ESClientHandler{client: c} } -// ESClient defines the minimal interface required for the ESClientHandler to -// prepare a policy and write alias. -type ESClient interface { - GetVersion() common.Version - Request( - method, path string, - pipeline string, - params map[string]string, - body interface{}, - ) (int, []byte, error) +// NewFileClientHandler initializes and returns a new FileClientHandler instance. +func NewFileClientHandler(c FileClient) *FileClientHandler { + return &FileClientHandler{client: c} } -func (h *esClientHandler) ILMEnabled(mode Mode) (bool, error) { +// CheckILMEnabled indicates whether or not ILM is supported for the configured mode and ES instance. +func (h *ESClientHandler) CheckILMEnabled(mode Mode) (bool, error) { if mode == ModeDisabled { return false, nil } - avail, probe := h.checkILMVersion(mode) + avail, probe := checkILMVersion(mode, h.client.GetVersion()) if !avail { if mode == ModeEnabled { ver := h.client.GetVersion() @@ -105,13 +131,15 @@ func (h *esClientHandler) ILMEnabled(mode Mode) (bool, error) { return enabled, nil } -func (h *esClientHandler) CreateILMPolicy(policy Policy) error { +// CreateILMPolicy loads the given policy to Elasticsearch. +func (h *ESClientHandler) CreateILMPolicy(policy Policy) error { path := path.Join(esILMPath, policy.Name) _, _, err := h.client.Request("PUT", path, "", nil, policy.Body) return err } -func (h *esClientHandler) HasILMPolicy(name string) (bool, error) { +// HasILMPolicy queries Elasticsearch to see if policy with given name exists. +func (h *ESClientHandler) HasILMPolicy(name string) (bool, error) { // XXX: HEAD method does currently not work for checking if a policy exists path := path.Join(esILMPath, name) status, b, err := h.client.Request("GET", path, "", nil, nil) @@ -122,7 +150,8 @@ func (h *esClientHandler) HasILMPolicy(name string) (bool, error) { return status == 200, nil } -func (h *esClientHandler) HasAlias(name string) (bool, error) { +// HasAlias queries Elasticsearch to see if alias exists. +func (h *ESClientHandler) HasAlias(name string) (bool, error) { path := path.Join(esAliasPath, name) status, b, err := h.client.Request("HEAD", path, "", nil, nil) if err != nil && status != 404 { @@ -132,7 +161,8 @@ func (h *esClientHandler) HasAlias(name string) (bool, error) { return status == 200, nil } -func (h *esClientHandler) CreateAlias(alias Alias) error { +// CreateAlias sends request to Elasticsearch for creating alias. +func (h *ESClientHandler) CreateAlias(alias Alias) error { // Escaping because of date pattern // This always assume it's a date pattern by sourrounding it by <...> firstIndex := fmt.Sprintf("<%s-%s>", alias.Name, alias.Pattern) @@ -157,18 +187,7 @@ func (h *esClientHandler) CreateAlias(alias Alias) error { return nil } -func (h *esClientHandler) checkILMVersion(mode Mode) (avail, probe bool) { - ver := h.client.GetVersion() - avail = !ver.LessThan(esMinILMVersion) - if avail { - probe = (mode == ModeEnabled) || - (mode == ModeAuto && !ver.LessThan(esMinDefaultILMVesion)) - } - - return avail, probe -} - -func (h *esClientHandler) checkILMSupport() (avail, enbaled bool, err error) { +func (h *ESClientHandler) checkILMSupport() (avail, enabled bool, err error) { var response struct { Features struct { ILM struct { @@ -187,11 +206,11 @@ func (h *esClientHandler) checkILMSupport() (avail, enbaled bool, err error) { } avail = response.Features.ILM.Available - enbaled = response.Features.ILM.Enabled - return avail, enbaled, nil + enabled = response.Features.ILM.Enabled + return avail, enabled, nil } -func (h *esClientHandler) queryFeatures(to interface{}) (int, error) { +func (h *ESClientHandler) queryFeatures(to interface{}) (int, error) { status, body, err := h.client.Request("GET", esFeaturesPath, "", nil, nil) if status >= 400 || err != nil { return status, err @@ -205,6 +224,56 @@ func (h *esClientHandler) queryFeatures(to interface{}) (int, error) { return status, nil } -func (h *esClientHandler) access() ESClient { - return h.client +// CheckILMEnabled indicates whether or not ILM is supported for the configured mode and client version. +func (h *FileClientHandler) CheckILMEnabled(mode Mode) (bool, error) { + if mode == ModeDisabled { + return false, nil + } + avail, probe := checkILMVersion(mode, h.client.GetVersion()) + if avail { + return probe, nil + } + if mode != ModeEnabled { + return false, nil + } + version := h.client.GetVersion() + return false, errf(ErrESVersionNotSupported, + "Elasticsearch %v does not support ILM", version.String()) +} + +// CreateILMPolicy writes given policy to the configured file. +func (h *FileClientHandler) CreateILMPolicy(policy Policy) error { + p := common.MapStr{policy.Name: policy.Body} + str := fmt.Sprintf("%s\n", p.StringToPrint()) + if err := h.client.Write(policy.Name, str); err != nil { + return fmt.Errorf("error printing policy : %v", err) + } + return nil +} + +// HasILMPolicy always returns false. +func (h *FileClientHandler) HasILMPolicy(name string) (bool, error) { + return false, nil +} + +// CreateAlias is a noop implementation. +func (h *FileClientHandler) CreateAlias(alias Alias) error { + return nil +} + +// HasAlias always returns false. +func (h *FileClientHandler) HasAlias(name string) (bool, error) { + return false, nil +} + +// avail: indicates whether version supports ILM +// probe: in case version potentially supports ILM, check the combination of mode + version +// to indicate whether or not ILM support should be enabled or disabled +func checkILMVersion(mode Mode, ver common.Version) (avail, probe bool) { + avail = !ver.LessThan(esMinILMVersion) + if avail { + probe = (mode == ModeEnabled) || + (mode == ModeAuto && !ver.LessThan(esMinDefaultILMVersion)) + } + return avail, probe } diff --git a/libbeat/idxmgmt/ilm/eshandler_integration_test.go b/libbeat/idxmgmt/ilm/client_handler_integration_test.go similarity index 71% rename from libbeat/idxmgmt/ilm/eshandler_integration_test.go rename to libbeat/idxmgmt/ilm/client_handler_integration_test.go index 808ab48b47d4..8d328cbca2f1 100644 --- a/libbeat/idxmgmt/ilm/eshandler_integration_test.go +++ b/libbeat/idxmgmt/ilm/client_handler_integration_test.go @@ -20,6 +20,7 @@ package ilm_test import ( + "encoding/json" "fmt" "os" "testing" @@ -29,9 +30,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/idxmgmt/ilm" "github.com/elastic/beats/libbeat/outputs/elasticsearch" "github.com/elastic/beats/libbeat/outputs/outil" + "github.com/elastic/beats/libbeat/version" ) const ( @@ -41,24 +44,24 @@ const ( ElasticsearchDefaultPort = "9200" ) -func TestESClientHandler_ILMEnabled(t *testing.T) { +func TestESClientHandler_CheckILMEnabled(t *testing.T) { t.Run("no ilm if disabled", func(t *testing.T) { h := newESClientHandler(t) - b, err := h.ILMEnabled(ilm.ModeDisabled) + b, err := h.CheckILMEnabled(ilm.ModeDisabled) assert.NoError(t, err) assert.False(t, b) }) t.Run("with ilm if auto", func(t *testing.T) { h := newESClientHandler(t) - b, err := h.ILMEnabled(ilm.ModeAuto) + b, err := h.CheckILMEnabled(ilm.ModeAuto) assert.NoError(t, err) assert.True(t, b) }) t.Run("with ilm if enabled", func(t *testing.T) { h := newESClientHandler(t) - b, err := h.ILMEnabled(ilm.ModeEnabled) + b, err := h.CheckILMEnabled(ilm.ModeEnabled) assert.NoError(t, err) assert.True(t, b) }) @@ -151,7 +154,7 @@ func TestESClientHandler_Alias(t *testing.T) { }) } -func newESClientHandler(t *testing.T) ilm.APIHandler { +func newESClientHandler(t *testing.T) ilm.ClientHandler { client, err := elasticsearch.NewClient(elasticsearch.ClientSettings{ URL: getURL(), Index: outil.MakeSelector(), @@ -168,7 +171,7 @@ func newESClientHandler(t *testing.T) ilm.APIHandler { t.Fatalf("Failed to connect to Test Elasticsearch instance: %v", err) } - return ilm.ESClientHandler(client) + return ilm.NewESClientHandler(client) } func makeName(base string) string { @@ -205,3 +208,80 @@ func getEnv(name, def string) string { } return def } + +func TestFileClientHandler_CheckILMEnabled(t *testing.T) { + for name, test := range map[string]struct { + m ilm.Mode + version string + enabled bool + err bool + }{ + "ilm enabled": { + m: ilm.ModeEnabled, + enabled: true, + }, + "ilm auto": { + m: ilm.ModeAuto, + enabled: true, + }, + "ilm disabled": { + m: ilm.ModeDisabled, + enabled: false, + }, + "ilm enabled, version too old": { + m: ilm.ModeEnabled, + version: "5.0.0", + err: true, + }, + "ilm auto, version too old": { + m: ilm.ModeAuto, + version: "5.0.0", + enabled: false, + }, + } { + t.Run(name, func(t *testing.T) { + h := ilm.NewFileClientHandler(newMockClient(test.version)) + b, err := h.CheckILMEnabled(test.m) + assert.Equal(t, test.enabled, b) + if test.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestFileClientHandler_CreateILMPolicy(t *testing.T) { + c := newMockClient("") + h := ilm.NewFileClientHandler(c) + name := "test-policy" + body := map[string]interface{}{"foo": "bar"} + h.CreateILMPolicy(ilm.Policy{Name: name, Body: body}) + + assert.Equal(t, name, c.name) + var out common.MapStr + json.Unmarshal([]byte(c.body), &out) + assert.Equal(t, common.MapStr{name: body}, out) +} + +type mockClient struct { + v common.Version + name, body string +} + +func newMockClient(v string) *mockClient { + if v == "" { + v = version.GetDefaultVersion() + } + return &mockClient{v: *common.MustNewVersion(v)} +} + +func (c *mockClient) GetVersion() common.Version { + return c.v +} + +func (c *mockClient) Write(name string, body string) error { + c.name, c.body = name, body + return nil +} diff --git a/libbeat/idxmgmt/ilm/ilm.go b/libbeat/idxmgmt/ilm/ilm.go index 98cc77b70b23..d7e803b5742c 100644 --- a/libbeat/idxmgmt/ilm/ilm.go +++ b/libbeat/idxmgmt/ilm/ilm.go @@ -39,10 +39,10 @@ type Supporter interface { Mode() Mode Alias() Alias Policy() Policy - Manager(h APIHandler) Manager + Manager(h ClientHandler) Manager } -// Manager uses an APIHandler to install a policy. +// Manager uses a ClientHandler to install a policy. type Manager interface { Enabled() (bool, error) @@ -55,17 +55,6 @@ type Manager interface { EnsurePolicy(overwrite bool) (created bool, err error) } -// APIHandler defines the interface between a remote service and the Manager. -type APIHandler interface { - ILMEnabled(Mode) (bool, error) - - HasAlias(name string) (bool, error) - CreateAlias(alias Alias) error - - HasILMPolicy(name string) (bool, error) - CreateILMPolicy(policy Policy) error -} - // Policy describes a policy to be loaded into Elasticsearch. // See: [Policy phases and actions documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/ilm-policy-definition.html). type Policy struct { diff --git a/libbeat/idxmgmt/ilm/ilm_test.go b/libbeat/idxmgmt/ilm/ilm_test.go index 8abbb36c425e..0ca72c866730 100644 --- a/libbeat/idxmgmt/ilm/ilm_test.go +++ b/libbeat/idxmgmt/ilm/ilm_test.go @@ -98,32 +98,32 @@ func TestDefaultSupport_Manager_Enabled(t *testing.T) { }, "disabled via handler": { calls: []onCall{ - onILMEnabled(ModeAuto).Return(false, nil), + onCheckILMEnabled(ModeAuto).Return(false, nil), }, }, "enabled via handler": { calls: []onCall{ - onILMEnabled(ModeAuto).Return(true, nil), + onCheckILMEnabled(ModeAuto).Return(true, nil), }, b: true, }, "handler confirms enabled flag": { calls: []onCall{ - onILMEnabled(ModeEnabled).Return(true, nil), + onCheckILMEnabled(ModeEnabled).Return(true, nil), }, cfg: map[string]interface{}{"enabled": true}, b: true, }, "fail enabled": { calls: []onCall{ - onILMEnabled(ModeEnabled).Return(false, nil), + onCheckILMEnabled(ModeEnabled).Return(false, nil), }, cfg: map[string]interface{}{"enabled": true}, fail: ErrESILMDisabled, }, "io error": { calls: []onCall{ - onILMEnabled(ModeAuto).Return(false, errors.New("ups")), + onCheckILMEnabled(ModeAuto).Return(false, errors.New("ups")), }, cfg: map[string]interface{}{}, err: true, @@ -275,7 +275,7 @@ func TestDefaultSupport_Manager_EnsurePolicy(t *testing.T) { } } -func createManager(t *testing.T, h APIHandler, cfg map[string]interface{}) Manager { +func createManager(t *testing.T, h ClientHandler, cfg map[string]interface{}) Manager { info := beat.Info{Beat: "test", Version: "9.9.9"} s, err := DefaultSupport(nil, info, common.MustNewConfigFrom(cfg)) require.NoError(t, err) diff --git a/libbeat/idxmgmt/ilm/mockapihandler_test.go b/libbeat/idxmgmt/ilm/mockapihandler_test.go index 6c1313b30913..a77f5086a3ae 100644 --- a/libbeat/idxmgmt/ilm/mockapihandler_test.go +++ b/libbeat/idxmgmt/ilm/mockapihandler_test.go @@ -44,8 +44,8 @@ func newMockHandler(calls ...onCall) *mockHandler { return m } -func onILMEnabled(m Mode) onCall { return makeOnCall("ILMEnabled", m) } -func (h *mockHandler) ILMEnabled(mode Mode) (bool, error) { +func onCheckILMEnabled(m Mode) onCall { return makeOnCall("CheckILMEnabled", m) } +func (h *mockHandler) CheckILMEnabled(mode Mode) (bool, error) { args := h.Called(mode) return args.Bool(0), args.Error(1) } diff --git a/libbeat/idxmgmt/ilm/noop.go b/libbeat/idxmgmt/ilm/noop.go index c6479596aff8..779bd16c5eaa 100644 --- a/libbeat/idxmgmt/ilm/noop.go +++ b/libbeat/idxmgmt/ilm/noop.go @@ -31,10 +31,10 @@ func NoopSupport(info beat.Info, config *common.Config) (Supporter, error) { return (*noopSupport)(nil), nil } -func (*noopSupport) Mode() Mode { return ModeDisabled } -func (*noopSupport) Alias() Alias { return Alias{} } -func (*noopSupport) Policy() Policy { return Policy{} } -func (*noopSupport) Manager(_ APIHandler) Manager { return (*noopManager)(nil) } +func (*noopSupport) Mode() Mode { return ModeDisabled } +func (*noopSupport) Alias() Alias { return Alias{} } +func (*noopSupport) Policy() Policy { return Policy{} } +func (*noopSupport) Manager(_ ClientHandler) Manager { return (*noopManager)(nil) } func (*noopManager) Enabled() (bool, error) { return false, nil } func (*noopManager) EnsureAlias() error { return errOf(ErrOpNotAvailable) } diff --git a/libbeat/idxmgmt/ilm/std.go b/libbeat/idxmgmt/ilm/std.go index 5709d03ca91d..c9622eb18472 100644 --- a/libbeat/idxmgmt/ilm/std.go +++ b/libbeat/idxmgmt/ilm/std.go @@ -36,7 +36,7 @@ type ilmSupport struct { type singlePolicyManager struct { *ilmSupport - client APIHandler + client ClientHandler // cached info cache infoCache @@ -71,7 +71,7 @@ func (s *ilmSupport) Mode() Mode { return s.mode } func (s *ilmSupport) Alias() Alias { return s.alias } func (s *ilmSupport) Policy() Policy { return s.policy } -func (s *ilmSupport) Manager(h APIHandler) Manager { +func (s *ilmSupport) Manager(h ClientHandler) Manager { return &singlePolicyManager{ client: h, ilmSupport: s, @@ -87,7 +87,7 @@ func (m *singlePolicyManager) Enabled() (bool, error) { return m.cache.Enabled, nil } - enabled, err := m.client.ILMEnabled(m.mode) + enabled, err := m.client.CheckILMEnabled(m.mode) if err != nil { return enabled, err } diff --git a/libbeat/idxmgmt/loadmode_string.go b/libbeat/idxmgmt/loadmode_string.go new file mode 100644 index 000000000000..0cb74b2242de --- /dev/null +++ b/libbeat/idxmgmt/loadmode_string.go @@ -0,0 +1,43 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code generated by "stringer -linecomment -type LoadMode"; DO NOT EDIT. + +package idxmgmt + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[LoadModeUnset-0] + _ = x[LoadModeEnabled-1] + _ = x[LoadModeForce-2] + _ = x[LoadModeDisabled-3] +} + +const _LoadMode_name = "unsetenabledforcedisabled" + +var _LoadMode_index = [...]uint8{0, 5, 12, 17, 25} + +func (i LoadMode) String() string { + if i >= LoadMode(len(_LoadMode_index)-1) { + return "LoadMode(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _LoadMode_name[_LoadMode_index[i]:_LoadMode_index[i+1]] +} diff --git a/libbeat/idxmgmt/mockilm_test.go b/libbeat/idxmgmt/mockilm_test.go index 305d910717f0..db4c60493c1e 100644 --- a/libbeat/idxmgmt/mockilm_test.go +++ b/libbeat/idxmgmt/mockilm_test.go @@ -69,7 +69,7 @@ func (m *mockILMSupport) Policy() ilm.Policy { return args.Get(0).(ilm.Policy) } -func (m *mockILMSupport) Manager(_ ilm.APIHandler) ilm.Manager { +func (m *mockILMSupport) Manager(_ ilm.ClientHandler) ilm.Manager { return m } diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index e97e0799fd29..2159f1c87e0e 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -50,8 +50,8 @@ type indexManager struct { support *indexSupport ilm ilm.Manager - client ESClient - assets Asseter + clientHandler ClientHandler + assets Asseter } type indexSelector outil.Selector @@ -98,39 +98,16 @@ func (s *indexSupport) Enabled() bool { return s.templateCfg.Enabled || (s.ilm.Mode() != ilm.ModeDisabled) } -func (s *indexSupport) ILM() ilm.Supporter { - return s.ilm -} - -func (s *indexSupport) TemplateConfig(withILM bool) (template.TemplateConfig, error) { - log := s.log - - cfg := s.templateCfg - if withILM { - if mode := s.ilm.Mode(); mode == ilm.ModeDisabled { - withILM = false - } else if mode == ilm.ModeEnabled { - withILM = true - } - } - - var err error - if withILM { - cfg, err = applyILMSettings(log, cfg, s.ilm.Policy(), s.ilm.Alias()) - } - return cfg, err -} - func (s *indexSupport) Manager( - client ESClient, + clientHandler ClientHandler, assets Asseter, ) Manager { - ilm := s.ilm.Manager(ilm.ESClientHandler(client)) + ilm := s.ilm.Manager(clientHandler) return &indexManager{ - support: s, - ilm: ilm, - client: client, - assets: assets, + support: s, + ilm: ilm, + clientHandler: clientHandler, + assets: assets, } } @@ -200,15 +177,7 @@ func (s *indexSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector, }, nil } -func (m *indexManager) Setup(forceTemplate, forcePolicy bool) error { - return m.load(forceTemplate, forcePolicy) -} - -func (m *indexManager) Load() error { - return m.load(false, false) -} - -func (m *indexManager) load(forceTemplate, forcePolicy bool) error { +func (m *indexManager) Setup(loadTemplate, loadILM LoadMode) error { var err error log := m.support.log @@ -218,34 +187,47 @@ func (m *indexManager) load(forceTemplate, forcePolicy bool) error { if err != nil { return err } - + } + if loadILM == LoadModeUnset { if withILM { + loadILM = LoadModeEnabled log.Info("Auto ILM enable success.") + } else { + loadILM = LoadModeDisabled } } - // mark ILM as enabled in indexState if withILM is true - if withILM { - m.support.st.withILM.CAS(false, withILM) - } + if withILM && loadILM.Enabled() { + // mark ILM as enabled in indexState if withILM is true + m.support.st.withILM.CAS(false, true) - // install ilm policy - if withILM { - policyCreated, err := m.ilm.EnsurePolicy(forcePolicy) + // install ilm policy + policyCreated, err := m.ilm.EnsurePolicy(loadILM == LoadModeForce) if err != nil { return err } log.Info("ILM policy successfully loaded.") // The template should be updated if a new policy is created. - if policyCreated { - forceTemplate = true + if policyCreated && loadTemplate.Enabled() { + loadTemplate = LoadModeForce + } + + // create alias + if err := m.ilm.EnsureAlias(); err != nil { + if ilm.ErrReason(err) != ilm.ErrAliasAlreadyExists { + return err + } + log.Info("Write alias exists already") + } else { + log.Info("Write alias successfully generated.") } } // create and install template - if m.support.templateCfg.Enabled { + if m.support.templateCfg.Enabled && loadTemplate.Enabled() { tmplCfg := m.support.templateCfg + if withILM { ilm := m.support.ilm tmplCfg, err = applyILMSettings(log, tmplCfg, ilm.Policy(), ilm.Alias()) @@ -254,36 +236,20 @@ func (m *indexManager) load(forceTemplate, forcePolicy bool) error { } } - if forceTemplate { + if loadTemplate == LoadModeForce { tmplCfg.Overwrite = true } fields := m.assets.Fields(m.support.info.Beat) - loader, err := template.NewLoader(tmplCfg, m.client, m.support.info, fields, m.support.migration) - if err != nil { - return fmt.Errorf("Error creating Elasticsearch template loader: %v", err) - } - err = loader.Load() + err = m.clientHandler.Load(tmplCfg, m.support.info, fields, m.support.migration) if err != nil { - return fmt.Errorf("Error loading Elasticsearch template: %v", err) + return fmt.Errorf("error loading template: %v", err) } log.Info("Loaded index template.") } - // create alias - if withILM { - if err := m.ilm.EnsureAlias(); err != nil { - if ilm.ErrReason(err) != ilm.ErrAliasAlreadyExists { - return err - } - log.Info("Write alias exists already") - } else { - log.Info("Write alias successfully generated.") - } - } - return nil } diff --git a/libbeat/template/config.go b/libbeat/template/config.go index 53d9ac5eacb0..a982dbc45cb7 100644 --- a/libbeat/template/config.go +++ b/libbeat/template/config.go @@ -19,6 +19,7 @@ package template import "github.com/elastic/beats/libbeat/mapping" +// TemplateConfig holds config information about the Elasticsearch template type TemplateConfig struct { Enabled bool `config:"enabled"` Name string `config:"name"` @@ -32,8 +33,10 @@ type TemplateConfig struct { AppendFields mapping.Fields `config:"append_fields"` Overwrite bool `config:"overwrite"` Settings TemplateSettings `config:"settings"` + Order int `config:"order"` } +// TemplateSettings are part of the Elasticsearch template and hold index and source specific information. type TemplateSettings struct { Index map[string]interface{} `config:"index"` Source map[string]interface{} `config:"_source"` diff --git a/libbeat/template/load.go b/libbeat/template/load.go index 6e4288a760a2..d9fe54897a6c 100644 --- a/libbeat/template/load.go +++ b/libbeat/template/load.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/http" "os" "github.com/elastic/beats/libbeat/beat" @@ -29,6 +30,16 @@ import ( "github.com/elastic/beats/libbeat/paths" ) +//Loader interface for loading templates +type Loader interface { + Load(config TemplateConfig, info beat.Info, fields []byte, migration bool) error +} + +// ESLoader implements Loader interface for loading templates to Elasticsearch. +type ESLoader struct { + client ESClient +} + // ESClient is a subset of the Elasticsearch client API capable of // loading the template. type ESClient interface { @@ -36,135 +47,173 @@ type ESClient interface { GetVersion() common.Version } -// Loader is a template loader capable of loading the template using -// Elasticsearch Client API -type Loader struct { - config TemplateConfig - client ESClient - beatInfo beat.Info - fields []byte - migration bool -} - -// NewLoader creates a new template loader -func NewLoader( - config TemplateConfig, - client ESClient, - beatInfo beat.Info, - fields []byte, - migration bool, -) (*Loader, error) { - return &Loader{ - config: config, - client: client, - beatInfo: beatInfo, - fields: fields, - migration: migration, - }, nil +// FileLoader implements Loader interface for loading templates to a File. +type FileLoader struct { + client FileClient +} + +// FileClient defines the minimal interface required for the FileLoader +type FileClient interface { + GetVersion() common.Version + Write(name string, body string) error +} + +// NewESLoader creates a new template loader for ES +func NewESLoader(client ESClient) *ESLoader { + return &ESLoader{client: client} +} + +// NewFileLoader creates a new template loader for the given file. +func NewFileLoader(c FileClient) *FileLoader { + return &FileLoader{client: c} } // Load checks if the index mapping template should be loaded // In case the template is not already loaded or overwriting is enabled, the -// template is written to index -func (l *Loader) Load() error { - tmpl, err := New(l.beatInfo.Version, l.beatInfo.IndexPrefix, l.client.GetVersion(), l.config, l.migration) - if err != nil { - return fmt.Errorf("error creating template instance: %v", err) +// template is built and written to index +func (l *ESLoader) Load(config TemplateConfig, info beat.Info, fields []byte, migration bool) error { + //build template from config + tmpl, err := template(config, info, l.client.GetVersion(), migration) + if err != nil || tmpl == nil { + return err } + // Check if template already exist or should be overwritten templateName := tmpl.GetName() - if l.config.JSON.Enabled { - templateName = l.config.JSON.Name + if config.JSON.Enabled { + templateName = config.JSON.Name } - // Check if template already exist or should be overwritten - exists := l.CheckTemplate(templateName) - if !exists || l.config.Overwrite { - version := l.client.GetVersion() - logp.Info("Loading template for Elasticsearch version: %s", version.String()) - if l.config.Overwrite { - logp.Info("Existing template will be overwritten, as overwrite is enabled.") - } - - var template map[string]interface{} - if l.config.JSON.Enabled { - jsonPath := paths.Resolve(paths.Config, l.config.JSON.Path) - if _, err := os.Stat(jsonPath); err != nil { - return fmt.Errorf("error checking for json template: %s", err) - } - - logp.Info("Loading json template from file %s", jsonPath) - - content, err := ioutil.ReadFile(jsonPath) - if err != nil { - return fmt.Errorf("error reading file. Path: %s, Error: %s", jsonPath, err) - - } - err = json.Unmarshal(content, &template) - if err != nil { - return fmt.Errorf("could not unmarshal json template: %s", err) - } - // Load fields from path - } else if l.config.Fields != "" { - logp.Debug("template", "Load fields.yml from file: %s", l.config.Fields) - - fieldsPath := paths.Resolve(paths.Config, l.config.Fields) - - template, err = tmpl.LoadFile(fieldsPath) - if err != nil { - return fmt.Errorf("error creating template from file %s: %v", fieldsPath, err) - } - } else { - logp.Debug("template", "Load default fields.yml") - template, err = tmpl.LoadBytes(l.fields) - if err != nil { - return fmt.Errorf("error creating template: %v", err) - } - } - - err = l.LoadTemplate(templateName, template) - if err != nil { - return fmt.Errorf("could not load template. Elasticsearch returned: %v. Template is: %s", err, template) - } - - } else { - logp.Info("Template already exists and will not be overwritten.") + if l.templateExists(templateName) && !config.Overwrite { + logp.Info("Template %s already exists and will not be overwritten.", templateName) + return nil } + //loading template to ES + body, err := buildBody(tmpl, config, fields) + if err != nil { + return err + } + if err := l.loadTemplate(templateName, body); err != nil { + return fmt.Errorf("could not load template. Elasticsearch returned: %v. Template is: %s", err, body.StringToPrint()) + } + logp.Info("template with name '%s' loaded.", templateName) return nil } -// LoadTemplate loads a template into Elasticsearch overwriting the existing +// loadTemplate loads a template into Elasticsearch overwriting the existing // template if it exists. If you wish to not overwrite an existing template // then use CheckTemplate prior to calling this method. -func (l *Loader) LoadTemplate(templateName string, template map[string]interface{}) error { - logp.Debug("template", "Try loading template with name: %s", templateName) +func (l *ESLoader) loadTemplate(templateName string, template map[string]interface{}) error { + logp.Info("Try loading template %s to Elasticsearch", templateName) path := "/_template/" + templateName - body, err := loadJSON(l.client, path, template) + params := esVersionParams(l.client.GetVersion()) + status, body, err := l.client.Request("PUT", path, "", params, template) if err != nil { return fmt.Errorf("couldn't load template: %v. Response body: %s", err, body) } - logp.Info("Elasticsearch template with name '%s' loaded", templateName) + if status > http.StatusMultipleChoices { //http status 300 + return fmt.Errorf("couldn't load json. Status: %v", status) + } return nil } -// CheckTemplate checks if a given template already exist. It returns true if +// templateExists checks if a given template already exist. It returns true if // and only if Elasticsearch returns with HTTP status code 200. -func (l *Loader) CheckTemplate(templateName string) bool { +func (l *ESLoader) templateExists(templateName string) bool { + if l.client == nil { + return false + } status, _, _ := l.client.Request("HEAD", "/_template/"+templateName, "", nil, nil) - return status == 200 + if status != http.StatusOK { + return false + } + return true +} + +// Load reads the template from the config, creates the template body and prints it to the configured file. +func (l *FileLoader) Load(config TemplateConfig, info beat.Info, fields []byte, migration bool) error { + //build template from config + tmpl, err := template(config, info, l.client.GetVersion(), migration) + if err != nil || tmpl == nil { + return err + } + + //create body to print + body, err := buildBody(tmpl, config, fields) + if err != nil { + return err + } + + p := common.MapStr{tmpl.name: body} + str := fmt.Sprintf("%s\n", p.StringToPrint()) + if err := l.client.Write(tmpl.name, str); err != nil { + return fmt.Errorf("error printing template: %v", err) + } + return nil } -func loadJSON(client ESClient, path string, json map[string]interface{}) ([]byte, error) { - params := esVersionParams(client.GetVersion()) - status, body, err := client.Request("PUT", path, "", params, json) +func template(config TemplateConfig, info beat.Info, esVersion common.Version, migration bool) (*Template, error) { + if !config.Enabled { + logp.Info("template config not enabled") + return nil, nil + } + tmpl, err := New(info.Version, info.IndexPrefix, esVersion, config, migration) if err != nil { - return body, fmt.Errorf("couldn't load json. Error: %s", err) + return nil, fmt.Errorf("error creating template instance: %v", err) } - if status > 300 { - return body, fmt.Errorf("couldn't load json. Status: %v", status) + return tmpl, nil +} + +func buildBody(tmpl *Template, config TemplateConfig, fields []byte) (common.MapStr, error) { + if config.Overwrite { + logp.Info("Existing template will be overwritten, as overwrite is enabled.") } + if config.JSON.Enabled { + return buildBodyFromJSON(config) + } + if config.Fields != "" { + return buildBodyFromFile(tmpl, config) + } + return buildBodyFromFields(tmpl, fields) +} + +func buildBodyFromJSON(config TemplateConfig) (common.MapStr, error) { + jsonPath := paths.Resolve(paths.Config, config.JSON.Path) + if _, err := os.Stat(jsonPath); err != nil { + return nil, fmt.Errorf("error checking json file %s for template: %v", jsonPath, err) + } + logp.Debug("template", "Loading json template from file %s", jsonPath) + content, err := ioutil.ReadFile(jsonPath) + if err != nil { + return nil, fmt.Errorf("error reading file %s for template: %v", jsonPath, err) + + } + var body map[string]interface{} + err = json.Unmarshal(content, &body) + if err != nil { + return nil, fmt.Errorf("could not unmarshal json template: %s", err) + } + return body, nil +} + +func buildBodyFromFile(tmpl *Template, config TemplateConfig) (common.MapStr, error) { + logp.Debug("template", "Load fields.yml from file: %s", config.Fields) + fieldsPath := paths.Resolve(paths.Config, config.Fields) + body, err := tmpl.LoadFile(fieldsPath) + if err != nil { + return nil, fmt.Errorf("error creating template from file %s: %v", fieldsPath, err) + } + return body, nil +} + +func buildBodyFromFields(tmpl *Template, fields []byte) (common.MapStr, error) { + logp.Debug("template", "Load default fields") + body, err := tmpl.LoadBytes(fields) + if err != nil { + return nil, fmt.Errorf("error creating template: %v", err) + } return body, nil } diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go index b1f4e01c4a5e..e33f8aae294a 100644 --- a/libbeat/template/load_integration_test.go +++ b/libbeat/template/load_integration_test.go @@ -41,26 +41,36 @@ type testTemplate struct { common.MapStr } -func TestCheckTemplate(t *testing.T) { +var ( + beatInfo = beat.Info{ + Beat: "testbeat", + IndexPrefix: "testbeatidx", + Version: version.GetDefaultVersion(), + } + + templateName = "testbeatidx-" + version.GetDefaultVersion() +) + +func defaultESLoader(t *testing.T) *ESLoader { client := estest.GetTestingElasticsearch(t) if err := client.Connect(); err != nil { t.Fatal(err) } - loader := &Loader{ - client: client, - } + return NewESLoader(client) +} + +func TestCheckTemplate(t *testing.T) { + loader := defaultESLoader(t) // Check for non existent template - assert.False(t, loader.CheckTemplate("libbeat-notexists")) + assert.False(t, loader.templateExists("libbeat-notexists")) } func TestLoadTemplate(t *testing.T) { // Setup ES - client := estest.GetTestingElasticsearch(t) - if err := client.Connect(); err != nil { - t.Fatal(err) - } + loader := defaultESLoader(t) + client := loader.client // Load template absPath, err := filepath.Abs("../") @@ -71,26 +81,22 @@ func TestLoadTemplate(t *testing.T) { index := "testbeat" tmpl, err := New(version.GetDefaultVersion(), index, client.GetVersion(), TemplateConfig{}, false) - assert.NoError(t, err) + require.NoError(t, err) content, err := tmpl.LoadFile(fieldsPath) - assert.NoError(t, err) - - loader := &Loader{ - client: client, - } + require.NoError(t, err) // Load template - err = loader.LoadTemplate(tmpl.GetName(), content) - assert.Nil(t, err) + err = loader.loadTemplate(tmpl.GetName(), content) + require.NoError(t, err) // Make sure template was loaded - assert.True(t, loader.CheckTemplate(tmpl.GetName())) + assert.True(t, loader.templateExists(tmpl.GetName())) // Delete template again to clean up client.Request("DELETE", "/_template/"+tmpl.GetName(), "", nil, nil) // Make sure it was removed - assert.False(t, loader.CheckTemplate(tmpl.GetName())) + assert.False(t, loader.templateExists(tmpl.GetName())) } func TestLoadInvalidTemplate(t *testing.T) { @@ -100,23 +106,16 @@ func TestLoadInvalidTemplate(t *testing.T) { } // Setup ES - client := estest.GetTestingElasticsearch(t) - if err := client.Connect(); err != nil { - t.Fatal(err) - } + loader := defaultESLoader(t) templateName := "invalidtemplate" - loader := &Loader{ - client: client, - } - // Try to load invalid template - err := loader.LoadTemplate(templateName, template) + err := loader.loadTemplate(templateName, template) assert.Error(t, err) // Make sure template was not loaded - assert.False(t, loader.CheckTemplate(templateName)) + assert.False(t, loader.templateExists(templateName)) } // Tests loading the templates for each beat @@ -132,10 +131,8 @@ func TestLoadBeatsTemplate(t *testing.T) { assert.Nil(t, err) // Setup ES - client := estest.GetTestingElasticsearch(t) - if err := client.Connect(); err != nil { - t.Fatal(err) - } + loader := defaultESLoader(t) + client := loader.client fieldsPath := absPath + "/fields.yml" index := beat @@ -145,31 +142,25 @@ func TestLoadBeatsTemplate(t *testing.T) { content, err := tmpl.LoadFile(fieldsPath) assert.NoError(t, err) - loader := &Loader{ - client: client, - } - // Load template - err = loader.LoadTemplate(tmpl.GetName(), content) + err = loader.loadTemplate(tmpl.GetName(), content) assert.Nil(t, err) // Make sure template was loaded - assert.True(t, loader.CheckTemplate(tmpl.GetName())) + assert.True(t, loader.templateExists(tmpl.GetName())) // Delete template again to clean up client.Request("DELETE", "/_template/"+tmpl.GetName(), "", nil, nil) // Make sure it was removed - assert.False(t, loader.CheckTemplate(tmpl.GetName())) + assert.False(t, loader.templateExists(tmpl.GetName())) } } func TestTemplateSettings(t *testing.T) { // Setup ES - client := estest.GetTestingElasticsearch(t) - if err := client.Connect(); err != nil { - t.Fatal(err) - } + loader := defaultESLoader(t) + client := loader.client // Load template absPath, err := filepath.Abs("../") @@ -194,12 +185,8 @@ func TestTemplateSettings(t *testing.T) { content, err := tmpl.LoadFile(fieldsPath) assert.NoError(t, err) - loader := &Loader{ - client: client, - } - // Load template - err = loader.LoadTemplate(tmpl.GetName(), content) + err = loader.loadTemplate(tmpl.GetName(), content) assert.Nil(t, err) // Check that it contains the mapping @@ -211,21 +198,14 @@ func TestTemplateSettings(t *testing.T) { client.Request("DELETE", "/_template/"+tmpl.GetName(), "", nil, nil) // Make sure it was removed - assert.False(t, loader.CheckTemplate(tmpl.GetName())) + assert.False(t, loader.templateExists(tmpl.GetName())) } func TestOverwrite(t *testing.T) { // Setup ES - client := estest.GetTestingElasticsearch(t) - if err := client.Connect(); err != nil { - t.Fatal(err) - } + loader := defaultESLoader(t) + client := loader.client - beatInfo := beat.Info{ - Beat: "testbeat", - IndexPrefix: "testbeatidx", - Version: version.GetDefaultVersion(), - } templateName := "testbeatidx-" + version.GetDefaultVersion() absPath, err := filepath.Abs("../") @@ -240,9 +220,7 @@ func TestOverwrite(t *testing.T) { Enabled: true, Fields: absPath + "/fields.yml", } - loader, err := NewLoader(config, client, beatInfo, nil, false) - assert.NoError(t, err) - err = loader.Load() + err = loader.Load(config, beatInfo, nil, false) assert.NoError(t, err) // Load template again, this time with custom settings @@ -255,9 +233,8 @@ func TestOverwrite(t *testing.T) { }, }, } - loader, err = NewLoader(config, client, beatInfo, nil, false) - assert.NoError(t, err) - err = loader.Load() + + err = loader.Load(config, beatInfo, nil, false) assert.NoError(t, err) // Overwrite was not enabled, so the first version should still be there @@ -275,9 +252,7 @@ func TestOverwrite(t *testing.T) { }, }, } - loader, err = NewLoader(config, client, beatInfo, nil, false) - assert.NoError(t, err) - err = loader.Load() + err = loader.Load(config, beatInfo, nil, false) assert.NoError(t, err) // Overwrite was enabled, so the custom setting should be there @@ -338,22 +313,22 @@ func TestTemplateWithData(t *testing.T) { // Setup ES client := estest.GetTestingElasticsearch(t) + if err := client.Connect(); err != nil { + t.Fatal(err) + } + loader := NewESLoader(client) tmpl, err := New(version.GetDefaultVersion(), "testindex", client.GetVersion(), TemplateConfig{}, false) assert.NoError(t, err) content, err := tmpl.LoadFile(fieldsPath) assert.NoError(t, err) - loader := &Loader{ - client: client, - } - // Load template - err = loader.LoadTemplate(tmpl.GetName(), content) + err = loader.loadTemplate(tmpl.GetName(), content) assert.Nil(t, err) // Make sure template was loaded - assert.True(t, loader.CheckTemplate(tmpl.GetName())) + assert.True(t, loader.templateExists(tmpl.GetName())) for _, test := range dataTests { _, _, err = client.Index(tmpl.GetName(), "_doc", "", nil, test.data) @@ -369,7 +344,7 @@ func TestTemplateWithData(t *testing.T) { client.Request("DELETE", "/_template/"+tmpl.GetName(), "", nil, nil) // Make sure it was removed - assert.False(t, loader.CheckTemplate(tmpl.GetName())) + assert.False(t, loader.templateExists(tmpl.GetName())) } func getTemplate(t *testing.T, client ESClient, templateName string) testTemplate { diff --git a/libbeat/template/load_test.go b/libbeat/template/load_test.go new file mode 100644 index 000000000000..22566229ab94 --- /dev/null +++ b/libbeat/template/load_test.go @@ -0,0 +1,94 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package template + +import ( + "fmt" + "testing" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/version" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFileLoader_Load(t *testing.T) { + ver := "7.0.0" + prefix := "mock" + info := beat.Info{Version: ver, IndexPrefix: prefix} + + for name, test := range map[string]struct { + cfg TemplateConfig + fields []byte + migration bool + + name string + }{ + "default config": { + cfg: DefaultConfig(), + name: fmt.Sprintf("%s-%s", prefix, ver), + migration: false, + }, + "default config with migration": { + cfg: DefaultConfig(), + name: fmt.Sprintf("%s-%s", prefix, ver), + migration: true, + }, + } { + t.Run(name, func(t *testing.T) { + fc, err := newFileClient(ver) + require.NoError(t, err) + fl := NewFileLoader(fc) + err = fl.Load(test.cfg, info, test.fields, false) + require.NoError(t, err) + + tmpl, err := New(ver, prefix, *common.MustNewVersion(ver), test.cfg, test.migration) + require.NoError(t, err) + body, err := buildBody(tmpl, test.cfg, test.fields) + require.NoError(t, err) + assert.Equal(t, common.MapStr{test.name: body}.StringToPrint()+"\n", fc.body) + }) + } +} + +type fileClient struct { + ver common.Version + body string +} + +func newFileClient(ver string) (*fileClient, error) { + if ver == "" { + ver = version.GetDefaultVersion() + } + v, err := common.NewVersion(ver) + if err != nil { + return nil, err + } + return &fileClient{ver: *v}, nil +} + +func (c *fileClient) GetVersion() common.Version { + return c.ver +} + +func (c *fileClient) Write(name string, body string) error { + c.body = body + return nil +} diff --git a/libbeat/template/template.go b/libbeat/template/template.go index d0b7d7d3d128..119599a988a2 100644 --- a/libbeat/template/template.go +++ b/libbeat/template/template.go @@ -42,6 +42,7 @@ var ( defaultFields []string ) +// Template holds information for the ES template. type Template struct { sync.Mutex name string @@ -51,6 +52,7 @@ type Template struct { esVersion common.Version config TemplateConfig migration bool + order int } // New creates a new template instance @@ -127,6 +129,7 @@ func New( beatName: beatName, config: config, migration: migration, + order: config.Order, }, nil } @@ -169,7 +172,7 @@ func (t *Template) LoadFile(file string) (common.MapStr, error) { return t.load(fields) } -// LoadBytes loads the the template from the given byte array +// LoadBytes loads the template from the given byte array func (t *Template) LoadBytes(data []byte) (common.MapStr, error) { fields, err := loadYamlByte(data) if err != nil { @@ -193,19 +196,14 @@ func (t *Template) GetPattern() string { // The default values are taken from the default variable. func (t *Template) Generate(properties common.MapStr, dynamicTemplates []common.MapStr) common.MapStr { keyPattern, patterns := buildPatternSettings(t.esVersion, t.GetPattern()) - return common.MapStr{ keyPattern: patterns, - + "order": t.order, "mappings": buildMappings( t.beatVersion, t.esVersion, t.beatName, properties, append(dynamicTemplates, buildDynTmpl(t.esVersion)), - common.MapStr(t.config.Settings.Source), - ), - - "order": 1, - + common.MapStr(t.config.Settings.Source)), "settings": common.MapStr{ "index": buildIdxSettings( t.esVersion, diff --git a/libbeat/template/template_test.go b/libbeat/template/template_test.go index 814656f91850..88221d600424 100644 --- a/libbeat/template/template_test.go +++ b/libbeat/template/template_test.go @@ -84,7 +84,7 @@ func TestTemplate(t *testing.T) { template, err := New(beatVersion, beatName, *ver, TemplateConfig{}, false) assert.NoError(t, err) - data := template.Generate(nil, nil) + data := template.Generate(common.MapStr{}, nil) assert.Equal(t, []string{"testbeat-6.6.0-*"}, data["index_patterns"]) meta, err := data.GetValue("mappings.doc._meta") assert.NoError(t, err) diff --git a/libbeat/tests/system/test_dashboard.py b/libbeat/tests/system/test_dashboard.py index 9822ef941a7d..94088a6ccb64 100644 --- a/libbeat/tests/system/test_dashboard.py +++ b/libbeat/tests/system/test_dashboard.py @@ -188,7 +188,7 @@ def test_export_dashboard_cmd_export_dashboard_by_id_unknown_id(self): beat.check_wait(exit_code=1) - assert self.log_contains("Error getting dashboard: error exporting dashboard: Not found") is True + assert self.log_contains("error exporting dashboard: Not found") is True @unittest.skipUnless(INTEGRATION_TESTS, "integration test") @attr('integration') @@ -242,7 +242,7 @@ def test_export_dashboard_cmd_export_dashboard_from_not_existent_yml(self): ) beat.check_wait(exit_code=1) - assert self.log_contains("Error getting dashboards from yml") + assert self.log_contains("Error exporting dashboards from yml") assert self.log_contains("error opening the list of dashboards") @unittest.skipUnless(INTEGRATION_TESTS, "integration test") diff --git a/libbeat/tests/system/test_ilm.py b/libbeat/tests/system/test_ilm.py index 069093253a2b..c765dce28cc6 100644 --- a/libbeat/tests/system/test_ilm.py +++ b/libbeat/tests/system/test_ilm.py @@ -202,6 +202,7 @@ def test_setup_ilm_policy(self): extra_args=["setup", "--ilm-policy", "-path.config", self.working_dir, + "-E", "setup.ilm.policy_name=" + self.policy_name, "-E", "output.elasticsearch.hosts=['" + self.get_elasticsearch_url() + "']"], config="libbeat.yml") diff --git a/libbeat/tests/system/test_template.py b/libbeat/tests/system/test_template.py index 66a67f4c0a0f..47e9162ac602 100644 --- a/libbeat/tests/system/test_template.py +++ b/libbeat/tests/system/test_template.py @@ -102,7 +102,7 @@ def test_json_template(self): proc = self.start_beat() self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains("Loading json template from file")) - self.wait_until(lambda: self.log_contains("Elasticsearch template with name 'bla' loaded")) + self.wait_until(lambda: self.log_contains("template with name 'bla' loaded")) proc.check_kill_and_wait() es = Elasticsearch([self.get_elasticsearch_url()]) diff --git a/metricbeat/cmd/test/modules.go b/metricbeat/cmd/test/modules.go index 14565805a591..ef28beae23b0 100644 --- a/metricbeat/cmd/test/modules.go +++ b/metricbeat/cmd/test/modules.go @@ -43,13 +43,7 @@ func GenTestModulesCmd(name, beatVersion string) *cobra.Command { filter_metricset = args[1] } - b, err := instance.NewBeat(name, "", beatVersion) - if err != nil { - fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) - os.Exit(1) - } - - err = b.Init() + b, err := instance.NewInitializedBeat(instance.Settings{Name: name, Version: beatVersion}) if err != nil { fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) os.Exit(1) diff --git a/metricbeat/tests/system/test_template.py b/metricbeat/tests/system/test_template.py index ad268c4eb3a7..335c435119cd 100644 --- a/metricbeat/tests/system/test_template.py +++ b/metricbeat/tests/system/test_template.py @@ -41,8 +41,9 @@ def test_export_template(self): break t = json.loads(template_content) - - properties = t["mappings"]["properties"] + keys = [k for k, v in t.iteritems() if k.startswith("metricbeat")] + assert len(keys) == 1 + properties = t[keys[0]]["mappings"]["properties"] # Check libbeat fields assert properties["@timestamp"] == {"type": "date"} diff --git a/x-pack/functionbeat/cmd/provider_cmd.go b/x-pack/functionbeat/cmd/provider_cmd.go index a519160f75b5..66864af45766 100644 --- a/x-pack/functionbeat/cmd/provider_cmd.go +++ b/x-pack/functionbeat/cmd/provider_cmd.go @@ -20,15 +20,11 @@ var output string // TODO: Add List() subcommand. func handler() (*cliHandler, error) { - b, err := instance.NewBeat(Name, "", "") + b, err := instance.NewInitializedBeat(instance.Settings{Name: Name}) if err != nil { return nil, err } - if err = b.Init(); err != nil { - return nil, err - } - c, err := b.BeatConfig() if err != nil { return nil, err diff --git a/x-pack/libbeat/cmd/enroll.go b/x-pack/libbeat/cmd/enroll.go index 7cb46506d4e3..68d5689fadf4 100644 --- a/x-pack/libbeat/cmd/enroll.go +++ b/x-pack/libbeat/cmd/enroll.go @@ -18,16 +18,10 @@ import ( ) func getBeat(name, version string) (*instance.Beat, error) { - b, err := instance.NewBeat(name, "", version) - + b, err := instance.NewInitializedBeat(instance.Settings{Name: name, Version: version}) if err != nil { return nil, fmt.Errorf("error creating beat: %s", err) } - - if err = b.Init(); err != nil { - return nil, fmt.Errorf("error initializing beat: %s", err) - } - return b, nil }