Merge #66338
66338: kv: don't hold latches while rate limiting ExportRequests r=nvanbenschoten a=nvanbenschoten

This commit moves ExportRequest rate limiting up above evaluation and
outside of latching. This ensures that if an ExportRequest is
rate-limited, it does not inadvertently block others while waiting.

Interestingly, we were already careful about this for `AddSSTableRequests`
because it is easier for them to block others while holding latches. In
the case of read-only requests like `ExportRequest`, blocking others is
less common. However, it is still possible. Notably, MVCC write requests
with lower timestamps than the read will still block. Additionally,
non-MVCC requests like range splits will block. In one customer
investigation, we found that an export request was holding latches and
blocking a non-MVCC request (a range split) which was transitively
blocking all write traffic to the range and all read traffic to the RHS
of the range. We believe that the stall lasted for so long (seconds to
minutes) because the ExportRequest was throttled while holding its
latches.
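
To make the new ordering concrete, here is a minimal standalone Go sketch (not CockroachDB code; the semaphore, `acquireLatches`, and `evaluate` helpers are stand-ins) of the pattern this change adopts: reserve a slot in the concurrency limiter first, and only acquire latches and evaluate once the reservation succeeds.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

// exportSem stands in for the store's concurrent-ExportRequest limiter.
var exportSem = semaphore.NewWeighted(2)

// handleExport shows the ordering: throttle first, then latch, then evaluate.
// A request waiting on the limiter holds no latches, so it cannot block
// writes or non-MVCC requests such as range splits.
func handleExport(ctx context.Context) error {
	// 1. Rate limit above evaluation and outside latching.
	if err := exportSem.Acquire(ctx, 1); err != nil {
		return err
	}
	defer exportSem.Release(1)

	// 2. Only now acquire latches (stand-in helper).
	release := acquireLatches()
	defer release()

	// 3. Evaluate the request while holding latches.
	return evaluate(ctx)
}

func acquireLatches() (release func()) { return func() {} }

func evaluate(ctx context.Context) error { return nil }

func main() {
	fmt.Println(handleExport(context.Background()))
}
```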

I did notice that some of this code was just touched by #66092, so any
potential backport here may be a little involved.

Release note (bug fix): Backups no longer risk blocking conflicting writes
while being rate limited by the kv.bulk_io_write.concurrent_export_requests
concurrency limit.

/cc @cockroachdb/kv

Co-authored-by: Nathan VanBenschoten <[email protected]>
craig[bot] and nvanbenschoten committed Jun 11, 2021
2 parents 82c89e9 + cbc0810 commit e4fbcdb
Showing 7 changed files with 78 additions and 55 deletions.
6 changes: 0 additions & 6 deletions pkg/ccl/storageccl/export.go
@@ -85,12 +85,6 @@ func evalExport(
reply.StartTime = cArgs.EvalCtx.GetGCThreshold()
}

q, err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx)
if err != nil {
return result.Result{}, err
}
defer q.Release()

makeExternalStorage := !args.ReturnSST || args.Storage != roachpb.ExternalStorage{} ||
(args.StorageByLocalityKV != nil && len(args.StorageByLocalityKV) > 0)
if makeExternalStorage || log.V(1) {
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -56,7 +56,6 @@ type EvalContext interface {
DB() *kv.DB
AbortSpan() *abortspan.AbortSpan
GetConcurrencyManager() concurrency.Manager
GetLimiters() *Limiters

NodeID() roachpb.NodeID
StoreID() roachpb.StoreID
@@ -182,9 +181,6 @@ func (m *mockEvalCtxImpl) Clock() *hlc.Clock {
func (m *mockEvalCtxImpl) DB() *kv.DB {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) GetLimiters() *Limiters {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) AbortSpan() *abortspan.AbortSpan {
return m.MockEvalCtx.AbortSpan
}
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/replica.go
@@ -826,11 +826,6 @@ func (r *Replica) AbortSpan() *abortspan.AbortSpan {
return r.abortSpan
}

// GetLimiters returns the Replica's limiters.
func (r *Replica) GetLimiters() *batcheval.Limiters {
return &r.store.limiters
}

// GetConcurrencyManager returns the Replica's concurrency.Manager.
func (r *Replica) GetConcurrencyManager() concurrency.Manager {
return r.concMgr
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/replica_eval_context_span.go
@@ -236,11 +236,6 @@ func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary(
return rec.i.GetCurrentReadSummary(ctx)
}

// GetLimiters returns the per-store limiters.
func (rec *SpanSetReplicaEvalContext) GetLimiters() *batcheval.Limiters {
return rec.i.GetLimiters()
}

// GetExternalStorage returns an ExternalStorage object, based on
// information parsed from a URI, stored in `dest`.
func (rec *SpanSetReplicaEvalContext) GetExternalStorage(
89 changes: 68 additions & 21 deletions pkg/kv/kvserver/store_send.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
@@ -52,27 +53,10 @@ func (s *Store) Send(
}
}

// Limit the number of concurrent AddSSTable requests, since they're expensive
// and block all other writes to the same span.
if ba.IsSingleAddSSTableRequest() {
before := timeutil.Now()
alloc, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx)
if err != nil {
return nil, roachpb.NewError(err)
}
defer alloc.Release()

beforeEngineDelay := timeutil.Now()
s.engine.PreIngestDelay(ctx)
after := timeutil.Now()

waited, waitedEngine := after.Sub(before), after.Sub(beforeEngineDelay)
s.metrics.AddSSTableProposalTotalDelay.Inc(waited.Nanoseconds())
s.metrics.AddSSTableProposalEngineDelay.Inc(waitedEngine.Nanoseconds())
if waited > time.Second {
log.Infof(ctx, "SST ingestion was delayed by %v (%v for storage engine back-pressure)",
waited, waitedEngine)
}
if res, err := s.maybeThrottleBatch(ctx, ba); err != nil {
return nil, roachpb.NewError(err)
} else if res != nil {
defer res.Release()
}

if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
@@ -261,3 +245,66 @@
}
return nil, pErr
}

// maybeThrottleBatch inspects the provided batch and determines whether
// throttling should be applied to avoid overloading the Store. If so, the
// method blocks and returns a reservation that must be released after the
// request has completed.
//
// Of note is that request throttling is all performed above evaluation and
// before a request acquires latches on a range. Otherwise, the request could
// inadvertently block others while being throttled.
func (s *Store) maybeThrottleBatch(
ctx context.Context, ba roachpb.BatchRequest,
) (limit.Reservation, error) {
if !ba.IsSingleRequest() {
return nil, nil
}

switch t := ba.Requests[0].GetInner().(type) {
case *roachpb.AddSSTableRequest:
// Limit the number of concurrent AddSSTable requests, since they're
// expensive and block all other writes to the same span. However, don't
// limit AddSSTable requests that are going to ingest as a WriteBatch.
if t.IngestAsWrites {
return nil, nil
}

before := timeutil.Now()
res, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx)
if err != nil {
return nil, err
}

beforeEngineDelay := timeutil.Now()
s.engine.PreIngestDelay(ctx)
after := timeutil.Now()

waited, waitedEngine := after.Sub(before), after.Sub(beforeEngineDelay)
s.metrics.AddSSTableProposalTotalDelay.Inc(waited.Nanoseconds())
s.metrics.AddSSTableProposalEngineDelay.Inc(waitedEngine.Nanoseconds())
if waited > time.Second {
log.Infof(ctx, "SST ingestion was delayed by %v (%v for storage engine back-pressure)",
waited, waitedEngine)
}
return res, nil

case *roachpb.ExportRequest:
// Limit the number of concurrent Export requests, as these often scan an
// entire Range at a time and place significant read load on a Store.
before := timeutil.Now()
res, err := s.limiters.ConcurrentExportRequests.Begin(ctx)
if err != nil {
return nil, err
}

waited := timeutil.Since(before)
if waited > time.Second {
log.Infof(ctx, "Export request was delayed by %v", waited)
}
return res, nil

default:
return nil, nil
}
}
10 changes: 0 additions & 10 deletions pkg/roachpb/batch.go
@@ -245,16 +245,6 @@ func (ba *BatchRequest) IsSingleCheckConsistencyRequest() bool {
return ba.isSingleRequestWithMethod(CheckConsistency)
}

// IsSingleAddSSTableRequest returns true iff the batch contains a single
// request, and that request is an AddSSTableRequest that will ingest as an SST
// (i.e. it does not have IngestAsWrites set).
func (ba *BatchRequest) IsSingleAddSSTableRequest() bool {
if ba.isSingleRequestWithMethod(AddSSTable) {
return !ba.Requests[0].GetInner().(*AddSSTableRequest).IngestAsWrites
}
return false
}

// IsCompleteTransaction determines whether a batch contains every write in a
// transaction.
func (ba *BatchRequest) IsCompleteTransaction() bool {
14 changes: 10 additions & 4 deletions pkg/util/limit/limiter.go
@@ -25,6 +25,12 @@ type ConcurrentRequestLimiter struct {
sem *quotapool.IntPool
}

// Reservation is an allocation from a limiter which should be released once the
// limited task has been completed.
type Reservation interface {
Release()
}

// MakeConcurrentRequestLimiter creates a ConcurrentRequestLimiter.
func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestLimiter {
return ConcurrentRequestLimiter{
@@ -36,19 +42,19 @@ func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestLimiter {
// Begin attempts to reserve a spot in the pool, blocking if needed until one
// is available or the context is canceled, and adding a tracing span if it is
// forced to block.
func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (*quotapool.IntAlloc, error) {
func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (Reservation, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

alloc, err := l.sem.TryAcquire(ctx, 1)
res, err := l.sem.TryAcquire(ctx, 1)
if errors.Is(err, quotapool.ErrNotEnoughQuota) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, l.spanName)
defer span.Finish()
alloc, err = l.sem.Acquire(ctx, 1)
res, err = l.sem.Acquire(ctx, 1)
}
return alloc, err
return res, err
}

// SetLimit adjusts the size of the pool.
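
For context, a minimal usage sketch of the updated limiter API as it appears in this diff (the limiter name and limit value are made up; it assumes the repository's `pkg/util/limit` import path): Begin blocks until a slot frees up or the context is canceled, and the returned Reservation must be released once the limited work finishes.

```go
package main

import (
	"context"
	"log"

	"github.com/cockroachdb/cockroach/pkg/util/limit"
)

func main() {
	// Allow at most 3 concurrent rate-limited operations.
	limiter := limit.MakeConcurrentRequestLimiter("exportRequestLimiter", 3)

	ctx := context.Background()
	// Begin blocks until a slot is available (or ctx is canceled) and, after
	// this change, returns a Reservation rather than a *quotapool.IntAlloc.
	res, err := limiter.Begin(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer res.Release()

	// ... perform the rate-limited work (e.g. evaluate an ExportRequest) ...
}
```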
