Global ratelimiter helper: usage-tracking fallback-capable rate.Limiter #6028

Merged (2 commits) on May 24, 2024
202 changes: 202 additions & 0 deletions common/quotas/global/collection/internal/limiter.go
@@ -0,0 +1,202 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

// Package internal protects these types' concurrency primitives and other
// internals from accidental misuse.
package internal

import (
    "math"
    "sync/atomic"

    "golang.org/x/time/rate"
)

type (
    // Limiter is a simplified version of [github.com/uber/cadence/common/quotas.Limiter],
    // for both simpler mocking and to remove the need to import it.
    //
    // Wait and Reserve are intentionally excluded here. They are not currently needed,
    // and it seems likely to be much more difficult to correctly track their usage.
    Limiter interface {
        Allow() bool
    }

    // FallbackLimiter wraps a [rate.Limiter] with a fallback Limiter (i.e. a [github.com/uber/cadence/common/quotas.Limiter])
    // to use after a configurable number of failed updates.
    //
    // Intended use is:
    //  - collect allowed vs rejected metrics (implicitly tracked by calling Allow())
    //  - periodically, the limiting host gathers all FallbackLimiter metrics and zeros them (with Collect())
    //  - this info is submitted to aggregating hosts, who compute new target RPS values
    //  - these new target values are used to adjust this ratelimiter (with Update(...))
    //
    // During this sequence, a requested limit may not be returned by an aggregator for two major reasons,
    // either of which results in a FailedUpdate() call to shorten a "use fallback logic" fuse:
    //  - the limit is legitimately unused and no data exists for it
    //  - ring re-sharding led to losing track of the limit, and it is now owned by a host with insufficient data
    //
    // To mitigate the impact of the second case, "insufficient data" responses from aggregating hosts
    // (which are currently modeled as "no data") are temporarily ignored, and the previously-configured
    // update is used. This gives the aggregating host time to fill in its data, and then the next cycle
    // should use "real" values that match actual usage.
    //
    // If no data has been returned for a sufficiently long time, the "main" ratelimit will be dropped, and
    // the fallback limit will be used exclusively. This is intended as a safety fallback, e.g. during
    // initial rollout/rollback and outage scenarios; normal use is not expected to rely on it.
    FallbackLimiter struct {
        // usage / available-limit data cannot be gathered from rate.Limiter, sadly,
        // so it needs to be collected externally.
        //
        // note that these atomics are values, not pointers, to keep data access relatively local.
        // this requires that FallbackLimiter is used as a pointer, not a value, and is never copied.
        // if this is not done, `go vet` will produce an error:
        //   ❯ go vet -copylocks .
        //   # github.com/uber/cadence/common/quotas/global/limiter
        //   ./collection.go:30:27: func passes lock by value: github.com/uber/cadence/common/quotas/global/limiter/internal.FallbackLimiter contains sync/atomic.Int64 contains sync/atomic.noCopy
        // which is checked by `make lint` during CI.
        //
        // changing them to pointers would not change any semantics, so that is fine if it becomes needed.

        // accepted-request usage counter
        accepted atomic.Int64
        // rejected-request usage counter
        rejected atomic.Int64
        // number of failed updates, for deciding when to use the fallback
        failedUpdates atomic.Int64

        // ratelimiters in use:

        // fallback is used when failedUpdates exceeds maxFailedUpdates,
        // or prior to initial data from the global ratelimiter system.
        fallback Limiter
        // limit is the local-only limiter based on global ratelimiter values.
        //
        // note that use and modification is NOT synchronized externally,
        // so updates and deciding when to use the fallback must be done carefully
        // to avoid undesired combinations when they interleave.
        limit *rate.Limiter
    }
)

Review thread on the fallback behavior:

Member: perhaps it's worth adding some metrics tracking when we fall back.

Groxx (author): yeah, I'm planning on metrics. that'll be outside these objects though, and it's why Collect() returns the "use fallback" bool.

Member: If there is a transition from non-fallback to fallback or vice versa, the counter will be inaccurate.

Groxx (author, May 24, 2024): if you mean the limiting behavior will be inaccurate, e.g. it'll allow / deny more requests than it should: yes, but we don't have any great way to resolve that. *rate.Limiter doesn't allow directly manipulating its tokens, and there isn't even a true "correct" behavior if this happens, because it implies we don't know the correct behavior: we haven't been able to sync with other instances.

It'll only be incorrect for one "time to fill the ratelimiter's burstable tokens" period though, which is currently 1 second (limit == burst in basically all of our uses). Generally speaking that'd mean ~2x overage at worst, assuming all hosts switched at the same time.

I could try to empty the tokens when switching, which would be safer for us because it wouldn't allow exceeding either limit, but we don't have an efficient API for that. Ideally it'd read the token count and, e.g., binary-search Allow(n) to empty most or all of them, since it's racing with other calls; right now it'd take on the order of thousands of calls per limiter to drain, which is definitely excessive.

The #6030 wrapper could do this relatively precisely and efficiently, as it can lock everything while draining. If we think 1 second of overage is worth preventing, I suspect it'd be relatively easy to add a "drain the limiter" API for this kind of purpose. I'm... ~50% "yes, worth it"? Can definitely be convinced to build that.

Groxx (author, May 24, 2024): tl;dr from IRL, after clarifying some of the surroundings here:

A layer above this, which holds all of these individual tracking-and-fallback-capable limiters, will periodically call Collect(), and it will emit a counter for "using fallback" so we can monitor how often fallbacks are used.

But "uses fallback" should only occur in practice:

  • temporarily after startup / when a key is first used (no data, so the fallback must be used)
    • this might be worth tracking separately; the failed-update counter can show when this is happening (negative value)
  • when the global limiter is disabled and rejects all requests (falling back is desired here)
  • when something cataclysmic is happening and global limit data is being lost rapidly, e.g. many host crashes -> the ring changes rapidly -> hosts do not return weights before crashing too (falling back is desirable here as well, since the old values are old enough not to trust)

So the sustained existence of fallback-using limiters when we expect the global ratelimiter to be working is a concern, but individual "failed update" events or temporary use are not.

The exact metrics emitted are going to be in a later diff; that all happens outside of this object.
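The binary-search drain discussed in the thread above is not implemented anywhere in this PR. As a rough illustration only, a hypothetical helper along these lines could empty most of a *rate.Limiter's tokens in O(log burst) AllowN calls instead of one Allow() per token; it races with concurrent callers, so it can only drain approximately:

package internal

import (
    "time"

    "golang.org/x/time/rate"
)

// drainApprox approximately empties a limiter's token bucket. hypothetical
// sketch only: no such helper exists in this PR, and racing Allow() calls
// can still slip through between iterations.
func drainApprox(l *rate.Limiter) {
    for n := int(l.Tokens()); n >= 1; n /= 2 {
        for l.AllowN(time.Now(), n) {
            // keep taking chunks of size n while they succeed
        }
    }
}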

const (
    // when the number of failed updates exceeds this value, the fallback is used
    maxFailedUpdates = 9

    // at startup / when new limits come into use, the fallback logic is used.
    // that's expected / normal behavior, as we have no data yet.
    //
    // positive values risk being interpreted as a "failure" though, so start deeply
    // negative so it can be identified as "still starting up".
    // min-int64 has more than enough room to "count" failed updates for eons
    // without becoming positive, so it should not risk being misinterpreted.
    initialFailedUpdates = math.MinInt64
)

func NewFallbackLimiter(fallback Limiter) *FallbackLimiter {
    l := &FallbackLimiter{
        fallback: fallback,
        limit:    rate.NewLimiter(0, 0), // 0 allows no requests; it is unused until we receive an update
    }
    l.failedUpdates.Store(initialFailedUpdates)
    return l
}

// Collect returns the current accepted/rejected values, and resets them to zero.
// Small bits of imprecise counting / time-bucketing due to concurrent limiting are expected and allowed,
// as they should be more than small enough in practice to not matter.
func (b *FallbackLimiter) Collect() (accepted int, rejected int, usingFallback bool) {
    accepted = int(b.accepted.Swap(0))
    rejected = int(b.rejected.Swap(0))
    return accepted, rejected, b.useFallback()
}

func (b *FallbackLimiter) useFallback() bool {
    failed := b.failedUpdates.Load()
    startingUp := failed < 0
    tooManyFailures := failed > maxFailedUpdates
    return startingUp || tooManyFailures
}

// Update adjusts the underlying "main" ratelimit, and resets the fallback fuse.
func (b *FallbackLimiter) Update(rps float64) {
    // caution: order here matters, to prevent potentially-old limiter values from being used
    // before they are updated.
    //
    // this is probably not going to be noticeable, but some users are sensitive to ANY
    // requests being ratelimited. updating the fallback fuse last should be more reliable
    // in preventing that from happening when they would not otherwise be limited, e.g. with
    // the initial value of 0 burst, or if their rps was very low long ago and is now high.
    defer func() {
        // reset the use-fallback fuse, which may also (re)enable using the "main" limiter
        b.failedUpdates.Store(0)
    }()

    if b.limit.Limit() == rate.Limit(rps) {
        return
    }

    b.limit.SetLimit(rate.Limit(rps))
    b.limit.SetBurst(max(1, int(rps))) // 0 burst blocks all requests, so allow at least 1 and rely on rps to fill sanely
}

// FailedUpdate should be called when a limit fails to update from an aggregator,
// possibly implying some kind of problem, which may be unique to this limit.
//
// After crossing a threshold of failures (currently 10), the limiter switches to the fallback.
func (b *FallbackLimiter) FailedUpdate() (failures int) {
    failures = int(b.failedUpdates.Add(1)) // always increment the count, for monitoring purposes
    return failures
}

// Reset defers to the fallback until an update is received.
//
// This is intended to be used when the current limit is no longer trustworthy for some reason,
// determined externally rather than from too many FailedUpdate calls.
func (b *FallbackLimiter) Reset() {
    b.failedUpdates.Store(initialFailedUpdates)
}

// Allow returns true if a request is allowed right now.
func (b *FallbackLimiter) Allow() bool {
    var allowed bool
    if b.useFallback() {
        allowed = b.fallback.Allow()
    } else {
        allowed = b.limit.Allow()
    }

    if allowed {
        b.accepted.Add(1)
    } else {
        b.rejected.Add(1)
    }
    return allowed
}

// intentionally shadows builtin max, so it can simply be deleted when 1.21 is adopted
func max(a, b int) int {
    if a > b {
        return a
    }
    return b
}
178 changes: 178 additions & 0 deletions common/quotas/global/collection/internal/limiter_external_test.go
@@ -0,0 +1,178 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package internal_test

import (
    "math/rand"
    "testing"
    "time"

    "github.com/golang/mock/gomock"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "golang.org/x/sync/errgroup"

    "github.com/uber/cadence/common/quotas"
    "github.com/uber/cadence/common/quotas/global/collection/internal"
)

func TestLimiter(t *testing.T) {
    t.Run("uses fallback initially", func(t *testing.T) {
        m := quotas.NewMockLimiter(gomock.NewController(t))
        m.EXPECT().Allow().Times(1).Return(true)
        m.EXPECT().Allow().Times(2).Return(false)
        lim := internal.NewFallbackLimiter(m)

        assert.True(t, lim.Allow(), "should return fallback's first response")
        assert.False(t, lim.Allow(), "should return fallback's second response")
        assert.False(t, lim.Allow(), "should return fallback's third response")

        accept, reject, fallback := lim.Collect()
        assert.Equal(t, 1, accept, "should have counted one accepted request")
        assert.Equal(t, 2, reject, "should have counted two rejected requests")
        assert.True(t, fallback, "should be using fallback")
    })
    t.Run("uses real after update", func(t *testing.T) {
        m := quotas.NewMockLimiter(gomock.NewController(t)) // no allowances, must not be used
        lim := internal.NewFallbackLimiter(m)
        lim.Update(1_000_000) // large enough to allow millisecond sleeps to refill

        time.Sleep(time.Millisecond)
        assert.True(t, lim.Allow(), "limiter allows after enough time has passed")

        allowed := lim.Allow() // could be either due to timing
        accept, reject, fallback := lim.Collect()
        if allowed {
            assert.Equal(t, 2, accept, "should have counted two accepted requests")
            assert.Equal(t, 0, reject, "should have counted no rejected requests")
        } else {
            assert.Equal(t, 1, accept, "should have counted one accepted request")
            assert.Equal(t, 1, reject, "should have counted one rejected request")
        }
        assert.False(t, fallback, "should not be using fallback")
    })

    t.Run("collecting usage data resets counts", func(t *testing.T) {
        lim := internal.NewFallbackLimiter(nil)
        lim.Update(1) // so there's no need to mock
        lim.Allow()
        accepted, rejected, _ := lim.Collect()
        assert.Equal(t, 1, accepted+rejected, "should count one request")
        accepted, rejected, _ = lim.Collect()
        assert.Zero(t, accepted+rejected, "collect should have cleared the counts")
    })

    t.Run("use-fallback fuse", func(t *testing.T) {
        // duplicated to allow this test to be external; keep in sync by hand
        const maxFailedUpdates = 9
        t.Cleanup(func() {
            if t.Failed() { // notices sub-test failures
                t.Logf("maxFailedUpdates may be out of sync (%v), check hardcoded values", maxFailedUpdates)
            }
        })

        t.Run("falls back after too many failures", func(t *testing.T) {
            m := quotas.NewMockLimiter(gomock.NewController(t)) // no allowances
            lim := internal.NewFallbackLimiter(m)
            lim.Update(1)
            _, _, fallback := lim.Collect()
            require.False(t, fallback, "should not be using fallback")
            // the bucket starts out empty / with whatever contents it had before (zero).
            // this is possibly surprising, so it's asserted.
            require.False(t, lim.Allow(), "rate.Limiter should reject requests until filled")

            // fail enough times to trigger a fallback
            for i := 0; i < maxFailedUpdates; i++ {
                // build up to the edge...
                lim.FailedUpdate()
                _, _, fallback := lim.Collect()
                require.False(t, fallback, "should not be using fallback after %v failed updates", i+1)
            }
            lim.FailedUpdate() // ... and push it over
            _, _, fallback = lim.Collect()
            require.True(t, fallback, "%vth update should switch to fallback", maxFailedUpdates+1)

            m.EXPECT().Allow().Times(1).Return(true)
            assert.True(t, lim.Allow(), "should return fallback's allowed request")
        })
        t.Run("failing many times does not accidentally switch away from fallback", func(t *testing.T) {
            lim := internal.NewFallbackLimiter(nil)
            for i := 0; i < maxFailedUpdates*10; i++ {
                lim.FailedUpdate()
                _, _, fallback := lim.Collect()
                require.True(t, fallback, "should use fallback after %v failed updates", i+1)
            }
        })
    })

    t.Run("coverage", func(t *testing.T) {
        // easy line to cover to bring the package to 100%
        lim := internal.NewFallbackLimiter(nil)
        lim.Update(1)
        lim.Update(1) // should go down the "no changes needed, return early" path
    })
}

func TestLimiterNotRacy(t *testing.T) {
    lim := internal.NewFallbackLimiter(allowlimiter{})
    var g errgroup.Group
    const loops = 1000
    for i := 0; i < loops; i++ {
        // reset ~10% of the time
        if rand.Intn(10) == 0 {
            g.Go(func() error {
                lim.Reset()
                return nil
            })
        }
        // update ~10% of the time, fail the rest.
        // this should randomly clear occasionally via failures.
        if rand.Intn(10) == 0 {
            g.Go(func() error {
                lim.Update(rand.Float64()) // essentially never exercises the "skip, no update" logic
                return nil
            })
        } else {
            g.Go(func() error {
                lim.FailedUpdate()
                return nil
            })
        }
        // collect occasionally
        if rand.Intn(10) == 0 {
            g.Go(func() error {
                lim.Collect()
                return nil
            })
        }
        g.Go(func() error {
            lim.Allow()
            return nil
        })
    }
    // wait for all goroutines, so the race detector observes every access
    require.NoError(t, g.Wait())
}

var _ internal.Limiter = allowlimiter{}

type allowlimiter struct{}

func (allowlimiter) Allow() bool { return true }