From cbc0810ae10747e169dbb4f54fd0163594dedf42 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 10 Jun 2021 18:14:40 -0400 Subject: [PATCH] kv: don't hold latches while rate limiting ExportRequests This commit moves ExportRequest rate limiting up above evaluation and outside of latching. This ensures that if an ExportRequest is rate-limited, it does not inadvertently block others while waiting. Interestingly, we were already careful about this for `AddSSTableRequests` because it is easier for them to block others while holding latches. In the case of read-only requests like `ExportRequest`, blocking others is less common. However, it is still possible. Notably, MVCC write requests with lower timestamps than the read will still block. Additionally, non-MVCC requests like range splits will block. In one customer investigation, we found that an export request was holding latches and blocking a non-MVCC request (a range split) which was transitively blocking all write traffic to the range and all read traffic to the RHS of the range. We believe that the stall lasted for so long (seconds to minutes) because the ExportRequest was throttled while holding its latches. I did notice that some of this code was just touched by #66092, so any potential backport here may be a little involved. Release note (bug fix): Backups no longer risk the possibility of blocking conflicting writes while being rate limited by the kv.bulk_io_write.concurrent_export_requests concurrency limit. --- pkg/ccl/storageccl/export.go | 6 -- pkg/kv/kvserver/batcheval/eval_context.go | 4 - pkg/kv/kvserver/replica.go | 5 -- pkg/kv/kvserver/replica_eval_context_span.go | 5 -- pkg/kv/kvserver/store_send.go | 89 +++++++++++++++----- pkg/roachpb/batch.go | 10 --- pkg/util/limit/limiter.go | 14 ++- 7 files changed, 78 insertions(+), 55 deletions(-) diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go index 9550a9650dad..68d915e2e1c4 100644 --- a/pkg/ccl/storageccl/export.go +++ b/pkg/ccl/storageccl/export.go @@ -85,12 +85,6 @@ func evalExport( reply.StartTime = cArgs.EvalCtx.GetGCThreshold() } - q, err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx) - if err != nil { - return result.Result{}, err - } - defer q.Release() - makeExternalStorage := !args.ReturnSST || args.Storage != roachpb.ExternalStorage{} || (args.StorageByLocalityKV != nil && len(args.StorageByLocalityKV) > 0) if makeExternalStorage || log.V(1) { diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 60e1c429e094..3c30a9436b86 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -56,7 +56,6 @@ type EvalContext interface { DB() *kv.DB AbortSpan() *abortspan.AbortSpan GetConcurrencyManager() concurrency.Manager - GetLimiters() *Limiters NodeID() roachpb.NodeID StoreID() roachpb.StoreID @@ -182,9 +181,6 @@ func (m *mockEvalCtxImpl) Clock() *hlc.Clock { func (m *mockEvalCtxImpl) DB() *kv.DB { panic("unimplemented") } -func (m *mockEvalCtxImpl) GetLimiters() *Limiters { - panic("unimplemented") -} func (m *mockEvalCtxImpl) AbortSpan() *abortspan.AbortSpan { return m.MockEvalCtx.AbortSpan } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 772177b12a44..62b53614f348 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -826,11 +826,6 @@ func (r *Replica) AbortSpan() *abortspan.AbortSpan { return r.abortSpan } -// GetLimiters returns the Replica's limiters. -func (r *Replica) GetLimiters() *batcheval.Limiters { - return &r.store.limiters -} - // GetConcurrencyManager returns the Replica's concurrency.Manager. func (r *Replica) GetConcurrencyManager() concurrency.Manager { return r.concMgr diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index e6d6c53c40fc..cf6255c89acc 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -236,11 +236,6 @@ func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary( return rec.i.GetCurrentReadSummary(ctx) } -// GetLimiters returns the per-store limiters. -func (rec *SpanSetReplicaEvalContext) GetLimiters() *batcheval.Limiters { - return rec.i.GetLimiters() -} - // GetExternalStorage returns an ExternalStorage object, based on // information parsed from a URI, stored in `dest`. func (rec *SpanSetReplicaEvalContext) GetExternalStorage( diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go index b2cb95b105e5..8a6cd885de58 100644 --- a/pkg/kv/kvserver/store_send.go +++ b/pkg/kv/kvserver/store_send.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -52,27 +53,10 @@ func (s *Store) Send( } } - // Limit the number of concurrent AddSSTable requests, since they're expensive - // and block all other writes to the same span. - if ba.IsSingleAddSSTableRequest() { - before := timeutil.Now() - alloc, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx) - if err != nil { - return nil, roachpb.NewError(err) - } - defer alloc.Release() - - beforeEngineDelay := timeutil.Now() - s.engine.PreIngestDelay(ctx) - after := timeutil.Now() - - waited, waitedEngine := after.Sub(before), after.Sub(beforeEngineDelay) - s.metrics.AddSSTableProposalTotalDelay.Inc(waited.Nanoseconds()) - s.metrics.AddSSTableProposalEngineDelay.Inc(waitedEngine.Nanoseconds()) - if waited > time.Second { - log.Infof(ctx, "SST ingestion was delayed by %v (%v for storage engine back-pressure)", - waited, waitedEngine) - } + if res, err := s.maybeThrottleBatch(ctx, ba); err != nil { + return nil, roachpb.NewError(err) + } else if res != nil { + defer res.Release() } if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil { @@ -261,3 +245,66 @@ func (s *Store) Send( } return nil, pErr } + +// maybeThrottleBatch inspects the provided batch and determines whether +// throttling should be applied to avoid overloading the Store. If so, the +// method blocks and returns a reservation that must be released after the +// request has completed. +// +// Of note is that request throttling is all performed above evaluation and +// before a request acquires latches on a range. Otherwise, the request could +// inadvertently block others while being throttled. +func (s *Store) maybeThrottleBatch( + ctx context.Context, ba roachpb.BatchRequest, +) (limit.Reservation, error) { + if !ba.IsSingleRequest() { + return nil, nil + } + + switch t := ba.Requests[0].GetInner().(type) { + case *roachpb.AddSSTableRequest: + // Limit the number of concurrent AddSSTable requests, since they're + // expensive and block all other writes to the same span. However, don't + // limit AddSSTable requests that are going to ingest as a WriteBatch. + if t.IngestAsWrites { + return nil, nil + } + + before := timeutil.Now() + res, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx) + if err != nil { + return nil, err + } + + beforeEngineDelay := timeutil.Now() + s.engine.PreIngestDelay(ctx) + after := timeutil.Now() + + waited, waitedEngine := after.Sub(before), after.Sub(beforeEngineDelay) + s.metrics.AddSSTableProposalTotalDelay.Inc(waited.Nanoseconds()) + s.metrics.AddSSTableProposalEngineDelay.Inc(waitedEngine.Nanoseconds()) + if waited > time.Second { + log.Infof(ctx, "SST ingestion was delayed by %v (%v for storage engine back-pressure)", + waited, waitedEngine) + } + return res, nil + + case *roachpb.ExportRequest: + // Limit the number of concurrent Export requests, as these often scan and + // entire Range at a time and place significant read load on a Store. + before := timeutil.Now() + res, err := s.limiters.ConcurrentExportRequests.Begin(ctx) + if err != nil { + return nil, err + } + + waited := timeutil.Since(before) + if waited > time.Second { + log.Infof(ctx, "Export request was delayed by %v", waited) + } + return res, nil + + default: + return nil, nil + } +} diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index 6cb9eaba9b79..7da769cc6d79 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -245,16 +245,6 @@ func (ba *BatchRequest) IsSingleCheckConsistencyRequest() bool { return ba.isSingleRequestWithMethod(CheckConsistency) } -// IsSingleAddSSTableRequest returns true iff the batch contains a single -// request, and that request is an AddSSTableRequest that will ingest as an SST, -// (i.e. does not have IngestAsWrites set) -func (ba *BatchRequest) IsSingleAddSSTableRequest() bool { - if ba.isSingleRequestWithMethod(AddSSTable) { - return !ba.Requests[0].GetInner().(*AddSSTableRequest).IngestAsWrites - } - return false -} - // IsCompleteTransaction determines whether a batch contains every write in a // transactions. func (ba *BatchRequest) IsCompleteTransaction() bool { diff --git a/pkg/util/limit/limiter.go b/pkg/util/limit/limiter.go index 0a68748324bc..c2b9033fb4b2 100644 --- a/pkg/util/limit/limiter.go +++ b/pkg/util/limit/limiter.go @@ -25,6 +25,12 @@ type ConcurrentRequestLimiter struct { sem *quotapool.IntPool } +// Reservation is an allocation from a limiter which should be released once the +// limited task has been completed. +type Reservation interface { + Release() +} + // MakeConcurrentRequestLimiter creates a ConcurrentRequestLimiter. func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestLimiter { return ConcurrentRequestLimiter{ @@ -36,19 +42,19 @@ func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestL // Begin attempts to reserve a spot in the pool, blocking if needed until the // one is available or the context is canceled and adding a tracing span if it // is forced to block. -func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (*quotapool.IntAlloc, error) { +func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (Reservation, error) { if err := ctx.Err(); err != nil { return nil, err } - alloc, err := l.sem.TryAcquire(ctx, 1) + res, err := l.sem.TryAcquire(ctx, 1) if errors.Is(err, quotapool.ErrNotEnoughQuota) { var span *tracing.Span ctx, span = tracing.ChildSpan(ctx, l.spanName) defer span.Finish() - alloc, err = l.sem.Acquire(ctx, 1) + res, err = l.sem.Acquire(ctx, 1) } - return alloc, err + return res, err } // SetLimit adjusts the size of the pool.