Skip to content

Commit

Permalink
Review comments #1.
Browse files Browse the repository at this point in the history
Track accepts instead of requests.
  • Loading branch information
easwars committed Feb 5, 2020
1 parent da24de0 commit 7c81639
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 42 deletions.
35 changes: 12 additions & 23 deletions balancer/rls/internal/adaptive/adaptive.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ type Throttler struct {
ratioForAccepts float64
requestsPadding float64

// Number of total requests and throttled requests in the lookback period.
// Number of total accepts and throttles in the lookback period.
mu sync.Mutex
requests *lookback
throttled *lookback
accepts *lookback
throttles *lookback
}

// New initializes a new adaptive throttler with the default values.
Expand All @@ -88,8 +88,8 @@ func newWithArgs(duration time.Duration, bins int64, ratioForAccepts, requestsPa
return &Throttler{
ratioForAccepts: ratioForAccepts,
requestsPadding: requestsPadding,
requests: newLookback(bins, duration),
throttled: newLookback(bins, duration),
accepts: newLookback(bins, duration),
throttles: newLookback(bins, duration),
}
}

Expand All @@ -104,29 +104,17 @@ func (t *Throttler) ShouldThrottle() bool {
t.mu.Lock()
defer t.mu.Unlock()

requests, throttled := t.requests.sum(now), t.throttled.sum(now)
accepts := float64(requests - throttled)
reqs := float64(requests)
throttleProbability := (reqs - t.ratioForAccepts*accepts) / (reqs + t.requestsPadding)
accepts, throttles := float64(t.accepts.sum(now)), float64(t.throttles.sum(now))
requests := accepts + throttles
throttleProbability := (requests - t.ratioForAccepts*accepts) / (requests + t.requestsPadding)
if throttleProbability <= randomProbability {
return false
}

t.requests.add(now, 1)
t.throttled.add(now, 1)
t.throttles.add(now, 1)
return true
}

// stats returns a tuple with requests, throttled for the current time.
func (t *Throttler) stats() (int64, int64) {
now := timeNowFunc()

t.mu.Lock()
requests, throttled := t.requests.sum(now), t.throttled.sum(now)
t.mu.Unlock()
return requests, throttled
}

// RegisterBackendResponse registers a response received from the backend for a
// request allowed by ShouldThrottle. This should be called for every response
// received from the backend (i.e., once for each request for which
Expand All @@ -135,9 +123,10 @@ func (t *Throttler) RegisterBackendResponse(throttled bool) {
now := timeNowFunc()

t.mu.Lock()
t.requests.add(now, 1)
if throttled {
t.throttled.add(now, 1)
t.throttles.add(now, 1)
} else {
t.accepts.add(now, 1)
}
t.mu.Unlock()
}
51 changes: 32 additions & 19 deletions balancer/rls/internal/adaptive/adaptive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ import (
"time"
)

// stats returns a tuple with accepts, throttles for the current time.
func (th *Throttler) stats() (int64, int64) {
now := timeNowFunc()

th.mu.Lock()
a, t := th.accepts.sum(now), th.throttles.sum(now)
th.mu.Unlock()
return a, t
}

// Enums for responses.
const (
E = iota // No response
Expand Down Expand Up @@ -88,14 +98,9 @@ func TestRegisterBackendResponse(t *testing.T) {
if test.responses[i] != E {
th.RegisterBackendResponse(test.responses[i] == T)
}
gotRequests, gotThrottled := th.stats()
gotAccepts := gotRequests - gotThrottled

if gotAccepts != test.wantAccepts[i] {
t.Errorf("allowed for index %d got %d, want %d", i, gotAccepts, test.wantAccepts[i])
}
if gotThrottled != test.wantThrottled[i] {
t.Errorf("throttled for index %d got %d, want %d", i, gotThrottled, test.wantThrottled[i])
if gotAccepts, gotThrottled := th.stats(); gotAccepts != test.wantAccepts[i] || gotThrottled != test.wantThrottled[i] {
t.Errorf("th.stats() = {%d, %d} for index %d, want {%d, %d}", i, gotAccepts, gotThrottled, test.wantAccepts[i], test.wantThrottled[i])
}
}
})
Expand Down Expand Up @@ -170,38 +175,46 @@ func TestParallel(t *testing.T) {

testDuration := 2 * time.Second
numRoutines := 10
counts := make([]int64, numRoutines)
accepts := make([]int64, numRoutines)
throttles := make([]int64, numRoutines)
var wg sync.WaitGroup
for i := 0; i < numRoutines; i++ {
wg.Add(1)
go func(num int) {
defer wg.Done()

ticker := time.NewTicker(testDuration)
var count int64
for {
var accept int64
var throttle int64
for i := 0; ; i++ {
select {
case <-ticker.C:
ticker.Stop()
counts[num] = count
accepts[num] = accept
throttles[num] = throttle
return
default:
th.RegisterBackendResponse(true)
count++
if i%2 == 0 {
th.RegisterBackendResponse(true)
throttle++
} else {
th.RegisterBackendResponse(false)
accept++
}
}
}
}(i)
}
wg.Wait()

var total int64
for i := range counts {
total += counts[i]
var wantAccepts, wantThrottles int64
for i := 0; i < numRoutines; i++ {
wantAccepts += accepts[i]
wantThrottles += throttles[i]
}

gotRequests, gotThrottled := th.stats()
if gotRequests != total || gotThrottled != total {
t.Errorf("got requests = %d, throtled = %d, want %d for both", gotRequests, gotThrottled, total)
if gotAccepts, gotThrottles := th.stats(); gotAccepts != wantAccepts || gotThrottles != wantThrottles {
t.Errorf("th.stats() = {%d, %d}, want {%d, %d}", gotAccepts, gotThrottles, wantAccepts, wantThrottles)
}
}

Expand Down

0 comments on commit 7c81639

Please sign in to comment.