Skip to content

Commit

Permalink
Refactoring gauges to support floats, unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Oct 14, 2015
1 parent d84a258 commit d403512
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 132 deletions.
14 changes: 7 additions & 7 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// BatchPoints is used to send a batch of data in a single write from telegraf
// to influx
type BatchPoints struct {
mu sync.Mutex
sync.Mutex

client.BatchPoints

Expand Down Expand Up @@ -71,8 +71,8 @@ func (bp *BatchPoints) Add(
val interface{},
tags map[string]string,
) {
bp.mu.Lock()
defer bp.mu.Unlock()
bp.Lock()
defer bp.Unlock()

measurement = bp.Prefix + measurement

Expand Down Expand Up @@ -113,8 +113,8 @@ func (bp *BatchPoints) AddFieldsWithTime(
// TODO this function should add the fields with the timestamp, but that will
// need to wait for the InfluxDB point precision/unit to be fixed
bp.AddFields(measurement, fields, tags)
// bp.mu.Lock()
// defer bp.mu.Unlock()
// bp.Lock()
// defer bp.Unlock()

// measurement = bp.Prefix + measurement

Expand Down Expand Up @@ -158,8 +158,8 @@ func (bp *BatchPoints) AddFields(
fields map[string]interface{},
tags map[string]string,
) {
bp.mu.Lock()
defer bp.mu.Unlock()
bp.Lock()
defer bp.Unlock()

measurement = bp.Prefix + measurement

Expand Down
122 changes: 68 additions & 54 deletions plugins/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ type Statsd struct {
done chan struct{}

// Cache gauges, counters & sets so they can be aggregated as they arrive
gauges map[string]cachedmetric
counters map[string]cachedmetric
sets map[string]cachedmetric
gauges map[string]cachedgauge
counters map[string]cachedcounter
sets map[string]cachedset

Mappings []struct {
Match string
Expand All @@ -52,9 +52,9 @@ func NewStatsd() *Statsd {
s.done = make(chan struct{})
s.in = make(chan string, s.AllowedPendingMessages)
s.inmetrics = make(chan metric, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedmetric)
s.counters = make(map[string]cachedmetric)
s.sets = make(map[string]cachedmetric)
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)

return &s
}
Expand All @@ -63,19 +63,32 @@ func NewStatsd() *Statsd {
type metric struct {
name string
bucket string
value int64
intvalue int64
floatvalue float64
mtype string
additive bool
samplerate float64
tags map[string]string
}

// cachedmetric is a subset of metric used specifically for storing cached
// gauges and counters, ready for sending to InfluxDB.
type cachedmetric struct {
type cachedset struct {
set map[int64]bool
tags map[string]string
}

type cachedgauge struct {
value float64
tags map[string]string
}

type cachedcounter struct {
value int64
tags map[string]string
set map[int64]bool
}

type cachedtiming struct {
timings []float64
tags map[string]string
}

func (_ *Statsd) Description() string {
Expand Down Expand Up @@ -105,7 +118,6 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error {
s.Lock()
defer s.Unlock()

values := make(map[string]int64)
items := len(s.inmetrics)
for i := 0; i < items; i++ {

Expand All @@ -123,26 +135,23 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error {
acc.Add(name, cmetric.value, cmetric.tags)
}
if s.DeleteGauges {
s.gauges = make(map[string]cachedmetric)
s.gauges = make(map[string]cachedgauge)
}

for name, cmetric := range s.counters {
acc.Add(name, cmetric.value, cmetric.tags)
}
if s.DeleteCounters {
s.counters = make(map[string]cachedmetric)
s.counters = make(map[string]cachedcounter)
}

for name, cmetric := range s.sets {
acc.Add(name, cmetric.value, cmetric.tags)
acc.Add(name, int64(len(cmetric.set)), cmetric.tags)
}
if s.DeleteSets {
s.sets = make(map[string]cachedmetric)
s.sets = make(map[string]cachedset)
}

for name, value := range values {
acc.Add(name, value, nil)
}
return nil
}

Expand All @@ -153,9 +162,9 @@ func (s *Statsd) Start() error {
s.done = make(chan struct{})
s.in = make(chan string, s.AllowedPendingMessages)
s.inmetrics = make(chan metric, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedmetric)
s.counters = make(map[string]cachedmetric)
s.sets = make(map[string]cachedmetric)
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)

// Start the UDP listener
go s.udpListen()
Expand Down Expand Up @@ -267,16 +276,27 @@ func (s *Statsd) parseStatsdLine(line string) error {
}
m.additive = true
}
v, err := strconv.ParseInt(parts2[1], 10, 64)
if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
// If a sample rate is given with a counter, divide value by the rate
if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate)

switch m.mtype {
case "g", "ms", "h":
v, err := strconv.ParseFloat(parts2[1], 64)
if err != nil {
log.Printf("Error: parsing value to float64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.floatvalue = v
case "c", "s":
v, err := strconv.ParseInt(parts2[1], 10, 64)
if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
// If a sample rate is given with a counter, divide value by the rate
if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate)
}
m.intvalue = v
}
m.value = v

// Parse the name
m.name, m.tags = s.parseName(m)
Expand All @@ -301,7 +321,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
// map of tags.
// Return values are (<name>, <tags>)
func (s *Statsd) parseName(m metric) (string, map[string]string) {
var tags map[string]string
tags := make(map[string]string)
name := strings.Replace(m.bucket, ".", "_", -1)
name = strings.Replace(name, "-", "__", -1)

Expand All @@ -325,13 +345,13 @@ func (s *Statsd) parseName(m metric) (string, map[string]string) {

switch m.mtype {
case "c":
name = name + "_counter"
tags["metric_type"] = "counter"
case "g":
name = name + "_gauge"
tags["metric_type"] = "gauge"
case "s":
name = name + "_set"
tags["metric_type"] = "set"
case "ms", "h":
name = name + "_timer"
tags["metric_type"] = "timer"
}

return name, tags
Expand Down Expand Up @@ -363,27 +383,27 @@ func (s *Statsd) aggregate(m metric) {
case "c":
cached, ok := s.counters[m.name]
if !ok {
s.counters[m.name] = cachedmetric{
value: m.value,
s.counters[m.name] = cachedcounter{
value: m.intvalue,
tags: m.tags,
}
} else {
cached.value += m.value
cached.value += m.intvalue
cached.tags = m.tags
s.counters[m.name] = cached
}
case "g":
cached, ok := s.gauges[m.name]
if !ok {
s.gauges[m.name] = cachedmetric{
value: m.value,
s.gauges[m.name] = cachedgauge{
value: m.floatvalue,
tags: m.tags,
}
} else {
if m.additive {
cached.value = cached.value + m.value
cached.value = cached.value + m.floatvalue
} else {
cached.value = m.value
cached.value = m.floatvalue
}
cached.tags = m.tags
s.gauges[m.name] = cached
Expand All @@ -392,19 +412,13 @@ func (s *Statsd) aggregate(m metric) {
cached, ok := s.sets[m.name]
if !ok {
// Completely new metric (initialize with count of 1)
s.sets[m.name] = cachedmetric{
value: 1,
tags: m.tags,
set: map[int64]bool{m.value: true},
s.sets[m.name] = cachedset{
tags: m.tags,
set: map[int64]bool{m.intvalue: true},
}
} else {
_, ok := s.sets[m.name].set[m.value]
if !ok {
// Metric exists, but value has not been counted
cached.value += 1
cached.set[m.value] = true
s.sets[m.name] = cached
}
cached.set[m.intvalue] = true
s.sets[m.name] = cached
}
}
}
Expand Down
Loading

0 comments on commit d403512

Please sign in to comment.