From 17bb6f708368fde9415c1d5b4c0e7ca261657d3e Mon Sep 17 00:00:00 2001
From: Radu Berinde
Date: Thu, 1 Apr 2021 08:05:49 -0700
Subject: [PATCH] tenantrate: switch to a single token bucket

Tenant rate limiting currently uses four separate token buckets, each with its
own rate and burst limits. In this commit we switch to a single shared token
bucket (see #55114 for motivation).

We use a "cost model" to map read and write requests to "KV Compute Units".
Requests have a base cost plus a per-byte cost. The details are documented in
settings.go. The values were chosen based on experiments run by Nathan:
https://docs.google.com/spreadsheets/d/1PPlIcKnusOqWtBoOZVd9xBEMPe5Ss1FgrJlYFgpQZaM/edit#gid=735409177

The rate was chosen so that it maps to 20% of 1 KV vCPU. This keeps the rate
limit on small requests roughly the same as before (especially on mixed
workloads).

The largest departure from the previous limits is that we allow many more read
bytes (the per-byte cost of reads is small in the cost model). If we were to
keep close to the previous limits, the value of
kv.tenant_rate_limiter.read_cost_per_megabyte would be 200 instead of 10.
Perhaps we want to be more conservative here and set this value somewhere in
between?

Fixes #55114.

Release note: None
---
 pkg/kv/kvserver/client_tenant_test.go | 46 ++--
 pkg/kv/kvserver/tenantrate/BUILD.bazel | 1 +
 pkg/kv/kvserver/tenantrate/factory.go | 14 +-
 pkg/kv/kvserver/tenantrate/helpers_test.go | 75 +++---
 pkg/kv/kvserver/tenantrate/limiter.go | 254 +++++-------------
 pkg/kv/kvserver/tenantrate/limiter_test.go | 53 ++--
 pkg/kv/kvserver/tenantrate/settings.go | 180 +++++++------
 pkg/kv/kvserver/tenantrate/testdata/basic | 47 ++--
 pkg/kv/kvserver/tenantrate/testdata/burst | 33 ++-
 pkg/kv/kvserver/tenantrate/testdata/cancel | 16 +-
 .../tenantrate/testdata/estimate_iops | 16 +-
 pkg/kv/kvserver/tenantrate/testdata/reads | 34 +--
 pkg/kv/kvserver/tenantrate/testdata/update | 26 +-
 pkg/sql/logictest/logic.go | 25 +-
 pkg/util/quotapool/config.go | 29 +-
 pkg/util/quotapool/quotapool.go | 7 +
 16 files changed, 386 insertions(+), 470 deletions(-)

diff --git a/pkg/kv/kvserver/client_tenant_test.go b/pkg/kv/kvserver/client_tenant_test.go
index c969a57a61cf..a938f1d29703 100644
--- a/pkg/kv/kvserver/client_tenant_test.go
+++ b/pkg/kv/kvserver/client_tenant_test.go
@@ -173,31 +173,50 @@ func TestTenantRateLimiter(t *testing.T) {
 	mkKey := func() roachpb.Key {
 		return encoding.EncodeUUIDValue(tablePrefix, 1, uuid.MakeV4())
 	}
-
 	// Ensure that the qps rate limit does not affect the system tenant even for
 	// the tenant range.
 	tenantCtx := roachpb.NewContextForTenant(ctx, tenantID)
-	cfg := tenantrate.LimitConfigsFromSettings(s.ClusterSettings())
-	for i := 0; i < int(cfg.WriteRequests.Burst); i++ {
+	cfg := tenantrate.ConfigFromSettings(s.ClusterSettings())
+
+	// We don't know the exact size of the write, but we can set lower and upper
+	// bounds.
+	writeCostLower := cfg.WriteRequestUnits
+	writeCostUpper := cfg.WriteRequestUnits + float64(32)*cfg.WriteUnitsPerByte
+	// burstWrites is a number of writes that don't exceed the burst limit.
+	burstWrites := int(cfg.Burst / writeCostUpper)
+	// tooManyWrites is a number of writes which definitely exceed the burst
+	// limit.
+	tooManyWrites := int(cfg.Burst/writeCostLower) + 2
+
+	// Make sure that writes to the system tenant don't block, even if we
+	// definitely exceed the burst rate.
+ for i := 0; i < tooManyWrites; i++ { require.NoError(t, db.Put(ctx, mkKey(), 0)) } // Now ensure that in the same instant the write QPS limit does affect the - // tenant. Issuing up to the burst limit of requests can happen without - // blocking. - for i := 0; i < int(cfg.WriteRequests.Burst); i++ { + // tenant. First issue requests that can happen without blocking. + for i := 0; i < burstWrites; i++ { require.NoError(t, db.Put(tenantCtx, mkKey(), 0)) } // Attempt to issue another request, make sure that it gets blocked by // observing a timer. errCh := make(chan error, 1) - go func() { errCh <- db.Put(tenantCtx, mkKey(), 0) }() - expectedTimer := t0.Add(time.Duration(float64(1/cfg.WriteRequests.Rate) * float64(time.Second))) + go func() { + // Issue enough requests so that one has to block. + for i := burstWrites; i < tooManyWrites; i++ { + if err := db.Put(tenantCtx, mkKey(), 0); err != nil { + errCh <- err + return + } + } + errCh <- nil + }() + testutils.SucceedsSoon(t, func() error { timers := timeSource.Timers() if len(timers) != 1 { return errors.Errorf("seeing %d timers: %v", len(timers), timers) } - require.EqualValues(t, expectedTimer, timers[0]) return nil }) @@ -220,14 +239,11 @@ func TestTenantRateLimiter(t *testing.T) { return fmt.Sprintf("%s %d", tenantMetricStr, expCount) } - // Ensure that the metric for the admitted requests is equal to the number of - // requests which we've admitted. - require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst)) - // Allow the blocked request to proceed. timeSource.Advance(time.Second) require.NoError(t, <-errCh) - // Ensure that it is now reflected in the metrics. - require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst+1)) + // Ensure that the metric for the admitted requests reflects the number of + // admitted requests. 
+ require.Contains(t, getMetrics(), makeMetricStr(int64(tooManyWrites))) } diff --git a/pkg/kv/kvserver/tenantrate/BUILD.bazel b/pkg/kv/kvserver/tenantrate/BUILD.bazel index b39c508e21e6..3bb29bd2e241 100644 --- a/pkg/kv/kvserver/tenantrate/BUILD.bazel +++ b/pkg/kv/kvserver/tenantrate/BUILD.bazel @@ -36,6 +36,7 @@ go_test( embed = [":tenantrate"], deps = [ "//pkg/roachpb", + "//pkg/settings", "//pkg/settings/cluster", "//pkg/testutils", "//pkg/util/leaktest", diff --git a/pkg/kv/kvserver/tenantrate/factory.go b/pkg/kv/kvserver/tenantrate/factory.go index cd075e64bfc6..890d88a5566f 100644 --- a/pkg/kv/kvserver/tenantrate/factory.go +++ b/pkg/kv/kvserver/tenantrate/factory.go @@ -32,7 +32,7 @@ type LimiterFactory struct { systemLimiter systemLimiter mu struct { syncutil.RWMutex - limits LimitConfigs + limits Config tenants map[roachpb.TenantID]*refCountedLimiter } } @@ -53,12 +53,12 @@ func NewLimiterFactory(st *cluster.Settings, knobs *TestingKnobs) *LimiterFactor rl.knobs = *knobs } rl.mu.tenants = make(map[roachpb.TenantID]*refCountedLimiter) - rl.mu.limits = LimitConfigsFromSettings(st) + rl.mu.limits = ConfigFromSettings(st) rl.systemLimiter = systemLimiter{ tenantMetrics: rl.metrics.tenantMetrics(roachpb.SystemTenantID), } - for _, setOnChange := range settingsSetOnChangeFuncs { - setOnChange(&st.SV, rl.updateLimits) + for _, setting := range configSettings { + setting.SetOnChange(&st.SV, rl.updateConfig) } return rl } @@ -114,12 +114,12 @@ func (rl *LimiterFactory) Release(lim Limiter) { } } -func (rl *LimiterFactory) updateLimits() { +func (rl *LimiterFactory) updateConfig() { rl.mu.Lock() defer rl.mu.Unlock() - rl.mu.limits = LimitConfigsFromSettings(rl.settings) + rl.mu.limits = ConfigFromSettings(rl.settings) for _, rcLim := range rl.mu.tenants { - rcLim.lim.updateLimits(rl.mu.limits) + rcLim.lim.updateConfig(rl.mu.limits) } } diff --git a/pkg/kv/kvserver/tenantrate/helpers_test.go b/pkg/kv/kvserver/tenantrate/helpers_test.go index 96d4701b91fd..82f4b94d925a 100644 --- a/pkg/kv/kvserver/tenantrate/helpers_test.go +++ b/pkg/kv/kvserver/tenantrate/helpers_test.go @@ -10,40 +10,51 @@ package tenantrate -import "github.com/cockroachdb/cockroach/pkg/settings/cluster" - -// OverrideSettingsWithRateLimits utilizes LimitConfigs from the values stored in the -// settings. -func OverrideSettingsWithRateLimits(settings *cluster.Settings, rl LimitConfigs) { - readRequestRateLimit.Override(&settings.SV, float64(rl.ReadRequests.Rate)) - readRequestBurstLimit.Override(&settings.SV, rl.ReadRequests.Burst) - writeRequestRateLimit.Override(&settings.SV, float64(rl.WriteRequests.Rate)) - writeRequestBurstLimit.Override(&settings.SV, rl.WriteRequests.Burst) - readRateLimit.Override(&settings.SV, int64(rl.ReadBytes.Rate)) - readBurstLimit.Override(&settings.SV, rl.ReadBytes.Burst) - writeRateLimit.Override(&settings.SV, int64(rl.WriteBytes.Rate)) - writeBurstLimit.Override(&settings.SV, rl.WriteBytes.Burst) +import "github.com/cockroachdb/cockroach/pkg/settings" + +// SettingValues is a struct that can be populated from test files, via YAML. +type SettingValues struct { + Rate float64 + Burst float64 + + Read Factors + Write Factors +} + +// Factors for reads and writes. +type Factors struct { + Base float64 + PerByte float64 +} + +// OverrideSettings sets the cluster setting according to the given +// settingValues. +// +// Uninitialized (zero) values are ignored. 
+func OverrideSettings(sv *settings.Values, vals SettingValues) { + override := func(setting *settings.FloatSetting, val float64) { + if val != 0 { + setting.Override(sv, val) + } + } + override(kvcuRateLimit, vals.Rate) + override(kvcuBurstLimitSeconds, vals.Burst/kvcuRateLimit.Get(sv)) + + override(readRequestCost, vals.Read.Base) + override(readCostPerMB, vals.Read.PerByte*1024*1024) + override(writeRequestCost, vals.Write.Base) + override(writeCostPerMB, vals.Write.PerByte*1024*1024) } -// DefaultLimitConfigs returns the configuration that corresponds to the default +// DefaultConfig returns the configuration that corresponds to the default // setting values. -func DefaultLimitConfigs() LimitConfigs { - return LimitConfigs{ - ReadRequests: LimitConfig{ - Rate: Limit(readRequestRateLimit.Default()), - Burst: readRequestBurstLimit.Default(), - }, - WriteRequests: LimitConfig{ - Rate: Limit(writeRequestRateLimit.Default()), - Burst: writeRequestBurstLimit.Default(), - }, - ReadBytes: LimitConfig{ - Rate: Limit(readRateLimit.Default()), - Burst: readBurstLimit.Default(), - }, - WriteBytes: LimitConfig{ - Rate: Limit(writeRateLimit.Default()), - Burst: writeBurstLimit.Default(), - }, +func DefaultConfig() Config { + return Config{ + Rate: kvcuRateLimit.Default(), + Burst: kvcuRateLimit.Default() * kvcuBurstLimitSeconds.Default(), + ReadRequestUnits: readRequestCost.Default(), + ReadUnitsPerByte: readCostPerMB.Default() / (1024 * 1024), + WriteRequestUnits: writeRequestCost.Default(), + WriteUnitsPerByte: writeCostPerMB.Default() / (1024 * 1024), } } diff --git a/pkg/kv/kvserver/tenantrate/limiter.go b/pkg/kv/kvserver/tenantrate/limiter.go index f3e2e40b6a97..034d8e8f02a6 100644 --- a/pkg/kv/kvserver/tenantrate/limiter.go +++ b/pkg/kv/kvserver/tenantrate/limiter.go @@ -46,13 +46,11 @@ import ( // The Limiter is backed by a FIFO queue which provides fairness. type Limiter interface { - // Wait acquires n quota from the limiter. This acquisition cannot be - // released. Each call to wait will consume 1 read or write request - // depending on isWrite, 1 read byte, and writeBytes from the token buckets. - // Calls to Wait will block until the buckets contain adequate resources. If - // a request attempts to write more than the burst limit, it will wait until - // the bucket is completely full before acquiring the requested quantity and - // putting the limiter in debt. + // Wait acquires the quota necessary to admit a read or write request. This + // acquisition cannot be released. Calls to Wait will block until the buckets + // contain adequate resources. If a request attempts to write more than the + // burst limit, it will wait until the bucket is completely full before + // acquiring the requested quantity and putting the limiter in debt. // // The only errors which should be returned are due to the context. 
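+	// Under the cost model introduced by this change, a write is charged
+	// WriteRequestUnits plus a per-byte cost for writeBytes when it is
+	// admitted; a read is charged ReadRequestUnits up front, and its size is
+	// accounted for afterwards via RecordRead.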
Wait(ctx context.Context, isWrite bool, writeBytes int64) error @@ -75,7 +73,7 @@ type limiter struct { func (rl *limiter) init( parent *LimiterFactory, tenantID roachpb.TenantID, - conf LimitConfigs, + config Config, metrics tenantMetrics, options ...quotapool.Option, ) { @@ -84,51 +82,53 @@ func (rl *limiter) init( tenantID: tenantID, metrics: metrics, } - buckets := tokenBuckets{ - readRequests: makeTokenBucket(conf.ReadRequests), - writeRequests: makeTokenBucket(conf.WriteRequests), - readBytes: makeTokenBucket(conf.ReadBytes), - writeBytes: makeTokenBucket(conf.WriteBytes), - } - options = append(options, quotapool.OnAcquisition(func( - ctx context.Context, poolName string, r quotapool.Request, start time.Time, - ) { - req := r.(*waitRequest) - if req.readRequests > 0 { - rl.metrics.readRequestsAdmitted.Inc(req.readRequests) - } - if req.writeRequests > 0 { - rl.metrics.writeRequestsAdmitted.Inc(req.writeRequests) - } - // Accounted for in limiter.RecordRead. - // if req.readBytes > 0 { - // rl.metrics.readBytesAdmitted.Inc(req.readBytes) - // } - if req.writeBytes > 0 { - rl.metrics.writeBytesAdmitted.Inc(req.writeBytes) - } - })) - rl.qp = quotapool.New(tenantID.String(), &buckets, options...) - buckets.clock = rl.qp.TimeSource() - buckets.lastUpdated = buckets.clock.Now() -} - + // Note: if multiple token buckets are needed, consult the history of + // this file as of 0e70529f84 for a sample implementation. + bucket := &tokenBucket{} + + options = append(options, quotapool.OnWait( + func(ctx context.Context, poolName string, r quotapool.Request) { + rl.metrics.currentBlocked.Inc(1) + }, + func(ctx context.Context, poolName string, r quotapool.Request) { + rl.metrics.currentBlocked.Dec(1) + }, + )) + + // There is a lot of overlap with quotapool.RateLimiter, but we can't use it + // directly without separate synchronization for the Config. + rl.qp = quotapool.New(tenantID.String(), bucket, options...) + bucket.init(config, rl.qp.TimeSource()) +} + +// Wait is part of the Limiter interface. func (rl *limiter) Wait(ctx context.Context, isWrite bool, writeBytes int64) error { - rl.metrics.currentBlocked.Inc(1) - defer rl.metrics.currentBlocked.Dec(1) r := newWaitRequest(isWrite, writeBytes) defer putWaitRequest(r) + if err := rl.qp.Acquire(ctx, r); err != nil { return err } + + if isWrite { + rl.metrics.writeRequestsAdmitted.Inc(1) + rl.metrics.writeBytesAdmitted.Inc(writeBytes) + } else { + // We don't know how much we will read; the bytes will be accounted for + // after the fact in RecordRead. + rl.metrics.readRequestsAdmitted.Inc(1) + } + return nil } +// RecordRead is part of the Limiter interface. func (rl *limiter) RecordRead(ctx context.Context, readBytes int64) { rl.metrics.readBytesAdmitted.Inc(readBytes) rl.qp.Update(func(res quotapool.Resource) (shouldNotify bool) { - rb := res.(*tokenBuckets) - rb.readBytes.tokens -= float64(readBytes) + tb := res.(*tokenBucket) + amount := float64(readBytes) * tb.config.ReadUnitsPerByte + tb.Adjust(quotapool.Tokens(-amount)) // Do not notify the head of the queue. In the best case we did not disturb // the time at which it can be fulfilled and in the worst case, we made it // further in the future. @@ -136,148 +136,40 @@ func (rl *limiter) RecordRead(ctx context.Context, readBytes int64) { }) } -// updateLimits is used by the factory to inform the limiter of a new -// configuration. 
-func (rl *limiter) updateLimits(l LimitConfigs) { +func (rl *limiter) updateConfig(config Config) { rl.qp.Update(func(res quotapool.Resource) (shouldNotify bool) { - rb := res.(*tokenBuckets) - // Account for the accumulation since lastUpdate and now under the old - // configuration. - rb.update() - - rb.readRequests.setConf(l.ReadRequests) - rb.writeRequests.setConf(l.WriteRequests) - rb.readBytes.setConf(l.ReadBytes) - rb.writeBytes.setConf(l.WriteBytes) + tb := res.(*tokenBucket) + tb.config = config + tb.UpdateConfig(quotapool.TokensPerSecond(config.Rate), quotapool.Tokens(config.Burst)) return true }) } -// tokenBuckets is the implementation of Resource which remains in the quotapool -// for a limiter. -type tokenBuckets struct { - clock timeutil.TimeSource - lastUpdated time.Time - readRequests tokenBucket - writeRequests tokenBucket - readBytes tokenBucket - writeBytes tokenBucket -} - -var _ quotapool.Resource = (*tokenBuckets)(nil) - -func (rb *tokenBuckets) update() { - now := rb.clock.Now() - - // Update token bucket capacity given the passage of clock. - // TODO(ajwerner): Consider instituting a minimum update frequency to avoid - // spinning too fast on timers for tons of tiny allocations at a fast rate. - if since := now.Sub(rb.lastUpdated); since > 0 { - rb.readRequests.update(since) - rb.writeRequests.update(since) - rb.readBytes.update(since) - rb.writeBytes.update(since) - rb.lastUpdated = now - } -} - -// check determines whether a request can be fulfilled by the given tokens in -// the bucket. If not, it determines when the buckets will be adequately full -// to fulfill the request. -func (rb *tokenBuckets) check(req *waitRequest) (fulfilled bool, tryAgainAfter time.Duration) { - fulfilled = true - check := func(t *tokenBucket, needed int64) { - if ok, after := t.check(needed); !ok { - fulfilled = false - if after > tryAgainAfter { - tryAgainAfter = after - } - } - } - check(&rb.readRequests, req.readRequests) - check(&rb.writeRequests, req.writeRequests) - check(&rb.readBytes, req.readBytes) - check(&rb.writeBytes, req.writeBytes) - return fulfilled, tryAgainAfter -} - -func (rb *tokenBuckets) subtract(req *waitRequest) { - rb.readRequests.tokens -= float64(req.readRequests) - rb.writeRequests.tokens -= float64(req.writeRequests) - rb.readBytes.tokens -= float64(req.readBytes) - rb.writeBytes.tokens -= float64(req.writeBytes) -} - -// tokenBucket represents a token bucket for a given resource and its associated -// configuration. +// tokenBucket represents the token bucket for KV Compute Units and its +// associated configuration. It implements quotapool.Resource. type tokenBucket struct { - LimitConfig - tokens float64 -} - -func makeTokenBucket(rl LimitConfig) tokenBucket { - return tokenBucket{ - LimitConfig: rl, - tokens: float64(rl.Burst), - } -} - -// update applies the positive time delta update for the resource. -func (t *tokenBucket) update(deltaT time.Duration) { - t.tokens += float64(t.Rate) * deltaT.Seconds() - t.clampTokens() -} - -// checkQuota returns whether needed will be satisfied by quota. Note that the -// definition of satisfied is either that the integer part of quota exceeds -// needed or that quota is equal to the burst. This is because we want to -// have request put the rate limiter in debt rather than prevent execution of -// requests. -// -// If the request is not satisfied, the amount of clock that must be waited for -// the request to be satisfied at the current rate is returned. 
-func (t *tokenBucket) check(needed int64) (fulfilled bool, tryAgainAfter time.Duration) { - if q := int64(t.tokens); needed <= q || q == t.Burst { - return true, 0 - } + quotapool.TokenBucket - // We'll calculate the amount of clock until the quota is full if we're - // requesting more than the burst limit. - if needed > t.Burst { - needed = t.Burst - } - delta := float64(needed) - t.tokens - tryAgainAfter = time.Duration((delta * float64(time.Second)) / float64(t.Rate)) - return false, tryAgainAfter + config Config } -// setConf updates the configuration for a tokenBucket. -// -// TODO(ajwerner): It seems possible that when adding or reducing the burst -// values that we might want to remove those values from the token bucket. -// It's not obvious that we want to add tokens when increasing the burst as -// that might lead to a big spike in load immediately upon increasing this -// limit. -func (t *tokenBucket) setConf(rl LimitConfig) { - t.LimitConfig = rl - t.clampTokens() -} +var _ quotapool.Resource = (*tokenBucket)(nil) -// clampTokens ensures that tokens does not exceed burst. -func (t *tokenBucket) clampTokens() { - if burst := float64(t.Burst); t.tokens > burst { - t.tokens = burst - } +func (tb *tokenBucket) init(config Config, timeSource timeutil.TimeSource) { + tb.TokenBucket.Init( + quotapool.TokensPerSecond(config.Rate), quotapool.Tokens(config.Burst), timeSource, + ) + tb.config = config } // waitRequest is used to wait for adequate resources in the tokenBuckets. type waitRequest struct { - readRequests int64 - writeRequests int64 - writeBytes int64 - readBytes int64 + isWrite bool + writeBytes int64 } +var _ quotapool.Request = (*waitRequest)(nil) + var waitRequestSyncPool = sync.Pool{ New: func() interface{} { return new(waitRequest) }, } @@ -287,15 +179,8 @@ var waitRequestSyncPool = sync.Pool{ func newWaitRequest(isWrite bool, writeBytes int64) *waitRequest { r := waitRequestSyncPool.Get().(*waitRequest) *r = waitRequest{ - readRequests: 0, - writeRequests: 0, - readBytes: 1, - writeBytes: writeBytes, - } - if isWrite { - r.writeRequests = 1 - } else { - r.readRequests = 1 + isWrite: isWrite, + writeBytes: writeBytes, } return r } @@ -305,18 +190,23 @@ func putWaitRequest(r *waitRequest) { waitRequestSyncPool.Put(r) } +// Acquire is part of quotapool.Request. func (req *waitRequest) Acquire( ctx context.Context, res quotapool.Resource, ) (fulfilled bool, tryAgainAfter time.Duration) { - r := res.(*tokenBuckets) - r.update() - if fulfilled, tryAgainAfter = r.check(req); !fulfilled { - return false, tryAgainAfter + tb := res.(*tokenBucket) + var needed float64 + if req.isWrite { + needed = tb.config.WriteRequestUnits + float64(req.writeBytes)*tb.config.WriteUnitsPerByte + } else { + // We don't know the size of the read upfront; we will adjust the bucket + // after the fact in RecordRead. + needed = tb.config.ReadRequestUnits } - r.subtract(req) - return true, 0 + return tb.TryToFulfill(quotapool.Tokens(needed)) } +// ShouldWait is part of quotapool.Request. 
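+// Returning true means tenant requests always queue in the FIFO and wait for
+// quota rather than failing fast when the bucket is empty.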
func (req *waitRequest) ShouldWait() bool { return true } diff --git a/pkg/kv/kvserver/tenantrate/limiter_test.go b/pkg/kv/kvserver/tenantrate/limiter_test.go index 87ddf3ff609f..4fe4ca8cd154 100644 --- a/pkg/kv/kvserver/tenantrate/limiter_test.go +++ b/pkg/kv/kvserver/tenantrate/limiter_test.go @@ -15,7 +15,6 @@ import ( "bytes" "context" "fmt" - "math" "regexp" "sort" "strings" @@ -50,9 +49,9 @@ func TestCloser(t *testing.T) { limiter := factory.GetTenant(tenant, closer) ctx := context.Background() // First Wait call will not block. - require.NoError(t, limiter.Wait(ctx, false, 1)) + require.NoError(t, limiter.Wait(ctx, true, 1)) errCh := make(chan error, 1) - go func() { errCh <- limiter.Wait(ctx, false, 1<<30) }() + go func() { errCh <- limiter.Wait(ctx, true, 1<<30) }() testutils.SucceedsSoon(t, func() error { if timers := timeSource.Timers(); len(timers) != 1 { return errors.Errorf("expected 1 timer, found %d", len(timers)) @@ -145,9 +144,8 @@ func (ts *testState) init(t *testing.T, d *datadriven.TestData) string { ts.tenants = make(map[roachpb.TenantID][]tenantrate.Limiter) ts.clock = timeutil.NewManualTime(t0) ts.settings = cluster.MakeTestingClusterSettings() - limits := tenantrate.LimitConfigsFromSettings(ts.settings) - parseLimits(t, d, &limits) - tenantrate.OverrideSettingsWithRateLimits(ts.settings, limits) + settings := parseSettings(t, d) + tenantrate.OverrideSettings(&ts.settings.SV, settings) ts.rl = tenantrate.NewLimiterFactory(ts.settings, &tenantrate.TestingKnobs{ TimeSource: ts.clock, }) @@ -160,9 +158,8 @@ func (ts *testState) init(t *testing.T, d *datadriven.TestData) string { // yaml object representing the limits and updates accordingly. It returns // the current time. See init for more details as the semantics are the same. func (ts *testState) updateSettings(t *testing.T, d *datadriven.TestData) string { - limits := tenantrate.LimitConfigsFromSettings(ts.settings) - parseLimits(t, d, &limits) - tenantrate.OverrideSettingsWithRateLimits(ts.settings, limits) + settings := parseSettings(t, d) + tenantrate.OverrideSettings(&ts.settings.SV, settings) return ts.formatTime() } @@ -366,11 +363,11 @@ func (ts *testState) metrics(t *testing.T, d *datadriven.TestData) string { if err := testutils.SucceedsSoonError(func() error { got := ts.getMetricsText(t, d) if got != exp { - return errors.Errorf("got: %q, exp: %q", got, exp) + return errors.Errorf("got:\n%s\nexp:\n%s\n", got, exp) } return nil }); err != nil { - d.Fatalf(t, "failed to find expected timers: %v", err) + d.Fatalf(t, "failed to find expected metrics: %v", err) } return d.Expected } @@ -513,28 +510,18 @@ func (ts *testState) estimateIOPS(t *testing.T, d *datadriven.TestData) string { if workload.ReadPercentage < 0 || workload.ReadPercentage > 100 { d.Fatalf(t, "Invalid read percentage %d", workload.ReadPercentage) } - limits := tenantrate.DefaultLimitConfigs() + config := tenantrate.DefaultConfig() - calculateIOPS := func(readRate, readBytesRate, writeRate, writeBytesRate float64) float64 { - readIOPS := math.Min(readRate, readBytesRate/float64(workload.ReadSize)) - writeIOPS := math.Min(writeRate, writeBytesRate/float64(workload.WriteSize)) - // The reads and writes are rate-limited separately; our workload will be - // bottlenecked on one of them. 
-		return math.Min(
-			writeIOPS*100.0/float64(100-workload.ReadPercentage),
-			readIOPS*100.0/float64(workload.ReadPercentage),
-		)
+	calculateIOPS := func(rate float64) float64 {
+		readCost := config.ReadRequestUnits + float64(workload.ReadSize)*config.ReadUnitsPerByte
+		writeCost := config.WriteRequestUnits + float64(workload.WriteSize)*config.WriteUnitsPerByte
+		readFraction := float64(workload.ReadPercentage) / 100.0
+		avgCost := readFraction*readCost + (1-readFraction)*writeCost
+		return rate / avgCost
 	}
-	sustained := calculateIOPS(
-		float64(limits.ReadRequests.Rate), float64(limits.ReadBytes.Rate),
-		float64(limits.WriteRequests.Rate), float64(limits.WriteBytes.Rate),
-	)
-
-	burst := calculateIOPS(
-		float64(limits.ReadRequests.Burst), float64(limits.ReadBytes.Burst),
-		float64(limits.WriteRequests.Burst), float64(limits.WriteBytes.Burst),
-	)
+	sustained := calculateIOPS(config.Rate)
+	burst := calculateIOPS(config.Burst)
 	fmtFloat := func(val float64) string {
 		if val < 10 {
 			return fmt.Sprintf("%.1f", val)
@@ -592,10 +579,12 @@ func parseTenantIDs(t *testing.T, d *datadriven.TestData) []uint64 {
 	return tenantIDs
 }
 
-func parseLimits(t *testing.T, d *datadriven.TestData, limits *tenantrate.LimitConfigs) {
-	if err := yaml.UnmarshalStrict([]byte(d.Input), &limits); err != nil {
+func parseSettings(t *testing.T, d *datadriven.TestData) tenantrate.SettingValues {
+	var vals tenantrate.SettingValues
+	if err := yaml.UnmarshalStrict([]byte(d.Input), &vals); err != nil {
 		d.Fatalf(t, "failed to unmarshal limits: %v", err)
 	}
+	return vals
 }
 
 func parseStrings(t *testing.T, d *datadriven.TestData) []string {
diff --git a/pkg/kv/kvserver/tenantrate/settings.go b/pkg/kv/kvserver/tenantrate/settings.go
index 938b4c785f50..22dfd8aa8cb8 100644
--- a/pkg/kv/kvserver/tenantrate/settings.go
+++ b/pkg/kv/kvserver/tenantrate/settings.go
@@ -15,107 +15,115 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 )
 
-// Limit defines a rate in units per second.
-type Limit float64
-
-// LimitConfig configures the rate limit and burst limit for a given resource.
-type LimitConfig struct {
-	Rate Limit
-	Burst int64
-}
-
-// LimitConfigs configures the rate limits.
-// It is exported for convenience and testing.
-// The values are derived from cluster settings.
-type LimitConfigs struct {
-	ReadRequests LimitConfig
-	WriteRequests LimitConfig
-	ReadBytes LimitConfig
-	WriteBytes LimitConfig
-}
+// Config contains the configuration of the rate limiter.
+//
+// We limit the rate in terms of "KV Compute Units". The configuration contains
+// the rate and burst limits for KVCUs, as well as factors that define a "cost
+// model" for calculating the number of KVCUs for a read or write request.
+//
+// Specifically, the cost model is a linear function combining a fixed
+// per-request cost and a size-dependent (per-byte) cost.
+//
+// For a read:
+// KVCUs = ReadRequestUnits + <size of read> * ReadUnitsPerByte
+// For a write:
+// KVCUs = WriteRequestUnits +