From 80a7ed97ae0ece6c0109ab0773199f757b4f0d00 Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Thu, 10 Nov 2022 11:19:11 -0500
Subject: [PATCH 1/3] admission: make Pacer type available in SQL server
 config

Currently, the Pacer type is only used within KV, but will be used by SQL
in future changes. For example, code for encoding/decoding CDC events
resides in distSQL and is CPU intensive, so there is a plan to integrate
admission control into it
(https://github.com/cockroachdb/cockroach/issues/90089).

This change makes the Pacer type available to the SQL layer via the
`execinfra.ServerConfig`.

Because the Pacer was previously only used by KV, it lived in the
`kvadmission` package. Since this change makes it available outside of KV,
it is moved to the `admission` package.

Furthermore, this change adds a new method,
`ElasticCPUGrantCoordinator.NewPacer`, to instantiate new Pacer structs.
Since the `ElasticCPUGrantCoordinator` implements several features not
relevant to SQL, this change passes the coordinator to the SQL server
config as the `PacerFactory` interface, which makes only the `NewPacer`
method accessible.

Currently, tenant servers do not create grant coordinators for admission
control. This change retains that behavior, except that it passes a `nil`
`ElasticCPUGrantCoordinator`, which creates `nil`/noop Pacers. Adding these
coordinators to tenant servers is a change outside the scope of this commit
and is left as a `TODO`.

Release note: None
---
 pkg/kv/kvserver/kvadmission/kvadmission.go | 88 ++++++----------------
 pkg/kv/kvserver/rangefeed/BUILD.bazel      |  2 +-
 pkg/kv/kvserver/rangefeed/catchup_scan.go  |  6 +-
 pkg/kv/kvserver/replica_rangefeed.go       |  6 +-
 pkg/server/node.go                         |  2 +-
 pkg/server/server.go                       |  1 +
 pkg/server/server_sql.go                   |  5 ++
 pkg/server/tenant.go                       |  5 ++
 pkg/sql/execinfra/server_config.go         |  4 +
 pkg/util/admission/BUILD.bazel             |  1 +
 pkg/util/admission/admission.go            |  7 ++
 pkg/util/admission/grant_coordinator.go    | 12 +++
 pkg/util/admission/pacer.go                | 60 +++++++++++++++
 13 files changed, 124 insertions(+), 75 deletions(-)
 create mode 100644 pkg/util/admission/pacer.go

diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go
index b682b584648e..c9d4a98b30cb 100644
--- a/pkg/kv/kvserver/kvadmission/kvadmission.go
+++ b/pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -96,7 +96,7 @@ type Controller interface {
 	// If enabled, it returns a non-nil Pacer that's to be used within rangefeed
 	// catchup scans (typically CPU-intensive and affecting scheduling
 	// latencies).
-	AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *Pacer
+	AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *admission.Pacer
 	// SetTenantWeightProvider is used to set the provider that will be
 	// periodically polled for weights. The stopper should be used to terminate
 	// the periodic polling.
@@ -135,11 +135,11 @@ type TenantWeightsForStore struct {
 type controllerImpl struct {
 	// Admission control queues and coordinators. All three should be nil or
 	// non-nil.
-	kvAdmissionQ        *admission.WorkQueue
-	storeGrantCoords    *admission.StoreGrantCoordinators
-	elasticCPUWorkQueue *admission.ElasticCPUWorkQueue
-	settings            *cluster.Settings
-	every               log.EveryN
+	kvAdmissionQ               *admission.WorkQueue
+	storeGrantCoords           *admission.StoreGrantCoordinators
+	elasticCPUGrantCoordinator *admission.ElasticCPUGrantCoordinator
+	settings                   *cluster.Settings
+	every                      log.EveryN
 }

 var _ Controller = &controllerImpl{}
@@ -162,16 +162,16 @@ type Handle struct {
 // nil or non-nil.
func MakeController( kvAdmissionQ *admission.WorkQueue, - elasticCPUWorkQueue *admission.ElasticCPUWorkQueue, + elasticCPUGrantCoordinator *admission.ElasticCPUGrantCoordinator, storeGrantCoords *admission.StoreGrantCoordinators, settings *cluster.Settings, ) Controller { return &controllerImpl{ - kvAdmissionQ: kvAdmissionQ, - storeGrantCoords: storeGrantCoords, - elasticCPUWorkQueue: elasticCPUWorkQueue, - settings: settings, - every: log.Every(10 * time.Second), + kvAdmissionQ: kvAdmissionQ, + storeGrantCoords: storeGrantCoords, + elasticCPUGrantCoordinator: elasticCPUGrantCoordinator, + settings: settings, + every: log.Every(10 * time.Second), } } @@ -257,7 +257,7 @@ func (n *controllerImpl) AdmitKVWork( // handed out through this mechanism, as a way to provide latency // isolation to non-elastic ("latency sensitive") work running on // the same machine. - elasticWorkHandle, err := n.elasticCPUWorkQueue.Admit( + elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit( ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo, ) if err != nil { @@ -267,7 +267,7 @@ func (n *controllerImpl) AdmitKVWork( defer func() { if retErr != nil { // No elastic work was done. - n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) + n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) } }() } else { @@ -283,7 +283,7 @@ func (n *controllerImpl) AdmitKVWork( // AdmittedKVWorkDone implements the Controller interface. func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) { - n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) + n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) if ah.callAdmittedWorkDoneOnKVAdmissionQ { n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID) } @@ -308,21 +308,19 @@ func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteByt // AdmitRangefeedRequest implements the Controller interface. func (n *controllerImpl) AdmitRangefeedRequest( tenantID roachpb.TenantID, request *roachpb.RangeFeedRequest, -) *Pacer { +) *admission.Pacer { if !rangefeedCatchupScanElasticControlEnabled.Get(&n.settings.SV) { return nil } - return &Pacer{ - unit: elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV), - wi: admission.WorkInfo{ + return n.elasticCPUGrantCoordinator.NewPacer( + elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV), + admission.WorkInfo{ TenantID: tenantID, Priority: admissionpb.WorkPriority(request.AdmissionHeader.Priority), CreateTime: request.AdmissionHeader.CreateTime, BypassAdmission: false, - }, - wq: n.elasticCPUWorkQueue, - } + }) } // SetTenantWeightProvider implements the Controller interface. @@ -350,7 +348,7 @@ func (n *controllerImpl) SetTenantWeightProvider( weights.Node = nil } n.kvAdmissionQ.SetTenantWeights(weights.Node) - n.elasticCPUWorkQueue.SetTenantWeights(weights.Node) + n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.SetTenantWeights(weights.Node) for _, storeWeights := range weights.Stores { q := n.storeGrantCoords.TryGetQueueForStore(int32(storeWeights.StoreID)) @@ -441,47 +439,3 @@ func (wb *StoreWriteBytes) Release() { } storeWriteBytesPool.Put(wb) } - -// Pacer is used in tight loops (CPU-bound) for non-premptible elastic work. -// Callers are expected to invoke Pace() every loop iteration and Close() once -// done. 
Internally this type integrates with elastic CPU work queue, acquiring -// tokens for the CPU work being done, and blocking if tokens are unavailable. -// This allows for a form of cooperative scheduling with elastic CPU granters. -type Pacer struct { - unit time.Duration - wi admission.WorkInfo - wq *admission.ElasticCPUWorkQueue - - cur *admission.ElasticCPUWorkHandle -} - -// Pace is part of the Pacer interface. -func (p *Pacer) Pace(ctx context.Context) error { - if p == nil { - return nil - } - - if overLimit, _ := p.cur.OverLimit(); overLimit { - p.wq.AdmittedWorkDone(p.cur) - p.cur = nil - } - - if p.cur == nil { - handle, err := p.wq.Admit(ctx, p.unit, p.wi) - if err != nil { - return err - } - p.cur = handle - } - return nil -} - -// Close is part of the Pacer interface. -func (p *Pacer) Close() { - if p == nil || p.cur == nil { - return - } - - p.wq.AdmittedWorkDone(p.cur) - p.cur = nil -} diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 674602f4682f..bac4f6ebbc90 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -17,11 +17,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/keys", - "//pkg/kv/kvserver/kvadmission", "//pkg/roachpb", "//pkg/settings", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/util/admission", "//pkg/util/bufalloc", "//pkg/util/envutil", "//pkg/util/hlc", diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go index 06a2fd974cec..1726d672187d 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go @@ -15,10 +15,10 @@ import ( "context" "time" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -66,7 +66,7 @@ type CatchUpIterator struct { close func() span roachpb.Span startTime hlc.Timestamp // exclusive - pacer *kvadmission.Pacer + pacer *admission.Pacer } // NewCatchUpIterator returns a CatchUpIterator for the given Reader over the @@ -79,7 +79,7 @@ func NewCatchUpIterator( span roachpb.Span, startTime hlc.Timestamp, closer func(), - pacer *kvadmission.Pacer, + pacer *admission.Pacer, ) *CatchUpIterator { return &CatchUpIterator{ simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader, diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 9945ba0a548a..d31a687ac57e 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -21,13 +21,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/contextutil" 
"github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -135,7 +135,7 @@ func (tp *rangefeedTxnPusher) ResolveIntents( // complete. The surrounding store's ConcurrentRequestLimiter is used to limit // the number of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, pacer *kvadmission.Pacer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, pacer *admission.Pacer, ) *roachpb.Error { return r.rangeFeedWithRangeID(r.RangeID, args, stream, pacer) } @@ -144,7 +144,7 @@ func (r *Replica) rangeFeedWithRangeID( _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, - pacer *kvadmission.Pacer, + pacer *admission.Pacer, ) *roachpb.Error { if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", diff --git a/pkg/server/node.go b/pkg/server/node.go index dc687088da23..f26cef1e55b6 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -388,7 +388,7 @@ func NewNode( testingErrorEvent: cfg.TestingKnobs.TestingResponseErrorEvent, } n.storeCfg.KVAdmissionController = kvadmission.MakeController( - kvAdmissionQ, elasticCPUGrantCoord.ElasticCPUWorkQueue, storeGrantCoords, cfg.Settings, + kvAdmissionQ, elasticCPUGrantCoord, storeGrantCoords, cfg.Settings, ) n.storeCfg.SchedulerLatencyListener = elasticCPUGrantCoord.SchedulerLatencyListener n.perReplicaServer = kvserver.MakeServer(&n.Descriptor, n.stores) diff --git a/pkg/server/server.go b/pkg/server/server.go index 06580d4a62d3..175d920a15eb 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -880,6 +880,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { monitorAndMetrics: sqlMonitorAndMetrics, settingsStorage: settingsWriter, eventsServer: eventsServer, + admissionPacerFactory: gcoords.Elastic, }) if err != nil { return nil, err diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index dadc6cd20b15..0f89569dd3c1 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -364,6 +364,10 @@ type sqlServerArgs struct { // externalStorageBuilder is the constructor for accesses to external // storage. externalStorageBuilder *externalStorageBuilder + + // admissionPacerFactory is used for elastic CPU control when performing + // CPU intensive operations, such as CDC event encoding/decoding. 
+ admissionPacerFactory admission.PacerFactory } type monitorAndMetrics struct { @@ -742,6 +746,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ExternalIORecorder: cfg.costController, TenantCostController: cfg.costController, RangeStatsFetcher: rangeStatsFetcher, + AdmissionPacerFactory: cfg.admissionPacerFactory, } cfg.TempStorageConfig.Mon.SetMetrics(distSQLMetrics.CurDiskBytesCount, distSQLMetrics.MaxDiskBytesHist) if distSQLTestingKnobs := cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil { diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 9662061667d2..4ccd557e9c3c 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -55,6 +55,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -871,6 +872,9 @@ func makeTenantSQLServerArgs( eventsServer.TestingKnobs = knobs.(obs.EventServerTestingKnobs) } + // TODO(irfansharif): hook up NewGrantCoordinatorSQL. + var noopElasticCPUGrantCoord *admission.ElasticCPUGrantCoordinator = nil + return sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(nil), @@ -920,6 +924,7 @@ func makeTenantSQLServerArgs( grpc: grpcServer, eventsServer: eventsServer, externalStorageBuilder: esb, + admissionPacerFactory: noopElasticCPUGrantCoord, }, nil } diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index bee68cc28777..a9b5cc63e771 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -197,6 +197,10 @@ type ServerConfig struct { // RangeStatsFetcher is used to fetch range stats for keys. RangeStatsFetcher eval.RangeStatsFetcher + + // AdmissionPacerFactory is used to integrate CPU-intensive work + // with elastic CPU control. + AdmissionPacerFactory admission.PacerFactory } // RuntimeStats is an interface through which the rowexec layer can get diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel index 4d30bc57e5ac..718eab88af70 100644 --- a/pkg/util/admission/BUILD.bazel +++ b/pkg/util/admission/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "granter.go", "io_load_listener.go", "kv_slot_adjuster.go", + "pacer.go", "scheduler_latency_listener.go", "sql_cpu_overload_indicator.go", "store_token_estimation.go", diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index 7d862dfc8d44..b2b96766abcb 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -560,3 +560,10 @@ type storeRequestEstimates struct { // writeTokens is the tokens to request at admission time. Must be > 0. writeTokens int64 } + +// PacerFactory is used to construct a new admission.Pacer. 
+type PacerFactory interface {
+	NewPacer(unit time.Duration, wi WorkInfo) *Pacer
+}
+
+var _ PacerFactory = &ElasticCPUGrantCoordinator{}
diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go
index 45ebe90aaff7..6cc1c7f1e785 100644
--- a/pkg/util/admission/grant_coordinator.go
+++ b/pkg/util/admission/grant_coordinator.go
@@ -1014,3 +1014,15 @@ func (e *ElasticCPUGrantCoordinator) close() {
 func (e *ElasticCPUGrantCoordinator) tryGrant() {
 	e.elasticCPUGranter.tryGrant()
 }
+
+// NewPacer implements the PacerFactory interface.
+func (e *ElasticCPUGrantCoordinator) NewPacer(unit time.Duration, wi WorkInfo) *Pacer {
+	if e == nil {
+		return nil
+	}
+	return &Pacer{
+		unit: unit,
+		wi:   wi,
+		wq:   e.ElasticCPUWorkQueue,
+	}
+}
diff --git a/pkg/util/admission/pacer.go b/pkg/util/admission/pacer.go
new file mode 100644
index 000000000000..ea5eabcd1685
--- /dev/null
+++ b/pkg/util/admission/pacer.go
@@ -0,0 +1,60 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package admission
+
+import (
+	"context"
+	"time"
+)
+
+// Pacer is used in tight loops (CPU-bound) for non-preemptible elastic work.
+// Callers are expected to invoke Pace() every loop iteration and Close() once
+// done. Internally this type integrates with the elastic CPU work queue,
+// acquiring tokens for the CPU work being done, and blocking if tokens are
+// unavailable. This allows for a form of cooperative scheduling with elastic
+// CPU granters.
+type Pacer struct {
+	unit time.Duration
+	wi   WorkInfo
+	wq   *ElasticCPUWorkQueue
+
+	cur *ElasticCPUWorkHandle
+}
+
+// Pace is part of the Pacer interface.
+func (p *Pacer) Pace(ctx context.Context) error {
+	if p == nil {
+		return nil
+	}
+
+	if overLimit, _ := p.cur.OverLimit(); overLimit {
+		p.wq.AdmittedWorkDone(p.cur)
+		p.cur = nil
+	}
+
+	if p.cur == nil {
+		handle, err := p.wq.Admit(ctx, p.unit, p.wi)
+		if err != nil {
+			return err
+		}
+		p.cur = handle
+	}
+	return nil
+}
+
+// Close is part of the Pacer interface.
+func (p *Pacer) Close() {
+	if p == nil || p.cur == nil {
+		return
+	}
+
+	p.wq.AdmittedWorkDone(p.cur)
+	p.cur = nil
+}

From de92474df12234493c1000b9d7c56ff0d00ee4b6 Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Tue, 1 Nov 2022 14:49:59 -0400
Subject: [PATCH 2/3] cdc: add elastic CPU control to CDC event processing

Previously, the CPU-bound work of CDC event processing (encoding /
decoding rows) had the potential to consume a lot of CPU and disrupt
foreground SQL traffic. This change adds elastic CPU control to event
processing so that it does not use excessive CPU and starve foreground
traffic.

This change also adds new, non-public cluster settings which control
enabling/disabling CPU control for CDC event processing and the
requested grant size measured in CPU time.
Fixes: https://github.com/cockroachdb/cockroach/issues/90089 Release note: None --- .../settings/settings-for-tenants.txt | 4 +- docs/generated/settings/settings.html | 4 +- pkg/ccl/changefeedccl/BUILD.bazel | 2 + pkg/ccl/changefeedccl/bench_test.go | 2 +- .../changefeedccl/changefeedbase/settings.go | 28 +++++++- pkg/ccl/changefeedccl/event_processing.go | 68 ++++++++++++++++++- 6 files changed, 98 insertions(+), 10 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 9c6ecd19204f..a44ce5405b94 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -11,8 +11,8 @@ bulkio.backup.read_timeout duration 5m0s amount of time after which a read attem bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes -changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of eventswhich a worker can buffer -changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value +changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer +changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. 
for experimental/core changefeeds and changefeeds using parquet format, this is disabled changefeed.fast_gzip.enabled boolean true use fast gzip implementation changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 3d68bda51a1d..f2cbb588ded7 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -17,8 +17,8 @@ bulkio.backup.read_with_priority_afterduration1m0samount of time since the read-as-of time above which a BACKUP should use priority when retrying reads bulkio.stream_ingestion.minimum_flush_intervalduration5sthe minimum timestamp between flushes; flushes may still occur if internal buffers fill up changefeed.balance_range_distribution.enablebooleanfalseif enabled, the ranges are balanced equally among all nodes -changefeed.event_consumer_worker_queue_sizeinteger16if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of eventswhich a worker can buffer -changefeed.event_consumer_workersinteger0the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value +changefeed.event_consumer_worker_queue_sizeinteger16if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer +changefeed.event_consumer_workersinteger0the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled changefeed.fast_gzip.enabledbooleantrueuse fast gzip implementation changefeed.node_throttle_configstringspecifies node level throttling configuration for all changefeeeds changefeed.schema_feed.read_with_priority_afterduration1m0sretry with high priority if we were not able to read descriptors for too long; 0 disables diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index b13bf8c0cf5d..5effd956679a 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -102,6 +102,8 @@ go_library( "//pkg/sql/sqlutil", "//pkg/sql/types", "//pkg/util", + "//pkg/util/admission", + "//pkg/util/admission/admissionpb", "//pkg/util/bitarray", "//pkg/util/bufalloc", "//pkg/util/cache", diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index 1d4d3916b025..7e17fd45a32a 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -250,7 +250,7 @@ func createBenchmarkChangefeed( serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, nil, sf, initialHighWater, sink, encoder, makeChangefeedConfigFromJobDetails(details), - execinfrapb.Expression{}, TestingKnobs{}, nil) + execinfrapb.Expression{}, TestingKnobs{}, nil, nil) if err != nil { return nil, nil, err diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 22841234c7e1..738876efce61 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -242,7 +242,8 @@ var EventConsumerWorkers = settings.RegisterIntSetting( 
settings.TenantWritable, "changefeed.event_consumer_workers", "the number of workers to use when processing events: <0 disables, "+ - "0 assigns a reasonable default, >0 assigns the setting value", + "0 assigns a reasonable default, >0 assigns the setting value. for experimental/core "+ + "changefeeds and changefeeds using parquet format, this is disabled", 0, ).WithPublic() @@ -250,8 +251,31 @@ var EventConsumerWorkers = settings.RegisterIntSetting( var EventConsumerWorkerQueueSize = settings.RegisterIntSetting( settings.TenantWritable, "changefeed.event_consumer_worker_queue_size", - "if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events"+ + "if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events "+ "which a worker can buffer", int64(util.ConstantWithMetamorphicTestRange("changefeed.event_consumer_worker_queue_size", 16, 0, 16)), settings.NonNegativeInt, ).WithPublic() + +// EventConsumerPacerRequestSize specifies how often (measured in CPU time) +// that event consumer workers request CPU time from admission control. +// For example, every N milliseconds of CPU work, request N more +// milliseconds of CPU time. +var EventConsumerPacerRequestSize = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.cpu.per_event_consumer_worker_allocation", + "an event consumer worker will perform a blocking request for CPU time "+ + "before consuming events. after fully utilizing this CPU time, it will "+ + "request more", + 50*time.Millisecond, + settings.PositiveDuration, +) + +// EventConsumerElasticCPUControlEnabled determines whether changefeed event +// processing integrates with elastic CPU control. +var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.cpu.per_event_elastic_control.enabled", + "determines whether changefeed event processing integrates with elastic CPU control", + true, +) diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index de387e1f025c..1ca642fe8e59 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -13,15 +13,19 @@ import ( "hash" "hash/crc32" "runtime" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -32,6 +36,10 @@ import ( "github.com/cockroachdb/errors" ) +// pacerLogEvery is used for logging errors instead of returning terminal +// errors when pacer.Pace returns an error. +var pacerLogEvery log.EveryN = log.Every(100 * time.Millisecond) + // eventContext holds metadata pertaining to event. 
type eventContext struct { updated, mvcc hlc.Timestamp @@ -62,6 +70,17 @@ type kvEventToRowConsumer struct { topicDescriptorCache map[TopicIdentifier]TopicDescriptor topicNamer *TopicNamer + + // This pacer is used to incorporate event consumption to elastic CPU + // control. This helps ensure that event encoding/decoding does not throttle + // foreground SQL traffic. + // + // Note that for pacer to function correctly, + // kvEventToRowConsumer.ConsumeEvent must be called by the same goroutine in a + // tight loop. + // + // The pacer is closed by kvEventToRowConsumer.Close. + pacer *admission.Pacer } func newEventConsumer( @@ -85,6 +104,9 @@ func newEventConsumer( return nil, nil, err } + pacerRequestUnit := changefeedbase.EventConsumerPacerRequestSize.Get(&cfg.Settings.SV) + enablePacer := changefeedbase.EventConsumerElasticCPUControlEnabled.Get(&cfg.Settings.SV) + makeConsumer := func(s EventSink, frontier frontier) (eventConsumer, error) { var err error encoder, err := getEncoder(encodingOpts, feed.Targets) @@ -100,22 +122,44 @@ func newEventConsumer( } } + // Passing a nil Pacer is effectively a noop Pacer if + // CPU control is disabled. + var pacer *admission.Pacer = nil + if enablePacer { + tenantID, ok := roachpb.TenantFromContext(ctx) + if !ok { + tenantID = roachpb.SystemTenantID + } + pacer = cfg.AdmissionPacerFactory.NewPacer( + pacerRequestUnit, + admission.WorkInfo{ + TenantID: tenantID, + Priority: admissionpb.BulkNormalPri, + CreateTime: timeutil.Now().UnixNano(), + BypassAdmission: false, + }, + ) + } + return newKVEventToRowConsumer(ctx, cfg, evalCtx, frontier, cursor, s, - encoder, details, expr, knobs, topicNamer) + encoder, details, expr, knobs, topicNamer, pacer) } - // TODO (jayshrivastava) enable parallel consumers for sinkless changefeeds numWorkers := changefeedbase.EventConsumerWorkers.Get(&cfg.Settings.SV) if numWorkers == 0 { // Pick a reasonable default. numWorkers = defaultNumWorkers() } + // The descriptions for event_consumer_worker settings should also be updated + // when these TODOs are completed. + // + // TODO (ganeshb) Add support for parallel encoding when using parquet. // We cannot have a separate encoder and sink for parquet format (see // parquet_sink_cloudstorage.go). Because of this the current nprox solution // does not work for parquet format. // - //TODO (ganeshb) Add support for parallel encoding + // TODO (jayshrivastava) enable parallel consumers for sinkless changefeeds. if numWorkers <= 1 || isSinkless || encodingOpts.Format == changefeedbase.OptFormatParquet { c, err := makeConsumer(sink, spanFrontier) if err != nil { @@ -174,6 +218,7 @@ func newKVEventToRowConsumer( expr execinfrapb.Expression, knobs TestingKnobs, topicNamer *TopicNamer, + pacer *admission.Pacer, ) (*kvEventToRowConsumer, error) { includeVirtual := details.Opts.IncludeVirtual() keyOnly := details.Opts.KeyOnly() @@ -215,6 +260,7 @@ func newKVEventToRowConsumer( evaluator: evaluator, safeExpr: safeExpr, encodingFormat: encodingOpts.Format, + pacer: pacer, }, nil } @@ -242,6 +288,15 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even return errors.AssertionFailedf("expected kv ev, got %v", ev.Type()) } + // Request CPU time to use for event consumption, block if this time is + // unavailable. If there is unused CPU time left from the last call to + // Pace, then use that time instead of blocking. 
+ if err := c.pacer.Pace(ctx); err != nil { + if pacerLogEvery.ShouldLog() { + log.Errorf(ctx, "automatic pacing: %v", err) + } + } + schemaTimestamp := ev.KV().Value.Timestamp prevSchemaTimestamp := schemaTimestamp mvccTimestamp := ev.MVCCTimestamp() @@ -382,6 +437,7 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even // Close is a noop for the kvEventToRowConsumer because it // has no goroutines in flight. func (c *kvEventToRowConsumer) Close() error { + c.pacer.Close() return nil } @@ -527,6 +583,12 @@ func (c *parallelEventConsumer) startWorkers() error { func (c *parallelEventConsumer) workerLoop( ctx context.Context, consumer eventConsumer, id int64, ) error { + defer func() { + err := consumer.Close() + if err != nil { + log.Errorf(ctx, "closing consumer: %v", err) + } + }() for { select { case <-ctx.Done(): From 89f28d23c696753d78912fe71fda7aae73a19393 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Thu, 17 Nov 2022 16:52:07 -0500 Subject: [PATCH 3/3] roachtest: add initial scan only case to elastic cdc Previously, this roachtest would not test changefeeds running with `initial_scan_only`. This option tends to have a significant impact on foreground latency due to high CPU usage, thus it should be included in this test which measures CPU usage and foreground latency while changefeeds are running. Release note: None --- .../tests/admission_control_elastic_cdc.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/cmd/roachtest/tests/admission_control_elastic_cdc.go b/pkg/cmd/roachtest/tests/admission_control_elastic_cdc.go index 57f805e245bb..764f9d9045da 100644 --- a/pkg/cmd/roachtest/tests/admission_control_elastic_cdc.go +++ b/pkg/cmd/roachtest/tests/admission_control_elastic_cdc.go @@ -114,21 +114,23 @@ func registerElasticControlForCDC(r registry.Registry) { time.Sleep(stopFeedsDuration) // buffer for cancellations to take effect/show up in metrics t.Status(fmt.Sprintf("during: round %d: creating %d changefeeds (<%s)", i, changefeeds, time.Minute)) + for j := 0; j < changefeeds; j++ { - stmtWithCursor := fmt.Sprintf(` + var createChangefeedStmt string + if i%2 == 0 { + createChangefeedStmt = fmt.Sprintf(` CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer INTO 'null://' WITH cursor = '-%ds' `, int64(float64(i+1)*padDuration.Seconds())) // scanning as far back as possible (~ when the workload started) - if _, err := db.ExecContext(ctx, stmtWithCursor); err != nil { + } else { + createChangefeedStmt = "CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer " + + "INTO 'null://' WITH initial_scan = 'only'" + } + + if _, err := db.ExecContext(ctx, createChangefeedStmt); err != nil { return err } } - - // TODO(irfansharif): Add a version of this test - // with initial_scan = 'only' to demonstrate the - // need+efficacy of using elastic CPU control in - // changefeed workers. That too has a severe effect - // on scheduling latencies. } return nil })
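
The Pacer usage pattern these commits introduce can be summarized with a short sketch. The code below is illustrative only and is not part of the patch: it assumes the `admission.PacerFactory` plumbed into `execinfra.ServerConfig` by the first commit, and the names `paceCPUBoundLoop` and `items` are hypothetical stand-ins for whatever CPU-bound loop a SQL-layer caller drives. The `NewPacer`/`Pace`/`Close` calls follow the API added in `pkg/util/admission/pacer.go` and `grant_coordinator.go`; a `nil` Pacer, as handed out on tenant servers in this change, makes every call a no-op.

// Illustrative sketch only -- not part of this patch. Names such as
// paceCPUBoundLoop and items are hypothetical.
package example

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/admission"
	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// paceCPUBoundLoop runs a tight CPU-bound loop while cooperatively yielding
// to elastic CPU control. The factory may hand out a nil Pacer (as tenant
// servers do in this change), in which case Pace and Close are no-ops.
func paceCPUBoundLoop(ctx context.Context, factory admission.PacerFactory, items []func()) {
	tenantID, ok := roachpb.TenantFromContext(ctx)
	if !ok {
		tenantID = roachpb.SystemTenantID
	}
	pacer := factory.NewPacer(
		// CPU time requested per admission; the changefeed commit reads this
		// value from changefeed.cpu.per_event_consumer_worker_allocation.
		50*time.Millisecond,
		admission.WorkInfo{
			TenantID:        tenantID,
			Priority:        admissionpb.BulkNormalPri,
			CreateTime:      timeutil.Now().UnixNano(),
			BypassAdmission: false,
		},
	)
	// Close releases the currently held work handle, if any.
	defer pacer.Close()

	for _, work := range items {
		// Block until CPU tokens are available, unless unused time remains
		// from the previous grant. As in event_processing.go, pacing errors
		// are treated as non-fatal and only logged.
		if err := pacer.Pace(ctx); err != nil {
			log.Errorf(ctx, "pacing: %v", err)
		}
		work()
	}
}

One design point worth noting from the patch itself: because `NewPacer`, `Pace`, and `Close` all tolerate `nil` receivers, callers such as the changefeed event consumer do not need to branch on whether admission control is wired up; disabling the cluster setting or running on a tenant server simply yields a no-op Pacer.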