Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move prospector log to its own package #4273

Merged
merged 1 commit into from
May 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions filebeat/harvester/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,24 @@ func TestMatchAnyRegexps(t *testing.T) {
assert.Equal(t, MatchAny(matchers, "/var/log/log.gz"), true)

}

// TestExcludeLine checks that a single anchored pattern built through
// InitMatchers matches lines starting with the pattern and rejects lines
// that start with something else.
func TestExcludeLine(t *testing.T) {
	// Named "matchers" rather than "regexp" so the local does not shadow
	// the name of the standard library package.
	matchers, err := InitMatchers("^DBG")
	assert.Nil(t, err)

	// A line beginning with the prefix matches.
	assert.True(t, MatchAny(matchers, "DBG: a debug message"))
	// A line with a different prefix does not match.
	assert.False(t, MatchAny(matchers, "ERR: an error message"))
}

// TestIncludeLine checks that MatchAny reports a match when at least one of
// several patterns built through InitMatchers applies to the line, and no
// match when none of them do.
func TestIncludeLine(t *testing.T) {
	matchers, err := InitMatchers("^ERR", "^WARN")
	assert.Nil(t, err)

	// Matches neither "^ERR" nor "^WARN".
	assert.False(t, MatchAny(matchers, "DBG: a debug message"))
	// Matches the first pattern.
	assert.True(t, MatchAny(matchers, "ERR: an error message"))
	// Matches the second pattern: "WARNING" starts with "WARN".
	assert.True(t, MatchAny(matchers, "WARNING: a simple warning message"))
}

// TestInitRegexp checks that InitMatchers returns a non-nil error when it is
// given a malformed pattern (unbalanced opening parentheses).
func TestInitRegexp(t *testing.T) {
	_, initErr := InitMatchers("(((((")
	assert.NotNil(t, initErr)
}
44 changes: 4 additions & 40 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,19 @@
package prospector

import (
"fmt"
"time"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/libbeat/common/match"
)

var (
defaultConfig = prospectorConfig{
Enabled: true,
IgnoreOlder: 0,
ScanFrequency: 10 * time.Second,
InputType: cfg.DefaultInputType,
CleanInactive: 0,
CleanRemoved: true,
HarvesterLimit: 0,
Symlinks: false,
TailFiles: false,
ScanFrequency: 10 * time.Second,
InputType: cfg.DefaultInputType,
}
)

type prospectorConfig struct {
Enabled bool `config:"enabled"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
recursiveGlob bool `config:"recursive_glob.enabled"`
}

func (config *prospectorConfig) Validate() error {

if config.InputType == cfg.LogInputType && len(config.Paths) == 0 {
return fmt.Errorf("No paths were defined for prospector")
}

if config.CleanInactive != 0 && config.IgnoreOlder == 0 {
return fmt.Errorf("ignore_older must be enabled when clean_inactive is used")
}

if config.CleanInactive != 0 && config.CleanInactive <= config.IgnoreOlder+config.ScanFrequency {
return fmt.Errorf("clean_inactive must be > ignore_older + scan_frequency to make sure only files which are not monitored anymore are removed")
}

return nil
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
}
73 changes: 57 additions & 16 deletions filebeat/harvester/config.go → filebeat/prospector/log/config.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
package harvester
package log

import (
"fmt"
"time"

"github.com/dustin/go-humanize"
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/reader"

"github.com/dustin/go-humanize"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/processors"
)

var (
defaultConfig = harvesterConfig{
BufferSize: 16 * humanize.KiByte,
defaultConfig = config{
// Common
InputType: cfg.DefaultInputType,
CleanInactive: 0,

// Prospector
Enabled: true,
IgnoreOlder: 0,
ScanFrequency: 10 * time.Second,
CleanRemoved: true,
HarvesterLimit: 0,
Symlinks: false,
TailFiles: false,

// Harvester
BufferSize: 16 * humanize.KiByte,
Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 10 * time.Second,
Expand All @@ -26,15 +38,31 @@ var (
CloseRenamed: false,
CloseEOF: false,
CloseTimeout: 0,
CleanInactive: 0,
}
)

type harvesterConfig struct {
type config struct {

// Common
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`

// Prospector
Enabled bool `config:"enabled"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
recursiveGlob bool `config:"recursive_glob.enabled"`

// Harvester
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
BufferSize int `config:"harvester_buffer_size"`
Encoding string `config:"encoding"`
InputType string `config:"input_type"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
Expand All @@ -48,27 +76,40 @@ type harvesterConfig struct {
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *reader.MultilineConfig `config:"multiline"`
JSON *reader.JSONConfig `config:"json"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
Pipeline string `config:"pipeline"`
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
Processors processors.PluginConfig `config:"processors"`
}

func (config *harvesterConfig) Validate() error {
func (c *config) Validate() error {

// Prospector
if c.InputType == cfg.LogInputType && len(c.Paths) == 0 {
return fmt.Errorf("No paths were defined for prospector")
}

if c.CleanInactive != 0 && c.IgnoreOlder == 0 {
return fmt.Errorf("ignore_older must be enabled when clean_inactive is used")
}

if c.CleanInactive != 0 && c.CleanInactive <= c.IgnoreOlder+c.ScanFrequency {
return fmt.Errorf("clean_inactive must be > ignore_older + scan_frequency to make sure only files which are not monitored anymore are removed")
}

// Harvester
// Check input type
if _, ok := cfg.ValidInputType[config.InputType]; !ok {
return fmt.Errorf("Invalid input type: %v", config.InputType)
if _, ok := cfg.ValidInputType[c.InputType]; !ok {
return fmt.Errorf("Invalid input type: %v", c.InputType)
}

if config.JSON != nil && len(config.JSON.MessageKey) == 0 &&
config.Multiline != nil {
if c.JSON != nil && len(c.JSON.MessageKey) == 0 &&
c.Multiline != nil {
return fmt.Errorf("When using the JSON decoder and multiline together, you need to specify a message_key value")
}

if config.JSON != nil && len(config.JSON.MessageKey) == 0 &&
(len(config.IncludeLines) > 0 || len(config.ExcludeLines) > 0) {
if c.JSON != nil && len(c.JSON.MessageKey) == 0 &&
(len(c.IncludeLines) > 0 || len(c.ExcludeLines) > 0) {
return fmt.Errorf("When using the JSON decoder and line filtering together, you need to specify a message_key value")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !integration

package prospector
package log

import (
"testing"
Expand All @@ -12,7 +12,7 @@ import (

func TestCleanOlderError(t *testing.T) {

config := prospectorConfig{
config := config{
CleanInactive: 10 * time.Hour,
}

Expand All @@ -22,7 +22,7 @@ func TestCleanOlderError(t *testing.T) {

func TestCleanOlderIgnoreOlderError(t *testing.T) {

config := prospectorConfig{
config := config{
CleanInactive: 10 * time.Hour,
IgnoreOlder: 15 * time.Hour,
}
Expand All @@ -33,7 +33,7 @@ func TestCleanOlderIgnoreOlderError(t *testing.T) {

func TestCleanOlderIgnoreOlderErrorEqual(t *testing.T) {

config := prospectorConfig{
config := config{
CleanInactive: 10 * time.Hour,
IgnoreOlder: 10 * time.Hour,
}
Expand All @@ -44,9 +44,11 @@ func TestCleanOlderIgnoreOlderErrorEqual(t *testing.T) {

func TestCleanOlderIgnoreOlder(t *testing.T) {

config := prospectorConfig{
config := config{
CleanInactive: 10*time.Hour + defaultConfig.ScanFrequency + 1*time.Second,
IgnoreOlder: 10 * time.Hour,
InputType: "log",
Paths: []string{"hello"},
}

err := config.Validate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// line. As soon as the line is completed, it is read and returned.
//
// The stdin harvester reads data from stdin.
package harvester
package log

import (
"errors"
Expand All @@ -18,7 +18,7 @@ import (

"github.com/satori/go.uuid"

"github.com/elastic/beats/filebeat/config"
cfg "github.com/elastic/beats/filebeat/config"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filebeatConfig?

"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input/file"
Expand All @@ -43,7 +43,7 @@ type Outlet interface {
}

type Harvester struct {
config harvesterConfig
config config
state file.State
states *file.States
file source.FileSource /* the file being watched */
Expand All @@ -59,7 +59,7 @@ type Harvester struct {
}

func NewHarvester(
cfg *common.Config,
config *common.Config,
state file.State,
states *file.States,
outlet Outlet,
Expand All @@ -75,7 +75,7 @@ func NewHarvester(
ID: uuid.NewV4(),
}

if err := cfg.Unpack(&h.config); err != nil {
if err := config.Unpack(&h.config); err != nil {
return nil, err
}

Expand Down Expand Up @@ -107,12 +107,12 @@ func NewHarvester(
func (h *Harvester) open() error {

switch h.config.InputType {
case config.StdinInputType:
case cfg.StdinInputType:
return h.openStdin()
case config.LogInputType:
case cfg.LogInputType:
return h.openFile()
default:
return fmt.Errorf("Invalid input type")
return fmt.Errorf("Invalid harvester type: %+v", h.config)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// +build !integration

package harvester
package log
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package harvester
package log

import (
"testing"
Expand Down
11 changes: 5 additions & 6 deletions filebeat/harvester/log.go → filebeat/prospector/log/log.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package harvester
package log

import (
"bytes"
Expand All @@ -8,7 +8,7 @@ import (
"os"
"time"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input/file"
Expand All @@ -28,7 +28,6 @@ var (
harvesterClosed = monitoring.NewInt(harvesterMetrics, "closed")
harvesterRunning = monitoring.NewInt(harvesterMetrics, "running")
harvesterOpenFiles = monitoring.NewInt(harvesterMetrics, "open_files")
filesTruncated = monitoring.NewInt(harvesterMetrics, "files.truncated")
)

// Setup opens the file handler and creates the reader for the harvester
Expand Down Expand Up @@ -226,14 +225,14 @@ func (h *Harvester) SendStateUpdate() {
// the include_lines and exclude_lines options.
func (h *Harvester) shouldExportLine(line string) bool {
if len(h.config.IncludeLines) > 0 {
if !MatchAny(h.config.IncludeLines, line) {
if !harvester.MatchAny(h.config.IncludeLines, line) {
// drop line
logp.Debug("harvester", "Drop line as it does not match any of the include patterns %s", line)
return false
}
}
if len(h.config.ExcludeLines) > 0 {
if MatchAny(h.config.ExcludeLines, line) {
if harvester.MatchAny(h.config.ExcludeLines, line) {
// drop line
logp.Debug("harvester", "Drop line as it does match one of the exclude patterns%s", line)
return false
Expand Down Expand Up @@ -324,7 +323,7 @@ func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
// getState returns an updated copy of the harvester state
func (h *Harvester) getState() file.State {

if h.config.InputType == config.StdinInputType {
if !h.file.HasState() {
return file.State{}
}

Expand Down
Loading