Skip to content

Commit

Permalink
Rewrite Registrar to have a configurable registryFile
Browse files Browse the repository at this point in the history
  • Loading branch information
ruflin committed Sep 14, 2015
1 parent 1443c97 commit 4b34f0f
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ARCH?=$(shell uname -m)
GODEP=$(GOPATH)/bin/godep


filebeat:
filebeat: $(GOFILES)
# first make sure we have godep
go get github.com/tools/godep
$(GODEP) go build
Expand Down
17 changes: 14 additions & 3 deletions beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,21 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
fb.SpoolChan = make(chan *FileEvent, 16)
fb.publisherChan = make(chan []*FileEvent, 1)
fb.RegistrarChan = make(chan []*FileEvent, 1)

persist := make(map[string]*FileState)

restart := &ProspectorResume{}
restart.LoadState()
registrar := &Registrar{
registryFile: fb.FbConfig.Filebeat.RegistryFile,
}
registrar.Init()

restart := &ProspectorResume{
Persist: make(chan *FileState),
// Load the previous log file locations now, for use in prospector
Files: make(map[string]*FileState),
}

registrar.LoadState(restart.Files)
restart.Scan(fb.FbConfig.Filebeat.Files, persist, fb.SpoolChan)

// Start spooler: Harvesters dump events into the spooler.
Expand All @@ -93,7 +104,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
go Publish(b, fb)

// registrar records last acknowledged positions in all files.
Registrar(persist, fb.RegistrarChan)
registrar.WriteState(persist, fb.RegistrarChan)

return nil
}
Expand Down
43 changes: 38 additions & 5 deletions beat/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,38 @@ import (
"github.com/elastic/libbeat/logp"
)

func Registrar(state map[string]*FileState, input chan []*FileEvent) {
type Registrar struct {
registryFile string
}

func (r *Registrar) Init() {
// Set to default in case it is not set
if r.registryFile == "" {
r.registryFile = ".filebeat"
}

logp.Debug("registrar", "Registry file set to: %s", r.registryFile)

}

// loadState fetches the previous reading state from the configure registryFile file
// The default file is .filebeat file which is stored in the same path as the binary is running
func (r *Registrar) LoadState(files map[string]*FileState) {

if existing, e := os.Open(r.registryFile); e == nil {
defer existing.Close()
wd := ""
if wd, e = os.Getwd(); e != nil {
logp.Warn("WARNING: os.Getwd retuned unexpected error %s -- ignoring", e.Error())
}
logp.Info("Loading registrar data from %s/%s", wd, r.registryFile)

decoder := json.NewDecoder(existing)
decoder.Decode(&files)
}
}

func (r *Registrar) WriteState(state map[string]*FileState, input chan []*FileEvent) {
logp.Debug("registrar", "Starting Registrar")
for events := range input {
logp.Debug("registrar", "Registrar: processing %d events", len(events))
Expand All @@ -22,7 +53,7 @@ func Registrar(state map[string]*FileState, input chan []*FileEvent) {
state[*event.Source] = event.GetState()
}

if e := writeRegistry(state, ".filebeat"); e != nil {
if e := r.writeRegistry(state); e != nil {
// REVU: but we should panic, or something, right?
logp.Warn("WARNING: (continuing) update of registry returned error: %s", e)
}
Expand All @@ -31,8 +62,10 @@ func Registrar(state map[string]*FileState, input chan []*FileEvent) {
}

// writeRegistry Writes the new json registry file to disk
func writeRegistry(state map[string]*FileState, path string) error {
tempfile := path + ".new"
func (r *Registrar) writeRegistry(state map[string]*FileState) error {
logp.Debug("registrar", "Write registry file:", r.registryFile)

tempfile := r.registryFile + ".new"
file, e := os.Create(tempfile)
if e != nil {
logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, e)
Expand All @@ -43,5 +76,5 @@ func writeRegistry(state map[string]*FileState, path string) error {
encoder := json.NewEncoder(file)
encoder.Encode(state)

return SafeFileRotate(path, tempfile)
return SafeFileRotate(r.registryFile, tempfile)
}
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type FilebeatConfig struct {
IdleTimeout time.Duration
TailOnRotate bool
Quiet bool
RegistryFile string
}

type FileConfig struct {
Expand Down Expand Up @@ -104,7 +105,7 @@ func mergeConfigFiles(configFiles []string, config *Config) error {
}

// Fetches and merges all config files given by Options.configArgs. All are put into one config object
func (config *Config) FetchConfigs(path string) {
func (config *Config) FetchConfigs(path string) {

configFiles, err := getConfigFiles(path)

Expand Down
23 changes: 0 additions & 23 deletions crawler/prospector.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package crawler

import (
"encoding/json"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -30,28 +29,6 @@ type ProspectorInfo struct {
Last_seen uint32 /* int number of the last iterations in which we saw this file */
}

// loadState fetches the previes reading state from the .filebeat file
// The .filebeat file is stored in the same path as the binary is running
func (restart *ProspectorResume) LoadState() {
restart.Persist = make(chan *FileState)

// Load the previous log file locations now, for use in prospector
restart.Files = make(map[string]*FileState)

// TODO: Should the location and path of .filebeat be configurable?
if existing, e := os.Open(".filebeat"); e == nil {
defer existing.Close()
wd := ""
if wd, e = os.Getwd(); e != nil {
logp.Warn("WARNING: os.Getwd retuned unexpected error %s -- ignoring", e.Error())
}
logp.Info("Loading registrar data from %s/.filebeat", wd)

decoder := json.NewDecoder(existing)
decoder.Decode(&restart.Files)
}
}

func (restart *ProspectorResume) Scan(files []cfg.FileConfig, persist map[string]*FileState, eventChan chan *FileEvent) {
pendingProspectorCnt := 0

Expand Down
13 changes: 7 additions & 6 deletions etc/filebeat.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ filebeat:
# Paths that should be crawled and fetched
paths:
#- /var/log/app*.log
- /var/log/*.log
- /var/log/*
# Type of the files. Annotated in every documented
type: syslog
# Optional additional fields
Expand All @@ -27,12 +27,13 @@ filebeat:
type: stdin
paths:
- "-"
spoolSize:
harvesterBufferSize:
cpuProfileFile:
idleTimeout:
tailOnRotate:
spoolsize:
harvesterbuffersize:
cpuprofilefile:
idletimeout:
tailonRotate:
quiet:
registryfile: .filebeat

############################# Shipper ############################################
shipper:
Expand Down
2 changes: 2 additions & 0 deletions etc/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ filebeat:
idleTimeout:
tailOnRotate:
quiet:
registryfile: .filebeat


############################# Shipper ############################################
shipper:
Expand Down
2 changes: 1 addition & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package main
// This file is mandatory as otherwise the packetbeat.test binary is not generated correctly. Reason???

import (
"testing"
"flag"
"testing"
)

var integration *bool
Expand Down

0 comments on commit 4b34f0f

Please sign in to comment.