Skip to content

Commit

Permalink
tikv: fix TxnSize to be the number of involved keys in the region (pi…
Browse files Browse the repository at this point in the history
…ngcap#11725)

Signed-off-by: Yilin Chen <[email protected]>
  • Loading branch information
sticnarf committed Aug 20, 2019
1 parent ec24062 commit b23dee6
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 25 deletions.
1 change: 1 addition & 0 deletions store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ErrLocked struct {
Primary []byte
StartTS uint64
TTL uint64
TxnSize uint64
}

// Error formats the lock to a string.
Expand Down
3 changes: 3 additions & 0 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (l *mvccLock) MarshalBinary() ([]byte, error) {
mh.WriteNumber(&buf, l.op)
mh.WriteNumber(&buf, l.ttl)
mh.WriteNumber(&buf, l.forUpdateTS)
mh.WriteNumber(&buf, l.txnSize)
return buf.Bytes(), errors.Trace(mh.err)
}

Expand All @@ -82,6 +83,7 @@ func (l *mvccLock) UnmarshalBinary(data []byte) error {
mh.ReadNumber(buf, &l.op)
mh.ReadNumber(buf, &l.ttl)
mh.ReadNumber(buf, &l.forUpdateTS)
mh.ReadNumber(buf, &l.txnSize)
return errors.Trace(mh.err)
}

Expand Down Expand Up @@ -198,6 +200,7 @@ func (l *mvccLock) lockErr(key []byte) error {
Primary: l.primary,
StartTS: l.startTS,
TTL: l.ttl,
TxnSize: l.txnSize,
}
}

Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
anyError := false
batch := &leveldb.Batch{}
errs := make([]error, 0, len(mutations))
txnSize := len(mutations)
txnSize := req.TxnSize
for i, m := range mutations {
// If the operation is Insert, check if key is exists at first.
var err error
Expand All @@ -655,7 +655,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
}
}
isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i]
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, uint64(txnSize), isPessimisticLock)
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock)
errs = append(errs, err)
if err != nil {
anyError = true
Expand Down
1 change: 1 addition & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
PrimaryLock: locked.Primary,
LockVersion: locked.StartTS,
LockTtl: locked.TTL,
TxnSize: locked.TxnSize,
},
}
}
Expand Down
56 changes: 35 additions & 21 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,29 @@ type twoPhaseCommitter struct {
mutations map[string]*mutationEx
lockTTL uint64
commitTS uint64
mu struct {
sync.RWMutex
committed bool
undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
}
priority pb.CommandPri
syncLog bool
connID uint64 // connID is used for log.
cleanWg sync.WaitGroup
priority pb.CommandPri
connID uint64 // connID is used for log.
cleanWg sync.WaitGroup
// maxTxnTimeUse represents max time a Txn may use (in ms) from its startTS to commitTS.
// We use it to guarantee GC worker will not influence any active txn. The value
// should be less than GC life time.
maxTxnTimeUse uint64
detail *execdetails.CommitDetails
// For pessimistic transaction
isPessimistic bool
maxTxnTimeUse uint64
detail *execdetails.CommitDetails
primaryKey []byte
forUpdateTS uint64
isFirstLock bool
pessimisticTTL uint64

mu struct {
sync.RWMutex
undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
committed bool
}
syncLog bool
// For pessimistic transaction
isPessimistic bool
isFirstLock bool
// regionTxnSize stores the number of keys involved in each region
regionTxnSize map[uint64]int
}

type mutationEx struct {
Expand All @@ -119,10 +122,11 @@ type mutationEx struct {
// newTwoPhaseCommitter creates a twoPhaseCommitter.
func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, error) {
return &twoPhaseCommitter{
store: txn.store,
txn: txn,
startTS: txn.StartTS(),
connID: connID,
store: txn.store,
txn: txn,
startTS: txn.StartTS(),
connID: connID,
regionTxnSize: map[uint64]int{},
}, nil
}

Expand Down Expand Up @@ -323,6 +327,9 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
var batches []batchKeys
var sizeFunc = c.keySize
if action == actionPrewrite {
for region, keys := range groups {
c.regionTxnSize[region.id] = len(keys)
}
sizeFunc = c.keyValueSize
atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups)))
}
Expand Down Expand Up @@ -461,7 +468,7 @@ func (c *twoPhaseCommitter) keySize(key []byte) int {
return len(key)
}

func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys) *tikvrpc.Request {
func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys, txnSize uint64) *tikvrpc.Request {
mutations := make([]*pb.Mutation, len(batch.keys))
var isPessimisticLock []bool
if c.isPessimistic {
Expand All @@ -483,7 +490,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys) *tikvrpc.Reque
LockTtl: c.lockTTL,
IsPessimisticLock: isPessimisticLock,
ForUpdateTs: c.forUpdateTS,
TxnSize: uint64(len(batch.keys)),
TxnSize: txnSize,
},
Context: pb.Context{
Priority: c.priority,
Expand All @@ -493,7 +500,14 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys) *tikvrpc.Reque
}

func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error {
req := c.buildPrewriteRequest(batch)
txnSize := uint64(c.regionTxnSize[batch.region.id])
// When we retry because of a region miss, we don't know the transaction size. We set the transaction size here
// to MaxUint64 to avoid unexpected "resolve lock lite".
if len(bo.errors) > 0 {
txnSize = math.MaxUint64
}

req := c.buildPrewriteRequest(batch, txnSize)
for {
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
Expand Down
38 changes: 36 additions & 2 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,40 @@ func (s *testCommitterSuite) TestWrittenKeysOnConflict(c *C) {
c.Assert(totalTime, Less, time.Millisecond*200)
}

func (s *testCommitterSuite) TestPrewriteTxnSize(c *C) {
// Prepare two regions first: (, 100) and [100, )
region, _ := s.cluster.GetRegionByKey([]byte{50})
newRegionID := s.cluster.AllocID()
newPeerID := s.cluster.AllocID()
s.cluster.Split(region.Id, newRegionID, []byte{100}, []uint64{newPeerID}, newPeerID)

txn := s.begin(c)
var val [1024]byte
for i := byte(50); i < 120; i++ {
err := txn.Set([]byte{i}, val[:])
c.Assert(err, IsNil)
}

commiter, err := newTwoPhaseCommitterWithInit(txn, 1)
c.Assert(err, IsNil)

ctx := context.Background()
err = commiter.prewriteKeys(NewBackoffer(ctx, prewriteMaxBackoff), commiter.keys)
c.Assert(err, IsNil)

// Check the written locks in the first region (50 keys)
for i := byte(50); i < 100; i++ {
lock := s.getLockInfo(c, []byte{i})
c.Assert(int(lock.TxnSize), Equals, 50)
}

// Check the written locks in the second region (20 keys)
for i := byte(100); i < 120; i++ {
lock := s.getLockInfo(c, []byte{i})
c.Assert(int(lock.TxnSize), Equals, 20)
}
}

func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
// This test checks that the isPessimisticLock field is set in the request even when no keys are pessimistic lock.
txn := s.begin(c)
Expand All @@ -454,7 +488,7 @@ func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
var batch batchKeys
batch.keys = append(batch.keys, []byte("t1"))
batch.region = RegionVerID{1, 1, 1}
req := commiter.buildPrewriteRequest(batch)
req := commiter.buildPrewriteRequest(batch, 1)
c.Assert(len(req.Prewrite.IsPessimisticLock), Greater, 0)
c.Assert(req.Prewrite.ForUpdateTs, Equals, uint64(100))
}
Expand Down Expand Up @@ -520,7 +554,7 @@ func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
loc, err := s.store.regionCache.LocateKey(bo, key)
c.Assert(err, IsNil)
batch := batchKeys{region: loc.Region, keys: [][]byte{key}}
req := commiter.buildPrewriteRequest(batch)
req := commiter.buildPrewriteRequest(batch, 1)
resp, err := s.store.SendReq(bo, req, loc.Region, readTimeoutShort)
c.Assert(err, IsNil)
c.Assert(resp.Prewrite, NotNil)
Expand Down

0 comments on commit b23dee6

Please sign in to comment.