From 0cad343de7e17c2177416433dbbd2753c2275729 Mon Sep 17 00:00:00 2001
From: Ilya Antipov <i2ibrake@gmail.com>
Date: Tue, 31 Mar 2020 21:30:21 +0300
Subject: [PATCH] Support multiple templates for graphite serializers (#7136)

---
 internal/config/config.go                     | 13 +++
 plugins/outputs/graphite/README.md            | 10 ++
 plugins/outputs/graphite/graphite.go          | 23 +++--
 plugins/outputs/graphite/graphite_test.go     | 94 +++++++++++++++++++
 plugins/outputs/instrumental/instrumental.go  |  3 +-
 plugins/serializers/graphite/README.md        | 10 ++
 plugins/serializers/graphite/graphite.go      | 56 ++++++++++-
 plugins/serializers/graphite/graphite_test.go | 91 ++++++++++++++++++
 plugins/serializers/registry.go               | 18 +++-
 9 files changed, 308 insertions(+), 10 deletions(-)

diff --git a/internal/config/config.go b/internal/config/config.go
index f72f1ef26c378..c2335fac26bc7 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -1891,6 +1891,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
 		}
 	}
 
+	if node, ok := tbl.Fields["templates"]; ok {
+		if kv, ok := node.(*ast.KeyValue); ok {
+			if ary, ok := kv.Value.(*ast.Array); ok {
+				for _, elem := range ary.Value {
+					if str, ok := elem.(*ast.String); ok {
+						c.Templates = append(c.Templates, str.Value)
+					}
+				}
+			}
+		}
+	}
+
 	if node, ok := tbl.Fields["influx_max_line_bytes"]; ok {
 		if kv, ok := node.(*ast.KeyValue); ok {
 			if integer, ok := kv.Value.(*ast.Integer); ok {
@@ -2046,6 +2058,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
 	delete(tbl.Fields, "data_format")
 	delete(tbl.Fields, "prefix")
 	delete(tbl.Fields, "template")
+	delete(tbl.Fields, "templates")
 	delete(tbl.Fields, "json_timestamp_units")
 	delete(tbl.Fields, "splunkmetric_hec_routing")
 	delete(tbl.Fields, "splunkmetric_multimetric")
diff --git a/plugins/outputs/graphite/README.md b/plugins/outputs/graphite/README.md
index 878eb8048f485..b7ffd361be5c1 100644
--- a/plugins/outputs/graphite/README.md
+++ b/plugins/outputs/graphite/README.md
@@ -21,6 +21,16 @@ see the [Graphite Data Format](../../../docs/DATA_FORMATS_OUTPUT.md)
   ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
   template = "host.tags.measurement.field"
 
+  ## Graphite templates patterns
+  ## 1. Template for cpu
+  ## 2. Template for disk*
+  ## 3. Default template
+  # templates = [
+  #  "cpu tags.measurement.host.field",
+  #  "disk* measurement.field",
+  #  "host.measurement.tags.field"
+  #]
+
   ## Enable Graphite tags support
   # graphite_tag_support = false
 
diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go
index 09cdbe0809d6c..e7d1926620655 100644
--- a/plugins/outputs/graphite/graphite.go
+++ b/plugins/outputs/graphite/graphite.go
@@ -18,11 +18,12 @@ import (
 type Graphite struct {
 	GraphiteTagSupport bool
 	// URL is only for backwards compatibility
-	Servers  []string
-	Prefix   string
-	Template string
-	Timeout  int
-	conns    []net.Conn
+	Servers   []string
+	Prefix    string
+	Template  string
+	Templates []string
+	Timeout   int
+	conns     []net.Conn
 	tlsint.ClientConfig
 }
 
@@ -40,6 +41,16 @@ var sampleConfig = `
   ## Enable Graphite tags support
   # graphite_tag_support = false
 
+  ## Graphite templates patterns
+  ## 1. Template for cpu
+  ## 2. Template for disk*
+  ## 3. Default template
+  # templates = [
+  #  "cpu tags.measurement.host.field",
+  #  "disk* measurement.field",
+  #  "host.measurement.tags.field"
+  #]
+
   ## timeout in seconds for the write connection to graphite
   timeout = 2
 
@@ -134,7 +145,7 @@ func checkEOF(conn net.Conn) {
 func (g *Graphite) Write(metrics []telegraf.Metric) error {
 	// Prepare data
 	var batch []byte
-	s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport)
+	s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport, g.Templates)
 	if err != nil {
 		return err
 	}
diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go
index 3857236e50d37..ad76d45b5e16e 100644
--- a/plugins/outputs/graphite/graphite_test.go
+++ b/plugins/outputs/graphite/graphite_test.go
@@ -98,6 +98,70 @@ func TestGraphiteOK(t *testing.T) {
 	g.Close()
 }
 
+func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
+	var wg sync.WaitGroup
+	// Start TCP server
+	wg.Add(1)
+	t.Log("Starting server")
+	TCPServer1WithMultipleTemplates(t, &wg)
+
+	// Init plugin
+	g := Graphite{
+		Prefix:   "my.prefix",
+		Template: "measurement.host.tags.field",
+		Templates: []string{
+			"my_* host.measurement.tags.field",
+			"measurement.tags.host.field",
+		},
+	}
+
+	// Init metrics
+	m1, _ := metric.New(
+		"mymeasurement",
+		map[string]string{"host": "192.168.0.1", "mytag": "valuetag"},
+		map[string]interface{}{"myfield": float64(3.14)},
+		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
+	)
+	m2, _ := metric.New(
+		"mymeasurement",
+		map[string]string{"host": "192.168.0.1", "mytag": "valuetag"},
+		map[string]interface{}{"value": float64(3.14)},
+		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
+	)
+	m3, _ := metric.New(
+		"my_measurement",
+		map[string]string{"host": "192.168.0.1", "mytag": "valuetag"},
+		map[string]interface{}{"value": float64(3.14)},
+		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
+	)
+
+	// Prepare point list
+	metrics := []telegraf.Metric{m1}
+	metrics2 := []telegraf.Metric{m2, m3}
+	err1 := g.Connect()
+	require.NoError(t, err1)
+	// Send Data
+	t.Log("Send first data")
+	err2 := g.Write(metrics)
+	require.NoError(t, err2)
+
+	// Waiting TCPserver, should reconnect and resend
+	wg.Wait()
+	t.Log("Finished Waiting for first data")
+	var wg2 sync.WaitGroup
+	// Start TCP server
+	wg2.Add(1)
+	TCPServer2WithMultipleTemplates(t, &wg2)
+	//Write but expect an error, but reconnect
+	err3 := g.Write(metrics2)
+	t.Log("Finished writing second data, it should have reconnected automatically")
+
+	require.NoError(t, err3)
+	t.Log("Finished writing third data")
+	wg2.Wait()
+	g.Close()
+}
+
 func TestGraphiteOkWithTags(t *testing.T) {
 	var wg sync.WaitGroup
 	// Start TCP server
@@ -188,6 +252,36 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
 	}()
 }
 
+func TCPServer1WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
+	tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
+	go func() {
+		defer wg.Done()
+		conn, _ := (tcpServer).Accept()
+		reader := bufio.NewReader(conn)
+		tp := textproto.NewReader(reader)
+		data1, _ := tp.ReadLine()
+		assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000", data1)
+		conn.Close()
+		tcpServer.Close()
+	}()
+}
+
+func TCPServer2WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
+	tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
+	go func() {
+		defer wg.Done()
+		conn2, _ := (tcpServer).Accept()
+		reader := bufio.NewReader(conn2)
+		tp := textproto.NewReader(reader)
+		data2, _ := tp.ReadLine()
+		assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", data2)
+		data3, _ := tp.ReadLine()
+		assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000", data3)
+		conn2.Close()
+		tcpServer.Close()
+	}()
+}
+
 func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) {
 	tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
 	go func() {
diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go
index f142705a575c4..a861ebc28b40d 100644
--- a/plugins/outputs/instrumental/instrumental.go
+++ b/plugins/outputs/instrumental/instrumental.go
@@ -27,6 +27,7 @@ type Instrumental struct {
 	Prefix     string
 	DataFormat string
 	Template   string
+	Templates  []string
 	Timeout    internal.Duration
 	Debug      bool
 
@@ -85,7 +86,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
 		}
 	}
 
-	s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false)
+	s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false, i.Templates)
 	if err != nil {
 		return err
 	}
diff --git a/plugins/serializers/graphite/README.md b/plugins/serializers/graphite/README.md
index 6cff2cbe516fc..74bde2b5d8b49 100644
--- a/plugins/serializers/graphite/README.md
+++ b/plugins/serializers/graphite/README.md
@@ -22,6 +22,16 @@ method is used, otherwise the [Template Pattern](templates) is used.
   prefix = "telegraf"
   ## Graphite template pattern
   template = "host.tags.measurement.field"
+  
+  ## Graphite templates patterns
+  ## 1. Template for cpu
+  ## 2. Template for disk*
+  ## 3. Default template
+  # templates = [
+  #  "cpu tags.measurement.host.field",
+  #  "disk* measurement.field",
+  #  "host.measurement.tags.field"
+  #]
 
   ## Support Graphite tags, recommended to enable when using Graphite 1.1 or later.
   # graphite_tag_support = false
diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go
index d02b0e26bda85..590f80b454db4 100644
--- a/plugins/serializers/graphite/graphite.go
+++ b/plugins/serializers/graphite/graphite.go
@@ -10,6 +10,7 @@ import (
 	"strings"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/filter"
 )
 
 const DEFAULT_TEMPLATE = "host.tags.measurement.field"
@@ -29,10 +30,16 @@ var (
 	fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "")
 )
 
+type GraphiteTemplate struct {
+	Filter filter.Filter
+	Value  string
+}
+
 type GraphiteSerializer struct {
 	Prefix     string
 	Template   string
 	TagSupport bool
+	Templates  []*GraphiteTemplate
 }
 
 func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
@@ -59,7 +66,15 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
 			out = append(out, point...)
 		}
 	default:
-		bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
+		template := s.Template
+		for _, graphiteTemplate := range s.Templates {
+			if graphiteTemplate.Filter.Match(metric.Name()) {
+				template = graphiteTemplate.Value
+				break
+			}
+		}
+
+		bucket := SerializeBucketName(metric.Name(), metric.Tags(), template, s.Prefix)
 		if bucket == "" {
 			return out, nil
 		}
@@ -185,6 +200,45 @@ func SerializeBucketName(
 	return prefix + "." + strings.Join(out, ".")
 }
 
+func InitGraphiteTemplates(templates []string) ([]*GraphiteTemplate, string, error) {
+	var graphiteTemplates []*GraphiteTemplate
+	defaultTemplate := ""
+
+	for i, t := range templates {
+		parts := strings.Fields(t)
+
+		if len(parts) == 0 {
+			return nil, "", fmt.Errorf("missing template at position: %d", i)
+		}
+		if len(parts) == 1 {
+			if parts[0] == "" {
+				return nil, "", fmt.Errorf("missing template at position: %d", i)
+			} else {
+				// Override default template
+				defaultTemplate = t
+				continue
+			}
+		}
+
+		if len(parts) > 2 {
+			return nil, "", fmt.Errorf("invalid template format: '%s'", t)
+		}
+
+		tFilter, err := filter.Compile([]string{parts[0]})
+
+		if err != nil {
+			return nil, "", err
+		}
+
+		graphiteTemplates = append(graphiteTemplates, &GraphiteTemplate{
+			Filter: tFilter,
+			Value:  parts[1],
+		})
+	}
+
+	return graphiteTemplates, defaultTemplate, nil
+}
+
 // SerializeBucketNameWithTags will take the given measurement name and tags and
 // produce a graphite bucket. It will use the Graphite11Serializer.
 // http://graphite.readthedocs.io/en/latest/tags.html
diff --git a/plugins/serializers/graphite/graphite_test.go b/plugins/serializers/graphite/graphite_test.go
index e72ed7a306bf9..e50b7292b10ef 100644
--- a/plugins/serializers/graphite/graphite_test.go
+++ b/plugins/serializers/graphite/graphite_test.go
@@ -144,6 +144,97 @@ func TestSerializeMetricHost(t *testing.T) {
 	assert.Equal(t, expS, mS)
 }
 
+func TestSerializeMetricHostWithMultipleTemplates(t *testing.T) {
+	now := time.Now()
+	tags := map[string]string{
+		"host":       "localhost",
+		"cpu":        "cpu0",
+		"datacenter": "us-west-2",
+	}
+	fields := map[string]interface{}{
+		"usage_idle": float64(91.5),
+		"usage_busy": float64(8.5),
+	}
+	m1, err := metric.New("cpu", tags, fields, now)
+	m2, err := metric.New("new_cpu", tags, fields, now)
+	assert.NoError(t, err)
+
+	templates, defaultTemplate, err := InitGraphiteTemplates([]string{
+		"cp* tags.measurement.host.field",
+		"new_cpu tags.host.measurement.field",
+	})
+	assert.NoError(t, err)
+	assert.Equal(t, defaultTemplate, "")
+
+	s := GraphiteSerializer{
+		Templates: templates,
+	}
+
+	buf, _ := s.Serialize(m1)
+	buf2, _ := s.Serialize(m2)
+
+	buf = append(buf, buf2...)
+
+	mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
+	assert.NoError(t, err)
+
+	expS := []string{
+		fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_idle 91.5 %d", now.Unix()),
+		fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_busy 8.5 %d", now.Unix()),
+		fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_idle 91.5 %d", now.Unix()),
+		fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_busy 8.5 %d", now.Unix()),
+	}
+	sort.Strings(mS)
+	sort.Strings(expS)
+	assert.Equal(t, expS, mS)
+}
+
+func TestSerializeMetricHostWithMultipleTemplatesWithDefault(t *testing.T) {
+	now := time.Now()
+	tags := map[string]string{
+		"host":       "localhost",
+		"cpu":        "cpu0",
+		"datacenter": "us-west-2",
+	}
+	fields := map[string]interface{}{
+		"usage_idle": float64(91.5),
+		"usage_busy": float64(8.5),
+	}
+	m1, err := metric.New("cpu", tags, fields, now)
+	m2, err := metric.New("new_cpu", tags, fields, now)
+	assert.NoError(t, err)
+
+	templates, defaultTemplate, err := InitGraphiteTemplates([]string{
+		"cp* tags.measurement.host.field",
+		"tags.host.measurement.field",
+	})
+	assert.NoError(t, err)
+	assert.Equal(t, defaultTemplate, "tags.host.measurement.field")
+
+	s := GraphiteSerializer{
+		Templates: templates,
+		Template:  defaultTemplate,
+	}
+
+	buf, _ := s.Serialize(m1)
+	buf2, _ := s.Serialize(m2)
+
+	buf = append(buf, buf2...)
+
+	mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
+	assert.NoError(t, err)
+
+	expS := []string{
+		fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_idle 91.5 %d", now.Unix()),
+		fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_busy 8.5 %d", now.Unix()),
+		fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_idle 91.5 %d", now.Unix()),
+		fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_busy 8.5 %d", now.Unix()),
+	}
+	sort.Strings(mS)
+	sort.Strings(expS)
+	assert.Equal(t, expS, mS)
+}
+
 func TestSerializeMetricHostWithTagSupport(t *testing.T) {
 	now := time.Now()
 	tags := map[string]string{
diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go
index dc9859e3459a8..17de980fd549e 100644
--- a/plugins/serializers/registry.go
+++ b/plugins/serializers/registry.go
@@ -68,6 +68,9 @@ type Config struct {
 	// only supports Graphite
 	Template string `toml:"template"`
 
+	// Templates same Template, but multiple
+	Templates []string `toml:"templates"`
+
 	// Timestamp units to use for JSON formatted output
 	TimestampUnits time.Duration `toml:"timestamp_units"`
 
@@ -104,7 +107,7 @@ func NewSerializer(config *Config) (Serializer, error) {
 	case "influx":
 		serializer, err = NewInfluxSerializerConfig(config)
 	case "graphite":
-		serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport)
+		serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.Templates)
 	case "json":
 		serializer, err = NewJsonSerializer(config.TimestampUnits)
 	case "splunkmetric":
@@ -188,10 +191,21 @@ func NewInfluxSerializer() (Serializer, error) {
 	return influx.NewSerializer(), nil
 }
 
-func NewGraphiteSerializer(prefix, template string, tag_support bool) (Serializer, error) {
+func NewGraphiteSerializer(prefix, template string, tag_support bool, templates []string) (Serializer, error) {
+	graphiteTemplates, defaultTemplate, err := graphite.InitGraphiteTemplates(templates)
+
+	if err != nil {
+		return nil, err
+	}
+
+	if defaultTemplate != "" {
+		template = defaultTemplate
+	}
+
 	return &graphite.GraphiteSerializer{
 		Prefix:     prefix,
 		Template:   template,
 		TagSupport: tag_support,
+		Templates:  graphiteTemplates,
 	}, nil
 }