kv: split rate limits and metrics for read and write requests
This commit splits the existing request rate limit into two categories,
read requests and write requests. Experimentation has shown that the
fixed cost of a request is dramatically different between these two
categories, primarily because write requests need to go through Raft
while read requests do not. By splitting the limits and metrics
along this dimension, we expect to be able to more accurately model
the cost of KV traffic and more effectively tune rate limits.

In making the split, the commit replaces the existing metric:
```
kv.tenant_rate_limit.requests_admitted
```
with the following two new metrics:
```
kv.tenant_rate_limit.read_requests_admitted
kv.tenant_rate_limit.write_requests_admitted
```

The commit also replaces the two existing settings:
```
kv.tenant_rate_limiter.requests.rate_limit
kv.tenant_rate_limiter.request.burst_limit
```
with the following four new settings:
```
kv.tenant_rate_limiter.read_requests.rate_limit
kv.tenant_rate_limiter.read_requests.burst_limit
kv.tenant_rate_limiter.write_requests.rate_limit
kv.tenant_rate_limiter.write_requests.burst_limit
```

Release justification: Low-risk, high benefit change.
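
For orientation, here is a minimal, self-contained sketch of the idea behind the split: a separate token bucket per request class, consulted according to whether the request is a write. It is an illustration only (names, rates, and bursts are invented), not the limiter in this commit, which blocks on a quotapool rather than returning false:

```go
package main

import (
	"fmt"
	"time"
)

// bucket is a minimal token bucket: tokens refill at rate per second, up to burst.
type bucket struct {
	tokens, rate, burst float64
	last                time.Time
}

func (b *bucket) take(now time.Time, n float64) bool {
	// Refill based on elapsed time, clamping to the burst limit.
	b.tokens += b.rate * now.Sub(b.last).Seconds()
	if b.tokens > b.burst {
		b.tokens = b.burst
	}
	b.last = now
	if b.tokens < n {
		return false
	}
	b.tokens -= n
	return true
}

// limiter keeps separate buckets for read and write requests, mirroring the
// split described in the commit message (the numbers below are made up).
type limiter struct {
	readReqs, writeReqs bucket
}

func (l *limiter) admit(now time.Time, isWrite bool) bool {
	if isWrite {
		return l.writeReqs.take(now, 1)
	}
	return l.readReqs.take(now, 1)
}

func main() {
	now := time.Now()
	l := &limiter{
		readReqs:  bucket{tokens: 100, rate: 100, burst: 100, last: now},
		writeReqs: bucket{tokens: 10, rate: 10, burst: 10, last: now}, // writes are costlier, so a lower rate
	}
	fmt.Println(l.admit(now, false)) // true: plenty of read tokens
	fmt.Println(l.admit(now, true))  // true: within the write burst
}
```

With two buckets, a burst of cheap reads can no longer drain the budget that expensive, Raft-bound writes depend on, and vice versa.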
nvanbenschoten committed Aug 26, 2020
1 parent 9636b0c commit 4c9110f
Showing 15 changed files with 219 additions and 135 deletions.
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/client_tenant_test.go
@@ -175,20 +175,20 @@ func TestTenantRateLimiter(t *testing.T) {
// the tenant range.
tenantCtx := roachpb.NewContextForTenant(ctx, tenantID)
cfg := tenantrate.LimitConfigsFromSettings(s.ClusterSettings())
for i := 0; i < int(cfg.Requests.Burst); i++ {
for i := 0; i < int(cfg.WriteRequests.Burst); i++ {
require.NoError(t, db.Put(ctx, mkKey(), 0))
}
// Now ensure that in the same instant the write QPS limit does affect the
// tenant. Issuing up to the burst limit of requests can happen without
// blocking.
for i := 0; i < int(cfg.Requests.Burst); i++ {
for i := 0; i < int(cfg.WriteRequests.Burst); i++ {
require.NoError(t, db.Put(tenantCtx, mkKey(), 0))
}
// Attempt to issue another request, make sure that it gets blocked by
// observing a timer.
errCh := make(chan error, 1)
go func() { errCh <- db.Put(tenantCtx, mkKey(), 0) }()
expectedTimer := t0.Add(time.Duration(float64(1/cfg.Requests.Rate) * float64(time.Second)))
expectedTimer := t0.Add(time.Duration(float64(1/cfg.WriteRequests.Rate) * float64(time.Second)))
testutils.SucceedsSoon(t, func() error {
timers := timeSource.Timers()
if len(timers) != 1 {
@@ -213,18 +213,18 @@ func TestTenantRateLimiter(t *testing.T) {
return string(read)
}
makeMetricStr := func(expCount int64) string {
const tenantMetricStr = `kv_tenant_rate_limit_requests_admitted{store="1",tenant_id="10"}`
const tenantMetricStr = `kv_tenant_rate_limit_write_requests_admitted{store="1",tenant_id="10"}`
return fmt.Sprintf("%s %d", tenantMetricStr, expCount)
}

// Ensure that the metric for the admitted requests is equal to the number of
// requests which we've admitted.
require.Contains(t, getMetrics(), makeMetricStr(cfg.Requests.Burst))
require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst))

// Allow the blocked request to proceed.
timeSource.Advance(time.Second)
require.NoError(t, <-errCh)

// Ensure that it is now reflected in the metrics.
require.Contains(t, getMetrics(), makeMetricStr(cfg.Requests.Burst+1))
require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst+1))
}
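
The arithmetic behind expectedTimer above is standard token-bucket behavior: the first Burst writes are admitted immediately, and the next one must wait roughly 1/Rate seconds for a token to refill. A standalone sketch of that behavior using golang.org/x/time/rate (used here purely for illustration; the production limiter is built on the internal quotapool package):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	const (
		ratePerSec = 10 // requests per second
		burst      = 5  // requests admitted without waiting
	)
	lim := rate.NewLimiter(rate.Limit(ratePerSec), burst)

	// The first `burst` requests are admitted immediately.
	for i := 0; i < burst; i++ {
		fmt.Println("admitted immediately:", lim.Allow())
	}

	// The next request must wait for a token to refill: roughly 1/rate seconds.
	r := lim.Reserve()
	fmt.Printf("next request delayed by ~%v (expect ~%v)\n",
		r.Delay(), time.Second/ratePerSec)
}
```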
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_rate_limit.go
@@ -27,7 +27,7 @@ func (r *Replica) maybeRateLimitBatch(ctx context.Context, ba *roachpb.BatchRequ
if !ok || tenantID == roachpb.SystemTenantID {
return nil
}
return r.tenantLimiter.Wait(ctx, bytesWrittenFromRequest(ba))
return r.tenantLimiter.Wait(ctx, ba.IsWrite(), bytesWrittenFromRequest(ba))
}

// bytesWrittenFromBatchRequest returns an approximation of the number of bytes
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/tenantrate/helpers_test.go
@@ -15,8 +15,10 @@ import "github.com/cockroachdb/cockroach/pkg/settings/cluster"
// OverrideSettingsWithRateLimits utilizes LimitConfigs from the values stored in the
// settings.
func OverrideSettingsWithRateLimits(settings *cluster.Settings, rl LimitConfigs) {
requestRateLimit.Override(&settings.SV, float64(rl.Requests.Rate))
requestBurstLimit.Override(&settings.SV, rl.Requests.Burst)
readRequestRateLimit.Override(&settings.SV, float64(rl.ReadRequests.Rate))
readRequestBurstLimit.Override(&settings.SV, rl.ReadRequests.Burst)
writeRequestRateLimit.Override(&settings.SV, float64(rl.WriteRequests.Rate))
writeRequestBurstLimit.Override(&settings.SV, rl.WriteRequests.Burst)
readRateLimit.Override(&settings.SV, int64(rl.ReadBytes.Rate))
readBurstLimit.Override(&settings.SV, rl.ReadBytes.Burst)
writeRateLimit.Override(&settings.SV, int64(rl.WriteBytes.Rate))
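
The LimitConfigs struct that this helper reads from is not shown in this excerpt. Pieced together from the field names used across the diff, its post-commit shape is roughly the following reconstruction (the field types here are guesses; the authoritative definitions live in pkg/kv/kvserver/tenantrate):

```go
// Package ratesketch reconstructs, for orientation only, the approximate shape
// of tenantrate.LimitConfigs after this commit.
package ratesketch

// LimitConfig pairs a sustained rate with a burst allowance for one dimension.
type LimitConfig struct {
	Rate  float64
	Burst int64
}

// LimitConfigs holds one LimitConfig per rate-limited dimension; this commit
// splits the former Requests dimension into ReadRequests and WriteRequests.
type LimitConfigs struct {
	ReadRequests  LimitConfig
	WriteRequests LimitConfig
	ReadBytes     LimitConfig
	WriteBytes    LimitConfig
}
```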
79 changes: 51 additions & 28 deletions pkg/kv/kvserver/tenantrate/limiter.go
@@ -48,14 +48,15 @@ import (
type Limiter interface {

// Wait acquires n quota from the limiter. This acquisition cannot be
// released. Each call to wait will consume 1 read byte, 1 request, and
// writeBytes from the token buckets. Calls to Wait will block until the
// buckets contain adequate resources. If a request attempts to write more
// than the burst limit, it will wait until the bucket is completely full
// before acquiring the requested quantity and putting the limiter in debt.
// released. Each call to wait will consume 1 read or write request
// depending on isWrite, 1 read byte, and writeBytes from the token buckets.
// Calls to Wait will block until the buckets contain adequate resources. If
// a request attempts to write more than the burst limit, it will wait until
// the bucket is completely full before acquiring the requested quantity and
// putting the limiter in debt.
//
// The only errors which should be returned are due to the context.
Wait(ctx context.Context, writeBytes int64) error
Wait(ctx context.Context, isWrite bool, writeBytes int64) error

// RecordRead subtracts the bytes read by a request from the token bucket.
// This call may push the Limiter into debt in the ReadBytes dimensions
@@ -85,28 +86,38 @@ func (rl *limiter) init(
metrics: metrics,
}
buckets := tokenBuckets{
requests: makeTokenBucket(conf.Requests),
readBytes: makeTokenBucket(conf.ReadBytes),
writeBytes: makeTokenBucket(conf.WriteBytes),
readRequests: makeTokenBucket(conf.ReadRequests),
writeRequests: makeTokenBucket(conf.WriteRequests),
readBytes: makeTokenBucket(conf.ReadBytes),
writeBytes: makeTokenBucket(conf.WriteBytes),
}
options = append(options, quotapool.OnAcquisition(func(
ctx context.Context, poolName string, r quotapool.Request, start time.Time,
) {
req := r.(*waitRequest)
if req.readRequests > 0 {
rl.metrics.readRequestsAdmitted.Inc(req.readRequests)
}
if req.writeRequests > 0 {
rl.metrics.writeRequestsAdmitted.Inc(req.writeRequests)
}
// Accounted for in limiter.RecordRead.
// if req.readBytes > 0 {
// rl.metrics.readBytesAdmitted.Inc(req.readBytes)
// }
if req.writeBytes > 0 {
rl.metrics.writeBytesAdmitted.Inc(req.writeBytes)
}
rl.metrics.requestsAdmitted.Inc(1)
}))
rl.qp = quotapool.New(tenantID.String(), &buckets, options...)
buckets.clock = rl.qp.TimeSource()
buckets.lastUpdated = buckets.clock.Now()
}

func (rl *limiter) Wait(ctx context.Context, writeBytes int64) error {
func (rl *limiter) Wait(ctx context.Context, isWrite bool, writeBytes int64) error {
rl.metrics.currentBlocked.Inc(1)
defer rl.metrics.currentBlocked.Dec(1)
r := newWaitRequest(writeBytes)
r := newWaitRequest(isWrite, writeBytes)
defer putWaitRequest(r)
if err := rl.qp.Acquire(ctx, r); err != nil {
return err
Expand All @@ -130,11 +141,12 @@ func (rl *limiter) updateLimits(limits LimitConfigs) {
// tokenBuckets is the implementation of Resource which remains in the quotapool
// for a limiter.
type tokenBuckets struct {
clock timeutil.TimeSource
lastUpdated time.Time
requests tokenBucket
readBytes tokenBucket
writeBytes tokenBucket
clock timeutil.TimeSource
lastUpdated time.Time
readRequests tokenBucket
writeRequests tokenBucket
readBytes tokenBucket
writeBytes tokenBucket
}

var _ quotapool.Resource = (*tokenBuckets)(nil)
@@ -146,7 +158,8 @@ func (rb *tokenBuckets) update() {
// TODO(ajwerner): Consider instituting a minimum update frequency to avoid
// spinning too fast on timers for tons of tiny allocations at a fast rate.
if since := now.Sub(rb.lastUpdated); since > 0 {
rb.requests.update(since)
rb.readRequests.update(since)
rb.writeRequests.update(since)
rb.readBytes.update(since)
rb.writeBytes.update(since)
rb.lastUpdated = now
@@ -166,14 +179,16 @@ func (rb *tokenBuckets) check(req *waitRequest) (fulfilled bool, tryAgainAfter t
}
}
}
check(&rb.requests, req.requests)
check(&rb.readRequests, req.readRequests)
check(&rb.writeRequests, req.writeRequests)
check(&rb.readBytes, req.readBytes)
check(&rb.writeBytes, req.writeBytes)
return fulfilled, tryAgainAfter
}

func (rb *tokenBuckets) subtract(req *waitRequest) {
rb.requests.tokens -= float64(req.requests)
rb.readRequests.tokens -= float64(req.readRequests)
rb.writeRequests.tokens -= float64(req.writeRequests)
rb.readBytes.tokens -= float64(req.readBytes)
rb.writeBytes.tokens -= float64(req.writeBytes)
}
@@ -185,7 +200,8 @@ func (rb *tokenBuckets) Merge(val interface{}) (shouldNotify bool) {
// configuration.
rb.update()

rb.requests.setConf(toAdd.Requests)
rb.readRequests.setConf(toAdd.ReadRequests)
rb.writeRequests.setConf(toAdd.WriteRequests)
rb.readBytes.setConf(toAdd.ReadBytes)
rb.writeBytes.setConf(toAdd.WriteBytes)
return true
@@ -264,9 +280,10 @@ func (t *tokenBucket) clampTokens() {

// waitRequest is used to wait for adequate resources in the tokenBuckets.
type waitRequest struct {
requests int64
writeBytes int64
readBytes int64
readRequests int64
writeRequests int64
writeBytes int64
readBytes int64
}

var waitRequestSyncPool = sync.Pool{
@@ -275,12 +292,18 @@ var waitRequestSyncPool = sync.Pool{

// newWaitRequest allocates a waitRequest from the sync.Pool.
// It should be returned with putWaitRequest.
func newWaitRequest(writeBytes int64) *waitRequest {
func newWaitRequest(isWrite bool, writeBytes int64) *waitRequest {
r := waitRequestSyncPool.Get().(*waitRequest)
*r = waitRequest{
requests: 1,
readBytes: 1,
writeBytes: writeBytes,
readRequests: 0,
writeRequests: 0,
readBytes: 1,
writeBytes: writeBytes,
}
if isWrite {
r.writeRequests = 1
} else {
r.readRequests = 1
}
return r
}
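
newWaitRequest and its counterpart putWaitRequest recycle waitRequest structs through a sync.Pool so that admitting each batch does not allocate. A self-contained sketch of that recycling pattern, with the same read/write classification as above (simplified; putWaitRequest's body is not shown in this excerpt, so its sketch here is an assumption):

```go
package main

import (
	"fmt"
	"sync"
)

// waitRequest mirrors, in simplified form, the struct pooled in limiter.go.
type waitRequest struct {
	readRequests, writeRequests, readBytes, writeBytes int64
}

var waitRequestPool = sync.Pool{
	New: func() interface{} { return new(waitRequest) },
}

// newWaitRequest grabs a recycled (or new) struct and overwrites every field,
// so stale values from a previous use can never leak into this request.
func newWaitRequest(isWrite bool, writeBytes int64) *waitRequest {
	r := waitRequestPool.Get().(*waitRequest)
	*r = waitRequest{readBytes: 1, writeBytes: writeBytes}
	if isWrite {
		r.writeRequests = 1
	} else {
		r.readRequests = 1
	}
	return r
}

// putWaitRequest returns the struct to the pool once the acquisition is done.
func putWaitRequest(r *waitRequest) {
	waitRequestPool.Put(r)
}

func main() {
	r := newWaitRequest(true, 512)
	fmt.Printf("%+v\n", *r) // {readRequests:0 writeRequests:1 readBytes:1 writeBytes:512}
	putWaitRequest(r)
}
```

Because every field is overwritten on each Get, a recycled struct cannot carry counts over from a previous request.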
18 changes: 12 additions & 6 deletions pkg/kv/kvserver/tenantrate/limiter_test.go
@@ -48,9 +48,9 @@ func TestCloser(t *testing.T) {
limiter := factory.GetTenant(tenant, closer)
ctx := context.Background()
// First Wait call will not block.
require.NoError(t, limiter.Wait(ctx, 1))
require.NoError(t, limiter.Wait(ctx, false, 1))
errCh := make(chan error, 1)
go func() { errCh <- limiter.Wait(ctx, 1<<30) }()
go func() { errCh <- limiter.Wait(ctx, false, 1<<30) }()
testutils.SucceedsSoon(t, func() error {
if timers := timeSource.Timers(); len(timers) != 1 {
return errors.Errorf("expected 1 timer, found %d", len(timers))
@@ -84,6 +84,7 @@ type launchState struct {
tenantID roachpb.TenantID
ctx context.Context
cancel context.CancelFunc
isWrite bool
writeBytes int64
reserveCh chan error
}
@@ -202,6 +203,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string {
var cmds []struct {
ID string
Tenant uint64
IsWrite bool
WriteBytes int64
}
if err := yaml.UnmarshalStrict([]byte(d.Input), &cmds); err != nil {
@@ -213,6 +215,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string {
s.tenantID = roachpb.MakeTenantID(cmd.Tenant)
s.ctx, s.cancel = context.WithCancel(context.Background())
s.reserveCh = make(chan error, 1)
s.isWrite = cmd.IsWrite
s.writeBytes = cmd.WriteBytes
ts.running[s.id] = &s
lims := ts.tenants[s.tenantID]
@@ -221,7 +224,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string {
}
go func() {
// We'll not worry about ever releasing tenant Limiters.
s.reserveCh <- lims[0].Wait(s.ctx, s.writeBytes)
s.reserveCh <- lims[0].Wait(s.ctx, s.isWrite, s.writeBytes)
}()
}
return ts.FormatRunning()
@@ -337,12 +340,15 @@ func (ts *testState) recordRead(t *testing.T, d *datadriven.TestData) string {
// kv_tenant_rate_limit_read_bytes_admitted 0
// kv_tenant_rate_limit_read_bytes_admitted{tenant_id="2"} 0
// kv_tenant_rate_limit_read_bytes_admitted{tenant_id="system"} 100
// kv_tenant_rate_limit_requests_admitted 0
// kv_tenant_rate_limit_requests_admitted{tenant_id="2"} 0
// kv_tenant_rate_limit_requests_admitted{tenant_id="system"} 0
// kv_tenant_rate_limit_read_requests_admitted 0
// kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 0
// kv_tenant_rate_limit_read_requests_admitted{tenant_id="system"} 0
// kv_tenant_rate_limit_write_bytes_admitted 50
// kv_tenant_rate_limit_write_bytes_admitted{tenant_id="2"} 50
// kv_tenant_rate_limit_write_bytes_admitted{tenant_id="system"} 0
// kv_tenant_rate_limit_write_requests_admitted 0
// kv_tenant_rate_limit_write_requests_admitted{tenant_id="2"} 0
// kv_tenant_rate_limit_write_requests_admitted{tenant_id="system"} 0
//
// Or with a regular expression:
//
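
The comment above documents the Prometheus text-format output of the test's metrics command, which can optionally be filtered with a regular expression. A small sketch of that kind of filtering over the new per-type counters (the metric lines are copied from the comment; the filtering code itself is illustrative, not the datadriven harness):

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	// A few lines of Prometheus text-format output, as in the comment above.
	metrics := `kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 0
kv_tenant_rate_limit_write_bytes_admitted{tenant_id="2"} 50
kv_tenant_rate_limit_write_requests_admitted{tenant_id="2"} 0`

	// Keep only the request-count metrics, analogous to filtering the
	// datadriven output with a regular expression.
	re := regexp.MustCompile(`_requests_admitted`)
	for _, line := range strings.Split(metrics, "\n") {
		if re.MatchString(line) {
			fmt.Println(line)
		}
	}
}
```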