Skip to content

Commit

Permalink
add test for concurrent reservoir writes and flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
maciuszek committed Jan 16, 2025
1 parent 9762152 commit 2641924
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
2 changes: 1 addition & 1 deletion stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (t *reservoirTimer) AddValue(value float64) {
t.mu.Lock()
defer t.mu.Unlock()

// t.ringMask isn't ever changed and accessing it here is protected by the mutex
// t.ringMask isn't ever changed so the access to should fine
t.values[atomic.LoadUint64(&t.count)&t.ringMask] = value
atomic.AddUint64(&t.count, 1)
}
Expand Down
51 changes: 51 additions & 0 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,57 @@ func TestTimerReservoir_AutomaticFlush(t *testing.T) {
os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER")
}

func TestTimerReservoir_ConcurrentFlushingWhileWrites(t *testing.T) {
err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true")
if err != nil {
t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err)
}

flushIntervalMs := 5
expectedStatCount := FixedTimerReservoirSize * 2

ts, sink := setupTestNetSink(t, "tcp", false)
store := newStatStore(sink, true, GetSettings())

ctx, cancel := context.WithCancel(context.Background())

wg := &sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
store.StartContext(ctx, time.NewTicker(time.Duration(flushIntervalMs)*time.Millisecond))
}()

statsToSend := expectedStatCount
for i := 0; i < statsToSend; i++ {
store.NewTimer("test").AddValue(float64(i % 10))
time.Sleep(time.Duration(flushIntervalMs/5) * time.Millisecond)
}

time.Sleep(time.Duration(flushIntervalMs+1) * time.Millisecond) // guarentee we finish flushing

Check failure on line 510 in stats_test.go

View workflow job for this annotation

GitHub Actions / lint

`guarentee` is a misspelling of `guarantee` (misspell)

stats := strings.Split(ts.String(), "\n")
statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount)
}

stats = stats[:statCount]
for _, stat := range stats {
value := strings.Split(stat, ":")[1]
sampleRate := strings.Split(value, ("|@"))[1]
if sampleRate != "1.00" {
t.Errorf("The test1 stat was written without a 1.00 sample rate: %s", stat)
}
}

cancel()
wg.Wait()

os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER")
}

// Ensure 0 counters are not flushed
func TestZeroCounters(t *testing.T) {
sink := &testStatSink{}
Expand Down

0 comments on commit 2641924

Please sign in to comment.