kv: don't hold latches while rate limiting ExportRequests #66338

Merged
6 changes: 0 additions & 6 deletions pkg/ccl/storageccl/export.go
@@ -85,12 +85,6 @@ func evalExport(
reply.StartTime = cArgs.EvalCtx.GetGCThreshold()
}

q, err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx)
if err != nil {
return result.Result{}, err
}
defer q.Release()

makeExternalStorage := !args.ReturnSST || args.Storage != roachpb.ExternalStorage{} ||
(args.StorageByLocalityKV != nil && len(args.StorageByLocalityKV) > 0)
if makeExternalStorage || log.V(1) {
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -56,7 +56,6 @@ type EvalContext interface {
DB() *kv.DB
AbortSpan() *abortspan.AbortSpan
GetConcurrencyManager() concurrency.Manager
GetLimiters() *Limiters

NodeID() roachpb.NodeID
StoreID() roachpb.StoreID
@@ -182,9 +181,6 @@ func (m *mockEvalCtxImpl) Clock() *hlc.Clock {
func (m *mockEvalCtxImpl) DB() *kv.DB {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) GetLimiters() *Limiters {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) AbortSpan() *abortspan.AbortSpan {
return m.MockEvalCtx.AbortSpan
}
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/replica.go
@@ -826,11 +826,6 @@ func (r *Replica) AbortSpan() *abortspan.AbortSpan {
return r.abortSpan
}

// GetLimiters returns the Replica's limiters.
func (r *Replica) GetLimiters() *batcheval.Limiters {
return &r.store.limiters
}

// GetConcurrencyManager returns the Replica's concurrency.Manager.
func (r *Replica) GetConcurrencyManager() concurrency.Manager {
return r.concMgr
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/replica_eval_context_span.go
@@ -236,11 +236,6 @@ func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary(
return rec.i.GetCurrentReadSummary(ctx)
}

// GetLimiters returns the per-store limiters.
func (rec *SpanSetReplicaEvalContext) GetLimiters() *batcheval.Limiters {
return rec.i.GetLimiters()
}

// GetExternalStorage returns an ExternalStorage object, based on
// information parsed from a URI, stored in `dest`.
func (rec *SpanSetReplicaEvalContext) GetExternalStorage(
89 changes: 68 additions & 21 deletions pkg/kv/kvserver/store_send.go
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
@@ -52,27 +53,10 @@ func (s *Store) Send(
}
}

// Limit the number of concurrent AddSSTable requests, since they're expensive
// and block all other writes to the same span.
if ba.IsSingleAddSSTableRequest() {
before := timeutil.Now()
alloc, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx)
if err != nil {
return nil, roachpb.NewError(err)
}
defer alloc.Release()

beforeEngineDelay := timeutil.Now()
s.engine.PreIngestDelay(ctx)
after := timeutil.Now()

waited, waitedEngine := after.Sub(before), after.Sub(beforeEngineDelay)
s.metrics.AddSSTableProposalTotalDelay.Inc(waited.Nanoseconds())
s.metrics.AddSSTableProposalEngineDelay.Inc(waitedEngine.Nanoseconds())
if waited > time.Second {
log.Infof(ctx, "SST ingestion was delayed by %v (%v for storage engine back-pressure)",
waited, waitedEngine)
}
if res, err := s.maybeThrottleBatch(ctx, ba); err != nil {
return nil, roachpb.NewError(err)
} else if res != nil {
defer res.Release()
}

if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
@@ -261,3 +245,66 @@ func (s *Store) Send(
}
return nil, pErr
}

// maybeThrottleBatch inspects the provided batch and determines whether
// throttling should be applied to avoid overloading the Store. If so, the
// method blocks and returns a reservation that must be released after the
// request has completed.
//
// Note that all request throttling is performed above evaluation and before
// a request acquires latches on a range. Otherwise, a throttled request could
// inadvertently block other requests to the range while it waits.
func (s *Store) maybeThrottleBatch(
ctx context.Context, ba roachpb.BatchRequest,
) (limit.Reservation, error) {
if !ba.IsSingleRequest() {
return nil, nil
}

switch t := ba.Requests[0].GetInner().(type) {
case *roachpb.AddSSTableRequest:
// Limit the number of concurrent AddSSTable requests, since they're
// expensive and block all other writes to the same span. However, don't
// limit AddSSTable requests that are going to ingest as a WriteBatch.
if t.IngestAsWrites {
return nil, nil
}

before := timeutil.Now()
res, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx)
if err != nil {
return nil, err
}

beforeEngineDelay := timeutil.Now()
s.engine.PreIngestDelay(ctx)
after := timeutil.Now()

waited, waitedEngine := after.Sub(before), after.Sub(beforeEngineDelay)
s.metrics.AddSSTableProposalTotalDelay.Inc(waited.Nanoseconds())
s.metrics.AddSSTableProposalEngineDelay.Inc(waitedEngine.Nanoseconds())
if waited > time.Second {
log.Infof(ctx, "SST ingestion was delayed by %v (%v for storage engine back-pressure)",
waited, waitedEngine)
}
return res, nil

case *roachpb.ExportRequest:
// Limit the number of concurrent Export requests, as these often scan an
// entire Range at a time and place significant read load on a Store.
before := timeutil.Now()
res, err := s.limiters.ConcurrentExportRequests.Begin(ctx)
if err != nil {
return nil, err
}

waited := timeutil.Since(before)
if waited > time.Second {
log.Infof(ctx, "Export request was delayed by %v", waited)
}
return res, nil

default:
return nil, nil
}
}
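
The crux of the change above is ordering: admission control (the AddSSTable and Export limiters) now runs in Store.Send before the request acquires latches on its range, so a request waiting for a limiter slot can no longer stall unrelated traffic on that range. The snippet below is a rough, self-contained illustration of that ordering and is not CockroachDB code; the channel-based semaphore, the rangeLatch mutex, and handleExport are hypothetical stand-ins for the store limiter, per-range latches, and request evaluation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// sem is a toy concurrency limiter: at most two "export" requests are admitted
// at a time. It plays the role of limiters.ConcurrentExportRequests here.
var sem = make(chan struct{}, 2)

// rangeLatch stands in for a per-range latch; while it is held, other requests
// to the same key span are blocked.
var rangeLatch sync.Mutex

func handleExport(ctx context.Context, id int) error {
	// Throttle first, while holding no latches, so a request queued behind the
	// limiter cannot stall unrelated traffic on the range.
	select {
	case sem <- struct{}{}:
		defer func() { <-sem }()
	case <-ctx.Done():
		return ctx.Err()
	}

	// Only after admission do we acquire the latch and do the work.
	rangeLatch.Lock()
	defer rangeLatch.Unlock()
	time.Sleep(10 * time.Millisecond) // simulated range scan / export
	fmt.Printf("export %d done\n", id)
	return nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = handleExport(context.Background(), i)
		}(i)
	}
	wg.Wait()
}
```

Under the old placement, the semaphore was acquired inside evaluation (see the evalExport hunk above), i.e. after latching, so a throttled ExportRequest held its latches for the entire wait.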
10 changes: 0 additions & 10 deletions pkg/roachpb/batch.go
@@ -245,16 +245,6 @@ func (ba *BatchRequest) IsSingleCheckConsistencyRequest() bool {
return ba.isSingleRequestWithMethod(CheckConsistency)
}

// IsSingleAddSSTableRequest returns true iff the batch contains a single
// request, and that request is an AddSSTableRequest that will ingest as an SST,
// (i.e. does not have IngestAsWrites set)
func (ba *BatchRequest) IsSingleAddSSTableRequest() bool {
if ba.isSingleRequestWithMethod(AddSSTable) {
return !ba.Requests[0].GetInner().(*AddSSTableRequest).IngestAsWrites
}
return false
}

// IsCompleteTransaction determines whether a batch contains every write in a
// transaction.
func (ba *BatchRequest) IsCompleteTransaction() bool {
14 changes: 10 additions & 4 deletions pkg/util/limit/limiter.go
@@ -25,6 +25,12 @@ type ConcurrentRequestLimiter struct {
sem *quotapool.IntPool
}

// Reservation is an allocation from a limiter which should be released once the
// limited task has been completed.
type Reservation interface {
Release()
}

// MakeConcurrentRequestLimiter creates a ConcurrentRequestLimiter.
func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestLimiter {
return ConcurrentRequestLimiter{
@@ -36,19 +42,19 @@ func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestLimiter {
// Begin attempts to reserve a spot in the pool, blocking if needed until one
// is available or the context is canceled, and adding a tracing span if it is
// forced to block.
func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (*quotapool.IntAlloc, error) {
func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (Reservation, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

alloc, err := l.sem.TryAcquire(ctx, 1)
res, err := l.sem.TryAcquire(ctx, 1)
if errors.Is(err, quotapool.ErrNotEnoughQuota) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, l.spanName)
defer span.Finish()
alloc, err = l.sem.Acquire(ctx, 1)
res, err = l.sem.Acquire(ctx, 1)
}
return alloc, err
return res, err
}

// SetLimit adjusts the size of the pool.
Expand Down