Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[output.wavefront] Added truncate_tags setting #7503

Merged
merged 1 commit into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugins/outputs/wavefront/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefro

## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default is true
#convert_bool = true

## Truncate metric tags to a total of 254 characters for the tag name value. Wavefront will reject any
## data point exceeding this limit if not truncated. Defaults to 'false' to provide backwards compatibility.
#truncate_tags = false
```


Expand Down
38 changes: 27 additions & 11 deletions plugins/outputs/wavefront/wavefront.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package wavefront

import (
"fmt"
"log"
"regexp"
"strings"

Expand All @@ -11,6 +10,8 @@ import (
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
)

const maxTagLength = 254

type Wavefront struct {
Url string
Token string
Expand All @@ -23,10 +24,12 @@ type Wavefront struct {
ConvertBool bool
UseRegex bool
UseStrict bool
TruncateTags bool
SourceOverride []string
StringToNumber map[string][]map[string]float64

sender wavefront.Sender
Log telegraf.Logger
}

// catch many of the invalid chars that could appear in a metric or tag name
Expand Down Expand Up @@ -94,6 +97,10 @@ var sampleConfig = `
## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default is true
#convert_bool = true

## Truncate metric tags to a total of 254 characters for the tag name value. Wavefront will reject any
## data point exceeding this limit if not truncated. Defaults to 'false' to provide backwards compatibility.
#truncate_tags = false

## Define a mapping, namespaced by metric prefix, from string values to numeric values
## deprecated in 1.9; use the enum processor plugin
#[[outputs.wavefront.string_to_number.elasticsearch]]
Expand All @@ -113,11 +120,11 @@ type MetricPoint struct {
func (w *Wavefront) Connect() error {

if len(w.StringToNumber) > 0 {
log.Print("W! [outputs.wavefront] The string_to_number option is deprecated; please use the enum processor instead")
w.Log.Warn("The string_to_number option is deprecated; please use the enum processor instead")
}

if w.Url != "" {
log.Printf("D! [outputs.wavefront] connecting over http/https using Url: %s", w.Url)
w.Log.Debug("connecting over http/https using Url: %s", w.Url)
sender, err := wavefront.NewDirectSender(&wavefront.DirectConfiguration{
Server: w.Url,
Token: w.Token,
Expand All @@ -128,7 +135,7 @@ func (w *Wavefront) Connect() error {
}
w.sender = sender
} else {
log.Printf("D! Output [wavefront] connecting over tcp using Host: %s and Port: %d", w.Host, w.Port)
w.Log.Debug("connecting over tcp using Host: %s and Port: %d", w.Host, w.Port)
sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{
Host: w.Host,
MetricsPort: w.Port,
Expand All @@ -152,18 +159,17 @@ func (w *Wavefront) Connect() error {
func (w *Wavefront) Write(metrics []telegraf.Metric) error {

for _, m := range metrics {
for _, point := range buildMetrics(m, w) {
for _, point := range w.buildMetrics(m) {
err := w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
if err != nil {
return fmt.Errorf("Wavefront sending error: %s", err.Error())
}
}
}

return nil
}

func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint {
func (w *Wavefront) buildMetrics(m telegraf.Metric) []*MetricPoint {
ret := []*MetricPoint{}

for fieldName, value := range m.Fields() {
Expand Down Expand Up @@ -193,12 +199,12 @@ func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint {

metricValue, buildError := buildValue(value, metric.Metric, w)
if buildError != nil {
log.Printf("D! [outputs.wavefront] %s\n", buildError.Error())
w.Log.Debug("Error building tags: %s\n", buildError.Error())
continue
}
metric.Value = metricValue

source, tags := buildTags(m.Tags(), w)
source, tags := w.buildTags(m.Tags())
metric.Source = source
metric.Tags = tags

Expand All @@ -207,7 +213,7 @@ func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint {
return ret
}

func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string) {
func (w *Wavefront) buildTags(mTags map[string]string) (string, map[string]string) {

// Remove all empty tags.
for k, v := range mTags {
Expand Down Expand Up @@ -259,6 +265,16 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string
key = sanitizedChars.Replace(k)
}
val := tagValueReplacer.Replace(v)
if w.TruncateTags {
if len(key) > maxTagLength {
w.Log.Warnf("Tag key length > 254. Skipping tag: %s", key)
continue
}
if len(key)+len(val) > maxTagLength {
w.Log.Debugf("Key+value length > 254: %s", key)
val = val[:maxTagLength-len(key)]
}
}
tags[key] = val
}

Expand Down Expand Up @@ -296,7 +312,6 @@ func buildValue(v interface{}, name string, w *Wavefront) (float64, error) {
default:
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
}

return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
}

Expand All @@ -320,6 +335,7 @@ func init() {
MetricSeparator: ".",
ConvertPaths: true,
ConvertBool: true,
TruncateTags: false,
}
})
}
48 changes: 43 additions & 5 deletions plugins/outputs/wavefront/wavefront_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"reflect"
"strings"
"testing"
Expand All @@ -21,6 +22,7 @@ func defaultWavefront() *Wavefront {
ConvertPaths: true,
ConvertBool: true,
UseRegex: false,
Log: testutil.Logger{},
}
}

Expand Down Expand Up @@ -64,7 +66,7 @@ func TestBuildMetrics(t *testing.T) {
}

for _, mt := range metricTests {
ml := buildMetrics(mt.metric, w)
ml := w.buildMetrics(mt.metric)
for i, line := range ml {
if mt.metricPoints[i].Metric != line.Metric || mt.metricPoints[i].Value != line.Value {
t.Errorf("\nexpected\t%+v %+v\nreceived\t%+v %+v\n", mt.metricPoints[i].Metric, mt.metricPoints[i].Value, line.Metric, line.Value)
Expand Down Expand Up @@ -104,7 +106,7 @@ func TestBuildMetricsStrict(t *testing.T) {
}

for _, mt := range metricTests {
ml := buildMetrics(mt.metric, w)
ml := w.buildMetrics(mt.metric)
for i, line := range ml {
if mt.metricPoints[i].Metric != line.Metric || mt.metricPoints[i].Value != line.Value {
t.Errorf("\nexpected\t%+v %+v\nreceived\t%+v %+v\n", mt.metricPoints[i].Metric, mt.metricPoints[i].Value, line.Metric, line.Value)
Expand Down Expand Up @@ -143,7 +145,7 @@ func TestBuildMetricsWithSimpleFields(t *testing.T) {
}

for _, mt := range metricTests {
ml := buildMetrics(mt.metric, w)
ml := w.buildMetrics(mt.metric)
for i, line := range ml {
if mt.metricLines[i].Metric != line.Metric || mt.metricLines[i].Value != line.Value {
t.Errorf("\nexpected\t%+v %+v\nreceived\t%+v %+v\n", mt.metricLines[i].Metric, mt.metricLines[i].Value, line.Metric, line.Value)
Expand Down Expand Up @@ -195,7 +197,7 @@ func TestBuildTags(t *testing.T) {
}

for _, tt := range tagtests {
source, tags := buildTags(tt.ptIn, w)
source, tags := w.buildTags(tt.ptIn)
if source != tt.outSource {
t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outSource, source)
}
Expand Down Expand Up @@ -247,7 +249,7 @@ func TestBuildTagsWithSource(t *testing.T) {
}

for _, tt := range tagtests {
source, tags := buildTags(tt.ptIn, w)
source, tags := w.buildTags(tt.ptIn)
if source != tt.outSource {
t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outSource, source)
}
Expand Down Expand Up @@ -316,6 +318,42 @@ func TestBuildValueString(t *testing.T) {

}

func TestTagLimits(t *testing.T) {
w := defaultWavefront()
w.TruncateTags = true

// Should fail (all tags skipped)
template := make(map[string]string)
template[strings.Repeat("x", 255)] = "whatever"
_, tags := w.buildTags(template)
require.Empty(t, tags, "All tags should have been skipped")

// Should truncate value
template = make(map[string]string)
longKey := strings.Repeat("x", 253)
template[longKey] = "whatever"
_, tags = w.buildTags(template)
require.Contains(t, tags, longKey, "Should contain truncated long key")
require.Equal(t, "w", tags[longKey])

// Should not truncate
template = make(map[string]string)
longKey = strings.Repeat("x", 251)
template[longKey] = "Hi!"
_, tags = w.buildTags(template)
require.Contains(t, tags, longKey, "Should contain non truncated long key")
require.Equal(t, "Hi!", tags[longKey])

// Turn off truncating and make sure it leaves the tags intact
w.TruncateTags = false
template = make(map[string]string)
longKey = strings.Repeat("x", 255)
template[longKey] = longKey
_, tags = w.buildTags(template)
require.Contains(t, tags, longKey, "Should contain non truncated long key")
require.Equal(t, longKey, tags[longKey])
}

// Benchmarks to test performance of string replacement via Regex and Replacer
var testString = "this_is*my!test/string\\for=replacement"

Expand Down