Skip to content

Commit

Permalink
suite tests, error handling, work on new encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
DStrand1 committed May 13, 2024
1 parent 5c1c514 commit f074af5
Show file tree
Hide file tree
Showing 9 changed files with 281 additions and 267 deletions.
6 changes: 6 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ type Metric interface {
// Drop marks the metric as processed successfully without being written
// to any output.
Drop()

// ToBytes converts the metric a byte array using the gob encoder.
ToBytes() ([]byte, error)

// FromBytes populates a metrics data using a binary byte array.
FromBytes([]byte) error
}

// TemplateMetric is an interface to use in templates (e.g text/template)
Expand Down
20 changes: 20 additions & 0 deletions metric/metric.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metric

import (
"bytes"
"encoding/gob"
"fmt"
"hash/fnv"
"sort"
Expand Down Expand Up @@ -384,3 +386,21 @@ func convertField(v interface{}) interface{} {
}
return nil
}

func (m *metric) ToBytes() ([]byte, error) {
var buf bytes.Buffer
encoder := gob.NewEncoder(&buf)
if err := encoder.Encode(m); err != nil {
return nil, fmt.Errorf("failed to encode metric to bytes: %w", err)
}
return buf.Bytes(), nil
}

func (m *metric) FromBytes(b []byte) error {
buf := bytes.NewBuffer(b)
decoder := gob.NewDecoder(buf)
if err := decoder.Decode(&m); err != nil {
return fmt.Errorf("failed to decode metric from bytes: %w", err)
}
return nil
}
4 changes: 3 additions & 1 deletion models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type Buffer interface {
// Reject returns the batch, acquired from Batch(), to the buffer and marks it
// as unsent.
Reject([]telegraf.Metric)

Stats() BufferStats
}

// BufferStats holds common metrics used for buffer implementations.
Expand All @@ -42,7 +44,7 @@ type BufferStats struct {
}

// NewBuffer returns a new empty Buffer with the given capacity.
func NewBuffer(name string, alias string, capacity int, strategy string, path string) Buffer {
func NewBuffer(name string, alias string, capacity int, strategy string, path string) (Buffer, error) {
bm := NewBufferMetrics(name, alias, capacity)

switch strategy {
Expand Down
54 changes: 21 additions & 33 deletions models/buffer_disk.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
package models

import (
"bytes"
"encoding/gob"
"fmt"

"github.com/influxdata/telegraf"
"github.com/tidwall/wal"
)

type DiskBuffer struct {
BufferStats

walFile *wal.Log
}

func NewDiskBuffer(name string, capacity int, path string, stats BufferStats) *DiskBuffer {
func NewDiskBuffer(name string, capacity int, path string, stats BufferStats) (*DiskBuffer, error) {
// todo capacity
walFile, err := wal.Open(path+"/"+name, nil)
if err != nil {
return nil // todo error handling
return nil, fmt.Errorf("failed to open wal file: %w", err)
}
return &DiskBuffer{
BufferStats: stats,
walFile: walFile,
}
}, nil
}

func (b *DiskBuffer) Len() int {
Expand Down Expand Up @@ -62,9 +58,13 @@ func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int {
return b.addBatch(metrics)
}

func (b *DiskBuffer) addSingle(metric telegraf.Metric) bool {
err := b.walFile.Write(b.writeIndex(), b.metricToBytes(metric))
metric.Accept()
func (b *DiskBuffer) addSingle(m telegraf.Metric) bool {
data, err := m.ToBytes()
if err != nil {
panic(err)
}
err = b.walFile.Write(b.writeIndex(), data)
m.Accept()
if err == nil {
b.metricAdded()
return true
Expand All @@ -76,7 +76,10 @@ func (b *DiskBuffer) addBatch(metrics []telegraf.Metric) int {
written := 0
batch := new(wal.Batch)
for _, m := range metrics {
data := b.metricToBytes(m)
data, err := m.ToBytes()
if err != nil {
panic(err)
}
m.Accept() // accept here, since the metric object is no longer retained from here
batch.Write(b.writeIndex(), data)
b.metricAdded()
Expand All @@ -101,7 +104,11 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
if err != nil {
// todo error handle
}
metrics[index] = b.bytesToMetric(data)
var m telegraf.Metric
if err = m.FromBytes(data); err != nil {
panic(err)
}
metrics[index] = m
index++
}
return metrics
Expand All @@ -127,25 +134,6 @@ func (b *DiskBuffer) Reject(batch []telegraf.Metric) {
}
}

func (b *DiskBuffer) metricToBytes(metric telegraf.Metric) []byte {
var buf bytes.Buffer
encoder := gob.NewEncoder(&buf)
if err := encoder.Encode(metric); err != nil {
// todo error handle
fmt.Println("Error encoding:", err)
panic(1)
}
return buf.Bytes()
}

func (b *DiskBuffer) bytesToMetric(data []byte) telegraf.Metric {
buf := bytes.NewBuffer(data)
decoder := gob.NewDecoder(buf)
var m telegraf.Metric
if err := decoder.Decode(&m); err != nil {
// todo error handle
fmt.Println("Error decoding:", err)
panic(1)
}
return m
func (b *DiskBuffer) Stats() BufferStats {
return b.BufferStats
}
38 changes: 0 additions & 38 deletions models/buffer_disk_test.go

This file was deleted.

11 changes: 6 additions & 5 deletions models/buffer_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@ type MemoryBuffer struct {
batchSize int // number of metrics currently in the batch
}

func NewMemoryBuffer(capacity int, stats BufferStats) *MemoryBuffer {
func NewMemoryBuffer(capacity int, stats BufferStats) (*MemoryBuffer, error) {
return &MemoryBuffer{
BufferStats: stats,
buf: make([]telegraf.Metric, capacity),
first: 0,
last: 0,
size: 0,
cap: capacity,
}
}, nil
}

func (b *MemoryBuffer) Len() int {
Expand Down Expand Up @@ -151,6 +148,10 @@ func (b *MemoryBuffer) Reject(batch []telegraf.Metric) {
b.BufferSize.Set(int64(b.length()))
}

func (b *MemoryBuffer) Stats() BufferStats {
return b.BufferStats
}

// next returns the next index with wrapping.
func (b *MemoryBuffer) next(index int) int {
index++
Expand Down
Loading

0 comments on commit f074af5

Please sign in to comment.