Skip to content

Commit

Permalink
Merge #67531
Browse files Browse the repository at this point in the history
67531: sql,server: add admission control for DistSQL responses r=sumeerbhola a=sumeerbhola

The admission.WorkQueue is plumbed through via execinfra.ServerConfig
to colrpc.Inbox and flowinfra.processProducerMessage, and the
AdmissionInfo is initialized using FlowCtx.Txn when possible.

The admission control point for KV=>SQL response processing
is tweaked to move it after the memory accounting for the
response bytes.

Informs #65948

Release note (ops change): DistSQL response admission control
can be enabled using admission.sql_sql_response.enabled.

Co-authored-by: sumeerbhola <[email protected]>
  • Loading branch information
craig[bot] and sumeerbhola committed Jul 15, 2021
2 parents 0f63f21 + 68beda6 commit 41a58d0
Show file tree
Hide file tree
Showing 17 changed files with 155 additions and 34 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Setting Type Default Description
admission.kv.enabled boolean false when true, work performed by the KV layer is subject to admission control
admission.sql_kv_response.enabled boolean false when true, work performed by the SQL layer when receiving a KV response is subject to admission control
admission.sql_sql_response.enabled boolean false when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
cloudstorage.timeout duration 10m0s the timeout for import/export storage operations
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<tbody>
<tr><td><code>admission.kv.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when true, work performed by the KV layer is subject to admission control</td></tr>
<tr><td><code>admission.sql_kv_response.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when true, work performed by the SQL layer when receiving a KV response is subject to admission control</td></tr>
<tr><td><code>admission.sql_sql_response.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control</td></tr>
<tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><code>cloudstorage.http.custom_ca</code></td><td>string</td><td><code></code></td><td>custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage</td></tr>
<tr><td><code>cloudstorage.timeout</code></td><td>duration</td><td><code>10m0s</code></td><td>the timeout for import/export storage operations</td></tr>
Expand Down
17 changes: 9 additions & 8 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,14 +706,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

sqlServer, err := newSQLServer(ctx, sqlServerArgs{
sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(sStatus),
nodeLiveness: optionalnodeliveness.MakeContainer(nodeLiveness),
gossip: gossip.MakeOptionalGossip(g),
grpcServer: grpcServer.Server,
nodeIDContainer: idContainer,
externalStorage: externalStorage,
externalStorageFromURI: externalStorageFromURI,
isMeta1Leaseholder: node.stores.IsMeta1Leaseholder,
nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(sStatus),
nodeLiveness: optionalnodeliveness.MakeContainer(nodeLiveness),
gossip: gossip.MakeOptionalGossip(g),
grpcServer: grpcServer.Server,
nodeIDContainer: idContainer,
externalStorage: externalStorage,
externalStorageFromURI: externalStorageFromURI,
isMeta1Leaseholder: node.stores.IsMeta1Leaseholder,
sqlSQLResponseAdmissionQ: gcoord.GetWorkQueue(admission.SQLSQLResponseWork),
},
SQLConfig: &cfg.SQLConfig,
BaseConfig: &cfg.BaseConfig,
Expand Down
11 changes: 8 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/startupmigrations"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -164,6 +165,9 @@ type sqlServerOptionalKVArgs struct {
// Used by backup/restore.
externalStorage cloud.ExternalStorageFactory
externalStorageFromURI cloud.ExternalStorageFromURIFactory

// The admission queue to use for SQLSQLResponseWork.
sqlSQLResponseAdmissionQ *admission.WorkQueue
}

// sqlServerOptionalTenantArgs are the arguments supplied to newSQLServer which
Expand Down Expand Up @@ -469,9 +473,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ExternalStorage: cfg.externalStorage,
ExternalStorageFromURI: cfg.externalStorageFromURI,

RangeCache: cfg.distSender.RangeDescriptorCache(),
HydratedTables: hydratedTablesCache,
VirtualSchemas: virtualSchemas,
RangeCache: cfg.distSender.RangeDescriptorCache(),
HydratedTables: hydratedTablesCache,
VirtualSchemas: virtualSchemas,
SQLSQLResponseAdmissionQ: cfg.sqlSQLResponseAdmissionQ,
}
cfg.TempStorageConfig.Mon.SetMetrics(distSQLMetrics.CurDiskBytesCount, distSQLMetrics.MaxDiskBytesHist)
if distSQLTestingKnobs := cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"//pkg/sql/sessiondatapb",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
Expand Down Expand Up @@ -108,6 +109,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/admission",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/types",
"//pkg/util/admission",
"//pkg/util/cancelchecker",
"//pkg/util/log",
"//pkg/util/log/logcrash",
Expand Down
31 changes: 31 additions & 0 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
Expand Down Expand Up @@ -98,6 +99,9 @@ type Inbox struct {
// only the Next/DrainMeta goroutine may access it.
stream flowStreamServer

admissionQ *admission.WorkQueue
admissionInfo admission.WorkInfo

// statsAtomics are the execution statistics that need to be atomically
// accessed. This is necessary since Get*() methods can be called from
// different goroutine than Next().
Expand Down Expand Up @@ -151,6 +155,24 @@ func NewInbox(
return i, nil
}

// NewInboxWithAdmissionControl builds an Inbox through NewInbox and then
// attaches the supplied admission queue and work info to it, so that the
// Inbox performs admission control on responses received from DistSQL. If
// NewInbox fails, its error is returned unchanged together with a nil Inbox.
func NewInboxWithAdmissionControl(
	allocator *colmem.Allocator,
	typs []*types.T,
	streamID execinfrapb.StreamID,
	admissionQ *admission.WorkQueue,
	admissionInfo admission.WorkInfo,
) (*Inbox, error) {
	inbox, err := NewInbox(allocator, typs, streamID)
	if err != nil {
		return nil, err
	}
	inbox.admissionQ, inbox.admissionInfo = admissionQ, admissionInfo
	// err is necessarily nil once the guard above has been passed.
	return inbox, nil
}

// close closes the inbox, ensuring that any call to RunWithStream will return
// immediately. close is idempotent.
// NOTE: it is very important to close the Inbox only when execution terminates
Expand Down Expand Up @@ -333,6 +355,15 @@ func (i *Inbox) Next() coldata.Batch {
// Update the allocator since we're holding onto the serialized bytes
// for now.
i.allocator.AdjustMemoryUsage(numSerializedBytes)
// Do admission control after memory accounting for the serialized bytes
// and before deserialization.
if i.admissionQ != nil {
if _, err := i.admissionQ.Admit(i.Ctx, i.admissionInfo); err != nil {
// err includes the case of context cancellation while waiting for
// admission.
colexecerror.ExpectedError(err)
}
}
i.scratch.data = i.scratch.data[:0]
batchLength, err := i.serializer.Deserialize(&i.scratch.data, m.Data.RawBytes)
// Eagerly throw away the RawBytes memory.
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colflow/explain_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/treeprinter"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -57,6 +58,7 @@ func convertToVecTree(
newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, false,
nil, &execinfra.RowChannel{}, &fakeBatchReceiver{}, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
flowCtx.Cfg.VecFDSemaphore, flowCtx.TypeResolverFactory.NewTypeResolver(flowCtx.EvalCtx.Txn),
admission.WorkInfo{},
)
// We create an unlimited memory account because we're interested whether the
// flow is supported via the vectorized engine in general (without paying
Expand Down
31 changes: 26 additions & 5 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand Down Expand Up @@ -208,6 +209,7 @@ func (f *vectorizedFlow) Setup(
diskQueueCfg,
f.countingSemaphore,
flowCtx.TypeResolverFactory.NewTypeResolver(flowCtx.EvalCtx.Txn),
f.FlowBase.GetAdmissionInfo(),
)
if f.testingKnobs.onSetupFlow != nil {
f.testingKnobs.onSetupFlow(f.creator)
Expand Down Expand Up @@ -449,6 +451,11 @@ type flowCreatorHelper interface {
getCancelFlowFn() context.CancelFunc
}

// admissionOptions bundles the admission control arguments that are handed to
// remoteComponentCreator.newInbox when an Inbox is created.
type admissionOptions struct {
// admissionQ is the work queue passed on to the Inbox; setupInput fills it
// from flowCtx.Cfg.SQLSQLResponseAdmissionQ.
admissionQ *admission.WorkQueue
// admissionInfo is the work info passed on to the Inbox alongside
// admissionQ; setupInput fills it from the flow creator's admissionInfo.
admissionInfo admission.WorkInfo
}

// remoteComponentCreator is an interface that abstracts the constructors for
// several components in a remote flow. Mostly for testing purposes.
type remoteComponentCreator interface {
Expand All @@ -460,7 +467,8 @@ type remoteComponentCreator interface {
metadataSources []colexecop.MetadataSource,
toClose []colexecop.Closer,
) (*colrpc.Outbox, error)
newInbox(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID) (*colrpc.Inbox, error)
newInbox(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
admissionOpts admissionOptions) (*colrpc.Inbox, error)
}

type vectorizedRemoteComponentCreator struct{}
Expand All @@ -477,9 +485,13 @@ func (vectorizedRemoteComponentCreator) newOutbox(
}

func (vectorizedRemoteComponentCreator) newInbox(
allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
admissionOpts admissionOptions,
) (*colrpc.Inbox, error) {
return colrpc.NewInbox(allocator, typs, streamID)
return colrpc.NewInboxWithAdmissionControl(
allocator, typs, streamID, admissionOpts.admissionQ, admissionOpts.admissionInfo)
}

// vectorizedFlowCreator performs all the setup of vectorized flows. Depending
Expand Down Expand Up @@ -508,6 +520,7 @@ type vectorizedFlowCreator struct {
flowID execinfrapb.FlowID
exprHelper *colexecargs.ExprHelper
typeResolver descs.DistSQLTypeResolver
admissionInfo admission.WorkInfo

// numOutboxes counts how many colrpc.Outbox'es have been set up on this
// node. It must be accessed atomically.
Expand Down Expand Up @@ -573,6 +586,7 @@ func newVectorizedFlowCreator(
diskQueueCfg colcontainer.DiskQueueCfg,
fdSemaphore semaphore.Semaphore,
typeResolver descs.DistSQLTypeResolver,
admissionInfo admission.WorkInfo,
) *vectorizedFlowCreator {
creator := vectorizedFlowCreatorPool.Get().(*vectorizedFlowCreator)
*creator = vectorizedFlowCreator{
Expand All @@ -589,6 +603,7 @@ func newVectorizedFlowCreator(
flowID: flowID,
exprHelper: creator.exprHelper,
typeResolver: typeResolver,
admissionInfo: admissionInfo,
procIdxQueue: creator.procIdxQueue,
opChains: creator.opChains,
monitors: creator.monitors,
Expand Down Expand Up @@ -881,8 +896,14 @@ func (s *vectorizedFlowCreator) setupInput(
latency = 0
log.VEventf(ctx, 1, "an error occurred during vectorized planning while getting latency: %v", err)
}

inbox, err := s.remoteComponentCreator.newInbox(colmem.NewAllocator(ctx, s.newStreamingMemAccount(flowCtx), factory), input.ColumnTypes, inputStream.StreamID)
inbox, err := s.remoteComponentCreator.newInbox(
colmem.NewAllocator(ctx, s.newStreamingMemAccount(flowCtx), factory),
input.ColumnTypes,
inputStream.StreamID,
admissionOptions{
admissionQ: flowCtx.Cfg.SQLSQLResponseAdmissionQ,
admissionInfo: s.admissionInfo,
})

if err != nil {
return colexecargs.OpWithMetaInfo{}, err
Expand Down
11 changes: 7 additions & 4 deletions pkg/sql/colflow/vectorized_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)
Expand All @@ -52,7 +53,7 @@ func (c callbackRemoteComponentCreator) newOutbox(
}

func (c callbackRemoteComponentCreator) newInbox(
allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, _ admissionOptions,
) (*colrpc.Inbox, error) {
return c.newInboxFn(allocator, typs, streamID)
}
Expand Down Expand Up @@ -218,15 +219,17 @@ func TestDrainOnlyInputDAG(t *testing.T) {
ctx := context.Background()
defer evalCtx.Stop(ctx)
f := &flowinfra.FlowBase{
FlowCtx: execinfra.FlowCtx{EvalCtx: &evalCtx,
NodeID: base.TestingIDContainer,
FlowCtx: execinfra.FlowCtx{
Cfg: &execinfra.ServerConfig{},
EvalCtx: &evalCtx,
NodeID: base.TestingIDContainer,
},
}
var wg sync.WaitGroup
vfc := newVectorizedFlowCreator(
&vectorizedFlowCreatorHelper{f: f}, componentCreator, false, false, &wg, &execinfra.RowChannel{},
nil /* batchSyncFlowConsumer */, nil /* nodeDialer */, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
nil /* fdSemaphore */, descs.DistSQLTypeResolver{},
nil /* fdSemaphore */, descs.DistSQLTypeResolver{}, admission.WorkInfo{},
)

_, _, err := vfc.setupFlow(ctx, &f.FlowCtx, procs, nil /* localProcessors */, flowinfra.FuseNormally)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_library(
"//pkg/storage/cloud",
"//pkg/storage/fs",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/metric",
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -151,6 +152,10 @@ type ServerConfig struct {

// VirtualSchemas hold the virtual table schemas.
VirtualSchemas catalog.VirtualSchemas

// SQLSQLResponseAdmissionQ is the admission queue to use for
// SQLSQLResponseWork.
SQLSQLResponseAdmissionQ *admission.WorkQueue
}

// RuntimeStats is an interface through which the rowexec layer can get
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/flowinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/rowenc",
"//pkg/sql/types",
"//pkg/util/admission",
"//pkg/util/cancelchecker",
"//pkg/util/contextutil",
"//pkg/util/log",
Expand Down
Loading

0 comments on commit 41a58d0

Please sign in to comment.