diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 0651ccb93275..e9867f87fbc8 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -63,10 +63,10 @@
 <tr><td><code>kv.snapshot_delegation.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to true to allow snapshots from follower replicas</td></tr>
 <tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
 <tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
-<tr><td><code>kv.store.admission.provisioned_bandwidth</code></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag</td></tr>
 <tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td></tr>
 <tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
 <tr><td><code>kv.transaction.reject_over_max_intents_budget.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td></tr>
+<tr><td><code>kvadmission.store.provisioned_bandwidth</code></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag</td></tr>
 <tr><td><code>schedules.backup.gc_protection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>enable chaining of GC protection across backups run as part of a schedule</td></tr>
 <tr><td><code>security.ocsp.mode</code></td><td>enumeration</td><td><code>off</code></td><td>use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted.
 [off = 0, lax = 1, strict = 2]</td></tr>
 <tr><td><code>security.ocsp.timeout</code></td><td>duration</td><td><code>3s</code></td><td>timeout before considering the OCSP server unreachable</td></tr>
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index 055c51ff2ff7..1326a7b003b5 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -1142,6 +1142,7 @@ GO_TARGETS = [
     "//pkg/kv/kvserver/idalloc:idalloc_test",
     "//pkg/kv/kvserver/intentresolver:intentresolver",
     "//pkg/kv/kvserver/intentresolver:intentresolver_test",
+    "//pkg/kv/kvserver/kvadmission:kvadmission",
     "//pkg/kv/kvserver/kvserverbase:kvserverbase",
     "//pkg/kv/kvserver/kvserverpb:kvserverpb",
     "//pkg/kv/kvserver/liveness/livenesspb:livenesspb",
@@ -2439,6 +2440,7 @@ GET_X_DATA_TARGETS = [
     "//pkg/kv/kvserver/gc:get_x_data",
     "//pkg/kv/kvserver/idalloc:get_x_data",
     "//pkg/kv/kvserver/intentresolver:get_x_data",
+    "//pkg/kv/kvserver/kvadmission:get_x_data",
     "//pkg/kv/kvserver/kvserverbase:get_x_data",
     "//pkg/kv/kvserver/kvserverpb:get_x_data",
     "//pkg/kv/kvserver/liveness:get_x_data",
diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 37adbb41f394..b2b79c085ea1 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -132,6 +132,7 @@ go_library(
         "//pkg/kv/kvserver/gc",
         "//pkg/kv/kvserver/idalloc",
         "//pkg/kv/kvserver/intentresolver",
+        "//pkg/kv/kvserver/kvadmission",
         "//pkg/kv/kvserver/kvserverbase",
         "//pkg/kv/kvserver/kvserverpb",
         "//pkg/kv/kvserver/liveness",
diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel
new file mode 100644
index 000000000000..44573b5e43fc
--- /dev/null
+++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel
@@ -0,0 +1,24 @@
+load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "kvadmission",
+    srcs = ["kvadmission.go"],
+    importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/roachpb",
+        "//pkg/settings",
+        "//pkg/settings/cluster",
+        "//pkg/util/admission",
+        "//pkg/util/admission/admissionpb",
+        "//pkg/util/buildutil",
+        "//pkg/util/log",
+        "//pkg/util/stop",
+        "//pkg/util/timeutil",
+        "@com_github_cockroachdb_errors//:errors",
+        "@com_github_cockroachdb_pebble//:pebble",
+    ],
+)
+
+get_x_data(name = "get_x_data")
diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go
new file mode 100644
index 000000000000..5e35934945c7
--- /dev/null
+++ b/pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -0,0 +1,390 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+// Package kvadmission is the integration layer between KV and admission
+// control.
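+//
+// At a high level, callers ask the Controller for admission before
+// evaluating a BatchRequest (AdmitKVWork) and report back once evaluation is
+// done (AdmittedKVWorkDone). Work that bypasses the normal request path is
+// reported after the fact, through FollowerStoreWriteBytes (for writes
+// replicated to raft followers) and SnapshotIngested (for range snapshot
+// ingestion).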
+package kvadmission
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/util/admission"
+	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
+	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/pebble"
+)
+
+// elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted
+// for each export request.
+var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting(
+	settings.SystemOnly,
+	"kvadmission.elastic_cpu.duration_per_export_request",
+	"controls how many CPU tokens are allotted for each export request",
+	admission.MaxElasticCPUDuration,
+	func(duration time.Duration) error {
+		if duration < admission.MinElasticCPUDuration {
+			return fmt.Errorf("minimum CPU duration allowed per export request is %s, got %s",
+				admission.MinElasticCPUDuration, duration)
+		}
+		if duration > admission.MaxElasticCPUDuration {
+			return fmt.Errorf("maximum CPU duration allowed per export request is %s, got %s",
+				admission.MaxElasticCPUDuration, duration)
+		}
+		return nil
+	},
+)
+
+// Controller provides admission control for the KV layer.
+type Controller interface {
+	// AdmitKVWork must be called before performing KV work.
+	// BatchRequest.AdmissionHeader and BatchRequest.Replica.StoreID must be
+	// populated for admission to work correctly. If err is non-nil, the
+	// returned handle can be ignored. If err is nil, AdmittedKVWorkDone must be
+	// called after the KV work is done executing.
+	AdmitKVWork(
+		ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest,
+	) (Handle, error)
+	// AdmittedKVWorkDone is called after the admitted KV work is done
+	// executing.
+	AdmittedKVWorkDone(Handle, *StoreWriteBytes)
+	// SetTenantWeightProvider is used to set the provider that will be
+	// periodically polled for weights. The stopper should be used to terminate
+	// the periodic polling.
+	SetTenantWeightProvider(provider TenantWeightProvider, stopper *stop.Stopper)
+	// SnapshotIngested informs admission control about a range snapshot
+	// ingestion.
+	SnapshotIngested(storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats)
+	// FollowerStoreWriteBytes informs admission control about writes
+	// replicated to a raft follower that have not been subject to admission
+	// control.
+	FollowerStoreWriteBytes(storeID roachpb.StoreID, followerWriteBytes FollowerStoreWriteBytes)
+}
+
+// TenantWeightProvider can be periodically asked to provide the tenant
+// weights.
+type TenantWeightProvider interface {
+	GetTenantWeights() TenantWeights
+}
+
+// TenantWeights contains the various tenant weights.
+type TenantWeights struct {
+	// Node is the node-level tenant ID => weight.
+	Node map[uint64]uint32
+	// Stores contains the per-store tenant weights.
+	Stores []TenantWeightsForStore
+}
+
+// TenantWeightsForStore contains the tenant weights for a store.
+type TenantWeightsForStore struct {
+	roachpb.StoreID
+	// Weights is tenant ID => weight.
+	Weights map[uint64]uint32
+}
+
+// controllerImpl implements the Controller interface.
+type controllerImpl struct {
+	// Admission control queues and coordinators. All three should be nil or
+	// non-nil.
+	kvAdmissionQ        *admission.WorkQueue
+	storeGrantCoords    *admission.StoreGrantCoordinators
+	elasticCPUWorkQueue *admission.ElasticCPUWorkQueue
+	settings            *cluster.Settings
+	every               log.EveryN
+}
+
+var _ Controller = &controllerImpl{}
+
+// Handle groups data around some piece of admitted work. Depending on the
+// type of work, it holds (a) references to specific work queues, (b) state
+// needed to inform said work queues of what work was done after the fact, and
+// (c) information around how much work a request is allowed to do (used for
+// cooperative scheduling with elastic CPU granters).
+type Handle struct {
+	tenantID             roachpb.TenantID
+	storeAdmissionQ      *admission.StoreWorkQueue
+	storeWorkHandle      admission.StoreWorkHandle
+	ElasticCPUWorkHandle *admission.ElasticCPUWorkHandle
+
+	callAdmittedWorkDoneOnKVAdmissionQ bool
+}
+
+// MakeController returns a Controller. The first three parameters must
+// together be nil or non-nil.
+func MakeController(
+	kvAdmissionQ *admission.WorkQueue,
+	elasticCPUWorkQueue *admission.ElasticCPUWorkQueue,
+	storeGrantCoords *admission.StoreGrantCoordinators,
+	settings *cluster.Settings,
+) Controller {
+	return &controllerImpl{
+		kvAdmissionQ:        kvAdmissionQ,
+		storeGrantCoords:    storeGrantCoords,
+		elasticCPUWorkQueue: elasticCPUWorkQueue,
+		settings:            settings,
+		every:               log.Every(10 * time.Second),
+	}
+}
+
+// AdmitKVWork implements the Controller interface.
+//
+// TODO(irfansharif): There's a fair bit happening here and there's no test
+// coverage. Fix that.
+func (n *controllerImpl) AdmitKVWork(
+	ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest,
+) (handle Handle, retErr error) {
+	ah := Handle{tenantID: tenantID}
+	if n.kvAdmissionQ == nil {
+		return ah, nil
+	}
+
+	bypassAdmission := ba.IsAdmin()
+	source := ba.AdmissionHeader.Source
+	if !roachpb.IsSystemTenantID(tenantID.ToUint64()) {
+		// Request is from a SQL node.
+		bypassAdmission = false
+		source = roachpb.AdmissionHeader_FROM_SQL
+	}
+	if source == roachpb.AdmissionHeader_OTHER {
+		bypassAdmission = true
+	}
+	// TODO(abaptist): Revisit and deprecate this setting in v23.1.
+	if admission.KVBulkOnlyAdmissionControlEnabled.Get(&n.settings.SV) {
+		if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
+			bypassAdmission = true
+		}
+	}
+	createTime := ba.AdmissionHeader.CreateTime
+	if !bypassAdmission && createTime == 0 {
+		// TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use
+		// of zero CreateTime needs to be revisited. It should use high priority.
+		createTime = timeutil.Now().UnixNano()
+	}
+	admissionInfo := admission.WorkInfo{
+		TenantID:        tenantID,
+		Priority:        admissionpb.WorkPriority(ba.AdmissionHeader.Priority),
+		CreateTime:      createTime,
+		BypassAdmission: bypassAdmission,
+	}
+
+	admissionEnabled := true
+	// Don't subject HeartbeatTxnRequest to the storeAdmissionQ. Even though
+	// it would bypass admission, it would consume a slot. When writes are
+	// throttled, we start generating more txn heartbeats, which then consume
+	// all the slots, causing no useful work to happen. We do want useful work
+	// to continue even when throttling since there is often a significant
+	// number of tokens available.
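+	//
+	// In what follows: writes first pass through per-store IO admission;
+	// unless store admission reports that admission control is disabled,
+	// requests then pass through exactly one CPU-oriented queue (the
+	// elastic CPU queue for single export requests, the regular KV work
+	// queue otherwise).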
+	if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() {
+		storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID))
+		if storeAdmissionQ != nil {
+			storeWorkHandle, err := storeAdmissionQ.Admit(
+				ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo})
+			if err != nil {
+				return Handle{}, err
+			}
+			admissionEnabled = storeWorkHandle.AdmissionEnabled()
+			if admissionEnabled {
+				defer func() {
+					if retErr != nil {
+						// No bytes were written.
+						_ = storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, admission.StoreWorkDoneInfo{})
+					}
+				}()
+				ah.storeAdmissionQ, ah.storeWorkHandle = storeAdmissionQ, storeWorkHandle
+			}
+		}
+	}
+	if admissionEnabled {
+		if ba.IsSingleExportRequest() {
+			// Backups generate batches with single export requests, which we
+			// admit through the elastic CPU work queue. We grant this
+			// CPU-intensive work a set amount of CPU time and expect it to
+			// terminate (cooperatively) once it exceeds its grant. The amount
+			// disbursed is 100ms, which we've experimentally found to be long
+			// enough to do enough useful work per-request while not causing too
+			// much in the way of scheduling delays on individual cores. Within
+			// admission control we have machinery that observes scheduling
+			// latencies periodically and reduces the total amount of CPU time
+			// handed out through this mechanism, as a way to provide latency
+			// isolation to non-elastic ("latency sensitive") work running on
+			// the same machine.
+			elasticWorkHandle, err := n.elasticCPUWorkQueue.Admit(
+				ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo,
+			)
+			if err != nil {
+				return Handle{}, err
+			}
+			ah.ElasticCPUWorkHandle = elasticWorkHandle
+			defer func() {
+				if retErr != nil {
+					// No elastic work was done.
+					n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle)
+				}
+			}()
+		} else {
+			callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo)
+			if err != nil {
+				return Handle{}, err
+			}
+			ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ
+		}
+	}
+	return ah, nil
+}
+
+// AdmittedKVWorkDone implements the Controller interface.
+func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) {
+	n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle)
+	if ah.callAdmittedWorkDoneOnKVAdmissionQ {
+		n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
+	}
+	if ah.storeAdmissionQ != nil {
+		var doneInfo admission.StoreWorkDoneInfo
+		if writeBytes != nil {
+			doneInfo = admission.StoreWorkDoneInfo(*writeBytes)
+		}
+		err := ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, doneInfo)
+		if err != nil {
+			// This shouldn't be happening.
+			if buildutil.CrdbTestBuild {
+				log.Fatalf(context.Background(), "%s", errors.WithAssertionFailure(err))
+			}
+			if n.every.ShouldLog() {
+				log.Errorf(context.Background(), "%s", err)
+			}
+		}
+	}
+}
+
+// SetTenantWeightProvider implements the Controller interface.
+func (n *controllerImpl) SetTenantWeightProvider(
+	provider TenantWeightProvider, stopper *stop.Stopper,
+) {
+	// TODO(irfansharif): Use a stopper here instead.
+	go func() {
+		const weightCalculationPeriod = 10 * time.Minute
+		ticker := time.NewTicker(weightCalculationPeriod)
+		// Used for short-circuiting the weights calculation if all weights are
+		// disabled.
+		allWeightsDisabled := false
+		for {
+			select {
+			case <-ticker.C:
+				kvDisabled := !admission.KVTenantWeightsEnabled.Get(&n.settings.SV)
+				kvStoresDisabled := !admission.KVStoresTenantWeightsEnabled.Get(&n.settings.SV)
+				if allWeightsDisabled && kvDisabled && kvStoresDisabled {
+					// Have already transitioned to disabled, so noop.
+					continue
+				}
+				weights := provider.GetTenantWeights()
+				if kvDisabled {
+					weights.Node = nil
+				}
+				n.kvAdmissionQ.SetTenantWeights(weights.Node)
+				n.elasticCPUWorkQueue.SetTenantWeights(weights.Node)
+
+				for _, storeWeights := range weights.Stores {
+					q := n.storeGrantCoords.TryGetQueueForStore(int32(storeWeights.StoreID))
+					if q != nil {
+						if kvStoresDisabled {
+							storeWeights.Weights = nil
+						}
+						q.SetTenantWeights(storeWeights.Weights)
+					}
+				}
+				allWeightsDisabled = kvDisabled && kvStoresDisabled
+			case <-stopper.ShouldQuiesce():
+				ticker.Stop()
+				return
+			}
+		}
+	}()
+}
+
+// SnapshotIngested implements the Controller interface.
+func (n *controllerImpl) SnapshotIngested(
+	storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats,
+) {
+	storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID))
+	if storeAdmissionQ == nil {
+		return
+	}
+	storeAdmissionQ.StatsToIgnore(ingestStats)
+}
+
+// FollowerStoreWriteBytes implements the Controller interface.
+func (n *controllerImpl) FollowerStoreWriteBytes(
+	storeID roachpb.StoreID, followerWriteBytes FollowerStoreWriteBytes,
+) {
+	if followerWriteBytes.WriteBytes == 0 && followerWriteBytes.IngestedBytes == 0 {
+		return
+	}
+	storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID))
+	if storeAdmissionQ == nil {
+		return
+	}
+	storeAdmissionQ.BypassedWorkDone(
+		followerWriteBytes.NumEntries, followerWriteBytes.StoreWorkDoneInfo)
+}
+
+// ProvisionedBandwidth sets the value of the provisioned
+// bandwidth for each store in the cluster.
+var ProvisionedBandwidth = settings.RegisterByteSizeSetting(
+	settings.SystemOnly, "kvadmission.store.provisioned_bandwidth",
+	"if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
+		"for each store. It can be over-ridden on a per-store basis using the --store flag",
+	0).WithPublic()
+
+// FollowerStoreWriteBytes captures stats about writes done to a store by a
+// replica that is not the leaseholder. These are used for admission control.
+type FollowerStoreWriteBytes struct {
+	NumEntries int64
+	admission.StoreWorkDoneInfo
+}
+
+// Merge follower store write statistics using the given data.
+func (f *FollowerStoreWriteBytes) Merge(from FollowerStoreWriteBytes) {
+	f.NumEntries += from.NumEntries
+	f.WriteBytes += from.WriteBytes
+	f.IngestedBytes += from.IngestedBytes
+}
+
+// StoreWriteBytes aliases admission.StoreWorkDoneInfo, since the notion of
+// "work is done" is specific to admission control and doesn't need to leak
+// everywhere.
+type StoreWriteBytes admission.StoreWorkDoneInfo
+
+var storeWriteBytesPool = sync.Pool{
+	New: func() interface{} { return &StoreWriteBytes{} },
+}
+
+// NewStoreWriteBytes constructs a new StoreWriteBytes.
+func NewStoreWriteBytes() *StoreWriteBytes {
+	wb := storeWriteBytesPool.Get().(*StoreWriteBytes)
+	*wb = StoreWriteBytes{}
+	return wb
+}
+
+// Release returns the *StoreWriteBytes to the pool.
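+// It is safe to call Release on a nil receiver, so callers can release
+// unconditionally on every return path.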
+func (wb *StoreWriteBytes) Release() {
+	if wb == nil {
+		return
+	}
+	storeWriteBytesPool.Put(wb)
+}
diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go
index b02fbda23ad4..c21ed2c4b262 100644
--- a/pkg/kv/kvserver/mvcc_gc_queue.go
+++ b/pkg/kv/kvserver/mvcc_gc_queue.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -506,7 +507,7 @@ func suspectedFullRangeDeletion(ms enginepb.MVCCStats) bool {
 type replicaGCer struct {
 	repl                *Replica
 	count               int32 // update atomically
-	admissionController KVAdmissionController
+	admissionController kvadmission.Controller
 	storeID             roachpb.StoreID
 }
@@ -533,7 +534,7 @@ func (r *replicaGCer) send(ctx context.Context, req roachpb.GCRequest) error {
 	ba.Add(&req)
 	// Since we are talking directly to the replica, we need to explicitly do
 	// admission control here, as we are bypassing server.Node.
-	var admissionHandle AdmissionHandle
+	var admissionHandle kvadmission.Handle
 	if r.admissionController != nil {
 		pri := admissionpb.WorkPriority(gc.AdmissionPriority.Get(&r.repl.ClusterSettings().SV))
 		ba.AdmissionHeader = roachpb.AdmissionHeader{
diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go
index 266aede2d050..3e4a1862339f 100644
--- a/pkg/kv/kvserver/replica_application_state_machine.go
+++ b/pkg/kv/kvserver/replica_application_state_machine.go
@@ -17,13 +17,13 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
-	"github.com/cockroachdb/cockroach/pkg/util/admission"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -57,20 +57,7 @@ type applyCommittedEntriesStats struct {
 	stateAssertions      int
 	numEmptyEntries      int
 	numConfChangeEntries int
-	followerStoreWriteBytes followerStoreWriteBytes
-}
-
-// followerStoreWriteBytes captures stats about writes done to a store by a
-// replica that is not the leaseholder. These are used for admission control.
-type followerStoreWriteBytes struct {
-	numEntries int64
-	admission.StoreWorkDoneInfo
-}
-
-func (f *followerStoreWriteBytes) merge(from followerStoreWriteBytes) {
-	f.numEntries += from.numEntries
-	f.WriteBytes += from.WriteBytes
-	f.IngestedBytes += from.IngestedBytes
+	followerStoreWriteBytes kvadmission.FollowerStoreWriteBytes
 }

 // nonDeterministicFailure is an error type that indicates that a state machine
@@ -450,7 +437,7 @@ type replicaAppBatch struct {
 	emptyEntries int
 	mutations    int
 	start        time.Time
-	followerStoreWriteBytes followerStoreWriteBytes
+	followerStoreWriteBytes kvadmission.FollowerStoreWriteBytes

 	// Reused by addAppliedStateKeyToBatch to avoid heap allocations.
asAlloc enginepb.RangeAppliedState @@ -551,7 +538,7 @@ func (b *replicaAppBatch) Stage( // nils the AddSSTable field. if !cmd.IsLocal() { writeBytes, ingestedBytes := cmd.getStoreWriteByteSizes() - b.followerStoreWriteBytes.numEntries++ + b.followerStoreWriteBytes.NumEntries++ b.followerStoreWriteBytes.WriteBytes += writeBytes b.followerStoreWriteBytes.IngestedBytes += ingestedBytes } @@ -1077,7 +1064,7 @@ func (b *replicaAppBatch) recordStatsOnCommit() { b.sm.stats.entriesProcessedBytes += b.entryBytes b.sm.stats.numEmptyEntries += b.emptyEntries b.sm.stats.batchesProcessed++ - b.sm.stats.followerStoreWriteBytes.merge(b.followerStoreWriteBytes) + b.sm.stats.followerStoreWriteBytes.Merge(b.followerStoreWriteBytes) elapsed := timeutil.Since(b.start) b.r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 96a41f1cde47..e082c0db604f 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -106,7 +107,13 @@ func (r *Replica) evalAndPropose( st *kvserverpb.LeaseStatus, ui uncertainty.Interval, tok TrackedRequestToken, -) (chan proposalResult, func(), kvserverbase.CmdIDKey, *StoreWriteBytes, *roachpb.Error) { +) ( + chan proposalResult, + func(), + kvserverbase.CmdIDKey, + *kvadmission.StoreWriteBytes, + *roachpb.Error, +) { defer tok.DoneIfNotMoved(ctx) idKey := makeIDKey() proposal, pErr := r.requestToProposal(ctx, idKey, ba, g, st, ui) @@ -167,7 +174,7 @@ func (r *Replica) evalAndPropose( // typical lag in consensus is expected to be small compared to the time // granularity of admission control doing token and size estimation (which // is 15s). Also, admission control corrects for gaps in reporting. 
- writeBytes := newStoreWriteBytes() + writeBytes := kvadmission.NewStoreWriteBytes() if proposal.command.WriteBatch != nil { writeBytes.WriteBytes = int64(len(proposal.command.WriteBatch.Data)) } @@ -1046,7 +1053,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, getNonDeterministicFailureExplanation(err), err } if r.store.cfg.KVAdmissionController != nil && - stats.apply.followerStoreWriteBytes.numEntries > 0 { + stats.apply.followerStoreWriteBytes.NumEntries > 0 { r.store.cfg.KVAdmissionController.FollowerStoreWriteBytes( r.store.StoreID(), stats.apply.followerStoreWriteBytes) } diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 7f2830853047..f63b0666cc82 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -38,7 +39,12 @@ import ( // reflect the key spans that it read. func (r *Replica) executeReadOnlyBatch( ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, -) (br *roachpb.BatchResponse, _ *concurrency.Guard, _ *StoreWriteBytes, pErr *roachpb.Error) { +) ( + br *roachpb.BatchResponse, + _ *concurrency.Guard, + _ *kvadmission.StoreWriteBytes, + pErr *roachpb.Error, +) { r.readOnlyCmdMu.RLock() defer r.readOnlyCmdMu.RUnlock() diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 89c05635011d..ab5a705ff0a2 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -120,7 +121,7 @@ func (r *Replica) Send( // *StoreWriteBytes return value. func (r *Replica) SendWithWriteBytes( ctx context.Context, req roachpb.BatchRequest, -) (*roachpb.BatchResponse, *StoreWriteBytes, *roachpb.Error) { +) (*roachpb.BatchResponse, *kvadmission.StoreWriteBytes, *roachpb.Error) { if r.store.cfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels { defer pprof.SetGoroutineLabels(ctx) // Note: the defer statement captured the previous context. @@ -168,7 +169,7 @@ func (r *Replica) SendWithWriteBytes( // Differentiate between read-write, read-only, and admin. var br *roachpb.BatchResponse var pErr *roachpb.Error - var writeBytes *StoreWriteBytes + var writeBytes *kvadmission.StoreWriteBytes if isReadOnly { log.Event(ctx, "read-only path") fn := (*Replica).executeReadOnlyBatch @@ -373,7 +374,7 @@ func (r *Replica) maybeAddRangeInfoToResponse( // concurrency guard back to the caller. 
type batchExecutionFn func( *Replica, context.Context, *roachpb.BatchRequest, *concurrency.Guard, -) (*roachpb.BatchResponse, *concurrency.Guard, *StoreWriteBytes, *roachpb.Error) +) (*roachpb.BatchResponse, *concurrency.Guard, *kvadmission.StoreWriteBytes, *roachpb.Error) var _ batchExecutionFn = (*Replica).executeWriteBatch var _ batchExecutionFn = (*Replica).executeReadOnlyBatch @@ -394,7 +395,7 @@ var _ batchExecutionFn = (*Replica).executeReadOnlyBatch // handles the process of retrying batch execution after addressing the error. func (r *Replica) executeBatchWithConcurrencyRetries( ctx context.Context, ba *roachpb.BatchRequest, fn batchExecutionFn, -) (br *roachpb.BatchResponse, writeBytes *StoreWriteBytes, pErr *roachpb.Error) { +) (br *roachpb.BatchResponse, writeBytes *kvadmission.StoreWriteBytes, pErr *roachpb.Error) { // Try to execute command; exit retry loop on success. var latchSpans, lockSpans *spanset.SpanSet var requestEvalKind concurrency.RequestEvalKind @@ -1046,7 +1047,7 @@ func (r *Replica) getBatchRequestQPS(ctx context.Context, ba *roachpb.BatchReque // recordRequestWriteBytes records the write bytes from a replica batch // request. -func (r *Replica) recordRequestWriteBytes(writeBytes *StoreWriteBytes) { +func (r *Replica) recordRequestWriteBytes(writeBytes *kvadmission.StoreWriteBytes) { if writeBytes == nil { return } diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 3924b896a170..7acac0ff25fb 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -77,7 +78,12 @@ var migrateApplicationTimeout = settings.RegisterDurationSetting( // call to applyTimestampCache). 
func (r *Replica) executeWriteBatch( ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, -) (br *roachpb.BatchResponse, _ *concurrency.Guard, _ *StoreWriteBytes, pErr *roachpb.Error) { +) ( + br *roachpb.BatchResponse, + _ *concurrency.Guard, + _ *kvadmission.StoreWriteBytes, + pErr *roachpb.Error, +) { startTime := timeutil.Now() // Even though we're not a read-only operation by definition, we have to diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 03161f2da852..e174c887e270 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/idalloc" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue" @@ -62,7 +63,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" - "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -85,7 +85,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" - "github.com/cockroachdb/pebble" "github.com/cockroachdb/redact" "go.etcd.io/etcd/raft/v3" "golang.org/x/time/rate" @@ -1120,7 +1119,7 @@ type StoreConfig struct { SpanConfigSubscriber spanconfig.KVSubscriber // KVAdmissionController is an optional field used for admission control. - KVAdmissionController KVAdmissionController + KVAdmissionController kvadmission.Controller // SchedulerLatencyListener listens in on scheduling latencies, information // that's then used to adjust various admission control components (like how @@ -3807,329 +3806,3 @@ func min(a, b int) int { } return b } - -// elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted -// for each export request. -var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting( - settings.SystemOnly, - "kvadmission.elastic_cpu.duration_per_export_request", - "controls how many CPU tokens are allotted for each export request", - admission.MaxElasticCPUDuration, - func(duration time.Duration) error { - if duration < admission.MinElasticCPUDuration { - return fmt.Errorf("minimum CPU duration allowed per export request is %s, got %s", - admission.MinElasticCPUDuration, duration) - } - if duration > admission.MaxElasticCPUDuration { - return fmt.Errorf("maximum CPU duration allowed per export request is %s, got %s", - admission.MaxElasticCPUDuration, duration) - } - return nil - }, -) - -// KVAdmissionController provides admission control for the KV layer. -type KVAdmissionController interface { - // AdmitKVWork must be called before performing KV work. - // BatchRequest.AdmissionHeader and BatchRequest.Replica.StoreID must be - // populated for admission to work correctly. If err is non-nil, the - // returned handle can be ignored. If err is nil, AdmittedKVWorkDone must be - // called after the KV work is done executing. 
- AdmitKVWork( - ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest, - ) (AdmissionHandle, error) - // AdmittedKVWorkDone is called after the admitted KV work is done - // executing. - AdmittedKVWorkDone(AdmissionHandle, *StoreWriteBytes) - // SetTenantWeightProvider is used to set the provider that will be - // periodically polled for weights. The stopper should be used to terminate - // the periodic polling. - SetTenantWeightProvider(provider TenantWeightProvider, stopper *stop.Stopper) - // SnapshotIngested informs admission control about a range snapshot - // ingestion. - SnapshotIngested(storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats) - // FollowerStoreWriteBytes informs admission control about writes - // replicated to a raft follower, that have not been subject to admission - // control. - FollowerStoreWriteBytes(storeID roachpb.StoreID, followerWriteBytes followerStoreWriteBytes) -} - -// TenantWeightProvider can be periodically asked to provide the tenant -// weights. -type TenantWeightProvider interface { - GetTenantWeights() TenantWeights -} - -// TenantWeights contains the various tenant weights. -type TenantWeights struct { - // Node is the node level tenant ID => weight. - Node map[uint64]uint32 - // Stores contains the per-store tenant weights. - Stores []TenantWeightsForStore -} - -// TenantWeightsForStore contains the tenant weights for a store. -type TenantWeightsForStore struct { - roachpb.StoreID - // Weights is tenant ID => weight. - Weights map[uint64]uint32 -} - -// KVAdmissionControllerImpl implements KVAdmissionController interface. -type KVAdmissionControllerImpl struct { - // Admission control queues and coordinators. All three should be nil or - // non-nil. - kvAdmissionQ *admission.WorkQueue - storeGrantCoords *admission.StoreGrantCoordinators - elasticCPUWorkQueue *admission.ElasticCPUWorkQueue - settings *cluster.Settings - every log.EveryN -} - -var _ KVAdmissionController = &KVAdmissionControllerImpl{} - -// AdmissionHandle groups data around some piece admitted work. Depending on the -// type of work, it holds (a) references to specific work queues, (b) state -// needed to inform said work queues of what work was done after the fact, and -// (c) information around how much work a request is allowed to do (used for -// cooperative scheduling with elastic CPU granters). -// -// TODO(irfansharif): Consider moving KVAdmissionController and adjacent types -// into a kvserver/kvadmission package. -type AdmissionHandle struct { - tenantID roachpb.TenantID - storeAdmissionQ *admission.StoreWorkQueue - storeWorkHandle admission.StoreWorkHandle - ElasticCPUWorkHandle *admission.ElasticCPUWorkHandle - - callAdmittedWorkDoneOnKVAdmissionQ bool -} - -// MakeKVAdmissionController returns a KVAdmissionController. All three -// parameters must together be nil or non-nil. -func MakeKVAdmissionController( - kvAdmissionQ *admission.WorkQueue, - elasticCPUWorkQueue *admission.ElasticCPUWorkQueue, - storeGrantCoords *admission.StoreGrantCoordinators, - settings *cluster.Settings, -) KVAdmissionController { - return &KVAdmissionControllerImpl{ - kvAdmissionQ: kvAdmissionQ, - storeGrantCoords: storeGrantCoords, - elasticCPUWorkQueue: elasticCPUWorkQueue, - settings: settings, - every: log.Every(10 * time.Second), - } -} - -// AdmitKVWork implements the KVAdmissionController interface. -// -// TODO(irfansharif): There's a fair bit happening here and there's no test -// coverage. Fix that. 
-func (n *KVAdmissionControllerImpl) AdmitKVWork( - ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest, -) (handle AdmissionHandle, retErr error) { - ah := AdmissionHandle{tenantID: tenantID} - if n.kvAdmissionQ == nil { - return ah, nil - } - - bypassAdmission := ba.IsAdmin() - source := ba.AdmissionHeader.Source - if !roachpb.IsSystemTenantID(tenantID.ToUint64()) { - // Request is from a SQL node. - bypassAdmission = false - source = roachpb.AdmissionHeader_FROM_SQL - } - if source == roachpb.AdmissionHeader_OTHER { - bypassAdmission = true - } - // TODO(abaptist): Revisit and deprecate this setting in v23.1. - if admission.KVBulkOnlyAdmissionControlEnabled.Get(&n.settings.SV) { - if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri { - bypassAdmission = true - } - } - - createTime := ba.AdmissionHeader.CreateTime - if !bypassAdmission && createTime == 0 { - // TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use - // of zero CreateTime needs to be revisited. It should use high priority. - createTime = timeutil.Now().UnixNano() - } - admissionInfo := admission.WorkInfo{ - TenantID: tenantID, - Priority: admissionpb.WorkPriority(ba.AdmissionHeader.Priority), - CreateTime: createTime, - BypassAdmission: bypassAdmission, - } - - admissionEnabled := true - // Don't subject HeartbeatTxnRequest to the storeAdmissionQ. Even though - // it would bypass admission, it would consume a slot. When writes are - // throttled, we start generating more txn heartbeats, which then consume - // all the slots, causing no useful work to happen. We do want useful work - // to continue even when throttling since there are often significant - // number of tokens available. - if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() { - storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID)) - if storeAdmissionQ != nil { - storeWorkHandle, err := storeAdmissionQ.Admit( - ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo}) - if err != nil { - return AdmissionHandle{}, err - } - admissionEnabled = storeWorkHandle.AdmissionEnabled() - if admissionEnabled { - defer func() { - if retErr != nil { - // No bytes were written. - _ = storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, admission.StoreWorkDoneInfo{}) - } - }() - ah.storeAdmissionQ, ah.storeWorkHandle = storeAdmissionQ, storeWorkHandle - } - } - } - if admissionEnabled { - if ba.IsSingleExportRequest() { - // Backups generate batches with single export requests, which we - // admit through the elastic CPU work queue. We grant this - // CPU-intensive work a set amount of CPU time and expect it to - // terminate (cooperatively) once it exceeds its grant. The amount - // disbursed is 100ms, which we've experimentally found to be long - // enough to do enough useful work per-request while not causing too - // much in the way of scheduling delays on individual cores. Within - // admission control we have machinery that observes scheduling - // latencies periodically and reduces the total amount of CPU time - // handed out through this mechanism, as a way to provide latency - // isolation to non-elastic ("latency sensitive") work running on - // the same machine. - elasticWorkHandle, err := n.elasticCPUWorkQueue.Admit( - ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo, - ) - if err != nil { - return AdmissionHandle{}, err - } - ah.ElasticCPUWorkHandle = elasticWorkHandle - defer func() { - if retErr != nil { - // No elastic work was done. 
- n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) - } - }() - } else { - callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo) - if err != nil { - return AdmissionHandle{}, err - } - ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ - } - } - return ah, nil -} - -// AdmittedKVWorkDone implements the KVAdmissionController interface. -func (n *KVAdmissionControllerImpl) AdmittedKVWorkDone( - ah AdmissionHandle, writeBytes *StoreWriteBytes, -) { - n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) - if ah.callAdmittedWorkDoneOnKVAdmissionQ { - n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID) - } - if ah.storeAdmissionQ != nil { - var doneInfo admission.StoreWorkDoneInfo - if writeBytes != nil { - doneInfo = admission.StoreWorkDoneInfo(*writeBytes) - } - err := ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, doneInfo) - if err != nil { - // This shouldn't be happening. - if buildutil.CrdbTestBuild { - log.Fatalf(context.Background(), "%s", errors.WithAssertionFailure(err)) - } - if n.every.ShouldLog() { - log.Errorf(context.Background(), "%s", err) - } - } - } -} - -// SetTenantWeightProvider implements the KVAdmissionController interface. -func (n *KVAdmissionControllerImpl) SetTenantWeightProvider( - provider TenantWeightProvider, stopper *stop.Stopper, -) { - // TODO(irfansharif): Use a stopper here instead. - go func() { - const weightCalculationPeriod = 10 * time.Minute - ticker := time.NewTicker(weightCalculationPeriod) - // Used for short-circuiting the weights calculation if all weights are - // disabled. - allWeightsDisabled := false - for { - select { - case <-ticker.C: - kvDisabled := !admission.KVTenantWeightsEnabled.Get(&n.settings.SV) - kvStoresDisabled := !admission.KVStoresTenantWeightsEnabled.Get(&n.settings.SV) - if allWeightsDisabled && kvDisabled && kvStoresDisabled { - // Have already transitioned to disabled, so noop. - continue - } - weights := provider.GetTenantWeights() - if kvDisabled { - weights.Node = nil - } - n.kvAdmissionQ.SetTenantWeights(weights.Node) - n.elasticCPUWorkQueue.SetTenantWeights(weights.Node) - - for _, storeWeights := range weights.Stores { - q := n.storeGrantCoords.TryGetQueueForStore(int32(storeWeights.StoreID)) - if q != nil { - if kvStoresDisabled { - storeWeights.Weights = nil - } - q.SetTenantWeights(storeWeights.Weights) - } - } - allWeightsDisabled = kvDisabled && kvStoresDisabled - case <-stopper.ShouldQuiesce(): - ticker.Stop() - return - } - } - }() -} - -// SnapshotIngested implements the KVAdmissionController interface. -func (n *KVAdmissionControllerImpl) SnapshotIngested( - storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats, -) { - storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID)) - if storeAdmissionQ == nil { - return - } - storeAdmissionQ.StatsToIgnore(ingestStats) -} - -// FollowerStoreWriteBytes implements the KVAdmissionController interface. 
-func (n *KVAdmissionControllerImpl) FollowerStoreWriteBytes( - storeID roachpb.StoreID, followerWriteBytes followerStoreWriteBytes, -) { - if followerWriteBytes.WriteBytes == 0 && followerWriteBytes.IngestedBytes == 0 { - return - } - storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID)) - if storeAdmissionQ == nil { - return - } - storeAdmissionQ.BypassedWorkDone( - followerWriteBytes.numEntries, followerWriteBytes.StoreWorkDoneInfo) -} - -// ProvisionedBandwidthForAdmissionControl set a value of the provisioned -// bandwidth for each store in the cluster. -var ProvisionedBandwidthForAdmissionControl = settings.RegisterByteSizeSetting( - settings.SystemOnly, "kv.store.admission.provisioned_bandwidth", - "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+ - "for each store. It can be over-ridden on a per-store basis using the --store flag", - 0).WithPublic() diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go index 44c97c591844..cbb4b499552e 100644 --- a/pkg/kv/kvserver/store_send.go +++ b/pkg/kv/kvserver/store_send.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/limit" @@ -45,7 +46,7 @@ import ( func (s *Store) Send( ctx context.Context, ba roachpb.BatchRequest, ) (br *roachpb.BatchResponse, pErr *roachpb.Error) { - var writeBytes *StoreWriteBytes + var writeBytes *kvadmission.StoreWriteBytes br, writeBytes, pErr = s.SendWithWriteBytes(ctx, ba) writeBytes.Release() return br, pErr @@ -55,7 +56,7 @@ func (s *Store) Send( // *StoreWriteBytes return value. func (s *Store) SendWithWriteBytes( ctx context.Context, ba roachpb.BatchRequest, -) (br *roachpb.BatchResponse, writeBytes *StoreWriteBytes, pErr *roachpb.Error) { +) (br *roachpb.BatchResponse, writeBytes *kvadmission.StoreWriteBytes, pErr *roachpb.Error) { // Attach any log tags from the store to the context (which normally // comes from gRPC). ctx = s.AnnotateCtx(ctx) diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index 60cdd1b6d8fc..b18c2c24468a 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -14,7 +14,6 @@ import ( "context" "fmt" math "math" - "sync" "unsafe" "github.com/cockroachdb/cockroach/pkg/clusterversion" @@ -22,9 +21,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -187,34 +186,11 @@ func (ls *Stores) Send( return br, pErr } -// StoreWriteBytes aliases admission.StoreWorkDoneInfo, since the notion of -// "work is done" is specific to admission control and doesn't need to leak -// everywhere. 
-type StoreWriteBytes admission.StoreWorkDoneInfo - -var storeWriteBytesPool = sync.Pool{ - New: func() interface{} { return &StoreWriteBytes{} }, -} - -func newStoreWriteBytes() *StoreWriteBytes { - wb := storeWriteBytesPool.Get().(*StoreWriteBytes) - *wb = StoreWriteBytes{} - return wb -} - -// Release returns the *StoreWriteBytes to the pool. -func (wb *StoreWriteBytes) Release() { - if wb == nil { - return - } - storeWriteBytesPool.Put(wb) -} - // SendWithWriteBytes is the implementation of Send with an additional // *StoreWriteBytes return value. func (ls *Stores) SendWithWriteBytes( ctx context.Context, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, *StoreWriteBytes, *roachpb.Error) { +) (*roachpb.BatchResponse, *kvadmission.StoreWriteBytes, *roachpb.Error) { if err := ba.ValidateForEvaluation(); err != nil { log.Fatalf(ctx, "invalid batch (%s): %s", ba, err) } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 542f9c411788..a578b6b50cb2 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -102,6 +102,7 @@ go_library( "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/closedts/ctpb", "//pkg/kv/kvserver/closedts/sidetransport", + "//pkg/kv/kvserver/kvadmission", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/liveness", diff --git a/pkg/server/node.go b/pkg/server/node.go index def0e0a53e1d..51e505299c58 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -386,7 +387,7 @@ func NewNode( spanConfigAccessor: spanConfigAccessor, testingErrorEvent: cfg.TestingKnobs.TestingResponseErrorEvent, } - n.storeCfg.KVAdmissionController = kvserver.MakeKVAdmissionController( + n.storeCfg.KVAdmissionController = kvadmission.MakeController( kvAdmissionQ, elasticCPUGrantCoord.ElasticCPUWorkQueue, storeGrantCoords, cfg.Settings, ) n.storeCfg.SchedulerLatencyListener = elasticCPUGrantCoord.SchedulerLatencyListener @@ -846,7 +847,7 @@ func (n *Node) registerEnginesForDiskStatsMap( // GetPebbleMetrics implements admission.PebbleMetricsProvider. func (n *Node) GetPebbleMetrics() []admission.StoreMetrics { - clusterProvisionedBandwidth := kvserver.ProvisionedBandwidthForAdmissionControl.Get( + clusterProvisionedBandwidth := kvadmission.ProvisionedBandwidth.Get( &n.storeCfg.Settings.SV) storeIDToDiskStats, err := n.diskStatsMap.tryPopulateAdmissionDiskStats( context.Background(), clusterProvisionedBandwidth, status.GetDiskCounters) @@ -874,13 +875,13 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics { } // GetTenantWeights implements kvserver.TenantWeightProvider. 
-func (n *Node) GetTenantWeights() kvserver.TenantWeights { - weights := kvserver.TenantWeights{ +func (n *Node) GetTenantWeights() kvadmission.TenantWeights { + weights := kvadmission.TenantWeights{ Node: make(map[uint64]uint32), } _ = n.stores.VisitStores(func(store *kvserver.Store) error { sw := make(map[uint64]uint32) - weights.Stores = append(weights.Stores, kvserver.TenantWeightsForStore{ + weights.Stores = append(weights.Stores, kvadmission.TenantWeightsForStore{ StoreID: store.StoreID(), Weights: sw, }) @@ -1082,7 +1083,7 @@ func (n *Node) batchInternal( ctx = admission.ContextWithElasticCPUWorkHandle(ctx, handle.ElasticCPUWorkHandle) } - var writeBytes *kvserver.StoreWriteBytes + var writeBytes *kvadmission.StoreWriteBytes defer func() { n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes) writeBytes.Release()