Skip to content

Commit

Permalink
Merge pull request #74072 from erikgrinaker/backport21.1-73904
Browse files Browse the repository at this point in the history
release-21.1: kvserver: throttle `AddSSTable` requests with `IngestAsWrites`
  • Loading branch information
erikgrinaker authored Dec 20, 2021
2 parents 827dc79 + cc70f09 commit ef024c1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 12 deletions.
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ import (

// Limiters is the collection of per-store limits used during cmd evaluation.
type Limiters struct {
BulkIOWriteRate *rate.Limiter
ConcurrentImportRequests limit.ConcurrentRequestLimiter
ConcurrentExportRequests limit.ConcurrentRequestLimiter
ConcurrentAddSSTableRequests limit.ConcurrentRequestLimiter
BulkIOWriteRate *rate.Limiter
ConcurrentImportRequests limit.ConcurrentRequestLimiter
ConcurrentExportRequests limit.ConcurrentRequestLimiter
ConcurrentAddSSTableRequests limit.ConcurrentRequestLimiter
ConcurrentAddSSTableAsWritesRequests limit.ConcurrentRequestLimiter
// concurrentRangefeedIters is a semaphore used to limit the number of
// rangefeeds in the "catch-up" state across the store. The "catch-up" state
// is a temporary state at the beginning of a rangefeed which is expensive
Expand Down
24 changes: 22 additions & 2 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,23 @@ var importRequestsLimit = settings.RegisterIntSetting(
// addSSTableRequestLimit limits concurrent AddSSTable requests.
var addSSTableRequestLimit = settings.RegisterIntSetting(
"kv.bulk_io_write.concurrent_addsstable_requests",
"number of AddSSTable requests a store will handle concurrently before queuing",
"number of concurrent AddSSTable requests per store before queueing",
1,
settings.PositiveInt,
)

// addSSTableAsWritesRequestLimit limits concurrent AddSSTable requests with
// IngestAsWrites set. These are smaller (kv.bulk_io_write.small_write_size),
// and will end up in the Pebble memtable (default 64 MB) before flushing to
// disk, so we can allow a greater amount of concurrency than regular AddSSTable
// requests. Applied independently of concurrent_addsstable_requests.
var addSSTableAsWritesRequestLimit = settings.RegisterIntSetting(
"kv.bulk_io_write.concurrent_addsstable_as_writes_requests",
"number of concurrent AddSSTable requests ingested as writes per store before queueing",
10,
settings.PositiveInt,
)

// concurrentRangefeedItersLimit limits concurrent rangefeed catchup iterators.
var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
"kv.rangefeed.concurrent_catchup_iterators",
Expand Down Expand Up @@ -892,7 +904,15 @@ func NewStore(
"addSSTableRequestLimiter", int(addSSTableRequestLimit.Get(&cfg.Settings.SV)),
)
addSSTableRequestLimit.SetOnChange(&cfg.Settings.SV, func() {
s.limiters.ConcurrentAddSSTableRequests.SetLimit(int(addSSTableRequestLimit.Get(&cfg.Settings.SV)))
s.limiters.ConcurrentAddSSTableRequests.SetLimit(
int(addSSTableRequestLimit.Get(&cfg.Settings.SV)))
})
s.limiters.ConcurrentAddSSTableAsWritesRequests = limit.MakeConcurrentRequestLimiter(
"addSSTableAsWritesRequestLimiter", int(addSSTableAsWritesRequestLimit.Get(&cfg.Settings.SV)),
)
addSSTableAsWritesRequestLimit.SetOnChange(&cfg.Settings.SV, func() {
s.limiters.ConcurrentAddSSTableAsWritesRequests.SetLimit(
int(addSSTableAsWritesRequestLimit.Get(&cfg.Settings.SV)))
})
s.limiters.ConcurrentRangefeedIters = limit.MakeConcurrentRequestLimiter(
"rangefeedIterLimiter", int(concurrentRangefeedItersLimit.Get(&cfg.Settings.SV)),
Expand Down
9 changes: 3 additions & 6 deletions pkg/kv/kvserver/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,12 @@ func (s *Store) maybeThrottleBatch(

switch t := ba.Requests[0].GetInner().(type) {
case *roachpb.AddSSTableRequest:
// Limit the number of concurrent AddSSTable requests, since they're
// expensive and block all other writes to the same span. However, don't
// limit AddSSTable requests that are going to ingest as a WriteBatch.
limiter := s.limiters.ConcurrentAddSSTableRequests
if t.IngestAsWrites {
return nil, nil
limiter = s.limiters.ConcurrentAddSSTableAsWritesRequests
}

before := timeutil.Now()
res, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx)
res, err := limiter.Begin(ctx)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit ef024c1

Please sign in to comment.