From 37f42913f298bb813902cd591d080b5aa33087c5 Mon Sep 17 00:00:00 2001 From: lysu Date: Tue, 28 Apr 2020 10:33:52 +0800 Subject: [PATCH] tikv: keep using origin goroutine when doing retry in commit phase (#16849) --- store/tikv/2pc.go | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 8001131665d68..a4d7b4f58e4e6 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -50,7 +50,7 @@ type twoPhaseCommitAction interface { } type actionPrewrite struct{} -type actionCommit struct{} +type actionCommit struct{ retry bool } type actionCleanup struct{} type actionPessimisticLock struct { *kv.LockCtx @@ -463,7 +463,7 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo firstIsPrimary = true } - _, actionIsCommit := action.(actionCommit) + actionCommit, actionIsCommit := action.(actionCommit) _, actionIsCleanup := action.(actionCleanup) _, actionIsPessimiticLock := action.(actionPessimisticLock) @@ -499,7 +499,7 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo } batches = batches[1:] } - if actionIsCommit { + if actionIsCommit && !actionCommit.retry { // Commit secondary batches in background goroutine to reduce latency. // The backoffer instance is created outside of the goroutine to avoid // potential data race in unit test since `CommitMaxBackoff` will be updated @@ -527,16 +527,25 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm return nil } - if len(batches) == 1 { - e := action.handleSingleBatch(c, bo, batches[0]) - if e != nil { - logutil.BgLogger().Debug("2PC doActionOnBatches failed", - zap.Uint64("conn", c.connID), - zap.Stringer("action type", action), - zap.Error(e), - zap.Uint64("txnStartTS", c.startTS)) + noNeedFork := len(batches) == 1 + if !noNeedFork { + if ac, ok := action.(actionCommit); ok && ac.retry { + noNeedFork = true } - return errors.Trace(e) + } + if noNeedFork { + for _, b := range batches { + e := action.handleSingleBatch(c, bo, b) + if e != nil { + logutil.BgLogger().Debug("2PC doActionOnBatches failed", + zap.Uint64("conn", c.connID), + zap.Stringer("action type", action), + zap.Error(e), + zap.Uint64("txnStartTS", c.startTS)) + return errors.Trace(e) + } + } + return nil } rateLim := len(batches) // Set rateLim here for the large transaction. @@ -998,7 +1007,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch return errors.Trace(err) } // re-split keys and commit again. - err = c.commitMutations(bo, batch.mutations) + err = c.doActionOnMutations(bo, actionCommit{retry: true}, batch.mutations) return errors.Trace(err) } if resp.Resp == nil {