Skip to content

Commit

Permalink
tenantrate: switch to a single token bucket
Browse files Browse the repository at this point in the history
Tenant rate limiting currently uses four separate token buckets, each
with its own rate and burst limits. In this commit we switch to a
single shared token bucket (see cockroachdb#55114 for motivation).

We use a "cost model" to map read and write requests to "KV Compute
Units". Requests have a base cost plus a per-byte cost. The details
are documented in settings.go. The values were chosen based on
experiments ran by Nathan:
https://docs.google.com/spreadsheets/d/1PPlIcKnusOqWtBoOZVd9xBEMPe5Ss1FgrJlYFgpQZaM/edit#gid=735409177

The rate was chosen so that it maps to 20% of 1 KV vCPU. This keeps
the rate limit on small requests roughly the same as before
(especially on mixed workloads).

The largest departure from the previous limits is that we allow much
more read bytes (the per-byte cost of reads is small in the cost
model). If we were to keep close to the previous limits, the value of
kv.tenant_rate_limiter.read_cost_per_megabyte would be 200 instead of
10.  Perhaps we want to be more conservative here and make this
value somewhere in-between?

Fixes cockroachdb#55114.

Release note: None
  • Loading branch information
RaduBerinde committed Apr 8, 2021
1 parent 10881d2 commit 17bb6f7
Show file tree
Hide file tree
Showing 16 changed files with 386 additions and 470 deletions.
46 changes: 31 additions & 15 deletions pkg/kv/kvserver/client_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,31 +173,50 @@ func TestTenantRateLimiter(t *testing.T) {
mkKey := func() roachpb.Key {
return encoding.EncodeUUIDValue(tablePrefix, 1, uuid.MakeV4())
}

// Ensure that the qps rate limit does not affect the system tenant even for
// the tenant range.
tenantCtx := roachpb.NewContextForTenant(ctx, tenantID)
cfg := tenantrate.LimitConfigsFromSettings(s.ClusterSettings())
for i := 0; i < int(cfg.WriteRequests.Burst); i++ {
cfg := tenantrate.ConfigFromSettings(s.ClusterSettings())

// We don't know the exact size of the write, but we can set lower and upper
// bounds.
writeCostLower := cfg.WriteRequestUnits
writeCostUpper := cfg.WriteRequestUnits + float64(32)*cfg.WriteUnitsPerByte
// burstWrites is a number of writes that don't exceed the burst limit.
burstWrites := int(cfg.Burst / writeCostUpper)
// tooManyWrites is a number of writes which definitely exceed the burst
// limit.
tooManyWrites := int(cfg.Burst/writeCostLower) + 2

// Make sure that writes to the system tenant don't block, even if we
// definitely exceed the burst rate.
for i := 0; i < tooManyWrites; i++ {
require.NoError(t, db.Put(ctx, mkKey(), 0))
}
// Now ensure that in the same instant the write QPS limit does affect the
// tenant. Issuing up to the burst limit of requests can happen without
// blocking.
for i := 0; i < int(cfg.WriteRequests.Burst); i++ {
// tenant. First issue requests that can happen without blocking.
for i := 0; i < burstWrites; i++ {
require.NoError(t, db.Put(tenantCtx, mkKey(), 0))
}
// Attempt to issue another request, make sure that it gets blocked by
// observing a timer.
errCh := make(chan error, 1)
go func() { errCh <- db.Put(tenantCtx, mkKey(), 0) }()
expectedTimer := t0.Add(time.Duration(float64(1/cfg.WriteRequests.Rate) * float64(time.Second)))
go func() {
// Issue enough requests so that one has to block.
for i := burstWrites; i < tooManyWrites; i++ {
if err := db.Put(tenantCtx, mkKey(), 0); err != nil {
errCh <- err
return
}
}
errCh <- nil
}()

testutils.SucceedsSoon(t, func() error {
timers := timeSource.Timers()
if len(timers) != 1 {
return errors.Errorf("seeing %d timers: %v", len(timers), timers)
}
require.EqualValues(t, expectedTimer, timers[0])
return nil
})

Expand All @@ -220,14 +239,11 @@ func TestTenantRateLimiter(t *testing.T) {
return fmt.Sprintf("%s %d", tenantMetricStr, expCount)
}

// Ensure that the metric for the admitted requests is equal to the number of
// requests which we've admitted.
require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst))

// Allow the blocked request to proceed.
timeSource.Advance(time.Second)
require.NoError(t, <-errCh)

// Ensure that it is now reflected in the metrics.
require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst+1))
// Ensure that the metric for the admitted requests reflects the number of
// admitted requests.
require.Contains(t, getMetrics(), makeMetricStr(int64(tooManyWrites)))
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/tenantrate/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_test(
embed = [":tenantrate"],
deps = [
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/util/leaktest",
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/tenantrate/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type LimiterFactory struct {
systemLimiter systemLimiter
mu struct {
syncutil.RWMutex
limits LimitConfigs
limits Config
tenants map[roachpb.TenantID]*refCountedLimiter
}
}
Expand All @@ -53,12 +53,12 @@ func NewLimiterFactory(st *cluster.Settings, knobs *TestingKnobs) *LimiterFactor
rl.knobs = *knobs
}
rl.mu.tenants = make(map[roachpb.TenantID]*refCountedLimiter)
rl.mu.limits = LimitConfigsFromSettings(st)
rl.mu.limits = ConfigFromSettings(st)
rl.systemLimiter = systemLimiter{
tenantMetrics: rl.metrics.tenantMetrics(roachpb.SystemTenantID),
}
for _, setOnChange := range settingsSetOnChangeFuncs {
setOnChange(&st.SV, rl.updateLimits)
for _, setting := range configSettings {
setting.SetOnChange(&st.SV, rl.updateConfig)
}
return rl
}
Expand Down Expand Up @@ -114,12 +114,12 @@ func (rl *LimiterFactory) Release(lim Limiter) {
}
}

func (rl *LimiterFactory) updateLimits() {
func (rl *LimiterFactory) updateConfig() {
rl.mu.Lock()
defer rl.mu.Unlock()
rl.mu.limits = LimitConfigsFromSettings(rl.settings)
rl.mu.limits = ConfigFromSettings(rl.settings)
for _, rcLim := range rl.mu.tenants {
rcLim.lim.updateLimits(rl.mu.limits)
rcLim.lim.updateConfig(rl.mu.limits)
}
}

Expand Down
75 changes: 43 additions & 32 deletions pkg/kv/kvserver/tenantrate/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,51 @@

package tenantrate

import "github.com/cockroachdb/cockroach/pkg/settings/cluster"

// OverrideSettingsWithRateLimits utilizes LimitConfigs from the values stored in the
// settings.
func OverrideSettingsWithRateLimits(settings *cluster.Settings, rl LimitConfigs) {
readRequestRateLimit.Override(&settings.SV, float64(rl.ReadRequests.Rate))
readRequestBurstLimit.Override(&settings.SV, rl.ReadRequests.Burst)
writeRequestRateLimit.Override(&settings.SV, float64(rl.WriteRequests.Rate))
writeRequestBurstLimit.Override(&settings.SV, rl.WriteRequests.Burst)
readRateLimit.Override(&settings.SV, int64(rl.ReadBytes.Rate))
readBurstLimit.Override(&settings.SV, rl.ReadBytes.Burst)
writeRateLimit.Override(&settings.SV, int64(rl.WriteBytes.Rate))
writeBurstLimit.Override(&settings.SV, rl.WriteBytes.Burst)
import "github.com/cockroachdb/cockroach/pkg/settings"

// SettingValues is a struct that can be populated from test files, via YAML.
type SettingValues struct {
Rate float64
Burst float64

Read Factors
Write Factors
}

// Factors for reads and writes.
type Factors struct {
Base float64
PerByte float64
}

// OverrideSettings sets the cluster setting according to the given
// settingValues.
//
// Uninitialized (zero) values are ignored.
func OverrideSettings(sv *settings.Values, vals SettingValues) {
override := func(setting *settings.FloatSetting, val float64) {
if val != 0 {
setting.Override(sv, val)
}
}
override(kvcuRateLimit, vals.Rate)
override(kvcuBurstLimitSeconds, vals.Burst/kvcuRateLimit.Get(sv))

override(readRequestCost, vals.Read.Base)
override(readCostPerMB, vals.Read.PerByte*1024*1024)
override(writeRequestCost, vals.Write.Base)
override(writeCostPerMB, vals.Write.PerByte*1024*1024)
}

// DefaultLimitConfigs returns the configuration that corresponds to the default
// DefaultConfig returns the configuration that corresponds to the default
// setting values.
func DefaultLimitConfigs() LimitConfigs {
return LimitConfigs{
ReadRequests: LimitConfig{
Rate: Limit(readRequestRateLimit.Default()),
Burst: readRequestBurstLimit.Default(),
},
WriteRequests: LimitConfig{
Rate: Limit(writeRequestRateLimit.Default()),
Burst: writeRequestBurstLimit.Default(),
},
ReadBytes: LimitConfig{
Rate: Limit(readRateLimit.Default()),
Burst: readBurstLimit.Default(),
},
WriteBytes: LimitConfig{
Rate: Limit(writeRateLimit.Default()),
Burst: writeBurstLimit.Default(),
},
func DefaultConfig() Config {
return Config{
Rate: kvcuRateLimit.Default(),
Burst: kvcuRateLimit.Default() * kvcuBurstLimitSeconds.Default(),
ReadRequestUnits: readRequestCost.Default(),
ReadUnitsPerByte: readCostPerMB.Default() / (1024 * 1024),
WriteRequestUnits: writeRequestCost.Default(),
WriteUnitsPerByte: writeCostPerMB.Default() / (1024 * 1024),
}
}
Loading

0 comments on commit 17bb6f7

Please sign in to comment.