copy: add vectorize insert support used solely by copy for now
Implement a new, on-by-default vectorized insert for COPY FROM statements.
It is controlled by the vectorize and copy_fast_path_enabled session
variables, which both default to true. Setting copy_fast_path_enabled to
false restores the old unoptimized (22.1) behavior; leaving
copy_fast_path_enabled on but turning vectorize off restores the 22.2
behavior.

Both the row version and the vectorized version of the COPY FROM fast path
now respect memory limits on a per-row basis, i.e. if huge rows are
encountered the COPY buffers are flushed before we reach the configured
copy row batch size. Also, if many rows are sent in one CopyData message,
we now flush when we reach the copy row batch size limit instead of
buffering and inserting all of the data at once. This matters little with
psql clients, which typically send one row per CopyData segment, but
matters a lot with pgx, which sends 64KB CopyData segments.
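
For reference, a minimal pgx bulk load looks like the sketch below; the
table and column names are placeholders, not part of this change. pgx
encodes rows client-side and ships them in large CopyData messages, which
is the case the per-batch flushing above is aimed at.

    import (
        "context"

        "github.com/jackc/pgx/v5"
    )

    // bulkLoad assumes a pgx v5 connection and a table t(a INT, b TEXT).
    func bulkLoad(ctx context.Context, conn *pgx.Conn, rows [][]any) (int64, error) {
        return conn.CopyFrom(
            ctx,
            pgx.Identifier{"t"},
            []string{"a", "b"},
            pgx.CopyFromRows(rows),
        )
    }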

Keys are no longer inserted in exactly the same order as with the row
version of COPY. They are now sorted per batch, so all the primary key KVs
are inserted first, then the first secondary index's, and so on.
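
Conceptually the per-batch ordering falls out of a byte-wise sort of the
encoded KVs, since the index ID is part of the key prefix. A rough
illustration (kvPair and sortBatch are stand-ins, not the actual Putter
plumbing):

    import (
        "bytes"
        "sort"
    )

    // kvPair is an illustrative stand-in for an encoded key/value mutation.
    type kvPair struct{ key, value []byte }

    // sortBatch orders a batch's KVs by key; because the index ID is part
    // of the key prefix this groups the primary index's keys first and
    // then each secondary index's keys in turn.
    func sortBatch(kvs []kvPair) {
        sort.Slice(kvs, func(i, j int) bool {
            return bytes.Compare(kvs[i].key, kvs[j].key) < 0
        })
    }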

The vectorized insert benefits from larger batch sizes, so we are more
generous with how big they can get. By default we start with 64-row
batches and double the size up to a limit derived from the KV raft command
batch size, parameterized by schema (i.e. a wider schema gets a smaller
upper limit on batch size), not to exceed 32k rows, which is roughly where
the performance gains from bigger batches start to trail off.
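
A hypothetical sketch of that growth policy (the names and the per-row
width estimate are illustrative, not the actual implementation):

    const (
        initialBatchRows = 64
        maxBatchRows     = 32 << 10 // ~32k rows; gains trail off past this
    )

    // nextBatchRows doubles the batch size up to a ceiling derived from
    // the raft command size and an estimate of the encoded row width, so
    // wider schemas get a lower cap.
    func nextBatchRows(cur, rowWidthBytes, maxCommandBytes int) int {
        ceiling := maxCommandBytes / rowWidthBytes
        if ceiling > maxBatchRows {
            ceiling = maxBatchRows
        }
        next := cur * 2
        if next < initialBatchRows {
            next = initialBatchRows
        }
        if next > ceiling {
            next = ceiling
        }
        return next
    }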

Epic: CRDB-18892
Informs: cockroachdb#91831
Release note (sql change): Bulk COPY FROM statements are now processed
with a vectorized insert and can be anywhere from 50% to 5x faster.
Typical hardware and schemas should see a 2x improvement. Vectorized
inserts are only used for COPY statements and are not yet applied to
regular inserts. Both the vectorize and copy_fast_path_enabled session
variables can be used to disable this feature.
cucaroach committed Mar 14, 2023
1 parent 3129782 commit 2e76506
Showing 36 changed files with 1,140 additions and 116 deletions.
1 change: 1 addition & 0 deletions pkg/server/server_sql.go
@@ -1101,6 +1101,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
distSQLServer.ServerConfig.SQLStatsController = pgServer.SQLServer.GetSQLStatsController()
distSQLServer.ServerConfig.SchemaTelemetryController = pgServer.SQLServer.GetSchemaTelemetryController()
distSQLServer.ServerConfig.IndexUsageStatsController = pgServer.SQLServer.GetIndexUsageStatsController()
distSQLServer.ServerConfig.StatsRefresher = statsRefresher

// We use one BytesMonitor for all Executor's created by the
// internalDB.
3 changes: 3 additions & 0 deletions pkg/sql/BUILD.bazel
@@ -294,6 +294,7 @@ go_library(
"//pkg/cloud/externalconn",
"//pkg/clusterversion",
"//pkg/col/coldata",
"//pkg/col/coldataext",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/docs",
@@ -378,6 +379,7 @@ go_library(
"//pkg/sql/colexec",
"//pkg/sql/colfetcher",
"//pkg/sql/colflow",
"//pkg/sql/colmem",
"//pkg/sql/compengine",
"//pkg/sql/comprules",
"//pkg/sql/contention",
@@ -551,6 +553,7 @@ go_library(
"@com_github_cockroachdb_errors//hintdetail",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_lib_pq//:pq",
9 changes: 9 additions & 0 deletions pkg/sql/colexec/BUILD.bazel
@@ -16,6 +16,7 @@ go_library(
"count.go",
"hash_aggregator.go",
"hash_group_joiner.go",
"insert.go",
"invariants_checker.go",
"limit.go",
"materializer.go",
@@ -43,12 +44,16 @@ go_library(
"//pkg/col/coldata",
"//pkg/col/coldataext", # keep
"//pkg/col/typeconv", # keep
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/server/telemetry", # keep
"//pkg/settings",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catenumpb",
"//pkg/sql/catalog/colinfo", # keep
"//pkg/sql/catalog/descpb",
"//pkg/sql/colconv",
"//pkg/sql/colenc",
"//pkg/sql/colexec/colexecagg", # keep
"//pkg/sql/colexec/colexecargs",
"//pkg/sql/colexec/colexecbase",
@@ -64,7 +69,9 @@
"//pkg/sql/execinfra/execreleasable",
"//pkg/sql/execinfrapb",
"//pkg/sql/memsize",
"//pkg/sql/row",
"//pkg/sql/rowenc",
"//pkg/sql/sem/catid",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqltelemetry", # keep
@@ -73,7 +80,9 @@
"//pkg/util/buildutil",
"//pkg/util/duration", # keep
"//pkg/util/encoding", # keep
"//pkg/util/intsets",
"//pkg/util/json", # keep
"//pkg/util/log",
"//pkg/util/stringarena",
"//pkg/util/tracing",
"@com_github_cockroachdb_apd_v3//:apd", # keep
30 changes: 25 additions & 5 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -238,6 +238,9 @@ func supportedNatively(core *execinfrapb.ProcessorCoreUnion) error {
// distinguish from other unsupported cores.
return errLocalPlanNodeWrap

case core.Insert != nil:
return nil

default:
return errCoreUnsupportedNatively
}
@@ -818,11 +821,20 @@ func NewColOperator(
return r, err
}
if core.Values.NumRows == 0 || len(core.Values.Columns) == 0 {
// To simplify valuesOp we handle some special cases with
// fixedNumTuplesNoInputOp.
result.Root = colexecutils.NewFixedNumTuplesNoInputOp(
getStreamingAllocator(ctx, args), int(core.Values.NumRows), nil, /* opToInitialize */
)
// Handle coldata.Batch vector source
if b, ok := args.LocalVectorSources[args.Spec.ProcessorID]; ok {
batch, ok := b.(coldata.Batch)
if !ok {
colexecerror.InternalError(errors.AssertionFailedf("LocalVectorSource wasn't a coldata.Batch"))
}
result.Root = colexecutils.NewRawColDataBatchOp(batch)
} else {
// To simplify valuesOp we handle some special cases with
// fixedNumTuplesNoInputOp.
result.Root = colexecutils.NewFixedNumTuplesNoInputOp(
getStreamingAllocator(ctx, args), int(core.Values.NumRows), nil, /* opToInitialize */
)
}
} else {
result.Root = colexec.NewValuesOp(
getStreamingAllocator(ctx, args), core.Values, execinfra.GetWorkMemLimit(flowCtx),
@@ -1728,6 +1740,14 @@ func NewColOperator(
input = result.Root
}

case core.Insert != nil:
if err := checkNumIn(inputs, 1); err != nil {
return r, err
}
outputIdx := len(spec.Input[0].ColumnTypes)
result.Root = colexec.NewInsertOp(ctx, flowCtx, core.Insert, inputs[0].Root, spec.ResultTypes, outputIdx)
result.ColumnTypes = spec.ResultTypes

default:
return r, errors.AssertionFailedf("unsupported processor core %q", core)
}
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecargs/op_creation.go
@@ -64,6 +64,7 @@ type NewColOperatorArgs struct {
StreamingMemAccount *mon.BoundAccount
ProcessorConstructor execinfra.ProcessorConstructor
LocalProcessors []execinfra.LocalProcessor
LocalVectorSources map[int32]any
DiskQueueCfg colcontainer.DiskQueueCfg
FDSemaphore semaphore.Semaphore
ExprHelper *ExprHelper
22 changes: 22 additions & 0 deletions pkg/sql/colexec/colexecutils/operator.go
@@ -86,6 +86,28 @@ func (s *fixedNumTuplesNoInputOp) Next() coldata.Batch {
return s.batch
}

type rawBatchOp struct {
colexecop.ZeroInputNode
batch coldata.Batch
}

var _ colexecop.Operator = &rawBatchOp{}

// Init is part of the colexecop.Operator interface; rawBatchOp needs no
// initialization.
func (s *rawBatchOp) Init(ctx context.Context) {
}

// Next returns the wrapped batch on the first call and coldata.ZeroBatch
// on every call after that.
func (s *rawBatchOp) Next() coldata.Batch {
b := s.batch
s.batch = coldata.ZeroBatch
return b
}

// NewRawColDataBatchOp allocates a rawBatchOp. This is used by COPY to perform
// vectorized inserts.
func NewRawColDataBatchOp(b coldata.Batch) colexecop.Operator {
return &rawBatchOp{batch: b}
}

// vectorTypeEnforcer is a utility Operator that on every call to Next
// enforces that non-zero length batch from the input has a vector of the
// desired type in the desired position. If the width of the batch is less than
228 changes: 228 additions & 0 deletions pkg/sql/colexec/insert.go
@@ -0,0 +1,228 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexec

import (
"bytes"
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/colenc"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

type vectorInserter struct {
colexecop.OneInputHelper
desc catalog.TableDescriptor
insertCols []catalog.Column
retBatch coldata.Batch
flowCtx *execinfra.FlowCtx
// checkOrds are the ordinals of the input columns that contain the bool
// results of evaluating the check expressions.
checkOrds intsets.Fast
// If we have checkOrds we need a sema context to format error messages.
semaCtx *tree.SemaContext
// mutationQuota is the number of bytes we'll allow in the kv.Batch before
// finishing it and starting a new one.
mutationQuota int
// If auto commit is true we'll commit the last batch.
autoCommit bool
}

var _ colexecop.Operator = &vectorInserter{}

// NewInsertOp allocates a new vectorized insert operator. Currently the
// only input will be a rawBatchOp and the only output is a row count, so
// this doesn't support the full gamut of insert operations.
func NewInsertOp(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.InsertSpec,
input colexecop.Operator,
typs []*types.T,
outputIdx int,
) colexecop.Operator {
desc := flowCtx.TableDescriptor(ctx, &spec.Table)
insCols := make([]catalog.Column, len(spec.ColumnIDs))
for i, c := range spec.ColumnIDs {
col, err := catalog.MustFindColumnByID(desc, c)
if err != nil {
colexecerror.InternalError(err)
}
insCols[i] = col
}

// Empirical testing shows that if ApproximateMutationBytes approaches
// 32MB we'll hit the command limit, so set the quota to a fraction of the
// command limit to be safe.
mutationQuota := int(kvserverbase.MaxCommandSize.Get(&flowCtx.Cfg.Settings.SV) / 3)

alloc := colmem.NewAllocator(ctx, nil, coldata.StandardColumnFactory)
v := vectorInserter{
OneInputHelper: colexecop.MakeOneInputHelper(input),
desc: desc,
retBatch: alloc.NewMemBatchWithFixedCapacity(typs, 1),
flowCtx: flowCtx,
insertCols: insCols,
mutationQuota: mutationQuota,
autoCommit: spec.AutoCommit,
}
if spec.CheckOrds != nil {
if err := v.checkOrds.Decode(bytes.NewReader(spec.CheckOrds)); err != nil {
colexecerror.InternalError(err)
}
v.semaCtx = flowCtx.NewSemaContext(v.flowCtx.Txn)
}
return &v
}

func (v *vectorInserter) Init(ctx context.Context) {
v.OneInputHelper.Init(ctx)
}

func (v *vectorInserter) GetPartialIndexMap(b coldata.Batch) map[catid.IndexID][]bool {
var partialIndexColMap map[descpb.IndexID][]bool
// Create a map from partial index ID to the bool column indicating, per
// row, whether the row satisfies that index's predicate. Rows that do not
// satisfy the predicate are not written to the index. This map is
// consumed by the colenc encoder in Next.
pindexes := v.desc.PartialIndexes()
if n := len(pindexes); n > 0 {
colOffset := len(v.insertCols) + v.checkOrds.Len()
numCols := len(b.ColVecs()) - colOffset
if numCols != len(pindexes) {
colexecerror.InternalError(errors.AssertionFailedf("num extra columns didn't match number of partial indexes"))
}
for i := 0; i < numCols; i++ {
if partialIndexColMap == nil {
partialIndexColMap = make(map[descpb.IndexID][]bool)
}
partialIndexColMap[pindexes[i].GetID()] = b.ColVec(i + colOffset).Bool()
}
}
return partialIndexColMap
}

func (v *vectorInserter) Next() coldata.Batch {
ctx := v.Ctx
b := v.Input.Next()
if b.Length() == 0 {
return coldata.ZeroBatch
}

if !v.checkOrds.Empty() {
if err := v.checkMutationInput(ctx, b); err != nil {
colexecerror.ExpectedError(err)
}
}
partialIndexColMap := v.GetPartialIndexMap(b)

kvba := row.KVBatchAdapter{Batch: v.flowCtx.Txn.NewBatch()}
var p row.Putter = &kvba
if v.flowCtx.TraceKV {
p = &row.TracePutter{Putter: p, Ctx: ctx}
}
// In the future we could sort across multiple goroutines; it's not worth
// it yet since the time spent here is minimal compared to the time spent
// executing the batch.
p = &row.SortingPutter{Putter: p}
enc := colenc.MakeEncoder(v.flowCtx.Codec(), v.desc, &v.flowCtx.Cfg.Settings.SV, b, v.insertCols, v.flowCtx.GetRowMetrics(), partialIndexColMap,
func() error {
if kvba.Batch.ApproximateMutationBytes() > v.mutationQuota {
return colenc.ErrOverMemLimit
}
return nil
})
// PrepareBatch is called in a loop, partially inserting until everything
// is done. If there are a lot of secondary indexes we could hit the raft
// command limit while building the kv batch, so we need to be able to do
// it in chunks of rows.
end := b.Length()
start := 0
for start < b.Length() {
if err := enc.PrepareBatch(ctx, p, start, end); err != nil {
if errors.Is(err, colenc.ErrOverMemLimit) {
log.VEventf(ctx, 2, "vector insert memory limit err %d, numrows: %d", start, end)
end /= 2
// If one row blows out the memory limit, just do one row at a time.
if end <= start {
// Disable the memory limit; if the system can't handle this row,
// a KV error will be encountered below.
v.mutationQuota = math.MaxInt
end = start + 1
}
// Throw everything away and start over.
kvba.Batch = v.flowCtx.Txn.NewBatch()
continue
}
colexecerror.ExpectedError(err)
}
log.VEventf(ctx, 2, "copy running batch, autocommit: %v, final: %v, numrows: %d", v.autoCommit, end == b.Length(), end-start)
var err error
if v.autoCommit && end == b.Length() {
err = v.flowCtx.Txn.CommitInBatch(ctx, kvba.Batch)
} else {
err = v.flowCtx.Txn.Run(ctx, kvba.Batch)
}
if err != nil {
colexecerror.ExpectedError(row.ConvertBatchError(ctx, v.desc, kvba.Batch))
}
// Advance the window: the next chunk starts where this one ended and
// reuses the size that just succeeded.
numRows := end - start
start = end
end += numRows
if end > b.Length() {
end = b.Length()
}
if start < b.Length() {
kvba.Batch = v.flowCtx.Txn.NewBatch()
}
}

v.retBatch.ColVec(0).Int64()[0] = int64(b.Length())
v.retBatch.SetLength(1)

v.flowCtx.Cfg.StatsRefresher.NotifyMutation(v.desc, b.Length())

return v.retBatch
}

func (v *vectorInserter) checkMutationInput(ctx context.Context, b coldata.Batch) error {
checks := v.desc.EnforcedCheckConstraints()
colIdx := 0
for i, ch := range checks {
if !v.checkOrds.Contains(i) {
continue
}
vec := b.ColVec(colIdx + len(v.insertCols))
bools := vec.Bool()
nulls := vec.Nulls()
for r := 0; r < b.Length(); r++ {
if !bools[r] && !nulls.NullAt(r) {
return row.CheckFailed(ctx, v.semaCtx, v.flowCtx.EvalCtx.SessionData(), v.desc, ch)
}
}
colIdx++
}
return nil
}