Skip to content

Commit

Permalink
Enforce stat prefixing at the accumulator layer
Browse files Browse the repository at this point in the history
  • Loading branch information
evanphx committed May 18, 2015
1 parent 34e87e7 commit f1e1204
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 244 deletions.
4 changes: 4 additions & 0 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ type BatchPoints struct {
client.BatchPoints

Debug bool

Prefix string
}

func (bp *BatchPoints) Add(name string, val interface{}, tags map[string]string) {
name = bp.Prefix + name

if bp.Debug {
var tg []string

Expand Down
15 changes: 11 additions & 4 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import (
"github.com/influxdb/tivan/plugins"
)

type runningPlugin struct {
name string
plugin plugins.Plugin
}

type Agent struct {
Interval Duration
Debug bool
Expand All @@ -19,7 +24,7 @@ type Agent struct {

Config *Config

plugins []plugins.Plugin
plugins []*runningPlugin

conn *client.Client
}
Expand Down Expand Up @@ -93,7 +98,7 @@ func (a *Agent) LoadPlugins() ([]string, error) {
return nil, err
}

a.plugins = append(a.plugins, plugin)
a.plugins = append(a.plugins, &runningPlugin{name, plugin})
names = append(names, name)
}

Expand All @@ -108,7 +113,8 @@ func (a *Agent) crank() error {
acc.Debug = a.Debug

for _, plugin := range a.plugins {
err := plugin.Gather(&acc)
acc.Prefix = plugin.name + "_"
err := plugin.plugin.Gather(&acc)
if err != nil {
return err
}
Expand All @@ -128,7 +134,8 @@ func (a *Agent) Test() error {
acc.Debug = true

for _, plugin := range a.plugins {
err := plugin.Gather(&acc)
acc.Prefix = plugin.name + "_"
err := plugin.plugin.Gather(&acc)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions plugins/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,23 @@ type mapping struct {
var mappings = []*mapping{
{
onServer: "Bytes_",
inExport: "mysql_bytes_",
inExport: "bytes_",
},
{
onServer: "Com_",
inExport: "mysql_commands_",
inExport: "commands_",
},
{
onServer: "Handler_",
inExport: "mysql_handler_",
inExport: "handler_",
},
{
onServer: "Innodb_",
inExport: "mysql_innodb_",
inExport: "innodb_",
},
{
onServer: "Threads_",
inExport: "mysql_threads_",
inExport: "threads_",
},
}

Expand Down Expand Up @@ -113,14 +113,14 @@ func (m *Mysql) gatherServer(serv *Server, acc plugins.Accumulator) error {
return err
}

acc.Add("mysql_queries", i, nil)
acc.Add("queries", i, nil)
case "Slow_queries":
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
if err != nil {
return err
}

acc.Add("mysql_slow_queries", i, nil)
acc.Add("slow_queries", i, nil)
}
}

Expand Down
14 changes: 7 additions & 7 deletions plugins/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ func TestMysqlGeneratesMetrics(t *testing.T) {
prefix string
count int
}{
{"mysql_commands", 141},
{"mysql_handler", 18},
{"mysql_bytes", 2},
{"mysql_innodb", 51},
{"mysql_threads", 4},
{"commands", 141},
{"handler", 18},
{"bytes", 2},
{"innodb", 51},
{"threads", 4},
}

intMetrics := []string{
"mysql_queries",
"mysql_slow_queries",
"queries",
"slow_queries",
}

for _, prefix := range prefixes {
Expand Down
30 changes: 15 additions & 15 deletions plugins/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,21 @@ func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator) error {

tags := map[string]string{"db": name}

acc.Add("postgresql_xact_commit", commit, tags)
acc.Add("postgresql_xact_rollback", rollback, tags)
acc.Add("postgresql_blks_read", read, tags)
acc.Add("postgresql_blks_hit", hit, tags)
acc.Add("postgresql_tup_returned", returned, tags)
acc.Add("postgresql_tup_fetched", fetched, tags)
acc.Add("postgresql_tup_inserted", inserted, tags)
acc.Add("postgresql_tup_updated", updated, tags)
acc.Add("postgresql_tup_deleted", deleted, tags)
acc.Add("postgresql_conflicts", conflicts, tags)
acc.Add("postgresql_temp_files", temp_files, tags)
acc.Add("postgresql_temp_bytes", temp_bytes, tags)
acc.Add("postgresql_deadlocks", deadlocks, tags)
acc.Add("postgresql_blk_read_time", read_time, tags)
acc.Add("postgresql_blk_write_time", read_time, tags)
acc.Add("xact_commit", commit, tags)
acc.Add("xact_rollback", rollback, tags)
acc.Add("blks_read", read, tags)
acc.Add("blks_hit", hit, tags)
acc.Add("tup_returned", returned, tags)
acc.Add("tup_fetched", fetched, tags)
acc.Add("tup_inserted", inserted, tags)
acc.Add("tup_updated", updated, tags)
acc.Add("tup_deleted", deleted, tags)
acc.Add("conflicts", conflicts, tags)
acc.Add("temp_files", temp_files, tags)
acc.Add("temp_bytes", temp_bytes, tags)
acc.Add("deadlocks", deadlocks, tags)
acc.Add("blk_read_time", read_time, tags)
acc.Add("blk_write_time", read_time, tags)

return nil
}
Expand Down
34 changes: 17 additions & 17 deletions plugins/postgresql/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,24 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
require.NoError(t, err)

intMetrics := []string{
"postgresql_xact_commit",
"postgresql_xact_rollback",
"postgresql_blks_read",
"postgresql_blks_hit",
"postgresql_tup_returned",
"postgresql_tup_fetched",
"postgresql_tup_inserted",
"postgresql_tup_updated",
"postgresql_tup_deleted",
"postgresql_conflicts",
"postgresql_temp_files",
"postgresql_temp_bytes",
"postgresql_deadlocks",
"xact_commit",
"xact_rollback",
"blks_read",
"blks_hit",
"tup_returned",
"tup_fetched",
"tup_inserted",
"tup_updated",
"tup_deleted",
"conflicts",
"temp_files",
"temp_bytes",
"deadlocks",
}

floatMetrics := []string{
"postgresql_blk_read_time",
"postgresql_blk_write_time",
"blk_read_time",
"blk_write_time",
}

for _, metric := range intMetrics {
Expand All @@ -68,7 +68,7 @@ func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) {
err := p.Gather(&acc)
require.NoError(t, err)

point, ok := acc.Get("postgresql_xact_commit")
point, ok := acc.Get("xact_commit")
require.True(t, ok)

assert.Equal(t, "postgres", point.Tags["db"])
Expand All @@ -91,7 +91,7 @@ func TestPostgresqlDefaultsToAllDatabases(t *testing.T) {
var found bool

for _, pnt := range acc.Points {
if pnt.Name == "postgresql_xact_commit" {
if pnt.Name == "xact_commit" {
if pnt.Tags["db"] == "postgres" {
found = true
break
Expand Down
60 changes: 30 additions & 30 deletions plugins/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,36 @@ type Redis struct {
}

var Tracking = map[string]string{
"uptime_in_seconds": "redis_uptime",
"connected_clients": "redis_clients",
"used_memory": "redis_used_memory",
"used_memory_rss": "redis_used_memory_rss",
"used_memory_peak": "redis_used_memory_peak",
"used_memory_lua": "redis_used_memory_lua",
"rdb_changes_since_last_save": "redis_rdb_changes_since_last_save",
"total_connections_received": "redis_total_connections_received",
"total_commands_processed": "redis_total_commands_processed",
"instantaneous_ops_per_sec": "redis_instantaneous_ops_per_sec",
"sync_full": "redis_sync_full",
"sync_partial_ok": "redis_sync_partial_ok",
"sync_partial_err": "redis_sync_partial_err",
"expired_keys": "redis_expired_keys",
"evicted_keys": "redis_evicted_keys",
"keyspace_hits": "redis_keyspace_hits",
"keyspace_misses": "redis_keyspace_misses",
"pubsub_channels": "redis_pubsub_channels",
"pubsub_patterns": "redis_pubsub_patterns",
"latest_fork_usec": "redis_latest_fork_usec",
"connected_slaves": "redis_connected_slaves",
"master_repl_offset": "redis_master_repl_offset",
"repl_backlog_active": "redis_repl_backlog_active",
"repl_backlog_size": "redis_repl_backlog_size",
"repl_backlog_histlen": "redis_repl_backlog_histlen",
"mem_fragmentation_ratio": "redis_mem_fragmentation_ratio",
"used_cpu_sys": "redis_used_cpu_sys",
"used_cpu_user": "redis_used_cpu_user",
"used_cpu_sys_children": "redis_used_cpu_sys_children",
"used_cpu_user_children": "redis_used_cpu_user_children",
"uptime_in_seconds": "uptime",
"connected_clients": "clients",
"used_memory": "used_memory",
"used_memory_rss": "used_memory_rss",
"used_memory_peak": "used_memory_peak",
"used_memory_lua": "used_memory_lua",
"rdb_changes_since_last_save": "rdb_changes_since_last_save",
"total_connections_received": "total_connections_received",
"total_commands_processed": "total_commands_processed",
"instantaneous_ops_per_sec": "instantaneous_ops_per_sec",
"sync_full": "sync_full",
"sync_partial_ok": "sync_partial_ok",
"sync_partial_err": "sync_partial_err",
"expired_keys": "expired_keys",
"evicted_keys": "evicted_keys",
"keyspace_hits": "keyspace_hits",
"keyspace_misses": "keyspace_misses",
"pubsub_channels": "pubsub_channels",
"pubsub_patterns": "pubsub_patterns",
"latest_fork_usec": "latest_fork_usec",
"connected_slaves": "connected_slaves",
"master_repl_offset": "master_repl_offset",
"repl_backlog_active": "repl_backlog_active",
"repl_backlog_size": "repl_backlog_size",
"repl_backlog_histlen": "repl_backlog_histlen",
"mem_fragmentation_ratio": "mem_fragmentation_ratio",
"used_cpu_sys": "used_cpu_sys",
"used_cpu_user": "used_cpu_user",
"used_cpu_sys_children": "used_cpu_sys_children",
"used_cpu_user_children": "used_cpu_user_children",
}

var ErrProtocolError = errors.New("redis protocol error")
Expand Down
Loading

0 comments on commit f1e1204

Please sign in to comment.