From eaf7b64aafd0fdb55bf01500ad310d06e24dfc37 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 16 Sep 2019 19:41:37 +0800 Subject: [PATCH 01/12] store: update kvrpc.Cleanup proto and change its behaviour Before this PR, Cleanup always rollback a transaction if it's not committed. After this PR, Cleanup will not rollback a transaction if the lock is active. --- go.mod | 2 +- go.sum | 4 +- store/mockstore/mocktikv/mock_tikv_test.go | 4 +- store/mockstore/mocktikv/mvcc.go | 2 +- store/mockstore/mocktikv/mvcc_leveldb.go | 68 +++++++++++++++++++++- store/mockstore/mocktikv/rpc.go | 2 +- store/tikv/lock_resolver.go | 45 ++++++++++---- store/tikv/lock_test.go | 45 ++++++++++++-- 8 files changed, 149 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index 51f5e84f294b4..84964e0c60171 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2 + github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd github.com/pingcap/parser v0.0.0-20190912032624-978b8272c04e github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b diff --git a/go.sum b/go.sum index 4969e9175359b..7f1caebbf12c8 100644 --- a/go.sum +++ b/go.sum @@ -161,8 +161,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2 h1:wBORZD4gvEKK0tGP4g1Rv0Y7f2cNnObzI/ckPhsU11M= -github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 h1:DNvxkdcjA0TBIIIF+K2w9KMlTzMZzLZ5JVF26kTCPhg= +github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index fa0d10cfe8923..9a354365c04f7 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -556,7 +556,7 @@ func (s *testMockTiKVSuite) TestRollbackAndWriteConflict(c *C) { s.mustPutOK(c, "test", "test2", 5, 8) // simulate `getTxnStatus` for txn 2. - err := s.store.Cleanup([]byte("test"), 2) + err := s.store.Cleanup([]byte("test"), 2, math.MaxUint64) c.Assert(err, IsNil) req = &kvrpcpb.PrewriteRequest{ Mutations: putMutations("test", "test3"), @@ -712,7 +712,7 @@ func (s *testMVCCLevelDB) TestTxnHeartBeat(c *C) { c.Assert(ttl, Greater, uint64(300)) // The lock has already been clean up - c.Assert(s.store.Cleanup([]byte("pk"), 5), IsNil) + c.Assert(s.store.Cleanup([]byte("pk"), 5, math.MaxUint64), IsNil) _, err = s.store.TxnHeartBeat([]byte("pk"), 5, 1000) c.Assert(err, NotNil) } diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index d9aa80fa4e433..be85563479903 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -259,7 +259,7 @@ type MVCCStore interface { Prewrite(req *kvrpcpb.PrewriteRequest) []error Commit(keys [][]byte, startTS, commitTS uint64) error Rollback(keys [][]byte, startTS uint64) error - Cleanup(key []byte, startTS uint64) error + Cleanup(key []byte, startTS, currentTS uint64) error ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error) TxnHeartBeat(primaryKey []byte, startTS uint64, adviseTTL uint64) (uint64, error) ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index cb97d8a874502..28f0bcc4647b1 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -926,7 +926,8 @@ func getTxnCommitInfo(iter *Iterator, expectKey []byte, startTS uint64) (mvccVal } // Cleanup implements the MVCCStore interface. -func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error { +// Cleanup API is depreciated, use CheckTxnStatus instead. +func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { mvcc.mu.Lock() defer func() { mvcc.mu.Unlock() @@ -934,7 +935,69 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error { }() batch := &leveldb.Batch{} - err := rollbackKey(mvcc.db, batch, key, startTS) + startKey := mvccEncode(key, lockVer) + iter := newIterator(mvcc.db, &util.Range{ + Start: startKey, + }) + defer iter.Release() + + if iter.Valid() { + dec := lockDecoder{ + expectKey: key, + } + ok, err := dec.Decode(iter) + if err != nil { + return err + } + // If current transaction's lock exist. + if ok && dec.lock.startTS == startTS { + + // If the lock has already outdated, clean up it. + if uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { + if err = rollbackLock(batch, dec.lock, key, startTS); err != nil { + return err + } + if err = mvcc.db.Write(batch, nil); err != nil { + return err + } + return nil + } + + // Otherwise, return a locked error to with the TTL information. + return dec.lock.lockErr(key) + } + + // If current transaction's lock not exist. + // If commit info of current transaction exist. + c, ok, err := getTxnCommitInfo(iter, key, startTS) + if err != nil { + return errors.Trace(err) + } + if ok { + // If current transaction is already committed. + if c.valueType != typeRollback { + return ErrAlreadyCommitted(c.commitTS) + } + // If current transaction is already rollback. + return nil + } + } + + // If current transaction is not prewritted before. + value := mvccValue{ + valueType: typeRollback, + startTS: startTS, + commitTS: startTS, + } + writeKey := mvccEncode(key, startTS) + writeValue, err := value.MarshalBinary() + if err != nil { + return errors.Trace(err) + } + batch.Put(writeKey, writeValue) + return nil + + err = rollbackKey(mvcc.db, batch, key, startTS) if err != nil { return errors.Trace(err) } @@ -1079,6 +1142,7 @@ func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint return lock.ttl, nil } } + return 0, errors.New("lock doesn't exist") } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index c4b79af71d6a2..21f9acca10efc 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -357,7 +357,7 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean panic("KvCleanup: key not in region") } var resp kvrpcpb.CleanupResponse - err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion()) + err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion(), req.GetCurrentTs()) if err != nil { if commitTS, ok := errors.Cause(err).(ErrAlreadyCommitted); ok { resp.CommitVersion = uint64(commitTS) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index f440f93956870..8f7bebbea7ff8 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -305,17 +305,31 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE return } - cleanRegions, exists := cleanTxns[l.TxnID] - if !exists { - cleanRegions = make(map[RegionVerID]struct{}) - cleanTxns[l.TxnID] = cleanRegions - } + if status.ttl == 0 { + // If the lock is committed or rollbacked, resolve lock. + cleanRegions, exists := cleanTxns[l.TxnID] + if !exists { + cleanRegions = make(map[RegionVerID]struct{}) + cleanTxns[l.TxnID] = cleanRegions + } - err = lr.resolveLock(bo, l, status, cleanRegions) - if err != nil { - msBeforeTxnExpired = 0 - err = errors.Trace(err) - return + err = lr.resolveLock(bo, l, status, cleanRegions) + if err != nil { + msBeforeTxnExpired = 0 + err = errors.Trace(err) + return + } + } else { + // If the lock is valid, the txn may be a pessimistic transaction. + msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl) + if msBeforeLockExpired <= 0 { + // The txn is a pessimistic transaction, and it's primary lock will expire soon, but + // TxnHeartBeat could update the TTL, so we should not clean up the lock. + continue + } + if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired { + msBeforeTxnExpired = msBeforeLockExpired + } } } return @@ -339,9 +353,14 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte tikvLockResolverCountWithQueryTxnStatus.Inc() var status TxnStatus + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) + if err != nil { + return status, err + } req := tikvrpc.NewRequest(tikvrpc.CmdCleanup, &kvrpcpb.CleanupRequest{ Key: primary, StartVersion: txnID, + CurrentTs: currentTS, }) for { loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) @@ -368,6 +387,12 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte } cmdResp := resp.Resp.(*kvrpcpb.CleanupResponse) if keyErr := cmdResp.GetError(); keyErr != nil { + // If the TTL of the primary lock is not outdated, the proto returns a ErrLocked contains the TTL. + if lockInfo := keyErr.GetLocked(); lockInfo != nil { + status.ttl = lockInfo.LockTtl + status.commitTS = 0 + return status, nil + } err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID) logutil.BgLogger().Error("getTxnStatus error", zap.Error(err)) return status, err diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index e5bb7e063ae47..7fac316899b0f 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -200,6 +200,43 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) { status, err = s.store.lockResolver.GetTxnStatus(startTS, []byte("a")) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) + c.Assert(status.ttl, Greater, uint64(0)) +} + +func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) { + txn, err := s.store.Begin() + c.Assert(err, IsNil) + txn.Set(kv.Key("key"), []byte("value")) + s.prewriteTxn(c, txn.(*tikvTxn)) + + // Check the lock TTL of a transaction. + bo := NewBackoffer(context.Background(), prewriteMaxBackoff) + lr := newLockResolver(s.store) + status, err := lr.GetTxnStatus(txn.StartTS(), []byte("key")) + c.Assert(err, IsNil) + c.Assert(status.IsCommitted(), IsFalse) + c.Assert(status.ttl, Greater, uint64(0)) + c.Assert(status.CommitTS(), Equals, uint64(0)) + + // Rollback the txn. + lock := s.mustGetLock(c, []byte("key")) + status = TxnStatus{} + cleanRegions := make(map[RegionVerID]struct{}) + err = newLockResolver(s.store).resolveLock(bo, lock, status, cleanRegions) + c.Assert(err, IsNil) + + // Check its status is rollbacked. + status, err = lr.GetTxnStatus(txn.StartTS(), []byte("key")) + c.Assert(err, IsNil) + c.Assert(status.ttl, Equals, uint64(0)) + c.Assert(status.commitTS, Equals, uint64(0)) + + // Check a committed txn. + startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) + status, err = lr.GetTxnStatus(startTS, []byte("a")) + c.Assert(err, IsNil) + c.Assert(status.ttl, Equals, uint64(0)) + c.Assert(status.commitTS, Equals, commitTS) } func (s *testLockSuite) TestTxnHeartBeat(c *C) { @@ -217,11 +254,11 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) { c.Assert(err, IsNil) c.Assert(newTTL, Equals, uint64(666)) - // The getTxnStatus API is confusing, it really means rollback! - status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key")) + lock := s.mustGetLock(c, []byte("key")) + status := TxnStatus{ttl: newTTL} + cleanRegions := make(map[RegionVerID]struct{}) + err = newLockResolver(s.store).resolveLock(bo, lock, status, cleanRegions) c.Assert(err, IsNil) - c.Assert(status.ttl, Equals, uint64(0)) - c.Assert(status.commitTS, Equals, uint64(0)) newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666) c.Assert(err, NotNil) From 4dd7dca1f7ea6f12a081e1f278aff7ec4ef63ad2 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 16 Sep 2019 20:12:51 +0800 Subject: [PATCH 02/12] make golint happy --- store/mockstore/mocktikv/mvcc_leveldb.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 28f0bcc4647b1..28250e81ed280 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -957,10 +957,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { if err = rollbackLock(batch, dec.lock, key, startTS); err != nil { return err } - if err = mvcc.db.Write(batch, nil); err != nil { - return err - } - return nil + return mvcc.db.Write(batch, nil) } // Otherwise, return a locked error to with the TTL information. @@ -996,12 +993,6 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { } batch.Put(writeKey, writeValue) return nil - - err = rollbackKey(mvcc.db, batch, key, startTS) - if err != nil { - return errors.Trace(err) - } - return mvcc.db.Write(batch, nil) } // CheckTxnStatus checks the primary lock of a transaction to decide its status. From 8f09776ce7f6a05c2cf2e08e028710ec142214b2 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 17 Sep 2019 14:06:19 +0800 Subject: [PATCH 03/12] address comment --- store/mockstore/mocktikv/mvcc_leveldb.go | 13 ++++++------- store/tikv/lock_resolver.go | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 28250e81ed280..81cd745262bcd 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -949,7 +949,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { if err != nil { return err } - // If current transaction's lock exist. + // If current transaction's lock exists. if ok && dec.lock.startTS == startTS { // If the lock has already outdated, clean up it. @@ -960,22 +960,22 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { return mvcc.db.Write(batch, nil) } - // Otherwise, return a locked error to with the TTL information. + // Otherwise, return a locked error with the TTL information. return dec.lock.lockErr(key) } - // If current transaction's lock not exist. - // If commit info of current transaction exist. + // If current transaction's lock does not exist. + // If the commit information of the current transaction exist. c, ok, err := getTxnCommitInfo(iter, key, startTS) if err != nil { return errors.Trace(err) } if ok { - // If current transaction is already committed. + // If the current transaction has already committed. if c.valueType != typeRollback { return ErrAlreadyCommitted(c.commitTS) } - // If current transaction is already rollback. + // If the current transaction has already rollbacked. return nil } } @@ -1133,7 +1133,6 @@ func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint return lock.ttl, nil } } - return 0, errors.New("lock doesn't exist") } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 8f7bebbea7ff8..684f28f4a8837 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -325,7 +325,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE if msBeforeLockExpired <= 0 { // The txn is a pessimistic transaction, and it's primary lock will expire soon, but // TxnHeartBeat could update the TTL, so we should not clean up the lock. - continue + break } if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired { msBeforeTxnExpired = msBeforeLockExpired From bf7877bd3b88268af913c0d6377ce46c46ebf698 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 17 Sep 2019 14:25:38 +0800 Subject: [PATCH 04/12] reset last change --- store/tikv/lock_resolver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 684f28f4a8837..8f7bebbea7ff8 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -325,7 +325,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE if msBeforeLockExpired <= 0 { // The txn is a pessimistic transaction, and it's primary lock will expire soon, but // TxnHeartBeat could update the TTL, so we should not clean up the lock. - break + continue } if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired { msBeforeTxnExpired = msBeforeLockExpired From 4212f2e78172c28e9ad64465da52c7b4a785727a Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 17 Sep 2019 15:10:01 +0800 Subject: [PATCH 05/12] address comment --- store/tikv/lock_resolver.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 8f7bebbea7ff8..822330a5aceab 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -273,12 +273,12 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE tikvLockResolverCountWithResolve.Inc() - var expiredLocks []*Lock + var expiredSecondaryLocks []*Lock for _, l := range locks { msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) if msBeforeLockExpired <= 0 { tikvLockResolverCountWithExpired.Inc() - expiredLocks = append(expiredLocks, l) + expiredSecondaryLocks = append(expiredSecondaryLocks, l) } else { if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired { msBeforeTxnExpired = msBeforeLockExpired @@ -286,7 +286,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE tikvLockResolverCountWithNotExpired.Inc() } } - if len(expiredLocks) == 0 { + if len(expiredSecondaryLocks) == 0 { if msBeforeTxnExpired > 0 { tikvLockResolverCountWithWaitExpired.Inc() } @@ -296,7 +296,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE // TxnID -> []Region, record resolved Regions. // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) - for _, l := range expiredLocks { + for _, l := range expiredSecondaryLocks { var status TxnStatus status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary) if err != nil { @@ -321,20 +321,27 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE } } else { // If the lock is valid, the txn may be a pessimistic transaction. + // Update the txn expire time. msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl) - if msBeforeLockExpired <= 0 { - // The txn is a pessimistic transaction, and it's primary lock will expire soon, but - // TxnHeartBeat could update the TTL, so we should not clean up the lock. - continue - } - if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired { - msBeforeTxnExpired = msBeforeLockExpired - } + msBeforeTxnExpired = updateExpireTime(msBeforeTxnExpired, msBeforeLockExpired) } } return } +// updateExpireTime compares the current value of the transaction expire time with the lock expire time to get a new value. +// The expire time of a transaction is set to the smallest one between all its lock expire time. +// The return value is always >= 0 +func updateExpireTime(txnExpire, lockExpire int64) int64 { + if lockExpire <= 0 { + return 0 + } + if lockExpire < txnExpire { + return lockExpire + } + return txnExpire +} + // GetTxnStatus queries tikv-server for a txn's status (commit/rollback). // If the primary key is still locked, it will launch a Rollback to abort it. // To avoid unnecessarily aborting too many txns, it is wiser to wait a few From 5b1e94dd7a60d83d32dd73531a1e9b70fb5ee833 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 17 Sep 2019 15:43:38 +0800 Subject: [PATCH 06/12] address comment --- store/tikv/lock_resolver.go | 59 ++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 822330a5aceab..db5d86e6b2b3e 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -266,9 +266,10 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // commit status. // 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to // the same transaction. -func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) { +func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error) { + var msBeforeTxnExpired txnExpireTime if len(locks) == 0 { - return + return msBeforeTxnExpired.value(), nil } tikvLockResolverCountWithResolve.Inc() @@ -277,35 +278,22 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE for _, l := range locks { msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) if msBeforeLockExpired <= 0 { - tikvLockResolverCountWithExpired.Inc() expiredSecondaryLocks = append(expiredSecondaryLocks, l) - } else { - if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired { - msBeforeTxnExpired = msBeforeLockExpired - } - tikvLockResolverCountWithNotExpired.Inc() - } - } - if len(expiredSecondaryLocks) == 0 { - if msBeforeTxnExpired > 0 { - tikvLockResolverCountWithWaitExpired.Inc() } - return } - // TxnID -> []Region, record resolved Regions. // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) for _, l := range expiredSecondaryLocks { var status TxnStatus - status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary) if err != nil { - msBeforeTxnExpired = 0 err = errors.Trace(err) - return + return msBeforeTxnExpired.value(), err } if status.ttl == 0 { + tikvLockResolverCountWithExpired.Inc() // If the lock is committed or rollbacked, resolve lock. cleanRegions, exists := cleanTxns[l.TxnID] if !exists { @@ -315,31 +303,48 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE err = lr.resolveLock(bo, l, status, cleanRegions) if err != nil { - msBeforeTxnExpired = 0 err = errors.Trace(err) - return + return msBeforeTxnExpired.value(), err } } else { + tikvLockResolverCountWithNotExpired.Inc() // If the lock is valid, the txn may be a pessimistic transaction. // Update the txn expire time. msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl) - msBeforeTxnExpired = updateExpireTime(msBeforeTxnExpired, msBeforeLockExpired) + msBeforeTxnExpired.update(msBeforeLockExpired) } } - return + + if msBeforeTxnExpired.value() > 0 { + tikvLockResolverCountWithWaitExpired.Inc() + } + return msBeforeTxnExpired.value(), nil +} + +type txnExpireTime struct { + initialized bool + txnExpire int64 } // updateExpireTime compares the current value of the transaction expire time with the lock expire time to get a new value. // The expire time of a transaction is set to the smallest one between all its lock expire time. // The return value is always >= 0 -func updateExpireTime(txnExpire, lockExpire int64) int64 { +func (t *txnExpireTime) update(lockExpire int64) { + t.initialized = true if lockExpire <= 0 { - return 0 + return + } + if lockExpire < t.txnExpire { + t.txnExpire = lockExpire } - if lockExpire < txnExpire { - return lockExpire + return +} + +func (t *txnExpireTime) value() int64 { + if !t.initialized { + return 0 } - return txnExpire + return t.txnExpire } // GetTxnStatus queries tikv-server for a txn's status (commit/rollback). From c28396af345d5818712df3c3dce9a10f4f4c41f5 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 17 Sep 2019 15:47:33 +0800 Subject: [PATCH 07/12] update comment --- store/tikv/lock_resolver.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index db5d86e6b2b3e..00c60a9c138d3 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -326,9 +326,6 @@ type txnExpireTime struct { txnExpire int64 } -// updateExpireTime compares the current value of the transaction expire time with the lock expire time to get a new value. -// The expire time of a transaction is set to the smallest one between all its lock expire time. -// The return value is always >= 0 func (t *txnExpireTime) update(lockExpire int64) { t.initialized = true if lockExpire <= 0 { From ecb52941c3366d00d95d6acd93c5ed6a47f22787 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 18 Sep 2019 11:27:15 +0800 Subject: [PATCH 08/12] address comment --- store/tikv/lock_resolver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 00c60a9c138d3..9edb536ded6ba 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -329,6 +329,7 @@ type txnExpireTime struct { func (t *txnExpireTime) update(lockExpire int64) { t.initialized = true if lockExpire <= 0 { + t.txnExpire = 0 return } if lockExpire < t.txnExpire { From 5e66b91f7f3dcd65a0b89b63c9ab0d40ee17b90f Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 18 Sep 2019 15:56:12 +0800 Subject: [PATCH 09/12] address comment --- store/mockstore/mocktikv/mvcc_leveldb.go | 6 +++ store/tikv/lock_resolver.go | 57 +++++++++++++++++------- 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 81cd745262bcd..8c2e7fd31207a 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -700,6 +700,12 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, } if ok { if dec.lock.startTS != startTS { + if isPessimisticLock { + // NOTE: A special handling. + // When pessimistic txn prewrite meets lock, set the TTL = 0 means + // telling TiDB to rollback the transaction **unconditionly**. + dec.lock.ttl = 0 + } return dec.lock.lockErr(mutation.Key) } if dec.lock.op != kvrpcpb.Op_PessimisticLock { diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 9edb536ded6ba..863a043659ddd 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -174,7 +174,8 @@ func (lr *LockResolver) getResolved(txnID uint64) (TxnStatus, bool) { return s, ok } -// BatchResolveLocks resolve locks in a batch +// BatchResolveLocks resolve locks in a batch. +// Used it in gcworker only! func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc RegionVerID) (bool, error) { if len(locks) == 0 { return true, nil @@ -182,7 +183,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi tikvLockResolverCountWithBatchResolve.Inc() - var expiredLocks []*Lock + expiredLocks := make([]*Lock, 0, len(locks)) for _, l := range locks { if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) { tikvLockResolverCountWithExpired.Inc() @@ -205,7 +206,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi continue } - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0) if err != nil { return false, errors.Trace(err) } @@ -274,20 +275,29 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error tikvLockResolverCountWithResolve.Inc() - var expiredSecondaryLocks []*Lock + var expiredLocks []*Lock for _, l := range locks { msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) if msBeforeLockExpired <= 0 { - expiredSecondaryLocks = append(expiredSecondaryLocks, l) + expiredLocks = append(expiredLocks, l) + } else { + msBeforeTxnExpired.update(int64(l.TTL)) + tikvLockResolverCountWithNotExpired.Inc() } } // TxnID -> []Region, record resolved Regions. // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) - for _, l := range expiredSecondaryLocks { - var status TxnStatus - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary) + for _, l := range expiredLocks { + status, err := lr.getTxnStatusFromLock(bo, l) + if err != nil { + msBeforeTxnExpired.update(0) + err = errors.Trace(err) + return msBeforeTxnExpired.value(), err + } + if err != nil { + msBeforeTxnExpired.update(0) err = errors.Trace(err) return msBeforeTxnExpired.value(), err } @@ -303,6 +313,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error err = lr.resolveLock(bo, l, status, cleanRegions) if err != nil { + msBeforeTxnExpired.update(0) err = errors.Trace(err) return msBeforeTxnExpired.value(), err } @@ -350,12 +361,32 @@ func (t *txnExpireTime) value() int64 { // To avoid unnecessarily aborting too many txns, it is wiser to wait a few // seconds before calling it after Prewrite. func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, error) { + var status TxnStatus bo := NewBackoffer(context.Background(), cleanupMaxBackoff) - status, err := lr.getTxnStatus(bo, txnID, primary) - return status, errors.Trace(err) + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) + if err != nil { + return status, err + } + return lr.getTxnStatus(bo, txnID, primary, currentTS) } -func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock) (TxnStatus, error) { + // NOTE: l.TTL = 0 is a special protocol!!! + // When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**. + // In this case, TiKV set the lock TTL = 0, and TiDB use currentTS = 0 to call + // getTxnStatus, and getTxnStatus with currentTS = 0 would rollback the transaction. + if l.TTL == 0 { + return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0) + } + + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) + if err != nil { + return TxnStatus{}, err + } + return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS) +} + +func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil } @@ -363,10 +394,6 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte tikvLockResolverCountWithQueryTxnStatus.Inc() var status TxnStatus - currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) - if err != nil { - return status, err - } req := tikvrpc.NewRequest(tikvrpc.CmdCleanup, &kvrpcpb.CleanupRequest{ Key: primary, StartVersion: txnID, From b71d8ee6eaef1d766db4f37df91b1b2d4628b796 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 18 Sep 2019 16:28:54 +0800 Subject: [PATCH 10/12] address comment --- store/mockstore/mocktikv/mvcc_leveldb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 8c2e7fd31207a..bedb34b58fc23 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -959,7 +959,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { if ok && dec.lock.startTS == startTS { // If the lock has already outdated, clean up it. - if uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { + if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { if err = rollbackLock(batch, dec.lock, key, startTS); err != nil { return err } From 9124ecb1b7d752fd0b6584829b6c2cfb8f2c2dd5 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 23 Sep 2019 13:59:51 +0800 Subject: [PATCH 11/12] address comment --- store/mockstore/mocktikv/mvcc_leveldb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index bedb34b58fc23..4afb1299f69e3 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -932,7 +932,7 @@ func getTxnCommitInfo(iter *Iterator, expectKey []byte, startTS uint64) (mvccVal } // Cleanup implements the MVCCStore interface. -// Cleanup API is depreciated, use CheckTxnStatus instead. +// Cleanup API is deprecated, use CheckTxnStatus instead. func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { mvcc.mu.Lock() defer func() { From e5868f1890cfde71b88eb63e15442d214f033b96 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 23 Sep 2019 16:37:31 +0800 Subject: [PATCH 12/12] address comment & fix CI --- store/tikv/lock_resolver.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 863a043659ddd..263caab94cfd2 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -296,12 +296,6 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error return msBeforeTxnExpired.value(), err } - if err != nil { - msBeforeTxnExpired.update(0) - err = errors.Trace(err) - return msBeforeTxnExpired.value(), err - } - if status.ttl == 0 { tikvLockResolverCountWithExpired.Inc() // If the lock is committed or rollbacked, resolve lock. @@ -338,9 +332,12 @@ type txnExpireTime struct { } func (t *txnExpireTime) update(lockExpire int64) { - t.initialized = true if lockExpire <= 0 { - t.txnExpire = 0 + lockExpire = 0 + } + if !t.initialized { + t.txnExpire = lockExpire + t.initialized = true return } if lockExpire < t.txnExpire {