diff --git a/plugins/outputs/influxdb/udp.go b/plugins/outputs/influxdb/udp.go index 62f2a6ab72371..8e636d340129b 100644 --- a/plugins/outputs/influxdb/udp.go +++ b/plugins/outputs/influxdb/udp.go @@ -1,6 +1,8 @@ package influxdb import ( + "bufio" + "bytes" "context" "fmt" "log" @@ -45,9 +47,9 @@ func NewUDPClient(config *UDPConfig) (*udpClient, error) { serializer := config.Serializer if serializer == nil { s := influx.NewSerializer() - s.SetMaxLineBytes(config.MaxPayloadSize) serializer = s } + serializer.SetMaxLineBytes(size) dialer := config.Dialer if dialer == nil { @@ -96,7 +98,11 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error continue } - _, err = c.conn.Write(octets) + scanner := bufio.NewScanner(bytes.NewReader(octets)) + scanner.Split(scanLines) + for scanner.Scan() { + _, err = c.conn.Write(scanner.Bytes()) + } if err != nil { c.conn.Close() c.conn = nil @@ -118,3 +124,15 @@ type netDialer struct { func (d *netDialer) DialContext(ctx context.Context, network, address string) (Conn, error) { return d.Dialer.DialContext(ctx, network, address) } + +func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + if i := bytes.IndexByte(data, '\n'); i >= 0 { + // We have a full newline-terminated line. + return i + 1, data[0 : i+1], nil + + } + return 0, nil, nil +} diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index 2989e44e9f07c..d7b0c14c86bd4 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -100,7 +100,6 @@ func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) { if err != nil { return nil, err } - out := make([]byte, s.buf.Len()) copy(out, s.buf.Bytes()) return out, nil @@ -222,12 +221,10 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error { } bytesNeeded := len(s.header) + pairsLen + len(s.pair) + len(s.footer) - // Additional length needed for field separator `,` if !firstField { bytesNeeded += 1 } - if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes { // Need at least one field per line, this metric cannot be fit // into the max line bytes. @@ -239,10 +236,9 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error { if err != nil { return err } - + pairsLen = 0 firstField = true bytesNeeded = len(s.header) + len(s.pair) + len(s.footer) - if bytesNeeded > s.maxLineBytes { return s.newMetricError(NeedMoreSpace) } diff --git a/plugins/serializers/influx/influx_test.go b/plugins/serializers/influx/influx_test.go index 2c1cbd58770c2..5b1eb504260ff 100644 --- a/plugins/serializers/influx/influx_test.go +++ b/plugins/serializers/influx/influx_test.go @@ -275,6 +275,22 @@ var tests = []struct { ), output: []byte("cpu abc=123i 1519194109000000042\ncpu def=456i 1519194109000000042\n"), }, + { + name: "split_fields_overflow", + maxBytes: 36, + input: MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "abc": 123, + "def": 456, + }, + time.Unix(1519194109, 42), + ), + ), + output: []byte("cpu abc=123i 1519194109000000042\ncpu def=456i 1519194109000000042\n"), + }, { name: "name newline", input: MustMetric(