Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple templates for graphite serializers #7136

Merged
merged 7 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1891,6 +1891,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
}
}

if node, ok := tbl.Fields["templates"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.Templates = append(c.Templates, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["influx_max_line_bytes"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if integer, ok := kv.Value.(*ast.Integer); ok {
Expand Down Expand Up @@ -2046,6 +2058,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
delete(tbl.Fields, "data_format")
delete(tbl.Fields, "prefix")
delete(tbl.Fields, "template")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "json_timestamp_units")
delete(tbl.Fields, "splunkmetric_hec_routing")
delete(tbl.Fields, "splunkmetric_multimetric")
Expand Down
10 changes: 10 additions & 0 deletions plugins/outputs/graphite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ see the [Graphite Data Format](../../../docs/DATA_FORMATS_OUTPUT.md)
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"

## Graphite templates patterns
## 1. Template for cpu
## 2. Template for disk*
## 3. Default template
# templates = [
# "cpu tags.measurement.host.field",
# "disk* measurement.field",
# "host.measurement.tags.field"
#]

## Enable Graphite tags support
# graphite_tag_support = false

Expand Down
23 changes: 17 additions & 6 deletions plugins/outputs/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
type Graphite struct {
GraphiteTagSupport bool
// URL is only for backwards compatibility
Servers []string
Prefix string
Template string
Timeout int
conns []net.Conn
Servers []string
Prefix string
Template string
Templates []string
Timeout int
conns []net.Conn
tlsint.ClientConfig
}

Expand All @@ -40,6 +41,16 @@ var sampleConfig = `
## Enable Graphite tags support
# graphite_tag_support = false

## Graphite templates patterns
## 1. Template for cpu
## 2. Template for disk*
## 3. Default template
# templates = [
# "cpu tags.measurement.host.field",
# "disk* measurement.field",
# "host.measurement.tags.field"
#]

## timeout in seconds for the write connection to graphite
timeout = 2

Expand Down Expand Up @@ -134,7 +145,7 @@ func checkEOF(conn net.Conn) {
func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data
var batch []byte
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport)
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport, g.Templates)
if err != nil {
return err
}
Expand Down
94 changes: 94 additions & 0 deletions plugins/outputs/graphite/graphite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,70 @@ func TestGraphiteOK(t *testing.T) {
g.Close()
}

func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1WithMultipleTemplates(t, &wg)

// Init plugin
g := Graphite{
Prefix: "my.prefix",
Template: "measurement.host.tags.field",
Templates: []string{
"my_* host.measurement.tags.field",
"measurement.tags.host.field",
},
}

// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1", "mytag": "valuetag"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1", "mytag": "valuetag"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := metric.New(
"my_measurement",
map[string]string{"host": "192.168.0.1", "mytag": "valuetag"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)

// Prepare point list
metrics := []telegraf.Metric{m1}
metrics2 := []telegraf.Metric{m2, m3}
err1 := g.Connect()
require.NoError(t, err1)
// Send Data
t.Log("Send first data")
err2 := g.Write(metrics)
require.NoError(t, err2)

// Waiting TCPserver, should reconnect and resend
wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2WithMultipleTemplates(t, &wg2)
//Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")

require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
}

func TestGraphiteOkWithTags(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
Expand Down Expand Up @@ -188,6 +252,36 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
}()
}

func TCPServer1WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000", data1)
conn.Close()
tcpServer.Close()
}()
}

func TCPServer2WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn2, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader)
data2, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", data2)
data3, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000", data3)
conn2.Close()
tcpServer.Close()
}()
}

func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
Expand Down
3 changes: 2 additions & 1 deletion plugins/outputs/instrumental/instrumental.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Instrumental struct {
Prefix string
DataFormat string
Template string
Templates []string
Timeout internal.Duration
Debug bool

Expand Down Expand Up @@ -85,7 +86,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
}
}

s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false)
s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false, i.Templates)
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions plugins/serializers/graphite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ method is used, otherwise the [Template Pattern](templates) is used.
prefix = "telegraf"
## Graphite template pattern
template = "host.tags.measurement.field"

## Graphite templates patterns
## 1. Template for cpu
## 2. Template for disk*
## 3. Default template
# templates = [
# "cpu tags.measurement.host.field",
# "disk* measurement.field",
# "host.measurement.tags.field"
#]

## Support Graphite tags, recommended to enable when using Graphite 1.1 or later.
# graphite_tag_support = false
Expand Down
56 changes: 55 additions & 1 deletion plugins/serializers/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
)

const DEFAULT_TEMPLATE = "host.tags.measurement.field"
Expand All @@ -29,10 +30,16 @@ var (
fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "")
)

type GraphiteTemplate struct {
Filter filter.Filter
Value string
}

type GraphiteSerializer struct {
Prefix string
Template string
TagSupport bool
Templates []*GraphiteTemplate
}

func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
Expand All @@ -59,7 +66,15 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
out = append(out, point...)
}
default:
bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
template := s.Template
for _, graphiteTemplate := range s.Templates {
if graphiteTemplate.Filter.Match(metric.Name()) {
template = graphiteTemplate.Value
break
}
}

bucket := SerializeBucketName(metric.Name(), metric.Tags(), template, s.Prefix)
if bucket == "" {
return out, nil
}
Expand Down Expand Up @@ -185,6 +200,45 @@ func SerializeBucketName(
return prefix + "." + strings.Join(out, ".")
}

func InitGraphiteTemplates(templates []string) ([]*GraphiteTemplate, string, error) {
var graphiteTemplates []*GraphiteTemplate
defaultTemplate := ""

for i, t := range templates {
parts := strings.Fields(t)

if len(parts) == 0 {
return nil, "", fmt.Errorf("missing template at position: %d", i)
}
if len(parts) == 1 {
if parts[0] == "" {
return nil, "", fmt.Errorf("missing template at position: %d", i)
} else {
// Override default template
defaultTemplate = t
continue
}
}

if len(parts) > 2 {
return nil, "", fmt.Errorf("invalid template format: '%s'", t)
}

tFilter, err := filter.Compile([]string{parts[0]})

if err != nil {
return nil, "", err
}

graphiteTemplates = append(graphiteTemplates, &GraphiteTemplate{
Filter: tFilter,
Value: parts[1],
})
}

return graphiteTemplates, defaultTemplate, nil
}

// SerializeBucketNameWithTags will take the given measurement name and tags and
// produce a graphite bucket. It will use the Graphite11Serializer.
// http://graphite.readthedocs.io/en/latest/tags.html
Expand Down
Loading