diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 670fe24eb6559..87eb47561c5fb 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -3245,7 +3245,10 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) ( } } if b.ctx.GetSessionVars().TxnCtx.IsPessimistic { - if !update.MultipleTable { + if update.TableRefs.TableRefs.Right == nil { + // buildSelectLock is an optimization that can reduce RPC call. + // We only need do this optimization for single table update which is the most common case. + // When TableRefs.Right is nil, it is single table update. p = b.buildSelectLock(p, ast.SelectLockForUpdate) } } diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 8974ff51b04e9..b5d4cc49041fd 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -1255,3 +1255,14 @@ func (s *testPessimisticSuite) TestTxnWithExpiredPessimisticLocks(c *C) { tk.MustExec("update t1 set c2 = c2 + 1") tk.MustExec("rollback") } + +func (s *testPessimisticSuite) TestDupLockInconsistency(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int, b int, index b (b))") + tk.MustExec("insert t (a) values (1), (1)") + tk.MustExec("begin pessimistic") + tk.MustExec("update t, (select a from t) s set t.b = s.a") + tk.MustExec("commit") + tk.MustExec("admin check table t") +} diff --git a/session/txn.go b/session/txn.go old mode 100755 new mode 100644 diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 99acf00ebc515..c6eadf699e46c 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -17,7 +17,9 @@ import ( "context" "fmt" "math" + "math/rand" "runtime" + "strings" "time" . "github.com/pingcap/check" @@ -518,3 +520,28 @@ func (s *testLockSuite) TestZeroMinCommitTS(c *C) { c.Assert(err, IsNil) c.Assert(expire, Equals, int64(0)) } + +func (s *testLockSuite) TestDeduplicateKeys(c *C) { + inputs := []string{ + "a b c", + "a a b c", + "a a a b c", + "a a a b b b b c", + "a b b b b c c c", + } + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for _, in := range inputs { + strs := strings.Split(in, " ") + keys := make([][]byte, len(strs)) + for _, i := range r.Perm(len(strs)) { + keys[i] = []byte(strs[i]) + } + keys = deduplicateKeys(keys) + strs = strs[:len(keys)] + for i := range keys { + strs[i] = string(keys[i]) + } + out := strings.Join(strs, " ") + c.Assert(out, Equals, "a b c") + } +} diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 151d44e246e07..b148686fb8892 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -14,8 +14,10 @@ package tikv import ( + "bytes" "context" "fmt" + "sort" "sync" "sync/atomic" "time" @@ -357,6 +359,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput if len(keys) == 0 { return nil } + keys = deduplicateKeys(keys) if txn.IsPessimistic() && lockCtx.ForUpdateTS > 0 { if txn.committer == nil { // connID is used for log. @@ -428,6 +431,20 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput return nil } +// deduplicateKeys deduplicate the keys, it use sort instead of map to avoid memory allocation. +func deduplicateKeys(keys [][]byte) [][]byte { + sort.Slice(keys, func(i, j int) bool { + return bytes.Compare(keys[i], keys[j]) < 0 + }) + deduped := keys[:1] + for i := 1; i < len(keys); i++ { + if !bytes.Equal(deduped[len(deduped)-1], keys[i]) { + deduped = append(deduped, keys[i]) + } + } + return deduped +} + func (txn *tikvTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte) *sync.WaitGroup { // Clone a new committer for execute in background. committer := &twoPhaseCommitter{