Skip to content

Commit

Permalink
add test for reservoir automatic flushing
Browse files Browse the repository at this point in the history
fix data race in reservoirTimer
  • Loading branch information
maciuszek committed Jan 16, 2025
1 parent 8dad5ed commit 9762152
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 5 deletions.
6 changes: 3 additions & 3 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,9 @@ func (t *reservoirTimer) AddValue(value float64) {
t.mu.Lock()
defer t.mu.Unlock()

// direct access to t.count and t.ringMask is protected by the mutex
t.values[t.count&t.ringMask] = value
t.count++
// t.ringMask isn't ever changed and accessing it here is protected by the mutex
t.values[atomic.LoadUint64(&t.count)&t.ringMask] = value
atomic.AddUint64(&t.count, 1)
}

func (t *reservoirTimer) AllocateSpan() Timespan {
Expand Down
67 changes: 65 additions & 2 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func TestTimerReservoir_Overflow(t *testing.T) {
ts, sink := setupTestNetSink(t, "tcp", false)
store := newStatStore(sink, true, GetSettings())

// this should equate to a 0.1 sample rate; 0.1 * 1280 = 128
for i := 0; i < statsToSend; i++ {
store.NewTimer("test").AddValue(float64(i % 10))
}
Expand Down Expand Up @@ -414,7 +413,71 @@ func TestTimerReservoir_ReusedStore(t *testing.T) {
os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER")
}

// todo: add test coverage for NewDefaultStore and the automatic flush loop
func TestTimerReservoir_AutomaticFlush(t *testing.T) {
err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true")
if err != nil {
t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err)
}

flushIntervalS := 5
expectedStatCount := FixedTimerReservoirSize * 2

ts, sink := setupTestNetSink(t, "tcp", false)
store := newStatStore(sink, true, GetSettings())

ctx, cancel := context.WithCancel(context.Background())

wg := &sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
store.StartContext(ctx, time.NewTicker(time.Duration(flushIntervalS)*time.Second))
}()

// first reservoir timer
statsToSend := FixedTimerReservoirSize
for i := 0; i < statsToSend; i++ {
store.NewTimer("test1").AddValue(float64(i % 10))
}

// second reservoir timer
statsToSend = FixedTimerReservoirSize * 3
for i := 0; i < statsToSend; i++ {
store.NewTimer("test2").AddValue(float64(i % 10))
}

if ts.String() != "" {
t.Errorf("Stats were written pre flush invalidating the test")
}

time.Sleep(time.Duration(flushIntervalS+1) * time.Second) // increment a second to allow the flush to happen

stats := strings.Split(ts.String(), "\n")
statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount)
}

stats = stats[:statCount]
for _, stat := range stats {
name := strings.Split(stat, ":")[0]
value := strings.Split(stat, ":")[1]
sampleRate := strings.Split(value, ("|@"))[1]
if name == "test1" && sampleRate != "1.00" {
t.Errorf("The test1 stat was written without a 1.00 sample rate: %s", stat)
} else if name == "test2" && sampleRate != "0.33" {
t.Errorf("The test2 stat was written without a 0.33 sample rate: %s", stat)
} else if name != "test1" && name != "test2" {
t.Errorf("A unknown stat was written: %s", stat)
}
}

cancel()
wg.Wait()

os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER")
}

// Ensure 0 counters are not flushed
func TestZeroCounters(t *testing.T) {
Expand Down

0 comments on commit 9762152

Please sign in to comment.