diff --git a/pkg/kv/kvserver/tenantrate/factory.go b/pkg/kv/kvserver/tenantrate/factory.go index cd075e64bfc6..890d88a5566f 100644 --- a/pkg/kv/kvserver/tenantrate/factory.go +++ b/pkg/kv/kvserver/tenantrate/factory.go @@ -32,7 +32,7 @@ type LimiterFactory struct { systemLimiter systemLimiter mu struct { syncutil.RWMutex - limits LimitConfigs + limits Config tenants map[roachpb.TenantID]*refCountedLimiter } } @@ -53,12 +53,12 @@ func NewLimiterFactory(st *cluster.Settings, knobs *TestingKnobs) *LimiterFactor rl.knobs = *knobs } rl.mu.tenants = make(map[roachpb.TenantID]*refCountedLimiter) - rl.mu.limits = LimitConfigsFromSettings(st) + rl.mu.limits = ConfigFromSettings(st) rl.systemLimiter = systemLimiter{ tenantMetrics: rl.metrics.tenantMetrics(roachpb.SystemTenantID), } - for _, setOnChange := range settingsSetOnChangeFuncs { - setOnChange(&st.SV, rl.updateLimits) + for _, setting := range configSettings { + setting.SetOnChange(&st.SV, rl.updateConfig) } return rl } @@ -114,12 +114,12 @@ func (rl *LimiterFactory) Release(lim Limiter) { } } -func (rl *LimiterFactory) updateLimits() { +func (rl *LimiterFactory) updateConfig() { rl.mu.Lock() defer rl.mu.Unlock() - rl.mu.limits = LimitConfigsFromSettings(rl.settings) + rl.mu.limits = ConfigFromSettings(rl.settings) for _, rcLim := range rl.mu.tenants { - rcLim.lim.updateLimits(rl.mu.limits) + rcLim.lim.updateConfig(rl.mu.limits) } } diff --git a/pkg/kv/kvserver/tenantrate/helpers_test.go b/pkg/kv/kvserver/tenantrate/helpers_test.go index 81f29ec0ff94..c42526917f53 100644 --- a/pkg/kv/kvserver/tenantrate/helpers_test.go +++ b/pkg/kv/kvserver/tenantrate/helpers_test.go @@ -10,17 +10,38 @@ package tenantrate -import "github.com/cockroachdb/cockroach/pkg/settings/cluster" +import "github.com/cockroachdb/cockroach/pkg/settings" -// OverrideSettingsWithRateLimits utilizes LimitConfigs from the values stored in the -// settings. -func OverrideSettingsWithRateLimits(settings *cluster.Settings, rl LimitConfigs) { - readRequestRateLimit.Override(&settings.SV, float64(rl.ReadRequests.Rate)) - readRequestBurstLimit.Override(&settings.SV, rl.ReadRequests.Burst) - writeRequestRateLimit.Override(&settings.SV, float64(rl.WriteRequests.Rate)) - writeRequestBurstLimit.Override(&settings.SV, rl.WriteRequests.Burst) - readRateLimit.Override(&settings.SV, int64(rl.ReadBytes.Rate)) - readBurstLimit.Override(&settings.SV, rl.ReadBytes.Burst) - writeRateLimit.Override(&settings.SV, int64(rl.WriteBytes.Rate)) - writeBurstLimit.Override(&settings.SV, rl.WriteBytes.Burst) +// SettingValues is a struct that can be populated from test files, via YAML. +type SettingValues struct { + Rate float64 + Burst float64 + + Read Factors + Write Factors +} + +// Factors for reads and writes. +type Factors struct { + Base float64 + PerByte float64 +} + +// OverrideSettings sets the cluster setting according to the given +// settingValues. +// +// Uninitialized (zero) values are ignored. 
+func OverrideSettings(sv *settings.Values, vals SettingValues) { + override := func(setting *settings.FloatSetting, val float64) { + if val != 0 { + setting.Override(sv, val) + } + } + override(kvcuRateLimit, vals.Rate) + override(kvcuBurstLimitSeconds, vals.Burst/kvcuRateLimit.Get(sv)) + + override(readRequestCost, vals.Read.Base) + override(readCostPerMB, vals.Read.PerByte*1024*1024) + override(writeRequestCost, vals.Write.Base) + override(writeCostPerMB, vals.Write.PerByte*1024*1024) } diff --git a/pkg/kv/kvserver/tenantrate/limiter.go b/pkg/kv/kvserver/tenantrate/limiter.go index 5276840e0f22..f22f4254d3fd 100644 --- a/pkg/kv/kvserver/tenantrate/limiter.go +++ b/pkg/kv/kvserver/tenantrate/limiter.go @@ -47,13 +47,11 @@ import ( // The Limiter is backed by a FIFO queue which provides fairness. type Limiter interface { - // Wait acquires n quota from the limiter. This acquisition cannot be - // released. Each call to wait will consume 1 read or write request - // depending on isWrite, 1 read byte, and writeBytes from the token buckets. - // Calls to Wait will block until the buckets contain adequate resources. If - // a request attempts to write more than the burst limit, it will wait until - // the bucket is completely full before acquiring the requested quantity and - // putting the limiter in debt. + // Wait acquires the quota necessary to admit a read or write request. This + // acquisition cannot be released. Calls to Wait will block until the buckets + // contain adequate resources. If a request attempts to write more than the + // burst limit, it will wait until the bucket is completely full before + // acquiring the requested quantity and putting the limiter in debt. // // The only errors which should be returned are due to the context. Wait(ctx context.Context, isWrite bool, writeBytes int64) error @@ -76,7 +74,7 @@ type limiter struct { func (rl *limiter) init( parent *LimiterFactory, tenantID roachpb.TenantID, - conf LimitConfigs, + config Config, metrics tenantMetrics, options ...quotapool.Option, ) { @@ -85,46 +83,41 @@ func (rl *limiter) init( tenantID: tenantID, metrics: metrics, } - buckets := tokenBuckets{ - readRequests: makeTokenBucket(conf.ReadRequests), - writeRequests: makeTokenBucket(conf.WriteRequests), - readBytes: makeTokenBucket(conf.ReadBytes), - writeBytes: makeTokenBucket(conf.WriteBytes), - } - options = append(options, quotapool.OnAcquisition(func( - ctx context.Context, poolName string, r quotapool.Request, start time.Time, - ) { - req := r.(*waitRequest) - if req.readRequests > 0 { - rl.metrics.readRequestsAdmitted.Inc(req.readRequests) - } - if req.writeRequests > 0 { - rl.metrics.writeRequestsAdmitted.Inc(req.writeRequests) - } - // Accounted for in limiter.RecordRead. - // if req.readBytes > 0 { - // rl.metrics.readBytesAdmitted.Inc(req.readBytes) - // } - if req.writeBytes > 0 { - rl.metrics.writeBytesAdmitted.Inc(req.writeBytes) - } - })) - rl.qp = quotapool.New(tenantID.String(), &buckets, options...) - buckets.clock = rl.qp.TimeSource() - buckets.lastUpdated = buckets.clock.Now() + // Note: if multiple token buckets are needed, consult the history of + // this file as of 0e70529f84 for a sample implementation. + bucket := makeTokenBucket(config) + rl.qp = quotapool.New(tenantID.String(), &bucket, options...) + bucket.clock = rl.qp.TimeSource() + bucket.lastUpdated = bucket.clock.Now() } +// Wait is part of the Limiter interface. 
 func (rl *limiter) Wait(ctx context.Context, isWrite bool, writeBytes int64) error {
+	// TODO(radu): find a way to omit these atomic operations in the case when we
+	// don't have to wait.
 	rl.metrics.currentBlocked.Inc(1)
 	defer rl.metrics.currentBlocked.Dec(1)
+
 	r := newWaitRequest(isWrite, writeBytes)
 	defer putWaitRequest(r)
+
 	if err := rl.qp.Acquire(ctx, r); err != nil {
 		return err
 	}
+
+	if isWrite {
+		rl.metrics.writeRequestsAdmitted.Inc(1)
+		rl.metrics.writeBytesAdmitted.Inc(writeBytes)
+	} else {
+		// We don't know how much we will read; the bytes will be accounted for
+		// after the fact in RecordRead.
+		rl.metrics.readRequestsAdmitted.Inc(1)
+	}
+
 	return nil
 }
 
+// RecordRead is part of the Limiter interface.
 func (rl *limiter) RecordRead(ctx context.Context, readBytes int64) {
 	rb := newReadBytesResource(readBytes)
 	defer putReadBytesResource(rb)
@@ -132,130 +125,75 @@ func (rl *limiter) RecordRead(ctx context.Context, readBytes int64) {
 	rl.qp.Add(rb)
 }
 
-// updateLimits is used by the factory to inform the limiter of a new
+// updateConfig is used by the factory to inform the limiter of a new
 // configuration.
-func (rl *limiter) updateLimits(limits LimitConfigs) {
-	rl.qp.Add(limits)
-}
-
-// tokenBuckets is the implementation of Resource which remains in the quotapool
-// for a limiter.
-type tokenBuckets struct {
-	clock         timeutil.TimeSource
-	lastUpdated   time.Time
-	readRequests  tokenBucket
-	writeRequests tokenBucket
-	readBytes     tokenBucket
-	writeBytes    tokenBucket
-}
-
-var _ quotapool.Resource = (*tokenBuckets)(nil)
-
-func (rb *tokenBuckets) update() {
-	now := rb.clock.Now()
-
-	// Update token bucket capacity given the passage of time.
-	// TODO(ajwerner): Consider instituting a minimum update frequency to avoid
-	// spinning too fast on timers for tons of tiny allocations at a fast rate.
-	if since := now.Sub(rb.lastUpdated); since > 0 {
-		rb.readRequests.update(since)
-		rb.writeRequests.update(since)
-		rb.readBytes.update(since)
-		rb.writeBytes.update(since)
-		rb.lastUpdated = now
-	}
+func (rl *limiter) updateConfig(config Config) {
+	rl.qp.Add(config)
 }
 
-// check determines whether a request can be fulfilled by the given tokens in
-// the bucket. If not, it determines when the buckets will be adequately full
-// to fulfill the request.
-func (rb *tokenBuckets) check(req *waitRequest) (fulfilled bool, tryAgainAfter time.Duration) {
-	fulfilled = true
-	check := func(t *tokenBucket, needed int64) {
-		if ok, after := t.check(needed); !ok {
-			fulfilled = false
-			if after > tryAgainAfter {
-				tryAgainAfter = after
-			}
-		}
-	}
-	check(&rb.readRequests, req.readRequests)
-	check(&rb.writeRequests, req.writeRequests)
-	check(&rb.readBytes, req.readBytes)
-	check(&rb.writeBytes, req.writeBytes)
-	return fulfilled, tryAgainAfter
-}
-
-func (rb *tokenBuckets) subtract(req *waitRequest) {
-	rb.readRequests.tokens -= float64(req.readRequests)
-	rb.writeRequests.tokens -= float64(req.writeRequests)
-	rb.readBytes.tokens -= float64(req.readBytes)
-	rb.writeBytes.tokens -= float64(req.writeBytes)
-}
-
-func (rb *tokenBuckets) Merge(val interface{}) (shouldNotify bool) {
-	switch toAdd := val.(type) {
-	case LimitConfigs:
-		// Account for the accumulation since lastUpdate and now under the old
-		// configuration.
-		rb.update()
-
-		rb.readRequests.setConf(toAdd.ReadRequests)
-		rb.writeRequests.setConf(toAdd.WriteRequests)
-		rb.readBytes.setConf(toAdd.ReadBytes)
-		rb.writeBytes.setConf(toAdd.WriteBytes)
-		return true
-	case *readBytesResource:
-		rb.readBytes.tokens -= float64(*toAdd)
-		// Do not notify the head of the queue. In the best case we did not disturb
-		// the time at which it can be fulfilled and in the worst case, we made it
-		// further in the future.
-		return false
-	default:
-		panic(errors.AssertionFailedf("merge not implemented for %T", val))
-	}
-}
-
-// tokenBucket represents a token bucket for a given resource and its associated
-// configuration.
+// tokenBucket represents the token bucket for KV Compute Units and its
+// associated configuration. It implements quotapool.Resource.
 type tokenBucket struct {
-	LimitConfig
+	config      Config
+	clock       timeutil.TimeSource
+	lastUpdated time.Time
+	// Current number of tokens, in KV Compute Units.
 	tokens float64
 }
 
-func makeTokenBucket(rl LimitConfig) tokenBucket {
+var _ quotapool.Resource = (*tokenBucket)(nil)
+
+func makeTokenBucket(config Config) tokenBucket {
 	return tokenBucket{
-		LimitConfig: rl,
-		tokens:      float64(rl.Burst),
+		config: config,
+		tokens: float64(config.Burst),
 	}
 }
 
-// update applies the positive time delta update for the resource.
-func (t *tokenBucket) update(deltaT time.Duration) {
-	t.tokens += float64(t.Rate) * deltaT.Seconds()
-	t.clampTokens()
+// update accounts for the passing of time.
+func (tb *tokenBucket) update() {
+	now := tb.clock.Now()
+
+	if since := now.Sub(tb.lastUpdated); since > 0 {
+		tb.tokens += float64(tb.config.Rate) * since.Seconds()
+		tb.clampTokens()
+		tb.lastUpdated = now
+	}
 }
 
-// checkQuota returns whether needed will be satisfied by quota. Note that the
-// definition of satisfied is either that the integer part of quota exceeds
-// needed or that quota is equal to the burst. This is because we want to
-// have request put the rate limiter in debt rather than prevent execution of
-// requests.
+// tryToFulfill calculates the number of KV Compute Units needed for the
+// request and tries to remove them from the bucket.
+//
+// If the request can be fulfilled, the current token amount is adjusted. Note
+// that if the current amount is equal to Burst, then we allow any request to
+// be fulfilled. This is because we want to have requests put the rate limiter
+// in debt rather than prevent execution of requests.
 //
 // If the request is not satisfied, the amount of time that must be waited for
 // the request to be satisfied at the current rate is returned.
-func (t *tokenBucket) check(needed int64) (fulfilled bool, tryAgainAfter time.Duration) {
-	if q := int64(t.tokens); needed <= q || q == t.Burst {
+func (tb *tokenBucket) tryToFulfill(
+	req *waitRequest,
+) (fulfilled bool, tryAgainAfter time.Duration) {
+	var needed float64
+	if req.isWrite {
+		needed = tb.config.WriteRequestUnits + float64(req.writeBytes)*tb.config.WriteUnitsPerByte
+	} else {
+		// We don't know the size of the read upfront; we will adjust the bucket
+		// after the fact in RecordRead.
+		needed = tb.config.ReadRequestUnits
+	}
+	if q := tb.tokens; needed <= q || q == tb.config.Burst {
+		tb.tokens -= needed
 		return true, 0
 	}
 	// We'll calculate the amount of time until the quota is full if we're
 	// requesting more than the burst limit.
- if needed > t.Burst { - needed = t.Burst + if needed > tb.config.Burst { + needed = tb.config.Burst } - delta := float64(needed) - t.tokens - tryAgainAfter = time.Duration((delta * float64(time.Second)) / float64(t.Rate)) + delta := needed - tb.tokens + tryAgainAfter = time.Duration((delta * float64(time.Second)) / tb.config.Rate) return false, tryAgainAfter } @@ -266,26 +204,49 @@ func (t *tokenBucket) check(needed int64) (fulfilled bool, tryAgainAfter time.Du // It's not obvious that we want to add tokens when increasing the burst as // that might lead to a big spike in load immediately upon increasing this // limit. -func (t *tokenBucket) setConf(rl LimitConfig) { - t.LimitConfig = rl - t.clampTokens() +func (tb *tokenBucket) updateConfig(config Config) { + tb.config = config + tb.clampTokens() } // clampTokens ensures that tokens does not exceed burst. -func (t *tokenBucket) clampTokens() { - if burst := float64(t.Burst); t.tokens > burst { - t.tokens = burst +func (tb *tokenBucket) clampTokens() { + if tb.tokens > tb.config.Burst { + tb.tokens = tb.config.Burst + } +} + +// Merge is part of quotapool.Resource. +func (tb *tokenBucket) Merge(val interface{}) (shouldNotify bool) { + switch val := val.(type) { + case Config: + // Account for the accumulation since lastUpdate and now under the old + // configuration. + tb.update() + + tb.updateConfig(val) + return true + + case *readBytesResource: + tb.tokens -= float64(val.readBytes) * tb.config.ReadUnitsPerByte + // Do not notify the head of the queue. In the best case we did not disturb + // the time at which it can be fulfilled and in the worst case, we made it + // further in the future. + return false + + default: + panic(errors.AssertionFailedf("merge not implemented for %T", val)) } } // waitRequest is used to wait for adequate resources in the tokenBuckets. type waitRequest struct { - readRequests int64 - writeRequests int64 - writeBytes int64 - readBytes int64 + isWrite bool + writeBytes int64 } +var _ quotapool.Request = (*waitRequest)(nil) + var waitRequestSyncPool = sync.Pool{ New: func() interface{} { return new(waitRequest) }, } @@ -295,15 +256,8 @@ var waitRequestSyncPool = sync.Pool{ func newWaitRequest(isWrite bool, writeBytes int64) *waitRequest { r := waitRequestSyncPool.Get().(*waitRequest) *r = waitRequest{ - readRequests: 0, - writeRequests: 0, - readBytes: 1, - writeBytes: writeBytes, - } - if isWrite { - r.writeRequests = 1 - } else { - r.readRequests = 1 + isWrite: isWrite, + writeBytes: writeBytes, } return r } @@ -313,7 +267,23 @@ func putWaitRequest(r *waitRequest) { waitRequestSyncPool.Put(r) } -type readBytesResource int64 +// Acquire is part of quotapool.Request. +func (req *waitRequest) Acquire( + ctx context.Context, res quotapool.Resource, +) (fulfilled bool, tryAgainAfter time.Duration) { + r := res.(*tokenBucket) + r.update() + return r.tryToFulfill(req) +} + +// ShouldWait is part of quotapool.Request. 
+func (req *waitRequest) ShouldWait() bool { + return true +} + +type readBytesResource struct { + readBytes int64 +} var readBytesResourceSyncPool = sync.Pool{ New: func() interface{} { return new(readBytesResource) }, @@ -321,27 +291,13 @@ var readBytesResourceSyncPool = sync.Pool{ func newReadBytesResource(readBytes int64) *readBytesResource { rb := readBytesResourceSyncPool.Get().(*readBytesResource) - *rb = readBytesResource(readBytes) + *rb = readBytesResource{ + readBytes: readBytes, + } return rb } func putReadBytesResource(rb *readBytesResource) { - *rb = 0 + *rb = readBytesResource{} readBytesResourceSyncPool.Put(rb) } - -func (req *waitRequest) Acquire( - ctx context.Context, res quotapool.Resource, -) (fulfilled bool, tryAgainAfter time.Duration) { - r := res.(*tokenBuckets) - r.update() - if fulfilled, tryAgainAfter = r.check(req); !fulfilled { - return false, tryAgainAfter - } - r.subtract(req) - return true, 0 -} - -func (req *waitRequest) ShouldWait() bool { - return true -} diff --git a/pkg/kv/kvserver/tenantrate/limiter_test.go b/pkg/kv/kvserver/tenantrate/limiter_test.go index 1c94d70bb958..9a910e891ed8 100644 --- a/pkg/kv/kvserver/tenantrate/limiter_test.go +++ b/pkg/kv/kvserver/tenantrate/limiter_test.go @@ -48,9 +48,9 @@ func TestCloser(t *testing.T) { limiter := factory.GetTenant(tenant, closer) ctx := context.Background() // First Wait call will not block. - require.NoError(t, limiter.Wait(ctx, false, 1)) + require.NoError(t, limiter.Wait(ctx, true, 1)) errCh := make(chan error, 1) - go func() { errCh <- limiter.Wait(ctx, false, 1<<30) }() + go func() { errCh <- limiter.Wait(ctx, true, 1<<30) }() testutils.SucceedsSoon(t, func() error { if timers := timeSource.Timers(); len(timers) != 1 { return errors.Errorf("expected 1 timer, found %d", len(timers)) @@ -142,9 +142,8 @@ func (ts *testState) init(t *testing.T, d *datadriven.TestData) string { ts.tenants = make(map[roachpb.TenantID][]tenantrate.Limiter) ts.clock = timeutil.NewManualTime(t0) ts.settings = cluster.MakeTestingClusterSettings() - limits := tenantrate.LimitConfigsFromSettings(ts.settings) - parseLimits(t, d, &limits) - tenantrate.OverrideSettingsWithRateLimits(ts.settings, limits) + settings := parseSettings(t, d) + tenantrate.OverrideSettings(&ts.settings.SV, settings) ts.rl = tenantrate.NewLimiterFactory(ts.settings, &tenantrate.TestingKnobs{ TimeSource: ts.clock, }) @@ -157,9 +156,8 @@ func (ts *testState) init(t *testing.T, d *datadriven.TestData) string { // yaml object representing the limits and updates accordingly. It returns // the current time. See init for more details as the semantics are the same. 
 func (ts *testState) updateSettings(t *testing.T, d *datadriven.TestData) string {
-	limits := tenantrate.LimitConfigsFromSettings(ts.settings)
-	parseLimits(t, d, &limits)
-	tenantrate.OverrideSettingsWithRateLimits(ts.settings, limits)
+	settings := parseSettings(t, d)
+	tenantrate.OverrideSettings(&ts.settings.SV, settings)
 	return ts.formatTime()
 }
 
@@ -363,11 +361,11 @@ func (ts *testState) metrics(t *testing.T, d *datadriven.TestData) string {
 	if err := testutils.SucceedsSoonError(func() error {
 		got := ts.getMetricsText(t, d)
 		if got != exp {
-			return errors.Errorf("got: %q, exp: %q", got, exp)
+			return errors.Errorf("got:\n%s\nexp:\n%s\n", got, exp)
 		}
 		return nil
 	}); err != nil {
-		d.Fatalf(t, "failed to find expected timers: %v", err)
+		d.Fatalf(t, "failed to find expected metrics: %v", err)
 	}
 	return d.Expected
 }
@@ -516,10 +514,12 @@ func parseTenantIDs(t *testing.T, d *datadriven.TestData) []uint64 {
 	return tenantIDs
 }
 
-func parseLimits(t *testing.T, d *datadriven.TestData, limits *tenantrate.LimitConfigs) {
-	if err := yaml.UnmarshalStrict([]byte(d.Input), &limits); err != nil {
+func parseSettings(t *testing.T, d *datadriven.TestData) tenantrate.SettingValues {
+	var vals tenantrate.SettingValues
+	if err := yaml.UnmarshalStrict([]byte(d.Input), &vals); err != nil {
 		d.Fatalf(t, "failed to unmarshal limits: %v", err)
 	}
+	return vals
 }
 
 func parseStrings(t *testing.T, d *datadriven.TestData) []string {
diff --git a/pkg/kv/kvserver/tenantrate/settings.go b/pkg/kv/kvserver/tenantrate/settings.go
index 938b4c785f50..02a03644341c 100644
--- a/pkg/kv/kvserver/tenantrate/settings.go
+++ b/pkg/kv/kvserver/tenantrate/settings.go
@@ -15,107 +15,111 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 )
 
-// Limit defines a rate in units per second.
-type Limit float64
-
-// LimitConfig configures the rate limit and burst limit for a given resource.
-type LimitConfig struct {
-	Rate  Limit
-	Burst int64
-}
-
-// LimitConfigs configures the rate limits.
-// It is exported for convenience and testing.
-// The values are derived from cluster settings.
-type LimitConfigs struct {
-	ReadRequests  LimitConfig
-	WriteRequests LimitConfig
-	ReadBytes     LimitConfig
-	WriteBytes    LimitConfig
-}
+// Config contains the configuration of the rate limiter.
+//
+// We limit the rate in terms of "KV Compute Units". The configuration contains
+// the rate and burst limits for KVCUs, as well as factors that define a "cost
+// model" for calculating the number of KVCUs for a read or write request.
+//
+// Specifically, the cost model is a linear function combining a fixed
+// per-request cost and a size-dependent (per-byte) cost.
+//
+// For a read:
+//   KVCUs = ReadRequestUnits + <size of read in bytes> * ReadUnitsPerByte
+// For a write:
+//   KVCUs = WriteRequestUnits + <size of write in bytes> * WriteUnitsPerByte
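
Not part of the patch: for readers who want to experiment with the admission arithmetic outside the quotapool plumbing, here is a minimal, self-contained Go sketch of the cost model and the debt-tolerant bucket rule above. The `config` struct, the `cost` and `tryToFulfill` helpers, and every constant are hypothetical stand-ins; the authoritative logic is `tokenBucket.tryToFulfill` in limiter.go.

```go
package main

import (
	"fmt"
	"time"
)

// config mirrors the Config fields used by the cost model (values are
// illustrative only; the real defaults come from cluster settings).
type config struct {
	Rate              float64 // KVCUs replenished per second
	Burst             float64 // bucket capacity, in KVCUs
	ReadRequestUnits  float64
	WriteRequestUnits float64
	ReadUnitsPerByte  float64
	WriteUnitsPerByte float64
}

// cost applies the linear cost model. Reads are charged only their base
// cost up front; their bytes are reconciled after the fact (the patch
// does this in RecordRead).
func cost(c config, isWrite bool, writeBytes int64) float64 {
	if isWrite {
		return c.WriteRequestUnits + float64(writeBytes)*c.WriteUnitsPerByte
	}
	return c.ReadRequestUnits
}

// tryToFulfill mimics the admission rule: admit if enough tokens are
// present, or if the bucket is completely full (letting an oversized
// request push the bucket into debt). Otherwise, report how long to
// wait at the current refill rate.
func tryToFulfill(c config, tokens *float64, needed float64) (bool, time.Duration) {
	if needed <= *tokens || *tokens == c.Burst {
		*tokens -= needed
		return true, 0
	}
	if needed > c.Burst {
		// An oversized request only waits until the bucket is full.
		needed = c.Burst
	}
	delta := needed - *tokens
	return false, time.Duration(delta / c.Rate * float64(time.Second))
}

func main() {
	c := config{Rate: 100, Burst: 200, WriteRequestUnits: 1, WriteUnitsPerByte: 0.01}
	tokens := c.Burst
	for _, bytes := range []int64{1000, 50000} {
		needed := cost(c, true, bytes)
		ok, wait := tryToFulfill(c, &tokens, needed)
		fmt.Printf("write of %d bytes needs %.1f KVCUs: fulfilled=%v wait=%s tokens=%.1f\n",
			bytes, needed, ok, wait, tokens)
	}
}
```

With these made-up constants, the 1000-byte write is admitted immediately (11 KVCUs against a full bucket), while the 50000-byte write needs 501 KVCUs, more than Burst, so it reports a 110ms wait: it waits only until the bucket refills to Burst, at which point it would be admitted and drive the bucket into debt.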