Skip to content

Commit

Permalink
Add influx_timestamp_units to the influx output serializer
Browse files Browse the repository at this point in the history
Follows this change for the JSON serializer:
influxdata#2587
  • Loading branch information
gregschrock committed Feb 4, 2021
1 parent 7a3e149 commit 898bff1
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 11 deletions.
8 changes: 6 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1337,7 +1337,10 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config,
// a serializers.Serializer object, and creates it, which can then be added onto
// an Output object.
func (c *Config) buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error) {
sc := &serializers.Config{TimestampUnits: time.Duration(1 * time.Second)}
sc := &serializers.Config{
TimestampUnits: time.Duration(1 * time.Second),
InfluxTimestampUnits: 1 * time.Nanosecond,
}

c.getFieldString(tbl, "data_format", &sc.DataFormat)

Expand All @@ -1353,6 +1356,7 @@ func (c *Config) buildSerializer(name string, tbl *ast.Table) (serializers.Seria

c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields)
c.getFieldBool(tbl, "influx_uint_support", &sc.InfluxUintSupport)
c.getFieldDuration(tbl, "influx_timestamp_units", &sc.InfluxTimestampUnits)
c.getFieldBool(tbl, "graphite_tag_support", &sc.GraphiteTagSupport)
c.getFieldString(tbl, "graphite_separator", &sc.GraphiteSeparator)

Expand Down Expand Up @@ -1420,7 +1424,7 @@ func (c *Config) missingTomlField(typ reflect.Type, key string) error {
"fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys",
"grace", "graphite_separator", "graphite_tag_support", "grok_custom_pattern_files",
"grok_custom_patterns", "grok_named_patterns", "grok_patterns", "grok_timezone",
"grok_unique_timestamp", "influx_max_line_bytes", "influx_sort_fields", "influx_uint_support",
"grok_unique_timestamp", "influx_max_line_bytes", "influx_sort_fields", "influx_uint_support", "influx_timestamp_units",
"interval", "json_name_key", "json_query", "json_strict", "json_string_fields",
"json_time_format", "json_time_key", "json_timestamp_units", "json_timezone",
"metric_batch_size", "metric_buffer_limit", "name_override", "name_prefix",
Expand Down
9 changes: 9 additions & 0 deletions plugins/serializers/influx/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ for interoperability.
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
influx_uint_support = false

## By default, the line format timestamp is at nanosecond precision. The
## precision can be adjusted here. This parameter can be used to set the
## timestamp units to nanoseconds (`ns`), microseconds (`us` or `µs`),
## milliseconds (`ms`), or seconds (`s`). Note that this parameter will be
## truncated to the nearest power of 10, so if the `influx_timestamp_units`
## are set to `15ms` the timestamps for the serialized line will be output in
## hundredths of a second (`10ms`).
influx_timestamp_units = "1ns"
```

### Metrics
Expand Down
20 changes: 18 additions & 2 deletions plugins/serializers/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
)
Expand Down Expand Up @@ -62,6 +63,7 @@ type Serializer struct {
bytesWritten int
fieldSortOrder FieldSortOrder
fieldTypeSupport FieldTypeSupport
unitsNanoseconds int64

buf bytes.Buffer
header []byte
Expand All @@ -71,7 +73,8 @@ type Serializer struct {

func NewSerializer() *Serializer {
serializer := &Serializer{
fieldSortOrder: NoSortFields,
fieldSortOrder: NoSortFields,
unitsNanoseconds: 1,

header: make([]byte, 0, 50),
footer: make([]byte, 0, 21),
Expand All @@ -92,6 +95,19 @@ func (s *Serializer) SetFieldTypeSupport(typeSupport FieldTypeSupport) {
s.fieldTypeSupport = typeSupport
}

func (s *Serializer) SetTimestampUnits(units time.Duration) {
unitsNanoseconds := units.Nanoseconds()
fmt.Printf("Nanoseconds: %v\n", unitsNanoseconds)

// if the units passed in were less than or equal to zero,
// then serialize the timestamp in seconds (the default)
if unitsNanoseconds <= 0 {
unitsNanoseconds = 1
}

s.unitsNanoseconds = unitsNanoseconds
}

// Serialize writes the telegraf.Metric to a byte slice. May produce multiple
// lines of output if longer than maximum line length. Lines are terminated
// with a newline (LF) char.
Expand Down Expand Up @@ -182,7 +198,7 @@ func (s *Serializer) buildHeader(m telegraf.Metric) error {
func (s *Serializer) buildFooter(m telegraf.Metric) {
s.footer = s.footer[:0]
s.footer = append(s.footer, ' ')
s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano(), 10)
s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano()/s.unitsNanoseconds, 10)
s.footer = append(s.footer, '\n')
}

Expand Down
32 changes: 25 additions & 7 deletions plugins/serializers/influx/influx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
}

var tests = []struct {
name string
maxBytes int
typeSupport FieldTypeSupport
input telegraf.Metric
output []byte
errReason string
name string
maxBytes int
typeSupport FieldTypeSupport
input telegraf.Metric
output []byte
errReason string
timestampUnits time.Duration
}{
{
name: "minimal",
Expand Down Expand Up @@ -230,7 +231,7 @@ var tests = []struct {
output: []byte("cpu value=\"howdy\" 0\n"),
},
{
name: "timestamp",
name: "default timestamp",
input: MustMetric(
metric.New(
"cpu",
Expand All @@ -243,6 +244,21 @@ var tests = []struct {
),
output: []byte("cpu value=42 1519194109000000042\n"),
},
{
name: "timestamp second units",
input: MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(1519194109, 42),
),
),
timestampUnits: time.Second,
output: []byte("cpu value=42 1519194109\n"),
},
{
name: "split fields exact",
maxBytes: 33,
Expand Down Expand Up @@ -539,6 +555,7 @@ func TestSerializer(t *testing.T) {
serializer.SetMaxLineBytes(tt.maxBytes)
serializer.SetFieldSortOrder(SortFields)
serializer.SetFieldTypeSupport(tt.typeSupport)
serializer.SetTimestampUnits(tt.timestampUnits)
output, err := serializer.Serialize(tt.input)
if tt.errReason != "" {
require.Error(t, err)
Expand All @@ -555,6 +572,7 @@ func BenchmarkSerializer(b *testing.B) {
serializer := NewSerializer()
serializer.SetMaxLineBytes(tt.maxBytes)
serializer.SetFieldTypeSupport(tt.typeSupport)
serializer.SetTimestampUnits(tt.timestampUnits)
for n := 0; n < b.N; n++ {
output, err := serializer.Serialize(tt.input)
_ = err
Expand Down
4 changes: 4 additions & 0 deletions plugins/serializers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type Config struct {
// Support unsigned integer output; influx format only
InfluxUintSupport bool `toml:"influx_uint_support"`

// Timestamp units to use for Influx formatted output
InfluxTimestampUnits time.Duration `toml:"influx_timestamp_units"`

// Prefix to add to all measurements, only supports Graphite
Prefix string `toml:"prefix"`

Expand Down Expand Up @@ -210,6 +213,7 @@ func NewInfluxSerializerConfig(config *Config) (Serializer, error) {
s.SetMaxLineBytes(config.InfluxMaxLineBytes)
s.SetFieldSortOrder(sort)
s.SetFieldTypeSupport(typeSupport)
s.SetTimestampUnits(config.InfluxTimestampUnits)
return s, nil
}

Expand Down

0 comments on commit 898bff1

Please sign in to comment.