Skip to content

Commit

Permalink
Support multiple templates for graphite serializers (influxdata#7136)
Browse files Browse the repository at this point in the history
  • Loading branch information
shellwiz authored and idohalevi committed Sep 23, 2020
1 parent 6f67649 commit e37a559
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 10 deletions.
13 changes: 13 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1891,6 +1891,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
}
}

if node, ok := tbl.Fields["templates"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.Templates = append(c.Templates, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["influx_max_line_bytes"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if integer, ok := kv.Value.(*ast.Integer); ok {
Expand Down Expand Up @@ -2046,6 +2058,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
delete(tbl.Fields, "data_format")
delete(tbl.Fields, "prefix")
delete(tbl.Fields, "template")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "json_timestamp_units")
delete(tbl.Fields, "splunkmetric_hec_routing")
delete(tbl.Fields, "splunkmetric_multimetric")
Expand Down
10 changes: 10 additions & 0 deletions plugins/outputs/graphite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ see the [Graphite Data Format](../../../docs/DATA_FORMATS_OUTPUT.md)
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"

## Graphite templates patterns
## 1. Template for cpu
## 2. Template for disk*
## 3. Default template
# templates = [
# "cpu tags.measurement.host.field",
# "disk* measurement.field",
# "host.measurement.tags.field"
#]

## Enable Graphite tags support
# graphite_tag_support = false

Expand Down
23 changes: 17 additions & 6 deletions plugins/outputs/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
type Graphite struct {
GraphiteTagSupport bool
// URL is only for backwards compatibility
Servers []string
Prefix string
Template string
Timeout int
conns []net.Conn
Servers []string
Prefix string
Template string
Templates []string
Timeout int
conns []net.Conn
tlsint.ClientConfig
}

Expand All @@ -40,6 +41,16 @@ var sampleConfig = `
## Enable Graphite tags support
# graphite_tag_support = false
## Graphite templates patterns
## 1. Template for cpu
## 2. Template for disk*
## 3. Default template
# templates = [
# "cpu tags.measurement.host.field",
# "disk* measurement.field",
# "host.measurement.tags.field"
#]
## timeout in seconds for the write connection to graphite
timeout = 2
Expand Down Expand Up @@ -134,7 +145,7 @@ func checkEOF(conn net.Conn) {
func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data
var batch []byte
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport)
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport, g.Templates)
if err != nil {
return err
}
Expand Down
94 changes: 94 additions & 0 deletions plugins/outputs/graphite/graphite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,70 @@ func TestGraphiteOK(t *testing.T) {
g.Close()
}

func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1WithMultipleTemplates(t, &wg)

// Init plugin
g := Graphite{
Prefix: "my.prefix",
Template: "measurement.host.tags.field",
Templates: []string{
"my_* host.measurement.tags.field",
"measurement.tags.host.field",
},
}

// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1", "mytag": "valuetag"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1", "mytag": "valuetag"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := metric.New(
"my_measurement",
map[string]string{"host": "192.168.0.1", "mytag": "valuetag"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)

// Prepare point list
metrics := []telegraf.Metric{m1}
metrics2 := []telegraf.Metric{m2, m3}
err1 := g.Connect()
require.NoError(t, err1)
// Send Data
t.Log("Send first data")
err2 := g.Write(metrics)
require.NoError(t, err2)

// Waiting TCPserver, should reconnect and resend
wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2WithMultipleTemplates(t, &wg2)
//Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")

require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
}

func TestGraphiteOkWithTags(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
Expand Down Expand Up @@ -188,6 +252,36 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
}()
}

func TCPServer1WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000", data1)
conn.Close()
tcpServer.Close()
}()
}

func TCPServer2WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn2, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader)
data2, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", data2)
data3, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000", data3)
conn2.Close()
tcpServer.Close()
}()
}

func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
Expand Down
3 changes: 2 additions & 1 deletion plugins/outputs/instrumental/instrumental.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Instrumental struct {
Prefix string
DataFormat string
Template string
Templates []string
Timeout internal.Duration
Debug bool

Expand Down Expand Up @@ -85,7 +86,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
}
}

s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false)
s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false, i.Templates)
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions plugins/serializers/graphite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ method is used, otherwise the [Template Pattern](templates) is used.
prefix = "telegraf"
## Graphite template pattern
template = "host.tags.measurement.field"

## Graphite templates patterns
## 1. Template for cpu
## 2. Template for disk*
## 3. Default template
# templates = [
# "cpu tags.measurement.host.field",
# "disk* measurement.field",
# "host.measurement.tags.field"
#]

## Support Graphite tags, recommended to enable when using Graphite 1.1 or later.
# graphite_tag_support = false
Expand Down
56 changes: 55 additions & 1 deletion plugins/serializers/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
)

const DEFAULT_TEMPLATE = "host.tags.measurement.field"
Expand All @@ -29,10 +30,16 @@ var (
fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "")
)

type GraphiteTemplate struct {
Filter filter.Filter
Value string
}

type GraphiteSerializer struct {
Prefix string
Template string
TagSupport bool
Templates []*GraphiteTemplate
}

func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
Expand All @@ -59,7 +66,15 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
out = append(out, point...)
}
default:
bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
template := s.Template
for _, graphiteTemplate := range s.Templates {
if graphiteTemplate.Filter.Match(metric.Name()) {
template = graphiteTemplate.Value
break
}
}

bucket := SerializeBucketName(metric.Name(), metric.Tags(), template, s.Prefix)
if bucket == "" {
return out, nil
}
Expand Down Expand Up @@ -185,6 +200,45 @@ func SerializeBucketName(
return prefix + "." + strings.Join(out, ".")
}

func InitGraphiteTemplates(templates []string) ([]*GraphiteTemplate, string, error) {
var graphiteTemplates []*GraphiteTemplate
defaultTemplate := ""

for i, t := range templates {
parts := strings.Fields(t)

if len(parts) == 0 {
return nil, "", fmt.Errorf("missing template at position: %d", i)
}
if len(parts) == 1 {
if parts[0] == "" {
return nil, "", fmt.Errorf("missing template at position: %d", i)
} else {
// Override default template
defaultTemplate = t
continue
}
}

if len(parts) > 2 {
return nil, "", fmt.Errorf("invalid template format: '%s'", t)
}

tFilter, err := filter.Compile([]string{parts[0]})

if err != nil {
return nil, "", err
}

graphiteTemplates = append(graphiteTemplates, &GraphiteTemplate{
Filter: tFilter,
Value: parts[1],
})
}

return graphiteTemplates, defaultTemplate, nil
}

// SerializeBucketNameWithTags will take the given measurement name and tags and
// produce a graphite bucket. It will use the Graphite11Serializer.
// http://graphite.readthedocs.io/en/latest/tags.html
Expand Down
Loading

0 comments on commit e37a559

Please sign in to comment.