diff --git a/accumulator.go b/accumulator.go index ece69f3b176d1..4991752bb2d94 100644 --- a/accumulator.go +++ b/accumulator.go @@ -57,6 +57,10 @@ type Accumulator interface { // TrackingID uniquely identifies a tracked metric group type TrackingID uint64 +type TrackingData interface { + ID() TrackingID +} + // DeliveryInfo provides the results of a delivered metric group. type DeliveryInfo interface { // ID is the TrackingID diff --git a/agent/agent.go b/agent/agent.go index d9c3a6c039080..14d8a80280c79 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/snmp" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/serializers/influx" @@ -127,6 +128,7 @@ func (a *Agent) Run(ctx context.Context) error { startTime := time.Now() log.Printf("D! [agent] Connecting outputs") + metric.Init() next, ou, err := a.startOutputs(ctx, a.Config.Outputs) if err != nil { return err @@ -871,12 +873,12 @@ func (a *Agent) runOutputs( }(output) } - for metric := range unit.src { + for m := range unit.src { for i, output := range unit.outputs { if i == len(a.Config.Outputs)-1 { - output.AddMetric(metric) + output.AddMetric(m) } else { - output.AddMetric(metric.Copy()) + output.AddMetric(m.Copy()) } } } diff --git a/config/config.go b/config/config.go index f0ccb0a80f507..87605890c5586 100644 --- a/config/config.go +++ b/config/config.go @@ -278,6 +278,9 @@ type AgentConfig struct { // Number of attempts to obtain a remote configuration via a URL during // startup. Set to -1 for unlimited attempts. ConfigURLRetryAttempts int `toml:"config_url_retry_attempts"` + + BufferStrategy string `toml:"buffer_strategy"` + BufferDirectory string `toml:"buffer_directory"` } // InputNames returns a list of strings of the configured inputs. @@ -1521,6 +1524,8 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, c.getFieldString(tbl, "name_suffix", &oc.NameSuffix) c.getFieldString(tbl, "name_prefix", &oc.NamePrefix) c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior) + c.getFieldString(tbl, "buffer_strategy", &oc.BufferStrategy) + c.getFieldString(tbl, "buffer_directory", &oc.BufferDirectory) if c.hasErrs() { return nil, c.firstErr() diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 4d3f189483269..06928cc12126f 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -349,6 +349,8 @@ following works: - github.com/tidwall/gjson [MIT License](https://github.com/tidwall/gjson/blob/master/LICENSE) - github.com/tidwall/match [MIT License](https://github.com/tidwall/match/blob/master/LICENSE) - github.com/tidwall/pretty [MIT License](https://github.com/tidwall/pretty/blob/master/LICENSE) +- github.com/tidwall/tinylru [MIT License](https://github.com/tidwall/tinylru/blob/master/LICENSE) +- github.com/tidwall/wal [MIT License](https://github.com/tidwall/wal/blob/master/LICENSE) - github.com/tinylib/msgp [MIT License](https://github.com/tinylib/msgp/blob/master/LICENSE) - github.com/tklauser/go-sysconf [BSD 3-Clause "New" or "Revised" License](https://github.com/tklauser/go-sysconf/blob/master/LICENSE) - github.com/tklauser/numcpus [Apache License 2.0](https://github.com/tklauser/numcpus/blob/master/LICENSE) diff --git a/go.mod b/go.mod index cc8c3a717145c..7dd11326796f4 100644 --- a/go.mod +++ b/go.mod @@ -185,6 +185,7 @@ require ( github.com/testcontainers/testcontainers-go/modules/kafka v0.30.0 github.com/thomasklein94/packer-plugin-libvirt v0.5.0 github.com/tidwall/gjson v1.17.0 + github.com/tidwall/wal v1.1.7 github.com/tinylib/msgp v1.1.9 github.com/urfave/cli/v2 v2.27.1 github.com/vapourismo/knx-go v0.0.0-20240217175130-922a0d50c241 @@ -451,6 +452,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/tinylru v1.1.0 // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect github.com/twmb/murmur3 v1.1.7 // indirect @@ -483,7 +485,7 @@ require ( go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.24.0 // indirect + go.uber.org/zap v1.25.0 // indirect golang.org/x/exp v0.0.0-20240529005216-23cca8864a10 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect diff --git a/go.sum b/go.sum index 3c46d8117dfa3..0ee20e01374a4 100644 --- a/go.sum +++ b/go.sum @@ -2222,12 +2222,17 @@ github.com/testcontainers/testcontainers-go/modules/kafka v0.30.0 h1:lQx20102vAH github.com/testcontainers/testcontainers-go/modules/kafka v0.30.0/go.mod h1:n3m3SH0ivwFZbehY8fgTLADfwSPK2ZC5za4r9nYYm4Q= github.com/thomasklein94/packer-plugin-libvirt v0.5.0 h1:aj2HLHZZM/ClGLIwVp9rrgh+2TOU/w4EiaZHAwCpOgs= github.com/thomasklein94/packer-plugin-libvirt v0.5.0/go.mod h1:GwN82FQ6KxCNKtS8LNUgLbwTZs90GGhBzCmTNkrTCrY= +github.com/tidwall/gjson v1.10.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM= github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/tinylru v1.1.0 h1:XY6IUfzVTU9rpwdhKUF6nQdChgCdGjkMfLzbWyiau6I= +github.com/tidwall/tinylru v1.1.0/go.mod h1:3+bX+TJ2baOLMWTnlyNWHh4QMnFyARg2TLTQ6OFbzw8= +github.com/tidwall/wal v1.1.7 h1:emc1TRjIVsdKKSnpwGBAcsAGg0767SvUk8+ygx7Bb+4= +github.com/tidwall/wal v1.1.7/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E= github.com/tinylib/msgp v1.1.9 h1:SHf3yoO2sGA0veCJeCBYLHuttAVFHGm2RHgNodW7wQU= github.com/tinylib/msgp v1.1.9/go.mod h1:BCXGB54lDD8qUEPmiG0cQQUANC4IUQyB2ItS2UDlO/k= github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0= @@ -2392,8 +2397,8 @@ go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9E go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= -go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= -go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= +go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/metric.go b/metric.go index 2cf3d6a06072b..adad3cfc38c33 100644 --- a/metric.go +++ b/metric.go @@ -149,5 +149,6 @@ type UnwrappableMetric interface { type TrackingMetric interface { // TrackingID returns the ID used for tracking the metric TrackingID() TrackingID + TrackingData() TrackingData UnwrappableMetric } diff --git a/metric/deserialize.go b/metric/deserialize.go new file mode 100644 index 0000000000000..a253406a3b50c --- /dev/null +++ b/metric/deserialize.go @@ -0,0 +1,78 @@ +package metric + +import ( + "bytes" + "encoding/gob" + "errors" + "fmt" + "sync" + + "github.com/influxdata/telegraf" +) + +// storage for tracking data that can't be serialized to disk +var ( + // todo need some way to empty this map out when done with a tracking ID. + // grouped tracking metrics means that ID->Data association is not one to one, + // many metrics could be associated with one tracking ID so we cannot just + // clear this every time in FromBytes. + trackingStore = make(map[telegraf.TrackingID]telegraf.TrackingData) + mu = sync.Mutex{} + + // ErrSkipTracking indicates that tracking information could not be found after + // deserializing a metric from bytes. In this case we should skip the metric + // and continue as if it does not exist. + ErrSkipTracking = errors.New("metric tracking data not found") +) + +type serializedMetric struct { + M telegraf.Metric + TID telegraf.TrackingID +} + +func ToBytes(m telegraf.Metric) ([]byte, error) { + var sm serializedMetric + if um, ok := m.(telegraf.UnwrappableMetric); ok { + sm.M = um.Unwrap() + } else { + sm.M = m + } + + if tm, ok := m.(telegraf.TrackingMetric); ok { + sm.TID = tm.TrackingID() + + mu.Lock() + trackingStore[sm.TID] = tm.TrackingData() + mu.Unlock() + } + + var buf bytes.Buffer + encoder := gob.NewEncoder(&buf) + if err := encoder.Encode(&sm); err != nil { + return nil, fmt.Errorf("failed to encode metric to bytes: %w", err) + } + return buf.Bytes(), nil +} + +func FromBytes(b []byte) (telegraf.Metric, error) { + buf := bytes.NewBuffer(b) + decoder := gob.NewDecoder(buf) + + var sm *serializedMetric + if err := decoder.Decode(&sm); err != nil { + return nil, fmt.Errorf("failed to decode metric from bytes: %w", err) + } + + m := sm.M + if sm.TID != 0 { + mu.Lock() + td := trackingStore[sm.TID] + mu.Unlock() + + if td == nil { + return nil, ErrSkipTracking + } + m = rebuildTrackingMetric(m, td) + } + return m, nil +} diff --git a/metric/init.go b/metric/init.go new file mode 100644 index 0000000000000..85c901ce16b39 --- /dev/null +++ b/metric/init.go @@ -0,0 +1,7 @@ +package metric + +import "encoding/gob" + +func Init() { + gob.RegisterName("metric.metric", &metric{}) +} diff --git a/metric/tracking.go b/metric/tracking.go index 50f11c74d6dae..8e700d8f98ae9 100644 --- a/metric/tracking.go +++ b/metric/tracking.go @@ -33,35 +33,36 @@ func newTrackingID() telegraf.TrackingID { } type trackingData struct { - id telegraf.TrackingID - rc int32 - acceptCount int32 - rejectCount int32 + //nolint:revive // method is already named ID + Id telegraf.TrackingID + Rc int32 + AcceptCount int32 + RejectCount int32 notifyFunc NotifyFunc } func (d *trackingData) incr() { - atomic.AddInt32(&d.rc, 1) + atomic.AddInt32(&d.Rc, 1) } func (d *trackingData) decr() int32 { - return atomic.AddInt32(&d.rc, -1) + return atomic.AddInt32(&d.Rc, -1) } func (d *trackingData) accept() { - atomic.AddInt32(&d.acceptCount, 1) + atomic.AddInt32(&d.AcceptCount, 1) } func (d *trackingData) reject() { - atomic.AddInt32(&d.rejectCount, 1) + atomic.AddInt32(&d.RejectCount, 1) } func (d *trackingData) notify() { d.notifyFunc( &deliveryInfo{ - id: d.id, - accepted: int(d.acceptCount), - rejected: int(d.rejectCount), + id: d.Id, + accepted: int(d.AcceptCount), + rejected: int(d.RejectCount), }, ) } @@ -75,10 +76,10 @@ func newTrackingMetric(metric telegraf.Metric, fn NotifyFunc) (telegraf.Metric, m := &trackingMetric{ Metric: metric, d: &trackingData{ - id: newTrackingID(), - rc: 1, - acceptCount: 0, - rejectCount: 0, + Id: newTrackingID(), + Rc: 1, + AcceptCount: 0, + RejectCount: 0, notifyFunc: fn, }, } @@ -86,15 +87,22 @@ func newTrackingMetric(metric telegraf.Metric, fn NotifyFunc) (telegraf.Metric, if finalizer != nil { runtime.SetFinalizer(m.d, finalizer) } - return m, m.d.id + return m, m.d.Id +} + +func rebuildTrackingMetric(metric telegraf.Metric, td telegraf.TrackingData) telegraf.Metric { + return &trackingMetric{ + Metric: metric, + d: td.(*trackingData), + } } func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf.Metric, telegraf.TrackingID) { d := &trackingData{ - id: newTrackingID(), - rc: 0, - acceptCount: 0, - rejectCount: 0, + Id: newTrackingID(), + Rc: 0, + AcceptCount: 0, + RejectCount: 0, notifyFunc: fn, } @@ -114,7 +122,7 @@ func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf. d.notify() } - return group, d.id + return group, d.Id } func (m *trackingMetric) Copy() telegraf.Metric { @@ -152,7 +160,11 @@ func (m *trackingMetric) decr() { // Unwrap allows to access the underlying metric directly e.g. for go-templates func (m *trackingMetric) TrackingID() telegraf.TrackingID { - return m.d.id + return m.d.ID() +} + +func (m *trackingMetric) TrackingData() telegraf.TrackingData { + return m.d } // Unwrap allows to access the underlying metric directly e.g. for go-templates @@ -173,3 +185,7 @@ func (r *deliveryInfo) ID() telegraf.TrackingID { func (r *deliveryInfo) Delivered() bool { return r.rejected == 0 } + +func (d *trackingData) ID() telegraf.TrackingID { + return d.Id +} diff --git a/models/buffer.go b/models/buffer.go index 3f97a43a6482b..cdfad68b5e46d 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -1,7 +1,7 @@ package models import ( - "sync" + "fmt" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/selfstat" @@ -12,18 +12,32 @@ var ( AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{}) ) -// Buffer stores metrics in a circular buffer. -type Buffer struct { - sync.Mutex - buf []telegraf.Metric - first int // index of the first/oldest metric - last int // one after the index of the last/newest metric - size int // number of metrics currently in the buffer - cap int // the capacity of the buffer +type Buffer interface { - batchFirst int // index of the first metric in the batch - batchSize int // number of metrics currently in the batch + // Len returns the number of metrics currently in the buffer. + Len() int + // Add adds metrics to the buffer and returns number of dropped metrics. + Add(metrics ...telegraf.Metric) int + + // Batch returns a slice containing up to batchSize of the oldest metrics not + // yet dropped. Metrics are ordered from oldest to newest in the batch. The + // batch must not be modified by the client. + Batch(batchSize int) []telegraf.Metric + + // Accept marks the batch, acquired from Batch(), as successfully written. + Accept(metrics []telegraf.Metric) + + // Reject returns the batch, acquired from Batch(), to the buffer and marks it + // as unsent. + Reject([]telegraf.Metric) + + Stats() BufferStats +} + +// BufferStats holds common metrics used for buffer implementations. +// Implementations of Buffer should embed this struct in them. +type BufferStats struct { MetricsAdded selfstat.Stat MetricsWritten selfstat.Stat MetricsDropped selfstat.Stat @@ -32,19 +46,26 @@ type Buffer struct { } // NewBuffer returns a new empty Buffer with the given capacity. -func NewBuffer(name string, alias string, capacity int) *Buffer { +func NewBuffer(name string, alias string, capacity int, strategy string, path string) (Buffer, error) { + bm := NewBufferMetrics(name, alias, capacity) + + switch strategy { + case "", "memory": + return NewMemoryBuffer(capacity, bm) + case "disk": + return NewDiskBuffer(name, path, bm) + } + + return nil, fmt.Errorf("invalid buffer strategy %q", strategy) +} + +func NewBufferMetrics(name string, alias string, capacity int) BufferStats { tags := map[string]string{"output": name} if alias != "" { tags["alias"] = alias } - b := &Buffer{ - buf: make([]telegraf.Metric, capacity), - first: 0, - last: 0, - size: 0, - cap: capacity, - + bm := BufferStats{ MetricsAdded: selfstat.Register( "write", "metrics_added", @@ -71,183 +92,23 @@ func NewBuffer(name string, alias string, capacity int) *Buffer { tags, ), } - b.BufferSize.Set(int64(0)) - b.BufferLimit.Set(int64(capacity)) - return b -} - -// Len returns the number of metrics currently in the buffer. -func (b *Buffer) Len() int { - b.Lock() - defer b.Unlock() - - return b.length() -} - -func (b *Buffer) length() int { - return min(b.size+b.batchSize, b.cap) + bm.BufferSize.Set(int64(0)) + bm.BufferLimit.Set(int64(capacity)) + return bm } -func (b *Buffer) metricAdded() { +func (b *BufferStats) metricAdded() { b.MetricsAdded.Incr(1) } -func (b *Buffer) metricWritten(metric telegraf.Metric) { +func (b *BufferStats) metricWritten(metric telegraf.Metric) { AgentMetricsWritten.Incr(1) b.MetricsWritten.Incr(1) metric.Accept() } -func (b *Buffer) metricDropped(metric telegraf.Metric) { +func (b *BufferStats) metricDropped(metric telegraf.Metric) { AgentMetricsDropped.Incr(1) b.MetricsDropped.Incr(1) metric.Reject() } - -func (b *Buffer) addMetric(m telegraf.Metric) int { - dropped := 0 - // Check if Buffer is full - if b.size == b.cap { - b.metricDropped(b.buf[b.last]) - dropped++ - - if b.batchSize > 0 { - b.batchSize-- - b.batchFirst = b.next(b.batchFirst) - } - } - - b.metricAdded() - - b.buf[b.last] = m - b.last = b.next(b.last) - - if b.size == b.cap { - b.first = b.next(b.first) - } - - b.size = min(b.size+1, b.cap) - return dropped -} - -// Add adds metrics to the buffer and returns number of dropped metrics. -func (b *Buffer) Add(metrics ...telegraf.Metric) int { - b.Lock() - defer b.Unlock() - - dropped := 0 - for i := range metrics { - if n := b.addMetric(metrics[i]); n != 0 { - dropped += n - } - } - - b.BufferSize.Set(int64(b.length())) - return dropped -} - -// Batch returns a slice containing up to batchSize of the oldest metrics not -// yet dropped. Metrics are ordered from oldest to newest in the batch. The -// batch must not be modified by the client. -func (b *Buffer) Batch(batchSize int) []telegraf.Metric { - b.Lock() - defer b.Unlock() - - outLen := min(b.size, batchSize) - out := make([]telegraf.Metric, outLen) - if outLen == 0 { - return out - } - - b.batchFirst = b.first - b.batchSize = outLen - - batchIndex := b.batchFirst - for i := range out { - out[i] = b.buf[batchIndex] - b.buf[batchIndex] = nil - batchIndex = b.next(batchIndex) - } - - b.first = b.nextby(b.first, b.batchSize) - b.size -= outLen - return out -} - -// Accept marks the batch, acquired from Batch(), as successfully written. -func (b *Buffer) Accept(batch []telegraf.Metric) { - b.Lock() - defer b.Unlock() - - for _, m := range batch { - b.metricWritten(m) - } - - b.resetBatch() - b.BufferSize.Set(int64(b.length())) -} - -// Reject returns the batch, acquired from Batch(), to the buffer and marks it -// as unsent. -func (b *Buffer) Reject(batch []telegraf.Metric) { - b.Lock() - defer b.Unlock() - - if len(batch) == 0 { - return - } - - free := b.cap - b.size - restore := min(len(batch), free) - skip := len(batch) - restore - - b.first = b.prevby(b.first, restore) - b.size = min(b.size+restore, b.cap) - - re := b.first - - // Copy metrics from the batch back into the buffer - for i := range batch { - if i < skip { - b.metricDropped(batch[i]) - } else { - b.buf[re] = batch[i] - re = b.next(re) - } - } - - b.resetBatch() - b.BufferSize.Set(int64(b.length())) -} - -// next returns the next index with wrapping. -func (b *Buffer) next(index int) int { - index++ - if index == b.cap { - return 0 - } - return index -} - -// nextby returns the index that is count newer with wrapping. -func (b *Buffer) nextby(index, count int) int { - index += count - index %= b.cap - return index -} - -// prevby returns the index that is count older with wrapping. -func (b *Buffer) prevby(index, count int) int { - index -= count - for index < 0 { - index += b.cap - } - - index %= b.cap - return index -} - -func (b *Buffer) resetBatch() { - b.batchFirst = 0 - b.batchSize = 0 -} diff --git a/models/buffer_disk.go b/models/buffer_disk.go new file mode 100644 index 0000000000000..ef05791618247 --- /dev/null +++ b/models/buffer_disk.go @@ -0,0 +1,222 @@ +package models + +import ( + "errors" + "fmt" + "os" + "sync" + + "github.com/tidwall/wal" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +type DiskBuffer struct { + BufferStats + sync.Mutex + + file *wal.Log + path string + + batchFirst uint64 // Index of the first metric in the batch + batchSize uint64 // Number of metrics currently in the batch + + // Ending point of metrics read from disk on telegraf launch. + // Used to know whether to discard tracking metrics. + originalEnd uint64 +} + +func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, error) { + filePath := path + "/" + name + walFile, err := wal.Open(filePath, nil) + if err != nil { + return nil, fmt.Errorf("failed to open wal file: %w", err) + } + return &DiskBuffer{ + BufferStats: stats, + file: walFile, + path: filePath, + }, nil +} + +func (b *DiskBuffer) Len() int { + b.Lock() + defer b.Unlock() + return b.length() +} + +func (b *DiskBuffer) length() int { + // Special case for when the read index is zero, it must be empty (otherwise it would be >= 1) + if b.readIndex() == 0 { + return 0 + } + return int(b.writeIndex() - b.readIndex()) +} + +// readIndex is the first index to start reading metrics from, or the head of the buffer +func (b *DiskBuffer) readIndex() uint64 { + index, err := b.file.FirstIndex() + if err != nil { + panic(err) // can only occur with a corrupt wal file + } + return index +} + +// writeIndex is the first index to start writing metrics to, or the tail of the buffer +func (b *DiskBuffer) writeIndex() uint64 { + index, err := b.file.LastIndex() + if err != nil { + panic(err) // can only occur with a corrupt wal file + } + return index + 1 +} + +func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int { + b.Lock() + defer b.Unlock() + + dropped := 0 + for _, m := range metrics { + if !b.addSingle(m) { + dropped++ + } + } + b.BufferSize.Set(int64(b.length())) + return dropped + // todo implement batched writes +} + +func (b *DiskBuffer) addSingle(m telegraf.Metric) bool { + data, err := metric.ToBytes(m) + if err != nil { + panic(err) + } + err = b.file.Write(b.writeIndex(), data) + if err == nil { + b.metricAdded() + return true + } + return false +} + +//nolint:unused // to be implemented in the future +func (b *DiskBuffer) addBatch(metrics []telegraf.Metric) int { + written := 0 + batch := new(wal.Batch) + for _, m := range metrics { + data, err := metric.ToBytes(m) + if err != nil { + panic(err) + } + batch.Write(b.writeIndex(), data) + b.metricAdded() + written++ + } + err := b.file.WriteBatch(batch) + if err != nil { + return 0 // todo error handle, test if a partial write occur + } + return written +} + +func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric { + b.Lock() + defer b.Unlock() + + if b.length() == 0 { + // no metrics in the wal file, so return an empty array + return []telegraf.Metric{} + } + b.batchFirst = b.readIndex() + var metrics []telegraf.Metric + + b.batchSize = 0 + readIndex := b.batchFirst + endIndex := b.writeIndex() + for batchSize > 0 && readIndex < endIndex { + data, err := b.file.Read(readIndex) + if err != nil { + panic(err) + } + readIndex++ + + m, err := metric.FromBytes(data) + if errors.Is(err, metric.ErrSkipTracking) { + // could not look up tracking information for metric, skip + continue + } + if err != nil { + // non-recoverable error in deserialization, abort + panic(err) + } + if _, ok := m.(telegraf.TrackingMetric); ok && readIndex < b.originalEnd { + // tracking metric left over from previous instance, skip + continue + } + + metrics = append(metrics, m) + b.batchSize++ + batchSize-- + } + return metrics +} + +func (b *DiskBuffer) Accept(batch []telegraf.Metric) { + b.Lock() + defer b.Unlock() + + if b.batchSize == 0 || len(batch) == 0 { + // nothing to accept + return + } + for _, m := range batch { + b.metricWritten(m) + } + if b.length() == len(batch) { + b.resetWalFile() + } else { + err := b.file.TruncateFront(b.batchFirst + uint64(len(batch))) + if err != nil { + panic(err) + } + } + + // check if the original end index is still valid, clear if not + if b.originalEnd < b.readIndex() { + b.originalEnd = 0 + } + + b.resetBatch() + b.BufferSize.Set(int64(b.length())) +} + +func (b *DiskBuffer) Reject(_ []telegraf.Metric) { + // very little to do here as the disk buffer retains metrics in + // the wal file until a call to accept + b.Lock() + defer b.Unlock() + b.resetBatch() +} + +func (b *DiskBuffer) Stats() BufferStats { + return b.BufferStats +} + +func (b *DiskBuffer) resetBatch() { + b.batchFirst = 0 + b.batchSize = 0 +} + +// todo This is very messy and not ideal, but serves as the only way I can find currently +// todo to actually clear the walfile completely if needed, since Truncate() calls require +// todo at least one entry remains in them otherwise they return an error. +func (b *DiskBuffer) resetWalFile() { + b.file.Close() + os.Remove(b.path) + walFile, err := wal.Open(b.path, nil) + if err != nil { + panic(err) + } + b.file = walFile +} diff --git a/models/buffer_disk_test.go b/models/buffer_disk_test.go new file mode 100644 index 0000000000000..17d8b19ab001e --- /dev/null +++ b/models/buffer_disk_test.go @@ -0,0 +1,102 @@ +package models + +import ( + "fmt" + "io" + "os" + "path/filepath" + "testing" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func newTestDiskBuffer(t testing.TB) Buffer { + path, err := os.MkdirTemp("", "*-buffer-test") + require.NoError(t, err) + return newTestDiskBufferWithPath(t, "test", path) +} + +func newTestDiskBufferWithPath(t testing.TB, name string, path string) Buffer { + t.Helper() + buf, err := NewBuffer(name, "", 0, "disk", path) + require.NoError(t, err) + buf.Stats().MetricsAdded.Set(0) + buf.Stats().MetricsWritten.Set(0) + buf.Stats().MetricsDropped.Set(0) + return buf +} + +func TestBuffer_RetainsTrackingInformation(t *testing.T) { + var delivered int + mm, _ := metric.WithTracking(Metric(), func(_ telegraf.DeliveryInfo) { + delivered++ + }) + metric.Init() + b := newTestDiskBuffer(t) + b.Add(mm) + batch := b.Batch(1) + b.Accept(batch) + require.Equal(t, 1, delivered) +} + +// WAL file tested here was written as: +// 1: Metric() +// 2: Metric() +// 3: Metric() +// 4: metric.WithTracking(Metric()) +// 5: Metric() +// +// Expected to drop the 4th metric, as tracking metrics from +// previous instances are dropped when the wal file is reopened. +func TestBuffer_TrackingDroppedFromOldWal(t *testing.T) { + // copy the testdata so we do not destroy the testdata wal file + path, err := os.MkdirTemp("", "*-buffer-test") + require.NoError(t, err) + f, err := os.Create(path + "/00000000000000000001") + require.NoError(t, err) + f1, err := os.Open("testdata/testwal/00000000000000000001") + require.NoError(t, err) + written, err := io.Copy(f, f1) + require.NoError(t, err) + fmt.Println(written) + + metric.Init() + b := newTestDiskBufferWithPath(t, filepath.Base(path), filepath.Dir(path)) + batch := b.Batch(4) + expected := []telegraf.Metric{ + Metric(), Metric(), Metric(), Metric(), + } + testutil.RequireMetricsEqual(t, expected, batch) +} + +/* +// Function used to create the test data used in the test above +func Test_CreateTestData(t *testing.T) { + metric.Init() + walfile, _ := wal.Open("testdata/testwal", nil) + + data, err := metric.ToBytes(Metric()) + require.NoError(t, err) + require.NoError(t, walfile.Write(1, data)) + + data, err = metric.ToBytes(Metric()) + require.NoError(t, err) + require.NoError(t, walfile.Write(2, data)) + + data, err = metric.ToBytes(Metric()) + require.NoError(t, err) + require.NoError(t, walfile.Write(3, data)) + + m, _ := metric.WithTracking(Metric(), func(di telegraf.DeliveryInfo) {}) + data, err = metric.ToBytes(m) + require.NoError(t, err) + require.NoError(t, walfile.Write(4, data)) + + data, err = metric.ToBytes(Metric()) + require.NoError(t, err) + require.NoError(t, walfile.Write(5, data)) +} +*/ diff --git a/models/buffer_mem.go b/models/buffer_mem.go new file mode 100644 index 0000000000000..55eb1c321db36 --- /dev/null +++ b/models/buffer_mem.go @@ -0,0 +1,185 @@ +package models + +import ( + "sync" + + "github.com/influxdata/telegraf" +) + +// MemoryBuffer stores metrics in a circular buffer. +type MemoryBuffer struct { + sync.Mutex + BufferStats + + buf []telegraf.Metric + first int // index of the first/oldest metric + last int // one after the index of the last/newest metric + size int // number of metrics currently in the buffer + cap int // the capacity of the buffer + + batchFirst int // index of the first metric in the batch + batchSize int // number of metrics currently in the batch +} + +func NewMemoryBuffer(capacity int, stats BufferStats) (*MemoryBuffer, error) { + return &MemoryBuffer{ + BufferStats: stats, + buf: make([]telegraf.Metric, capacity), + cap: capacity, + }, nil +} + +func (b *MemoryBuffer) Len() int { + b.Lock() + defer b.Unlock() + return b.length() +} + +func (b *MemoryBuffer) length() int { + return min(b.size+b.batchSize, b.cap) +} + +func (b *MemoryBuffer) addMetric(m telegraf.Metric) int { + dropped := 0 + // Check if Buffer is full + if b.size == b.cap { + b.metricDropped(b.buf[b.last]) + dropped++ + + if b.batchSize > 0 { + b.batchSize-- + b.batchFirst = b.next(b.batchFirst) + } + } + + b.metricAdded() + + b.buf[b.last] = m + b.last = b.next(b.last) + + if b.size == b.cap { + b.first = b.next(b.first) + } + + b.size = min(b.size+1, b.cap) + return dropped +} + +func (b *MemoryBuffer) Add(metrics ...telegraf.Metric) int { + b.Lock() + defer b.Unlock() + + dropped := 0 + for i := range metrics { + if n := b.addMetric(metrics[i]); n != 0 { + dropped += n + } + } + + b.BufferSize.Set(int64(b.length())) + return dropped +} + +func (b *MemoryBuffer) Batch(batchSize int) []telegraf.Metric { + b.Lock() + defer b.Unlock() + + outLen := min(b.size, batchSize) + out := make([]telegraf.Metric, outLen) + if outLen == 0 { + return out + } + + b.batchFirst = b.first + b.batchSize = outLen + + batchIndex := b.batchFirst + for i := range out { + out[i] = b.buf[batchIndex] + b.buf[batchIndex] = nil + batchIndex = b.next(batchIndex) + } + + b.first = b.nextby(b.first, b.batchSize) + b.size -= outLen + return out +} + +func (b *MemoryBuffer) Accept(batch []telegraf.Metric) { + b.Lock() + defer b.Unlock() + + for _, m := range batch { + b.metricWritten(m) + } + + b.resetBatch() + b.BufferSize.Set(int64(b.length())) +} + +func (b *MemoryBuffer) Reject(batch []telegraf.Metric) { + b.Lock() + defer b.Unlock() + + if len(batch) == 0 { + return + } + + free := b.cap - b.size + restore := min(len(batch), free) + skip := len(batch) - restore + + b.first = b.prevby(b.first, restore) + b.size = min(b.size+restore, b.cap) + + re := b.first + + // Copy metrics from the batch back into the buffer + for i := range batch { + if i < skip { + b.metricDropped(batch[i]) + } else { + b.buf[re] = batch[i] + re = b.next(re) + } + } + + b.resetBatch() + b.BufferSize.Set(int64(b.length())) +} + +func (b *MemoryBuffer) Stats() BufferStats { + return b.BufferStats +} + +// next returns the next index with wrapping. +func (b *MemoryBuffer) next(index int) int { + index++ + if index == b.cap { + return 0 + } + return index +} + +// nextby returns the index that is count newer with wrapping. +func (b *MemoryBuffer) nextby(index, count int) int { + index += count + index %= b.cap + return index +} + +// prevby returns the index that is count older with wrapping. +func (b *MemoryBuffer) prevby(index, count int) int { + index -= count + for index < 0 { + index += b.cap + } + + index %= b.cap + return index +} + +func (b *MemoryBuffer) resetBatch() { + b.batchFirst = 0 + b.batchSize = 0 +} diff --git a/models/buffer_mem_test.go b/models/buffer_mem_test.go new file mode 100644 index 0000000000000..a4ec6568a6b96 --- /dev/null +++ b/models/buffer_mem_test.go @@ -0,0 +1,57 @@ +package models + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func newTestMemoryBuffer(t testing.TB, capacity int) Buffer { + t.Helper() + buf, err := NewBuffer("test", "", capacity, "memory", "") + require.NoError(t, err) + buf.Stats().MetricsAdded.Set(0) + buf.Stats().MetricsWritten.Set(0) + buf.Stats().MetricsDropped.Set(0) + return buf +} + +func TestBuffer_AcceptCallsMetricAccept(t *testing.T) { + var accept int + mm := &MockMetric{ + Metric: Metric(), + AcceptF: func() { + accept++ + }, + } + b := newTestMemoryBuffer(t, 5) + b.Add(mm, mm, mm) + batch := b.Batch(2) + b.Accept(batch) + require.Equal(t, 2, accept) +} + +func TestBuffer_RejectCallsMetricRejectWithOverwritten(t *testing.T) { + var reject int + mm := &MockMetric{ + Metric: Metric(), + RejectF: func() { + reject++ + }, + } + b := newTestMemoryBuffer(t, 5) + b.Add(mm, mm, mm, mm, mm) + batch := b.Batch(5) + b.Add(mm, mm) + require.Equal(t, 0, reject) + b.Reject(batch) + require.Equal(t, 2, reject) +} + +func BenchmarkMemoryAddMetrics(b *testing.B) { + buf := newTestMemoryBuffer(b, 10000) + m := Metric() + for n := 0; n < b.N; n++ { + buf.Add(m) + } +} diff --git a/models/buffer_suite_test.go b/models/buffer_suite_test.go new file mode 100644 index 0000000000000..4f5e46d21bc24 --- /dev/null +++ b/models/buffer_suite_test.go @@ -0,0 +1,794 @@ +package models + +import ( + "os" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/suite" +) + +type MockMetric struct { + telegraf.Metric + AcceptF func() + RejectF func() + DropF func() +} + +func (m *MockMetric) Accept() { + m.AcceptF() +} + +func (m *MockMetric) Reject() { + m.RejectF() +} + +func (m *MockMetric) Drop() { + m.DropF() +} + +func (m *MockMetric) Unwrap() telegraf.Metric { + return m.Metric +} + +type BufferSuiteTest struct { + suite.Suite + bufferType string + bufferPath string + + hasMaxCapacity bool // whether the buffer type being tested supports a maximum metric capacity +} + +func (s *BufferSuiteTest) SetupTest() { + if s.bufferType == "disk" { + path, err := os.MkdirTemp("", "*-buffer-test") + s.Require().NoError(err) + s.bufferPath = path + s.hasMaxCapacity = false + // lets gob properly encode our metrics + metric.Init() + } else { + s.hasMaxCapacity = true + } +} + +func (s *BufferSuiteTest) TearDownTest() { + if s.bufferPath != "" { + _ = os.RemoveAll(s.bufferPath) + s.bufferPath = "" + } +} + +func TestMemoryBufferSuite(t *testing.T) { + suite.Run(t, &BufferSuiteTest{bufferType: "memory"}) +} + +func TestDiskBufferSuite(t *testing.T) { + suite.Run(t, &BufferSuiteTest{bufferType: "disk"}) +} + +func Metric() telegraf.Metric { + return MetricTime(0) +} + +func MetricTime(sec int64) telegraf.Metric { + m := metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(sec, 0), + ) + return m +} + +func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer { + s.T().Helper() + buf, err := NewBuffer("test", "", capacity, s.bufferType, s.bufferPath) + s.Require().NoError(err) + buf.Stats().MetricsAdded.Set(0) + buf.Stats().MetricsWritten.Set(0) + buf.Stats().MetricsDropped.Set(0) + return buf +} + +func (s *BufferSuiteTest) TestBuffer_LenEmpty() { + b := s.newTestBuffer(5) + + s.Equal(0, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_LenOne() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m) + + s.Equal(1, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_LenFull() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m) + + s.Equal(5, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_LenOverfill() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m, m) + + s.Equal(5, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenZero() { + b := s.newTestBuffer(5) + batch := b.Batch(0) + + s.Empty(batch) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenBufferEmpty() { + b := s.newTestBuffer(5) + batch := b.Batch(2) + + s.Empty(batch) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenUnderfill() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m) + batch := b.Batch(2) + + s.Len(batch, 1) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenFill() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m) + batch := b.Batch(2) + s.Len(batch, 2) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenExact() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m) + batch := b.Batch(2) + s.Len(batch, 2) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenLargerThanBuffer() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m) + batch := b.Batch(6) + s.Len(batch, 5) +} + +func (s *BufferSuiteTest) TestBuffer_BatchWrap() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m) + batch := b.Batch(2) + b.Accept(batch) + b.Add(m, m) + batch = b.Batch(5) + s.Len(batch, 5) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLatest() { + b := s.newTestBuffer(4) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(1), + MetricTime(2), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLatestWrap() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(4) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + batch := b.Batch(2) + + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(2), + MetricTime(3), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_MultipleBatch() { + b := s.newTestBuffer(10) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + batch := b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(1), + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), + }, batch) + b.Accept(batch) + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(6), + }, batch) + b.Accept(batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectWithRoom() { + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Reject(batch) + + s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(1), + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectNothingNewFull() { + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + batch := b.Batch(2) + b.Reject(batch) + + s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(1), + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectNoRoom() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + + b.Reject(batch) + + s.Equal(int64(3), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(4), + MetricTime(5), + MetricTime(6), + MetricTime(7), + MetricTime(8), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectRoomExact() { + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + batch := b.Batch(2) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + + b.Reject(batch) + + s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(1), + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectRoomOverwriteOld() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(1) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + + b.Reject(batch) + + s.Equal(int64(1), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), + MetricTime(6), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectPartialRoom() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Reject(batch) + + s.Equal(int64(2), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(3), + MetricTime(4), + MetricTime(5), + MetricTime(6), + MetricTime(7), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectNewMetricsWrapped() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + + // buffer: 1, 4, 5; batch: 2, 3 + s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + b.Add(MetricTime(9)) + b.Add(MetricTime(10)) + + // buffer: 8, 9, 10, 6, 7; batch: 2, 3 + s.Equal(int64(3), b.Stats().MetricsDropped.Get()) + + b.Add(MetricTime(11)) + b.Add(MetricTime(12)) + b.Add(MetricTime(13)) + b.Add(MetricTime(14)) + b.Add(MetricTime(15)) + // buffer: 13, 14, 15, 11, 12; batch: 2, 3 + s.Equal(int64(8), b.Stats().MetricsDropped.Get()) + b.Reject(batch) + + s.Equal(int64(10), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(11), + MetricTime(12), + MetricTime(13), + MetricTime(14), + MetricTime(15), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectWrapped() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + batch := b.Batch(3) + + b.Add(MetricTime(9)) + b.Add(MetricTime(10)) + b.Add(MetricTime(11)) + b.Add(MetricTime(12)) + + b.Reject(batch) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(8), + MetricTime(9), + MetricTime(10), + MetricTime(11), + MetricTime(12), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectAdjustFirst() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(10) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(3) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + b.Reject(batch) + + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + b.Add(MetricTime(9)) + batch = b.Batch(3) + b.Add(MetricTime(10)) + b.Add(MetricTime(11)) + b.Add(MetricTime(12)) + b.Reject(batch) + + b.Add(MetricTime(13)) + b.Add(MetricTime(14)) + b.Add(MetricTime(15)) + batch = b.Batch(3) + b.Add(MetricTime(16)) + b.Add(MetricTime(17)) + b.Add(MetricTime(18)) + b.Reject(batch) + + b.Add(MetricTime(19)) + + batch = b.Batch(10) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(10), + MetricTime(11), + MetricTime(12), + MetricTime(13), + MetricTime(14), + MetricTime(15), + MetricTime(16), + MetricTime(17), + MetricTime(18), + MetricTime(19), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_AddDropsOverwrittenMetrics() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m, m, m) + b.Add(m, m, m, m, m) + + s.Equal(int64(5), b.Stats().MetricsDropped.Get()) + s.Equal(int64(0), b.Stats().MetricsWritten.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_AcceptRemovesBatch() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m) + batch := b.Batch(2) + b.Accept(batch) + s.Equal(1, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_RejectLeavesBatch() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m) + batch := b.Batch(2) + b.Reject(batch) + s.Equal(3, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_AcceptWritesOverwrittenBatch() { + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m, m, m) + batch := b.Batch(5) + b.Add(m, m, m, m, m) + b.Accept(batch) + + s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + s.Equal(int64(5), b.Stats().MetricsWritten.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_BatchRejectDropsOverwrittenBatch() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m, m, m) + batch := b.Batch(5) + b.Add(m, m, m, m, m) + b.Reject(batch) + + s.Equal(int64(5), b.Stats().MetricsDropped.Get()) + s.Equal(int64(0), b.Stats().MetricsWritten.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_MetricsOverwriteBatchAccept() { + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m, m, m) + batch := b.Batch(3) + b.Add(m, m, m) + b.Accept(batch) + s.Equal(int64(0), b.Stats().MetricsDropped.Get(), "dropped") + s.Equal(int64(3), b.Stats().MetricsWritten.Get(), "written") +} + +func (s *BufferSuiteTest) TestBuffer_MetricsOverwriteBatchReject() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m, m, m) + batch := b.Batch(3) + b.Add(m, m, m) + b.Reject(batch) + s.Equal(int64(3), b.Stats().MetricsDropped.Get()) + s.Equal(int64(0), b.Stats().MetricsWritten.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_MetricsBatchAcceptRemoved() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m, m, m) + batch := b.Batch(3) + b.Add(m, m, m, m, m) + b.Accept(batch) + s.Equal(int64(2), b.Stats().MetricsDropped.Get()) + s.Equal(int64(3), b.Stats().MetricsWritten.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_WrapWithBatch() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m) + b.Batch(3) + b.Add(m, m, m, m, m, m) + + s.Equal(int64(1), b.Stats().MetricsDropped.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_BatchNotRemoved() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m) + b.Batch(2) + s.Equal(5, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_BatchRejectAcceptNoop() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m) + batch := b.Batch(2) + b.Reject(batch) + b.Accept(batch) + s.Equal(5, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNoBatch() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + var reject int + mm := &MockMetric{ + Metric: Metric(), + RejectF: func() { + reject++ + }, + } + b := s.newTestBuffer(5) + b.Add(mm, mm, mm, mm, mm) + b.Add(mm, mm) + s.Equal(2, reject) +} + +func (s *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNotInBatch() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + var reject int + mm := &MockMetric{ + Metric: Metric(), + RejectF: func() { + reject++ + }, + } + b := s.newTestBuffer(5) + b.Add(mm, mm, mm, mm, mm) + batch := b.Batch(2) + b.Add(mm, mm, mm, mm) + s.Equal(2, reject) + b.Reject(batch) + s.Equal(4, reject) +} + +func (s *BufferSuiteTest) TestBuffer_AddOverwriteAndReject() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + var reject int + mm := &MockMetric{ + Metric: Metric(), + RejectF: func() { + reject++ + }, + } + b := s.newTestBuffer(5) + b.Add(mm, mm, mm, mm, mm) + batch := b.Batch(5) + b.Add(mm, mm, mm, mm, mm) + b.Add(mm, mm, mm, mm, mm) + b.Add(mm, mm, mm, mm, mm) + b.Add(mm, mm, mm, mm, mm) + s.Equal(15, reject) + b.Reject(batch) + s.Equal(20, reject) +} + +func (s *BufferSuiteTest) TestBuffer_AddOverwriteAndRejectOffset() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + var reject int + var accept int + mm := &MockMetric{ + Metric: Metric(), + RejectF: func() { + reject++ + }, + AcceptF: func() { + accept++ + }, + } + b := s.newTestBuffer(5) + b.Add(mm, mm, mm) + b.Add(mm, mm, mm, mm) + s.Equal(2, reject) + batch := b.Batch(5) + b.Add(mm, mm, mm, mm) + s.Equal(2, reject) + b.Add(mm, mm, mm, mm) + s.Equal(5, reject) + b.Add(mm, mm, mm, mm) + s.Equal(9, reject) + b.Add(mm, mm, mm, mm) + s.Equal(13, reject) + b.Accept(batch) + s.Equal(13, reject) + s.Equal(5, accept) +} + +func (s *BufferSuiteTest) TestBuffer_RejectEmptyBatch() { + b := s.newTestBuffer(5) + batch := b.Batch(2) + b.Add(MetricTime(1)) + b.Reject(batch) + b.Add(MetricTime(2)) + batch = b.Batch(2) + for _, m := range batch { + s.NotNil(m) + } +} diff --git a/models/buffer_test.go b/models/buffer_test.go deleted file mode 100644 index 276b5c47cabbb..0000000000000 --- a/models/buffer_test.go +++ /dev/null @@ -1,726 +0,0 @@ -package models - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/testutil" -) - -type MockMetric struct { - telegraf.Metric - AcceptF func() - RejectF func() - DropF func() -} - -func (m *MockMetric) Accept() { - m.AcceptF() -} - -func (m *MockMetric) Reject() { - m.RejectF() -} - -func (m *MockMetric) Drop() { - m.DropF() -} - -func Metric() telegraf.Metric { - return MetricTime(0) -} - -func MetricTime(sec int64) telegraf.Metric { - m := metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "value": 42.0, - }, - time.Unix(sec, 0), - ) - return m -} - -func BenchmarkAddMetrics(b *testing.B) { - buf := NewBuffer("test", "", 10000) - m := Metric() - for n := 0; n < b.N; n++ { - buf.Add(m) - } -} - -func setup(b *Buffer) *Buffer { - b.MetricsAdded.Set(0) - b.MetricsWritten.Set(0) - b.MetricsDropped.Set(0) - return b -} - -func TestBuffer_LenEmpty(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - - require.Equal(t, 0, b.Len()) -} - -func TestBuffer_LenOne(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m) - - require.Equal(t, 1, b.Len()) -} - -func TestBuffer_LenFull(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m, m, m) - - require.Equal(t, 5, b.Len()) -} - -func TestBuffer_LenOverfill(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - setup(b) - b.Add(m, m, m, m, m, m) - - require.Equal(t, 5, b.Len()) -} - -func TestBuffer_BatchLenZero(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - batch := b.Batch(0) - - require.Empty(t, batch) -} - -func TestBuffer_BatchLenBufferEmpty(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - batch := b.Batch(2) - - require.Empty(t, batch) -} - -func TestBuffer_BatchLenUnderfill(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m) - batch := b.Batch(2) - - require.Len(t, batch, 1) -} - -func TestBuffer_BatchLenFill(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m) - batch := b.Batch(2) - require.Len(t, batch, 2) -} - -func TestBuffer_BatchLenExact(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m) - batch := b.Batch(2) - require.Len(t, batch, 2) -} - -func TestBuffer_BatchLenLargerThanBuffer(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m, m, m) - batch := b.Batch(6) - require.Len(t, batch, 5) -} - -func TestBuffer_BatchWrap(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m, m, m) - batch := b.Batch(2) - b.Accept(batch) - b.Add(m, m) - batch = b.Batch(5) - require.Len(t, batch, 5) -} - -func TestBuffer_BatchLatest(t *testing.T) { - b := setup(NewBuffer("test", "", 4)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - }, batch) -} - -func TestBuffer_BatchLatestWrap(t *testing.T) { - b := setup(NewBuffer("test", "", 4)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - batch := b.Batch(2) - - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(2), - MetricTime(3), - }, batch) -} - -func TestBuffer_MultipleBatch(t *testing.T) { - b := setup(NewBuffer("test", "", 10)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - batch := b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), - }, batch) - b.Accept(batch) - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(6), - }, batch) - b.Accept(batch) -} - -func TestBuffer_RejectWithRoom(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Reject(batch) - - require.Equal(t, int64(0), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), - }, batch) -} - -func TestBuffer_RejectNothingNewFull(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - batch := b.Batch(2) - b.Reject(batch) - - require.Equal(t, int64(0), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), - }, batch) -} - -func TestBuffer_RejectNoRoom(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) - - b.Reject(batch) - - require.Equal(t, int64(3), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(4), - MetricTime(5), - MetricTime(6), - MetricTime(7), - MetricTime(8), - }, batch) -} - -func TestBuffer_RejectRoomExact(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - batch := b.Batch(2) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - - b.Reject(batch) - - require.Equal(t, int64(0), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), - }, batch) -} - -func TestBuffer_RejectRoomOverwriteOld(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(1) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - - b.Reject(batch) - - require.Equal(t, int64(1), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), - MetricTime(6), - }, batch) -} - -func TestBuffer_RejectPartialRoom(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Reject(batch) - - require.Equal(t, int64(2), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(3), - MetricTime(4), - MetricTime(5), - MetricTime(6), - MetricTime(7), - }, batch) -} - -func TestBuffer_RejectNewMetricsWrapped(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - - // buffer: 1, 4, 5; batch: 2, 3 - require.Equal(t, int64(0), b.MetricsDropped.Get()) - - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) - b.Add(MetricTime(9)) - b.Add(MetricTime(10)) - - // buffer: 8, 9, 10, 6, 7; batch: 2, 3 - require.Equal(t, int64(3), b.MetricsDropped.Get()) - - b.Add(MetricTime(11)) - b.Add(MetricTime(12)) - b.Add(MetricTime(13)) - b.Add(MetricTime(14)) - b.Add(MetricTime(15)) - // buffer: 13, 14, 15, 11, 12; batch: 2, 3 - require.Equal(t, int64(8), b.MetricsDropped.Get()) - b.Reject(batch) - - require.Equal(t, int64(10), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(11), - MetricTime(12), - MetricTime(13), - MetricTime(14), - MetricTime(15), - }, batch) -} - -func TestBuffer_RejectWrapped(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) - batch := b.Batch(3) - - b.Add(MetricTime(9)) - b.Add(MetricTime(10)) - b.Add(MetricTime(11)) - b.Add(MetricTime(12)) - - b.Reject(batch) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(8), - MetricTime(9), - MetricTime(10), - MetricTime(11), - MetricTime(12), - }, batch) -} - -func TestBuffer_RejectAdjustFirst(t *testing.T) { - b := setup(NewBuffer("test", "", 10)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(3) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - b.Reject(batch) - - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) - b.Add(MetricTime(9)) - batch = b.Batch(3) - b.Add(MetricTime(10)) - b.Add(MetricTime(11)) - b.Add(MetricTime(12)) - b.Reject(batch) - - b.Add(MetricTime(13)) - b.Add(MetricTime(14)) - b.Add(MetricTime(15)) - batch = b.Batch(3) - b.Add(MetricTime(16)) - b.Add(MetricTime(17)) - b.Add(MetricTime(18)) - b.Reject(batch) - - b.Add(MetricTime(19)) - - batch = b.Batch(10) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(10), - MetricTime(11), - MetricTime(12), - MetricTime(13), - MetricTime(14), - MetricTime(15), - MetricTime(16), - MetricTime(17), - MetricTime(18), - MetricTime(19), - }, batch) -} - -func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - b.Add(m, m, m, m, m) - - require.Equal(t, int64(5), b.MetricsDropped.Get()) - require.Equal(t, int64(0), b.MetricsWritten.Get()) -} - -func TestBuffer_AcceptRemovesBatch(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m) - batch := b.Batch(2) - b.Accept(batch) - require.Equal(t, 1, b.Len()) -} - -func TestBuffer_RejectLeavesBatch(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m) - batch := b.Batch(2) - b.Reject(batch) - require.Equal(t, 3, b.Len()) -} - -func TestBuffer_AcceptWritesOverwrittenBatch(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - batch := b.Batch(5) - b.Add(m, m, m, m, m) - b.Accept(batch) - - require.Equal(t, int64(0), b.MetricsDropped.Get()) - require.Equal(t, int64(5), b.MetricsWritten.Get()) -} - -func TestBuffer_BatchRejectDropsOverwrittenBatch(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - batch := b.Batch(5) - b.Add(m, m, m, m, m) - b.Reject(batch) - - require.Equal(t, int64(5), b.MetricsDropped.Get()) - require.Equal(t, int64(0), b.MetricsWritten.Get()) -} - -func TestBuffer_MetricsOverwriteBatchAccept(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - batch := b.Batch(3) - b.Add(m, m, m) - b.Accept(batch) - require.Equal(t, int64(0), b.MetricsDropped.Get(), "dropped") - require.Equal(t, int64(3), b.MetricsWritten.Get(), "written") -} - -func TestBuffer_MetricsOverwriteBatchReject(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - batch := b.Batch(3) - b.Add(m, m, m) - b.Reject(batch) - require.Equal(t, int64(3), b.MetricsDropped.Get()) - require.Equal(t, int64(0), b.MetricsWritten.Get()) -} - -func TestBuffer_MetricsBatchAcceptRemoved(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - batch := b.Batch(3) - b.Add(m, m, m, m, m) - b.Accept(batch) - require.Equal(t, int64(2), b.MetricsDropped.Get()) - require.Equal(t, int64(3), b.MetricsWritten.Get()) -} - -func TestBuffer_WrapWithBatch(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m) - b.Batch(3) - b.Add(m, m, m, m, m, m) - - require.Equal(t, int64(1), b.MetricsDropped.Get()) -} - -func TestBuffer_BatchNotRemoved(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m, m, m) - b.Batch(2) - require.Equal(t, 5, b.Len()) -} - -func TestBuffer_BatchRejectAcceptNoop(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m, m, m) - batch := b.Batch(2) - b.Reject(batch) - b.Accept(batch) - require.Equal(t, 5, b.Len()) -} - -func TestBuffer_AcceptCallsMetricAccept(t *testing.T) { - var accept int - mm := &MockMetric{ - Metric: Metric(), - AcceptF: func() { - accept++ - }, - } - b := setup(NewBuffer("test", "", 5)) - b.Add(mm, mm, mm) - batch := b.Batch(2) - b.Accept(batch) - require.Equal(t, 2, accept) -} - -func TestBuffer_AddCallsMetricRejectWhenNoBatch(t *testing.T) { - var reject int - mm := &MockMetric{ - Metric: Metric(), - RejectF: func() { - reject++ - }, - } - b := setup(NewBuffer("test", "", 5)) - setup(b) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm) - require.Equal(t, 2, reject) -} - -func TestBuffer_AddCallsMetricRejectWhenNotInBatch(t *testing.T) { - var reject int - mm := &MockMetric{ - Metric: Metric(), - RejectF: func() { - reject++ - }, - } - b := setup(NewBuffer("test", "", 5)) - setup(b) - b.Add(mm, mm, mm, mm, mm) - batch := b.Batch(2) - b.Add(mm, mm, mm, mm) - require.Equal(t, 2, reject) - b.Reject(batch) - require.Equal(t, 4, reject) -} - -func TestBuffer_RejectCallsMetricRejectWithOverwritten(t *testing.T) { - var reject int - mm := &MockMetric{ - Metric: Metric(), - RejectF: func() { - reject++ - }, - } - b := setup(NewBuffer("test", "", 5)) - b.Add(mm, mm, mm, mm, mm) - batch := b.Batch(5) - b.Add(mm, mm) - require.Equal(t, 0, reject) - b.Reject(batch) - require.Equal(t, 2, reject) -} - -func TestBuffer_AddOverwriteAndReject(t *testing.T) { - var reject int - mm := &MockMetric{ - Metric: Metric(), - RejectF: func() { - reject++ - }, - } - b := setup(NewBuffer("test", "", 5)) - b.Add(mm, mm, mm, mm, mm) - batch := b.Batch(5) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm, mm, mm, mm) - require.Equal(t, 15, reject) - b.Reject(batch) - require.Equal(t, 20, reject) -} - -func TestBuffer_AddOverwriteAndRejectOffset(t *testing.T) { - var reject int - var accept int - mm := &MockMetric{ - Metric: Metric(), - RejectF: func() { - reject++ - }, - AcceptF: func() { - accept++ - }, - } - b := setup(NewBuffer("test", "", 5)) - b.Add(mm, mm, mm) - b.Add(mm, mm, mm, mm) - require.Equal(t, 2, reject) - batch := b.Batch(5) - b.Add(mm, mm, mm, mm) - require.Equal(t, 2, reject) - b.Add(mm, mm, mm, mm) - require.Equal(t, 5, reject) - b.Add(mm, mm, mm, mm) - require.Equal(t, 9, reject) - b.Add(mm, mm, mm, mm) - require.Equal(t, 13, reject) - b.Accept(batch) - require.Equal(t, 13, reject) - require.Equal(t, 5, accept) -} - -func TestBuffer_RejectEmptyBatch(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - batch := b.Batch(2) - b.Add(MetricTime(1)) - b.Reject(batch) - b.Add(MetricTime(2)) - batch = b.Batch(2) - for _, m := range batch { - require.NotNil(t, m) - } -} diff --git a/models/running_output.go b/models/running_output.go index ada1f745f2c6d..17f4e9de0c9d9 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -37,6 +37,9 @@ type OutputConfig struct { NameOverride string NamePrefix string NameSuffix string + + BufferStrategy string + BufferDirectory string } // RunningOutput contains the output configuration @@ -56,7 +59,7 @@ type RunningOutput struct { BatchReady chan time.Time - buffer *Buffer + buffer Buffer log telegraf.Logger started bool @@ -96,8 +99,12 @@ func NewRunningOutput( batchSize = DefaultMetricBatchSize } + b, err := NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory) + if err != nil { + panic(err) // todo be more graceful here? + } ro := &RunningOutput{ - buffer: NewBuffer(config.Name, config.Alias, bufferLimit), + buffer: b, BatchReady: make(chan time.Time, 1), Output: output, Config: config, diff --git a/models/testdata/testwal/00000000000000000001 b/models/testdata/testwal/00000000000000000001 new file mode 100644 index 0000000000000..a4d72e966edfb Binary files /dev/null and b/models/testdata/testwal/00000000000000000001 differ