diff --git a/accumulator.go b/accumulator.go index ece69f3b176d1..6d740fb529021 100644 --- a/accumulator.go +++ b/accumulator.go @@ -57,6 +57,14 @@ type Accumulator interface { // TrackingID uniquely identifies a tracked metric group type TrackingID uint64 +type TrackingData interface { + // ID is the TrackingID + ID() TrackingID + + // RefCount is the number of tracking metrics still persistent and referencing this tracking ID + RefCount() int32 +} + // DeliveryInfo provides the results of a delivered metric group. type DeliveryInfo interface { // ID is the TrackingID diff --git a/config/config.go b/config/config.go index f0ccb0a80f507..87605890c5586 100644 --- a/config/config.go +++ b/config/config.go @@ -278,6 +278,9 @@ type AgentConfig struct { // Number of attempts to obtain a remote configuration via a URL during // startup. Set to -1 for unlimited attempts. ConfigURLRetryAttempts int `toml:"config_url_retry_attempts"` + + BufferStrategy string `toml:"buffer_strategy"` + BufferDirectory string `toml:"buffer_directory"` } // InputNames returns a list of strings of the configured inputs. @@ -1521,6 +1524,8 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, c.getFieldString(tbl, "name_suffix", &oc.NameSuffix) c.getFieldString(tbl, "name_prefix", &oc.NamePrefix) c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior) + c.getFieldString(tbl, "buffer_strategy", &oc.BufferStrategy) + c.getFieldString(tbl, "buffer_directory", &oc.BufferDirectory) if c.hasErrs() { return nil, c.firstErr() diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 4d3f189483269..06928cc12126f 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -349,6 +349,8 @@ following works: - github.com/tidwall/gjson [MIT License](https://github.com/tidwall/gjson/blob/master/LICENSE) - github.com/tidwall/match [MIT License](https://github.com/tidwall/match/blob/master/LICENSE) - github.com/tidwall/pretty [MIT License](https://github.com/tidwall/pretty/blob/master/LICENSE) +- github.com/tidwall/tinylru [MIT License](https://github.com/tidwall/tinylru/blob/master/LICENSE) +- github.com/tidwall/wal [MIT License](https://github.com/tidwall/wal/blob/master/LICENSE) - github.com/tinylib/msgp [MIT License](https://github.com/tinylib/msgp/blob/master/LICENSE) - github.com/tklauser/go-sysconf [BSD 3-Clause "New" or "Revised" License](https://github.com/tklauser/go-sysconf/blob/master/LICENSE) - github.com/tklauser/numcpus [Apache License 2.0](https://github.com/tklauser/numcpus/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 10529d396e38b..80ca5d0885206 100644 --- a/go.mod +++ b/go.mod @@ -185,6 +185,7 @@ require ( github.com/testcontainers/testcontainers-go/modules/kafka v0.31.0 github.com/thomasklein94/packer-plugin-libvirt v0.5.0 github.com/tidwall/gjson v1.17.0 + github.com/tidwall/wal v1.1.7 github.com/tinylib/msgp v1.2.0 github.com/urfave/cli/v2 v2.27.2 github.com/vapourismo/knx-go v0.0.0-20240217175130-922a0d50c241 @@ -451,6 +452,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/tinylru v1.1.0 // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect github.com/twmb/murmur3 v1.1.7 // indirect diff --git a/go.sum b/go.sum index 0833eaae3a9e9..0a76ec2d69ae7 100644 --- a/go.sum +++ b/go.sum @@ -2230,12 +2230,17 @@ github.com/testcontainers/testcontainers-go/modules/kafka v0.31.0 h1:8B1u+sDwYhT github.com/testcontainers/testcontainers-go/modules/kafka v0.31.0/go.mod h1:W1+yLUfUl8VLTzvmApP2FBHgCk8I5SKKjDWjxWEc33U= github.com/thomasklein94/packer-plugin-libvirt v0.5.0 h1:aj2HLHZZM/ClGLIwVp9rrgh+2TOU/w4EiaZHAwCpOgs= github.com/thomasklein94/packer-plugin-libvirt v0.5.0/go.mod h1:GwN82FQ6KxCNKtS8LNUgLbwTZs90GGhBzCmTNkrTCrY= +github.com/tidwall/gjson v1.10.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM= github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/tinylru v1.1.0 h1:XY6IUfzVTU9rpwdhKUF6nQdChgCdGjkMfLzbWyiau6I= +github.com/tidwall/tinylru v1.1.0/go.mod h1:3+bX+TJ2baOLMWTnlyNWHh4QMnFyARg2TLTQ6OFbzw8= +github.com/tidwall/wal v1.1.7 h1:emc1TRjIVsdKKSnpwGBAcsAGg0767SvUk8+ygx7Bb+4= +github.com/tidwall/wal v1.1.7/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E= github.com/tinylib/msgp v1.2.0 h1:0uKB/662twsVBpYUPbokj4sTSKhWFKB7LopO2kWK8lY= github.com/tinylib/msgp v1.2.0/go.mod h1:2vIGs3lcUo8izAATNobrCHevYZC/LMsJtw4JPiYPHro= github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0= diff --git a/metric.go b/metric.go index 2cf3d6a06072b..adad3cfc38c33 100644 --- a/metric.go +++ b/metric.go @@ -149,5 +149,6 @@ type UnwrappableMetric interface { type TrackingMetric interface { // TrackingID returns the ID used for tracking the metric TrackingID() TrackingID + TrackingData() TrackingData UnwrappableMetric } diff --git a/metric/deserialize.go b/metric/deserialize.go new file mode 100644 index 0000000000000..50910643f8e4d --- /dev/null +++ b/metric/deserialize.go @@ -0,0 +1,85 @@ +package metric + +import ( + "bytes" + "encoding/gob" + "errors" + "fmt" + "sync" + + "github.com/influxdata/telegraf" +) + +// storage for tracking data that can't be serialized to disk +var ( + // grouped tracking metrics means that ID->Data association is not one to one, + // many metrics could be associated with one tracking ID so we cannot just + // clear this every time in FromBytes. + trackingStore = make(map[telegraf.TrackingID]telegraf.TrackingData) + mu = sync.Mutex{} + + // ErrSkipTracking indicates that tracking information could not be found after + // deserializing a metric from bytes. In this case we should skip the metric + // and continue as if it does not exist. + ErrSkipTracking = errors.New("metric tracking data not found") +) + +type serializedMetric struct { + M telegraf.Metric + TID telegraf.TrackingID +} + +func ToBytes(m telegraf.Metric) ([]byte, error) { + var sm serializedMetric + if um, ok := m.(telegraf.UnwrappableMetric); ok { + sm.M = um.Unwrap() + } else { + sm.M = m + } + + if tm, ok := m.(telegraf.TrackingMetric); ok { + sm.TID = tm.TrackingID() + + mu.Lock() + trackingStore[sm.TID] = tm.TrackingData() + mu.Unlock() + } + + var buf bytes.Buffer + encoder := gob.NewEncoder(&buf) + if err := encoder.Encode(&sm); err != nil { + return nil, fmt.Errorf("failed to encode metric to bytes: %w", err) + } + return buf.Bytes(), nil +} + +func FromBytes(b []byte) (telegraf.Metric, error) { + buf := bytes.NewBuffer(b) + decoder := gob.NewDecoder(buf) + + var sm *serializedMetric + if err := decoder.Decode(&sm); err != nil { + return nil, fmt.Errorf("failed to decode metric from bytes: %w", err) + } + + m := sm.M + if sm.TID != 0 { + mu.Lock() + td := trackingStore[sm.TID] + if td == nil { + mu.Unlock() + return nil, ErrSkipTracking + } + rc := td.RefCount() + if rc <= 1 { + // only 1 metric left referencing this tracking ID, we can remove here since no subsequent metrics + // read can use this ID. If another metric in a metric group with this ID gets added later, it will + // simply be added back into the tracking store again. + trackingStore[sm.TID] = nil + } + mu.Unlock() + + m = rebuildTrackingMetric(m, td) + } + return m, nil +} diff --git a/metric/init.go b/metric/init.go new file mode 100644 index 0000000000000..85c901ce16b39 --- /dev/null +++ b/metric/init.go @@ -0,0 +1,7 @@ +package metric + +import "encoding/gob" + +func Init() { + gob.RegisterName("metric.metric", &metric{}) +} diff --git a/metric/tracking.go b/metric/tracking.go index 50f11c74d6dae..954782ddfcca0 100644 --- a/metric/tracking.go +++ b/metric/tracking.go @@ -33,35 +33,40 @@ func newTrackingID() telegraf.TrackingID { } type trackingData struct { - id telegraf.TrackingID - rc int32 - acceptCount int32 - rejectCount int32 + //nolint:revive // method is already named ID + Id telegraf.TrackingID + Rc int32 + AcceptCount int32 + RejectCount int32 notifyFunc NotifyFunc } func (d *trackingData) incr() { - atomic.AddInt32(&d.rc, 1) + atomic.AddInt32(&d.Rc, 1) +} + +func (d *trackingData) RefCount() int32 { + return d.Rc } func (d *trackingData) decr() int32 { - return atomic.AddInt32(&d.rc, -1) + return atomic.AddInt32(&d.Rc, -1) } func (d *trackingData) accept() { - atomic.AddInt32(&d.acceptCount, 1) + atomic.AddInt32(&d.AcceptCount, 1) } func (d *trackingData) reject() { - atomic.AddInt32(&d.rejectCount, 1) + atomic.AddInt32(&d.RejectCount, 1) } func (d *trackingData) notify() { d.notifyFunc( &deliveryInfo{ - id: d.id, - accepted: int(d.acceptCount), - rejected: int(d.rejectCount), + id: d.Id, + accepted: int(d.AcceptCount), + rejected: int(d.RejectCount), }, ) } @@ -75,10 +80,10 @@ func newTrackingMetric(metric telegraf.Metric, fn NotifyFunc) (telegraf.Metric, m := &trackingMetric{ Metric: metric, d: &trackingData{ - id: newTrackingID(), - rc: 1, - acceptCount: 0, - rejectCount: 0, + Id: newTrackingID(), + Rc: 1, + AcceptCount: 0, + RejectCount: 0, notifyFunc: fn, }, } @@ -86,15 +91,22 @@ func newTrackingMetric(metric telegraf.Metric, fn NotifyFunc) (telegraf.Metric, if finalizer != nil { runtime.SetFinalizer(m.d, finalizer) } - return m, m.d.id + return m, m.d.Id +} + +func rebuildTrackingMetric(metric telegraf.Metric, td telegraf.TrackingData) telegraf.Metric { + return &trackingMetric{ + Metric: metric, + d: td.(*trackingData), + } } func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf.Metric, telegraf.TrackingID) { d := &trackingData{ - id: newTrackingID(), - rc: 0, - acceptCount: 0, - rejectCount: 0, + Id: newTrackingID(), + Rc: 0, + AcceptCount: 0, + RejectCount: 0, notifyFunc: fn, } @@ -114,7 +126,7 @@ func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf. d.notify() } - return group, d.id + return group, d.Id } func (m *trackingMetric) Copy() telegraf.Metric { @@ -152,7 +164,11 @@ func (m *trackingMetric) decr() { // Unwrap allows to access the underlying metric directly e.g. for go-templates func (m *trackingMetric) TrackingID() telegraf.TrackingID { - return m.d.id + return m.d.Id +} + +func (m *trackingMetric) TrackingData() telegraf.TrackingData { + return m.d } // Unwrap allows to access the underlying metric directly e.g. for go-templates @@ -173,3 +189,7 @@ func (r *deliveryInfo) ID() telegraf.TrackingID { func (r *deliveryInfo) Delivered() bool { return r.rejected == 0 } + +func (d *trackingData) ID() telegraf.TrackingID { + return d.Id +} diff --git a/models/buffer.go b/models/buffer.go index 92ea0217845b4..dc6363cc0747f 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -2,14 +2,18 @@ package models import ( "fmt" + "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/selfstat" ) var ( AgentMetricsWritten = selfstat.Register("agent", "metrics_written", map[string]string{}) AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{}) + + registerGob = sync.OnceFunc(func() { metric.Init() }) ) type Buffer interface { @@ -45,12 +49,16 @@ type BufferStats struct { } // NewBuffer returns a new empty Buffer with the given capacity. -func NewBuffer(name string, alias string, capacity int, strategy string, _ string) (Buffer, error) { +func NewBuffer(name string, alias string, capacity int, strategy string, path string) (Buffer, error) { + registerGob() + bs := NewBufferStats(name, alias, capacity) switch strategy { case "", "memory": return NewMemoryBuffer(capacity, bs) + case "disk": + return NewDiskBuffer(name, path, bs) } return nil, fmt.Errorf("invalid buffer strategy %q", strategy) } @@ -97,14 +105,14 @@ func (b *BufferStats) metricAdded() { b.MetricsAdded.Incr(1) } -func (b *BufferStats) metricWritten(metric telegraf.Metric) { +func (b *BufferStats) metricWritten(m telegraf.Metric) { AgentMetricsWritten.Incr(1) b.MetricsWritten.Incr(1) - metric.Accept() + m.Accept() } -func (b *BufferStats) metricDropped(metric telegraf.Metric) { +func (b *BufferStats) metricDropped(m telegraf.Metric) { AgentMetricsDropped.Incr(1) b.MetricsDropped.Incr(1) - metric.Reject() + m.Reject() } diff --git a/models/buffer_disk.go b/models/buffer_disk.go new file mode 100644 index 0000000000000..5d0e687f18195 --- /dev/null +++ b/models/buffer_disk.go @@ -0,0 +1,216 @@ +package models + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/tidwall/wal" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +type DiskBuffer struct { + BufferStats + sync.Mutex + + file *wal.Log + path string + + batchFirst uint64 // Index of the first metric in the batch + batchSize uint64 // Number of metrics currently in the batch + + // Ending point of metrics read from disk on telegraf launch. + // Used to know whether to discard tracking metrics. + originalEnd uint64 +} + +func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, error) { + filePath := filepath.Join(path, name) + walFile, err := wal.Open(filePath, nil) + if err != nil { + return nil, fmt.Errorf("failed to open wal file: %w", err) + } + buf := &DiskBuffer{ + BufferStats: stats, + file: walFile, + path: filePath, + } + if buf.length() > 0 { + buf.originalEnd = buf.writeIndex() + } + return buf, nil +} + +func (b *DiskBuffer) Len() int { + b.Lock() + defer b.Unlock() + return b.length() +} + +func (b *DiskBuffer) length() int { + // Special case for when the read index is zero, it must be empty (otherwise it would be >= 1) + if b.readIndex() == 0 { + return 0 + } + return int(b.writeIndex() - b.readIndex()) +} + +// readIndex is the first index to start reading metrics from, or the head of the buffer +func (b *DiskBuffer) readIndex() uint64 { + index, err := b.file.FirstIndex() + if err != nil { + panic(err) // can only occur with a corrupt wal file + } + return index +} + +// writeIndex is the first index to start writing metrics to, or the tail of the buffer +func (b *DiskBuffer) writeIndex() uint64 { + index, err := b.file.LastIndex() + if err != nil { + panic(err) // can only occur with a corrupt wal file + } + return index + 1 +} + +func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int { + b.Lock() + defer b.Unlock() + + dropped := 0 + for _, m := range metrics { + if !b.addSingleMetric(m) { + dropped++ + } + } + b.BufferSize.Set(int64(b.length())) + return dropped +} + +func (b *DiskBuffer) addSingleMetric(m telegraf.Metric) bool { + data, err := metric.ToBytes(m) + if err != nil { + panic(err) + } + err = b.file.Write(b.writeIndex(), data) + if err == nil { + b.metricAdded() + return true + } + return false +} + +func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric { + b.Lock() + defer b.Unlock() + + if b.length() == 0 { + // no metrics in the wal file, so return an empty array + return []telegraf.Metric{} + } + b.batchFirst = b.readIndex() + var metrics []telegraf.Metric + + b.batchSize = 0 + readIndex := b.batchFirst + endIndex := b.writeIndex() + for batchSize > 0 && readIndex < endIndex { + data, err := b.file.Read(readIndex) + if err != nil { + panic(err) + } + readIndex++ + + m, err := metric.FromBytes(data) + + // Validate that a tracking metric is from this instance of telegraf and skip ones from older instances. + // A tracking metric can be skipped here because metric.Accept() is only called once data is successfully + // written to an output, so any tracking metrics from older instances can be dropped and reacquired to + // have an accurate tracking information. + // There are two primary cases here: + // - ErrSkipTracking: means that the tracking information was unable to be found for a tracking ID. + // - Outside of range: means that the metric was guaranteed to be left over from the previous instance + // as it was here when we opened the wal file in this instance. + if errors.Is(err, metric.ErrSkipTracking) { + // could not look up tracking information for metric, skip + continue + } + if err != nil { + // non-recoverable error in deserialization, abort + panic(err) + } + if _, ok := m.(telegraf.TrackingMetric); ok && readIndex < b.originalEnd { + // tracking metric left over from previous instance, skip + continue + } + + metrics = append(metrics, m) + b.batchSize++ + batchSize-- + } + return metrics +} + +func (b *DiskBuffer) Accept(batch []telegraf.Metric) { + b.Lock() + defer b.Unlock() + + if b.batchSize == 0 || len(batch) == 0 { + // nothing to accept + return + } + for _, m := range batch { + b.metricWritten(m) + } + if b.length() == len(batch) { + b.resetWalFile() + } else { + err := b.file.TruncateFront(b.batchFirst + uint64(len(batch))) + if err != nil { + panic(err) + } + } + + // check if the original end index is still valid, clear if not + if b.originalEnd < b.readIndex() { + b.originalEnd = 0 + } + + b.resetBatch() + b.BufferSize.Set(int64(b.length())) +} + +func (b *DiskBuffer) Reject(_ []telegraf.Metric) { + // very little to do here as the disk buffer retains metrics in + // the wal file until a call to accept + b.Lock() + defer b.Unlock() + b.resetBatch() +} + +func (b *DiskBuffer) Stats() BufferStats { + return b.BufferStats +} + +func (b *DiskBuffer) resetBatch() { + b.batchFirst = 0 + b.batchSize = 0 +} + +// This is very messy and not ideal, but serves as the only way I can find currently +// to actually clear the walfile completely if needed, since Truncate() calls require +// that at least one entry remains in them otherwise they return an error. +// Related issue: https://github.com/tidwall/wal/issues/20 +func (b *DiskBuffer) resetWalFile() { + b.file.Close() + os.Remove(b.path) + walFile, err := wal.Open(b.path, nil) + if err != nil { + panic(err) + } + b.file = walFile +} diff --git a/models/buffer_disk_test.go b/models/buffer_disk_test.go new file mode 100644 index 0000000000000..d650471ad249d --- /dev/null +++ b/models/buffer_disk_test.go @@ -0,0 +1,110 @@ +package models + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tidwall/wal" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" +) + +func newTestDiskBuffer(t testing.TB) Buffer { + path, err := os.MkdirTemp("", "*-buffer-test") + require.NoError(t, err) + return newTestDiskBufferWithPath(t, "test", path) +} + +func newTestDiskBufferWithPath(t testing.TB, name string, path string) Buffer { + t.Helper() + buf, err := NewBuffer(name, "", 0, "disk", path) + require.NoError(t, err) + buf.Stats().MetricsAdded.Set(0) + buf.Stats().MetricsWritten.Set(0) + buf.Stats().MetricsDropped.Set(0) + return buf +} + +func TestBuffer_RetainsTrackingInformation(t *testing.T) { + var delivered int + mm, _ := metric.WithTracking(Metric(), func(_ telegraf.DeliveryInfo) { + delivered++ + }) + b := newTestDiskBuffer(t) + b.Add(mm) + batch := b.Batch(1) + b.Accept(batch) + require.Equal(t, 1, delivered) +} + +func TestBuffer_TrackingDroppedFromOldWal(t *testing.T) { + path, err := os.MkdirTemp("", "*-buffer-test") + require.NoError(t, err) + walfile, err := wal.Open(path, nil) + require.NoError(t, err) + + tm, _ := metric.WithTracking(Metric(), func(_ telegraf.DeliveryInfo) {}) + + metrics := []telegraf.Metric{ + // Basic metric with 1 field, 0 timestamp + Metric(), + // Basic metric with 1 field, different timestamp + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 20.0, + }, + time.Now(), + ), + // Metric with a field + metric.New( + "cpu", + map[string]string{ + "x": "y", + }, + map[string]interface{}{ + "value": 18.0, + }, + time.Now(), + ), + // Tracking metric + tm, + // Metric with lots of tag types + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value_f64": 20.0, + "value_uint64": uint64(10), + "value_int16": int16(5), + "value_string": "foo", + "value_boolean": true, + "value_byte_array": []byte{1, 2, 3, 4, 5}, + }, + time.Now(), + ), + } + + // call manually so that we can properly use metric.ToBytes() without having initialized a buffer + registerGob() + + for i, m := range metrics { + data, err := metric.ToBytes(m) + require.NoError(t, err) + require.NoError(t, walfile.Write(uint64(i+1), data)) + } + + b := newTestDiskBufferWithPath(t, filepath.Base(path), filepath.Dir(path)) + batch := b.Batch(4) + // expected skips the tracking metric + expected := []telegraf.Metric{ + metrics[0], metrics[1], metrics[2], metrics[4], + } + testutil.RequireMetricsEqual(t, expected, batch) +} diff --git a/models/buffer_mem_test.go b/models/buffer_mem_test.go index 12803184f4e94..eec7c8b39c01f 100644 --- a/models/buffer_mem_test.go +++ b/models/buffer_mem_test.go @@ -16,6 +16,21 @@ func newTestMemoryBuffer(t testing.TB, capacity int) Buffer { return buf } +func TestBuffer_AcceptCallsMetricAccept(t *testing.T) { + var accept int + mm := &MockMetric{ + Metric: Metric(), + AcceptF: func() { + accept++ + }, + } + b := newTestMemoryBuffer(t, 5) + b.Add(mm, mm, mm) + batch := b.Batch(2) + b.Accept(batch) + require.Equal(t, 2, accept) +} + func BenchmarkAddMetrics(b *testing.B) { buf := newTestMemoryBuffer(b, 10000) m := Metric() diff --git a/models/buffer_suite_test.go b/models/buffer_suite_test.go index 19df313558a65..a984df41e1614 100644 --- a/models/buffer_suite_test.go +++ b/models/buffer_suite_test.go @@ -43,6 +43,11 @@ func (s *BufferSuiteTest) SetupTest() { switch s.bufferType { case "", "memory": s.hasMaxCapacity = true + case "disk": + path, err := os.MkdirTemp("", "*-buffer-test") + s.Require().NoError(err) + s.bufferPath = path + s.hasMaxCapacity = false } } @@ -57,6 +62,10 @@ func TestMemoryBufferSuite(t *testing.T) { suite.Run(t, &BufferSuiteTest{bufferType: "memory"}) } +func TestDiskBufferSuite(t *testing.T) { + suite.Run(t, &BufferSuiteTest{bufferType: "disk"}) +} + func Metric() telegraf.Metric { return MetricTime(0) } @@ -671,21 +680,6 @@ func (s *BufferSuiteTest) TestBuffer_BatchRejectAcceptNoop() { s.Equal(5, b.Len()) } -func (s *BufferSuiteTest) TestBuffer_AcceptCallsMetricAccept() { - var accept int - mm := &MockMetric{ - Metric: Metric(), - AcceptF: func() { - accept++ - }, - } - b := s.newTestBuffer(5) - b.Add(mm, mm, mm) - batch := b.Batch(2) - b.Accept(batch) - s.Equal(2, accept) -} - func (s *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNoBatch() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity")