Skip to content

Commit

Permalink
quotapool: extract token bucket accounting
Browse files Browse the repository at this point in the history
This commit extracts the token bucket accounting logic into a
separate, public type. The same code will be used by the tenantrate
limiter.

Release note: None
  • Loading branch information
RaduBerinde committed Apr 8, 2021
1 parent 9bddccf commit 10881d2
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 76 deletions.
86 changes: 10 additions & 76 deletions pkg/util/quotapool/int_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"context"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// Limit defines a rate in terms of quota per second.
Expand All @@ -34,17 +32,9 @@ type RateLimiter struct {
// put the token bucket in debt.
func NewRateLimiter(name string, rate Limit, burst int64, options ...Option) *RateLimiter {
rl := &RateLimiter{}
bucket := rateBucket{
limitConfig: limitConfig{
rate: rate,
burst: burst,
},
p: rl,
cur: float64(burst),
lastUpdated: timeutil.Now(),
}
rl.qp = New(name, &bucket, options...)
bucket.lastUpdated = rl.qp.timeSource.Now()
tb := &TokenBucket{}
rl.qp = New(name, tb, options...)
tb.Init(TokensPerSecond(rate), Tokens(burst), rl.qp.timeSource)
return rl
}

Expand Down Expand Up @@ -89,30 +79,12 @@ func (rl *RateLimiter) AdmitN(n int64) bool {
// putting the limiter into debt.
func (rl *RateLimiter) UpdateLimit(rate Limit, burst int64) {
rl.qp.Update(func(res Resource) (shouldNotify bool) {
r := res.(*rateBucket)
shouldNotify = r.burst < burst || r.rate < rate
burstDelta := burst - r.burst
r.limitConfig = limitConfig{rate: rate, burst: burst}
r.cur += float64(burstDelta)
r.update(r.p.qp.TimeSource().Now())
return shouldNotify
tb := res.(*TokenBucket)
tb.UpdateConfig(TokensPerSecond(rate), Tokens(burst))
return true
})
}

// rateBucket is the implementation of Resource which remains in the quotapool
// for a RateLimiter.
type rateBucket struct {
limitConfig
p *RateLimiter
cur float64
lastUpdated time.Time
}

type limitConfig struct {
rate Limit
burst int64
}

// RateAlloc is an allocated quantity of quota which can be released back into
// the token-bucket RateLimiter.
type RateAlloc struct {
Expand All @@ -124,11 +96,8 @@ type RateAlloc struct {
// methods on the RateAlloc after this call.
func (ra *RateAlloc) Return() {
ra.rl.qp.Update(func(res Resource) (shouldNotify bool) {
r := res.(*rateBucket)
r.cur += float64(ra.alloc)
if r.cur > float64(r.burst) {
r.cur = float64(r.burst)
}
tb := res.(*TokenBucket)
tb.Adjust(Tokens(ra.alloc))
return true
})
ra.rl.putRateAlloc((*rateAlloc)(ra))
Expand Down Expand Up @@ -167,43 +136,8 @@ func (rl *RateLimiter) putRateRequest(r *rateRequest) {
func (i *rateRequest) Acquire(
ctx context.Context, res Resource,
) (fulfilled bool, tryAgainAfter time.Duration) {
r := res.(*rateBucket)
now := r.p.qp.timeSource.Now()

r.update(now)

// Deal with the case where the allocation is larger than the burst size.
// In this case we'll allow the acquisition to complete if the current value
// is equal to the burst. If the acquisition succeeds, it will put the limiter
// into debt.
want := float64(i.want)
if i.want > r.burst {
want = float64(r.burst)
}
if delta := want - r.cur; delta > 0 {
// Compute the time it will take for r.cur to get to the needed capacity.
timeDelta := time.Duration((delta * float64(time.Second)) / float64(r.rate))

// Deal with the exceedingly edge case that timeDelta, as a floating point
// number, is less than 1ns by returning 1ns and looping back around.
if timeDelta == 0 {
timeDelta++
}

return false, timeDelta
}
r.cur -= float64(i.want)
return true, 0
}

func (r *rateBucket) update(now time.Time) {
if since := now.Sub(r.lastUpdated); since > 0 {
r.cur += float64(r.rate) * since.Seconds()
if r.cur > float64(r.burst) {
r.cur = float64(r.burst)
}
r.lastUpdated = now
}
tb := res.(*TokenBucket)
return tb.TryToFulfill(Tokens(i.want))
}

func (i *rateRequest) ShouldWait() bool {
Expand Down
116 changes: 116 additions & 0 deletions pkg/util/quotapool/token_bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package quotapool

import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// Tokens are abstract units (usually units of work).
type Tokens float64

// TokensPerSecond is the rate of token replenishment.
type TokensPerSecond float64

// TokenBucket implements the basic accounting for a token bucket.
//
// A token bucket has a rate of replenishment and a burst limit. Tokens are
// replenished over time, up to the burst limit.
//
// The token bucket keeps track of the current amount and updates it as time
// passes. The bucket can go into debt (i.e. negative current amount).
type TokenBucket struct {
rate TokensPerSecond
burst Tokens
timeSource timeutil.TimeSource

current Tokens
lastUpdated time.Time
}

// Init the token bucket.
func (tb *TokenBucket) Init(rate TokensPerSecond, burst Tokens, timeSource timeutil.TimeSource) {
*tb = TokenBucket{
rate: rate,
burst: burst,
timeSource: timeSource,
current: burst,
lastUpdated: timeSource.Now(),
}
}

// update moves the time forward, accounting for the replenishment since the
// last update.
func (tb *TokenBucket) update() {
now := tb.timeSource.Now()
if since := now.Sub(tb.lastUpdated); since > 0 {
tb.current += Tokens(float64(tb.rate) * since.Seconds())

if tb.current > tb.burst {
tb.current = tb.burst
}
tb.lastUpdated = now
}
}

// UpdateConfig updates the rate and burst limits. The change in burst will be
// applied to the current token quantity. For example, if the RateLimiter
// currently had 5 available tokens and the burst is updated from 10 to 20, the
// amount will increase to 15. Similarly, if the burst is decreased by 10, the
// current quota will decrease accordingly, potentially putting the limiter into
// debt.
func (tb *TokenBucket) UpdateConfig(rate TokensPerSecond, burst Tokens) {
tb.update()

burstDelta := burst - tb.burst
tb.rate = rate
tb.burst = burst

tb.current += burstDelta
}

// Adjust returns tokens to the bucket (positive delta) or accounts for a debt
// of tokens (negative delta).
func (tb *TokenBucket) Adjust(delta Tokens) {
tb.update()
tb.current += delta
if tb.current > tb.burst {
tb.current = tb.burst
}
}

// TryToFulfill either removes the given amount if is available, or returns a
// time after which the request should be retried.
func (tb *TokenBucket) TryToFulfill(amount Tokens) (fulfilled bool, tryAgainAfter time.Duration) {
tb.update()

// Deal with the case where the request is larger than the burst size. In
// this case we'll allow the acquisition to complete if and when the current
// value is equal to the burst. If the acquisition succeeds, it will put the
// limiter into debt.
want := amount
if want > tb.burst {
want = tb.burst
}
if delta := want - tb.current; delta > 0 {
// Compute the time it will take to get to the needed capacity.
timeDelta := time.Duration((float64(delta) * float64(time.Second)) / float64(tb.rate))
if timeDelta < time.Nanosecond {
timeDelta = time.Nanosecond
}
return false, timeDelta
}

tb.current -= amount
return true, 0
}
88 changes: 88 additions & 0 deletions pkg/util/quotapool/token_bucket_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package quotapool

import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func TestTokenBucket(t *testing.T) {
t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
mt := timeutil.NewManualTime(t0)

var tb TokenBucket
tb.Init(10, 20, mt)

check := func(expected Tokens) {
t.Helper()
const eps = 1e-10
tb.update()
if delta := tb.current - expected; delta > eps || delta < -eps {
t.Fatalf("expected current amount %v, got %v", expected, tb.current)
}
}

checkFulfill := func(amount Tokens, expected time.Duration) {
t.Helper()
ok, tryAgainAfter := tb.TryToFulfill(amount)
if ok {
if expected != 0 {
t.Fatalf("expected not to be fulfilled")
}
} else {
if expected == 0 {
t.Fatalf("expected to be fulfilled")
} else if tryAgainAfter.Round(time.Microsecond) != expected.Round(time.Microsecond) {
t.Fatalf("expected tryAgainAfter %v, got %v", expected, tryAgainAfter)
}
}
}

check(20)
tb.Adjust(-10)
check(10)
tb.Adjust(5)
check(15)
tb.Adjust(20)
check(20)

mt.Advance(time.Second)
check(20)
tb.Adjust(-15)
check(5)

mt.Advance(time.Second)
check(15)
mt.Advance(time.Second)
check(20)

checkFulfill(15, 0)
checkFulfill(15, time.Second)

mt.Advance(10 * time.Second)
// Now put the bucket into debt with a huge ask.
checkFulfill(120, 0)
checkFulfill(10, 11*time.Second)

mt.Advance(100 * time.Second)

// A full bucket should remain full.
tb.UpdateConfig(100, 1000)
checkFulfill(1000, 0)
checkFulfill(100, 1*time.Second)

tb.UpdateConfig(10, 20)
check(-980)
checkFulfill(20, 100*time.Second)
}

0 comments on commit 10881d2

Please sign in to comment.