Skip to content

Commit

Permalink
kvserver,backupccl: make export, gc subject to admission control
Browse files Browse the repository at this point in the history
Export requests (issued by backup) and GC requests are now marked with
AdmissionHeader_ROOT_KV, which stops them from bypassing admission control.
Their priority is set to admission.LowPri since these are background
activities that should not be preferred over interactive user traffic.

Informs cockroachdb#65957

Release note: None
  • Loading branch information
sumeerbhola committed Oct 4, 2021
1 parent 95affc1 commit d0ad381
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 49 deletions.
13 changes: 10 additions & 3 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -335,7 +336,13 @@ func runBackupProcessor(
// value. The sentinel value of 1 forces the ExportRequest to paginate
// after creating a single SST.
header.TargetBytes = 1

admissionHeader := roachpb.AdmissionHeader{
// Backups are low priority.
Priority: int32(admission.LowPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_ROOT_KV,
NoMemoryReservedAtSource: true,
}
log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)",
span.span, span.attempts+1, header.UserPriority.String())
var rawRes roachpb.Response
Expand All @@ -353,8 +360,8 @@ func runBackupProcessor(
ReqSentTime: reqSentTime.String(),
})

rawRes, pErr = kv.SendWrappedWith(ctx, flowCtx.Cfg.DB.NonTransactionalSender(),
header, req)
rawRes, pErr = kv.SendWrappedWithAdmission(
ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req)
respReceivedTime = timeutil.Now()
if pErr != nil {
return pErr.GoError()
Expand Down
39 changes: 34 additions & 5 deletions pkg/kv/kvserver/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -404,8 +406,10 @@ func makeGCQueueScoreImpl(
}

type replicaGCer struct {
repl *Replica
count int32 // update atomically
repl *Replica
count int32 // update atomically
admissionController KVAdmissionController
storeID roachpb.StoreID
}

var _ gc.GCer = &replicaGCer{}
Expand All @@ -429,8 +433,29 @@ func (r *replicaGCer) send(ctx context.Context, req roachpb.GCRequest) error {
ba.RangeID = r.repl.Desc().RangeID
ba.Timestamp = r.repl.Clock().Now()
ba.Add(&req)

if _, pErr := r.repl.Send(ctx, ba); pErr != nil {
// Since we are talking directly to the replica, we need to explicitly do
// admission control here, as we are bypassing server.Node.
var admissionHandle interface{}
if r.admissionController != nil {
ba.AdmissionHeader = roachpb.AdmissionHeader{
// GC is low priority.
Priority: int32(admission.LowPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_ROOT_KV,
NoMemoryReservedAtSource: true,
}
ba.Replica.StoreID = r.storeID
var err error
admissionHandle, err = r.admissionController.AdmitKVWork(ctx, roachpb.SystemTenantID, &ba)
if err != nil {
return err
}
}
_, pErr := r.repl.Send(ctx, ba)
if r.admissionController != nil {
r.admissionController.AdmittedKVWorkDone(admissionHandle)
}
if pErr != nil {
log.VErrEventf(ctx, 2, "%v", pErr.String())
return pErr.GoError()
}
Expand Down Expand Up @@ -531,7 +556,11 @@ func (gcq *gcQueue) process(
IntentCleanupBatchTimeout: gcQueueIntentBatchTimeout,
},
conf.TTL(),
&replicaGCer{repl: repl},
&replicaGCer{
repl: repl,
admissionController: gcq.store.cfg.KVAdmissionController,
storeID: gcq.store.StoreID(),
},
func(ctx context.Context, intents []roachpb.Intent) error {
intentCount, err := repl.store.intentResolver.
CleanupIntents(ctx, intents, gcTimestamp, roachpb.PUSH_TOUCH)
Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,9 @@ type StoreConfig struct {
// SpanConfigsEnabled determines whether we're able to use the span configs
// infrastructure.
SpanConfigsEnabled bool

// KVAdmissionController is an optional field used for admission control.
KVAdmissionController KVAdmissionController
}

// ConsistencyTestingKnobs is a BatchEvalTestingKnobs struct used to control the
Expand Down Expand Up @@ -2956,3 +2959,18 @@ func min(a, b int) int {
}
return b
}

// KVAdmissionController provides admission control for the KV layer. It lets
// code that sends a BatchRequest without passing through the usual admission
// point request admission explicitly and later report completion.
type KVAdmissionController interface {
// AdmitKVWork must be called before performing KV work.
//
// BatchRequest.AdmissionHeader and BatchRequest.Replica.StoreID must be
// populated on ba for admission to work correctly; tenantID identifies the
// tenant on whose behalf the work is performed.
//
// If err is non-nil, the work was not admitted and the returned handle can
// be ignored. If err is nil, AdmittedKVWorkDone must be called with the
// returned handle after the KV work is done executing.
AdmitKVWork(
ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest,
) (handle interface{}, err error)
// AdmittedKVWorkDone is called after the admitted KV work is done
// executing. handle must be the value returned by the corresponding
// successful AdmitKVWork call; it is opaque to the caller.
AdmittedKVWorkDone(handle interface{})
}
15 changes: 15 additions & 0 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,24 @@ func (f NonTransactionalFactoryFunc) NonTransactionalSender() Sender {
// `nil` context; an empty one is used in that case.
func SendWrappedWith(
	ctx context.Context, sender Sender, h roachpb.Header, args roachpb.Request,
) (roachpb.Response, *roachpb.Error) {
	// This variant carries no admission information of its own, so delegate
	// with a zero-valued AdmissionHeader.
	var noAdmission roachpb.AdmissionHeader
	return SendWrappedWithAdmission(ctx, sender, h, noAdmission, args)
}

// SendWrappedWithAdmission is a convenience function which wraps the request
// in a batch and sends it via the provided Sender and headers. It returns the
// unwrapped response or an error. It's valid to pass a `nil` context; an
// empty one is used in that case.
func SendWrappedWithAdmission(
ctx context.Context,
sender Sender,
h roachpb.Header,
ah roachpb.AdmissionHeader,
args roachpb.Request,
) (roachpb.Response, *roachpb.Error) {
ba := roachpb.BatchRequest{}
ba.Header = h
ba.AdmissionHeader = ah
ba.Add(args)

br, pErr := sender.Send(ctx, ba)
Expand Down
105 changes: 64 additions & 41 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func NewNode(
tenantUsage: tenantUsage,
spanConfigAccessor: spanConfigAccessor,
}
n.storeCfg.KVAdmissionController = n
n.perReplicaServer = kvserver.MakeServer(&n.Descriptor, n.stores)
return n
}
Expand Down Expand Up @@ -961,17 +962,50 @@ func (n *Node) Batch(
// log tags more expensive and makes local calls differ from remote calls.
ctx = n.storeCfg.AmbientCtx.ResetAndAnnotateCtx(ctx)

var callAdmittedWorkDoneOnKVAdmissionQ bool
var tenantID roachpb.TenantID
var storeAdmissionQ *admission.WorkQueue
if n.kvAdmissionQ != nil {
var ok bool
tenantID, ok = roachpb.TenantFromContext(ctx)
if !ok {
tenantID = roachpb.SystemTenantID
tenantID, ok := roachpb.TenantFromContext(ctx)
if !ok {
tenantID = roachpb.SystemTenantID
}
handle, err := n.AdmitKVWork(ctx, tenantID, args)
if err != nil {
return nil, err
}
br, err := n.batchInternal(ctx, tenantID, args)
n.AdmittedKVWorkDone(handle)

// We always return errors via BatchResponse.Error so structure is
// preserved; plain errors are presumed to be from the RPC
// framework and not from cockroach.
if err != nil {
if br == nil {
br = &roachpb.BatchResponse{}
}
if br.Error != nil {
log.Fatalf(
ctx, "attempting to return both a plain error (%s) and roachpb.Error (%s)", err, br.Error,
)
}
bypassAdmission := args.IsAdmin()
source := args.AdmissionHeader.Source
br.Error = roachpb.NewError(err)
}
return br, nil
}

var _ kvserver.KVAdmissionController = &Node{}

// admissionHandle is the concrete value behind the opaque handle that
// Node.AdmitKVWork returns and Node.AdmittedKVWorkDone type-asserts. It
// records which admission queues must be told that the work has finished.
type admissionHandle struct {
// tenantID is the tenant passed to AdmittedWorkDone on the queues below.
// NOTE(review): the visible portion of AdmitKVWork declares the handle with
// `var ah admissionHandle` and never assigns this field from its tenantID
// parameter — confirm it is populated, otherwise AdmittedWorkDone is
// invoked with the zero-valued TenantID.
tenantID roachpb.TenantID
// callAdmittedWorkDoneOnKVAdmissionQ holds the first result of
// kvAdmissionQ.Admit; when true, AdmittedKVWorkDone calls
// kvAdmissionQ.AdmittedWorkDone.
callAdmittedWorkDoneOnKVAdmissionQ bool
// storeAdmissionQ is the per-store queue looked up for write batches that
// are not a single HeartbeatTxn request. It is nil when no store queue was
// used or when that queue reported admission as disabled, in which case
// AdmittedKVWorkDone skips it.
storeAdmissionQ *admission.WorkQueue
}

// AdmitKVWork implements the kvserver.KVAdmissionController interface.
func (n *Node) AdmitKVWork(
ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest,
) (handle interface{}, err error) {
var ah admissionHandle
if n.kvAdmissionQ != nil {
bypassAdmission := ba.IsAdmin()
source := ba.AdmissionHeader.Source
if !roachpb.IsSystemTenantID(tenantID.ToUint64()) {
// Request is from a SQL node.
bypassAdmission = false
Expand All @@ -980,15 +1014,15 @@ func (n *Node) Batch(
if source == roachpb.AdmissionHeader_OTHER {
bypassAdmission = true
}
createTime := args.AdmissionHeader.CreateTime
createTime := ba.AdmissionHeader.CreateTime
if !bypassAdmission && createTime == 0 {
// TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use
// of zero CreateTime needs to be revisited. It should use high priority.
createTime = timeutil.Now().UnixNano()
}
admissionInfo := admission.WorkInfo{
TenantID: tenantID,
Priority: admission.WorkPriority(args.AdmissionHeader.Priority),
Priority: admission.WorkPriority(ba.AdmissionHeader.Priority),
CreateTime: createTime,
BypassAdmission: bypassAdmission,
}
Expand All @@ -999,52 +1033,41 @@ func (n *Node) Batch(
// all the slots, causing no useful work to happen. We do want useful work
// to continue even when throttling since there are often significant
// number of tokens available.
if args.IsWrite() && !isSingleHeartbeatTxnRequest(args) {
storeAdmissionQ = n.storeGrantCoords.TryGetQueueForStore(int32(args.Replica.StoreID))
if ba.IsWrite() && !isSingleHeartbeatTxnRequest(ba) {
ah.storeAdmissionQ = n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID))
}
admissionEnabled := true
if storeAdmissionQ != nil {
if admissionEnabled, err = storeAdmissionQ.Admit(ctx, admissionInfo); err != nil {
return nil, err
if ah.storeAdmissionQ != nil {
if admissionEnabled, err = ah.storeAdmissionQ.Admit(ctx, admissionInfo); err != nil {
return admissionHandle{}, err
}
if !admissionEnabled {
// Set storeAdmissionQ to nil so that we don't call AdmittedWorkDone
// on it. Additionally, the code below will not call
// kvAdmissionQ.Admit, and so callAdmittedWorkDoneOnKVAdmissionQ will
// stay false.
storeAdmissionQ = nil
ah.storeAdmissionQ = nil
}
}
if admissionEnabled {
callAdmittedWorkDoneOnKVAdmissionQ, err = n.kvAdmissionQ.Admit(ctx, admissionInfo)
ah.callAdmittedWorkDoneOnKVAdmissionQ, err = n.kvAdmissionQ.Admit(ctx, admissionInfo)
if err != nil {
return nil, err
return admissionHandle{}, err
}
}
}
br, err := n.batchInternal(ctx, tenantID, args)
if callAdmittedWorkDoneOnKVAdmissionQ {
n.kvAdmissionQ.AdmittedWorkDone(tenantID)
}
if storeAdmissionQ != nil {
storeAdmissionQ.AdmittedWorkDone(tenantID)
}
return ah, nil
}

// We always return errors via BatchResponse.Error so structure is
// preserved; plain errors are presumed to be from the RPC
// framework and not from cockroach.
if err != nil {
if br == nil {
br = &roachpb.BatchResponse{}
}
if br.Error != nil {
log.Fatalf(
ctx, "attempting to return both a plain error (%s) and roachpb.Error (%s)", err, br.Error,
)
}
br.Error = roachpb.NewError(err)
// AdmittedKVWorkDone implements the kvserver.KVAdmissionController interface.
func (n *Node) AdmittedKVWorkDone(handle interface{}) {
ah := handle.(admissionHandle)
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
}
if ah.storeAdmissionQ != nil {
ah.storeAdmissionQ.AdmittedWorkDone(ah.tenantID)
}
return br, nil
}

// setupSpanForIncomingRPC takes a context and returns a derived context with a
Expand Down

0 comments on commit d0ad381

Please sign in to comment.