From bffeeb5b86dd06e20eae58990b9c58b9bfc8102e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Wed, 19 Sep 2018 12:48:35 +0200 Subject: [PATCH] Add central management service (#8263) * Add config manager initial skeleton Config manager will poll configs from Kibana and apply them locally. It must be started with the beat. In order to check the user is not trying to override configurations provided by central management, the Config Manager can check the exisitng configuration and return errors if something is wrong. --- libbeat/beat/beat.go | 3 + libbeat/cmd/instance/beat.go | 21 +- libbeat/common/reload/reload.go | 28 +-- libbeat/common/reload/reload_test.go | 16 +- libbeat/management/management.go | 84 ++++++++ metricbeat/beater/metricbeat.go | 18 +- x-pack/libbeat/cmd/inject.go | 7 +- x-pack/libbeat/management/api/client.go | 13 +- x-pack/libbeat/management/api/client_test.go | 2 + .../libbeat/management/api/configuration.go | 49 ++++- .../management/api/configuration_test.go | 102 ++++++++- x-pack/libbeat/management/config.go | 18 +- x-pack/libbeat/management/enroll.go | 12 +- x-pack/libbeat/management/manager.go | 200 ++++++++++++++++++ x-pack/libbeat/management/manager_test.go | 93 ++++++++ 15 files changed, 620 insertions(+), 46 deletions(-) create mode 100644 libbeat/management/management.go create mode 100644 x-pack/libbeat/management/manager.go create mode 100644 x-pack/libbeat/management/manager_test.go diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 84e8c4288043..9423b774dd10 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -19,6 +19,7 @@ package beat import ( "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/management" ) // Creator initializes and configures a new Beater instance used to execute @@ -64,6 +65,8 @@ type Beat struct { BeatConfig *common.Config // The beat's own configuration section Fields []byte // Data from fields.yml + + ConfigManager management.ConfigManager // config manager } // BeatConfig struct contains the basic configuration of every beat diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 62fda7c43ea4..b8a7ba72e4d8 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -35,6 +35,9 @@ import ( "github.com/satori/go.uuid" "go.uber.org/zap" + "github.com/elastic/go-sysinfo" + "github.com/elastic/go-sysinfo/types" + "github.com/elastic/beats/libbeat/api" "github.com/elastic/beats/libbeat/asset" "github.com/elastic/beats/libbeat/beat" @@ -43,11 +46,13 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/file" + "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/common/seccomp" "github.com/elastic/beats/libbeat/dashboards" "github.com/elastic/beats/libbeat/keystore" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/logp/configure" + "github.com/elastic/beats/libbeat/management" "github.com/elastic/beats/libbeat/metric/system/host" "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/monitoring/report" @@ -59,8 +64,6 @@ import ( svc "github.com/elastic/beats/libbeat/service" "github.com/elastic/beats/libbeat/template" "github.com/elastic/beats/libbeat/version" - "github.com/elastic/go-sysinfo" - "github.com/elastic/go-sysinfo/types" // Register publisher pipeline modules _ "github.com/elastic/beats/libbeat/publisher/includes" @@ -385,6 +388,10 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { api.Start(b.Config.HTTP) } + // Launch config manager + b.ConfigManager.Start() + defer b.ConfigManager.Stop() + return beater.Run(&b.Beat) } @@ -565,6 +572,16 @@ func (b *Beat) configure() error { logp.Info("Beat UUID: %v", b.Info.UUID) + // initialize config manager + b.ConfigManager, err = management.Factory()(reload.Register, b.Beat.Info.UUID) + if err != nil { + return err + } + + if err := b.ConfigManager.CheckRawConfig(b.RawConfig); err != nil { + return err + } + if maxProcs := b.Config.MaxProcs; maxProcs > 0 { runtime.GOMAXPROCS(maxProcs) } diff --git a/libbeat/common/reload/reload.go b/libbeat/common/reload/reload.go index b3f61c267271..1f8b3ab4a738 100644 --- a/libbeat/common/reload/reload.go +++ b/libbeat/common/reload/reload.go @@ -26,7 +26,7 @@ import ( ) // Register holds a registry of reloadable objects -var Register = newRegistry() +var Register = NewRegistry() // ConfigWithMeta holds a pair of common.Config and optional metadata for it type ConfigWithMeta struct { @@ -47,21 +47,23 @@ type Reloadable interface { Reload(config *ConfigWithMeta) error } -type registry struct { +// Registry of reloadable objects and lists +type Registry struct { sync.RWMutex confsLists map[string]ReloadableList confs map[string]Reloadable } -func newRegistry() *registry { - return ®istry{ +// NewRegistry initializes and returns a reload registry +func NewRegistry() *Registry { + return &Registry{ confsLists: make(map[string]ReloadableList), confs: make(map[string]Reloadable), } } // Register declares a reloadable object -func (r *registry) Register(name string, obj Reloadable) error { +func (r *Registry) Register(name string, obj Reloadable) error { r.Lock() defer r.Unlock() @@ -78,7 +80,7 @@ func (r *registry) Register(name string, obj Reloadable) error { } // RegisterList declares a reloadable list of configurations -func (r *registry) RegisterList(name string, list ReloadableList) error { +func (r *Registry) RegisterList(name string, list ReloadableList) error { r.Lock() defer r.Unlock() @@ -95,34 +97,34 @@ func (r *registry) RegisterList(name string, list ReloadableList) error { } // MustRegister declares a reloadable object -func (r *registry) MustRegister(name string, obj Reloadable) { +func (r *Registry) MustRegister(name string, obj Reloadable) { if err := r.Register(name, obj); err != nil { panic(err) } } // MustRegisterList declares a reloadable object list -func (r *registry) MustRegisterList(name string, list ReloadableList) { +func (r *Registry) MustRegisterList(name string, list ReloadableList) { if err := r.RegisterList(name, list); err != nil { panic(err) } } -// Get returns the reloadable object with the given name, nil if not found -func (r *registry) Get(name string) Reloadable { +// GetReloadable returns the reloadable object with the given name, nil if not found +func (r *Registry) GetReloadable(name string) Reloadable { r.RLock() defer r.RUnlock() return r.confs[name] } -// GetList returns the reloadable list with the given name, nil if not found -func (r *registry) GetList(name string) ReloadableList { +// GetReloadableList returns the reloadable list with the given name, nil if not found +func (r *Registry) GetReloadableList(name string) ReloadableList { r.RLock() defer r.RUnlock() return r.confsLists[name] } -func (r *registry) nameTaken(name string) bool { +func (r *Registry) nameTaken(name string) bool { if _, ok := r.confs[name]; ok { return true } diff --git a/libbeat/common/reload/reload_test.go b/libbeat/common/reload/reload_test.go index e2238895375e..04c478f74761 100644 --- a/libbeat/common/reload/reload_test.go +++ b/libbeat/common/reload/reload_test.go @@ -29,26 +29,26 @@ type reloadableList struct{} func (reloadable) Reload(config *ConfigWithMeta) error { return nil } func (reloadableList) Reload(config []*ConfigWithMeta) error { return nil } -func RegisterReloadable(t *testing.T) { +func TestRegisterReloadable(t *testing.T) { obj := reloadable{} - r := newRegistry() + r := NewRegistry() r.Register("my.reloadable", obj) - assert.Equal(t, obj, r.Get("my.reloadable")) + assert.Equal(t, obj, r.GetReloadable("my.reloadable")) } -func RegisterReloadableList(t *testing.T) { +func TestRegisterReloadableList(t *testing.T) { objl := reloadableList{} - r := newRegistry() + r := NewRegistry() r.RegisterList("my.reloadable", objl) - assert.Equal(t, objl, r.Get("my.reloadable")) + assert.Equal(t, objl, r.GetReloadableList("my.reloadable")) } func TestRegisterNilFails(t *testing.T) { - r := newRegistry() + r := NewRegistry() err := r.Register("name", nil) assert.Error(t, err) @@ -58,7 +58,7 @@ func TestRegisterNilFails(t *testing.T) { } func TestReRegisterFails(t *testing.T) { - r := newRegistry() + r := NewRegistry() // two obj with the same name err := r.Register("name", reloadable{}) diff --git a/libbeat/management/management.go b/libbeat/management/management.go new file mode 100644 index 000000000000..4a574f79f343 --- /dev/null +++ b/libbeat/management/management.go @@ -0,0 +1,84 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package management + +import ( + "github.com/satori/go.uuid" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/reload" + "github.com/elastic/beats/libbeat/feature" +) + +// Namespace is the feature namespace for queue definition. +var Namespace = "libbeat.management" + +// DebugK used as key for all things central management +var DebugK = "centralmgmt" + +// ConfigManager interacts with the beat to update configurations +// from an external source +type ConfigManager interface { + // Enabled returns true if config manager is enabled + Enabled() bool + + // Start the config manager + Start() + + // Stop the config manager + Stop() + + // CheckRawConfig check settings are correct before launching the beat + CheckRawConfig(cfg *common.Config) error +} + +// FactoryFunc for creating a config manager +type FactoryFunc func(*reload.Registry, uuid.UUID) (ConfigManager, error) + +// Register a config manager +func Register(name string, fn FactoryFunc, stability feature.Stability) { + f := feature.New(Namespace, name, fn, feature.NewDetails(name, "", stability)) + feature.MustRegister(f) +} + +// Factory retrieves config manager constructor. If no one is registered +// it will create a nil manager +func Factory() FactoryFunc { + factories, err := feature.Registry.LookupAll(Namespace) + if err != nil { + return nilFactory + } + + for _, f := range factories { + if factory, ok := f.Factory().(FactoryFunc); ok { + return factory + } + } + + return nilFactory +} + +// nilManager, fallback when no manager is present +type nilManager struct{} + +func nilFactory(*reload.Registry, uuid.UUID) (ConfigManager, error) { return nilManager{}, nil } + +func (nilManager) Enabled() bool { return false } +func (nilManager) Start() {} +func (nilManager) Stop() {} +func (nilManager) CheckRawConfig(cfg *common.Config) error { return nil } diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index fd5ea64031fa..8c0951bbb0f2 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -20,6 +20,9 @@ package beater import ( "sync" + "github.com/elastic/beats/libbeat/common/reload" + "github.com/elastic/beats/libbeat/management" + "github.com/joeshaw/multierror" "github.com/pkg/errors" @@ -187,6 +190,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe func (bt *Metricbeat) Run(b *beat.Beat) error { var wg sync.WaitGroup + // Static modules (metricbeat.modules) for _, m := range bt.modules { client, err := m.connector.Connect() if err != nil { @@ -203,9 +207,20 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { }() } + // Centrally managed modules + factory := module.NewFactory(bt.moduleOptions...) + modules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher) + reload.Register.MustRegisterList("metricbeat.modules", modules) + wg.Add(1) + go func() { + defer wg.Done() + <-bt.done + modules.Stop() + }() + + // Dynamic file based modules (metricbeat.config.modules) if bt.config.ConfigModules.Enabled() { moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules) - factory := module.NewFactory(bt.moduleOptions...) if err := moduleReloader.Check(factory); err != nil { return err @@ -220,6 +235,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { }() } + // Autodiscover (metricbeat.autodiscover) if bt.autodiscover != nil { bt.autodiscover.Start() wg.Add(1) diff --git a/x-pack/libbeat/cmd/inject.go b/x-pack/libbeat/cmd/inject.go index 2a60409321f0..715f3d2df5c6 100644 --- a/x-pack/libbeat/cmd/inject.go +++ b/x-pack/libbeat/cmd/inject.go @@ -4,7 +4,12 @@ package cmd -import "github.com/elastic/beats/libbeat/cmd" +import ( + "github.com/elastic/beats/libbeat/cmd" + + // register central management + _ "github.com/elastic/beats/x-pack/libbeat/management" +) // AddXPack extends the given root folder with XPack features func AddXPack(root *cmd.BeatsRootCmd, name string) { diff --git a/x-pack/libbeat/management/api/client.go b/x-pack/libbeat/management/api/client.go index c99effe701ec..1516c13ce111 100644 --- a/x-pack/libbeat/management/api/client.go +++ b/x-pack/libbeat/management/api/client.go @@ -38,13 +38,12 @@ func ConfigFromURL(kibanaURL string) (*kibana.ClientConfig, error) { } return &kibana.ClientConfig{ - Protocol: data.Scheme, - Host: data.Host, - Path: data.Path, - Username: username, - Password: password, - Timeout: defaultTimeout, - IgnoreVersion: true, + Protocol: data.Scheme, + Host: data.Host, + Path: data.Path, + Username: username, + Password: password, + Timeout: defaultTimeout, }, nil } diff --git a/x-pack/libbeat/management/api/client_test.go b/x-pack/libbeat/management/api/client_test.go index 25335d3954a7..d7acdba0122b 100644 --- a/x-pack/libbeat/management/api/client_test.go +++ b/x-pack/libbeat/management/api/client_test.go @@ -24,6 +24,8 @@ func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Serv t.Fatal(err) } + config.IgnoreVersion = true + client, err := NewClient(config) if err != nil { t.Fatal(err) diff --git a/x-pack/libbeat/management/api/configuration.go b/x-pack/libbeat/management/api/configuration.go index 70ae403e7dea..9d5c19a1c1a1 100644 --- a/x-pack/libbeat/management/api/configuration.go +++ b/x-pack/libbeat/management/api/configuration.go @@ -6,6 +6,9 @@ package api import ( "net/http" + "reflect" + + "github.com/elastic/beats/libbeat/common/reload" uuid "github.com/satori/go.uuid" @@ -14,27 +17,61 @@ import ( // ConfigBlock stores a piece of config from central management type ConfigBlock struct { - Type string `json:"type"` - Raw string `json:"block_yml"` + Raw map[string]interface{} } +// ConfigBlocks holds a map of type -> list of configs +type ConfigBlocks map[string][]*ConfigBlock + // Config returns a common.Config object holding the config from this block func (c *ConfigBlock) Config() (*common.Config, error) { - return common.NewConfigWithYAML([]byte(c.Raw), "") + return common.NewConfigFrom(c.Raw) +} + +// ConfigWithMeta returns a reload.ConfigWithMeta object holding the config from this block, meta will be nil +func (c *ConfigBlock) ConfigWithMeta() (*reload.ConfigWithMeta, error) { + config, err := c.Config() + if err != nil { + return nil, err + } + return &reload.ConfigWithMeta{ + Config: config, + }, nil } // Configuration retrieves the list of configuration blocks from Kibana -func (c *Client) Configuration(accessToken string, beatUUID uuid.UUID) ([]*ConfigBlock, error) { +func (c *Client) Configuration(accessToken string, beatUUID uuid.UUID) (ConfigBlocks, error) { headers := http.Header{} headers.Set("kbn-beats-access-token", accessToken) resp := struct { - ConfigBlocks []*ConfigBlock `json:"configuration_blocks"` + ConfigBlocks []*struct { + Type string `json:"type"` + Raw map[string]interface{} `json:"config"` + } `json:"configuration_blocks"` }{} _, err := c.request("GET", "/api/beats/agent/"+beatUUID.String()+"/configuration", nil, headers, &resp) if err != nil { return nil, err } - return resp.ConfigBlocks, err + res := ConfigBlocks{} + for _, block := range resp.ConfigBlocks { + res[block.Type] = append(res[block.Type], &ConfigBlock{Raw: block.Raw}) + } + + return res, nil +} + +// ConfigBlocksEqual returns true if the given config blocks are equal, false if not +func ConfigBlocksEqual(a, b ConfigBlocks) bool { + if len(a) != len(b) { + return false + } + + if len(a) == 0 { + return true + } + + return reflect.DeepEqual(a, b) } diff --git a/x-pack/libbeat/management/api/configuration_test.go b/x-pack/libbeat/management/api/configuration_test.go index 953511b49327..b5166ceb4dbe 100644 --- a/x-pack/libbeat/management/api/configuration_test.go +++ b/x-pack/libbeat/management/api/configuration_test.go @@ -23,7 +23,7 @@ func TestConfiguration(t *testing.T) { // Check enrollment token is correct assert.Equal(t, "thisismyenrollmenttoken", r.Header.Get("kbn-beats-access-token")) - fmt.Fprintf(w, `{"configuration_blocks":[{"type":"filebeat.modules","block_yml":"module: apache2\n"},{"type":"metricbeat.modules","block_yml":"module: nginx\n"}]}`) + fmt.Fprintf(w, `{"configuration_blocks":[{"type":"filebeat.modules","config":{"module":"apache2"}},{"type":"metricbeat.modules","config":{"module":"system","period":"10s"}}]}`) })) defer server.Close() @@ -33,4 +33,104 @@ func TestConfiguration(t *testing.T) { } assert.Equal(t, 2, len(configs)) + assert.Equal(t, &ConfigBlock{Raw: map[string]interface{}{ + "module": "apache2", + }}, configs["filebeat.modules"][0]) + + assert.Equal(t, &ConfigBlock{Raw: map[string]interface{}{ + "module": "system", + "period": "10s", + }}, configs["metricbeat.modules"][0]) +} + +func TestConfigBlocksEqual(t *testing.T) { + tests := []struct { + name string + a, b ConfigBlocks + equal bool + }{ + { + name: "empty lists or nil", + a: nil, + b: ConfigBlocks{}, + equal: true, + }, + { + name: "single element", + a: ConfigBlocks{ + "metricbeat.modules": []*ConfigBlock{ + &ConfigBlock{ + Raw: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, + b: ConfigBlocks{ + "metricbeat.modules": []*ConfigBlock{ + &ConfigBlock{ + Raw: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, + equal: true, + }, + { + name: "different number of blocks", + a: ConfigBlocks{ + "metricbeat.modules": []*ConfigBlock{ + &ConfigBlock{ + Raw: map[string]interface{}{ + "foo": "bar", + }, + }, + &ConfigBlock{ + Raw: map[string]interface{}{ + "baz": "buzz", + }, + }, + }, + }, + b: ConfigBlocks{ + "metricbeat.modules": []*ConfigBlock{ + &ConfigBlock{ + Raw: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, + equal: false, + }, + { + name: "different block", + a: ConfigBlocks{ + "metricbeat.modules": []*ConfigBlock{ + &ConfigBlock{ + Raw: map[string]interface{}{ + "baz": "buzz", + }, + }, + }, + }, + b: ConfigBlocks{ + "metricbeat.modules": []*ConfigBlock{ + &ConfigBlock{ + Raw: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, + equal: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.equal, ConfigBlocksEqual(test.a, test.b)) + }) + } } diff --git a/x-pack/libbeat/management/config.go b/x-pack/libbeat/management/config.go index 7779aa5d74e9..e76a66c757c1 100644 --- a/x-pack/libbeat/management/config.go +++ b/x-pack/libbeat/management/config.go @@ -6,6 +6,7 @@ package management import ( "os" + "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/file" @@ -22,11 +23,20 @@ type Config struct { // true when enrolled Enabled bool + // Poll configs period + Period time.Duration + AccessToken string Kibana *kibana.ClientConfig - Configs []*api.ConfigBlock + Configs api.ConfigBlocks +} + +func defaultConfig() *Config { + return &Config{ + Period: 60 * time.Second, + } } // Load settings from its source file @@ -34,10 +44,14 @@ func (c *Config) Load() error { path := paths.Resolve(paths.Data, "management.yml") config, err := common.LoadFile(path) if err != nil { + if os.IsNotExist(err) { + // File is not present, beat is not enrolled + return nil + } return err } - if err = config.Unpack(c); err != nil { + if err = config.Unpack(&c); err != nil { return err } diff --git a/x-pack/libbeat/management/enroll.go b/x-pack/libbeat/management/enroll.go index c610b9115a1f..8c6e5cfe3817 100644 --- a/x-pack/libbeat/management/enroll.go +++ b/x-pack/libbeat/management/enroll.go @@ -17,6 +17,9 @@ func Enroll(beat *instance.Beat, kibanaURL, enrollmentToken string) error { return err } + // Ignore kibana version to avoid permission errors + config.IgnoreVersion = true + client, err := api.NewClient(config) if err != nil { return err @@ -29,11 +32,10 @@ func Enroll(beat *instance.Beat, kibanaURL, enrollmentToken string) error { // Enrolled, persist state // TODO use beat.Keystore() for access_token - settings := Config{ - Enabled: true, - AccessToken: accessToken, - Kibana: config, - } + settings := defaultConfig() + settings.Enabled = true + settings.AccessToken = accessToken + settings.Kibana = config return settings.Save() } diff --git a/x-pack/libbeat/management/manager.go b/x-pack/libbeat/management/manager.go new file mode 100644 index 000000000000..0c47a4d1b735 --- /dev/null +++ b/x-pack/libbeat/management/manager.go @@ -0,0 +1,200 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "sync" + "time" + + "github.com/elastic/beats/libbeat/common/reload" + + "github.com/satori/go.uuid" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/feature" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/libbeat/management/api" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/management" +) + +func init() { + management.Register("x-pack", NewConfigManager, feature.Beta) +} + +// ConfigManager handles internal config updates. By retrieving +// new configs from Kibana and applying them to the Beat +type ConfigManager struct { + config *Config + logger *logp.Logger + client *api.Client + beatUUID uuid.UUID + done chan struct{} + registry *reload.Registry + wg sync.WaitGroup +} + +// NewConfigManager returns a X-Pack Beats Central Management manager +func NewConfigManager(registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) { + c := defaultConfig() + if err := c.Load(); err != nil { + return nil, errors.Wrap(err, "reading central management internal settings") + } + return NewConfigManagerWithConfig(c, registry, beatUUID) +} + +// NewConfigManagerWithConfig returns a X-Pack Beats Central Management manager +func NewConfigManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) { + var client *api.Client + if c.Enabled { + var err error + + // Ignore kibana version to avoid permission errors + c.Kibana.IgnoreVersion = true + + client, err = api.NewClient(c.Kibana) + if err != nil { + return nil, errors.Wrap(err, "initializing kibana client") + } + } + + return &ConfigManager{ + config: c, + logger: logp.NewLogger(management.DebugK), + client: client, + done: make(chan struct{}), + beatUUID: beatUUID, + registry: registry, + }, nil +} + +// Enabled returns true if config management is enabled +func (cm *ConfigManager) Enabled() bool { + return cm.config.Enabled +} + +// Start the config manager +func (cm *ConfigManager) Start() { + if !cm.Enabled() { + return + } + cfgwarn.Beta("Central management is enabled") + cm.logger.Info("Starting central management service") + + cm.wg.Add(1) + go cm.worker() +} + +// Stop the config manager +func (cm *ConfigManager) Stop() { + if !cm.Enabled() { + return + } + cm.logger.Info("Stopping central management service") + close(cm.done) + cm.wg.Wait() +} + +// CheckRawConfig check settings are correct to start the beat. This method +// checks there are no collision between the existing configuration and what +// central management can configure. +func (cm *ConfigManager) CheckRawConfig(cfg *common.Config) error { + // TODO implement this method + return nil +} + +func (cm *ConfigManager) worker() { + defer cm.wg.Done() + + // Initial fetch && apply (even if errors happen while fetching) + cm.fetch() + cm.apply() + + // Start worker loop: fetch + apply + cache new settings + for { + select { + case <-cm.done: + return + case <-time.After(cm.config.Period): + } + + if cm.fetch() { + cm.logger.Info("New configuration retrieved from central management, applying changes...") + + // configs changed, apply changes + // TODO only reload the blocks that changed + cm.apply() + + // store new configs (already applied) + if err := cm.config.Save(); err != nil { + cm.logger.Errorf("error storing central management state: %s", err) + } + } + } +} + +// fetch configurations from kibana, return true if they changed +func (cm *ConfigManager) fetch() bool { + cm.logger.Debug("Retrieving new configurations from Kibana") + configs, err := cm.client.Configuration(cm.config.AccessToken, cm.beatUUID) + if err != nil { + cm.logger.Errorf("error retriving new configurations, will use cached ones: %s", err) + return false + } + + if api.ConfigBlocksEqual(configs, cm.config.Configs) { + cm.logger.Debug("configuration didn't change, sleeping") + return false + } + + cm.config.Configs = configs + + return true +} + +func (cm *ConfigManager) apply() { + for blockType, blockList := range cm.config.Configs { + cm.reload(blockType, blockList) + } +} + +func (cm *ConfigManager) reload(t string, blocks []*api.ConfigBlock) { + cm.logger.Infof("Applying settings for %s", t) + + if obj := cm.registry.GetReloadable(t); obj != nil { + // Single object + if len(blocks) != 1 { + cm.logger.Errorf("got an invalid number of configs for %s: %d, expected: 1", t, len(blocks)) + return + } + config, err := blocks[0].ConfigWithMeta() + if err != nil { + cm.logger.Error(err) + return + } + + if err := obj.Reload(config); err != nil { + cm.logger.Error(err) + } + } else if obj := cm.registry.GetReloadableList(t); obj != nil { + // List + var configs []*reload.ConfigWithMeta + for _, block := range blocks { + config, err := block.ConfigWithMeta() + if err != nil { + cm.logger.Error(err) + continue + } + configs = append(configs, config) + } + + if err := obj.Reload(configs); err != nil { + cm.logger.Error(err) + } + } +} diff --git a/x-pack/libbeat/management/manager_test.go b/x-pack/libbeat/management/manager_test.go new file mode 100644 index 000000000000..9e37b4d061eb --- /dev/null +++ b/x-pack/libbeat/management/manager_test.go @@ -0,0 +1,93 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/satori/go.uuid" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/reload" + "github.com/elastic/beats/x-pack/libbeat/management/api" +) + +type reloadable struct { + reloaded chan *reload.ConfigWithMeta +} + +func (r *reloadable) Reload(c *reload.ConfigWithMeta) error { + r.reloaded <- c + return nil +} + +func TestConfigManager(t *testing.T) { + registry := reload.NewRegistry() + id := uuid.NewV4() + accessToken := "footoken" + reloadable := reloadable{ + reloaded: make(chan *reload.ConfigWithMeta, 1), + } + registry.MustRegister("test.block", &reloadable) + + mux := http.NewServeMux() + i := 0 + responses := []string{ + // Initial load + `{"configuration_blocks":[{"type":"test.block","config":{"module":"apache2"}}]}`, + + // No change, no reload + `{"configuration_blocks":[{"type":"test.block","config":{"module":"apache2"}}]}`, + + // Changed, reload + `{"configuration_blocks":[{"type":"test.block","config":{"module":"system"}}]}`, + } + mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, fmt.Sprintf("/api/beats/agent/%s/configuration", id), r.RequestURI) + fmt.Fprintf(w, responses[i]) + i++ + })) + + server := httptest.NewServer(mux) + + c, err := api.ConfigFromURL(server.URL) + if err != nil { + t.Fatal(err) + } + + config := &Config{ + Enabled: true, + Period: 100 * time.Millisecond, + Kibana: c, + AccessToken: accessToken, + } + + manager, err := NewConfigManagerWithConfig(config, registry, id) + if err != nil { + t.Fatal(err) + } + + manager.Start() + + // On first reload we will get apache2 module + config1 := <-reloadable.reloaded + assert.Equal(t, &reload.ConfigWithMeta{ + Config: common.MustNewConfigFrom(map[string]interface{}{ + "module": "apache2", + }), + }, config1) + + config2 := <-reloadable.reloaded + assert.Equal(t, &reload.ConfigWithMeta{ + Config: common.MustNewConfigFrom(map[string]interface{}{ + "module": "system", + }), + }, config2) +}