Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv: keep using origin goroutine when doing retry in commit phase (#16849) #16876

Merged
merged 3 commits into from
Apr 29, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type twoPhaseCommitAction interface {
}

type actionPrewrite struct{}
type actionCommit struct{}
type actionCommit struct{ retry bool }
type actionCleanup struct{}
type actionPessimisticLock struct {
*kv.LockCtx
Expand Down Expand Up @@ -463,7 +463,7 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo
firstIsPrimary = true
}

_, actionIsCommit := action.(actionCommit)
actionCommit, actionIsCommit := action.(actionCommit)
_, actionIsCleanup := action.(actionCleanup)
_, actionIsPessimiticLock := action.(actionPessimisticLock)

Expand Down Expand Up @@ -499,7 +499,7 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo
}
batches = batches[1:]
}
if actionIsCommit {
if actionIsCommit && !actionCommit.retry {
// Commit secondary batches in background goroutine to reduce latency.
// The backoffer instance is created outside of the goroutine to avoid
// potential data race in unit test since `CommitMaxBackoff` will be updated
Expand Down Expand Up @@ -527,16 +527,25 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
return nil
}

if len(batches) == 1 {
e := action.handleSingleBatch(c, bo, batches[0])
if e != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
noNeedFork := len(batches) == 1
if !noNeedFork {
if ac, ok := action.(actionCommit); ok && ac.retry {
noNeedFork = true
}
return errors.Trace(e)
}
if noNeedFork {
for _, b := range batches {
e := action.handleSingleBatch(c, bo, b)
if e != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(e)
}
}
return nil
}
rateLim := len(batches)
// Set rateLim here for the large transaction.
Expand Down Expand Up @@ -998,7 +1007,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
return errors.Trace(err)
}
// re-split keys and commit again.
err = c.commitMutations(bo, batch.mutations)
err = c.doActionOnMutations(bo, actionCommit{retry: true}, batch.mutations)
return errors.Trace(err)
}
if resp.Resp == nil {
Expand Down