Skip to content

Commit

Permalink
block never flush edge cases when stores are constructed outside of N…
Browse files Browse the repository at this point in the history
…ewDefaultStore and resevoir timers are enabled

implicitly optimize store construction
  • Loading branch information
maciuszek committed Jan 16, 2025
1 parent 18c0e57 commit bf0ef63
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 22 deletions.
6 changes: 3 additions & 3 deletions net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (s *netSink) FlushSampledTimer(name string, value, sampleRate float64) {
b.WriteString(timerSuffix)

b.WriteString(sampleSuffix)
b.writeFloat64WithPercision(sampleRate, 2) // todo: deteremine how many decimal places we need
b.writeFloat64WithPrecision(sampleRate, 2) // todo: deteremine how many decimal places we need
b.WriteString(metricSuffix)

s.writeBuffer(b)
Expand Down Expand Up @@ -447,9 +447,9 @@ func (b *buffer) WriteUnit64(val uint64) {
}

func (b *buffer) WriteFloat64(val float64) {
b.writeFloat64WithPercision(val, 6)
b.writeFloat64WithPrecision(val, 6)
}

func (b *buffer) writeFloat64WithPercision(val float64, precision int) {
func (b *buffer) writeFloat64WithPrecision(val float64, precision int) {
*b = strconv.AppendFloat(*b, val, 'f', precision, 64)
}
6 changes: 5 additions & 1 deletion settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ const (
DefaultUseReservoirTimer = false
// FixedTimerReservoirSize is the max capacity of the reservoir for reservoir timers.
// note: needs to be rounded to a power of two e.g. 1 << bits.Len(uint(100)) = 128
// todo: see if we can use not-strict number and just account for the offset
// todo: see if it's worth efficency trade off to reduce tech debt of this magic number and allowing any number.

Check failure on line 27 in settings.go

View workflow job for this annotation

GitHub Actions / lint

`efficency` is a misspelling of `efficiency` (misspell)
// we could determine the difference between the defined size and next power of two
// and use that to offset the counter when ANDing it against the mask,
// once the result is 0 we just increment offset by "original offset"
FixedTimerReservoirSize = 128
)

Expand Down Expand Up @@ -92,6 +95,7 @@ func envBool(key string, def bool) (bool, error) {
}

// GetSettings returns the Settings gostats will run with.
// todo: can we optimize this by storing the result for subsequent calls
func GetSettings() Settings {
useStatsd, err := envBool("USE_STATSD", DefaultUseStatsd)
if err != nil {
Expand Down
42 changes: 30 additions & 12 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,24 +215,33 @@ type StatGenerator interface {
// Note: the export argument is unused.
func NewStore(sink Sink, _ bool) Store {
return &statStore{
sink: sink,
conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and can be made more efficient by computing it once and storing for subsequent gets
sink: sink,
timerType: standard,
}
}

func newStatStore(sink Sink, export bool, conf Settings) *statStore {
store := NewStore(sink, export).(*statStore)
if conf.UseReservoirTimer {
store.timerType = reservoir
}
return store
}

// NewDefaultStore returns a Store with a TCP statsd sink, and a running flush timer.
// note: this is the only way to use reservoir timers as they rely on the store flush loop
func NewDefaultStore() Store {
var newStore Store
settings := GetSettings()
if !settings.UseStatsd {
if settings.LoggingSinkDisabled {
newStore = NewStore(NewNullSink(), false)
newStore = newStatStore(NewNullSink(), false, settings)
} else {
newStore = NewStore(NewLoggingSink(), false)
newStore = newStatStore(NewLoggingSink(), false, settings)
}
go newStore.Start(time.NewTicker(10 * time.Second))
} else {
newStore = NewStore(NewTCPStatsdSink(), false)
newStore = newStatStore(NewTCPStatsdSink(), false, settings)
go newStore.Start(time.NewTicker(time.Duration(settings.FlushIntervalS) * time.Second))
}
return newStore
Expand Down Expand Up @@ -301,6 +310,13 @@ func (c *gauge) Value() uint64 {
return atomic.LoadUint64(&c.value)
}

type timerType int

const (
standard timerType = iota
reservoir
)

type timer interface {
time(time.Duration)
AddDuration(time.Duration)
Expand Down Expand Up @@ -428,16 +444,15 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) {

type statStore struct {
// these maps may grow indefinitely however slots in this maps are reused as stats names are stable over the lifetime of the process
counters sync.Map
gauges sync.Map
timers sync.Map
counters sync.Map
gauges sync.Map
timers sync.Map
timerType timerType

mu sync.RWMutex
statGenerators []StatGenerator

sink Sink

conf Settings
}

var ReservedTagWords = map[string]bool{"asg": true, "az": true, "backend": true, "canary": true, "host": true, "period": true, "region": true, "shard": true, "window": true, "source": true, "project": true, "facet": true, "envoyservice": true}
Expand Down Expand Up @@ -605,15 +620,18 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
}

var t timer
if s.conf.UseReservoirTimer {
switch s.timerType {
case reservoir:
t = &reservoirTimer{
name: serializedName,
base: base,
ringSize: FixedTimerReservoirSize,
ringMask: FixedTimerReservoirSize - 1,
values: make([]float64, FixedTimerReservoirSize),
}
} else {
case standard: // this should allow backward compatible a backwards compatible fallback as standard is the zero value of s.timerType
fallthrough
default:
t = &standardTimer{
name: serializedName,
sink: s.sink,
Expand Down
14 changes: 8 additions & 6 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestTimerReservoir_Disabled(t *testing.T) {
expectedStatCount := statsToSend

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)
store := newStatStore(sink, true, GetSettings())

for i := 0; i < statsToSend; i++ {
store.NewTimer("test").AddValue(float64(i % 10))
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestTimerReservoir_Overflow(t *testing.T) {
expectedStatCount := FixedTimerReservoirSize

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)
store := newStatStore(sink, true, GetSettings())

// this should equate to a 0.1 sample rate; 0.1 * 1280 = 128
for i := 0; i < statsToSend; i++ {
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestTimerReservoir_Full(t *testing.T) {
expectedStatCount := statsToSend

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)
store := newStatStore(sink, true, GetSettings())

for i := 0; i < statsToSend; i++ {
store.NewTimer("test").AddValue(float64(i % 10))
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestTimerReservoir_NotFull(t *testing.T) {
expectedStatCount := statsToSend

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)
store := newStatStore(sink, true, GetSettings())

for i := 0; i < statsToSend; i++ {
store.NewTimer("test").AddValue(float64(i % 10))
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) {
expectedStatCount := statsToSend

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)
store := newStatStore(sink, true, GetSettings())

for i := 0; i < statsToSend; i++ {
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) // use different names so that we don't conflate the metrics into the same reservoir
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestTimerReservoir_ReusedStore(t *testing.T) {
expectedStatCount := statsToSend

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)
store := newStatStore(sink, true, GetSettings())

for i := 0; i < statsToSend; i++ {
store.NewTimer("test").AddValue(float64(i % 10))
Expand Down Expand Up @@ -414,6 +414,8 @@ func TestTimerReservoir_ReusedStore(t *testing.T) {
os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER")
}

// todo: add test coverage for NewDefaultStore and the automatic flush loop

// Ensure 0 counters are not flushed
func TestZeroCounters(t *testing.T) {
sink := &testStatSink{}
Expand Down

0 comments on commit bf0ef63

Please sign in to comment.