diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 395c1f875a304..6cb8fd5c8de35 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -745,38 +745,30 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * if err1 != nil { return errors.Trace(err1) } - // Check lock conflict error for nowait, if nowait set and key locked by others, - // report error immediately and do no more resolve locks. - // if the lock left behind whose related txn is already committed or rollbacked, - // (eg secondary locks not committed or rollbacked yet) - // we cant return "nowait conflict" directly - if lock.LockType == pb.Op_PessimisticLock { - if action.LockWaitTime == kv.LockNoWait { - // the pessimistic lock found could be invalid locks which is timeout but not recycled yet - if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { - return ErrLockAcquireFailAndNoWaitSet - } - } else if action.LockWaitTime == kv.LockAlwaysWait { - // do nothing but keep wait - } else { - // the lockWaitTime is set, check the lock wait timeout or not - // the pessimistic lock found could be invalid locks which is timeout but not recycled yet - if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { - if time.Since(lockWaitStartTime).Milliseconds() >= action.LockWaitTime { - return ErrLockWaitTimeout - } - } - } - } locks = append(locks, lock) } // Because we already waited on tikv, no need to Backoff here. // tikv default will wait 3s(also the maximum wait value) when lock error occurs - _, _, err = c.store.lockResolver.ResolveLocks(bo, 0, locks) + msBeforeTxnExpired, _, err := c.store.lockResolver.ResolveLocks(bo, 0, locks) if err != nil { return errors.Trace(err) } + // If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring + // the pessimistic lock. We should return acquire fail with nowait set or timeout error if necessary. + if msBeforeTxnExpired > 0 { + if action.LockWaitTime == kv.LockNoWait { + return ErrLockAcquireFailAndNoWaitSet + } else if action.LockWaitTime == kv.LockAlwaysWait { + // do nothing but keep wait + } else { + // the lockWaitTime is set, we should return wait timeout if we are still blocked by a lock + if time.Since(lockWaitStartTime).Milliseconds() >= action.LockWaitTime { + return ErrLockWaitTimeout + } + } + } + // Handle the killed flag when waiting for the pessimistic lock. // When a txn runs into LockKeys() and backoff here, it has no chance to call // executor.Next() and check the killed flag. diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 57fc61fb032bf..dd6ac876ad5d4 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -642,6 +642,53 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) { c.Assert(lockInfo.LockTtl-ManagedLockTTL, Less, uint64(150)) } +// TestAcquireFalseTimeoutLock tests acquiring a key which is a secondary key of another transaction. +// The lock's own TTL is expired but the primary key is still alive due to heartbeats. +func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) { + // k1 is the primary lock of txn1 + k1 := kv.Key("k1") + // k2 is a secondary lock of txn1 and a key txn2 wants to lock + k2 := kv.Key("k2") + + txn1 := s.begin(c) + txn1.SetOption(kv.Pessimistic, true) + // lock the primary key + lockCtx := &kv.LockCtx{ForUpdateTS: txn1.startTS, WaitStartTime: time.Now()} + err := txn1.LockKeys(context.Background(), lockCtx, k1) + c.Assert(err, IsNil) + // lock the secondary key + lockCtx = &kv.LockCtx{ForUpdateTS: txn1.startTS, WaitStartTime: time.Now()} + err = txn1.LockKeys(context.Background(), lockCtx, k2) + c.Assert(err, IsNil) + + // Heartbeats will increase the TTL of the primary key + + // wait until secondary key exceeds its own TTL + time.Sleep(time.Duration(ManagedLockTTL) * time.Millisecond) + txn2 := s.begin(c) + txn2.SetOption(kv.Pessimistic, true) + + // test no wait + lockCtx = &kv.LockCtx{ForUpdateTS: txn2.startTS, LockWaitTime: kv.LockNoWait, WaitStartTime: time.Now()} + startTime := time.Now() + err = txn2.LockKeys(context.Background(), lockCtx, k2) + elapsed := time.Now().Sub(startTime) + // cannot acquire lock immediately thus error + c.Assert(err.Error(), Equals, ErrLockAcquireFailAndNoWaitSet.Error()) + // it should return immediately + c.Assert(elapsed, Less, 50*time.Millisecond) + + // test for wait limited time (300ms) + lockCtx = &kv.LockCtx{ForUpdateTS: txn2.startTS, LockWaitTime: 300, WaitStartTime: time.Now()} + startTime = time.Now() + err = txn2.LockKeys(context.Background(), lockCtx, k2) + elapsed = time.Now().Sub(startTime) + // cannot acquire lock in time thus error + c.Assert(err.Error(), Equals, ErrLockWaitTimeout.Error()) + // it should return after about 300ms + c.Assert(elapsed, Less, 350*time.Millisecond) +} + func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo { txn := s.begin(c) err := txn.Set(key, key)