Skip to content

Commit

Permalink
store/tikv: fix context cancelation for 2PC commit (#3330)
Browse files Browse the repository at this point in the history
  • Loading branch information
hhkbp2 authored and tiancaiamao committed May 25, 2017
1 parent 06ec635 commit 1fb0220
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
18 changes: 15 additions & 3 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,21 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
ch := make(chan error, len(batches))
for _, batch := range batches {
go func(batch batchKeys) {
singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in backgroud goroutines, we should not
// fork a child context and call cancel() while the foreground goroutine exits.
// Otherwise the backgroud goroutines will be canceled execeptionally.
// Here we makes a new clone of the original backoffer for this goroutine
// exclusively to avoid the data race when using the same backoffer
// in concurrent goroutines.
singleBatchBackoffer := backoffer.Clone()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
} else {
singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
}
}(batch)
}
var err error
Expand Down
11 changes: 11 additions & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,17 @@ func (b *Backoffer) String() string {
return fmt.Sprintf(" backoff(%dms %s)", b.totalSleep, b.types)
}

// Clone creates a new Backoffer which keeps current Backoffer's sleep time and errors, and shares
// current Backoffer's context.
func (b *Backoffer) Clone() *Backoffer {
return &Backoffer{
maxSleep: b.maxSleep,
totalSleep: b.totalSleep,
errors: b.errors,
ctx: b.ctx,
}
}

// Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors, and holds
// a child context of current Backoffer's context.
func (b *Backoffer) Fork() (*Backoffer, goctx.CancelFunc) {
Expand Down

0 comments on commit 1fb0220

Please sign in to comment.