Skip to content

Commit

Permalink
store/tikv: fix duplicated lock keys caused data inconsistenc… (#16769)
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored Apr 23, 2020
1 parent ae09d8c commit d9919fb
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 1 deletion.
5 changes: 4 additions & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3245,7 +3245,10 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) (
}
}
if b.ctx.GetSessionVars().TxnCtx.IsPessimistic {
if !update.MultipleTable {
if update.TableRefs.TableRefs.Right == nil {
// buildSelectLock is an optimization that can reduce RPC call.
// We only need do this optimization for single table update which is the most common case.
// When TableRefs.Right is nil, it is single table update.
p = b.buildSelectLock(p, ast.SelectLockForUpdate)
}
}
Expand Down
11 changes: 11 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,3 +1255,14 @@ func (s *testPessimisticSuite) TestTxnWithExpiredPessimisticLocks(c *C) {
tk.MustExec("update t1 set c2 = c2 + 1")
tk.MustExec("rollback")
}

func (s *testPessimisticSuite) TestDupLockInconsistency(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, index b (b))")
tk.MustExec("insert t (a) values (1), (1)")
tk.MustExec("begin pessimistic")
tk.MustExec("update t, (select a from t) s set t.b = s.a")
tk.MustExec("commit")
tk.MustExec("admin check table t")
}
Empty file modified session/txn.go
100755 → 100644
Empty file.
27 changes: 27 additions & 0 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"context"
"fmt"
"math"
"math/rand"
"runtime"
"strings"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -518,3 +520,28 @@ func (s *testLockSuite) TestZeroMinCommitTS(c *C) {
c.Assert(err, IsNil)
c.Assert(expire, Equals, int64(0))
}

func (s *testLockSuite) TestDeduplicateKeys(c *C) {
inputs := []string{
"a b c",
"a a b c",
"a a a b c",
"a a a b b b b c",
"a b b b b c c c",
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, in := range inputs {
strs := strings.Split(in, " ")
keys := make([][]byte, len(strs))
for _, i := range r.Perm(len(strs)) {
keys[i] = []byte(strs[i])
}
keys = deduplicateKeys(keys)
strs = strs[:len(keys)]
for i := range keys {
strs[i] = string(keys[i])
}
out := strings.Join(strs, " ")
c.Assert(out, Equals, "a b c")
}
}
17 changes: 17 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package tikv

import (
"bytes"
"context"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -357,6 +359,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
if len(keys) == 0 {
return nil
}
keys = deduplicateKeys(keys)
if txn.IsPessimistic() && lockCtx.ForUpdateTS > 0 {
if txn.committer == nil {
// connID is used for log.
Expand Down Expand Up @@ -428,6 +431,20 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
return nil
}

// deduplicateKeys deduplicate the keys, it use sort instead of map to avoid memory allocation.
func deduplicateKeys(keys [][]byte) [][]byte {
sort.Slice(keys, func(i, j int) bool {
return bytes.Compare(keys[i], keys[j]) < 0
})
deduped := keys[:1]
for i := 1; i < len(keys); i++ {
if !bytes.Equal(deduped[len(deduped)-1], keys[i]) {
deduped = append(deduped, keys[i])
}
}
return deduped
}

func (txn *tikvTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte) *sync.WaitGroup {
// Clone a new committer for execute in background.
committer := &twoPhaseCommitter{
Expand Down

0 comments on commit d9919fb

Please sign in to comment.