Skip to content

Commit

Permalink
sql,server: add admission control for DistSQL responses
Browse files Browse the repository at this point in the history
The admission.WorkQueue is plumbed through execinfra.ServerConfig
to colrpc.Inbox and flowinfra.processProducerMessage, and the
AdmissionInfo is initialized using FlowCtx.Txn when possible.

The admission control point for KV=>SQL response processing
is tweaked to move it after the memory accounting for the
response bytes.

Informs cockroachdb#65948

Release note (ops change): DistSQL response admission control
can be enabled using admission.sql_sql_response.enabled.
  • Loading branch information
sumeerbhola committed Jul 14, 2021
1 parent 5a0b11e commit 68beda6
Show file tree
Hide file tree
Showing 17 changed files with 155 additions and 34 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Setting Type Default Description
admission.kv.enabled boolean false when true, work performed by the KV layer is subject to admission control
admission.sql_kv_response.enabled boolean false when true, work performed by the SQL layer when receiving a KV response is subject to admission control
admission.sql_sql_response.enabled boolean false when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
cloudstorage.timeout duration 10m0s the timeout for import/export storage operations
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<tbody>
<tr><td><code>admission.kv.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when true, work performed by the KV layer is subject to admission control</td></tr>
<tr><td><code>admission.sql_kv_response.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when true, work performed by the SQL layer when receiving a KV response is subject to admission control</td></tr>
<tr><td><code>admission.sql_sql_response.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control</td></tr>
<tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><code>cloudstorage.http.custom_ca</code></td><td>string</td><td><code></code></td><td>custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage</td></tr>
<tr><td><code>cloudstorage.timeout</code></td><td>duration</td><td><code>10m0s</code></td><td>the timeout for import/export storage operations</td></tr>
Expand Down
17 changes: 9 additions & 8 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,14 +706,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

sqlServer, err := newSQLServer(ctx, sqlServerArgs{
sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(sStatus),
nodeLiveness: optionalnodeliveness.MakeContainer(nodeLiveness),
gossip: gossip.MakeOptionalGossip(g),
grpcServer: grpcServer.Server,
nodeIDContainer: idContainer,
externalStorage: externalStorage,
externalStorageFromURI: externalStorageFromURI,
isMeta1Leaseholder: node.stores.IsMeta1Leaseholder,
nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(sStatus),
nodeLiveness: optionalnodeliveness.MakeContainer(nodeLiveness),
gossip: gossip.MakeOptionalGossip(g),
grpcServer: grpcServer.Server,
nodeIDContainer: idContainer,
externalStorage: externalStorage,
externalStorageFromURI: externalStorageFromURI,
isMeta1Leaseholder: node.stores.IsMeta1Leaseholder,
sqlSQLResponseAdmissionQ: gcoord.GetWorkQueue(admission.SQLSQLResponseWork),
},
SQLConfig: &cfg.SQLConfig,
BaseConfig: &cfg.BaseConfig,
Expand Down
11 changes: 8 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sqlmigrations"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -164,6 +165,9 @@ type sqlServerOptionalKVArgs struct {
// Used by backup/restore.
externalStorage cloud.ExternalStorageFactory
externalStorageFromURI cloud.ExternalStorageFromURIFactory

// The admission queue to use for SQLSQLResponseWork.
sqlSQLResponseAdmissionQ *admission.WorkQueue
}

// sqlServerOptionalTenantArgs are the arguments supplied to newSQLServer which
Expand Down Expand Up @@ -469,9 +473,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ExternalStorage: cfg.externalStorage,
ExternalStorageFromURI: cfg.externalStorageFromURI,

RangeCache: cfg.distSender.RangeDescriptorCache(),
HydratedTables: hydratedTablesCache,
VirtualSchemas: virtualSchemas,
RangeCache: cfg.distSender.RangeDescriptorCache(),
HydratedTables: hydratedTablesCache,
VirtualSchemas: virtualSchemas,
SQLSQLResponseAdmissionQ: cfg.sqlSQLResponseAdmissionQ,
}
cfg.TempStorageConfig.Mon.SetMetrics(distSQLMetrics.CurDiskBytesCount, distSQLMetrics.MaxDiskBytesHist)
if distSQLTestingKnobs := cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"//pkg/sql/sessiondatapb",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
Expand Down Expand Up @@ -108,6 +109,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/admission",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/types",
"//pkg/util/admission",
"//pkg/util/cancelchecker",
"//pkg/util/log",
"//pkg/util/log/logcrash",
Expand Down
31 changes: 31 additions & 0 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
Expand Down Expand Up @@ -98,6 +99,9 @@ type Inbox struct {
// only the Next/DrainMeta goroutine may access it.
stream flowStreamServer

admissionQ *admission.WorkQueue
admissionInfo admission.WorkInfo

// statsAtomics are the execution statistics that need to be atomically
// accessed. This is necessary since Get*() methods can be called from
// different goroutine than Next().
Expand Down Expand Up @@ -151,6 +155,24 @@ func NewInbox(
return i, nil
}

// NewInboxWithAdmissionControl constructs an Inbox through NewInbox and then
// records the supplied admission queue and work info on it, so that the Inbox
// can perform admission control on the DistSQL responses it receives.
//
// Any error reported by NewInbox is returned unchanged together with a nil
// Inbox.
func NewInboxWithAdmissionControl(
	allocator *colmem.Allocator,
	typs []*types.T,
	streamID execinfrapb.StreamID,
	admissionQ *admission.WorkQueue,
	admissionInfo admission.WorkInfo,
) (*Inbox, error) {
	inbox, err := NewInbox(allocator, typs, streamID)
	if err != nil {
		return nil, err
	}
	// Attach the admission state only after the base Inbox was built
	// successfully.
	inbox.admissionQ, inbox.admissionInfo = admissionQ, admissionInfo
	return inbox, nil
}

// close closes the inbox, ensuring that any call to RunWithStream will return
// immediately. close is idempotent.
// NOTE: it is very important to close the Inbox only when execution terminates
Expand Down Expand Up @@ -333,6 +355,15 @@ func (i *Inbox) Next() coldata.Batch {
// Update the allocator since we're holding onto the serialized bytes
// for now.
i.allocator.AdjustMemoryUsage(numSerializedBytes)
// Do admission control after memory accounting for the serialized bytes
// and before deserialization.
if i.admissionQ != nil {
if _, err := i.admissionQ.Admit(i.Ctx, i.admissionInfo); err != nil {
// err includes the case of context cancellation while waiting for
// admission.
colexecerror.ExpectedError(err)
}
}
i.scratch.data = i.scratch.data[:0]
batchLength, err := i.serializer.Deserialize(&i.scratch.data, m.Data.RawBytes)
// Eagerly throw away the RawBytes memory.
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colflow/explain_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/treeprinter"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -57,6 +58,7 @@ func convertToVecTree(
newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, false,
nil, &execinfra.RowChannel{}, &fakeBatchReceiver{}, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
flowCtx.Cfg.VecFDSemaphore, flowCtx.TypeResolverFactory.NewTypeResolver(flowCtx.EvalCtx.Txn),
admission.WorkInfo{},
)
// We create an unlimited memory account because we're interested whether the
// flow is supported via the vectorized engine in general (without paying
Expand Down
31 changes: 26 additions & 5 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand Down Expand Up @@ -208,6 +209,7 @@ func (f *vectorizedFlow) Setup(
diskQueueCfg,
f.countingSemaphore,
flowCtx.TypeResolverFactory.NewTypeResolver(flowCtx.EvalCtx.Txn),
f.FlowBase.GetAdmissionInfo(),
)
if f.testingKnobs.onSetupFlow != nil {
f.testingKnobs.onSetupFlow(f.creator)
Expand Down Expand Up @@ -449,6 +451,11 @@ type flowCreatorHelper interface {
getCancelFlowFn() context.CancelFunc
}

// admissionOptions groups the admission control arguments that are passed to
// remoteComponentCreator.newInbox when an inbox is created.
type admissionOptions struct {
// admissionQ is the admission work queue handed to the inbox. The call site in
// setupInput populates it from flowCtx.Cfg.SQLSQLResponseAdmissionQ.
admissionQ *admission.WorkQueue
// admissionInfo is the work info passed along with admissionQ. The call site
// in setupInput populates it from the flow creator's admissionInfo field.
admissionInfo admission.WorkInfo
}

// remoteComponentCreator is an interface that abstracts the constructors for
// several components in a remote flow. Mostly for testing purposes.
type remoteComponentCreator interface {
Expand All @@ -460,7 +467,8 @@ type remoteComponentCreator interface {
metadataSources []colexecop.MetadataSource,
toClose []colexecop.Closer,
) (*colrpc.Outbox, error)
newInbox(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID) (*colrpc.Inbox, error)
newInbox(allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
admissionOpts admissionOptions) (*colrpc.Inbox, error)
}

type vectorizedRemoteComponentCreator struct{}
Expand All @@ -477,9 +485,13 @@ func (vectorizedRemoteComponentCreator) newOutbox(
}

func (vectorizedRemoteComponentCreator) newInbox(
allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
allocator *colmem.Allocator,
typs []*types.T,
streamID execinfrapb.StreamID,
admissionOpts admissionOptions,
) (*colrpc.Inbox, error) {
return colrpc.NewInbox(allocator, typs, streamID)
return colrpc.NewInboxWithAdmissionControl(
allocator, typs, streamID, admissionOpts.admissionQ, admissionOpts.admissionInfo)
}

// vectorizedFlowCreator performs all the setup of vectorized flows. Depending
Expand Down Expand Up @@ -508,6 +520,7 @@ type vectorizedFlowCreator struct {
flowID execinfrapb.FlowID
exprHelper *colexecargs.ExprHelper
typeResolver descs.DistSQLTypeResolver
admissionInfo admission.WorkInfo

// numOutboxes counts how many colrpc.Outbox'es have been set up on this
// node. It must be accessed atomically.
Expand Down Expand Up @@ -573,6 +586,7 @@ func newVectorizedFlowCreator(
diskQueueCfg colcontainer.DiskQueueCfg,
fdSemaphore semaphore.Semaphore,
typeResolver descs.DistSQLTypeResolver,
admissionInfo admission.WorkInfo,
) *vectorizedFlowCreator {
creator := vectorizedFlowCreatorPool.Get().(*vectorizedFlowCreator)
*creator = vectorizedFlowCreator{
Expand All @@ -589,6 +603,7 @@ func newVectorizedFlowCreator(
flowID: flowID,
exprHelper: creator.exprHelper,
typeResolver: typeResolver,
admissionInfo: admissionInfo,
procIdxQueue: creator.procIdxQueue,
opChains: creator.opChains,
monitors: creator.monitors,
Expand Down Expand Up @@ -881,8 +896,14 @@ func (s *vectorizedFlowCreator) setupInput(
latency = 0
log.VEventf(ctx, 1, "an error occurred during vectorized planning while getting latency: %v", err)
}

inbox, err := s.remoteComponentCreator.newInbox(colmem.NewAllocator(ctx, s.newStreamingMemAccount(flowCtx), factory), input.ColumnTypes, inputStream.StreamID)
inbox, err := s.remoteComponentCreator.newInbox(
colmem.NewAllocator(ctx, s.newStreamingMemAccount(flowCtx), factory),
input.ColumnTypes,
inputStream.StreamID,
admissionOptions{
admissionQ: flowCtx.Cfg.SQLSQLResponseAdmissionQ,
admissionInfo: s.admissionInfo,
})

if err != nil {
return colexecargs.OpWithMetaInfo{}, err
Expand Down
11 changes: 7 additions & 4 deletions pkg/sql/colflow/vectorized_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)
Expand All @@ -52,7 +53,7 @@ func (c callbackRemoteComponentCreator) newOutbox(
}

func (c callbackRemoteComponentCreator) newInbox(
allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID,
allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, _ admissionOptions,
) (*colrpc.Inbox, error) {
return c.newInboxFn(allocator, typs, streamID)
}
Expand Down Expand Up @@ -218,15 +219,17 @@ func TestDrainOnlyInputDAG(t *testing.T) {
ctx := context.Background()
defer evalCtx.Stop(ctx)
f := &flowinfra.FlowBase{
FlowCtx: execinfra.FlowCtx{EvalCtx: &evalCtx,
NodeID: base.TestingIDContainer,
FlowCtx: execinfra.FlowCtx{
Cfg: &execinfra.ServerConfig{},
EvalCtx: &evalCtx,
NodeID: base.TestingIDContainer,
},
}
var wg sync.WaitGroup
vfc := newVectorizedFlowCreator(
&vectorizedFlowCreatorHelper{f: f}, componentCreator, false, false, &wg, &execinfra.RowChannel{},
nil /* batchSyncFlowConsumer */, nil /* nodeDialer */, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
nil /* fdSemaphore */, descs.DistSQLTypeResolver{},
nil /* fdSemaphore */, descs.DistSQLTypeResolver{}, admission.WorkInfo{},
)

_, _, err := vfc.setupFlow(ctx, &f.FlowCtx, procs, nil /* localProcessors */, flowinfra.FuseNormally)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ go_library(
"//pkg/storage/cloud",
"//pkg/storage/fs",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/metric",
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -151,6 +152,10 @@ type ServerConfig struct {

// VirtualSchemas hold the virtual table schemas.
VirtualSchemas catalog.VirtualSchemas

// SQLSQLResponseAdmissionQ is the admission queue to use for
// SQLSQLResponseWork.
SQLSQLResponseAdmissionQ *admission.WorkQueue
}

// RuntimeStats is an interface through which the rowexec layer can get
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/flowinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/rowenc",
"//pkg/sql/types",
"//pkg/util/admission",
"//pkg/util/cancelchecker",
"//pkg/util/contextutil",
"//pkg/util/log",
Expand Down
Loading

0 comments on commit 68beda6

Please sign in to comment.