diff --git a/store/mockstore/mocktikv/errors.go b/store/mockstore/mocktikv/errors.go
index 5f10ac74d7e6f..8f15c0f689b16 100644
--- a/store/mockstore/mocktikv/errors.go
+++ b/store/mockstore/mocktikv/errors.go
@@ -22,6 +22,7 @@ type ErrLocked struct {
 	Primary []byte
 	StartTS uint64
 	TTL     uint64
+	TxnSize uint64
 }
 
 // Error formats the lock to a string.
diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go
index fda5c5597915a..576bfc4b744c6 100644
--- a/store/mockstore/mocktikv/mvcc.go
+++ b/store/mockstore/mocktikv/mvcc.go
@@ -69,6 +69,7 @@ func (l *mvccLock) MarshalBinary() ([]byte, error) {
 	mh.WriteNumber(&buf, l.op)
 	mh.WriteNumber(&buf, l.ttl)
 	mh.WriteNumber(&buf, l.forUpdateTS)
+	mh.WriteNumber(&buf, l.txnSize)
 	return buf.Bytes(), errors.Trace(mh.err)
 }
 
@@ -82,6 +83,7 @@ func (l *mvccLock) UnmarshalBinary(data []byte) error {
 	mh.ReadNumber(buf, &l.op)
 	mh.ReadNumber(buf, &l.ttl)
 	mh.ReadNumber(buf, &l.forUpdateTS)
+	mh.ReadNumber(buf, &l.txnSize)
 	return errors.Trace(mh.err)
 }
 
@@ -198,6 +200,7 @@ func (l *mvccLock) lockErr(key []byte) error {
 		Primary: l.primary,
 		StartTS: l.startTS,
 		TTL:     l.ttl,
+		TxnSize: l.txnSize,
 	}
 }
 
diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go
index 1d8edc49d4536..be550fcb8b4a0 100644
--- a/store/mockstore/mocktikv/mvcc_leveldb.go
+++ b/store/mockstore/mocktikv/mvcc_leveldb.go
@@ -634,7 +634,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
 	anyError := false
 	batch := &leveldb.Batch{}
 	errs := make([]error, 0, len(mutations))
-	txnSize := len(mutations)
+	txnSize := req.TxnSize
 	for i, m := range mutations {
 		// If the operation is Insert, check if key is exists at first.
 		var err error
@@ -655,7 +655,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
 			}
 		}
 		isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i]
-		err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, uint64(txnSize), isPessimisticLock)
+		err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock)
 		errs = append(errs, err)
 		if err != nil {
 			anyError = true
diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go
index e4919318048fa..f8396a5887e94 100644
--- a/store/mockstore/mocktikv/rpc.go
+++ b/store/mockstore/mocktikv/rpc.go
@@ -57,6 +57,7 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
 				PrimaryLock: locked.Primary,
 				LockVersion: locked.StartTS,
 				LockTtl:     locked.TTL,
+				TxnSize:     locked.TxnSize,
 			},
 		}
 	}
diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index b4047fea0c1d3..a5434f9691946 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -88,26 +88,29 @@ type twoPhaseCommitter struct {
 	mutations map[string]*mutationEx
 	lockTTL   uint64
 	commitTS  uint64
-	mu        struct {
-		sync.RWMutex
-		committed       bool
-		undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
-	}
-	priority pb.CommandPri
-	syncLog  bool
-	connID   uint64 // connID is used for log.
-	cleanWg  sync.WaitGroup
+	priority  pb.CommandPri
+	connID    uint64 // connID is used for log.
+	cleanWg   sync.WaitGroup
 	// maxTxnTimeUse represents max time a Txn may use (in ms) from its startTS to commitTS.
 	// We use it to guarantee GC worker will not influence any active txn. The value
 	// should be less than GC life time.
-	maxTxnTimeUse uint64
-	detail        *execdetails.CommitDetails
-	// For pessimistic transaction
-	isPessimistic  bool
+	maxTxnTimeUse  uint64
+	detail         *execdetails.CommitDetails
 	primaryKey     []byte
 	forUpdateTS    uint64
-	isFirstLock    bool
 	pessimisticTTL uint64
+
+	mu struct {
+		sync.RWMutex
+		undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
+		committed       bool
+	}
+	syncLog bool
+	// For pessimistic transaction
+	isPessimistic bool
+	isFirstLock   bool
+	// regionTxnSize stores the number of keys involved in each region
+	regionTxnSize map[uint64]int
 }
 
 type mutationEx struct {
@@ -119,10 +122,11 @@ type mutationEx struct {
 // newTwoPhaseCommitter creates a twoPhaseCommitter.
 func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, error) {
 	return &twoPhaseCommitter{
-		store:   txn.store,
-		txn:     txn,
-		startTS: txn.StartTS(),
-		connID:  connID,
+		store:         txn.store,
+		txn:           txn,
+		startTS:       txn.StartTS(),
+		connID:        connID,
+		regionTxnSize: map[uint64]int{},
 	}, nil
 }
 
@@ -323,6 +327,9 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
 	var batches []batchKeys
 	var sizeFunc = c.keySize
 	if action == actionPrewrite {
+		for region, keys := range groups {
+			c.regionTxnSize[region.id] = len(keys)
+		}
 		sizeFunc = c.keyValueSize
 		atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups)))
 	}
@@ -461,7 +468,7 @@ func (c *twoPhaseCommitter) keySize(key []byte) int {
 	return len(key)
 }
 
-func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys) *tikvrpc.Request {
+func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys, txnSize uint64) *tikvrpc.Request {
 	mutations := make([]*pb.Mutation, len(batch.keys))
 	var isPessimisticLock []bool
 	if c.isPessimistic {
@@ -483,7 +490,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys) *tikvrpc.Reque
 			LockTtl:           c.lockTTL,
 			IsPessimisticLock: isPessimisticLock,
 			ForUpdateTs:       c.forUpdateTS,
-			TxnSize:           uint64(len(batch.keys)),
+			TxnSize:           txnSize,
 		},
 		Context: pb.Context{
 			Priority: c.priority,
@@ -493,7 +500,14 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys) *tikvrpc.Reque
 }
 
 func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error {
-	req := c.buildPrewriteRequest(batch)
+	txnSize := uint64(c.regionTxnSize[batch.region.id])
+	// When we retry because of a region miss, we don't know the transaction size. We set the transaction size here
+	// to MaxUint64 to avoid unexpected "resolve lock lite".
+	if len(bo.errors) > 0 {
+		txnSize = math.MaxUint64
+	}
+
+	req := c.buildPrewriteRequest(batch, txnSize)
 	for {
 		resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
 		if err != nil {
diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go
index 9fb990cc749e6..a92f33fdff9fd 100644
--- a/store/tikv/2pc_test.go
+++ b/store/tikv/2pc_test.go
@@ -442,6 +442,40 @@ func (s *testCommitterSuite) TestWrittenKeysOnConflict(c *C) {
 	c.Assert(totalTime, Less, time.Millisecond*200)
 }
 
+func (s *testCommitterSuite) TestPrewriteTxnSize(c *C) {
+	// Prepare two regions first: (, 100) and [100, )
+	region, _ := s.cluster.GetRegionByKey([]byte{50})
+	newRegionID := s.cluster.AllocID()
+	newPeerID := s.cluster.AllocID()
+	s.cluster.Split(region.Id, newRegionID, []byte{100}, []uint64{newPeerID}, newPeerID)
+
+	txn := s.begin(c)
+	var val [1024]byte
+	for i := byte(50); i < 120; i++ {
+		err := txn.Set([]byte{i}, val[:])
+		c.Assert(err, IsNil)
+	}
+
+	commiter, err := newTwoPhaseCommitterWithInit(txn, 1)
+	c.Assert(err, IsNil)
+
+	ctx := context.Background()
+	err = commiter.prewriteKeys(NewBackoffer(ctx, prewriteMaxBackoff), commiter.keys)
+	c.Assert(err, IsNil)
+
+	// Check the written locks in the first region (50 keys)
+	for i := byte(50); i < 100; i++ {
+		lock := s.getLockInfo(c, []byte{i})
+		c.Assert(int(lock.TxnSize), Equals, 50)
+	}
+
+	// Check the written locks in the second region (20 keys)
+	for i := byte(100); i < 120; i++ {
+		lock := s.getLockInfo(c, []byte{i})
+		c.Assert(int(lock.TxnSize), Equals, 20)
+	}
+}
+
 func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
 	// This test checks that the isPessimisticLock field is set in the request even when no keys are pessimistic lock.
 	txn := s.begin(c)
@@ -454,7 +488,7 @@ func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
 	var batch batchKeys
 	batch.keys = append(batch.keys, []byte("t1"))
 	batch.region = RegionVerID{1, 1, 1}
-	req := commiter.buildPrewriteRequest(batch)
+	req := commiter.buildPrewriteRequest(batch, 1)
 	c.Assert(len(req.Prewrite.IsPessimisticLock), Greater, 0)
 	c.Assert(req.Prewrite.ForUpdateTs, Equals, uint64(100))
 }
@@ -520,7 +554,7 @@ func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
 	loc, err := s.store.regionCache.LocateKey(bo, key)
 	c.Assert(err, IsNil)
 	batch := batchKeys{region: loc.Region, keys: [][]byte{key}}
-	req := commiter.buildPrewriteRequest(batch)
+	req := commiter.buildPrewriteRequest(batch, 1)
 	resp, err := s.store.SendReq(bo, req, loc.Region, readTimeoutShort)
 	c.Assert(err, IsNil)
 	c.Assert(resp.Prewrite, NotNil)
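
A note on the bookkeeping this patch adds: before prewrite, keys are grouped by region and the per-region key count is recorded in regionTxnSize; each prewrite batch then reports that count as TxnSize, and a batch retried after a region error reports MaxUint64 because the recorded count may be stale. The following standalone sketch mirrors that flow; regionVer, groupKeysByRegion and prewriteTxnSize are simplified stand-ins for illustration, not TiDB's actual RegionVerID or region-cache API.

package main

import (
	"fmt"
	"math"
)

// regionVer is a simplified stand-in for the committer's RegionVerID; only the id matters here.
type regionVer struct{ id uint64 }

// groupKeysByRegion is a stand-in for the region-cache grouping that happens before prewrite.
func groupKeysByRegion(keys [][]byte, regionOf func([]byte) regionVer) map[regionVer][][]byte {
	groups := make(map[regionVer][][]byte)
	for _, k := range keys {
		r := regionOf(k)
		groups[r] = append(groups[r], k)
	}
	return groups
}

// recordRegionTxnSize mirrors the loop added to doActionOnKeys: remember how many keys of
// this transaction fall into each region.
func recordRegionTxnSize(groups map[regionVer][][]byte, regionTxnSize map[uint64]int) {
	for region, keys := range groups {
		regionTxnSize[region.id] = len(keys)
	}
}

// prewriteTxnSize mirrors the logic added to prewriteSingleBatch: report the recorded
// per-region count, except on a retry, when the count may be stale and MaxUint64 is safer.
func prewriteTxnSize(regionTxnSize map[uint64]int, regionID uint64, isRetry bool) uint64 {
	if isRetry {
		return math.MaxUint64
	}
	return uint64(regionTxnSize[regionID])
}

func main() {
	keys := [][]byte{{50}, {60}, {100}, {110}, {119}}
	regionOf := func(k []byte) regionVer {
		if k[0] < 100 {
			return regionVer{id: 1}
		}
		return regionVer{id: 2}
	}
	regionTxnSize := map[uint64]int{}
	recordRegionTxnSize(groupKeysByRegion(keys, regionOf), regionTxnSize)
	fmt.Println(prewriteTxnSize(regionTxnSize, 1, false)) // 2
	fmt.Println(prewriteTxnSize(regionTxnSize, 2, false)) // 3
	fmt.Println(prewriteTxnSize(regionTxnSize, 2, true))  // 18446744073709551615
}

Keying the map by region id keeps the lookup cheap even when one region's keys are later split into several prewrite batches, since every batch of that region reports the same region-level count.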
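
Why an understated TxnSize is harmful is only hinted at by the comment about "resolve lock lite": the field lets the lock resolver judge whether the blocked-on transaction is small enough to resolve just the lock it ran into, or whether it should look for all of the transaction's locks. The sketch below shows that kind of threshold decision; shouldResolveLite and liteThreshold are assumptions made for illustration, not TiKV's actual resolver code or constant.

package main

import (
	"fmt"
	"math"
)

// liteThreshold is a hypothetical cut-off used only for illustration; the real policy
// and constant live on the resolver side and are not part of this patch.
const liteThreshold = 16

// shouldResolveLite sketches the decision TxnSize feeds into: a small transaction can be
// cleaned up by resolving only the lock that was hit ("resolve lock lite"), while a large
// or unknown one should trigger a full scan for the transaction's locks.
func shouldResolveLite(txnSize uint64) bool {
	return txnSize < liteThreshold
}

func main() {
	fmt.Println(shouldResolveLite(3))              // true: tiny transaction, lite resolve is enough
	fmt.Println(shouldResolveLite(50))             // false: many locks, scan for all of them
	fmt.Println(shouldResolveLite(math.MaxUint64)) // false: retried prewrite, size unknown
}

Reporting MaxUint64 on a retried batch therefore errs on the safe side: the resolver falls back to the full path instead of trusting a possibly missing regionTxnSize entry.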
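
On the mocktikv side, MarshalBinary and UnmarshalBinary must write and read lock fields in exactly the same order, which is why txnSize is appended last in both. A minimal self-contained illustration of that write/read symmetry, using encoding/binary directly rather than mocktikv's marshal helper (lockMeta and its methods are invented for this example):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// lockMeta is an invented, cut-down stand-in for mocktikv's mvccLock.
type lockMeta struct {
	startTS uint64
	ttl     uint64
	txnSize uint64
}

// marshal writes the fields in a fixed order; txnSize goes last, as in the patch.
func (l lockMeta) marshal() []byte {
	var buf bytes.Buffer
	for _, v := range []uint64{l.startTS, l.ttl, l.txnSize} {
		binary.Write(&buf, binary.LittleEndian, v) // writing fixed-size values to a bytes.Buffer cannot fail
	}
	return buf.Bytes()
}

// unmarshal reads the fields back in the same order they were written.
func (l *lockMeta) unmarshal(data []byte) error {
	buf := bytes.NewReader(data)
	for _, p := range []*uint64{&l.startTS, &l.ttl, &l.txnSize} {
		if err := binary.Read(buf, binary.LittleEndian, p); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	in := lockMeta{startTS: 400, ttl: 3000, txnSize: 50}
	var out lockMeta
	if err := out.unmarshal(in.marshal()); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out) // {startTS:400 ttl:3000 txnSize:50}
}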