kvserver: throttle AddSSTable requests with IngestAsWrites
`Store.Send()` limits the number of concurrent `AddSSTable` requests
and delays them depending on LSM health via `Engine.PreIngestDelay`, to
prevent overwhelming Pebble. However, requests with `IngestAsWrites`
were not throttled, which has been seen to cause significant read
amplification.

This patch subjects `IngestAsWrites` requests to `Engine.PreIngestDelay`
as well, and adds a separate limit for `IngestAsWrites` requests
controlled via the cluster setting
`kv.bulk_io_write.concurrent_addsstable_as_writes_requests` (default
10). Since these requests are generally small, and will end up in the
Pebble memtable before being flushed to disk, we can tolerate a larger
limit for them than for regular `AddSSTable` requests (default 1).
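
As a rough illustration of the delay mechanism: `Engine.PreIngestDelay` backs
off ingestions when the LSM looks unhealthy, and after this change the
`IngestAsWrites` path pays that delay as well. The sketch below is a simplified
stand-in only; the threshold, cap, and linear ramp are assumed values, not the
actual `Engine.PreIngestDelay` implementation or its defaults.

// Illustrative sketch only: a delay that grows with LSM pressure, similar
// in spirit to Engine.PreIngestDelay. All names and constants here are
// assumptions made for the example.
package main

import (
	"fmt"
	"time"
)

const (
	l0FileThreshold = 20              // assumed: start delaying above this many L0 files
	maxDelay        = 5 * time.Second // assumed: cap on the per-request delay
)

// preIngestDelay returns how long an ingestion should wait given the
// current L0 file count, ramping linearly from zero up to maxDelay.
func preIngestDelay(l0Files int) time.Duration {
	if l0Files <= l0FileThreshold {
		return 0
	}
	d := time.Duration(l0Files-l0FileThreshold) * 100 * time.Millisecond
	if d > maxDelay {
		d = maxDelay
	}
	return d
}

func main() {
	for _, files := range []int{10, 25, 100} {
		fmt.Printf("L0 files=%d -> delay %s\n", files, preIngestDelay(files))
	}
}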

Release note (performance improvement): Bulk ingestion of small write
batches (e.g. index backfill into a large number of ranges) is now
throttled, to avoid buildup of read amplification and associated
performance degradation. Concurrency is controlled by the new cluster
setting `kv.bulk_io_write.concurrent_addsstable_as_writes_requests`.
erikgrinaker committed Dec 19, 2021
1 parent 827dc79 commit cc70f09
Showing 3 changed files with 30 additions and 12 deletions.
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -34,10 +34,11 @@ import (
 
 // Limiters is the collection of per-store limits used during cmd evaluation.
 type Limiters struct {
-	BulkIOWriteRate              *rate.Limiter
-	ConcurrentImportRequests     limit.ConcurrentRequestLimiter
-	ConcurrentExportRequests     limit.ConcurrentRequestLimiter
-	ConcurrentAddSSTableRequests limit.ConcurrentRequestLimiter
+	BulkIOWriteRate                      *rate.Limiter
+	ConcurrentImportRequests             limit.ConcurrentRequestLimiter
+	ConcurrentExportRequests             limit.ConcurrentRequestLimiter
+	ConcurrentAddSSTableRequests         limit.ConcurrentRequestLimiter
+	ConcurrentAddSSTableAsWritesRequests limit.ConcurrentRequestLimiter
 	// concurrentRangefeedIters is a semaphore used to limit the number of
 	// rangefeeds in the "catch-up" state across the store. The "catch-up" state
 	// is a temporary state at the beginning of a rangefeed which is expensive
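
The new field is acquired and released around request evaluation just like the
existing limiters: `Begin` takes a slot (queueing when the limit is reached),
and the reservation it returns (`res` in the store_send.go diff below) is
released when the request finishes. A minimal, self-contained sketch of that
acquire/release pattern using a buffered channel as the semaphore; this is an
illustration of the idea, not the repo's `limit.ConcurrentRequestLimiter`
implementation, and it simplifies Begin to not return a reservation.

// Illustrative stand-in for a concurrent-request limiter: Begin blocks
// until a slot is free or the context is canceled; Release frees the slot.
package main

import (
	"context"
	"fmt"
)

type requestLimiter struct{ sem chan struct{} }

func newRequestLimiter(limit int) *requestLimiter {
	return &requestLimiter{sem: make(chan struct{}, limit)}
}

// Begin acquires a slot, waiting until one is available.
func (l *requestLimiter) Begin(ctx context.Context) error {
	select {
	case l.sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Release returns a previously acquired slot.
func (l *requestLimiter) Release() { <-l.sem }

func main() {
	lim := newRequestLimiter(10) // e.g. the as-writes default of 10
	if err := lim.Begin(context.Background()); err != nil {
		fmt.Println("throttled:", err)
		return
	}
	defer lim.Release()
	fmt.Println("admitted; evaluate the AddSSTable request here")
}

The real limiter also exposes `SetLimit`, which the store.go changes below use
to track the cluster settings at runtime.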
24 changes: 22 additions & 2 deletions pkg/kv/kvserver/store.go
@@ -133,11 +133,23 @@ var importRequestsLimit = settings.RegisterIntSetting(
 // addSSTableRequestLimit limits concurrent AddSSTable requests.
 var addSSTableRequestLimit = settings.RegisterIntSetting(
 	"kv.bulk_io_write.concurrent_addsstable_requests",
-	"number of AddSSTable requests a store will handle concurrently before queuing",
+	"number of concurrent AddSSTable requests per store before queueing",
 	1,
 	settings.PositiveInt,
 )
 
+// addSSTableAsWritesRequestLimit limits concurrent AddSSTable requests with
+// IngestAsWrites set. These are smaller (kv.bulk_io_write.small_write_size),
+// and will end up in the Pebble memtable (default 64 MB) before flushing to
+// disk, so we can allow a greater amount of concurrency than regular AddSSTable
+// requests. Applied independently of concurrent_addsstable_requests.
+var addSSTableAsWritesRequestLimit = settings.RegisterIntSetting(
+	"kv.bulk_io_write.concurrent_addsstable_as_writes_requests",
+	"number of concurrent AddSSTable requests ingested as writes per store before queueing",
+	10,
+	settings.PositiveInt,
+)
+
 // concurrentRangefeedItersLimit limits concurrent rangefeed catchup iterators.
 var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
 	"kv.rangefeed.concurrent_catchup_iterators",
@@ -892,7 +904,15 @@ func NewStore(
 		"addSSTableRequestLimiter", int(addSSTableRequestLimit.Get(&cfg.Settings.SV)),
 	)
 	addSSTableRequestLimit.SetOnChange(&cfg.Settings.SV, func() {
-		s.limiters.ConcurrentAddSSTableRequests.SetLimit(int(addSSTableRequestLimit.Get(&cfg.Settings.SV)))
+		s.limiters.ConcurrentAddSSTableRequests.SetLimit(
+			int(addSSTableRequestLimit.Get(&cfg.Settings.SV)))
 	})
+	s.limiters.ConcurrentAddSSTableAsWritesRequests = limit.MakeConcurrentRequestLimiter(
+		"addSSTableAsWritesRequestLimiter", int(addSSTableAsWritesRequestLimit.Get(&cfg.Settings.SV)),
+	)
+	addSSTableAsWritesRequestLimit.SetOnChange(&cfg.Settings.SV, func() {
+		s.limiters.ConcurrentAddSSTableAsWritesRequests.SetLimit(
+			int(addSSTableAsWritesRequestLimit.Get(&cfg.Settings.SV)))
+	})
 	s.limiters.ConcurrentRangefeedIters = limit.MakeConcurrentRequestLimiter(
 		"rangefeedIterLimiter", int(concurrentRangefeedItersLimit.Get(&cfg.Settings.SV)),
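
Because both limits are registered with `SetOnChange` callbacks, they can be
adjusted on a running cluster without a restart, for example with
`SET CLUSTER SETTING kv.bulk_io_write.concurrent_addsstable_as_writes_requests = 20;`
(the value 20 is only an example); the callback pushes the new value into the
already-constructed limiter via `SetLimit`.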
9 changes: 3 additions & 6 deletions pkg/kv/kvserver/store_send.go
@@ -263,15 +263,12 @@ func (s *Store) maybeThrottleBatch(
 
 	switch t := ba.Requests[0].GetInner().(type) {
 	case *roachpb.AddSSTableRequest:
-		// Limit the number of concurrent AddSSTable requests, since they're
-		// expensive and block all other writes to the same span. However, don't
-		// limit AddSSTable requests that are going to ingest as a WriteBatch.
+		limiter := s.limiters.ConcurrentAddSSTableRequests
 		if t.IngestAsWrites {
-			return nil, nil
+			limiter = s.limiters.ConcurrentAddSSTableAsWritesRequests
 		}
-
 		before := timeutil.Now()
-		res, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx)
+		res, err := limiter.Begin(ctx)
 		if err != nil {
 			return nil, err
 		}
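
Callers opt into this path by setting `IngestAsWrites` on the request; per the
new setting's comment, bulk writers do so for SSTs at or below
`kv.bulk_io_write.small_write_size`, which then land in the Pebble memtable as
ordinary writes. A rough, self-contained sketch of that client-side decision
follows; the `addSSTableRequest` type and the `smallWriteSize` value are
stand-ins for this example, not the actual roachpb definitions or the setting's
default.

// Illustrative only: how a bulk writer might decide between ingesting an
// SST as a file or as ordinary writes. Types and threshold are assumptions
// made for the example, not CockroachDB's real definitions.
package main

import "fmt"

// smallWriteSize stands in for the kv.bulk_io_write.small_write_size
// setting: SSTs at or below this size are sent with IngestAsWrites set.
const smallWriteSize = 400 << 10 // assumed example value, in bytes

type addSSTableRequest struct {
	Data           []byte
	IngestAsWrites bool
}

func buildRequest(sst []byte) addSSTableRequest {
	return addSSTableRequest{
		Data: sst,
		// Small SSTs go through the memtable as writes; with this commit
		// they are throttled by the separate as-writes limiter and also
		// pay the pre-ingest delay, just like file ingestion.
		IngestAsWrites: len(sst) <= smallWriteSize,
	}
}

func main() {
	small := make([]byte, 64<<10) // 64 KiB: sent as writes
	large := make([]byte, 32<<20) // 32 MiB: ingested as a file
	fmt.Println("64 KiB SST as writes:", buildRequest(small).IngestAsWrites)
	fmt.Println("32 MiB SST as writes:", buildRequest(large).IngestAsWrites)
}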
