From 78791900e61d451ca8302b8b719ad4cf18fb4f3b Mon Sep 17 00:00:00 2001
From: Lucy Zhang
Date: Mon, 1 Apr 2019 17:54:49 -0400
Subject: [PATCH] storage: rate-limit AddSST requests

We've been seeing extremely high latency for foreground traffic during bulk
index backfills, because AddSST requests into non-empty ranges can be
expensive, and write requests queued behind an AddSST request for an
overlapping span can get stuck waiting for multiple seconds.

This PR limits the number of concurrent AddSST requests for a single store,
determined by a new cluster setting,
`kv.bulk_io_write.concurrent_addsstable_requests`, to decrease the impact of
index backfills on foreground writes. (It also decreases the risk of writing
too many L0 files to RocksDB at once, which causes stalls.)

Release note (general change): Add a new cluster setting,
`kv.bulk_io_write.concurrent_addsstable_requests`, which limits the number of
SSTables that can be added concurrently during bulk operations.
---
 docs/generated/settings/settings.html |  1 +
 pkg/ccl/storageccl/export.go          |  4 ++--
 pkg/ccl/storageccl/import.go          |  4 ++--
 pkg/roachpb/batch.go                  | 10 +++++++++
 pkg/storage/batcheval/eval_context.go |  7 ++++---
 pkg/storage/store.go                  | 30 +++++++++++++++++++++++----
 6 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 1332aad46c50..15174f25195b 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -28,6 +28,7 @@
 <tr><td><code>kv.allocator.load_based_rebalancing</code></td><td>enumeration</td><td>2</td><td>whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
 <tr><td><code>kv.allocator.qps_rebalance_threshold</code></td><td>float</td><td>0.25</td><td>minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
 <tr><td><code>kv.allocator.range_rebalance_threshold</code></td><td>float</td><td>0.05</td><td>minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull</td></tr>
+<tr><td><code>kv.bulk_io_write.concurrent_addsstable_requests</code></td><td>integer</td><td>1</td><td>number of AddSSTable requests a store will handle concurrently before queuing</td></tr>
 <tr><td><code>kv.bulk_io_write.concurrent_export_requests</code></td><td>integer</td><td>3</td><td>number of export requests a store will handle concurrently before queuing</td></tr>
 <tr><td><code>kv.bulk_io_write.concurrent_import_requests</code></td><td>integer</td><td>1</td><td>number of import requests a store will handle concurrently before queuing</td></tr>
 <tr><td><code>kv.bulk_io_write.max_rate</code></td><td>byte size</td><td>8.0 EiB</td><td>the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops</td></tr>
diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go
index 66e7382628f6..af378dcc4817 100644
--- a/pkg/ccl/storageccl/export.go
+++ b/pkg/ccl/storageccl/export.go
@@ -68,10 +68,10 @@ func evalExport(
 		reply.StartTime = gcThreshold
 	}
 
-	if err := cArgs.EvalCtx.GetLimiters().ConcurrentExports.Begin(ctx); err != nil {
+	if err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx); err != nil {
 		return result.Result{}, err
 	}
-	defer cArgs.EvalCtx.GetLimiters().ConcurrentExports.Finish()
+	defer cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Finish()
 
 	makeExportStorage := !args.ReturnSST || (args.Storage != roachpb.ExportStorage{})
 	if makeExportStorage || log.V(1) {
diff --git a/pkg/ccl/storageccl/import.go b/pkg/ccl/storageccl/import.go
index 5e9d3ed43308..e46bac1cbcfb 100644
--- a/pkg/ccl/storageccl/import.go
+++ b/pkg/ccl/storageccl/import.go
@@ -78,10 +78,10 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo
 		return nil, errors.Wrap(err, "make key rewriter")
 	}
 
-	if err := cArgs.EvalCtx.GetLimiters().ConcurrentImports.Begin(ctx); err != nil {
+	if err := cArgs.EvalCtx.GetLimiters().ConcurrentImportRequests.Begin(ctx); err != nil {
 		return nil, err
 	}
-	defer cArgs.EvalCtx.GetLimiters().ConcurrentImports.Finish()
+	defer cArgs.EvalCtx.GetLimiters().ConcurrentImportRequests.Finish()
 
 	var iters []engine.SimpleIterator
 	for _, file := range args.Files {
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index 7bcbedcdde85..fd68da609924 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -217,6 +217,16 @@ func (ba *BatchRequest) IsSingleComputeChecksumRequest() bool {
 	return false
 }
 
+// IsSingleAddSSTableRequest returns true iff the batch contains a single
+// request, and that request is an AddSSTableRequest.
+func (ba *BatchRequest) IsSingleAddSSTableRequest() bool {
+	if ba.IsSingleRequest() {
+		_, ok := ba.Requests[0].GetInner().(*AddSSTableRequest)
+		return ok
+	}
+	return false
+}
+
 // IsCompleteTransaction determines whether a batch contains every write in a
 // transaction.
 func (ba *BatchRequest) IsCompleteTransaction() bool {
diff --git a/pkg/storage/batcheval/eval_context.go b/pkg/storage/batcheval/eval_context.go
index b2064837c22d..82a3fbf18bf9 100644
--- a/pkg/storage/batcheval/eval_context.go
+++ b/pkg/storage/batcheval/eval_context.go
@@ -34,9 +34,10 @@ import (
 
 // Limiters is the collection of per-store limits used during cmd evaluation.
 type Limiters struct {
-	BulkIOWriteRate   *rate.Limiter
-	ConcurrentImports limit.ConcurrentRequestLimiter
-	ConcurrentExports limit.ConcurrentRequestLimiter
+	BulkIOWriteRate              *rate.Limiter
+	ConcurrentImportRequests     limit.ConcurrentRequestLimiter
+	ConcurrentExportRequests     limit.ConcurrentRequestLimiter
+	ConcurrentAddSSTableRequests limit.ConcurrentRequestLimiter
 	// concurrentRangefeedIters is a semaphore used to limit the number of
 	// rangefeeds in the "catch-up" state across the store. The "catch-up" state
 	// is a temporary state at the beginning of a rangefeed which is expensive
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 58345fb1094e..75fcdf4a918a 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -123,6 +123,13 @@ var importRequestsLimit = settings.RegisterPositiveIntSetting(
 	1,
 )
 
+// addSSTableRequestLimit limits concurrent AddSSTable requests.
+var addSSTableRequestLimit = settings.RegisterPositiveIntSetting(
+	"kv.bulk_io_write.concurrent_addsstable_requests",
+	"number of AddSSTable requests a store will handle concurrently before queuing",
+	1,
+)
+
 // concurrentRangefeedItersLimit limits concurrent rangefeed catchup iterators.
 var concurrentRangefeedItersLimit = settings.RegisterPositiveIntSetting(
 	"kv.rangefeed.concurrent_catchup_iterators",
@@ -827,13 +834,13 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
 	bulkIOWriteLimit.SetOnChange(&cfg.Settings.SV, func() {
 		s.limiters.BulkIOWriteRate.SetLimit(rate.Limit(bulkIOWriteLimit.Get(&cfg.Settings.SV)))
 	})
-	s.limiters.ConcurrentImports = limit.MakeConcurrentRequestLimiter(
+	s.limiters.ConcurrentImportRequests = limit.MakeConcurrentRequestLimiter(
 		"importRequestLimiter", int(importRequestsLimit.Get(&cfg.Settings.SV)),
 	)
 	importRequestsLimit.SetOnChange(&cfg.Settings.SV, func() {
-		s.limiters.ConcurrentImports.SetLimit(int(importRequestsLimit.Get(&cfg.Settings.SV)))
+		s.limiters.ConcurrentImportRequests.SetLimit(int(importRequestsLimit.Get(&cfg.Settings.SV)))
 	})
-	s.limiters.ConcurrentExports = limit.MakeConcurrentRequestLimiter(
+	s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
 		"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
 	)
 	// On low-CPU instances, a default limit value may still allow ExportRequests
@@ -847,7 +854,13 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
 		if limit > exportCores {
 			limit = exportCores
 		}
-		s.limiters.ConcurrentExports.SetLimit(limit)
+		s.limiters.ConcurrentExportRequests.SetLimit(limit)
+	})
+	s.limiters.ConcurrentAddSSTableRequests = limit.MakeConcurrentRequestLimiter(
+		"addSSTableRequestLimiter", int(addSSTableRequestLimit.Get(&cfg.Settings.SV)),
+	)
+	addSSTableRequestLimit.SetOnChange(&cfg.Settings.SV, func() {
+		s.limiters.ConcurrentAddSSTableRequests.SetLimit(int(addSSTableRequestLimit.Get(&cfg.Settings.SV)))
 	})
 	s.limiters.ConcurrentRangefeedIters = limit.MakeConcurrentRequestLimiter(
 		"rangefeedIterLimiter", int(concurrentRangefeedItersLimit.Get(&cfg.Settings.SV)),
@@ -2735,6 +2748,15 @@ func (s *Store) Send(
 		}
 	}
 
+	// Limit the number of concurrent AddSSTable requests, since they're expensive
+	// and block all other writes to the same span.
+	if ba.IsSingleAddSSTableRequest() {
+		if err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx); err != nil {
+			return nil, roachpb.NewError(err)
+		}
+		defer s.limiters.ConcurrentAddSSTableRequests.Finish()
+	}
+
 	if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
 		return nil, roachpb.NewError(err)
 	}
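
Note on the admission pattern used above: the change gates AddSSTable evaluation behind a per-store counting semaphore, where Begin blocks until a slot is free (or the request's context is canceled) and Finish releases the slot, with the slot count driven by the new cluster setting. The standalone Go sketch below illustrates only that pattern; requestLimiter, newRequestLimiter, and the toy main are hypothetical names and deliberately simpler than CockroachDB's limit.ConcurrentRequestLimiter (for example, it omits SetLimit, so the cap cannot be resized at runtime the way the SetOnChange callbacks above do).

// Illustrative sketch only; not CockroachDB's pkg/util/limit implementation.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// requestLimiter bounds the number of in-flight requests. Begin blocks until
// a slot is available or the context is canceled; Finish returns the slot.
type requestLimiter struct {
	slots chan struct{}
}

func newRequestLimiter(limit int) requestLimiter {
	return requestLimiter{slots: make(chan struct{}, limit)}
}

func (l requestLimiter) Begin(ctx context.Context) error {
	select {
	case l.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (l requestLimiter) Finish() {
	<-l.slots
}

func main() {
	// With a limit of 1 (the default for the new cluster setting), a second
	// concurrent "AddSSTable" must wait for the first to finish.
	lim := newRequestLimiter(1)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := lim.Begin(context.Background()); err != nil {
				return
			}
			defer lim.Finish()
			fmt.Printf("request %d admitted at %s\n", i, time.Now().Format("15:04:05.000"))
			time.Sleep(50 * time.Millisecond) // stand-in for an expensive AddSSTable
		}(i)
	}
	wg.Wait()
}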