Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize input v2 API #19158

Merged
merged 3 commits into from
Jun 18, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions filebeat/input/v2/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package v2

import (
"errors"
"fmt"
"strings"
)

// LoadError is returned by Loaders in case of failures.
type LoadError struct {
// Name of input/module that failed to load (if applicable)
Name string

// Reason why the loader failed. Can either be the cause reported by the
// Plugin or some other indicator like ErrUnknown
Reason error

// (optional) Message to report in additon.
Message string
}

// SetupError indicates that the loader initialization has detected
// errors in individual plugin configurations or duplicates.
type SetupError struct {
Fails []error
}

// ErrUnknownInput indicates that the plugin type does not exist. Either
// because the 'type' setting name does not match the loaders expectations,
// or because the type is unknown.
var ErrUnknownInput = errors.New("unknown input type")

// ErrNoInputConfigured indicates that the 'type' setting is missing.
var ErrNoInputConfigured = errors.New("no input type configured")

// ErrPluginWithoutName reports that the operation failed because
// the plugin is required to have a Name.
var ErrPluginWithoutName = errors.New("the plugin has no name")

// IsUnknownInputError checks if an error value indicates an input load
// error because there is no existing plugin that can create the input.
func IsUnknownInputError(err error) bool { return errors.Is(err, ErrUnknownInput) }

func failedInputName(err error) string {
switch e := err.(type) {
case *LoadError:
return e.Name
default:
return ""
}
}

// Unwrap returns the reason if present
func (e *LoadError) Unwrap() error { return e.Reason }

// Error returns the errors string repesentation
func (e *LoadError) Error() string {
var buf strings.Builder

if e.Message != "" {
buf.WriteString(e.Message)
} else if e.Name != "" {
buf.WriteString("failed to load ")
buf.WriteString(e.Name)
}

if e.Reason != nil {
if buf.Len() > 0 {
buf.WriteString(": ")
}
fmt.Fprintf(&buf, "%v", e.Reason)
}

if buf.Len() == 0 {
return "<loader error>"
}
return buf.String()
}

// Error returns the errors string repesentation
func (e *SetupError) Error() string {
var buf strings.Builder
buf.WriteString("invalid plugin setup found:")
for _, err := range e.Fails {
fmt.Fprintf(&buf, "\n\t%v", err)
}
return buf.String()
}
113 changes: 113 additions & 0 deletions filebeat/input/v2/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package v2

import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/go-concert/unison"
)

// InputManager creates and maintains actions and background processes for an
// input type.
// The InputManager is used to create inputs. The InputManager can provide
// additional functionality like coordination between input of the same type,
// custom functionality for querying or caching shared information, application
// of common settings not unique to a particular input type, or require a more
// specific Input interface to be implemented by the actual input.
type InputManager interface {
// Init signals to InputManager to initialize internal resources.
// The mode tells the input manager if the Beat is actually running the inputs or
// if inputs are only configured for testing/validation purposes.
Init(grp unison.Group, mode Mode) error

// Creates builds a new Input instance from the given configuation, or returns
// an error if the configuation is invalid.
// The input must establish any connection for data collection yet. The Beat
// will use the Test/Run methods of the input.
Create(*common.Config) (Input, error)
}

// Mode tells the InputManager in which mode it is initialized.
type Mode uint8

//go:generate stringer -type Mode -trimprefix Mode
const (
ModeRun Mode = iota
ModeTest
ModeOther
)

// Input is a configured input object that can be used to test or start
// the actual data collection.
type Input interface {
// Name reports the input name.
//
// XXX: check if/how we can remove this method. Currently it is required for
// compatibility reasons with existing interfaces in libbeat, autodiscovery
// and filebeat.
Name() string
kvch marked this conversation as resolved.
Show resolved Hide resolved

// Test checks the configuaration and runs additional checks if the Input can
// actually collect data for the given configuration (e.g. check if host/port or files are
// accessible).
Test(TestContext) error

// Run starts the data collection. Run must return an error only if the
// error is fatal making it impossible for the input to recover.
Run(Context, beat.PipelineConnector) error
}

// Context provides the Input Run function with common environmental
// information and services.
type Context struct {
// Logger provides a structured logger to inputs. The logger is initialized
// with labels that will identify logs for the input.
Logger *logp.Logger

// The input ID.
ID string

// Agent procides additional Beat info like instance ID or beat name.
urso marked this conversation as resolved.
Show resolved Hide resolved
urso marked this conversation as resolved.
Show resolved Hide resolved
Agent beat.Info

// Cancelation is used by Beats to signal the input to shutdown.
Cancelation Canceler
}

// TestContext provides the Input Test function with common environmental
// information and services.
type TestContext struct {
// Logger provides a structured logger to inputs. The logger is initialized
// with labels that will identify logs for the input.
Logger *logp.Logger

// Agent procides additional Beat info like instance ID or beat name.
Agent beat.Info

// Cancelation is used by Beats to signal the input to shutdown.
Cancelation Canceler
}

// Canceler is used to provide shutdown handling to the Context.
type Canceler interface {
Done() <-chan struct{}
Err() error
}
132 changes: 132 additions & 0 deletions filebeat/input/v2/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package v2

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/unison"
)

// Loader can be used to create Inputs from configurations.
// The loader is initialized with a list of plugins, and finds the correct plugin
// when a configuration is passed to Configure.
type Loader struct {
log *logp.Logger
registry map[string]Plugin
typeField string
defaultType string
}

// NewLoader creates a new Loader for configuring inputs from a slice if plugins.
// NewLoader returns a SetupError if invalid plugin configurations or duplicates in the slice are detected.
// The Loader will read the plugin name from the configuration object as is
// configured by typeField. If typeField is empty, it defaults to "type".
func NewLoader(log *logp.Logger, plugins []Plugin, typeField, defaultType string) (*Loader, error) {
if typeField == "" {
typeField = "type"
}

if errs := validatePlugins(plugins); len(errs) > 0 {
return nil, &SetupError{errs}
}

registry := make(map[string]Plugin, len(plugins))
for _, p := range plugins {
registry[p.Name] = p
}

return &Loader{
log: log,
registry: registry,
typeField: typeField,
defaultType: defaultType,
}, nil
}

// Init runs Init on all InputManagers for all plugins known to the loader.
func (l *Loader) Init(group unison.Group, mode Mode) error {
for _, p := range l.registry {
if err := p.Manager.Init(group, mode); err != nil {
return err
}
}
return nil
}

// Configure creates a new input from a Config object.
// The loader reads the input type name from the cfg object and tries to find a
// matching plugin. If a plugin is found, the plugin it's InputManager is used to create
// the input.
// Returns a LoadError if the input name can not be read from the config or if
// the type does not exist. Error values for Ccnfiguration errors do depend on
// the InputManager.
func (l *Loader) Configure(cfg *common.Config) (Input, error) {
name, err := cfg.String(l.typeField, -1)
if err != nil {
if l.defaultType == "" {
return nil, &LoadError{
Reason: ErrNoInputConfigured,
Message: fmt.Sprintf("%v setting is missing", l.typeField),
}
}
name = l.defaultType
}

p, exists := l.registry[name]
if !exists {
return nil, &LoadError{Name: name, Reason: ErrUnknownInput}
}

log := l.log.With("input", name, "stability", p.Stability, "deprecated", p.Deprecated)
switch p.Stability {
case feature.Experimental:
log.Warnf("EXPERIMENTAL: The %v input is experimental", name)
case feature.Beta:
log.Warnf("BETA: The %v input is beta", name)
}
if p.Deprecated {
log.Warnf("DEPRECATED: The %v input is deprecated", name)
}

return p.Manager.Create(cfg)
}

// validatePlugins checks if there are multiple plugins with the same name in
// the registry.
func validatePlugins(plugins []Plugin) []error {
var errs []error

counts := map[string]int{}
for _, p := range plugins {
counts[p.Name]++
if err := p.validate(); err != nil {
errs = append(errs, err)
}
}

for name, count := range counts {
if count > 1 {
errs = append(errs, fmt.Errorf("plugin '%v' found %v times", name, count))
}
}
return errs
}
Loading