Skip to content

Commit

Permalink
Add central management service (elastic#8263)
Browse files Browse the repository at this point in the history
* Add config manager initial skeleton

Config manager will poll configs from Kibana and apply them locally. It must be
started with the beat.

In order to check the user is not trying to override configurations
provided by central management, the Config Manager can check the exisitng
configuration and return errors if something is wrong.
  • Loading branch information
exekias authored Sep 19, 2018
1 parent d2730d2 commit bffeeb5
Show file tree
Hide file tree
Showing 15 changed files with 620 additions and 46 deletions.
3 changes: 3 additions & 0 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package beat

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/management"
)

// Creator initializes and configures a new Beater instance used to execute
Expand Down Expand Up @@ -64,6 +65,8 @@ type Beat struct {
BeatConfig *common.Config // The beat's own configuration section

Fields []byte // Data from fields.yml

ConfigManager management.ConfigManager // config manager
}

// BeatConfig struct contains the basic configuration of every beat
Expand Down
21 changes: 19 additions & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import (
"github.com/satori/go.uuid"
"go.uber.org/zap"

"github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"

"github.com/elastic/beats/libbeat/api"
"github.com/elastic/beats/libbeat/asset"
"github.com/elastic/beats/libbeat/beat"
Expand All @@ -43,11 +46,13 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/common/file"
"github.com/elastic/beats/libbeat/common/reload"
"github.com/elastic/beats/libbeat/common/seccomp"
"github.com/elastic/beats/libbeat/dashboards"
"github.com/elastic/beats/libbeat/keystore"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/logp/configure"
"github.com/elastic/beats/libbeat/management"
"github.com/elastic/beats/libbeat/metric/system/host"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/monitoring/report"
Expand All @@ -59,8 +64,6 @@ import (
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/template"
"github.com/elastic/beats/libbeat/version"
"github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"

// Register publisher pipeline modules
_ "github.com/elastic/beats/libbeat/publisher/includes"
Expand Down Expand Up @@ -385,6 +388,10 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
api.Start(b.Config.HTTP)
}

// Launch config manager
b.ConfigManager.Start()
defer b.ConfigManager.Stop()

return beater.Run(&b.Beat)
}

Expand Down Expand Up @@ -565,6 +572,16 @@ func (b *Beat) configure() error {

logp.Info("Beat UUID: %v", b.Info.UUID)

// initialize config manager
b.ConfigManager, err = management.Factory()(reload.Register, b.Beat.Info.UUID)
if err != nil {
return err
}

if err := b.ConfigManager.CheckRawConfig(b.RawConfig); err != nil {
return err
}

if maxProcs := b.Config.MaxProcs; maxProcs > 0 {
runtime.GOMAXPROCS(maxProcs)
}
Expand Down
28 changes: 15 additions & 13 deletions libbeat/common/reload/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// Register holds a registry of reloadable objects
var Register = newRegistry()
var Register = NewRegistry()

// ConfigWithMeta holds a pair of common.Config and optional metadata for it
type ConfigWithMeta struct {
Expand All @@ -47,21 +47,23 @@ type Reloadable interface {
Reload(config *ConfigWithMeta) error
}

type registry struct {
// Registry of reloadable objects and lists
type Registry struct {
sync.RWMutex
confsLists map[string]ReloadableList
confs map[string]Reloadable
}

func newRegistry() *registry {
return &registry{
// NewRegistry initializes and returns a reload registry
func NewRegistry() *Registry {
return &Registry{
confsLists: make(map[string]ReloadableList),
confs: make(map[string]Reloadable),
}
}

// Register declares a reloadable object
func (r *registry) Register(name string, obj Reloadable) error {
func (r *Registry) Register(name string, obj Reloadable) error {
r.Lock()
defer r.Unlock()

Expand All @@ -78,7 +80,7 @@ func (r *registry) Register(name string, obj Reloadable) error {
}

// RegisterList declares a reloadable list of configurations
func (r *registry) RegisterList(name string, list ReloadableList) error {
func (r *Registry) RegisterList(name string, list ReloadableList) error {
r.Lock()
defer r.Unlock()

Expand All @@ -95,34 +97,34 @@ func (r *registry) RegisterList(name string, list ReloadableList) error {
}

// MustRegister declares a reloadable object
func (r *registry) MustRegister(name string, obj Reloadable) {
func (r *Registry) MustRegister(name string, obj Reloadable) {
if err := r.Register(name, obj); err != nil {
panic(err)
}
}

// MustRegisterList declares a reloadable object list
func (r *registry) MustRegisterList(name string, list ReloadableList) {
func (r *Registry) MustRegisterList(name string, list ReloadableList) {
if err := r.RegisterList(name, list); err != nil {
panic(err)
}
}

// Get returns the reloadable object with the given name, nil if not found
func (r *registry) Get(name string) Reloadable {
// GetReloadable returns the reloadable object with the given name, nil if not found
func (r *Registry) GetReloadable(name string) Reloadable {
r.RLock()
defer r.RUnlock()
return r.confs[name]
}

// GetList returns the reloadable list with the given name, nil if not found
func (r *registry) GetList(name string) ReloadableList {
// GetReloadableList returns the reloadable list with the given name, nil if not found
func (r *Registry) GetReloadableList(name string) ReloadableList {
r.RLock()
defer r.RUnlock()
return r.confsLists[name]
}

func (r *registry) nameTaken(name string) bool {
func (r *Registry) nameTaken(name string) bool {
if _, ok := r.confs[name]; ok {
return true
}
Expand Down
16 changes: 8 additions & 8 deletions libbeat/common/reload/reload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,26 @@ type reloadableList struct{}
func (reloadable) Reload(config *ConfigWithMeta) error { return nil }
func (reloadableList) Reload(config []*ConfigWithMeta) error { return nil }

func RegisterReloadable(t *testing.T) {
func TestRegisterReloadable(t *testing.T) {
obj := reloadable{}
r := newRegistry()
r := NewRegistry()

r.Register("my.reloadable", obj)

assert.Equal(t, obj, r.Get("my.reloadable"))
assert.Equal(t, obj, r.GetReloadable("my.reloadable"))
}

func RegisterReloadableList(t *testing.T) {
func TestRegisterReloadableList(t *testing.T) {
objl := reloadableList{}
r := newRegistry()
r := NewRegistry()

r.RegisterList("my.reloadable", objl)

assert.Equal(t, objl, r.Get("my.reloadable"))
assert.Equal(t, objl, r.GetReloadableList("my.reloadable"))
}

func TestRegisterNilFails(t *testing.T) {
r := newRegistry()
r := NewRegistry()

err := r.Register("name", nil)
assert.Error(t, err)
Expand All @@ -58,7 +58,7 @@ func TestRegisterNilFails(t *testing.T) {
}

func TestReRegisterFails(t *testing.T) {
r := newRegistry()
r := NewRegistry()

// two obj with the same name
err := r.Register("name", reloadable{})
Expand Down
84 changes: 84 additions & 0 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package management

import (
"github.com/satori/go.uuid"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/reload"
"github.com/elastic/beats/libbeat/feature"
)

// Namespace is the feature namespace for queue definition.
var Namespace = "libbeat.management"

// DebugK used as key for all things central management
var DebugK = "centralmgmt"

// ConfigManager interacts with the beat to update configurations
// from an external source
type ConfigManager interface {
// Enabled returns true if config manager is enabled
Enabled() bool

// Start the config manager
Start()

// Stop the config manager
Stop()

// CheckRawConfig check settings are correct before launching the beat
CheckRawConfig(cfg *common.Config) error
}

// FactoryFunc for creating a config manager
type FactoryFunc func(*reload.Registry, uuid.UUID) (ConfigManager, error)

// Register a config manager
func Register(name string, fn FactoryFunc, stability feature.Stability) {
f := feature.New(Namespace, name, fn, feature.NewDetails(name, "", stability))
feature.MustRegister(f)
}

// Factory retrieves config manager constructor. If no one is registered
// it will create a nil manager
func Factory() FactoryFunc {
factories, err := feature.Registry.LookupAll(Namespace)
if err != nil {
return nilFactory
}

for _, f := range factories {
if factory, ok := f.Factory().(FactoryFunc); ok {
return factory
}
}

return nilFactory
}

// nilManager, fallback when no manager is present
type nilManager struct{}

func nilFactory(*reload.Registry, uuid.UUID) (ConfigManager, error) { return nilManager{}, nil }

func (nilManager) Enabled() bool { return false }
func (nilManager) Start() {}
func (nilManager) Stop() {}
func (nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
18 changes: 17 additions & 1 deletion metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package beater
import (
"sync"

"github.com/elastic/beats/libbeat/common/reload"
"github.com/elastic/beats/libbeat/management"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

Expand Down Expand Up @@ -187,6 +190,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
func (bt *Metricbeat) Run(b *beat.Beat) error {
var wg sync.WaitGroup

// Static modules (metricbeat.modules)
for _, m := range bt.modules {
client, err := m.connector.Connect()
if err != nil {
Expand All @@ -203,9 +207,20 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
}()
}

// Centrally managed modules
factory := module.NewFactory(bt.moduleOptions...)
modules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher)
reload.Register.MustRegisterList("metricbeat.modules", modules)
wg.Add(1)
go func() {
defer wg.Done()
<-bt.done
modules.Stop()
}()

// Dynamic file based modules (metricbeat.config.modules)
if bt.config.ConfigModules.Enabled() {
moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules)
factory := module.NewFactory(bt.moduleOptions...)

if err := moduleReloader.Check(factory); err != nil {
return err
Expand All @@ -220,6 +235,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
}()
}

// Autodiscover (metricbeat.autodiscover)
if bt.autodiscover != nil {
bt.autodiscover.Start()
wg.Add(1)
Expand Down
7 changes: 6 additions & 1 deletion x-pack/libbeat/cmd/inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

package cmd

import "github.com/elastic/beats/libbeat/cmd"
import (
"github.com/elastic/beats/libbeat/cmd"

// register central management
_ "github.com/elastic/beats/x-pack/libbeat/management"
)

// AddXPack extends the given root folder with XPack features
func AddXPack(root *cmd.BeatsRootCmd, name string) {
Expand Down
13 changes: 6 additions & 7 deletions x-pack/libbeat/management/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@ func ConfigFromURL(kibanaURL string) (*kibana.ClientConfig, error) {
}

return &kibana.ClientConfig{
Protocol: data.Scheme,
Host: data.Host,
Path: data.Path,
Username: username,
Password: password,
Timeout: defaultTimeout,
IgnoreVersion: true,
Protocol: data.Scheme,
Host: data.Host,
Path: data.Path,
Username: username,
Password: password,
Timeout: defaultTimeout,
}, nil
}

Expand Down
2 changes: 2 additions & 0 deletions x-pack/libbeat/management/api/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Serv
t.Fatal(err)
}

config.IgnoreVersion = true

client, err := NewClient(config)
if err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit bffeeb5

Please sign in to comment.