Skip to content

Commit

Permalink
Add pass, drop, and interval to the plugin options
Browse files Browse the repository at this point in the history
  • Loading branch information
evanphx committed May 20, 2015
1 parent 203d369 commit 8aa7e35
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 12 deletions.
8 changes: 8 additions & 0 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,19 @@ type BatchPoints struct {
Debug bool

Prefix string

Config *ConfiguredPlugin
}

func (bp *BatchPoints) Add(name string, val interface{}, tags map[string]string) {
name = bp.Prefix + name

if bp.Config != nil {
if !bp.Config.ShouldPass(name) {
return
}
}

if bp.Debug {
var tg []string

Expand Down
108 changes: 102 additions & 6 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/url"
"os"
"sort"
"sync"
"time"

"github.com/influxdb/influxdb/client"
Expand All @@ -15,12 +16,12 @@ import (
type runningPlugin struct {
name string
plugin plugins.Plugin
config *ConfiguredPlugin
}

type Agent struct {
Interval Duration
Debug bool
HTTP string
Hostname string

Config *Config
Expand All @@ -31,9 +32,9 @@ type Agent struct {
}

func NewAgent(config *Config) (*Agent, error) {
agent := &Agent{Config: config}
agent := &Agent{Config: config, Interval: Duration{10 * time.Second}}

err := config.Apply("agent", agent)
err := config.ApplyAgent(agent)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -91,12 +92,12 @@ func (a *Agent) LoadPlugins() ([]string, error) {

plugin := creator()

err := a.Config.Apply(name, plugin)
config, err := a.Config.ApplyPlugin(name, plugin)
if err != nil {
return nil, err
}

a.plugins = append(a.plugins, &runningPlugin{name, plugin})
a.plugins = append(a.plugins, &runningPlugin{name, plugin, config})
names = append(names, name)
}

Expand All @@ -105,13 +106,57 @@ func (a *Agent) LoadPlugins() ([]string, error) {
return names, nil
}

func (a *Agent) crankParallel() error {
points := make(chan *BatchPoints, len(a.plugins))

var wg sync.WaitGroup

for _, plugin := range a.plugins {
if plugin.config.Interval != 0 {
continue
}

wg.Add(1)
go func(plugin *runningPlugin) {
defer wg.Done()

var acc BatchPoints
acc.Debug = a.Debug
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config

plugin.plugin.Gather(&acc)

points <- &acc
}(plugin)
}

wg.Wait()

close(points)

var acc BatchPoints
acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database

for sub := range points {
acc.Points = append(acc.Points, sub.Points...)
}

return nil
// _, err := a.conn.Write(acc.BatchPoints)
// return err
}

func (a *Agent) crank() error {
var acc BatchPoints

acc.Debug = a.Debug

for _, plugin := range a.plugins {
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
err := plugin.plugin.Gather(&acc)
if err != nil {
return err
Expand All @@ -126,6 +171,36 @@ func (a *Agent) crank() error {
return err
}

func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
ticker := time.NewTicker(plugin.config.Interval)

for {
var acc BatchPoints

acc.Debug = a.Debug

acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
err := plugin.plugin.Gather(&acc)
if err != nil {
return err
}

acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database

a.conn.Write(acc.BatchPoints)

select {
case <-shutdown:
return nil
case <-ticker.C:
continue
}
}
}

func (a *Agent) TestAllPlugins() error {
var names []string

Expand Down Expand Up @@ -162,6 +237,13 @@ func (a *Agent) Test() error {

for _, plugin := range a.plugins {
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config

fmt.Printf("* Plugin: %s\n", plugin.name)
if plugin.config.Interval != 0 {
fmt.Printf("* Internal: %s\n", plugin.config.Interval)
}

err := plugin.plugin.Gather(&acc)
if err != nil {
return err
Expand All @@ -179,10 +261,24 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}
}

var wg sync.WaitGroup

for _, plugin := range a.plugins {
if plugin.config.Interval != 0 {
wg.Add(1)
go func(plugin *runningPlugin) {
defer wg.Done()
a.crankSeparate(shutdown, plugin)
}(plugin)
}
}

defer wg.Wait()

ticker := time.NewTicker(a.Interval.Duration)

for {
err := a.crank()
err := a.crankParallel()
if err != nil {
log.Printf("Error in plugins: %s", err)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/tivan/tivan.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func main() {
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
if ag.Debug {
log.Printf("Debug: enabled")
log.Printf("Agent Config: %#v", ag)
log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v\n",
ag.Interval, ag.Debug, ag.Hostname)
}

if config.URL != "" {
Expand Down
104 changes: 99 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,106 @@ type Config struct {
UserAgent string
Tags map[string]string

agent *ast.Table
plugins map[string]*ast.Table
}

func (c *Config) Plugins() map[string]*ast.Table {
return c.plugins
}

func (c *Config) Apply(name string, v interface{}) error {
if tbl, ok := c.plugins[name]; ok {
return toml.UnmarshalTable(tbl, v)
type ConfiguredPlugin struct {
Name string

Drop []string
Pass []string

Interval time.Duration
}

func (cp *ConfiguredPlugin) ShouldPass(name string) bool {
if cp.Pass != nil {
for _, pat := range cp.Pass {
if strings.HasPrefix(name, pat) {
return true
}
}

return false
}

if cp.Drop != nil {
for _, pat := range cp.Drop {
if strings.HasPrefix(name, pat) {
return false
}
}

return true
}

return true
}

func (c *Config) ApplyAgent(v interface{}) error {
if c.agent != nil {
return toml.UnmarshalTable(c.agent, v)
}

return nil
}

func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, error) {
cp := &ConfiguredPlugin{Name: name}

if tbl, ok := c.plugins[name]; ok {

if node, ok := tbl.Fields["pass"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Pass = append(cp.Pass, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["drop"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Drop = append(cp.Drop, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

cp.Interval = dur
}
}
}

delete(tbl.Fields, "drop")
delete(tbl.Fields, "pass")
delete(tbl.Fields, "interval")
return cp, toml.UnmarshalTable(tbl, v)
}

return cp, nil
}

func (c *Config) PluginsDeclared() []string {
var plugins []string

Expand Down Expand Up @@ -90,12 +175,15 @@ func LoadConfig(path string) (*Config, error) {
return nil, ErrInvalidConfig
}

if name == "influxdb" {
switch name {
case "influxdb":
err := toml.UnmarshalTable(subtbl, c)
if err != nil {
return nil, err
}
} else {
case "agent":
c.agent = subtbl
default:
c.plugins[name] = subtbl
}
}
Expand Down Expand Up @@ -154,6 +242,12 @@ var header = `# Tivan configuration
# [influxdb.tags]
# dc = "us-east-1"
# Configuration for tivan itself
# [agent]
# interval = "10s"
# debug = false
# hostname = "prod3241"
# PLUGINS
`
Expand Down

0 comments on commit 8aa7e35

Please sign in to comment.