sql: parallelize FK and UNIQUE constraints #96123

Merged on Feb 14, 2023 (3 commits)
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor_planning.go
@@ -181,7 +181,7 @@ func distBackup(
p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, []*types.T{}, execinfrapb.Ordering{})
p.PlanToStreamColMap = []int{}

- dsp.FinalizePlan(ctx, planCtx, p)
+ sql.FinalizePlan(ctx, planCtx, p)

metaFn := func(_ context.Context, meta *execinfrapb.ProducerMetadata) error {
if meta.BulkProcessorProgress != nil {
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_processor_planning.go
@@ -248,7 +248,7 @@ func distRestore(
}
}

- dsp.FinalizePlan(ctx, planCtx, p)
+ sql.FinalizePlan(ctx, planCtx, p)
return p, planCtx, nil
}

2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
@@ -448,7 +448,7 @@ func makePlan(
)

p.PlanToStreamColMap = []int{1, 2, 3}
- dsp.FinalizePlan(ctx, planCtx, p)
+ sql.FinalizePlan(ctx, planCtx, p)

return p, planCtx, nil
}
@@ -136,7 +136,7 @@ func distStreamIngest(
execinfrapb.PostProcessSpec{}, streamIngestionResultTypes)

p.PlanToStreamColMap = []int{0}
- dsp.FinalizePlan(ctx, planCtx, p)
+ sql.FinalizePlan(ctx, planCtx, p)

rw := sql.NewRowResultWriter(nil /* rowContainer */)

5 changes: 3 additions & 2 deletions pkg/sql/apply_join.go
@@ -256,7 +256,7 @@ func (a *applyJoinNode) runNextRightSideIteration(params runParams, leftRow tree
if err := runPlanInsidePlan(ctx, params, plan, rowResultWriter); err != nil {
return err
}
- a.run.rightRowsIterator = newRowContainerIterator(ctx, a.run.rightRows, a.rightTypes)
+ a.run.rightRowsIterator = newRowContainerIterator(ctx, a.run.rightRows)
return nil
}

@@ -318,7 +318,8 @@ func runPlanInsidePlan(
evalCtx := params.p.ExtendedEvalContextCopy()
plannerCopy := *params.p
distributePlan := getPlanDistribution(
- ctx, &plannerCopy, plannerCopy.execCfg.NodeInfo.NodeID, plannerCopy.SessionData().DistSQLMode, plan.main,
+ ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
+ plannerCopy.SessionData().DistSQLMode, plan.main,
)
distributeType := DistributionType(DistributionTypeNone)
if distributePlan.WillDistribute() {
23 changes: 22 additions & 1 deletion pkg/sql/buffer.go
@@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/redact"
)

@@ -71,6 +72,12 @@ func (n *bufferNode) Close(ctx context.Context) {
// referencing. The bufferNode can be iterated over multiple times
// simultaneously, however, a new scanBufferNode is needed.
type scanBufferNode struct {
+ // mu, if non-nil, protects access to buffer as well as creation and closure
+ // of iterator (rowcontainer.RowIterator, which is wrapped by
+ // rowContainerIterator, is safe for concurrent usage outside of creation and
+ // closure).
+ mu *syncutil.Mutex

buffer *bufferNode

iterator *rowContainerIterator
@@ -80,8 +87,18 @@ type scanBufferNode struct {
label string
}

+ // makeConcurrencySafe can be called to synchronize access to bufferNode across
+ // scanBufferNodes that run in parallel.
+ func (n *scanBufferNode) makeConcurrencySafe(mu *syncutil.Mutex) {
+ n.mu = mu
+ }

func (n *scanBufferNode) startExec(params runParams) error {
- n.iterator = newRowContainerIterator(params.ctx, n.buffer.rows, n.buffer.typs)
+ if n.mu != nil {
+ n.mu.Lock()
+ defer n.mu.Unlock()
+ }
+ n.iterator = newRowContainerIterator(params.ctx, n.buffer.rows)
return nil
}

@@ -99,6 +116,10 @@ func (n *scanBufferNode) Values() tree.Datums {
}

func (n *scanBufferNode) Close(context.Context) {
+ if n.mu != nil {
+ n.mu.Lock()
+ defer n.mu.Unlock()
+ }
if n.iterator != nil {
n.iterator.Close()
n.iterator = nil
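
The mu comment above spells out the concurrency contract: iterating the shared row container is safe from multiple goroutines, but creating and closing an iterator is not, so only those steps take the shared lock. A rough standalone sketch of the same pattern, using plain sync.Mutex and toy types in place of bufferNode, scanBufferNode, and rowContainerIterator (all names here are illustrative stand-ins, not the CockroachDB code):

    package main

    import (
        "fmt"
        "sync"
    )

    // sharedRows stands in for bufferNode's row container: filled once, then
    // scanned by several readers running in parallel.
    type sharedRows struct{ rows []int }

    // iterator stands in for rowContainerIterator: each reader gets its own,
    // so advancing it needs no lock, but creation must be serialized.
    type iterator struct {
        rows []int
        idx  int
    }

    func (it *iterator) next() (int, bool) {
        if it.idx >= len(it.rows) {
            return 0, false
        }
        row := it.rows[it.idx]
        it.idx++
        return row, true
    }

    // scanner stands in for scanBufferNode; mu is shared by every scanner that
    // reads the same buffer concurrently.
    type scanner struct {
        mu   *sync.Mutex
        buf  *sharedRows
        iter *iterator
    }

    func (s *scanner) start() {
        s.mu.Lock() // serialize iterator creation, as startExec does above
        defer s.mu.Unlock()
        s.iter = &iterator{rows: s.buf.rows}
    }

    func main() {
        buf := &sharedRows{rows: []int{1, 2, 3}}
        var mu sync.Mutex
        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                s := &scanner{mu: &mu, buf: buf}
                s.start()
                for row, ok := s.iter.next(); ok; row, ok = s.iter.next() {
                    fmt.Println("scanner", id, "read", row) // iteration itself runs without the lock
                }
            }(i)
        }
        wg.Wait()
    }
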
19 changes: 3 additions & 16 deletions pkg/sql/buffer_util.go
@@ -141,22 +141,12 @@ func (c *rowContainerHelper) Close(ctx context.Context) {
// tree.Datums.
type rowContainerIterator struct {
iter rowcontainer.RowIterator

- typs []*types.T
- datums tree.Datums
- da tree.DatumAlloc
}

// newRowContainerIterator returns a new rowContainerIterator that must be
// closed once no longer needed.
- func newRowContainerIterator(
- ctx context.Context, c rowContainerHelper, typs []*types.T,
- ) *rowContainerIterator {
- i := &rowContainerIterator{
- iter: c.rows.NewIterator(ctx),
- typs: typs,
- datums: make(tree.Datums, len(typs)),
- }
+ func newRowContainerIterator(ctx context.Context, c rowContainerHelper) *rowContainerIterator {
+ i := &rowContainerIterator{iter: c.rows.NewIterator(ctx)}
i.iter.Rewind()
return i
}
@@ -175,10 +165,7 @@ func (i *rowContainerIterator) Next() (tree.Datums, error) {
if err != nil {
return nil, err
}
- if err = rowenc.EncDatumRowToDatums(i.typs, i.datums, row, &i.da); err != nil {
- return nil, err
- }
- return i.datums, nil
+ return row, nil
}

func (i *rowContainerIterator) Close() {
5 changes: 5 additions & 0 deletions pkg/sql/catalog/descs/collection.go
@@ -170,6 +170,11 @@ func (tc *Collection) ResetMaxTimestampBound() {
tc.maxTimestampBoundDeadlineHolder.maxTimestampBound = hlc.Timestamp{}
}

+ // GetMaxTimestampBound returns the maximum timestamp to read schemas at.
+ func (tc *Collection) GetMaxTimestampBound() hlc.Timestamp {
+ return tc.maxTimestampBoundDeadlineHolder.maxTimestampBound
+ }

// SkipValidationOnWrite avoids validating stored descriptors prior to
// a transaction commit.
func (tc *Collection) SkipValidationOnWrite() {
34 changes: 21 additions & 13 deletions pkg/sql/conn_executor.go
@@ -2842,6 +2842,8 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
// return for statements executed with this evalCtx. Since generally each
// statement is supposed to have a different timestamp, the evalCtx generally
// shouldn't be reused across statements.
+ //
+ // Safe for concurrent use.
func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn, stmtTS time.Time) {
newTxn := txn == nil || evalCtx.Txn != txn
evalCtx.TxnState = ex.getTransactionState()
@@ -2865,22 +2867,12 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn,
evalCtx.SchemaChangerState = ex.extraTxnState.schemaChangerState
evalCtx.DescIDGenerator = ex.getDescIDGenerator()

- // If we are retrying due to an unsatisfiable timestamp bound which is
- // retriable, it means we were unable to serve the previous minimum timestamp
- // as there was a schema update in between. When retrying, we want to keep the
- // same minimum timestamp for the AOST read, but set the maximum timestamp
- // to the point just before our failed read to ensure we don't try to read
- // data which may be after the schema change when we retry.
+ // See resetPlanner for more context on setting the maximum timestamp for
+ // AOST read retries.
var minTSErr *roachpb.MinTimestampBoundUnsatisfiableError
if err := ex.state.mu.autoRetryReason; err != nil && errors.As(err, &minTSErr) {
- nextMax := minTSErr.MinTimestampBound
- ex.extraTxnState.descCollection.SetMaxTimestampBound(nextMax)
- evalCtx.AsOfSystemTime.MaxTimestampBound = nextMax
+ evalCtx.AsOfSystemTime.MaxTimestampBound = ex.extraTxnState.descCollection.GetMaxTimestampBound()
} else if newTxn {
- // Otherwise, only change the historical timestamps if this is a new txn.
- // This is because resetPlanner can be called multiple times for the same
- // txn during the extended protocol.
- ex.extraTxnState.descCollection.ResetMaxTimestampBound()
evalCtx.AsOfSystemTime = nil
}
}
@@ -2926,6 +2918,22 @@ func (ex *connExecutor) resetPlanner(
ctx context.Context, p *planner, txn *kv.Txn, stmtTS time.Time,
) {
p.resetPlanner(ctx, txn, stmtTS, ex.sessionData(), ex.state.mon)
+ // If we are retrying due to an unsatisfiable timestamp bound which is
+ // retriable, it means we were unable to serve the previous minimum timestamp
+ // as there was a schema update in between. When retrying, we want to keep the
+ // same minimum timestamp for the AOST read, but set the maximum timestamp
+ // to the point just before our failed read to ensure we don't try to read
+ // data which may be after the schema change when we retry.
+ var minTSErr *roachpb.MinTimestampBoundUnsatisfiableError
+ if err := ex.state.mu.autoRetryReason; err != nil && errors.As(err, &minTSErr) {
+ nextMax := minTSErr.MinTimestampBound
+ ex.extraTxnState.descCollection.SetMaxTimestampBound(nextMax)
+ } else if newTxn := txn == nil || p.extendedEvalCtx.Txn != txn; newTxn {
+ // Otherwise, only change the historical timestamps if this is a new txn.
+ // This is because resetPlanner can be called multiple times for the same
+ // txn during the extended protocol.
+ ex.extraTxnState.descCollection.ResetMaxTimestampBound()
+ }
ex.resetEvalCtx(&p.extendedEvalCtx, txn, stmtTS)
}

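
The comment moved into resetPlanner describes the bounded-staleness retry rule: on a retriable MinTimestampBoundUnsatisfiableError, keep the same minimum timestamp but cap the maximum timestamp bound at the value carried by the error, so the retry cannot read schema versions written after the failed attempt. A rough standalone sketch of that loop, with an integer standing in for hlc.Timestamp and a toy error type instead of roachpb.MinTimestampBoundUnsatisfiableError (illustrative only, not the executor code):

    package main

    import (
        "errors"
        "fmt"
    )

    // minTSUnsatisfiableError mimics roachpb.MinTimestampBoundUnsatisfiableError:
    // the read could not be served at or above MinTimestampBound.
    type minTSUnsatisfiableError struct {
        MinTimestampBound int64 // toy timestamp
    }

    func (e *minTSUnsatisfiableError) Error() string {
        return fmt.Sprintf("min timestamp bound %d unsatisfiable", e.MinTimestampBound)
    }

    // readSchemas stands in for the bounded-staleness schema read; it fails on
    // the first attempt to exercise the retry path.
    func readSchemas(maxBound int64, attempt int) error {
        if attempt == 0 {
            return &minTSUnsatisfiableError{MinTimestampBound: 40}
        }
        fmt.Println("read schemas with max timestamp bound", maxBound)
        return nil
    }

    func main() {
        var maxBound int64 // zero means "no bound", like an empty hlc.Timestamp
        for attempt := 0; ; attempt++ {
            err := readSchemas(maxBound, attempt)
            if err == nil {
                return
            }
            var minTSErr *minTSUnsatisfiableError
            if !errors.As(err, &minTSErr) {
                panic(err)
            }
            // Keep the same minimum timestamp for the AOST read, but cap the
            // maximum at the point the failed attempt could not serve, so the
            // retry does not read past the intervening schema change.
            maxBound = minTSErr.MinTimestampBound
        }
    }
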
34 changes: 23 additions & 11 deletions pkg/sql/conn_executor_exec.go
@@ -1192,7 +1192,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(

ex.sessionTracing.TracePlanCheckStart(ctx)
distributePlan := getPlanDistribution(
- ctx, planner, planner.execCfg.NodeInfo.NodeID, ex.sessionData().DistSQLMode, planner.curPlan.main,
+ ctx, planner.Descriptors().HasUncommittedTypes(),
+ ex.sessionData().DistSQLMode, planner.curPlan.main,
)
ex.sessionTracing.TracePlanCheckEnd(ctx, nil, distributePlan.WillDistribute())

@@ -1579,6 +1580,13 @@ type topLevelQueryStats struct {
networkEgressEstimate int64
}

+ func (s *topLevelQueryStats) add(other *topLevelQueryStats) {
+ s.bytesRead += other.bytesRead
+ s.rowsRead += other.rowsRead
+ s.rowsWritten += other.rowsWritten
+ s.networkEgressEstimate += other.networkEgressEstimate
+ }

// execWithDistSQLEngine converts a plan to a distributed SQL physical plan and
// runs it.
// If an error is returned, the connection needs to stop processing queries.
@@ -1616,27 +1624,31 @@ func (ex *connExecutor) execWithDistSQLEngine(
} else if planner.instrumentation.ShouldSaveFlows() {
planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
}
- planCtx.traceMetadata = planner.instrumentation.traceMetadata
+ planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()

- var evalCtxFactory func() *extendedEvalContext
+ var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
if len(planner.curPlan.subqueryPlans) != 0 ||
len(planner.curPlan.cascades) != 0 ||
len(planner.curPlan.checkPlans) != 0 {
- // The factory reuses the same object because the contexts are not used
- // concurrently.
- var factoryEvalCtx extendedEvalContext
- ex.initEvalCtx(ctx, &factoryEvalCtx, planner)
- evalCtxFactory = func() *extendedEvalContext {
- ex.resetEvalCtx(&factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
+ var serialEvalCtx extendedEvalContext
+ ex.initEvalCtx(ctx, &serialEvalCtx, planner)
+ evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
+ // Reuse the same object if this factory is not used concurrently.
+ factoryEvalCtx := &serialEvalCtx
+ if usedConcurrently {
+ factoryEvalCtx = &extendedEvalContext{}
+ ex.initEvalCtx(ctx, factoryEvalCtx, planner)
+ }
+ ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
- return &factoryEvalCtx
+ return factoryEvalCtx
}
}
err := ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
- return *recv.stats, err
+ return recv.stats, err
}

// beginTransactionTimestampsAndReadMode computes the timestamps and
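
The reworked evalCtxFactory encodes a simple policy for postqueries: callers that run serially keep recycling one cached evaluation context, while checks that run in parallel each get a fresh one so no mutable state is shared. A small sketch of that factory shape, assuming (for the sketch) that the factory itself is invoked from a single goroutine before the parallel work starts, with a toy evalContext in place of extendedEvalContext and initEvalCtx/resetEvalCtx:

    package main

    import (
        "fmt"
        "sync"
    )

    // evalContext stands in for extendedEvalContext: cheap to reset, more
    // expensive to initialize from scratch.
    type evalContext struct{ id int }

    // newFactory mirrors the shape of evalCtxFactory: the serial path keeps
    // handing out one cached context, the concurrent path allocates a fresh
    // one per call.
    func newFactory() func(usedConcurrently bool) *evalContext {
        serial := &evalContext{id: 0}
        next := 1
        return func(usedConcurrently bool) *evalContext {
            if !usedConcurrently {
                return serial // reused across serial postqueries
            }
            ctx := &evalContext{id: next} // fresh context per parallel check
            next++
            return ctx
        }
    }

    func main() {
        factory := newFactory()
        fmt.Println(factory(false) == factory(false)) // true: serial callers share the cached context

        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            ctx := factory(true) // handed out before the goroutine starts
            wg.Add(1)
            go func(c *evalContext) {
                defer wg.Done()
                fmt.Println("parallel check using context", c.id)
            }(ctx)
        }
        wg.Wait()
    }
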
7 changes: 6 additions & 1 deletion pkg/sql/distsql/server.go
@@ -548,6 +548,11 @@ type LocalState struct {
// HasConcurrency indicates whether the local flow uses multiple goroutines.
HasConcurrency bool

+ // ParallelCheck indicates whether the local flow is for a "check" postquery
+ // that runs in parallel with other checks and, thus, the LeafTxn must be
+ // used by this flow.
+ ParallelCheck bool

// Txn is filled in on the gateway only. It is the RootTxn that the query is running in.
// This will be used directly by the flow if the flow has no concurrency and IsLocal is set.
// If there is concurrency, a LeafTxn will be created.
@@ -561,7 +566,7 @@ type LocalState struct {
// MustUseLeafTxn returns true if a LeafTxn must be used. It is valid to call
// this method only after IsLocal and HasConcurrency have been set correctly.
func (l LocalState) MustUseLeafTxn() bool {
- return !l.IsLocal || l.HasConcurrency
+ return !l.IsLocal || l.HasConcurrency || l.ParallelCheck
}

// SetupLocalSyncFlow sets up a synchronous flow on the current (planning) node,
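
The new ParallelCheck flag covers a case HasConcurrency does not: HasConcurrency describes goroutines inside a single flow, whereas a check postquery flow can be internally serial yet still run alongside sibling check flows, and any concurrent user of the transaction needs a LeafTxn rather than the RootTxn. A toy illustration of the predicate (field and type names are stand-ins, not the real LocalState):

    package main

    import (
        "fmt"
        "sync"
    )

    // localState mirrors the three fields MustUseLeafTxn consults.
    // HasConcurrency only describes concurrency inside one flow; ParallelCheck
    // marks a flow that is individually serial but runs alongside other checks.
    type localState struct {
        IsLocal        bool
        HasConcurrency bool
        ParallelCheck  bool
    }

    func (l localState) mustUseLeafTxn() bool {
        return !l.IsLocal || l.HasConcurrency || l.ParallelCheck
    }

    func main() {
        // Two check flows: each is local and single-goroutine, yet they run
        // concurrently with each other, so neither may use the RootTxn.
        checks := []localState{
            {IsLocal: true, ParallelCheck: true},
            {IsLocal: true, ParallelCheck: true},
        }
        var wg sync.WaitGroup
        for i, c := range checks {
            wg.Add(1)
            go func(i int, c localState) {
                defer wg.Done()
                fmt.Println("check", i, "must use leaf txn:", c.mustUseLeafTxn())
            }(i, c)
        }
        wg.Wait()
    }
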