From 68beda64d894f0683858f7a0130222ec21d2c73b Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Tue, 13 Jul 2021 10:09:14 -0400 Subject: [PATCH] sql,server: add admission control for DistSQL responses The admission.WorkQueue is plumbed through via execinfra.ServerCfg to colrpc.Inbox and flowinfra.processProducerMessage, and the AdmissionInfo is initialized using FlowCtx.Txn when possible. The admission control point for KV=>SQL response processing is tweaked to move it after the memory accounting for the response bytes. Informs #65948 Release note (ops change): DistSQL response admission control can be enabled using admission.sql_sql_response.enabled. --- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/server/server.go | 17 +++++----- pkg/server/server_sql.go | 11 +++++-- pkg/sql/colflow/BUILD.bazel | 2 ++ pkg/sql/colflow/colrpc/BUILD.bazel | 1 + pkg/sql/colflow/colrpc/inbox.go | 31 +++++++++++++++++++ pkg/sql/colflow/explain_vec.go | 2 ++ pkg/sql/colflow/vectorized_flow.go | 31 ++++++++++++++++--- pkg/sql/colflow/vectorized_flow_test.go | 11 ++++--- pkg/sql/execinfra/BUILD.bazel | 1 + pkg/sql/execinfra/server_config.go | 5 +++ pkg/sql/flowinfra/BUILD.bazel | 1 + pkg/sql/flowinfra/flow.go | 25 +++++++++++++++ pkg/sql/flowinfra/inbound.go | 15 +++++++-- pkg/sql/row/kv_batch_fetcher.go | 21 +++++++------ pkg/util/admission/work_queue.go | 13 ++++++-- 17 files changed, 155 insertions(+), 34 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 2caddc91426c..f838bbf4f72d 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -1,6 +1,7 @@ Setting Type Default Description admission.kv.enabled boolean false when true, work performed by the KV layer is subject to admission control admission.sql_kv_response.enabled boolean false when true, work performed by the SQL layer when receiving a KV response is subject to admission control +admission.sql_sql_response.enabled boolean false when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage cloudstorage.timeout duration 10m0s the timeout for import/export storage operations diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index e9325ba76cab..b349ffea1857 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -3,6 +3,7 @@ admission.kv.enabledbooleanfalsewhen true, work performed by the KV layer is subject to admission control admission.sql_kv_response.enabledbooleanfalsewhen true, work performed by the SQL layer when receiving a KV response is subject to admission control +admission.sql_sql_response.enabledbooleanfalsewhen true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control bulkio.stream_ingestion.minimum_flush_intervalduration5sthe minimum timestamp between flushes; flushes may still occur if internal buffers fill up cloudstorage.http.custom_castringcustom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage cloudstorage.timeoutduration10m0sthe timeout for import/export storage operations diff --git a/pkg/server/server.go b/pkg/server/server.go index 76999b55118c..7afec717fada 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -706,14 +706,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { sqlServer, err := newSQLServer(ctx, sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ - nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(sStatus), - nodeLiveness: optionalnodeliveness.MakeContainer(nodeLiveness), - gossip: gossip.MakeOptionalGossip(g), - grpcServer: grpcServer.Server, - nodeIDContainer: idContainer, - externalStorage: externalStorage, - externalStorageFromURI: externalStorageFromURI, - isMeta1Leaseholder: node.stores.IsMeta1Leaseholder, + nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(sStatus), + nodeLiveness: optionalnodeliveness.MakeContainer(nodeLiveness), + gossip: gossip.MakeOptionalGossip(g), + grpcServer: grpcServer.Server, + nodeIDContainer: idContainer, + externalStorage: externalStorage, + externalStorageFromURI: externalStorageFromURI, + isMeta1Leaseholder: node.stores.IsMeta1Leaseholder, + sqlSQLResponseAdmissionQ: gcoord.GetWorkQueue(admission.SQLSQLResponseWork), }, SQLConfig: &cfg.SQLConfig, BaseConfig: &cfg.BaseConfig, diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index e0951d445f82..a6377811b833 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -74,6 +74,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sqlmigrations" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/cloud" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/errorutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -164,6 +165,9 @@ type sqlServerOptionalKVArgs struct { // Used by backup/restore. externalStorage cloud.ExternalStorageFactory externalStorageFromURI cloud.ExternalStorageFromURIFactory + + // The admission queue to use for SQLSQLResponseWork. + sqlSQLResponseAdmissionQ *admission.WorkQueue } // sqlServerOptionalTenantArgs are the arguments supplied to newSQLServer which @@ -469,9 +473,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ExternalStorage: cfg.externalStorage, ExternalStorageFromURI: cfg.externalStorageFromURI, - RangeCache: cfg.distSender.RangeDescriptorCache(), - HydratedTables: hydratedTablesCache, - VirtualSchemas: virtualSchemas, + RangeCache: cfg.distSender.RangeDescriptorCache(), + HydratedTables: hydratedTablesCache, + VirtualSchemas: virtualSchemas, + SQLSQLResponseAdmissionQ: cfg.sqlSQLResponseAdmissionQ, } cfg.TempStorageConfig.Mon.SetMetrics(distSQLMetrics.CurDiskBytesCount, distSQLMetrics.MaxDiskBytesHist) if distSQLTestingKnobs := cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil { diff --git a/pkg/sql/colflow/BUILD.bazel b/pkg/sql/colflow/BUILD.bazel index 288fb1cea21f..c505cdc6c70d 100644 --- a/pkg/sql/colflow/BUILD.bazel +++ b/pkg/sql/colflow/BUILD.bazel @@ -37,6 +37,7 @@ go_library( "//pkg/sql/sessiondatapb", "//pkg/sql/types", "//pkg/util", + "//pkg/util/admission", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/mon", @@ -108,6 +109,7 @@ go_test( "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util/admission", "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/leaktest", diff --git a/pkg/sql/colflow/colrpc/BUILD.bazel b/pkg/sql/colflow/colrpc/BUILD.bazel index 04836480eb49..b57d3b63a15b 100644 --- a/pkg/sql/colflow/colrpc/BUILD.bazel +++ b/pkg/sql/colflow/colrpc/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/types", + "//pkg/util/admission", "//pkg/util/cancelchecker", "//pkg/util/log", "//pkg/util/log/logcrash", diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index 3e6fce1b08e5..89fb7571475e 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" @@ -98,6 +99,9 @@ type Inbox struct { // only the Next/DrainMeta goroutine may access it. stream flowStreamServer + admissionQ *admission.WorkQueue + admissionInfo admission.WorkInfo + // statsAtomics are the execution statistics that need to be atomically // accessed. This is necessary since Get*() methods can be called from // different goroutine than Next(). @@ -151,6 +155,24 @@ func NewInbox( return i, nil } +// NewInboxWithAdmissionControl creates a new Inbox that does admission +// control on responses received from DistSQL. +func NewInboxWithAdmissionControl( + allocator *colmem.Allocator, + typs []*types.T, + streamID execinfrapb.StreamID, + admissionQ *admission.WorkQueue, + admissionInfo admission.WorkInfo, +) (*Inbox, error) { + i, err := NewInbox(allocator, typs, streamID) + if err != nil { + return nil, err + } + i.admissionQ = admissionQ + i.admissionInfo = admissionInfo + return i, err +} + // close closes the inbox, ensuring that any call to RunWithStream will return // immediately. close is idempotent. // NOTE: it is very important to close the Inbox only when execution terminates @@ -333,6 +355,15 @@ func (i *Inbox) Next() coldata.Batch { // Update the allocator since we're holding onto the serialized bytes // for now. i.allocator.AdjustMemoryUsage(numSerializedBytes) + // Do admission control after memory accounting for the serialized bytes + // and before deserialization. + if i.admissionQ != nil { + if _, err := i.admissionQ.Admit(i.Ctx, i.admissionInfo); err != nil { + // err includes the case of context cancellation while waiting for + // admission. + colexecerror.ExpectedError(err) + } + } i.scratch.data = i.scratch.data[:0] batchLength, err := i.serializer.Deserialize(&i.scratch.data, m.Data.RawBytes) // Eagerly throw away the RawBytes memory. diff --git a/pkg/sql/colflow/explain_vec.go b/pkg/sql/colflow/explain_vec.go index a91d845a1c13..3f27bf14c592 100644 --- a/pkg/sql/colflow/explain_vec.go +++ b/pkg/sql/colflow/explain_vec.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/treeprinter" "github.com/cockroachdb/errors" @@ -57,6 +58,7 @@ func convertToVecTree( newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, false, nil, &execinfra.RowChannel{}, &fakeBatchReceiver{}, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, flowCtx.Cfg.VecFDSemaphore, flowCtx.TypeResolverFactory.NewTypeResolver(flowCtx.EvalCtx.Txn), + admission.WorkInfo{}, ) // We create an unlimited memory account because we're interested whether the // flow is supported via the vectorized engine in general (without paying diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index b5843f8726bf..ceb752a4e8c3 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -208,6 +209,7 @@ func (f *vectorizedFlow) Setup( diskQueueCfg, f.countingSemaphore, flowCtx.TypeResolverFactory.NewTypeResolver(flowCtx.EvalCtx.Txn), + f.FlowBase.GetAdmissionInfo(), ) if f.testingKnobs.onSetupFlow != nil { f.testingKnobs.onSetupFlow(f.creator) @@ -449,6 +451,11 @@ type flowCreatorHelper interface { getCancelFlowFn() context.CancelFunc } +type admissionOptions struct { + admissionQ *admission.WorkQueue + admissionInfo admission.WorkInfo +} + // remoteComponentCreator is an interface that abstracts the constructors for // several components in a remote flow. Mostly for testing purposes. type remoteComponentCreator interface { @@ -460,7 +467,8 @@ type remoteComponentCreator interface { metadataSources []colexecop.MetadataSource, toClose []colexecop.Closer, ) (*colrpc.Outbox, error) - newInbox(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID) (*colrpc.Inbox, error) + newInbox(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, + admissionOpts admissionOptions) (*colrpc.Inbox, error) } type vectorizedRemoteComponentCreator struct{} @@ -477,9 +485,13 @@ func (vectorizedRemoteComponentCreator) newOutbox( } func (vectorizedRemoteComponentCreator) newInbox( - allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, + allocator *colmem.Allocator, + typs []*types.T, + streamID execinfrapb.StreamID, + admissionOpts admissionOptions, ) (*colrpc.Inbox, error) { - return colrpc.NewInbox(allocator, typs, streamID) + return colrpc.NewInboxWithAdmissionControl( + allocator, typs, streamID, admissionOpts.admissionQ, admissionOpts.admissionInfo) } // vectorizedFlowCreator performs all the setup of vectorized flows. Depending @@ -508,6 +520,7 @@ type vectorizedFlowCreator struct { flowID execinfrapb.FlowID exprHelper *colexecargs.ExprHelper typeResolver descs.DistSQLTypeResolver + admissionInfo admission.WorkInfo // numOutboxes counts how many colrpc.Outbox'es have been set up on this // node. It must be accessed atomically. @@ -573,6 +586,7 @@ func newVectorizedFlowCreator( diskQueueCfg colcontainer.DiskQueueCfg, fdSemaphore semaphore.Semaphore, typeResolver descs.DistSQLTypeResolver, + admissionInfo admission.WorkInfo, ) *vectorizedFlowCreator { creator := vectorizedFlowCreatorPool.Get().(*vectorizedFlowCreator) *creator = vectorizedFlowCreator{ @@ -589,6 +603,7 @@ func newVectorizedFlowCreator( flowID: flowID, exprHelper: creator.exprHelper, typeResolver: typeResolver, + admissionInfo: admissionInfo, procIdxQueue: creator.procIdxQueue, opChains: creator.opChains, monitors: creator.monitors, @@ -881,8 +896,14 @@ func (s *vectorizedFlowCreator) setupInput( latency = 0 log.VEventf(ctx, 1, "an error occurred during vectorized planning while getting latency: %v", err) } - - inbox, err := s.remoteComponentCreator.newInbox(colmem.NewAllocator(ctx, s.newStreamingMemAccount(flowCtx), factory), input.ColumnTypes, inputStream.StreamID) + inbox, err := s.remoteComponentCreator.newInbox( + colmem.NewAllocator(ctx, s.newStreamingMemAccount(flowCtx), factory), + input.ColumnTypes, + inputStream.StreamID, + admissionOptions{ + admissionQ: flowCtx.Cfg.SQLSQLResponseAdmissionQ, + admissionInfo: s.admissionInfo, + }) if err != nil { return colexecargs.OpWithMetaInfo{}, err diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 83064f6714e3..0a90fb71a639 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/stretchr/testify/require" ) @@ -52,7 +53,7 @@ func (c callbackRemoteComponentCreator) newOutbox( } func (c callbackRemoteComponentCreator) newInbox( - allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, + allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, _ admissionOptions, ) (*colrpc.Inbox, error) { return c.newInboxFn(allocator, typs, streamID) } @@ -218,15 +219,17 @@ func TestDrainOnlyInputDAG(t *testing.T) { ctx := context.Background() defer evalCtx.Stop(ctx) f := &flowinfra.FlowBase{ - FlowCtx: execinfra.FlowCtx{EvalCtx: &evalCtx, - NodeID: base.TestingIDContainer, + FlowCtx: execinfra.FlowCtx{ + Cfg: &execinfra.ServerConfig{}, + EvalCtx: &evalCtx, + NodeID: base.TestingIDContainer, }, } var wg sync.WaitGroup vfc := newVectorizedFlowCreator( &vectorizedFlowCreatorHelper{f: f}, componentCreator, false, false, &wg, &execinfra.RowChannel{}, nil /* batchSyncFlowConsumer */, nil /* nodeDialer */, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, - nil /* fdSemaphore */, descs.DistSQLTypeResolver{}, + nil /* fdSemaphore */, descs.DistSQLTypeResolver{}, admission.WorkInfo{}, ) _, _, err := vfc.setupFlow(ctx, &f.FlowCtx, procs, nil /* localProcessors */, flowinfra.FuseNormally) diff --git a/pkg/sql/execinfra/BUILD.bazel b/pkg/sql/execinfra/BUILD.bazel index 98863ad61eca..8af3e57b6b49 100644 --- a/pkg/sql/execinfra/BUILD.bazel +++ b/pkg/sql/execinfra/BUILD.bazel @@ -52,6 +52,7 @@ go_library( "//pkg/storage/cloud", "//pkg/storage/fs", "//pkg/util", + "//pkg/util/admission", "//pkg/util/log", "//pkg/util/log/logcrash", "//pkg/util/metric", diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 41798c0c914f..d6bcf1657911 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/storage/cloud" "github.com/cockroachdb/cockroach/pkg/storage/fs" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -151,6 +152,10 @@ type ServerConfig struct { // VirtualSchemas hold the virtual table schemas. VirtualSchemas catalog.VirtualSchemas + + // SQLSQLResponseAdmissionQ is the admission queue to use for + // SQLSQLResponseWork. + SQLSQLResponseAdmissionQ *admission.WorkQueue } // RuntimeStats is an interface through which the rowexec layer can get diff --git a/pkg/sql/flowinfra/BUILD.bazel b/pkg/sql/flowinfra/BUILD.bazel index d400bc690b86..01e7ca774cc8 100644 --- a/pkg/sql/flowinfra/BUILD.bazel +++ b/pkg/sql/flowinfra/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "//pkg/sql/pgwire/pgerror", "//pkg/sql/rowenc", "//pkg/sql/types", + "//pkg/util/admission", "//pkg/util/cancelchecker", "//pkg/util/contextutil", "//pkg/util/log", diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index b424670b21b1..fd357ac980d6 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -15,12 +15,15 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/optional" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -181,6 +184,8 @@ type FlowBase struct { // spec is the request that produced this flow. Only used for debugging. spec *execinfrapb.FlowSpec + + admissionInfo admission.WorkInfo } // Setup is part of the Flow interface. @@ -223,12 +228,26 @@ func NewFlowBase( batchSyncFlowConsumer execinfra.BatchReceiver, localProcessors []execinfra.LocalProcessor, ) *FlowBase { + // We are either in a single tenant cluster, or a SQL node in a multi-tenant + // cluster, where the SQL node is single tenant. The tenant below is used + // within SQL (not KV), so using an arbitrary tenant is ok -- we choose to + // use SystemTenantID since it is already defined. + admissionInfo := admission.WorkInfo{TenantID: roachpb.SystemTenantID} + if flowCtx.Txn == nil { + admissionInfo.Priority = admission.NormalPri + admissionInfo.CreateTime = timeutil.Now().UnixNano() + } else { + h := flowCtx.Txn.AdmissionHeader() + admissionInfo.Priority = admission.WorkPriority(h.Priority) + admissionInfo.CreateTime = h.CreateTime + } base := &FlowBase{ FlowCtx: flowCtx, flowRegistry: flowReg, rowSyncFlowConsumer: rowSyncFlowConsumer, batchSyncFlowConsumer: batchSyncFlowConsumer, localProcessors: localProcessors, + admissionInfo: admissionInfo, } base.status = FlowNotStarted return base @@ -305,6 +324,12 @@ func (f *FlowBase) GetLocalProcessors() []execinfra.LocalProcessor { return f.localProcessors } +// GetAdmissionInfo returns the information to use for admission control on +// responses received from a remote flow. +func (f *FlowBase) GetAdmissionInfo() admission.WorkInfo { + return f.admissionInfo +} + // StartInternal starts the flow. All processors are started, each in their own // goroutine. The caller must forward any returned error to rowSyncFlowConsumer if // set. diff --git a/pkg/sql/flowinfra/inbound.go b/pkg/sql/flowinfra/inbound.go index 199eabc52ad7..b752b25ba96d 100644 --- a/pkg/sql/flowinfra/inbound.go +++ b/pkg/sql/flowinfra/inbound.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -109,7 +110,7 @@ func processInboundStreamHelper( if firstMsg != nil { if res := processProducerMessage( - ctx, stream, dst, &sd, &draining, firstMsg, + ctx, f, stream, dst, &sd, &draining, firstMsg, ); res.err != nil || res.consumerClosed { sendErrToConsumer(res.err) return res.err @@ -148,7 +149,7 @@ func processInboundStreamHelper( } if res := processProducerMessage( - ctx, stream, dst, &sd, &draining, msg, + ctx, f, stream, dst, &sd, &draining, msg, ); res.err != nil || res.consumerClosed { sendErrToConsumer(res.err) errChan <- res.err @@ -184,6 +185,7 @@ func sendDrainSignalToStreamProducer( // closed), the caller must return the error to the producer. func processProducerMessage( ctx context.Context, + flowBase *FlowBase, stream execinfrapb.DistSQL_FlowStreamServer, dst execinfra.RowReceiver, sd *StreamDecoder, @@ -201,6 +203,15 @@ func processProducerMessage( consumerClosed: false, } } + var admissionQ *admission.WorkQueue + if flowBase.Cfg != nil { + admissionQ = flowBase.Cfg.SQLSQLResponseAdmissionQ + } + if admissionQ != nil { + if _, err := admissionQ.Admit(ctx, flowBase.admissionInfo); err != nil { + return processMessageResult{err: err, consumerClosed: false} + } + } var types []*types.T for { row, meta, err := sd.GetRow(nil /* rowBuf */) diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index d7ebbffec952..2f8b84511fb2 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -444,16 +444,6 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error { if err != nil { return err } - if f.responseAdmissionQ != nil { - responseAdmission := admission.WorkInfo{ - TenantID: roachpb.SystemTenantID, - Priority: admission.WorkPriority(f.requestAdmissionHeader.Priority), - CreateTime: f.requestAdmissionHeader.CreateTime, - } - if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil { - return err - } - } if br != nil { f.responses = br.Responses } else { @@ -483,6 +473,17 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error { return err } } + // Do admission control after we've accounted for the response bytes. + if br != nil && f.responseAdmissionQ != nil { + responseAdmission := admission.WorkInfo{ + TenantID: roachpb.SystemTenantID, + Priority: admission.WorkPriority(f.requestAdmissionHeader.Priority), + CreateTime: f.requestAdmissionHeader.CreateTime, + } + if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil { + return err + } + } // Set end to true until disproved. f.fetchEnd = true diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 3ec71106cdf8..3d41bac41224 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -45,9 +45,18 @@ var SQLKVResponseAdmissionControlEnabled = settings.RegisterBoolSetting( "admission control", false).WithPublic() +// SQLSQLResponseAdmissionControlEnabled controls whether response processing +// in SQL, for DistSQL requests, is enabled. +var SQLSQLResponseAdmissionControlEnabled = settings.RegisterBoolSetting( + "admission.sql_sql_response.enabled", + "when true, work performed by the SQL layer when receiving a DistSQL response is subject "+ + "to admission control", + false).WithPublic() + var admissionControlEnabledSettings = [numWorkKinds]*settings.BoolSetting{ - KVWork: KVAdmissionControlEnabled, - SQLKVResponseWork: SQLKVResponseAdmissionControlEnabled, + KVWork: KVAdmissionControlEnabled, + SQLKVResponseWork: SQLKVResponseAdmissionControlEnabled, + SQLSQLResponseWork: SQLSQLResponseAdmissionControlEnabled, } // WorkPriority represents the priority of work. In an WorkQueue, it is only