Skip to content

Commit

Permalink
Fix several influx parser issues (#5484)
Browse files Browse the repository at this point in the history
- Add line/column position
- Allow handlers to return errors
- Fix tag value escaping
- Allow newline in string fields
  • Loading branch information
danielnelson authored Feb 26, 2019
1 parent 8da6846 commit 04f3c43
Show file tree
Hide file tree
Showing 8 changed files with 23,196 additions and 15,095 deletions.
64 changes: 39 additions & 25 deletions plugins/parsers/influx/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package influx

import (
"bytes"
"errors"
"strconv"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/prometheus/common/log"
)

type MetricHandler struct {
builder *metric.Builder
metrics []telegraf.Metric
err error
precision time.Duration
}

Expand All @@ -32,75 +33,88 @@ func (h *MetricHandler) SetTimePrecision(precision time.Duration) {
}

func (h *MetricHandler) Metric() (telegraf.Metric, error) {
return h.builder.Metric()
m, err := h.builder.Metric()
h.builder.Reset()
return m, err
}

func (h *MetricHandler) SetMeasurement(name []byte) {
func (h *MetricHandler) SetMeasurement(name []byte) error {
h.builder.SetName(nameUnescape(name))
return nil
}

func (h *MetricHandler) AddTag(key []byte, value []byte) {
func (h *MetricHandler) AddTag(key []byte, value []byte) error {
tk := unescape(key)
tv := unescape(value)
h.builder.AddTag(tk, tv)
return nil
}

func (h *MetricHandler) AddInt(key []byte, value []byte) {
func (h *MetricHandler) AddInt(key []byte, value []byte) error {
fk := unescape(key)
fv, err := parseIntBytes(bytes.TrimSuffix(value, []byte("i")), 10, 64)
if err != nil {
log.Errorf("E! Received unparseable int value: %q: %v", value, err)
return
if numerr, ok := err.(*strconv.NumError); ok {
return numerr.Err
}
return err
}
h.builder.AddField(fk, fv)
return nil
}

func (h *MetricHandler) AddUint(key []byte, value []byte) {
func (h *MetricHandler) AddUint(key []byte, value []byte) error {
fk := unescape(key)
fv, err := parseUintBytes(bytes.TrimSuffix(value, []byte("u")), 10, 64)
if err != nil {
log.Errorf("E! Received unparseable uint value: %q: %v", value, err)
return
if numerr, ok := err.(*strconv.NumError); ok {
return numerr.Err
}
return err
}
h.builder.AddField(fk, fv)
return nil
}

func (h *MetricHandler) AddFloat(key []byte, value []byte) {
func (h *MetricHandler) AddFloat(key []byte, value []byte) error {
fk := unescape(key)
fv, err := parseFloatBytes(value, 64)
if err != nil {
log.Errorf("E! Received unparseable float value: %q: %v", value, err)
return
if numerr, ok := err.(*strconv.NumError); ok {
return numerr.Err
}
return err
}
h.builder.AddField(fk, fv)
return nil
}

func (h *MetricHandler) AddString(key []byte, value []byte) {
func (h *MetricHandler) AddString(key []byte, value []byte) error {
fk := unescape(key)
fv := stringFieldUnescape(value)
h.builder.AddField(fk, fv)
return nil
}

func (h *MetricHandler) AddBool(key []byte, value []byte) {
func (h *MetricHandler) AddBool(key []byte, value []byte) error {
fk := unescape(key)
fv, err := parseBoolBytes(value)
if err != nil {
log.Errorf("E! Received unparseable boolean value: %q: %v", value, err)
return
return errors.New("unparseable bool")
}
h.builder.AddField(fk, fv)
return nil
}

func (h *MetricHandler) SetTimestamp(tm []byte) {
func (h *MetricHandler) SetTimestamp(tm []byte) error {
v, err := parseIntBytes(tm, 10, 64)
if err != nil {
log.Errorf("E! Received unparseable timestamp: %q: %v", tm, err)
return
if numerr, ok := err.(*strconv.NumError); ok {
return numerr.Err
}
return err
}
ns := v * int64(h.precision)
h.builder.SetTime(time.Unix(0, ns))
}

func (h *MetricHandler) Reset() {
h.builder.Reset()
return nil
}
Loading

0 comments on commit 04f3c43

Please sign in to comment.