Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: update kvrpc.Cleanup proto and change its behaviour #12212

Merged
merged 15 commits into from
Sep 23, 2019
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20190912032624-978b8272c04e
github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2 h1:wBORZD4gvEKK0tGP4g1Rv0Y7f2cNnObzI/ckPhsU11M=
github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 h1:DNvxkdcjA0TBIIIF+K2w9KMlTzMZzLZ5JVF26kTCPhg=
github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (s *testMockTiKVSuite) TestRollbackAndWriteConflict(c *C) {
s.mustPutOK(c, "test", "test2", 5, 8)

// simulate `getTxnStatus` for txn 2.
err := s.store.Cleanup([]byte("test"), 2)
err := s.store.Cleanup([]byte("test"), 2, math.MaxUint64)
c.Assert(err, IsNil)
req = &kvrpcpb.PrewriteRequest{
Mutations: putMutations("test", "test3"),
Expand Down Expand Up @@ -712,7 +712,7 @@ func (s *testMVCCLevelDB) TestTxnHeartBeat(c *C) {
c.Assert(ttl, Greater, uint64(300))

// The lock has already been cleaned up
c.Assert(s.store.Cleanup([]byte("pk"), 5), IsNil)
c.Assert(s.store.Cleanup([]byte("pk"), 5, math.MaxUint64), IsNil)
_, err = s.store.TxnHeartBeat([]byte("pk"), 5, 1000)
c.Assert(err, NotNil)
}
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ type MVCCStore interface {
Prewrite(req *kvrpcpb.PrewriteRequest) []error
Commit(keys [][]byte, startTS, commitTS uint64) error
Rollback(keys [][]byte, startTS uint64) error
Cleanup(key []byte, startTS uint64) error
Cleanup(key []byte, startTS, currentTS uint64) error
ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error)
TxnHeartBeat(primaryKey []byte, startTS uint64, adviseTTL uint64) (uint64, error)
ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error
Expand Down
60 changes: 57 additions & 3 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,19 +926,73 @@ func getTxnCommitInfo(iter *Iterator, expectKey []byte, startTS uint64) (mvccVal
}

// Cleanup implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error {
// The Cleanup API is deprecated, use CheckTxnStatus instead.
func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
mvcc.mu.Lock()
defer func() {
mvcc.mu.Unlock()
mvcc.deadlockDetector.CleanUp(startTS)
}()

batch := &leveldb.Batch{}
err := rollbackKey(mvcc.db, batch, key, startTS)
startKey := mvccEncode(key, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
})
defer iter.Release()

if iter.Valid() {
dec := lockDecoder{
expectKey: key,
}
ok, err := dec.Decode(iter)
if err != nil {
return err
}
// If current transaction's lock exists.
if ok && dec.lock.startTS == startTS {

// If the lock has already expired, clean it up.
if uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
return err
}
return mvcc.db.Write(batch, nil)
}

// Otherwise, return a locked error with the TTL information.
return dec.lock.lockErr(key)
}

// If current transaction's lock does not exist.
// If the commit information of the current transaction exists.
c, ok, err := getTxnCommitInfo(iter, key, startTS)
if err != nil {
return errors.Trace(err)
}
if ok {
// If the current transaction has already committed.
if c.valueType != typeRollback {
return ErrAlreadyCommitted(c.commitTS)
}
// If the current transaction has already been rolled back.
return nil
}
}

// If the current transaction was not prewritten before.
value := mvccValue{
valueType: typeRollback,
startTS: startTS,
commitTS: startTS,
}
writeKey := mvccEncode(key, startTS)
writeValue, err := value.MarshalBinary()
if err != nil {
return errors.Trace(err)
}
return mvcc.db.Write(batch, nil)
batch.Put(writeKey, writeValue)
return nil
}

// CheckTxnStatus checks the primary lock of a transaction to decide its status.
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean
panic("KvCleanup: key not in region")
}
var resp kvrpcpb.CleanupResponse
err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion())
err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion(), req.GetCurrentTs())
if err != nil {
if commitTS, ok := errors.Cause(err).(ErrAlreadyCommitted); ok {
resp.CommitVersion = uint64(commitTS)
Expand Down
97 changes: 66 additions & 31 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,61 +266,85 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// commit status.
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) {
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error) {
var msBeforeTxnExpired txnExpireTime
if len(locks) == 0 {
return
return msBeforeTxnExpired.value(), nil
}

tikvLockResolverCountWithResolve.Inc()

var expiredLocks []*Lock
var expiredSecondaryLocks []*Lock
for _, l := range locks {
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL)
if msBeforeLockExpired <= 0 {
tikvLockResolverCountWithExpired.Inc()
expiredLocks = append(expiredLocks, l)
} else {
if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired {
msBeforeTxnExpired = msBeforeLockExpired
}
tikvLockResolverCountWithNotExpired.Inc()
expiredSecondaryLocks = append(expiredSecondaryLocks, l)
}
}
if len(expiredLocks) == 0 {
if msBeforeTxnExpired > 0 {
tikvLockResolverCountWithWaitExpired.Inc()
}
return
}

// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[RegionVerID]struct{})
for _, l := range expiredLocks {
for _, l := range expiredSecondaryLocks {
var status TxnStatus
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary)
if err != nil {
msBeforeTxnExpired = 0
err = errors.Trace(err)
return
return msBeforeTxnExpired.value(), err
}

cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}
if status.ttl == 0 {
tikvLockResolverCountWithExpired.Inc()
// If the lock is committed or rollbacked, resolve lock.
cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}

err = lr.resolveLock(bo, l, status, cleanRegions)
if err != nil {
msBeforeTxnExpired = 0
err = errors.Trace(err)
return
err = lr.resolveLock(bo, l, status, cleanRegions)
if err != nil {
err = errors.Trace(err)
return msBeforeTxnExpired.value(), err
}
} else {
tikvLockResolverCountWithNotExpired.Inc()
// If the lock is valid, the txn may be a pessimistic transaction.
// Update the txn expire time.
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl)
msBeforeTxnExpired.update(msBeforeLockExpired)
}
}

if msBeforeTxnExpired.value() > 0 {
tikvLockResolverCountWithWaitExpired.Inc()
}
return msBeforeTxnExpired.value(), nil
}

// txnExpireTime tracks the earliest expire time among a set of transaction
// locks. The zero value means "no lock observed yet", in which case value()
// reports 0 (nothing to wait for).
type txnExpireTime struct {
	// initialized is false until the first call to update.
	initialized bool
	// txnExpire is the smallest clamped time-to-expire (ms) seen so far.
	txnExpire int64
}

// update records one lock's remaining time-to-expire in milliseconds.
// A lockExpire <= 0 means that lock is already expired and pins the overall
// expire time at 0; otherwise the minimum of all recorded values is kept.
func (t *txnExpireTime) update(lockExpire int64) {
	if lockExpire <= 0 {
		lockExpire = 0
	}
	if !t.initialized {
		// Take the first observation as-is. Without this branch the
		// zero-valued txnExpire would win the min-comparison below and
		// the first positive lockExpire would be silently dropped,
		// making value() report "expired" for a live lock.
		t.txnExpire = lockExpire
		t.initialized = true
		return
	}
	if lockExpire < t.txnExpire {
		t.txnExpire = lockExpire
	}
}

// value returns the smallest recorded expire time in milliseconds, or 0 if
// update was never called or some recorded lock was already expired.
func (t *txnExpireTime) value() int64 {
	if !t.initialized {
		return 0
	}
	return t.txnExpire
}

// GetTxnStatus queries tikv-server for a txn's status (commit/rollback).
// If the primary key is still locked, it will launch a Rollback to abort it.
// To avoid unnecessarily aborting too many txns, it is wiser to wait a few
Expand All @@ -339,9 +363,14 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
tikvLockResolverCountWithQueryTxnStatus.Inc()

var status TxnStatus
currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return status, err
}
req := tikvrpc.NewRequest(tikvrpc.CmdCleanup, &kvrpcpb.CleanupRequest{
Key: primary,
StartVersion: txnID,
CurrentTs: currentTS,
})
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
Expand All @@ -368,6 +397,12 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
}
cmdResp := resp.Resp.(*kvrpcpb.CleanupResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
// If the TTL of the primary lock has not expired, the response returns an ErrLocked error containing the TTL.
if lockInfo := keyErr.GetLocked(); lockInfo != nil {
status.ttl = lockInfo.LockTtl
status.commitTS = 0
return status, nil
}
err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID)
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
return status, err
Expand Down
45 changes: 41 additions & 4 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,43 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) {
status, err = s.store.lockResolver.GetTxnStatus(startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
}

// TestCheckTxnStatusTTL verifies GetTxnStatus against the three states of a
// primary lock: a live prewritten transaction reports a positive TTL and no
// commit TS; a rolled-back transaction reports ttl=0 and commitTS=0; a
// committed transaction reports ttl=0 and its real commit TS.
func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
	// Start a txn and prewrite a key so its primary lock exists.
	txn, err := s.store.Begin()
	c.Assert(err, IsNil)
	txn.Set(kv.Key("key"), []byte("value"))
	s.prewriteTxn(c, txn.(*tikvTxn))

	// Check the lock TTL of a transaction: the lock is live, so the status
	// carries a positive TTL and is neither committed nor rolled back.
	bo := NewBackoffer(context.Background(), prewriteMaxBackoff)
	lr := newLockResolver(s.store)
	status, err := lr.GetTxnStatus(txn.StartTS(), []byte("key"))
	c.Assert(err, IsNil)
	c.Assert(status.IsCommitted(), IsFalse)
	c.Assert(status.ttl, Greater, uint64(0))
	c.Assert(status.CommitTS(), Equals, uint64(0))

	// Rollback the txn. A zero-value TxnStatus is passed so resolveLock
	// treats the transaction as rolled back when cleaning the lock.
	lock := s.mustGetLock(c, []byte("key"))
	status = TxnStatus{}
	cleanRegions := make(map[RegionVerID]struct{})
	err = newLockResolver(s.store).resolveLock(bo, lock, status, cleanRegions)
	c.Assert(err, IsNil)

	// Check its status is rolled back: no TTL, no commit TS.
	status, err = lr.GetTxnStatus(txn.StartTS(), []byte("key"))
	c.Assert(err, IsNil)
	c.Assert(status.ttl, Equals, uint64(0))
	c.Assert(status.commitTS, Equals, uint64(0))

	// Check a committed txn: ttl is 0 and commitTS matches what putKV
	// reported at commit time.
	startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
	status, err = lr.GetTxnStatus(startTS, []byte("a"))
	c.Assert(err, IsNil)
	c.Assert(status.ttl, Equals, uint64(0))
	c.Assert(status.commitTS, Equals, commitTS)
}

func (s *testLockSuite) TestTxnHeartBeat(c *C) {
Expand All @@ -217,11 +254,11 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) {
c.Assert(err, IsNil)
c.Assert(newTTL, Equals, uint64(666))

// The getTxnStatus API is confusing, it really means rollback!
status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"))
lock := s.mustGetLock(c, []byte("key"))
status := TxnStatus{ttl: newTTL}
cleanRegions := make(map[RegionVerID]struct{})
err = newLockResolver(s.store).resolveLock(bo, lock, status, cleanRegions)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))

newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666)
c.Assert(err, NotNil)
Expand Down