Skip to content

Commit

Permalink
correct the ring buffer implementation to utilize bitwise benefits
Browse files Browse the repository at this point in the history
additionally remove variablility of the reservoir size as we need to ensure this number is a power of two
  • Loading branch information
maciuszek committed Jan 15, 2025
1 parent 74a26a1 commit 6d2687c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 49 deletions.
14 changes: 6 additions & 8 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const (
DefaultFlushIntervalS = 5
// DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false.
DefaultLoggingSinkDisabled = false
DefaultTimerReservoirSize = 0
DefaultUseReservoirTimer = false // DefaultUseReservoirTimer defines if reservoir timers should be used by default, default is false.
DefaultTimerReservoirSize = 128 // DefaultTimerReservoirSize is the max capacity of the reservoir for reservoir timers. needs to be rounded to a power of two e.g. 1 << bits.Len(uint(100)) = 128
)

// The Settings type is used to configure gostats. gostats uses environment
Expand All @@ -39,7 +40,8 @@ type Settings struct {
// Disable the LoggingSink when USE_STATSD is false and use the NullSink instead.
// This will cause all stats to be silently dropped.
LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"`
TimerReservoirSize int `envconfig:"GOSTATS_TIMER_RESERVOIR_SIZE" default:"0"`
// Enable all timers to act as reservoir timers with sampling
UseReservoirTimer bool `envconfig:"GOSTATS_USE_RESERVOIR_TIMER" default:"false"`
}

// An envError is an error that occurred parsing an environment variable
Expand Down Expand Up @@ -103,7 +105,7 @@ func GetSettings() Settings {
if err != nil {
panic(err)
}
timerReservoirSize, err := envInt("GOSTATS_TIMER_RESERVOIR_SIZE", DefaultTimerReservoirSize)
useReservoirTimer, err := envBool("GOSTATS_USE_RESERVOIR_TIMER", DefaultUseReservoirTimer)
if err != nil {
panic(err)
}
Expand All @@ -114,15 +116,11 @@ func GetSettings() Settings {
StatsdPort: statsdPort,
FlushIntervalS: flushIntervalS,
LoggingSinkDisabled: loggingSinkDisabled,
TimerReservoirSize: timerReservoirSize,
UseReservoirTimer: useReservoirTimer,
}
}

// FlushInterval returns the flush interval duration.
func (s *Settings) FlushInterval() time.Duration {
return time.Duration(s.FlushIntervalS) * time.Second
}

func (s *Settings) isTimerReservoirEnabled() bool {
return s.TimerReservoirSize > 0
}
26 changes: 7 additions & 19 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package stats

import (
"context"
"math/bits"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -354,11 +353,10 @@ type reservoirTimer struct {
mu sync.Mutex
base time.Duration
name string
ringSize int
ringSize int // just used so that we don't have to re-evaluate capacity of values
ringMask int
values []float64
count int
overflow int
}

func (t *reservoirTimer) time(dur time.Duration) {
Expand All @@ -373,14 +371,7 @@ func (t *reservoirTimer) AddValue(value float64) {
t.mu.Lock()
defer t.mu.Unlock()

t.values[t.overflow&t.ringMask] = value
t.overflow++

// todo: can i optimize this with xor?
if t.overflow == t.ringSize {
t.overflow = 0
}

t.values[t.count&t.ringMask] = value
t.count++
}

Expand Down Expand Up @@ -413,15 +404,14 @@ func (t *reservoirTimer) SampleRate() float64 {
if t.count <= t.ringSize {
return 1.0
}
return float64(t.ringSize) / float64(t.count) // todo: is it worth it to use t.ringSize instead of computing len of values worth it?
return float64(t.ringSize) / float64(t.count)
}

func (t *reservoirTimer) Reset() {
t.mu.Lock()
defer t.mu.Unlock()

t.count = 0 // this will imply a 0.0 sample rate until it's increased
t.overflow = 0
}

type timespan struct {
Expand Down Expand Up @@ -618,15 +608,13 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
}

var t timer
if s.conf.isTimerReservoirEnabled() {
capacity := s.conf.TimerReservoirSize
capacityRoundedToTheNextPowerOfTwo := 1 << bits.Len(uint(capacity))
if s.conf.UseReservoirTimer {
t = &reservoirTimer{
name: serializedName,
base: base,
ringSize: capacity,
ringMask: capacityRoundedToTheNextPowerOfTwo - 1,
values: make([]float64, capacity),
ringSize: DefaultTimerReservoirSize,
ringMask: DefaultTimerReservoirSize - 1,
values: make([]float64, DefaultTimerReservoirSize),
}
} else {
t = &standardTimer{
Expand Down
45 changes: 23 additions & 22 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func TestMilliTimer(t *testing.T) {
}

func TestTimerReservoir_Disabled(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "0")
err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "false")
if err != nil {
t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err)
}

expectedStatCount := 1000
Expand Down Expand Up @@ -165,21 +165,22 @@ func TestTimerReservoir_Disabled(t *testing.T) {
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER")
}

func TestTimerReservoir_Overflow(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true")
if err != nil {
t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err)
}

expectedStatCount := 100
expectedStatCount := 128 // reservoir size

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
// this should equate to a 0.1 sample rate; 0.1 * 1280 = 128
for i := 0; i < 1280; i++ {
store.NewTimer("test").AddValue(float64(i % 10))
}

Expand All @@ -206,21 +207,21 @@ func TestTimerReservoir_Overflow(t *testing.T) {
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER")
}

func TestTimerReservoir_Full(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true")
if err != nil {
t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err)
}

expectedStatCount := 100
expectedStatCount := 128 // reservoir size

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 100; i++ {
for i := 0; i < 128; i++ {
store.NewTimer("test").AddValue(float64(i % 10))
}

Expand All @@ -247,13 +248,13 @@ func TestTimerReservoir_Full(t *testing.T) {
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER")
}

func TestTimerReservoir_NotFull(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true")
if err != nil {
t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err)
}

expectedStatCount := 50
Expand Down Expand Up @@ -288,13 +289,13 @@ func TestTimerReservoir_NotFull(t *testing.T) {
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER")
}

func TestTimerReservoir_IndependantReservoirs(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true")
if err != nil {
t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err)
}

expectedStatCount := 1000
Expand Down Expand Up @@ -329,13 +330,13 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) {
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER")
}

func TestTimerReservoir_ReusedStore(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true")
if err != nil {
t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err)
}

expectedStatCount := 100
Expand Down Expand Up @@ -403,7 +404,7 @@ func TestTimerReservoir_ReusedStore(t *testing.T) {
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER")
}

// Ensure 0 counters are not flushed
Expand Down

0 comments on commit 6d2687c

Please sign in to comment.