diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go
index 9550a9650dad..68d915e2e1c4 100644
--- a/pkg/ccl/storageccl/export.go
+++ b/pkg/ccl/storageccl/export.go
@@ -85,12 +85,6 @@ func evalExport(
 		reply.StartTime = cArgs.EvalCtx.GetGCThreshold()
 	}
 
-	q, err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx)
-	if err != nil {
-		return result.Result{}, err
-	}
-	defer q.Release()
-
 	makeExternalStorage := !args.ReturnSST || args.Storage != roachpb.ExternalStorage{} ||
 		(args.StorageByLocalityKV != nil && len(args.StorageByLocalityKV) > 0)
 	if makeExternalStorage || log.V(1) {
diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go
index 60e1c429e094..3c30a9436b86 100644
--- a/pkg/kv/kvserver/batcheval/eval_context.go
+++ b/pkg/kv/kvserver/batcheval/eval_context.go
@@ -56,7 +56,6 @@ type EvalContext interface {
 	DB() *kv.DB
 	AbortSpan() *abortspan.AbortSpan
 	GetConcurrencyManager() concurrency.Manager
-	GetLimiters() *Limiters
 
 	NodeID() roachpb.NodeID
 	StoreID() roachpb.StoreID
@@ -182,9 +181,6 @@ func (m *mockEvalCtxImpl) Clock() *hlc.Clock {
 func (m *mockEvalCtxImpl) DB() *kv.DB {
 	panic("unimplemented")
 }
-func (m *mockEvalCtxImpl) GetLimiters() *Limiters {
-	panic("unimplemented")
-}
 func (m *mockEvalCtxImpl) AbortSpan() *abortspan.AbortSpan {
 	return m.MockEvalCtx.AbortSpan
 }
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 772177b12a44..62b53614f348 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -826,11 +826,6 @@ func (r *Replica) AbortSpan() *abortspan.AbortSpan {
 	return r.abortSpan
 }
 
-// GetLimiters returns the Replica's limiters.
-func (r *Replica) GetLimiters() *batcheval.Limiters {
-	return &r.store.limiters
-}
-
 // GetConcurrencyManager returns the Replica's concurrency.Manager.
 func (r *Replica) GetConcurrencyManager() concurrency.Manager {
 	return r.concMgr
diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go
index e6d6c53c40fc..cf6255c89acc 100644
--- a/pkg/kv/kvserver/replica_eval_context_span.go
+++ b/pkg/kv/kvserver/replica_eval_context_span.go
@@ -236,11 +236,6 @@ func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary(
 	return rec.i.GetCurrentReadSummary(ctx)
 }
 
-// GetLimiters returns the per-store limiters.
-func (rec *SpanSetReplicaEvalContext) GetLimiters() *batcheval.Limiters {
-	return rec.i.GetLimiters()
-}
-
 // GetExternalStorage returns an ExternalStorage object, based on
 // information parsed from a URI, stored in `dest`.
 func (rec *SpanSetReplicaEvalContext) GetExternalStorage(
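Note on the four removals above: with `GetLimiters` gone from `EvalContext`, Export throttling no longer happens during command evaluation; it moves to a single choke point in `Store.Send` (next file), which acquires a reservation up front and releases it when the request finishes. The sketch below shows that acquire/defer-Release shape in isolation. It is a self-contained toy that uses a buffered channel in place of the real `quotapool.IntPool`, so every name in it is illustrative rather than a CockroachDB API.

```go
// A minimal sketch of the reservation pattern: acquire before doing work,
// defer the release. The channel-based limiter is illustrative only.
package main

import (
	"context"
	"fmt"
)

// reservation mirrors the limit.Reservation interface introduced in the diff.
type reservation interface{ Release() }

// releaseFunc adapts a plain func to the Release() method.
type releaseFunc func()

func (f releaseFunc) Release() { f() }

// channelLimiter is a toy stand-in for limit.ConcurrentRequestLimiter.
type channelLimiter chan struct{}

func (l channelLimiter) Begin(ctx context.Context) (reservation, error) {
	select {
	case l <- struct{}{}: // a slot is free
		return releaseFunc(func() { <-l }), nil
	case <-ctx.Done(): // caller gave up while throttled
		return nil, ctx.Err()
	}
}

func main() {
	lim := make(channelLimiter, 2) // admit at most 2 concurrent requests
	res, err := lim.Begin(context.Background())
	if err != nil {
		fmt.Println("throttled and canceled:", err)
		return
	}
	defer res.Release()
	fmt.Println("request admitted")
}
```

Running it prints "request admitted"; canceling the context while all slots are held surfaces `ctx.Err()` instead, which is the error `Store.Send` wraps into a `roachpb.Error`.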
diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go
index b2cb95b105e5..8a6cd885de58 100644
--- a/pkg/kv/kvserver/store_send.go
+++ b/pkg/kv/kvserver/store_send.go
@@ -16,6 +16,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/limit"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 )
@@ -52,27 +53,10 @@ func (s *Store) Send(
 		}
 	}
 
-	// Limit the number of concurrent AddSSTable requests, since they're expensive
-	// and block all other writes to the same span.
-	if ba.IsSingleAddSSTableRequest() {
-		before := timeutil.Now()
-		alloc, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx)
-		if err != nil {
-			return nil, roachpb.NewError(err)
-		}
-		defer alloc.Release()
-
-		beforeEngineDelay := timeutil.Now()
-		s.engine.PreIngestDelay(ctx)
-		after := timeutil.Now()
-
-		waited, waitedEngine := after.Sub(before), after.Sub(beforeEngineDelay)
-		s.metrics.AddSSTableProposalTotalDelay.Inc(waited.Nanoseconds())
-		s.metrics.AddSSTableProposalEngineDelay.Inc(waitedEngine.Nanoseconds())
-		if waited > time.Second {
-			log.Infof(ctx, "SST ingestion was delayed by %v (%v for storage engine back-pressure)",
-				waited, waitedEngine)
-		}
+	if res, err := s.maybeThrottleBatch(ctx, ba); err != nil {
+		return nil, roachpb.NewError(err)
+	} else if res != nil {
+		defer res.Release()
 	}
 
 	if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
@@ -261,3 +245,66 @@ func (s *Store) Send(
 	}
 	return nil, pErr
 }
+
+// maybeThrottleBatch inspects the provided batch and determines whether
+// throttling should be applied to avoid overloading the Store. If so, the
+// method blocks and returns a reservation that must be released after the
+// request has completed.
+//
+// Of note is that request throttling is all performed above evaluation and
+// before a request acquires latches on a range. Otherwise, the request could
+// inadvertently block others while being throttled.
+func (s *Store) maybeThrottleBatch(
+	ctx context.Context, ba roachpb.BatchRequest,
+) (limit.Reservation, error) {
+	if !ba.IsSingleRequest() {
+		return nil, nil
+	}
+
+	switch t := ba.Requests[0].GetInner().(type) {
+	case *roachpb.AddSSTableRequest:
+		// Limit the number of concurrent AddSSTable requests, since they're
+		// expensive and block all other writes to the same span. However, don't
+		// limit AddSSTable requests that are going to ingest as a WriteBatch.
+		if t.IngestAsWrites {
+			return nil, nil
+		}
+
+		before := timeutil.Now()
+		res, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		beforeEngineDelay := timeutil.Now()
+		s.engine.PreIngestDelay(ctx)
+		after := timeutil.Now()
+
+		waited, waitedEngine := after.Sub(before), after.Sub(beforeEngineDelay)
+		s.metrics.AddSSTableProposalTotalDelay.Inc(waited.Nanoseconds())
+		s.metrics.AddSSTableProposalEngineDelay.Inc(waitedEngine.Nanoseconds())
+		if waited > time.Second {
+			log.Infof(ctx, "SST ingestion was delayed by %v (%v for storage engine back-pressure)",
+				waited, waitedEngine)
+		}
+		return res, nil
+
+	case *roachpb.ExportRequest:
+		// Limit the number of concurrent Export requests, as these often scan an
+		// entire Range at a time and place significant read load on a Store.
+		before := timeutil.Now()
+		res, err := s.limiters.ConcurrentExportRequests.Begin(ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		waited := timeutil.Since(before)
+		if waited > time.Second {
+			log.Infof(ctx, "Export request was delayed by %v", waited)
+		}
+		return res, nil
+
+	default:
+		return nil, nil
+	}
+}
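The type switch in `maybeThrottleBatch` replaces the `IsSingleAddSSTableRequest` helper (deleted from `pkg/roachpb` below), folding the single-request check and the `IngestAsWrites` exemption into the Store itself. Here is a toy, runnable illustration of that dispatch shape; the request types are stand-ins, not the `roachpb` definitions.

```go
// Only single-request batches are inspected, and the limiter is chosen by
// the request's concrete type, mirroring maybeThrottleBatch above.
package main

import "fmt"

// Toy stand-ins for the real request types (assumed shapes only).
type addSSTableRequest struct{ IngestAsWrites bool }
type exportRequest struct{}
type getRequest struct{}

// throttleFor names the limiter a batch would wait on; "" means the batch
// bypasses throttling entirely, like the nil Reservation in the real code.
func throttleFor(reqs []interface{}) string {
	if len(reqs) != 1 {
		return "" // only single-request batches are considered
	}
	switch t := reqs[0].(type) {
	case *addSSTableRequest:
		if t.IngestAsWrites {
			return "" // ingests as a WriteBatch, so not limited
		}
		return "ConcurrentAddSSTableRequests"
	case *exportRequest:
		return "ConcurrentExportRequests"
	default:
		return ""
	}
}

func main() {
	fmt.Println(throttleFor([]interface{}{&addSSTableRequest{}}))
	fmt.Println(throttleFor([]interface{}{&addSSTableRequest{IngestAsWrites: true}}))
	fmt.Println(throttleFor([]interface{}{&exportRequest{}}))
	fmt.Println(throttleFor([]interface{}{&getRequest{}, &getRequest{}}))
}
```

The output is the two limiter names followed by two blank lines, matching the cases the real method admits without throttling.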
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index 6cb9eaba9b79..7da769cc6d79 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -245,16 +245,6 @@ func (ba *BatchRequest) IsSingleCheckConsistencyRequest() bool {
 	return ba.isSingleRequestWithMethod(CheckConsistency)
 }
 
-// IsSingleAddSSTableRequest returns true iff the batch contains a single
-// request, and that request is an AddSSTableRequest that will ingest as an SST,
-// (i.e. does not have IngestAsWrites set)
-func (ba *BatchRequest) IsSingleAddSSTableRequest() bool {
-	if ba.isSingleRequestWithMethod(AddSSTable) {
-		return !ba.Requests[0].GetInner().(*AddSSTableRequest).IngestAsWrites
-	}
-	return false
-}
-
 // IsCompleteTransaction determines whether a batch contains every write in a
 // transactions.
 func (ba *BatchRequest) IsCompleteTransaction() bool {
diff --git a/pkg/util/limit/limiter.go b/pkg/util/limit/limiter.go
index 0a68748324bc..c2b9033fb4b2 100644
--- a/pkg/util/limit/limiter.go
+++ b/pkg/util/limit/limiter.go
@@ -25,6 +25,12 @@ type ConcurrentRequestLimiter struct {
 	sem *quotapool.IntPool
 }
 
+// Reservation is an allocation from a limiter which should be released once the
+// limited task has been completed.
+type Reservation interface {
+	Release()
+}
+
 // MakeConcurrentRequestLimiter creates a ConcurrentRequestLimiter.
 func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestLimiter {
 	return ConcurrentRequestLimiter{
@@ -36,19 +42,19 @@ func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestL
 // Begin attempts to reserve a spot in the pool, blocking if needed until the
 // one is available or the context is canceled and adding a tracing span if it
 // is forced to block.
-func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (*quotapool.IntAlloc, error) {
+func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (Reservation, error) {
 	if err := ctx.Err(); err != nil {
 		return nil, err
 	}
 
-	alloc, err := l.sem.TryAcquire(ctx, 1)
+	res, err := l.sem.TryAcquire(ctx, 1)
 	if errors.Is(err, quotapool.ErrNotEnoughQuota) {
 		var span *tracing.Span
 		ctx, span = tracing.ChildSpan(ctx, l.spanName)
 		defer span.Finish()
-		alloc, err = l.sem.Acquire(ctx, 1)
+		res, err = l.sem.Acquire(ctx, 1)
 	}
-	return alloc, err
+	return res, err
 }
 
 // SetLimit adjusts the size of the pool.
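`Begin`'s two-step acquire is worth calling out: it first calls `TryAcquire`, and only sets up a tracing child span once the caller is actually forced to block, so the uncontended path stays cheap. The following self-contained sketch mimics that split, with a channel semaphore and a log line standing in for quotapool and tracing; none of these names come from the real package.

```go
// Fast path: non-blocking acquire with no instrumentation. Slow path: make
// the wait observable, then block until a slot frees or ctx is canceled.
package main

import (
	"context"
	"fmt"
	"time"
)

type semaphore chan struct{}

// begin returns a release func, standing in for the Reservation above.
func (s semaphore) begin(ctx context.Context, name string) (func(), error) {
	select {
	case s <- struct{}{}:
		return func() { <-s }, nil // fast path: quota available immediately
	default:
	}
	// Slow path: we are about to block, so record that the wait happened.
	fmt.Printf("%s: waiting for quota\n", name)
	select {
	case s <- struct{}{}:
		return func() { <-s }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	sem := make(semaphore, 1)
	release1, _ := sem.begin(context.Background(), "req1") // fast path

	go func() {
		time.Sleep(50 * time.Millisecond)
		release1() // free the slot so req2 can stop waiting
	}()

	release2, err := sem.begin(context.Background(), "req2") // slow path
	if err != nil {
		fmt.Println("canceled:", err)
		return
	}
	defer release2()
	fmt.Println("req2 admitted")
}
```

Only the blocked path pays for the instrumentation, which matters because `Begin` sits on the hot path of every throttled request.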