From 776935342306d5f4cb997539d72298a7b56443eb Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Mon, 29 Mar 2021 12:56:41 -0700 Subject: [PATCH] tenantrate: switch to a single token bucket Tenant rate limiting currently uses four separate token buckets, each with its own rate and burst limits. In this commit we switch to a single shared token bucket (see #55114 for motivation). We use a "cost model" to map read and write requests to "KV Compute Units". Requests have a base cost plus a per-byte cost. The details are documented in settings.go. The values were chosen based on experiments ran by Nathan: https://docs.google.com/spreadsheets/d/1PPlIcKnusOqWtBoOZVd9xBEMPe5Ss1FgrJlYFgpQZaM/edit#gid=735409177 The rate was chosen so that it maps to 20% of 1 KV vCPU. This keeps the rate limit on small requests roughly the same as before (especially on mixed workloads). The largest departure from the previous limits is that we allow much more read bytes (the per-byte cost of reads is small in the cost model). If we were to keep close to the previous limits, the value of kv.tenant_rate_limiter.read_cost_per_megabyte would be 200 instead of 10. Perhaps we want to be more conservative here and make this value somewhere in-between? Fixes #55114. Release note: None --- pkg/kv/kvserver/tenantrate/factory.go | 14 +- pkg/kv/kvserver/tenantrate/helpers_test.go | 45 ++- pkg/kv/kvserver/tenantrate/limiter.go | 306 +++++++++------------ pkg/kv/kvserver/tenantrate/limiter_test.go | 24 +- pkg/kv/kvserver/tenantrate/settings.go | 176 ++++++------ pkg/kv/kvserver/tenantrate/testdata/basic | 47 ++-- pkg/kv/kvserver/tenantrate/testdata/burst | 33 ++- pkg/kv/kvserver/tenantrate/testdata/cancel | 16 +- pkg/kv/kvserver/tenantrate/testdata/reads | 33 +-- pkg/kv/kvserver/tenantrate/testdata/update | 19 +- 10 files changed, 347 insertions(+), 366 deletions(-) diff --git a/pkg/kv/kvserver/tenantrate/factory.go b/pkg/kv/kvserver/tenantrate/factory.go index cd075e64bfc6..890d88a5566f 100644 --- a/pkg/kv/kvserver/tenantrate/factory.go +++ b/pkg/kv/kvserver/tenantrate/factory.go @@ -32,7 +32,7 @@ type LimiterFactory struct { systemLimiter systemLimiter mu struct { syncutil.RWMutex - limits LimitConfigs + limits Config tenants map[roachpb.TenantID]*refCountedLimiter } } @@ -53,12 +53,12 @@ func NewLimiterFactory(st *cluster.Settings, knobs *TestingKnobs) *LimiterFactor rl.knobs = *knobs } rl.mu.tenants = make(map[roachpb.TenantID]*refCountedLimiter) - rl.mu.limits = LimitConfigsFromSettings(st) + rl.mu.limits = ConfigFromSettings(st) rl.systemLimiter = systemLimiter{ tenantMetrics: rl.metrics.tenantMetrics(roachpb.SystemTenantID), } - for _, setOnChange := range settingsSetOnChangeFuncs { - setOnChange(&st.SV, rl.updateLimits) + for _, setting := range configSettings { + setting.SetOnChange(&st.SV, rl.updateConfig) } return rl } @@ -114,12 +114,12 @@ func (rl *LimiterFactory) Release(lim Limiter) { } } -func (rl *LimiterFactory) updateLimits() { +func (rl *LimiterFactory) updateConfig() { rl.mu.Lock() defer rl.mu.Unlock() - rl.mu.limits = LimitConfigsFromSettings(rl.settings) + rl.mu.limits = ConfigFromSettings(rl.settings) for _, rcLim := range rl.mu.tenants { - rcLim.lim.updateLimits(rl.mu.limits) + rcLim.lim.updateConfig(rl.mu.limits) } } diff --git a/pkg/kv/kvserver/tenantrate/helpers_test.go b/pkg/kv/kvserver/tenantrate/helpers_test.go index 81f29ec0ff94..c42526917f53 100644 --- a/pkg/kv/kvserver/tenantrate/helpers_test.go +++ b/pkg/kv/kvserver/tenantrate/helpers_test.go @@ -10,17 +10,38 @@ package tenantrate -import 
"github.com/cockroachdb/cockroach/pkg/settings/cluster" +import "github.com/cockroachdb/cockroach/pkg/settings" -// OverrideSettingsWithRateLimits utilizes LimitConfigs from the values stored in the -// settings. -func OverrideSettingsWithRateLimits(settings *cluster.Settings, rl LimitConfigs) { - readRequestRateLimit.Override(&settings.SV, float64(rl.ReadRequests.Rate)) - readRequestBurstLimit.Override(&settings.SV, rl.ReadRequests.Burst) - writeRequestRateLimit.Override(&settings.SV, float64(rl.WriteRequests.Rate)) - writeRequestBurstLimit.Override(&settings.SV, rl.WriteRequests.Burst) - readRateLimit.Override(&settings.SV, int64(rl.ReadBytes.Rate)) - readBurstLimit.Override(&settings.SV, rl.ReadBytes.Burst) - writeRateLimit.Override(&settings.SV, int64(rl.WriteBytes.Rate)) - writeBurstLimit.Override(&settings.SV, rl.WriteBytes.Burst) +// SettingValues is a struct that can be populated from test files, via YAML. +type SettingValues struct { + Rate float64 + Burst float64 + + Read Factors + Write Factors +} + +// Factors for reads and writes. +type Factors struct { + Base float64 + PerByte float64 +} + +// OverrideSettings sets the cluster setting according to the given +// settingValues. +// +// Uninitialized (zero) values are ignored. +func OverrideSettings(sv *settings.Values, vals SettingValues) { + override := func(setting *settings.FloatSetting, val float64) { + if val != 0 { + setting.Override(sv, val) + } + } + override(kvcuRateLimit, vals.Rate) + override(kvcuBurstLimitSeconds, vals.Burst/kvcuRateLimit.Get(sv)) + + override(readRequestCost, vals.Read.Base) + override(readCostPerMB, vals.Read.PerByte*1024*1024) + override(writeRequestCost, vals.Write.Base) + override(writeCostPerMB, vals.Write.PerByte*1024*1024) } diff --git a/pkg/kv/kvserver/tenantrate/limiter.go b/pkg/kv/kvserver/tenantrate/limiter.go index 5276840e0f22..f22f4254d3fd 100644 --- a/pkg/kv/kvserver/tenantrate/limiter.go +++ b/pkg/kv/kvserver/tenantrate/limiter.go @@ -47,13 +47,11 @@ import ( // The Limiter is backed by a FIFO queue which provides fairness. type Limiter interface { - // Wait acquires n quota from the limiter. This acquisition cannot be - // released. Each call to wait will consume 1 read or write request - // depending on isWrite, 1 read byte, and writeBytes from the token buckets. - // Calls to Wait will block until the buckets contain adequate resources. If - // a request attempts to write more than the burst limit, it will wait until - // the bucket is completely full before acquiring the requested quantity and - // putting the limiter in debt. + // Wait acquires the quota necessary to admit a read or write request. This + // acquisition cannot be released. Calls to Wait will block until the buckets + // contain adequate resources. If a request attempts to write more than the + // burst limit, it will wait until the bucket is completely full before + // acquiring the requested quantity and putting the limiter in debt. // // The only errors which should be returned are due to the context. 
Wait(ctx context.Context, isWrite bool, writeBytes int64) error @@ -76,7 +74,7 @@ type limiter struct { func (rl *limiter) init( parent *LimiterFactory, tenantID roachpb.TenantID, - conf LimitConfigs, + config Config, metrics tenantMetrics, options ...quotapool.Option, ) { @@ -85,46 +83,41 @@ func (rl *limiter) init( tenantID: tenantID, metrics: metrics, } - buckets := tokenBuckets{ - readRequests: makeTokenBucket(conf.ReadRequests), - writeRequests: makeTokenBucket(conf.WriteRequests), - readBytes: makeTokenBucket(conf.ReadBytes), - writeBytes: makeTokenBucket(conf.WriteBytes), - } - options = append(options, quotapool.OnAcquisition(func( - ctx context.Context, poolName string, r quotapool.Request, start time.Time, - ) { - req := r.(*waitRequest) - if req.readRequests > 0 { - rl.metrics.readRequestsAdmitted.Inc(req.readRequests) - } - if req.writeRequests > 0 { - rl.metrics.writeRequestsAdmitted.Inc(req.writeRequests) - } - // Accounted for in limiter.RecordRead. - // if req.readBytes > 0 { - // rl.metrics.readBytesAdmitted.Inc(req.readBytes) - // } - if req.writeBytes > 0 { - rl.metrics.writeBytesAdmitted.Inc(req.writeBytes) - } - })) - rl.qp = quotapool.New(tenantID.String(), &buckets, options...) - buckets.clock = rl.qp.TimeSource() - buckets.lastUpdated = buckets.clock.Now() + // Note: if multiple token buckets are needed, consult the history of + // this file as of 0e70529f84 for a sample implementation. + bucket := makeTokenBucket(config) + rl.qp = quotapool.New(tenantID.String(), &bucket, options...) + bucket.clock = rl.qp.TimeSource() + bucket.lastUpdated = bucket.clock.Now() } +// Wait is part of the Limiter interface. func (rl *limiter) Wait(ctx context.Context, isWrite bool, writeBytes int64) error { + // TODO(radu): find a way to omit these atomic operations in the case when we + // don't have to wait. rl.metrics.currentBlocked.Inc(1) defer rl.metrics.currentBlocked.Dec(1) + r := newWaitRequest(isWrite, writeBytes) defer putWaitRequest(r) + if err := rl.qp.Acquire(ctx, r); err != nil { return err } + + if isWrite { + rl.metrics.writeRequestsAdmitted.Inc(1) + rl.metrics.writeBytesAdmitted.Inc(writeBytes) + } else { + // We don't know how much we will read; the bytes will be accounted for + // after the fact in RecordRead. + rl.metrics.readRequestsAdmitted.Inc(1) + } + return nil } +// RecordRead is part of the Limiter interface. func (rl *limiter) RecordRead(ctx context.Context, readBytes int64) { rb := newReadBytesResource(readBytes) defer putReadBytesResource(rb) @@ -132,130 +125,75 @@ func (rl *limiter) RecordRead(ctx context.Context, readBytes int64) { rl.qp.Add(rb) } -// updateLimits is used by the factory to inform the limiter of a new +// updateConfig is used by the factory to inform the limiter of a new // configuration. -func (rl *limiter) updateLimits(limits LimitConfigs) { - rl.qp.Add(limits) -} - -// tokenBuckets is the implementation of Resource which remains in the quotapool -// for a limiter. -type tokenBuckets struct { - clock timeutil.TimeSource - lastUpdated time.Time - readRequests tokenBucket - writeRequests tokenBucket - readBytes tokenBucket - writeBytes tokenBucket -} - -var _ quotapool.Resource = (*tokenBuckets)(nil) - -func (rb *tokenBuckets) update() { - now := rb.clock.Now() - - // Update token bucket capacity given the passage of clock. - // TODO(ajwerner): Consider instituting a minimum update frequency to avoid - // spinning too fast on timers for tons of tiny allocations at a fast rate. 
- if since := now.Sub(rb.lastUpdated); since > 0 { - rb.readRequests.update(since) - rb.writeRequests.update(since) - rb.readBytes.update(since) - rb.writeBytes.update(since) - rb.lastUpdated = now - } +func (rl *limiter) updateConfig(config Config) { + rl.qp.Add(config) } -// check determines whether a request can be fulfilled by the given tokens in -// the bucket. If not, it determines when the buckets will be adequately full -// to fulfill the request. -func (rb *tokenBuckets) check(req *waitRequest) (fulfilled bool, tryAgainAfter time.Duration) { - fulfilled = true - check := func(t *tokenBucket, needed int64) { - if ok, after := t.check(needed); !ok { - fulfilled = false - if after > tryAgainAfter { - tryAgainAfter = after - } - } - } - check(&rb.readRequests, req.readRequests) - check(&rb.writeRequests, req.writeRequests) - check(&rb.readBytes, req.readBytes) - check(&rb.writeBytes, req.writeBytes) - return fulfilled, tryAgainAfter -} - -func (rb *tokenBuckets) subtract(req *waitRequest) { - rb.readRequests.tokens -= float64(req.readRequests) - rb.writeRequests.tokens -= float64(req.writeRequests) - rb.readBytes.tokens -= float64(req.readBytes) - rb.writeBytes.tokens -= float64(req.writeBytes) -} - -func (rb *tokenBuckets) Merge(val interface{}) (shouldNotify bool) { - switch toAdd := val.(type) { - case LimitConfigs: - // Account for the accumulation since lastUpdate and now under the old - // configuration. - rb.update() - - rb.readRequests.setConf(toAdd.ReadRequests) - rb.writeRequests.setConf(toAdd.WriteRequests) - rb.readBytes.setConf(toAdd.ReadBytes) - rb.writeBytes.setConf(toAdd.WriteBytes) - return true - case *readBytesResource: - rb.readBytes.tokens -= float64(*toAdd) - // Do not notify the head of the queue. In the best case we did not disturb - // the time at which it can be fulfilled and in the worst case, we made it - // further in the future. - return false - default: - panic(errors.AssertionFailedf("merge not implemented for %T", val)) - } -} - -// tokenBucket represents a token bucket for a given resource and its associated -// configuration. +// tokenBucket represents the token bucket for KV Compute Units and its +// associated configuration. It implements quotapool.Resource. type tokenBucket struct { - LimitConfig + config Config + clock timeutil.TimeSource + lastUpdated time.Time + // Current number of tokens, in KV Compute Units. tokens float64 } -func makeTokenBucket(rl LimitConfig) tokenBucket { +var _ quotapool.Resource = (*tokenBucket)(nil) + +func makeTokenBucket(config Config) tokenBucket { return tokenBucket{ - LimitConfig: rl, - tokens: float64(rl.Burst), + config: config, + tokens: float64(config.Burst), } } -// update applies the positive time delta update for the resource. -func (t *tokenBucket) update(deltaT time.Duration) { - t.tokens += float64(t.Rate) * deltaT.Seconds() - t.clampTokens() +// update accounts for the passing of time. +func (tb *tokenBucket) update() { + now := tb.clock.Now() + + if since := now.Sub(tb.lastUpdated); since > 0 { + tb.tokens += float64(tb.config.Rate) * since.Seconds() + tb.clampTokens() + tb.lastUpdated = now + } } -// checkQuota returns whether needed will be satisfied by quota. Note that the -// definition of satisfied is either that the integer part of quota exceeds -// needed or that quota is equal to the burst. This is because we want to -// have request put the rate limiter in debt rather than prevent execution of -// requests. 
+// tryToFulfill calculates the number of KV Compute Units needed for the
+// request and tries to remove them from the bucket.
+//
+// If the request can be fulfilled, the current token amount is adjusted. Note
+// if the current amount is equal to Burst, then we allow any request to be
+// fulfilled. This is because we want to have a request put the rate limiter
+// in debt rather than prevent execution of requests.
 //
 // If the request is not satisfied, the amount of time that must be waited for
 // the request to be satisfied at the current rate is returned.
-func (t *tokenBucket) check(needed int64) (fulfilled bool, tryAgainAfter time.Duration) {
-	if q := int64(t.tokens); needed <= q || q == t.Burst {
+func (tb *tokenBucket) tryToFulfill(
+	req *waitRequest,
+) (fulfilled bool, tryAgainAfter time.Duration) {
+	var needed float64
+	if req.isWrite {
+		needed = tb.config.WriteRequestUnits + float64(req.writeBytes)*tb.config.WriteUnitsPerByte
+	} else {
+		// We don't know the size of the read upfront; we will adjust the bucket
+		// after the fact in RecordRead.
+		needed = tb.config.ReadRequestUnits
+	}
+	if q := tb.tokens; needed <= q || q == tb.config.Burst {
+		tb.tokens -= needed
 		return true, 0
 	}
 	// We'll calculate the amount of time until the quota is full if we're
 	// requesting more than the burst limit.
-	if needed > t.Burst {
-		needed = t.Burst
+	if needed > tb.config.Burst {
+		needed = tb.config.Burst
 	}
-	delta := float64(needed) - t.tokens
-	tryAgainAfter = time.Duration((delta * float64(time.Second)) / float64(t.Rate))
+	delta := needed - tb.tokens
+	tryAgainAfter = time.Duration((delta * float64(time.Second)) / tb.config.Rate)
 	return false, tryAgainAfter
 }

@@ -266,26 +204,49 @@ func (t *tokenBucket) check(needed int64) (fulfilled bool, tryAgainAfter time.Du
 // It's not obvious that we want to add tokens when increasing the burst as
 // that might lead to a big spike in load immediately upon increasing this
 // limit.
-func (t *tokenBucket) setConf(rl LimitConfig) {
-	t.LimitConfig = rl
-	t.clampTokens()
+func (tb *tokenBucket) updateConfig(config Config) {
+	tb.config = config
+	tb.clampTokens()
 }

 // clampTokens ensures that tokens does not exceed burst.
-func (t *tokenBucket) clampTokens() {
-	if burst := float64(t.Burst); t.tokens > burst {
-		t.tokens = burst
+func (tb *tokenBucket) clampTokens() {
+	if tb.tokens > tb.config.Burst {
+		tb.tokens = tb.config.Burst
+	}
+}
+
+// Merge is part of quotapool.Resource.
+func (tb *tokenBucket) Merge(val interface{}) (shouldNotify bool) {
+	switch val := val.(type) {
+	case Config:
+		// Account for the accumulation since lastUpdate and now under the old
+		// configuration.
+		tb.update()
+
+		tb.updateConfig(val)
+		return true
+
+	case *readBytesResource:
+		tb.tokens -= float64(val.readBytes) * tb.config.ReadUnitsPerByte
+		// Do not notify the head of the queue. In the best case we did not disturb
+		// the time at which it can be fulfilled and in the worst case, we made it
+		// further in the future.
+		return false
+
+	default:
+		panic(errors.AssertionFailedf("merge not implemented for %T", val))
+	}
 }

 // waitRequest is used to wait for adequate resources in the tokenBuckets. 
type waitRequest struct { - readRequests int64 - writeRequests int64 - writeBytes int64 - readBytes int64 + isWrite bool + writeBytes int64 } +var _ quotapool.Request = (*waitRequest)(nil) + var waitRequestSyncPool = sync.Pool{ New: func() interface{} { return new(waitRequest) }, } @@ -295,15 +256,8 @@ var waitRequestSyncPool = sync.Pool{ func newWaitRequest(isWrite bool, writeBytes int64) *waitRequest { r := waitRequestSyncPool.Get().(*waitRequest) *r = waitRequest{ - readRequests: 0, - writeRequests: 0, - readBytes: 1, - writeBytes: writeBytes, - } - if isWrite { - r.writeRequests = 1 - } else { - r.readRequests = 1 + isWrite: isWrite, + writeBytes: writeBytes, } return r } @@ -313,7 +267,23 @@ func putWaitRequest(r *waitRequest) { waitRequestSyncPool.Put(r) } -type readBytesResource int64 +// Acquire is part of quotapool.Request. +func (req *waitRequest) Acquire( + ctx context.Context, res quotapool.Resource, +) (fulfilled bool, tryAgainAfter time.Duration) { + r := res.(*tokenBucket) + r.update() + return r.tryToFulfill(req) +} + +// ShouldWait is part of quotapool.Request. +func (req *waitRequest) ShouldWait() bool { + return true +} + +type readBytesResource struct { + readBytes int64 +} var readBytesResourceSyncPool = sync.Pool{ New: func() interface{} { return new(readBytesResource) }, @@ -321,27 +291,13 @@ var readBytesResourceSyncPool = sync.Pool{ func newReadBytesResource(readBytes int64) *readBytesResource { rb := readBytesResourceSyncPool.Get().(*readBytesResource) - *rb = readBytesResource(readBytes) + *rb = readBytesResource{ + readBytes: readBytes, + } return rb } func putReadBytesResource(rb *readBytesResource) { - *rb = 0 + *rb = readBytesResource{} readBytesResourceSyncPool.Put(rb) } - -func (req *waitRequest) Acquire( - ctx context.Context, res quotapool.Resource, -) (fulfilled bool, tryAgainAfter time.Duration) { - r := res.(*tokenBuckets) - r.update() - if fulfilled, tryAgainAfter = r.check(req); !fulfilled { - return false, tryAgainAfter - } - r.subtract(req) - return true, 0 -} - -func (req *waitRequest) ShouldWait() bool { - return true -} diff --git a/pkg/kv/kvserver/tenantrate/limiter_test.go b/pkg/kv/kvserver/tenantrate/limiter_test.go index 1c94d70bb958..9a910e891ed8 100644 --- a/pkg/kv/kvserver/tenantrate/limiter_test.go +++ b/pkg/kv/kvserver/tenantrate/limiter_test.go @@ -48,9 +48,9 @@ func TestCloser(t *testing.T) { limiter := factory.GetTenant(tenant, closer) ctx := context.Background() // First Wait call will not block. 
- require.NoError(t, limiter.Wait(ctx, false, 1)) + require.NoError(t, limiter.Wait(ctx, true, 1)) errCh := make(chan error, 1) - go func() { errCh <- limiter.Wait(ctx, false, 1<<30) }() + go func() { errCh <- limiter.Wait(ctx, true, 1<<30) }() testutils.SucceedsSoon(t, func() error { if timers := timeSource.Timers(); len(timers) != 1 { return errors.Errorf("expected 1 timer, found %d", len(timers)) @@ -142,9 +142,8 @@ func (ts *testState) init(t *testing.T, d *datadriven.TestData) string { ts.tenants = make(map[roachpb.TenantID][]tenantrate.Limiter) ts.clock = timeutil.NewManualTime(t0) ts.settings = cluster.MakeTestingClusterSettings() - limits := tenantrate.LimitConfigsFromSettings(ts.settings) - parseLimits(t, d, &limits) - tenantrate.OverrideSettingsWithRateLimits(ts.settings, limits) + settings := parseSettings(t, d) + tenantrate.OverrideSettings(&ts.settings.SV, settings) ts.rl = tenantrate.NewLimiterFactory(ts.settings, &tenantrate.TestingKnobs{ TimeSource: ts.clock, }) @@ -157,9 +156,8 @@ func (ts *testState) init(t *testing.T, d *datadriven.TestData) string { // yaml object representing the limits and updates accordingly. It returns // the current time. See init for more details as the semantics are the same. func (ts *testState) updateSettings(t *testing.T, d *datadriven.TestData) string { - limits := tenantrate.LimitConfigsFromSettings(ts.settings) - parseLimits(t, d, &limits) - tenantrate.OverrideSettingsWithRateLimits(ts.settings, limits) + settings := parseSettings(t, d) + tenantrate.OverrideSettings(&ts.settings.SV, settings) return ts.formatTime() } @@ -363,11 +361,11 @@ func (ts *testState) metrics(t *testing.T, d *datadriven.TestData) string { if err := testutils.SucceedsSoonError(func() error { got := ts.getMetricsText(t, d) if got != exp { - return errors.Errorf("got: %q, exp: %q", got, exp) + return errors.Errorf("got:\n%s\nexp:\n%s\n", got, exp) } return nil }); err != nil { - d.Fatalf(t, "failed to find expected timers: %v", err) + d.Fatalf(t, "failed to find expected metrics: %v", err) } return d.Expected } @@ -516,10 +514,12 @@ func parseTenantIDs(t *testing.T, d *datadriven.TestData) []uint64 { return tenantIDs } -func parseLimits(t *testing.T, d *datadriven.TestData, limits *tenantrate.LimitConfigs) { - if err := yaml.UnmarshalStrict([]byte(d.Input), &limits); err != nil { +func parseSettings(t *testing.T, d *datadriven.TestData) tenantrate.SettingValues { + var vals tenantrate.SettingValues + if err := yaml.UnmarshalStrict([]byte(d.Input), &vals); err != nil { d.Fatalf(t, "failed to unmarshal limits: %v", err) } + return vals } func parseStrings(t *testing.T, d *datadriven.TestData) []string { diff --git a/pkg/kv/kvserver/tenantrate/settings.go b/pkg/kv/kvserver/tenantrate/settings.go index 938b4c785f50..02a03644341c 100644 --- a/pkg/kv/kvserver/tenantrate/settings.go +++ b/pkg/kv/kvserver/tenantrate/settings.go @@ -15,107 +15,111 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" ) -// Limit defines a rate in units per second. -type Limit float64 - -// LimitConfig configures the rate limit and burst limit for a given resource. -type LimitConfig struct { - Rate Limit - Burst int64 -} - -// LimitConfigs configures the rate limits. -// It is exported for convenience and testing. -// The values are derived from cluster settings. -type LimitConfigs struct { - ReadRequests LimitConfig - WriteRequests LimitConfig - ReadBytes LimitConfig - WriteBytes LimitConfig -} +// Config contains the configuration of the rate limiter. 
+//
+// We limit the rate in terms of "KV Compute Units". The configuration contains
+// the rate and burst limits for KVCUs, as well as factors that define a "cost
+// model" for calculating the number of KVCUs for a read or write request.
+//
+// Specifically, the cost model is a linear function combining a fixed
+// per-request cost and a size-dependent (per-byte) cost.
+//
+// For a read:
+//   KVCUs = ReadRequestUnits + <size of read in bytes> * ReadUnitsPerByte
+// For a write:
+//   KVCUs = WriteRequestUnits + <size of write in bytes> * WriteUnitsPerByte
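
Illustrative example (not part of the patch): the sketch below mirrors the cost
model described above and in the commit message, to make the arithmetic
concrete. The only value implied by this diff is a read cost of 10 KVCUs per
megabyte (kv.tenant_rate_limiter.read_cost_per_megabyte); the other constants
are hypothetical placeholders, not the actual defaults.

package main

import "fmt"

// costModel mirrors the per-request and per-byte factors used by
// tryToFulfill and RecordRead. Field names follow the Config fields in
// settings.go; the values assigned in main are illustrative only.
type costModel struct {
	ReadRequestUnits  float64 // fixed cost of a read request, in KVCUs
	WriteRequestUnits float64 // fixed cost of a write request, in KVCUs
	ReadUnitsPerByte  float64 // per-byte cost of a read, in KVCUs
	WriteUnitsPerByte float64 // per-byte cost of a write, in KVCUs
}

// readCost computes KVCUs = ReadRequestUnits + bytes * ReadUnitsPerByte.
func (c costModel) readCost(bytes int64) float64 {
	return c.ReadRequestUnits + float64(bytes)*c.ReadUnitsPerByte
}

// writeCost computes KVCUs = WriteRequestUnits + bytes * WriteUnitsPerByte.
func (c costModel) writeCost(bytes int64) float64 {
	return c.WriteRequestUnits + float64(bytes)*c.WriteUnitsPerByte
}

func main() {
	const mb = 1024 * 1024
	c := costModel{
		ReadRequestUnits:  1,         // hypothetical
		WriteRequestUnits: 1,         // hypothetical
		ReadUnitsPerByte:  10.0 / mb, // 10 KVCUs per MB read, per the commit message
		WriteUnitsPerByte: 100.0 / mb, // hypothetical
	}
	// A small read costs roughly its base cost; a 1 MB read adds 10 KVCUs.
	fmt.Println(c.readCost(0), c.readCost(1*mb))
	// A 64 KB write adds a per-byte component on top of the base cost.
	fmt.Println(c.writeCost(64 * 1024))
}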
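
For context, the following sketch (also not part of the patch) shows the call
pattern implied by the Limiter interface and the comments in Wait: writes
declare their size up front, while reads are admitted at the base cost and the
actual bytes are charged after the fact via RecordRead. The wrapper function
and its names are illustrative, not taken from the CockroachDB codebase.

package kvexample // illustrative package, not part of the patch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
)

// admitAndRun waits for the tenant's rate limiter before executing a request,
// then charges read bytes retroactively.
func admitAndRun(
	ctx context.Context,
	lim tenantrate.Limiter,
	isWrite bool,
	writeBytes int64,
	run func(ctx context.Context) (readBytes int64, err error),
) error {
	// Block until the tenant's token bucket has enough KVCUs, or go into debt
	// if the request is larger than the burst limit.
	if err := lim.Wait(ctx, isWrite, writeBytes); err != nil {
		return err // context cancellation or limiter closure
	}
	readBytes, err := run(ctx)
	if err != nil {
		return err
	}
	if !isWrite {
		// Charge the bytes actually read against the bucket after the fact,
		// since the read size is unknown at admission time.
		lim.RecordRead(ctx, readBytes)
	}
	return nil
}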