From b1c67f0372b9e57beec0865a2828312077793df5 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 2 Sep 2019 12:04:32 +0800 Subject: [PATCH 01/17] *: implement the CheckTxnStatus API for the large transaction --- go.mod | 4 + go.sum | 2 + store/mockstore/mocktikv/mock_tikv_test.go | 29 +++++++ store/mockstore/mocktikv/mvcc.go | 2 + store/mockstore/mocktikv/mvcc_leveldb.go | 95 +++++++++++++++++++++- store/mockstore/mocktikv/rpc.go | 21 +++++ store/tikv/lock_resolver.go | 61 ++++++++++++-- store/tikv/lock_test.go | 28 +++++++ store/tikv/region_request_test.go | 6 ++ store/tikv/tikvrpc/tikvrpc.go | 13 +++ 10 files changed, 253 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 6413c20dd1b09..611f6164087aa 100644 --- a/go.mod +++ b/go.mod @@ -75,3 +75,7 @@ require ( sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4 sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 ) + +replace github.com/pingcap/kvproto => github.com/pingcap/kvproto v0.0.0-20190829095345-6a581f25586d + +go 1.13 diff --git a/go.sum b/go.sum index 87f81d487e605..c87c51bed2c38 100644 --- a/go.sum +++ b/go.sum @@ -162,6 +162,8 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 h1:BMrtxXqQeZ9y27LN/V3PHA/tSyDWHK+90VLYaymrXQE= github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190829095345-6a581f25586d h1:/yAB1ikjMxGOyBCYO8O+POBlarPfcMjJkhVy2ZHENCA= +github.com/pingcap/kvproto v0.0.0-20190829095345-6a581f25586d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index ddb71d775d46e..5f0af7d64bfe2 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -157,10 +157,15 @@ func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limi } func (s *testMockTiKVSuite) mustPrewriteOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64) { + s.mustPrewriteOK1(c, mutations, primary, startTS, 0) +} + +func (s *testMockTiKVSuite) mustPrewriteOK1(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64, ttl uint64) { req := &kvrpcpb.PrewriteRequest{ Mutations: mutations, PrimaryLock: []byte(primary), StartVersion: startTS, + LockTtl: ttl, } errs := s.store.Prewrite(req) for _, err := range errs { @@ -650,3 +655,27 @@ func (s *testMVCCLevelDB) TestErrors(c *C) { c.Assert(ErrAlreadyCommitted(0).Error(), Equals, "txn already committed") c.Assert((&ErrConflict{}).Error(), Equals, "write conflict") } + +func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) { + s.mustPrewriteOK1(c, putMutations("pk", "val"), "pk", 5, 666) + + ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0) + c.Assert(err, IsNil) + c.Assert(ttl, Equals, uint64(666)) + c.Assert(commitTS, Equals, uint64(0)) + + s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30) + + ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0) + c.Assert(err, IsNil) + c.Assert(ttl, Equals, uint64(0)) + c.Assert(commitTS, Equals, uint64(30)) + + s.mustPrewriteOK1(c, putMutations("pk1", "val"), "pk1", 5, 666) + s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5) + + ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0) + c.Assert(err, IsNil) + c.Assert(ttl, Equals, uint64(0)) + c.Assert(commitTS, Equals, uint64(0)) +} diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index 02b4ba3c5a6c4..502a3b7be0586 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -48,6 +48,7 @@ type mvccLock struct { ttl uint64 forUpdateTS uint64 txnSize uint64 + minCommitTS uint64 } type mvccEntry struct { @@ -262,6 +263,7 @@ type MVCCStore interface { BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error GC(startKey, endKey []byte, safePoint uint64) error DeleteRange(startKey, endKey []byte) error + CheckTxnStatus(primaryKey []byte, startTS uint64, currentTS uint64) (ttl, commitTS uint64, err error) Close() error } diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 056824834d437..86f50875d90b6 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/goleveldb/leveldb/util" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/deadlock" "github.com/pingcap/tidb/util/logutil" @@ -589,6 +590,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error { primary := req.PrimaryLock startTS := req.StartVersion ttl := req.LockTtl + minCommitTS := req.MinCommitTs mvcc.mu.Lock() defer mvcc.mu.Unlock() @@ -616,7 +618,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error { } } isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i] - err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock) + err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock, minCommitTS) errs = append(errs, err) if err != nil { anyError = true @@ -679,7 +681,10 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, startTS uint64) err return nil } -func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64, txnSize uint64, isPessimisticLock bool) error { +func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, + mutation *kvrpcpb.Mutation, startTS uint64, + primary []byte, ttl uint64, txnSize uint64, + isPessimisticLock bool, minCommitTS uint64) error { startKey := mvccEncode(mutation.Key, lockVer) iter := newIterator(db, &util.Range{ Start: startKey, @@ -723,6 +728,11 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu ttl: ttl, txnSize: txnSize, } + // Write minCommitTS on the primary lock. + if bytes.Equal(primary, mutation.GetKey()) { + lock.minCommitTS = minCommitTS + } + writeKey := mvccEncode(mutation.Key, lockVer) writeValue, err := lock.MarshalBinary() if err != nil { @@ -931,6 +941,87 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error { return mvcc.db.Write(batch, nil) } +// CheckTxnStatus checks the primary lock of a transaction to decide its status. +// The return values are (ttl, commitTS, err): +// If the transaction is active, this function returns the ttl of the lock; +// If the transaction is committed, this function returns the commitTS; +// If the transaction is rollbacked, this function returns (0, 0, nil) +// Note that CheckTxnStatus may also push forward the `minCommitTS` of the +// transaction, so it's not simply a read-only operation. +func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, startTS uint64, currentTS uint64) (uint64, uint64, error) { + mvcc.mu.Lock() + defer mvcc.mu.Unlock() + + startKey := mvccEncode(primaryKey, lockVer) + iter := newIterator(mvcc.db, &util.Range{ + Start: startKey, + }) + defer iter.Release() + + if iter.Valid() { + dec := lockDecoder{ + expectKey: primaryKey, + } + ok, err := dec.Decode(iter) + if err != nil { + return 0, 0, errors.Trace(err) + } + // If current transaction's lock exists. + if ok && dec.lock.startTS == startTS { + lock := dec.lock + batch := &leveldb.Batch{} + + // If the lock has already outdated, clean up it. + if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { + if err = rollbackLock(batch, lock, primaryKey, startTS); err != nil { + return 0, 0, errors.Trace(err) + } + if err = mvcc.db.Write(batch, nil); err != nil { + return 0, 0, errors.Trace(err) + } + return 0, 0, nil + } + + // If this is a large transaction and the lock is active, push forward the minCommitTS. + // lock.minCommitTS == 0 may be a secondary lock, or not a large transaction. + if lock.minCommitTS > 0 { + lock.minCommitTS = currentTS + writeKey := mvccEncode(primaryKey, lockVer) + writeValue, err := lock.MarshalBinary() + if err != nil { + return 0, 0, errors.Trace(err) + } + batch.Put(writeKey, writeValue) + if err = mvcc.db.Write(batch, nil); err != nil { + return 0, 0, errors.Trace(err) + } + } + + return lock.ttl, 0, nil + } + + // If current transaction's lock does not exist. + // If the commit info of the current transaction exists. + c, ok, err := getTxnCommitInfo(iter, primaryKey, startTS) + if err != nil { + return 0, 0, errors.Trace(err) + } + if ok { + // If current transaction is already committed. + if c.valueType != typeRollback { + return 0, c.commitTS, nil + } + // If current transaction is already rollback. + return 0, 0, nil + } + } + + // If current transaction is not prewritted before, it may be pessimistic lock. + // When pessimistic lock rollback, it may not leave a 'rollbacked' tombstone. + logutil.BgLogger().Debug("CheckTxnStatus can't find the primary lock, pessimistic rollback?") + return 0, 0, nil +} + // ScanLock implements the MVCCStore interface. func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error) { mvcc.mu.RLock() diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 8dd8109a523e2..a68e63e10fc54 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -368,6 +368,20 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean return &resp } +func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest) *kvrpcpb.CheckTxnStatusResponse { + if !h.checkKeyInRegion(req.PrimaryKey) { + panic("KvCheckTxnStatus: key not in region") + } + var resp kvrpcpb.CheckTxnStatusResponse + ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetStartVersion(), req.GetCurrentTs()) + if err != nil { + resp.Error = convertToKeyError(err) + } else { + resp.LockTtl, resp.CommitVersion = ttl, commitTS + } + return &resp +} + func (h *rpcHandler) handleKvBatchGet(req *kvrpcpb.BatchGetRequest) *kvrpcpb.BatchGetResponse { for _, k := range req.Keys { if !h.checkKeyInRegion(k) { @@ -766,6 +780,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R return resp, nil } resp.Resp = handler.handleKvCleanup(r) + case tikvrpc.CmdCheckTxnStatus: + r := req.CheckTxnStatus() + if err := handler.checkRequest(reqCtx, r.Size()); err != nil { + resp.Resp = &kvrpcpb.CheckTxnStatusResponse{RegionError: err} + return resp, nil + } + resp.Resp = handler.handleKvCheckTxnStatus(r) case tikvrpc.CmdBatchGet: r := req.BatchGet() if err := handler.checkRequest(reqCtx, r.Size()); err != nil { diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index f20116c26af55..f1543167abbaa 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -103,14 +103,17 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve return s.lockResolver, nil } -// TxnStatus represents a txn's final status. It should be Commit or Rollback. -type TxnStatus uint64 +// TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback. +type TxnStatus struct { + ttl uint64 + commitTS uint64 +} // IsCommitted returns true if the txn's final status is Commit. -func (s TxnStatus) IsCommitted() bool { return s > 0 } +func (s TxnStatus) IsCommitted() bool { return s.ttl == 0 && s.commitTS > 0 } // CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true. -func (s TxnStatus) CommitTS() uint64 { return uint64(s) } +func (s TxnStatus) CommitTS() uint64 { return uint64(s.commitTS) } // By default, locks after 3000ms is considered unusual (the client created the // lock might be dead). Other client may cleanup this kind of lock. @@ -206,7 +209,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi if err != nil { return false, errors.Trace(err) } - txnInfos[l.TxnID] = uint64(status) + txnInfos[l.TxnID] = uint64(status.commitTS) } logutil.BgLogger().Info("BatchResolveLocks: lookup txn status", zap.Duration("cost time", time.Since(startTime)), @@ -318,6 +321,52 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE return } +func (lr *LockResolver) checkTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64) (TxnStatus, error) { + var status TxnStatus + req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{ + PrimaryKey: primary, + StartVersion: txnID, + CurrentTs: currentTS, + }) + for { + loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) + if err != nil { + return status, errors.Trace(err) + } + resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort) + if err != nil { + return status, errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return status, errors.Trace(err) + } + if regionErr != nil { + err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return status, errors.Trace(err) + } + continue + } + if resp.Resp == nil { + return status, errors.Trace(ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse) + if keyErr := cmdResp.GetError(); keyErr != nil { + err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID) + logutil.BgLogger().Error("checkTxnStatus error", zap.Error(err)) + return status, err + } + if cmdResp.LockTtl != 0 { + status.ttl = cmdResp.LockTtl + } else { + status.commitTS = cmdResp.CommitVersion + } + lr.saveResolved(txnID, status) + return status, nil + } +} + // GetTxnStatus queries tikv-server for a txn's status (commit/rollback). // If the primary key is still locked, it will launch a Rollback to abort it. // To avoid unnecessarily aborting too many txns, it is wiser to wait a few @@ -370,7 +419,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte return status, err } if cmdResp.CommitVersion != 0 { - status = TxnStatus(cmdResp.GetCommitVersion()) + status = TxnStatus{0, cmdResp.GetCommitVersion()} tikvLockResolverCountWithQueryTxnStatusCommitted.Inc() } else { tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc() diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 2675ef436011f..24ffb918fc575 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -202,6 +202,34 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) { c.Assert(status.IsCommitted(), IsFalse) } +func (s *testLockSuite) TestCheckTxnStatus(c *C) { + txn, err := s.store.Begin() + c.Assert(err, IsNil) + txn.Set(kv.Key("key"), []byte("value")) + s.prewriteTxn(c, txn.(*tikvTxn)) + + bo := NewBackoffer(context.Background(), prewriteMaxBackoff) + status, err := newLockResolver(s.store).checkTxnStatus(bo, txn.StartTS(), []byte("key"), math.MaxUint64) + c.Assert(err, IsNil) + c.Assert(status.IsCommitted(), IsFalse) + c.Assert(status.CommitTS(), Equals, uint64(0)) + + // The getTxnStatus API is confusing, it really means rollback! + status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key")) + c.Assert(err, IsNil) + + status, err = newLockResolver(s.store).checkTxnStatus(bo, txn.StartTS(), []byte("key"), math.MaxUint64) + c.Assert(err, IsNil) + c.Assert(status.ttl, Equals, uint64(0)) + c.Assert(status.commitTS, Equals, uint64(0)) + + startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) + status, err = newLockResolver(s.store).checkTxnStatus(bo, startTS, []byte("a"), math.MaxUint64) + c.Assert(err, IsNil) + c.Assert(status.ttl, Equals, uint64(0)) + c.Assert(status.commitTS, Equals, commitTS) +} + func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) { committer, err := newTwoPhaseCommitterWithInit(txn, 0) c.Assert(err, IsNil) diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index e14bdc0208147..2fe3c838fbd36 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -243,6 +243,12 @@ func (s *mockTikvGrpcServer) KvPessimisticLock(context.Context, *kvrpcpb.Pessimi func (s *mockTikvGrpcServer) KVPessimisticRollback(context.Context, *kvrpcpb.PessimisticRollbackRequest) (*kvrpcpb.PessimisticRollbackResponse, error) { return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) KvCheckTxnStatus(ctx context.Context, in *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) { + return nil, errors.New("unreachable") +} +func (s *mockTikvGrpcServer) KvTxnHeartBeat(ctx context.Context, in *kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) { + return nil, errors.New("unreachable") +} func (s *mockTikvGrpcServer) KvGC(context.Context, *kvrpcpb.GCRequest) (*kvrpcpb.GCResponse, error) { return nil, errors.New("unreachable") } diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index 9f2e1f6eb14fb..f496581762742 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -47,6 +47,7 @@ const ( CmdDeleteRange CmdPessimisticLock CmdPessimisticRollback + CmdCheckTxnStatus CmdRawGet CmdType = 256 + iota CmdRawBatchGet @@ -127,6 +128,8 @@ func (t CmdType) String() string { return "MvccGetByStartTS" case CmdSplitRegion: return "SplitRegion" + case CmdCheckTxnStatus: + return "CheckTxnStatus" case CmdDebugGetRegionProperties: return "DebugGetRegionProperties" } @@ -304,6 +307,10 @@ func (req *Request) Empty() *tikvpb.BatchCommandsEmptyRequest { return req.req.(*tikvpb.BatchCommandsEmptyRequest) } +func (req *Request) CheckTxnStatus() *kvrpcpb.CheckTxnStatusRequest { + return req.req.(*kvrpcpb.CheckTxnStatusRequest) +} + // ToBatchCommandsRequest converts the request to an entry in BatchCommands request. func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Request { switch req.Type { @@ -353,6 +360,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticRollback{PessimisticRollback: req.PessimisticRollback()}} case CmdEmpty: return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{Empty: req.Empty()}} + case CmdCheckTxnStatus: + return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_CheckTxnStatus{CheckTxnStatus: req.CheckTxnStatus()}} } return nil } @@ -498,6 +507,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.SplitRegion().Context = ctx case CmdEmpty: req.SplitRegion().Context = ctx + case CmdCheckTxnStatus: + req.CheckTxnStatus().Context = ctx default: return fmt.Errorf("invalid request type %v", req.Type) } @@ -714,6 +725,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Resp, err = client.SplitRegion(ctx, req.SplitRegion()) case CmdEmpty: resp.Resp, err = &tikvpb.BatchCommandsEmptyResponse{}, nil + case CmdCheckTxnStatus: + resp.Resp, err = client.KvCheckTxnStatus(ctx, req.CheckTxnStatus()) default: return nil, errors.Errorf("invalid request type: %v", req.Type) } From 52c1d292e984f3660432f1121e37c46a94a6c516 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 2 Sep 2019 15:15:54 +0800 Subject: [PATCH 02/17] make check --- go.sum | 3 --- store/tikv/tikvrpc/tikvrpc.go | 3 ++- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/go.sum b/go.sum index c87c51bed2c38..3ba8e90a1d267 100644 --- a/go.sum +++ b/go.sum @@ -159,9 +159,6 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7x github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 h1:BMrtxXqQeZ9y27LN/V3PHA/tSyDWHK+90VLYaymrXQE= -github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/kvproto v0.0.0-20190829095345-6a581f25586d h1:/yAB1ikjMxGOyBCYO8O+POBlarPfcMjJkhVy2ZHENCA= github.com/pingcap/kvproto v0.0.0-20190829095345-6a581f25586d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index f496581762742..9670238339b73 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -302,11 +302,12 @@ func (req *Request) DebugGetRegionProperties() *debugpb.GetRegionPropertiesReque return req.req.(*debugpb.GetRegionPropertiesRequest) } -// Empty returns BatchCommandsEmptyRequest in request +// Empty returns BatchCommandsEmptyRequest in request. func (req *Request) Empty() *tikvpb.BatchCommandsEmptyRequest { return req.req.(*tikvpb.BatchCommandsEmptyRequest) } +// CheckTxnStatus returns CheckTxnStatusRequest in request. func (req *Request) CheckTxnStatus() *kvrpcpb.CheckTxnStatusRequest { return req.req.(*kvrpcpb.CheckTxnStatusRequest) } From 299a84e8fba3c0939e3b4cfd356f18ee67740371 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 3 Sep 2019 12:51:42 +0800 Subject: [PATCH 03/17] store/tikv: refactor the ResolveLocks() function for large transaction's implementation After introducing the large transaction, the true lock TTL information is stored in the primary lock, and the TTL may change. This commit updates the resolve lock method, using the new kvproto API --- store/tikv/2pc.go | 4 +-- store/tikv/coprocessor.go | 2 +- store/tikv/lock_resolver.go | 52 +++++++++++++++++++++++++------------ store/tikv/snapshot.go | 4 +-- 4 files changed, 41 insertions(+), 21 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 05ba781a50ba8..f98433ad38281 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -527,7 +527,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) locks = append(locks, lock) } start := time.Now() - msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks) + msBeforeExpired, _, err := c.store.lockResolver.ResolveLocks(bo, locks) if err != nil { return errors.Trace(err) } @@ -610,7 +610,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc } locks = append(locks, lock) } - _, err = c.store.lockResolver.ResolveLocks(bo, locks) + _, _, err = c.store.lockResolver.ResolveLocks(bo, locks) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 1dea973858225..3af99eb1e277a 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -777,7 +777,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon if lockErr := resp.pbResp.GetLocked(); lockErr != nil { logutil.BgLogger().Debug("coprocessor encounters", zap.Stringer("lock", lockErr)) - msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)}) + msBeforeExpired, _, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)}) if err1 != nil { return nil, errors.Trace(err1) } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index f1543167abbaa..674519823d006 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -266,19 +266,19 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // commit status. // 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to // the same transaction. -func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) { +func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, resolved []uint64, err error) { if len(locks) == 0 { return } tikvLockResolverCountWithResolve.Inc() - var expiredLocks []*Lock + var expiredSecondaryLocks []*Lock for _, l := range locks { msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) if msBeforeLockExpired <= 0 { tikvLockResolverCountWithExpired.Inc() - expiredLocks = append(expiredLocks, l) + expiredSecondaryLocks = append(expiredSecondaryLocks, l) } else { if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired { msBeforeTxnExpired = msBeforeLockExpired @@ -286,36 +286,56 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE tikvLockResolverCountWithNotExpired.Inc() } } - if len(expiredLocks) == 0 { + if len(expiredSecondaryLocks) == 0 { if msBeforeTxnExpired > 0 { tikvLockResolverCountWithWaitExpired.Inc() } return } + currentTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx) + if err != nil { + return 0, nil, err + } + // TxnID -> []Region, record resolved Regions. // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) - for _, l := range expiredLocks { + for _, l := range expiredSecondaryLocks { var status TxnStatus - status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary) + status, err = lr.checkTxnStatus(bo, l.TxnID, l.Primary, currentTS) if err != nil { msBeforeTxnExpired = 0 err = errors.Trace(err) return } - cleanRegions, exists := cleanTxns[l.TxnID] - if !exists { - cleanRegions = make(map[RegionVerID]struct{}) - cleanTxns[l.TxnID] = cleanRegions - } + if status.ttl == 0 { + // If the lock is committed or rollbacked, resolve lock. + cleanRegions, exists := cleanTxns[l.TxnID] + if !exists { + cleanRegions = make(map[RegionVerID]struct{}) + cleanTxns[l.TxnID] = cleanRegions + } - err = lr.resolveLock(bo, l, status, cleanRegions) - if err != nil { - msBeforeTxnExpired = 0 - err = errors.Trace(err) - return + err = lr.resolveLock(bo, l, status, cleanRegions) + if err != nil { + msBeforeTxnExpired = 0 + err = errors.Trace(err) + return + } + } else { + // If the lock is valid, the txn may be a large transaction. + resolved = append(resolved, l.TxnID) + msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl) + if msBeforeLockExpired <= 0 { + // The txn is a large transaction, and it's primary lock will expire soon, but + // TxnHeartBeat could update the TTL, so we should not clean up the lock. + continue + } + if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired { + msBeforeTxnExpired = msBeforeLockExpired + } } } return diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 875b45ae45212..d69cc9ee3d62d 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -202,7 +202,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll locks = append(locks, lock) } if len(lockedKeys) > 0 { - msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, locks) + msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, locks) if err != nil { return errors.Trace(err) } @@ -273,7 +273,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { if err != nil { return nil, errors.Trace(err) } - msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock}) + msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock}) if err != nil { return nil, errors.Trace(err) } From 4c6c5c2384f2e5d66673e5d73a875f0cefc534ff Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 12 Sep 2019 00:29:51 +0800 Subject: [PATCH 04/17] tiny cleanup --- store/mockstore/mocktikv/mvcc_leveldb.go | 7 ++++--- store/tikv/2pc.go | 9 ++------- store/tikv/lock_resolver.go | 2 +- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 549c20c634f49..cb97d8a874502 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -942,15 +942,16 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error { } // CheckTxnStatus checks the primary lock of a transaction to decide its status. -// primaryKey + lockTS together could locate the primary lock. -// callerStartTS is the start ts of reader transaction. -// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL. // The return values are (ttl, commitTS, err): // If the transaction is active, this function returns the ttl of the lock; // If the transaction is committed, this function returns the commitTS; // If the transaction is rollbacked, this function returns (0, 0, nil) // Note that CheckTxnStatus may also push forward the `minCommitTS` of the // transaction, so it's not simply a read-only operation. +// +// primaryKey + lockTS together could locate the primary lock. +// callerStartTS is the start ts of reader transaction. +// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL. func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64) (uint64, uint64, error) { mvcc.mu.Lock() defer mvcc.mu.Unlock() diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 1763ea0a6efb3..eb18a82b7c949 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -565,13 +565,8 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) } start := time.Now() - callerTS := c.startTS - if c.forUpdateTS > callerTS { - // Prewrite pessimistic transaction should not meet lock, so the code should - // not run here? Anyway, use a larger one is always correct. - callerTS = c.forUpdateTS - } - msBeforeExpired, _, err := c.store.lockResolver.ResolveLocks(bo, callerTS, locks) + // Prewrite pessimistic transaction should not meet lock? + msBeforeExpired, _, err := c.store.lockResolver.ResolveLocks(bo, c.startTS, locks) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index c47b9e7eccae8..818b3beb562da 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -293,7 +293,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, startTS uint64, locks []*Loc return } - currentTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx) + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) if err != nil { return 0, nil, err } From ce3b782d09481cc6032dd3140511b6896078b3db Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 28 Sep 2019 02:00:01 +0800 Subject: [PATCH 05/17] use CheckTxnStatus to implement getTxnStatus --- go.mod | 1 - store/tikv/2pc_test.go | 2 +- store/tikv/lock_resolver.go | 103 +++++++++++------------------------- store/tikv/lock_test.go | 10 ++-- 4 files changed, 37 insertions(+), 79 deletions(-) diff --git a/go.mod b/go.mod index 22987a370bd12..e1ae085082f7b 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,6 @@ require ( golang.org/x/text v0.3.2 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect golang.org/x/tools v0.0.0-20190911022129-16c5e0f7d110 - google.golang.org/appengine v1.4.0 // indirect google.golang.org/genproto v0.0.0-20190905072037-92dd089d5514 // indirect google.golang.org/grpc v1.23.0 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 668d3274ed766..a84a4c12a30f4 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -542,7 +542,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { lr := newLockResolver(s.store) bo := NewBackoffer(context.Background(), getMaxBackoff) - status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS) + status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, lockInfo.LockTtl) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 0a4281a724bf9..98d7390a4fd81 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -206,7 +206,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi continue } - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, 0) if err != nil { return false, errors.Trace(err) } @@ -289,7 +289,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, startTS uint64, locks []*Loc // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) for _, l := range expiredLocks { - status, err := lr.getTxnStatusFromLock(bo, l, startTS) + status, err := lr.getTxnStatusFromLock(bo, l, startTS, cleanTxns) if err != nil { msBeforeTxnExpired.update(0) err = errors.Trace(err) @@ -325,53 +325,6 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, startTS uint64, locks []*Loc return msBeforeTxnExpired.value(), nil } -func (lr *LockResolver) checkTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS uint64, currentTS uint64) (TxnStatus, error) { - var status TxnStatus - req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{ - PrimaryKey: primary, - LockTs: txnID, - CallerStartTs: callerStartTS, - CurrentTs: currentTS, - }) - for { - loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) - if err != nil { - return status, errors.Trace(err) - } - resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort) - if err != nil { - return status, errors.Trace(err) - } - regionErr, err := resp.GetRegionError() - if err != nil { - return status, errors.Trace(err) - } - if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) - if err != nil { - return status, errors.Trace(err) - } - continue - } - if resp.Resp == nil { - return status, errors.Trace(ErrBodyMissing) - } - cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse) - if keyErr := cmdResp.GetError(); keyErr != nil { - err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID) - logutil.BgLogger().Error("checkTxnStatus error", zap.Error(err)) - return status, err - } - if cmdResp.LockTtl != 0 { - status.ttl = cmdResp.LockTtl - } else { - status.commitTS = cmdResp.CommitVersion - } - lr.saveResolved(txnID, status) - return status, nil - } -} - type txnExpireTime struct { initialized bool txnExpire int64 @@ -410,37 +363,44 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, e if err != nil { return status, err } - return lr.getTxnStatus(bo, txnID, primary, currentTS) + return lr.getTxnStatus(bo, txnID, primary, 0, currentTS) } -func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64, cleanTxns map[uint64]map[RegionVerID]struct{}) (TxnStatus, error) { // NOTE: l.TTL = 0 is a special protocol!!! // When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**. // In this case, TiKV set the lock TTL = 0, and TiDB use currentTS = 0 to call // getTxnStatus, and getTxnStatus with currentTS = 0 would rollback the transaction. if l.TTL == 0 { - return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0) + var status TxnStatus + cleanRegions, exists := cleanTxns[l.TxnID] + if !exists { + cleanRegions = make(map[RegionVerID]struct{}) + cleanTxns[l.TxnID] = cleanRegions + } + err := lr.resolveLock(bo, l, status, cleanRegions) + return status, err } currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) if err != nil { return TxnStatus{}, err } - return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS) + return lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS) } -func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil } tikvLockResolverCountWithQueryTxnStatus.Inc() - var status TxnStatus - req := tikvrpc.NewRequest(tikvrpc.CmdCleanup, &kvrpcpb.CleanupRequest{ - Key: primary, - StartVersion: txnID, - CurrentTs: currentTS, + req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{ + PrimaryKey: primary, + LockTs: txnID, + CallerStartTs: callerStartTS, + CurrentTs: currentTS, }) for { loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) @@ -465,25 +425,24 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte if resp.Resp == nil { return status, errors.Trace(ErrBodyMissing) } - cmdResp := resp.Resp.(*kvrpcpb.CleanupResponse) + cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse) if keyErr := cmdResp.GetError(); keyErr != nil { - // If the TTL of the primary lock is not outdated, the proto returns a ErrLocked contains the TTL. - if lockInfo := keyErr.GetLocked(); lockInfo != nil { - status.ttl = lockInfo.LockTtl - status.commitTS = 0 - return status, nil - } - err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID) + err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID) logutil.BgLogger().Error("getTxnStatus error", zap.Error(err)) return status, err } - if cmdResp.CommitVersion != 0 { - status = TxnStatus{0, cmdResp.GetCommitVersion()} - tikvLockResolverCountWithQueryTxnStatusCommitted.Inc() + if cmdResp.LockTtl != 0 { + status.ttl = cmdResp.LockTtl } else { - tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc() + if cmdResp.CommitVersion == 0 { + tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc() + } else { + tikvLockResolverCountWithQueryTxnStatusCommitted.Inc() + } + + status.commitTS = cmdResp.CommitVersion + lr.saveResolved(txnID, status) } - lr.saveResolved(txnID, status) return status, nil } } diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 362d263980ec3..0e44f90db4389 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -275,25 +275,25 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { currentTS, err := oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) bo := NewBackoffer(context.Background(), prewriteMaxBackoff) - status, err := newLockResolver(s.store).checkTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) + status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) c.Assert(status.ttl, Greater, uint64(0)) c.Assert(status.CommitTS(), Equals, uint64(0)) - // The getTxnStatus API is confusing, it really means rollback! - status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key")) + // It is confusing here, this getTxnStatus really means rollback! + status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, math.MaxUint64) c.Assert(err, IsNil) currentTS, err = oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) - status, err = newLockResolver(s.store).checkTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) + status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) - status, err = newLockResolver(s.store).checkTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS) + status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, commitTS) From 5b0a56c9cb9671529aef04ac0fa49298bb80f6ec Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 8 Oct 2019 14:55:05 +0800 Subject: [PATCH 06/17] update comment --- store/tikv/lock_resolver.go | 5 +++-- store/tikv/lock_test.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 98d7390a4fd81..0b97359c5629f 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -304,6 +304,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, startTS uint64, locks []*Loc cleanRegions = make(map[RegionVerID]struct{}) cleanTxns[l.TxnID] = cleanRegions } + err = lr.resolveLock(bo, l, status, cleanRegions) if err != nil { msBeforeTxnExpired.update(0) @@ -369,8 +370,7 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, e func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64, cleanTxns map[uint64]map[RegionVerID]struct{}) (TxnStatus, error) { // NOTE: l.TTL = 0 is a special protocol!!! // When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**. - // In this case, TiKV set the lock TTL = 0, and TiDB use currentTS = 0 to call - // getTxnStatus, and getTxnStatus with currentTS = 0 would rollback the transaction. + // In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock! if l.TTL == 0 { var status TxnStatus cleanRegions, exists := cleanTxns[l.TxnID] @@ -395,6 +395,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte } tikvLockResolverCountWithQueryTxnStatus.Inc() + var status TxnStatus req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{ PrimaryKey: primary, diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 0e44f90db4389..266912952b188 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -281,7 +281,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { c.Assert(status.ttl, Greater, uint64(0)) c.Assert(status.CommitTS(), Equals, uint64(0)) - // It is confusing here, this getTxnStatus really means rollback! + // It is confusing here, getTxnStatus with currentTS = MaxUint64 really means rollback! status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, math.MaxUint64) c.Assert(err, IsNil) From 7754fbd9b6db49e2f4c32b9534e1a2f99fab8b3f Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 8 Oct 2019 15:51:06 +0800 Subject: [PATCH 07/17] resolve lock for GC --- store/tikv/lock_resolver.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 0b97359c5629f..213e74d5aca59 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -206,7 +206,8 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi continue } - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, 0) + // Use math.MaxUint64 as currentTS, so the txn is either committed or rollbacked. + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64) if err != nil { return false, errors.Trace(err) } From edc03ff8d59490c2ef61aef110af1d17e7a78ca8 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 8 Oct 2019 16:16:48 +0800 Subject: [PATCH 08/17] fix build --- store/tikv/lock_resolver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 213e74d5aca59..1908bfeca54a1 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -17,6 +17,7 @@ import ( "container/list" "context" "fmt" + "math" "sync" "time" From 3c96f5750fda3f88b9b3189dfe1ba5bed37c8dd2 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 8 Oct 2019 17:34:38 +0800 Subject: [PATCH 09/17] fix integrated test --- store/tikv/tikvrpc/tikvrpc.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index 33da476e608d3..bc6efc867621d 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -442,6 +442,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) *Resp return &Response{Resp: res.Empty} case *tikvpb.BatchCommandsResponse_Response_TxnHeartBeat: return &Response{Resp: res.TxnHeartBeat} + case *tikvpb.BatchCommandsResponse_Response_CheckTxnStatus: + return &Response{Resp: res.CheckTxnStatus} } return nil } From 6da84f703a1904115198b63ae4548b374f1373bc Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 10 Oct 2019 13:06:01 +0800 Subject: [PATCH 10/17] address comment --- store/tikv/lock_resolver.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 1908bfeca54a1..510a0603878be 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -17,7 +17,6 @@ import ( "container/list" "context" "fmt" - "math" "sync" "time" @@ -200,18 +199,32 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi return false, nil } - startTime := time.Now() + startTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx) + if err != nil { + return false, errors.Trace(err) + } + txnInfos := make(map[uint64]uint64) + startTime := time.Now() for _, l := range expiredLocks { if _, ok := txnInfos[l.TxnID]; ok { continue } - // Use math.MaxUint64 as currentTS, so the txn is either committed or rollbacked. - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64) + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) if err != nil { - return false, errors.Trace(err) + return false, err } + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, startTS, currentTS) + if err != nil { + return false, err + } + + if status.ttl > 0 { + // Do not clean lock that is not expired. + continue + } + txnInfos[l.TxnID] = uint64(status.commitTS) } logutil.BgLogger().Info("BatchResolveLocks: lookup txn status", From c1fb4a6ab94ae8d0dc6e547ccbd14d8780cf419d Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 10 Oct 2019 13:52:54 +0800 Subject: [PATCH 11/17] address comment --- store/tikv/lock_resolver.go | 4 ++-- store/tikv/lock_test.go | 27 ++++++++++++++++++--------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 510a0603878be..df5fcc12c59c2 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -372,14 +372,14 @@ func (t *txnExpireTime) value() int64 { // If the primary key is still locked, it will launch a Rollback to abort it. // To avoid unnecessarily aborting too many txns, it is wiser to wait a few // seconds before calling it after Prewrite. -func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, error) { +func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary []byte) (TxnStatus, error) { var status TxnStatus bo := NewBackoffer(context.Background(), cleanupMaxBackoff) currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) if err != nil { return status, err } - return lr.getTxnStatus(bo, txnID, primary, 0, currentTS) + return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS) } func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64, cleanTxns map[uint64]map[RegionVerID]struct{}) (TxnStatus, error) { diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 266912952b188..e53cc39bed6d3 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -185,19 +185,19 @@ func (s *testLockSuite) TestCleanLock(c *C) { func (s *testLockSuite) TestGetTxnStatus(c *C) { startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) - status, err := s.store.lockResolver.GetTxnStatus(startTS, []byte("a")) + status, err := s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a")) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsTrue) c.Assert(status.CommitTS(), Equals, commitTS) startTS, commitTS = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), true) - status, err = s.store.lockResolver.GetTxnStatus(startTS, []byte("a")) + status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a")) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsTrue) c.Assert(status.CommitTS(), Equals, commitTS) startTS, _ = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), false) - status, err = s.store.lockResolver.GetTxnStatus(startTS, []byte("a")) + status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a")) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) c.Assert(status.ttl, Greater, uint64(0)) @@ -209,10 +209,13 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) { txn.Set(kv.Key("key"), []byte("value")) s.prewriteTxn(c, txn.(*tikvTxn)) - // Check the lock TTL of a transaction. bo := NewBackoffer(context.Background(), prewriteMaxBackoff) lr := newLockResolver(s.store) - status, err := lr.GetTxnStatus(txn.StartTS(), []byte("key")) + callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx) + c.Assert(err, IsNil) + + // Check the lock TTL of a transaction. + status, err := lr.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key")) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) c.Assert(status.ttl, Greater, uint64(0)) @@ -226,14 +229,14 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) { c.Assert(err, IsNil) // Check its status is rollbacked. - status, err = lr.GetTxnStatus(txn.StartTS(), []byte("key")) + status, err = lr.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key")) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) // Check a committed txn. startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) - status, err = lr.GetTxnStatus(startTS, []byte("a")) + status, err = lr.GetTxnStatus(startTS, callerStartTS, []byte("a")) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, commitTS) @@ -275,16 +278,21 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { currentTS, err := oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) bo := NewBackoffer(context.Background(), prewriteMaxBackoff) + // Call getTxnStatus to check the lock status. status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) c.Assert(status.ttl, Greater, uint64(0)) c.Assert(status.CommitTS(), Equals, uint64(0)) - // It is confusing here, getTxnStatus with currentTS = MaxUint64 really means rollback! - status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, math.MaxUint64) + // Rollback the lock. + err = newLockResolver(s.store).resolveLock(bo, + &Lock{Key: []byte("key"), Primary: []byte("key"), TxnID: txn.StartTS()}, + TxnStatus{}, + make(map[RegionVerID]struct{})) c.Assert(err, IsNil) + // Then call getTxnStatus again and check the lock status. currentTS, err = oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) @@ -292,6 +300,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) + // Call getTxnStatus on a committed transaction. startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS) c.Assert(err, IsNil) From de4d54084b03783e82dfd22cc101f1d852e4f376 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 12 Oct 2019 14:57:40 +0800 Subject: [PATCH 12/17] address comment --- store/tikv/lock_resolver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index df5fcc12c59c2..166d401ac075f 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -282,7 +282,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // commit status. // 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to // the same transaction. -func (lr *LockResolver) ResolveLocks(bo *Backoffer, startTS uint64, locks []*Lock) (int64, error) { +func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) { var msBeforeTxnExpired txnExpireTime if len(locks) == 0 { return msBeforeTxnExpired.value(), nil @@ -304,7 +304,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, startTS uint64, locks []*Loc // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) for _, l := range expiredLocks { - status, err := lr.getTxnStatusFromLock(bo, l, startTS, cleanTxns) + status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, cleanTxns) if err != nil { msBeforeTxnExpired.update(0) err = errors.Trace(err) From 3e1f4a7f7d5d07b54b1449ec4256a0c5d6e790c2 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 12 Oct 2019 17:49:41 +0800 Subject: [PATCH 13/17] address comment --- store/tikv/lock_resolver.go | 61 +++++++++++++++++++++++++++++++++++-- store/tikv/lock_test.go | 21 ++++++++----- 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 166d401ac075f..d2793fc11fb77 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -387,14 +387,12 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart // When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**. // In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock! if l.TTL == 0 { - var status TxnStatus cleanRegions, exists := cleanTxns[l.TxnID] if !exists { cleanRegions = make(map[RegionVerID]struct{}) cleanTxns[l.TxnID] = cleanRegions } - err := lr.resolveLock(bo, l, status, cleanRegions) - return status, err + return lr.cleanupPrimaryLock(bo, l.TxnID, l.Primary) } currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) @@ -404,6 +402,63 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart return lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS) } +func (lr *LockResolver) cleanupPrimaryLock(bo *Backoffer, txnID uint64, primary []byte) (TxnStatus, error) { + if s, ok := lr.getResolved(txnID); ok { + return s, nil + } + + var status TxnStatus + req := tikvrpc.NewRequest(tikvrpc.CmdCleanup, &kvrpcpb.CleanupRequest{ + Key: primary, + StartVersion: txnID, + CurrentTs: 0, + }) + for { + loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) + if err != nil { + return status, errors.Trace(err) + } + resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort) + if err != nil { + return status, errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return status, errors.Trace(err) + } + if regionErr != nil { + err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return status, errors.Trace(err) + } + continue + } + if resp.Resp == nil { + return status, errors.Trace(ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.CleanupResponse) + if keyErr := cmdResp.GetError(); keyErr != nil { + // If the TTL of the primary lock is not outdated, the proto returns a ErrLocked contains the TTL. + if lockInfo := keyErr.GetLocked(); lockInfo != nil { + status.ttl = lockInfo.LockTtl + status.commitTS = 0 + return status, nil + } + err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID) + logutil.BgLogger().Error("getTxnStatus error", zap.Error(err)) + return status, err + } + if cmdResp.CommitVersion != 0 { + status = TxnStatus{0, cmdResp.GetCommitVersion()} + tikvLockResolverCountWithQueryTxnStatusCommitted.Inc() + } else { + tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc() + } + lr.saveResolved(txnID, status) + return status, nil + } +} + func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index e53cc39bed6d3..088577117b518 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -272,30 +272,37 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { txn, err := s.store.Begin() c.Assert(err, IsNil) txn.Set(kv.Key("key"), []byte("value")) + txn.Set(kv.Key("second"), []byte("xxx")) s.prewriteTxn(c, txn.(*tikvTxn)) oracle := s.store.GetOracle() currentTS, err := oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) bo := NewBackoffer(context.Background(), prewriteMaxBackoff) + resolver := newLockResolver(s.store) // Call getTxnStatus to check the lock status. - status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) + status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) c.Assert(status.ttl, Greater, uint64(0)) c.Assert(status.CommitTS(), Equals, uint64(0)) - // Rollback the lock. - err = newLockResolver(s.store).resolveLock(bo, - &Lock{Key: []byte("key"), Primary: []byte("key"), TxnID: txn.StartTS()}, - TxnStatus{}, - make(map[RegionVerID]struct{})) + // Test the ResolveLocks API + lock := s.mustGetLock(c, []byte("second")) + timeBeforeExpire, err := resolver.ResolveLocks(bo, currentTS, []*Lock{lock}) c.Assert(err, IsNil) + c.Assert(timeBeforeExpire > int64(0), IsTrue) + + // Force rollback the lock using lock.TTL = 0. + lock.TTL = uint64(0) + timeBeforeExpire, err = resolver.ResolveLocks(bo, currentTS, []*Lock{lock}) + c.Assert(err, IsNil) + c.Assert(timeBeforeExpire, Equals, int64(0)) // Then call getTxnStatus again and check the lock status. currentTS, err = oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) - status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) + status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) From 42616144a463f706b2caf6c5ecdb31cc3765e78f Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 12 Oct 2019 17:54:30 +0800 Subject: [PATCH 14/17] make lint happy --- store/tikv/lock_resolver.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index d2793fc11fb77..bb0a5c7bc0323 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -304,7 +304,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) for _, l := range expiredLocks { - status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, cleanTxns) + status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS) if err != nil { msBeforeTxnExpired.update(0) err = errors.Trace(err) @@ -382,16 +382,11 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS) } -func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64, cleanTxns map[uint64]map[RegionVerID]struct{}) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) { // NOTE: l.TTL = 0 is a special protocol!!! // When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**. // In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock! if l.TTL == 0 { - cleanRegions, exists := cleanTxns[l.TxnID] - if !exists { - cleanRegions = make(map[RegionVerID]struct{}) - cleanTxns[l.TxnID] = cleanRegions - } return lr.cleanupPrimaryLock(bo, l.TxnID, l.Primary) } From f45db06d58cf83ea1c9308fd7fdddb5442707474 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 12 Oct 2019 19:25:59 +0800 Subject: [PATCH 15/17] address comment --- store/tikv/lock_resolver.go | 77 ++++++------------------------------- 1 file changed, 12 insertions(+), 65 deletions(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index bb0a5c7bc0323..33e5f2dec81d4 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -17,6 +17,7 @@ import ( "container/list" "context" "fmt" + "math" "sync" "time" @@ -383,75 +384,21 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary } func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) { - // NOTE: l.TTL = 0 is a special protocol!!! - // When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**. - // In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock! + var currentTS uint64 if l.TTL == 0 { - return lr.cleanupPrimaryLock(bo, l.TxnID, l.Primary) - } - - currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) - if err != nil { - return TxnStatus{}, err - } - return lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS) -} - -func (lr *LockResolver) cleanupPrimaryLock(bo *Backoffer, txnID uint64, primary []byte) (TxnStatus, error) { - if s, ok := lr.getResolved(txnID); ok { - return s, nil - } - - var status TxnStatus - req := tikvrpc.NewRequest(tikvrpc.CmdCleanup, &kvrpcpb.CleanupRequest{ - Key: primary, - StartVersion: txnID, - CurrentTs: 0, - }) - for { - loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) - if err != nil { - return status, errors.Trace(err) - } - resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort) - if err != nil { - return status, errors.Trace(err) - } - regionErr, err := resp.GetRegionError() + // NOTE: l.TTL = 0 is a special protocol!!! + // When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**. + // In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock! + // Set currentTS to max uint64 to make the lock expired. + currentTS = math.MaxUint64 + } else { + var err error + currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) if err != nil { - return status, errors.Trace(err) - } - if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) - if err != nil { - return status, errors.Trace(err) - } - continue - } - if resp.Resp == nil { - return status, errors.Trace(ErrBodyMissing) + return TxnStatus{}, err } - cmdResp := resp.Resp.(*kvrpcpb.CleanupResponse) - if keyErr := cmdResp.GetError(); keyErr != nil { - // If the TTL of the primary lock is not outdated, the proto returns a ErrLocked contains the TTL. - if lockInfo := keyErr.GetLocked(); lockInfo != nil { - status.ttl = lockInfo.LockTtl - status.commitTS = 0 - return status, nil - } - err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID) - logutil.BgLogger().Error("getTxnStatus error", zap.Error(err)) - return status, err - } - if cmdResp.CommitVersion != 0 { - status = TxnStatus{0, cmdResp.GetCommitVersion()} - tikvLockResolverCountWithQueryTxnStatusCommitted.Inc() - } else { - tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc() - } - lr.saveResolved(txnID, status) - return status, nil } + return lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS) } func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) { From d0f6fc5443e03040c23acf3ce3b58234762f6b2f Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 12 Oct 2019 20:10:48 +0800 Subject: [PATCH 16/17] make CI stable --- store/tikv/lock_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 088577117b518..cb9873d759283 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -291,7 +291,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { lock := s.mustGetLock(c, []byte("second")) timeBeforeExpire, err := resolver.ResolveLocks(bo, currentTS, []*Lock{lock}) c.Assert(err, IsNil) - c.Assert(timeBeforeExpire > int64(0), IsTrue) + c.Assert(timeBeforeExpire >= int64(0), IsTrue) // Force rollback the lock using lock.TTL = 0. lock.TTL = uint64(0) From f2a887fa97f888da258138c20290de3ba25040b7 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 12 Oct 2019 20:36:33 +0800 Subject: [PATCH 17/17] address comment --- store/tikv/lock_resolver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 33e5f2dec81d4..bae4fa76516ab 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -387,7 +387,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart var currentTS uint64 if l.TTL == 0 { // NOTE: l.TTL = 0 is a special protocol!!! - // When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**. + // When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**. // In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock! // Set currentTS to max uint64 to make the lock expired. currentTS = math.MaxUint64