Skip to content

Commit

Permalink
store: stop updating pessimistic transaction's lock TTL when the sess…
Browse files Browse the repository at this point in the history
…ion is killed (#12959) (#13046)
  • Loading branch information
tiancaiamao authored and coocood committed Nov 11, 2019
1 parent 4099dc0 commit 5b44298
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 54 deletions.
19 changes: 3 additions & 16 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import (

// Config number limitations
const (
MaxLogFileSize = 4096 // MB
MinPessimisticTTL = time.Second * 15
MaxPessimisticTTL = time.Second * 120
MaxLogFileSize = 4096 // MB
// DefTxnTotalSizeLimit is the default value of TxnTxnTotalSizeLimit.
DefTxnTotalSizeLimit = 100 * 1024 * 1024
)

// Valid config maps
Expand Down Expand Up @@ -308,8 +308,6 @@ type PessimisticTxn struct {
Enable bool `toml:"enable" json:"enable"`
// The max count of retry for a single statement in a pessimistic transaction.
MaxRetryCount uint `toml:"max-retry-count" json:"max-retry-count"`
// The pessimistic lock ttl.
TTL string `toml:"ttl" json:"ttl"`
}

// StmtSummary is the config for statement summary.
Expand Down Expand Up @@ -411,7 +409,6 @@ var defaultConf = Config{
PessimisticTxn: PessimisticTxn{
Enable: true,
MaxRetryCount: 256,
TTL: "40s",
},
StmtSummary: StmtSummary{
MaxStmtCount: 100,
Expand Down Expand Up @@ -586,16 +583,6 @@ func (c *Config) Valid() error {
if c.TiKVClient.MaxTxnTimeUse == 0 {
return fmt.Errorf("max-txn-time-use should be greater than 0")
}
if c.PessimisticTxn.TTL != "" {
dur, err := time.ParseDuration(c.PessimisticTxn.TTL)
if err != nil {
return err
}
if dur < MinPessimisticTTL || dur > MaxPessimisticTTL {
return fmt.Errorf("pessimistic transaction ttl %s out of range [%s, %s]",
dur, MinPessimisticTTL, MaxPessimisticTTL)
}
}
return nil
}

Expand Down
4 changes: 0 additions & 4 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,6 @@ enable = true
# max retry count for a statement in a pessimistic transaction.
max-retry-count = 256

# default TTL in milliseconds for pessimistic lock.
# The value must between "15s" and "120s".
ttl = "40s"

[stmt-summary]
# max number of statements kept in memory.
max-stmt-count = 100
Expand Down
17 changes: 0 additions & 17 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,6 @@ func (s *testConfigSuite) TestConfigDiff(c *C) {
c.Assert(diffs["Performance.FeedbackProbability"][1], Equals, float64(23.33))
}

func (s *testConfigSuite) TestValid(c *C) {
c1 := NewConfig()
tests := []struct {
ttl string
valid bool
}{
{"14s", false},
{"15s", true},
{"120s", true},
{"121s", false},
}
for _, tt := range tests {
c1.PessimisticTxn.TTL = tt.ttl
c.Assert(c1.Valid() == nil, Equals, tt.valid)
}
}

func (s *testConfigSuite) TestOOMActionValid(c *C) {
c1 := NewConfig()
tests := []struct {
Expand Down
18 changes: 18 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,24 @@ func (s *testPessimisticSuite) TestWaitLockKill(c *C) {
tk.MustExec("rollback")
}

func (s *testPessimisticSuite) TestKillStopTTLManager(c *C) {
// Test killing an idle pessimistic session stop its ttlManager.
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists test_kill")
tk.MustExec("create table test_kill (id int primary key, c int)")
tk.MustExec("insert test_kill values (1, 1)")
tk.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")
tk.MustQuery("select * from test_kill where id = 1 for update")
sessVars := tk.Se.GetSessionVars()
succ := atomic.CompareAndSwapUint32(&sessVars.Killed, 0, 1)
c.Assert(succ, IsTrue)

// This query should success rather than returning a ResolveLock error.
tk2.MustExec("update test_kill set c = c + 1 where id = 1")
}

func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists tk")
Expand Down
4 changes: 4 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,10 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
return nil
}
// Overwrite the pessimistic lock.
if ttl < dec.lock.ttl {
// Maybe ttlManager has already set the lock TTL, don't decrease it.
ttl = dec.lock.ttl
}
} else {
if isPessimisticLock {
return ErrAbort("pessimistic lock not found")
Expand Down
22 changes: 15 additions & 7 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var (

// Global variable set by config file.
var (
PessimisticLockTTL uint64 = 15000 // 15s ~ 40s
PessimisticLockTTL uint64 = 20000 // 20s
)

func (actionPrewrite) String() string {
Expand Down Expand Up @@ -651,15 +651,17 @@ const (
)

type ttlManager struct {
state ttlManagerState
ch chan struct{}
state ttlManagerState
ch chan struct{}
killed *uint32
}

func (tm *ttlManager) run(c *twoPhaseCommitter) {
func (tm *ttlManager) run(c *twoPhaseCommitter, killed *uint32) {
// Run only once.
if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateUninitialized), uint32(stateRunning)) {
return
}
tm.killed = killed
go tm.keepAlive(c)
}

Expand All @@ -671,14 +673,18 @@ func (tm *ttlManager) close() {
}

func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
// Ticker is set to 1/3 of the PessimisticLockTTL.
ticker := time.NewTicker(time.Duration(PessimisticLockTTL) * time.Millisecond / 3)
// Ticker is set to 1/2 of the PessimisticLockTTL.
ticker := time.NewTicker(time.Duration(PessimisticLockTTL) * time.Millisecond / 2)
defer ticker.Stop()
for {
select {
case <-tm.ch:
return
case <-ticker.C:
// If kill signal is received, the ttlManager should exit.
if tm.killed != nil && atomic.LoadUint32(tm.killed) != 0 {
return
}
bo := NewBackoffer(context.Background(), pessimisticLockMaxBackoff)
now, err := c.store.GetOracle().GetTimestamp(bo.ctx)
if err != nil {
Expand Down Expand Up @@ -730,14 +736,16 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
mutations[i] = mut
}

t0 := oracle.GetTimeFromTS(c.forUpdateTS)
elapsed := uint64(time.Since(t0) / time.Millisecond)
req := &tikvrpc.Request{
Type: tikvrpc.CmdPessimisticLock,
PessimisticLock: &pb.PessimisticLockRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
LockTtl: c.pessimisticTTL,
LockTtl: elapsed + PessimisticLockTTL,
IsFirstLock: c.isFirstLock,
WaitTimeout: action.lockWaitTime,
},
Expand Down
11 changes: 8 additions & 3 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type testCommitterSuite struct {

var _ = Suite(&testCommitterSuite{})

func (s *testCommitterSuite) SetUpSuite(c *C) {
PessimisticLockTTL = 3000 // 3s
s.OneByOneSuite.SetUpSuite(c)
}

func (s *testCommitterSuite) SetUpTest(c *C) {
s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("a"), []byte("b"), []byte("c"))
Expand Down Expand Up @@ -538,14 +543,14 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key2)
c.Assert(err, IsNil)
lockInfo := s.getLockInfo(c, key)
elapsedTTL := lockInfo.LockTtl - PessimisticLockTTL
c.Assert(elapsedTTL, GreaterEqual, uint64(100))
msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl)
c.Assert(msBeforeLockExpired, GreaterEqual, int64(100))

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, lockInfo.LockTtl)
c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)

// Check primary lock TTL is auto increasing while the pessimistic txn is ongoing.
for i := 0; i < 50; i++ {
Expand Down
7 changes: 1 addition & 6 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS ui
return err
}
}
if txn.committer.pessimisticTTL == 0 {
// add elapsed time to pessimistic TTL on the first LockKeys request.
elapsed := uint64(time.Since(txn.startTime) / time.Millisecond)
txn.committer.pessimisticTTL = PessimisticLockTTL + elapsed
}
var assignedPrimaryKey bool
if txn.committer.primaryKey == nil {
txn.committer.primaryKey = keys[0]
Expand Down Expand Up @@ -428,7 +423,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS ui
return err
}
if assignedPrimaryKey {
txn.committer.ttlManager.run(txn.committer)
txn.committer.ttlManager.run(txn.committer, killed)
}
}
txn.mu.Lock()
Expand Down
1 change: 0 additions & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,6 @@ func setGlobalVars() {
}

tikv.CommitMaxBackoff = int(parseDuration(cfg.TiKVClient.CommitTimeout).Seconds() * 1000)
tikv.PessimisticLockTTL = uint64(parseDuration(cfg.PessimisticTxn.TTL).Seconds() * 1000)
tikv.RegionCacheTTLSec = int64(cfg.TiKVClient.RegionCacheTTL)
}

Expand Down

0 comments on commit 5b44298

Please sign in to comment.