Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reservoir timers #171

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
72bc838
prototype a simple reservoir for timers
maciuszek Jan 8, 2025
f67aca4
clarify comment
maciuszek Jan 8, 2025
5cc34d9
Move s.timerCount inside timer.Range
bshramin Jan 8, 2025
f7fc55e
fix test messages
maciuszek Jan 8, 2025
4a0662a
fix spelling
maciuszek Jan 8, 2025
f35471d
Merge remote-tracking branch 'origin/master' into mattkuzminski/add-t…
maciuszek Jan 10, 2025
57ef42b
re-design with timer reservoirs correctly independent per mertic
maciuszek Jan 10, 2025
4610f55
add some more todos
maciuszek Jan 10, 2025
cc908b5
clean up redundant code
maciuszek Jan 10, 2025
b1a2def
some more clean up
maciuszek Jan 11, 2025
8eb942d
address todos
maciuszek Jan 13, 2025
5dd8757
fix comment
maciuszek Jan 13, 2025
0d3fb45
ensure memory and flush management for timers
maciuszek Jan 13, 2025
ea5ae6a
optimize reservoirTimer by utilizing a ring buffer
maciuszek Jan 14, 2025
e81d603
correct how we flush reusable timer entries
maciuszek Jan 15, 2025
74a26a1
add test for reused timer map after flushing
maciuszek Jan 15, 2025
6d2687c
correct the ring buffer implementation to utilize bitwise benefits
maciuszek Jan 15, 2025
d067744
improve reservoirTimer property access
maciuszek Jan 15, 2025
7e5a451
make reservoir tests more dynamic
maciuszek Jan 16, 2025
a54db1a
improve comments
maciuszek Jan 16, 2025
18c0e57
optimize reservoir timer flush
maciuszek Jan 16, 2025
bf0ef63
block never flush edge cases when stores are constructed outside of N…
maciuszek Jan 16, 2025
8dad5ed
fix typo in comment
maciuszek Jan 16, 2025
9762152
add test for reservoir automatic flushing
maciuszek Jan 16, 2025
2641924
add test for concurrent reservoir writes and flushing
maciuszek Jan 16, 2025
858a3fd
fix typo in comment
maciuszek Jan 16, 2025
4e9611d
protect writes while flushing
maciuszek Jan 16, 2025
70cc61c
dont export controls that can result in a deadlock or datarace
maciuszek Jan 16, 2025
b352a4f
add critical optimization todo
maciuszek Jan 16, 2025
ef8cf0b
simplify reservoir processing
maciuszek Jan 17, 2025
8cceead
unexport RingSize and document immutability
maciuszek Jan 17, 2025
4cae9ac
print to stdout for testing
maciuszek Jan 17, 2025
7fe2893
improve test logging
maciuszek Jan 17, 2025
c2738fa
temporarily make logging a bit better at the sacrifice of performance
maciuszek Jan 20, 2025
236e2cc
remove test logging
maciuszek Jan 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
DefaultFlushIntervalS = 5
// DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false.
DefaultLoggingSinkDisabled = false
DefaultTimerReservoirSize = 0
)

// The Settings type is used to configure gostats. gostats uses environment
Expand All @@ -38,6 +39,7 @@ type Settings struct {
// Disable the LoggingSink when USE_STATSD is false and use the NullSink instead.
// This will cause all stats to be silently dropped.
LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"`
TimerReservoirSize int `envconfig:"GOSTATS_TIMER_RESERVOIR_SIZE" default:"0"`
}

// An envError is an error that occurred parsing an environment variable
Expand Down Expand Up @@ -101,17 +103,26 @@ func GetSettings() Settings {
if err != nil {
panic(err)
}
timerReservoirSize, err := envInt("GOSTATS_TIMER_RESERVOIR_SIZE", DefaultTimerReservoirSize)
if err != nil {
panic(err)
}
return Settings{
UseStatsd: useStatsd,
StatsdHost: envOr("STATSD_HOST", DefaultStatsdHost),
StatsdProtocol: envOr("STATSD_PROTOCOL", DefaultStatsdProtocol),
StatsdPort: statsdPort,
FlushIntervalS: flushIntervalS,
LoggingSinkDisabled: loggingSinkDisabled,
TimerReservoirSize: timerReservoirSize,
}
}

// FlushInterval returns the flush interval duration.
func (s *Settings) FlushInterval() time.Duration {
return time.Duration(s.FlushIntervalS) * time.Second
}

func (s *Settings) isTimerReservoirEnabled() bool {
return s.TimerReservoirSize > 0
}
97 changes: 87 additions & 10 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stats

import (
"context"
"math"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -298,30 +299,69 @@ func (c *gauge) Value() uint64 {
return atomic.LoadUint64(&c.value)
}

type timer struct {
type timer interface {
time(time.Duration)
AddDuration(time.Duration)
AddValue(float64)
AllocateSpan() Timespan
Value() float64
}

type standardTimer struct {
base time.Duration
name string
sink Sink
}

func (t *timer) time(dur time.Duration) {
func (t *standardTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

func (t *timer) AddDuration(dur time.Duration) {
func (t *standardTimer) AddDuration(dur time.Duration) {
t.AddValue(float64(dur / t.base))
}

func (t *timer) AddValue(value float64) {
func (t *standardTimer) AddValue(value float64) {
t.sink.FlushTimer(t.name, value)
}

func (t *timer) AllocateSpan() Timespan {
func (t *standardTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

func (t *standardTimer) Value() float64 {
return 0.0 // float zero value
}

type reservoirTimer struct {
base time.Duration
name string
value uint64
}

func (t *reservoirTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

func (t *reservoirTimer) AddDuration(dur time.Duration) {
t.AddValue(float64(dur / t.base))
}

func (t *reservoirTimer) AddValue(value float64) {
// todo does this need to be atomtic? ideally for the the use case it won't/shouldn't be changed like a counter/gauge would be
atomic.StoreUint64(&t.value, math.Float64bits(value))
}

func (t *reservoirTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

func (t *reservoirTimer) Value() float64 {
return math.Float64frombits(atomic.LoadUint64(&t.value))
}

type timespan struct {
timer *timer
timer timer
start time.Time
}

Expand All @@ -340,6 +380,8 @@ type statStore struct {
gauges sync.Map
timers sync.Map

timerCount int

mu sync.RWMutex
statGenerators []StatGenerator

Expand Down Expand Up @@ -393,6 +435,20 @@ func (s *statStore) Flush() {
return true
})

settings := GetSettings() // todo: move this to some shared memory
if settings.isTimerReservoirEnabled() {
s.timers.Range(func(key, v interface{}) bool {
// todo: maybe change this to not even add to the reservoir
// do not flush timers that are zero value
if value := v.(timer).Value(); value != 0.0 {
s.sink.FlushTimer(key.(string), v.(timer).Value())
}
s.timers.Delete(key)
s.timerCount--
return true
})
}

flushableSink, ok := s.sink.(FlushableSink)
if ok {
flushableSink.Flush()
Expand Down Expand Up @@ -490,14 +546,35 @@ func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gau
return s.newGaugeWithTagSet(name, tagspkg.TagSet(nil).MergePerInstanceTags(tags))
}

func (s *statStore) newTimer(serializedName string, base time.Duration) *timer {
func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
if v, ok := s.timers.Load(serializedName); ok {
return v.(*timer)
return v.(timer)
}

var t timer
settings := GetSettings() // todo: move this to some shared memory
if settings.isTimerReservoirEnabled() {
t = &reservoirTimer{name: serializedName, base: base}

// todo: > shouldn't be necessary
if s.timerCount >= settings.TimerReservoirSize {
// this will delete 1 random timer in the map
s.timers.Range(func(key, _ interface{}) bool {
s.timers.Delete(key)
return false
})
s.timerCount--
}
} else {
t = &standardTimer{name: serializedName, sink: s.sink, base: base}
}
t := &timer{name: serializedName, sink: s.sink, base: base}

if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded {
return v.(*timer)
return v.(timer)
}

s.timerCount++

return t
}

Expand Down
112 changes: 105 additions & 7 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"fmt"
"math/rand"
"os"
"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -78,9 +79,9 @@ func TestValidateTags(t *testing.T) {
store.Flush()

expected := "test:1|c"
counter := sink.record
if !strings.Contains(counter, expected) {
t.Error("wanted counter value of test:1|c, got", counter)
output := sink.record
if !strings.Contains(output, expected) && !strings.Contains(output, "reserved_tag") {
t.Errorf("Expected without reserved tags: '%s' Got: '%s'", expected, output)
}

// A reserved tag should trigger adding the reserved_tag counter
Expand All @@ -89,10 +90,11 @@ func TestValidateTags(t *testing.T) {
store.NewCounterWithTags("test", map[string]string{"host": "i"}).Inc()
store.Flush()

expected = "reserved_tag:1|c\ntest.__host=i:1|c"
counter = sink.record
if !strings.Contains(counter, expected) {
t.Error("wanted counter value of test.___f=i:1|c, got", counter)
expected = "test.__host=i:1|c"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this test change is out of scope of this work but it was previously volatile with the order of reserved_tag vs test.__host not being deterministic

expectedReservedTag := "reserved_tag:1|c"
output = sink.record
if !strings.Contains(output, expected) && !strings.Contains(output, expectedReservedTag) {
t.Errorf("Expected: '%s' and '%s', In: '%s'", expected, expectedReservedTag, output)
}
}

Expand Down Expand Up @@ -126,6 +128,102 @@ func TestMilliTimer(t *testing.T) {
}
}

func TestTimerResevoir_Disabled(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "0")
if err != nil {
t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err)
}

expectedStatCount := 1000

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10))
}

if ts.String() != "" {
t.Errorf("Stats were written despite forced batching")
}

store.Flush()

time.Sleep(1001 * time.Millisecond)

statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount)
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
}

func TestTimerReservoir(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
if err != nil {
t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err)
}

expectedStatCount := 100

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i%10 + 1)) // don't create timers with 0 values to make the count deterministic
}

if ts.String() != "" {
t.Errorf("Stats were written despite forced batching")
}

store.Flush()

time.Sleep(1001 * time.Millisecond)

statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount)
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
}

func TestTimerReservoir_FilteredZeros(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
if err != nil {
t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err)
}

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10))
}

if ts.String() != "" {
t.Errorf("Stats were written despite forced batching")
}

store.Flush()

time.Sleep(1001 * time.Millisecond)

stats := strings.Split(ts.String(), "\n")
stats = stats[:len(stats)-1] // remove the extra new line character at the end of the buffer
for _, stat := range stats {
value := strings.Split(strings.Split(stat, ":")[1], ("|ms"))[0] // strip value and remove suffix and get raw number
if value == "0" {
t.Errorf("Got a zero value stat: %s", stat)
}

}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
}

// Ensure 0 counters are not flushed
func TestZeroCounters(t *testing.T) {
sink := &testStatSink{}
Expand Down
Loading