diff --git a/go.mod b/go.mod index 14366645cc7b9..216737688710e 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2 + github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd github.com/pingcap/parser v0.0.0-20190912032624-978b8272c04e github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b diff --git a/go.sum b/go.sum index 4969e9175359b..7f1caebbf12c8 100644 --- a/go.sum +++ b/go.sum @@ -161,8 +161,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2 h1:wBORZD4gvEKK0tGP4g1Rv0Y7f2cNnObzI/ckPhsU11M= -github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1 h1:DNvxkdcjA0TBIIIF+K2w9KMlTzMZzLZ5JVF26kTCPhg= +github.com/pingcap/kvproto v0.0.0-20190910074005-0e61b6f435c1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= diff --git a/store/mockstore/mocktikv/mock_tikv_test.go 
b/store/mockstore/mocktikv/mock_tikv_test.go index fa0d10cfe8923..9a354365c04f7 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -556,7 +556,7 @@ func (s *testMockTiKVSuite) TestRollbackAndWriteConflict(c *C) { s.mustPutOK(c, "test", "test2", 5, 8) // simulate `getTxnStatus` for txn 2. - err := s.store.Cleanup([]byte("test"), 2) + err := s.store.Cleanup([]byte("test"), 2, math.MaxUint64) c.Assert(err, IsNil) req = &kvrpcpb.PrewriteRequest{ Mutations: putMutations("test", "test3"), @@ -712,7 +712,7 @@ func (s *testMVCCLevelDB) TestTxnHeartBeat(c *C) { c.Assert(ttl, Greater, uint64(300)) // The lock has already been clean up - c.Assert(s.store.Cleanup([]byte("pk"), 5), IsNil) + c.Assert(s.store.Cleanup([]byte("pk"), 5, math.MaxUint64), IsNil) _, err = s.store.TxnHeartBeat([]byte("pk"), 5, 1000) c.Assert(err, NotNil) } diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index d9aa80fa4e433..be85563479903 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -259,7 +259,7 @@ type MVCCStore interface { Prewrite(req *kvrpcpb.PrewriteRequest) []error Commit(keys [][]byte, startTS, commitTS uint64) error Rollback(keys [][]byte, startTS uint64) error - Cleanup(key []byte, startTS uint64) error + Cleanup(key []byte, startTS, currentTS uint64) error ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error) TxnHeartBeat(primaryKey []byte, startTS uint64, adviseTTL uint64) (uint64, error) ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index cb97d8a874502..4afb1299f69e3 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -700,6 +700,12 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, } if ok { if dec.lock.startTS != startTS { + if isPessimisticLock { + // 
NOTE: A special handling. + // When pessimistic txn prewrite meets lock, set the TTL = 0 means + // telling TiDB to rollback the transaction **unconditionally**. + dec.lock.ttl = 0 + } return dec.lock.lockErr(mutation.Key) } if dec.lock.op != kvrpcpb.Op_PessimisticLock { @@ -926,7 +932,8 @@ func getTxnCommitInfo(iter *Iterator, expectKey []byte, startTS uint64) (mvccVal } // Cleanup implements the MVCCStore interface. -func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error { +// Cleanup API is deprecated, use CheckTxnStatus instead. +func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { mvcc.mu.Lock() defer func() { mvcc.mu.Unlock() @@ -934,11 +941,64 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error { }() batch := &leveldb.Batch{} - err := rollbackKey(mvcc.db, batch, key, startTS) + startKey := mvccEncode(key, lockVer) + iter := newIterator(mvcc.db, &util.Range{ + Start: startKey, + }) + defer iter.Release() + + if iter.Valid() { + dec := lockDecoder{ + expectKey: key, + } + ok, err := dec.Decode(iter) + if err != nil { + return err + } + // If current transaction's lock exists. + if ok && dec.lock.startTS == startTS { + + // If the lock has already expired, clean it up. + if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { + if err = rollbackLock(batch, dec.lock, key, startTS); err != nil { + return err + } + return mvcc.db.Write(batch, nil) + } + + // Otherwise, return a locked error with the TTL information. + return dec.lock.lockErr(key) + } + + // If current transaction's lock does not exist. + // If the commit information of the current transaction exists. + c, ok, err := getTxnCommitInfo(iter, key, startTS) + if err != nil { + return errors.Trace(err) + } + if ok { + // If the current transaction has already committed. 
+ if c.valueType != typeRollback { + return ErrAlreadyCommitted(c.commitTS) + } + // If the current transaction has already been rolled back. + return nil + } + } + + // If the current transaction was not prewritten before. + value := mvccValue{ + valueType: typeRollback, + startTS: startTS, + commitTS: startTS, + } + writeKey := mvccEncode(key, startTS) + writeValue, err := value.MarshalBinary() if err != nil { return errors.Trace(err) } - return mvcc.db.Write(batch, nil) + batch.Put(writeKey, writeValue) + return mvcc.db.Write(batch, nil) } // CheckTxnStatus checks the primary lock of a transaction to decide its status. diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index c4b79af71d6a2..21f9acca10efc 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -357,7 +357,7 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean panic("KvCleanup: key not in region") } var resp kvrpcpb.CleanupResponse - err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion()) + err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion(), req.GetCurrentTs()) if err != nil { if commitTS, ok := errors.Cause(err).(ErrAlreadyCommitted); ok { resp.CommitVersion = uint64(commitTS) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index f440f93956870..263caab94cfd2 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -174,7 +174,8 @@ func (lr *LockResolver) getResolved(txnID uint64) (TxnStatus, bool) { return s, ok } -// BatchResolveLocks resolve locks in a batch +// BatchResolveLocks resolves locks in a batch. +// Use it in gcworker only! 
func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc RegionVerID) (bool, error) { if len(locks) == 0 { return true, nil @@ -182,7 +183,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi tikvLockResolverCountWithBatchResolve.Inc() - var expiredLocks []*Lock + expiredLocks := make([]*Lock, 0, len(locks)) for _, l := range locks { if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) { tikvLockResolverCountWithExpired.Inc() @@ -205,7 +206,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi continue } - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0) if err != nil { return false, errors.Trace(err) } @@ -266,9 +267,10 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // commit status. // 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to // the same transaction. -func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) { +func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error) { + var msBeforeTxnExpired txnExpireTime if len(locks) == 0 { - return + return msBeforeTxnExpired.value(), nil } tikvLockResolverCountWithResolve.Inc() @@ -277,61 +279,111 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE for _, l := range locks { msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) if msBeforeLockExpired <= 0 { - tikvLockResolverCountWithExpired.Inc() expiredLocks = append(expiredLocks, l) } else { - if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired { - msBeforeTxnExpired = msBeforeLockExpired - } + msBeforeTxnExpired.update(int64(l.TTL)) tikvLockResolverCountWithNotExpired.Inc() } } - if len(expiredLocks) == 0 { - if msBeforeTxnExpired > 0 { - tikvLockResolverCountWithWaitExpired.Inc() - } - return - } - // TxnID -> 
[]Region, record resolved Regions. // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) for _, l := range expiredLocks { - var status TxnStatus - status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary) + status, err := lr.getTxnStatusFromLock(bo, l) if err != nil { - msBeforeTxnExpired = 0 + msBeforeTxnExpired.update(0) err = errors.Trace(err) - return + return msBeforeTxnExpired.value(), err } - cleanRegions, exists := cleanTxns[l.TxnID] - if !exists { - cleanRegions = make(map[RegionVerID]struct{}) - cleanTxns[l.TxnID] = cleanRegions - } + if status.ttl == 0 { + tikvLockResolverCountWithExpired.Inc() + // If the lock is committed or rollbacked, resolve lock. + cleanRegions, exists := cleanTxns[l.TxnID] + if !exists { + cleanRegions = make(map[RegionVerID]struct{}) + cleanTxns[l.TxnID] = cleanRegions + } - err = lr.resolveLock(bo, l, status, cleanRegions) - if err != nil { - msBeforeTxnExpired = 0 - err = errors.Trace(err) - return + err = lr.resolveLock(bo, l, status, cleanRegions) + if err != nil { + msBeforeTxnExpired.update(0) + err = errors.Trace(err) + return msBeforeTxnExpired.value(), err + } + } else { + tikvLockResolverCountWithNotExpired.Inc() + // If the lock is valid, the txn may be a pessimistic transaction. + // Update the txn expire time. 
+ msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl) + msBeforeTxnExpired.update(msBeforeLockExpired) } } + + if msBeforeTxnExpired.value() > 0 { + tikvLockResolverCountWithWaitExpired.Inc() + } + return msBeforeTxnExpired.value(), nil +} + +type txnExpireTime struct { + initialized bool + txnExpire int64 +} + +func (t *txnExpireTime) update(lockExpire int64) { + if lockExpire <= 0 { + lockExpire = 0 + } + if !t.initialized { + t.txnExpire = lockExpire + t.initialized = true + return + } + if lockExpire < t.txnExpire { + t.txnExpire = lockExpire + } return } +func (t *txnExpireTime) value() int64 { + if !t.initialized { + return 0 + } + return t.txnExpire +} + // GetTxnStatus queries tikv-server for a txn's status (commit/rollback). // If the primary key is still locked, it will launch a Rollback to abort it. // To avoid unnecessarily aborting too many txns, it is wiser to wait a few // seconds before calling it after Prewrite. func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, error) { + var status TxnStatus bo := NewBackoffer(context.Background(), cleanupMaxBackoff) - status, err := lr.getTxnStatus(bo, txnID, primary) - return status, errors.Trace(err) + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) + if err != nil { + return status, err + } + return lr.getTxnStatus(bo, txnID, primary, currentTS) +} + +func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock) (TxnStatus, error) { + // NOTE: l.TTL = 0 is a special protocol!!! + // When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**. + // In this case, TiKV set the lock TTL = 0, and TiDB use currentTS = 0 to call + // getTxnStatus, and getTxnStatus with currentTS = 0 would rollback the transaction. 
+ if l.TTL == 0 { + return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0) + } + + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) + if err != nil { + return TxnStatus{}, err + } + return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS) } -func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil } @@ -342,6 +394,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte req := tikvrpc.NewRequest(tikvrpc.CmdCleanup, &kvrpcpb.CleanupRequest{ Key: primary, StartVersion: txnID, + CurrentTs: currentTS, }) for { loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) @@ -368,6 +421,12 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte } cmdResp := resp.Resp.(*kvrpcpb.CleanupResponse) if keyErr := cmdResp.GetError(); keyErr != nil { + // If the TTL of the primary lock is not outdated, the proto returns a ErrLocked contains the TTL. 
+ if lockInfo := keyErr.GetLocked(); lockInfo != nil { + status.ttl = lockInfo.LockTtl + status.commitTS = 0 + return status, nil + } err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID) logutil.BgLogger().Error("getTxnStatus error", zap.Error(err)) return status, err diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index e5bb7e063ae47..7fac316899b0f 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -200,6 +200,43 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) { status, err = s.store.lockResolver.GetTxnStatus(startTS, []byte("a")) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) + c.Assert(status.ttl, Greater, uint64(0)) +} + +func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) { + txn, err := s.store.Begin() + c.Assert(err, IsNil) + txn.Set(kv.Key("key"), []byte("value")) + s.prewriteTxn(c, txn.(*tikvTxn)) + + // Check the lock TTL of a transaction. + bo := NewBackoffer(context.Background(), prewriteMaxBackoff) + lr := newLockResolver(s.store) + status, err := lr.GetTxnStatus(txn.StartTS(), []byte("key")) + c.Assert(err, IsNil) + c.Assert(status.IsCommitted(), IsFalse) + c.Assert(status.ttl, Greater, uint64(0)) + c.Assert(status.CommitTS(), Equals, uint64(0)) + + // Rollback the txn. + lock := s.mustGetLock(c, []byte("key")) + status = TxnStatus{} + cleanRegions := make(map[RegionVerID]struct{}) + err = newLockResolver(s.store).resolveLock(bo, lock, status, cleanRegions) + c.Assert(err, IsNil) + + // Check that its status is rolled back. + status, err = lr.GetTxnStatus(txn.StartTS(), []byte("key")) + c.Assert(err, IsNil) + c.Assert(status.ttl, Equals, uint64(0)) + c.Assert(status.commitTS, Equals, uint64(0)) + + // Check a committed txn. 
+ startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) + status, err = lr.GetTxnStatus(startTS, []byte("a")) + c.Assert(err, IsNil) + c.Assert(status.ttl, Equals, uint64(0)) + c.Assert(status.commitTS, Equals, commitTS) } func (s *testLockSuite) TestTxnHeartBeat(c *C) { @@ -217,11 +254,11 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) { c.Assert(err, IsNil) c.Assert(newTTL, Equals, uint64(666)) - // The getTxnStatus API is confusing, it really means rollback! - status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key")) + lock := s.mustGetLock(c, []byte("key")) + status := TxnStatus{ttl: newTTL} + cleanRegions := make(map[RegionVerID]struct{}) + err = newLockResolver(s.store).resolveLock(bo, lock, status, cleanRegions) c.Assert(err, IsNil) - c.Assert(status.ttl, Equals, uint64(0)) - c.Assert(status.commitTS, Equals, uint64(0)) newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666) c.Assert(err, NotNil)