From ec29064bb77ec744e1a8bf94864e0c558fbd85cd Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 17 Aug 2021 11:45:06 -0400 Subject: [PATCH] sql: fix deadlock when updating backfill progress The root cause here is that we acquired the mutex inside the transaction which also laid down intents. This was not a problem in earlier iterations of this code because of the FOR UPDATE logic which would, generally, in theory, order the transactions such that the first one to acquire the mutex would be the first to lay down an intent, thus avoiding the deadlock by ordering the acquisitions. That was changed in #68244, which removed the FOR UPDATE. What we see now is that you have a transaction doing the progress update which hits a restart but has laid down an intent. Then we have a transaction which is doing a details update that starts and acquires the mutex but blocks on the intent of the other transaction. That other transaction now is blocked on the mutex and we have a deadlock. The solution here is to not acquire the mutex inside these transactions. Instead, the code copies out the relevant state prior to issuing the transaction. The cost here should be pretty minimal and the staleness in the fact of retries is the least of my concerns. No release note because the code in #68244 has never been released. Release note: None --- pkg/sql/backfill.go | 62 +++++++++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index e063fa9d2676..501935e287c2 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -935,14 +935,15 @@ func getJobIDForMutationWithDescriptor( "job not found for table id %d, mutation %d", tableDesc.GetID(), mutationID) } -// nRanges returns the number of ranges that cover a set of spans. +// numRangesInSpans returns the number of ranges that cover a set of spans. // // It operates entirely on the current goroutine and is thus able to // reuse an existing kv.Txn safely. -func (sc *SchemaChanger) nRanges( - ctx context.Context, txn *kv.Txn, spans []roachpb.Span, +func numRangesInSpans( + ctx context.Context, db *kv.DB, distSQLPlanner *DistSQLPlanner, spans []roachpb.Span, ) (int, error) { - spanResolver := sc.distSQLPlanner.spanResolver.NewSpanResolverIterator(txn) + txn := db.NewTxn(ctx, "num-ranges-in-spans") + spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn) rangeIds := make(map[int64]struct{}) for _, span := range spans { // For each span, iterate the spanResolver until it's exhausted, storing @@ -1135,6 +1136,17 @@ func (sc *SchemaChanger) distIndexBackfill( ) defer recv.Release() + getTodoSpansForUpdate := func() []roachpb.Span { + mu.Lock() + defer mu.Unlock() + if mu.updatedTodoSpans == nil { + return nil + } + return append( + make([]roachpb.Span, 0, len(mu.updatedTodoSpans)), + mu.updatedTodoSpans..., + ) + } updateJobProgress = func() error { // Report schema change progress. We define progress at this point as the // the fraction of fully-backfilled ranges of the primary index of the @@ -1143,22 +1155,19 @@ func (sc *SchemaChanger) distIndexBackfill( // change state machine or from a previous backfill attempt, we scale that // fraction of ranges completed by the remaining fraction of the job's // progress bar. - err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - mu.Lock() + updatedTodoSpans := getTodoSpansForUpdate() + if updatedTodoSpans == nil { + return nil + } + nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, mu.updatedTodoSpans) + if err != nil { + return err + } + if origNRanges == -1 { + origNRanges = nRanges + } + return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // No processor has returned completed spans yet. - if mu.updatedTodoSpans == nil { - mu.Unlock() - return nil - } - nRanges, err := sc.nRanges(ctx, txn, mu.updatedTodoSpans) - mu.Unlock() - if err != nil { - return err - } - if origNRanges == -1 { - origNRanges = nRanges - } - if nRanges < origNRanges { fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges) fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished @@ -1169,21 +1178,18 @@ func (sc *SchemaChanger) distIndexBackfill( } return nil }) - return err } updateJobDetails = func() error { - err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - mu.Lock() - defer mu.Unlock() + updatedTodoSpans := getTodoSpansForUpdate() + return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // No processor has returned completed spans yet. - if mu.updatedTodoSpans == nil { + if updatedTodoSpans == nil { return nil } - log.VEventf(ctx, 2, "writing todo spans to job details: %+v", mu.updatedTodoSpans) - return rowexec.SetResumeSpansInJob(ctx, mu.updatedTodoSpans, mutationIdx, txn, sc.job) + log.VEventf(ctx, 2, "writing todo spans to job details: %+v", updatedTodoSpans) + return rowexec.SetResumeSpansInJob(ctx, updatedTodoSpans, mutationIdx, txn, sc.job) }) - return err } // Setup periodic progress update. @@ -1310,7 +1316,7 @@ func (sc *SchemaChanger) distColumnBackfill( // schema change state machine or from a previous backfill attempt, // we scale that fraction of ranges completed by the remaining fraction // of the job's progress bar. - nRanges, err := sc.nRanges(ctx, txn, todoSpans) + nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans) if err != nil { return err }