diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md index 57953eed72600..26cbe26289615 100644 --- a/plugins/inputs/statsd/README.md +++ b/plugins/inputs/statsd/README.md @@ -68,6 +68,9 @@ ## Maximum socket buffer size in bytes, once the buffer fills up, metrics ## will start dropping. Defaults to the OS default. # read_buffer_size = 65535 + + ## Max duration (TTL) for each metric to stay cached/reported without being updated. + # max_ttl = "10h" ``` ### Description @@ -192,6 +195,7 @@ the accuracy of percentiles but also increases the memory usage and cpu time. measurements and tags. - **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/) - **datadog_extensions** boolean: Enable parsing of DataDog's extensions to dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/) +- **max_ttl** config.Duration: Max duration (TTL) for each metric to stay cached/reported without being updated. ### Statsd bucket -> InfluxDB line-protocol Templates diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 9c5780d00a596..f74eb0ef4dc38 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -13,6 +13,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers/graphite" @@ -117,6 +118,9 @@ type Statsd struct { TCPKeepAlive bool `toml:"tcp_keep_alive"` TCPKeepAlivePeriod *internal.Duration `toml:"tcp_keep_alive_period"` + // Max duration for each metric to stay cached without being updated. + MaxTTL config.Duration `toml:"max_ttl"` + graphiteParser *graphite.GraphiteParser acc telegraf.Accumulator @@ -131,7 +135,7 @@ type Statsd struct { UDPBytesRecv selfstat.Stat ParseTimeNS selfstat.Stat - Log telegraf.Logger + Log telegraf.Logger `toml:"-"` // A pool of byte slices to handle parsing bufPool sync.Pool @@ -159,27 +163,31 @@ type metric struct { } type cachedset struct { - name string - fields map[string]map[string]bool - tags map[string]string + name string + fields map[string]map[string]bool + tags map[string]string + expiresAt time.Time } type cachedgauge struct { - name string - fields map[string]interface{} - tags map[string]string + name string + fields map[string]interface{} + tags map[string]string + expiresAt time.Time } type cachedcounter struct { - name string - fields map[string]interface{} - tags map[string]string + name string + fields map[string]interface{} + tags map[string]string + expiresAt time.Time } type cachedtimings struct { - name string - fields map[string]RunningStats - tags map[string]string + name string + fields map[string]RunningStats + tags map[string]string + expiresAt time.Time } func (_ *Statsd) Description() string { @@ -243,6 +251,9 @@ const sampleConfig = ` ## calculation of percentiles. Raising this limit increases the accuracy ## of percentiles but also increases the memory usage and cpu time. percentile_limit = 1000 + + ## Max duration (TTL) for each metric to stay cached/reported without being updated. + #max_ttl = "1000h" ` func (_ *Statsd) SampleConfig() string { @@ -306,6 +317,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { if s.DeleteSets { s.sets = make(map[string]cachedset) } + + s.expireCachedMetrics() + return nil } @@ -527,9 +541,6 @@ func (s *Statsd) parser() error { // parseStatsdLine will parse the given statsd line, validating it as it goes. // If the line is valid, it will be cached for the next call to Gather() func (s *Statsd) parseStatsdLine(line string) error { - s.Lock() - defer s.Unlock() - lineTags := make(map[string]string) if s.DataDogExtensions { recombinedSegments := make([]string, 0) @@ -734,6 +745,9 @@ func parseKeyValue(keyvalue string) (string, string) { // aggregates and caches the current value(s). It does not deal with the // Delete* options, because those are dealt with in the Gather function. func (s *Statsd) aggregate(m metric) { + s.Lock() + defer s.Unlock() + switch m.mtype { case "ms", "h": // Check if the measurement exists @@ -761,61 +775,67 @@ func (s *Statsd) aggregate(m metric) { field.AddValue(m.floatvalue) } cached.fields[m.field] = field + cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL)) s.timings[m.hash] = cached case "c": // check if the measurement exists - _, ok := s.counters[m.hash] + cached, ok := s.counters[m.hash] if !ok { - s.counters[m.hash] = cachedcounter{ + cached = cachedcounter{ name: m.name, fields: make(map[string]interface{}), tags: m.tags, } } // check if the field exists - _, ok = s.counters[m.hash].fields[m.field] + _, ok = cached.fields[m.field] if !ok { - s.counters[m.hash].fields[m.field] = int64(0) + cached.fields[m.field] = int64(0) } - s.counters[m.hash].fields[m.field] = - s.counters[m.hash].fields[m.field].(int64) + m.intvalue + cached.fields[m.field] = cached.fields[m.field].(int64) + m.intvalue + cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL)) + s.counters[m.hash] = cached case "g": // check if the measurement exists - _, ok := s.gauges[m.hash] + cached, ok := s.gauges[m.hash] if !ok { - s.gauges[m.hash] = cachedgauge{ + cached = cachedgauge{ name: m.name, fields: make(map[string]interface{}), tags: m.tags, } } // check if the field exists - _, ok = s.gauges[m.hash].fields[m.field] + _, ok = cached.fields[m.field] if !ok { - s.gauges[m.hash].fields[m.field] = float64(0) + cached.fields[m.field] = float64(0) } if m.additive { - s.gauges[m.hash].fields[m.field] = - s.gauges[m.hash].fields[m.field].(float64) + m.floatvalue + cached.fields[m.field] = cached.fields[m.field].(float64) + m.floatvalue } else { - s.gauges[m.hash].fields[m.field] = m.floatvalue + cached.fields[m.field] = m.floatvalue } + + cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL)) + s.gauges[m.hash] = cached case "s": // check if the measurement exists - _, ok := s.sets[m.hash] + cached, ok := s.sets[m.hash] if !ok { - s.sets[m.hash] = cachedset{ + cached = cachedset{ name: m.name, fields: make(map[string]map[string]bool), tags: m.tags, } } // check if the field exists - _, ok = s.sets[m.hash].fields[m.field] + _, ok = cached.fields[m.field] if !ok { - s.sets[m.hash].fields[m.field] = make(map[string]bool) + cached.fields[m.field] = make(map[string]bool) } - s.sets[m.hash].fields[m.field][m.strvalue] = true + cached.fields[m.field][m.strvalue] = true + cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL)) + s.sets[m.hash] = cached } } @@ -932,6 +952,39 @@ func (s *Statsd) isUDP() bool { return strings.HasPrefix(s.Protocol, "udp") } +func (s *Statsd) expireCachedMetrics() { + // If Max TTL wasn't configured, skip expiration. + if s.MaxTTL == 0 { + return + } + + now := time.Now() + + for key, cached := range s.gauges { + if now.After(cached.expiresAt) { + delete(s.gauges, key) + } + } + + for key, cached := range s.sets { + if now.After(cached.expiresAt) { + delete(s.sets, key) + } + } + + for key, cached := range s.timings { + if now.After(cached.expiresAt) { + delete(s.timings, key) + } + } + + for key, cached := range s.counters { + if now.After(cached.expiresAt) { + delete(s.counters, key) + } + } +} + func init() { inputs.Add("statsd", func() telegraf.Input { return &Statsd{ diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index f76681134a094..fd3b49b9203f0 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -2,15 +2,17 @@ package statsd import ( "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "net" "sync" "testing" "time" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/testutil" ) @@ -1077,6 +1079,65 @@ func TestParse_MeasurementsWithSameName(t *testing.T) { } } +// Test that the metric caches expire (clear) an entry after the entry hasn't been updated for the configurable MaxTTL duration. +func TestCachesExpireAfterMaxTTL(t *testing.T) { + s := NewTestStatsd() + s.MaxTTL = config.Duration(100 * time.Microsecond) + + acc := &testutil.Accumulator{} + s.parseStatsdLine("valid:45|c") + s.parseStatsdLine("valid:45|c") + require.NoError(t, s.Gather(acc)) + + // Max TTL goes by, our 'valid' entry is cleared. + time.Sleep(100 * time.Microsecond) + require.NoError(t, s.Gather(acc)) + + // Now when we gather, we should have a counter that is reset to zero. + s.parseStatsdLine("valid:45|c") + require.NoError(t, s.Gather(acc)) + + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + testutil.MustMetric( + "valid", + map[string]string{ + "metric_type": "counter", + }, + map[string]interface{}{ + "value": 90, + }, + time.Now(), + telegraf.Counter, + ), + testutil.MustMetric( + "valid", + map[string]string{ + "metric_type": "counter", + }, + map[string]interface{}{ + "value": 90, + }, + time.Now(), + telegraf.Counter, + ), + testutil.MustMetric( + "valid", + map[string]string{ + "metric_type": "counter", + }, + map[string]interface{}{ + "value": 45, + }, + time.Now(), + telegraf.Counter, + ), + }, + acc.GetTelegrafMetrics(), + testutil.IgnoreTime(), + ) +} + // Test that measurements with multiple bits, are treated as different outputs // but are equal to their single-measurement representation func TestParse_MeasurementsWithMultipleValues(t *testing.T) {