Commit 1cbf368
Merge pull request #36444 from lucy-zhang/backport19.1-36403
release-19.1: storage: rate-limit AddSST requests
lucy-zhang authored Apr 2, 2019
2 parents f003d13 + 7879190 commit 1cbf368
Showing 6 changed files with 45 additions and 11 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -28,6 +28,7 @@
 <tr><td><code>kv.allocator.load_based_rebalancing</code></td><td>enumeration</td><td><code>2</code></td><td>whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
 <tr><td><code>kv.allocator.qps_rebalance_threshold</code></td><td>float</td><td><code>0.25</code></td><td>minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
 <tr><td><code>kv.allocator.range_rebalance_threshold</code></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull</td></tr>
+<tr><td><code>kv.bulk_io_write.concurrent_addsstable_requests</code></td><td>integer</td><td><code>1</code></td><td>number of AddSSTable requests a store will handle concurrently before queuing</td></tr>
 <tr><td><code>kv.bulk_io_write.concurrent_export_requests</code></td><td>integer</td><td><code>3</code></td><td>number of export requests a store will handle concurrently before queuing</td></tr>
 <tr><td><code>kv.bulk_io_write.concurrent_import_requests</code></td><td>integer</td><td><code>1</code></td><td>number of import requests a store will handle concurrently before queuing</td></tr>
 <tr><td><code>kv.bulk_io_write.max_rate</code></td><td>byte size</td><td><code>8.0 EiB</code></td><td>the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops</td></tr>
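For operators, the new kv.bulk_io_write.concurrent_addsstable_requests setting can be changed at runtime like any other cluster setting over the SQL interface. A minimal sketch follows; the connection string, the lib/pq driver choice, and the value of 4 are illustrative assumptions, not part of this change.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	// Connection string is illustrative; point it at your own cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Raise the per-store AddSSTable concurrency limit from its default of 1.
	// The value 4 is only an example.
	if _, err := db.Exec(
		"SET CLUSTER SETTING kv.bulk_io_write.concurrent_addsstable_requests = 4",
	); err != nil {
		log.Fatal(err)
	}
}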
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/export.go
@@ -68,10 +68,10 @@ func evalExport(
 		reply.StartTime = gcThreshold
 	}
 
-	if err := cArgs.EvalCtx.GetLimiters().ConcurrentExports.Begin(ctx); err != nil {
+	if err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx); err != nil {
 		return result.Result{}, err
 	}
-	defer cArgs.EvalCtx.GetLimiters().ConcurrentExports.Finish()
+	defer cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Finish()
 
 	makeExportStorage := !args.ReturnSST || (args.Storage != roachpb.ExportStorage{})
 	if makeExportStorage || log.V(1) {
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/import.go
@@ -78,10 +78,10 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo
 		return nil, errors.Wrap(err, "make key rewriter")
 	}
 
-	if err := cArgs.EvalCtx.GetLimiters().ConcurrentImports.Begin(ctx); err != nil {
+	if err := cArgs.EvalCtx.GetLimiters().ConcurrentImportRequests.Begin(ctx); err != nil {
 		return nil, err
 	}
-	defer cArgs.EvalCtx.GetLimiters().ConcurrentImports.Finish()
+	defer cArgs.EvalCtx.GetLimiters().ConcurrentImportRequests.Finish()
 
 	var iters []engine.SimpleIterator
 	for _, file := range args.Files {
10 changes: 10 additions & 0 deletions pkg/roachpb/batch.go
@@ -217,6 +217,16 @@ func (ba *BatchRequest) IsSingleComputeChecksumRequest() bool {
 	return false
 }
 
+// IsSingleAddSSTableRequest returns true iff the batch contains a single
+// request, and that request is an AddSSTableRequest.
+func (ba *BatchRequest) IsSingleAddSSTableRequest() bool {
+	if ba.IsSingleRequest() {
+		_, ok := ba.Requests[0].GetInner().(*AddSSTableRequest)
+		return ok
+	}
+	return false
+}
+
 // IsCompleteTransaction determines whether a batch contains every write in a
 // transactions.
 func (ba *BatchRequest) IsCompleteTransaction() bool {
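Not part of this commit: a hypothetical test sketch exercising the new predicate, assuming the existing roachpb.BatchRequest.Add helper for assembling batches.

package roachpb_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// A batch holding exactly one AddSSTableRequest should satisfy the predicate;
// a batch containing anything more should not.
func TestIsSingleAddSSTableRequest(t *testing.T) {
	var ba roachpb.BatchRequest
	ba.Add(&roachpb.AddSSTableRequest{})
	if !ba.IsSingleAddSSTableRequest() {
		t.Fatal("expected a single-AddSSTable batch")
	}

	ba.Add(&roachpb.PutRequest{})
	if ba.IsSingleAddSSTableRequest() {
		t.Fatal("a two-request batch should not qualify")
	}
}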
7 changes: 4 additions & 3 deletions pkg/storage/batcheval/eval_context.go
@@ -34,9 +34,10 @@ import (
 
 // Limiters is the collection of per-store limits used during cmd evaluation.
 type Limiters struct {
-	BulkIOWriteRate   *rate.Limiter
-	ConcurrentImports limit.ConcurrentRequestLimiter
-	ConcurrentExports limit.ConcurrentRequestLimiter
+	BulkIOWriteRate              *rate.Limiter
+	ConcurrentImportRequests     limit.ConcurrentRequestLimiter
+	ConcurrentExportRequests     limit.ConcurrentRequestLimiter
+	ConcurrentAddSSTableRequests limit.ConcurrentRequestLimiter
 	// concurrentRangefeedIters is a semaphore used to limit the number of
 	// rangefeeds in the "catch-up" state across the store. The "catch-up" state
 	// is a temporary state at the beginning of a rangefeed which is expensive
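For context, every limiter in this struct is consumed through the same Begin/Finish protocol that the export, import, and now AddSSTable paths use. The sketch below restates that pattern outside the store; the pkg/util/limit import path is an assumption, and only the MakeConcurrentRequestLimiter, Begin, and Finish calls already visible in this diff are relied on.

package main

import (
	"context"

	// Import path assumed from the repository layout at this release.
	"github.com/cockroachdb/cockroach/pkg/util/limit"
)

// runLimited mirrors the Begin/Finish protocol used by the export, import,
// and (with this commit) AddSSTable paths: acquire a slot before doing the
// work, release it afterwards, and give up if the context is canceled while
// waiting in the queue.
func runLimited(ctx context.Context, lim *limit.ConcurrentRequestLimiter, work func() error) error {
	if err := lim.Begin(ctx); err != nil {
		return err
	}
	defer lim.Finish()
	return work()
}

func main() {
	// Allow at most one concurrent caller, matching the new setting's default.
	lim := limit.MakeConcurrentRequestLimiter("exampleLimiter", 1)
	_ = runLimited(context.Background(), &lim, func() error { return nil })
}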
30 changes: 26 additions & 4 deletions pkg/storage/store.go
@@ -123,6 +123,13 @@ var importRequestsLimit = settings.RegisterPositiveIntSetting(
 	1,
 )
 
+// addSSTableRequestLimit limits concurrent AddSSTable requests.
+var addSSTableRequestLimit = settings.RegisterPositiveIntSetting(
+	"kv.bulk_io_write.concurrent_addsstable_requests",
+	"number of AddSSTable requests a store will handle concurrently before queuing",
+	1,
+)
+
 // concurrentRangefeedItersLimit limits concurrent rangefeed catchup iterators.
 var concurrentRangefeedItersLimit = settings.RegisterPositiveIntSetting(
 	"kv.rangefeed.concurrent_catchup_iterators",
@@ -827,13 +834,13 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
 	bulkIOWriteLimit.SetOnChange(&cfg.Settings.SV, func() {
 		s.limiters.BulkIOWriteRate.SetLimit(rate.Limit(bulkIOWriteLimit.Get(&cfg.Settings.SV)))
 	})
-	s.limiters.ConcurrentImports = limit.MakeConcurrentRequestLimiter(
+	s.limiters.ConcurrentImportRequests = limit.MakeConcurrentRequestLimiter(
 		"importRequestLimiter", int(importRequestsLimit.Get(&cfg.Settings.SV)),
 	)
 	importRequestsLimit.SetOnChange(&cfg.Settings.SV, func() {
-		s.limiters.ConcurrentImports.SetLimit(int(importRequestsLimit.Get(&cfg.Settings.SV)))
+		s.limiters.ConcurrentImportRequests.SetLimit(int(importRequestsLimit.Get(&cfg.Settings.SV)))
 	})
-	s.limiters.ConcurrentExports = limit.MakeConcurrentRequestLimiter(
+	s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
 		"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
 	)
 	// On low-CPU instances, a default limit value may still allow ExportRequests
@@ -847,7 +854,13 @@
 		if limit > exportCores {
 			limit = exportCores
 		}
-		s.limiters.ConcurrentExports.SetLimit(limit)
+		s.limiters.ConcurrentExportRequests.SetLimit(limit)
 	})
+	s.limiters.ConcurrentAddSSTableRequests = limit.MakeConcurrentRequestLimiter(
+		"addSSTableRequestLimiter", int(addSSTableRequestLimit.Get(&cfg.Settings.SV)),
+	)
+	importRequestsLimit.SetOnChange(&cfg.Settings.SV, func() {
+		s.limiters.ConcurrentAddSSTableRequests.SetLimit(int(addSSTableRequestLimit.Get(&cfg.Settings.SV)))
+	})
 	s.limiters.ConcurrentRangefeedIters = limit.MakeConcurrentRequestLimiter(
 		"rangefeedIterLimiter", int(concurrentRangefeedItersLimit.Get(&cfg.Settings.SV)),
@@ -2735,6 +2748,15 @@ func (s *Store) Send(
 		}
 	}
 
+	// Limit the number of concurrent AddSSTable requests, since they're expensive
+	// and block all other writes to the same span.
+	if ba.IsSingleAddSSTableRequest() {
+		if err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx); err != nil {
+			return nil, roachpb.NewError(err)
+		}
+		defer s.limiters.ConcurrentAddSSTableRequests.Finish()
+	}
+
 	if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
 		return nil, roachpb.NewError(err)
 	}
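Because the gate sits in Store.Send and keys off IsSingleAddSSTableRequest, only batches consisting solely of an AddSSTable request wait on the limiter; mixed batches are unaffected. And since Begin receives the request context, a sender that gives up while queued should be released with an error rather than being left waiting.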
