diff --git a/ddl/index.go b/ddl/index.go index a030e428e8fd9..31b565e812c73 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -980,7 +980,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx // Lock the row key to notify us that someone delete or update the row, // then we should not backfill the index of it, otherwise the adding index is redundant. - err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, idxRecord.key) + err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, time.Now(), idxRecord.key) if err != nil { return errors.Trace(err) } diff --git a/executor/adapter.go b/executor/adapter.go index 4e6b47d3d84d6..556164f9249d7 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -530,7 +530,8 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { return nil } forUpdateTS := txnCtx.GetForUpdateTS() - err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, keys...) + err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, + sctx.GetSessionVars().StmtCtx.GetLockWaitStartTime(), keys...) if err == nil { return nil } diff --git a/executor/admin.go b/executor/admin.go index 73ce92dad6444..4566172e4fca7 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -16,6 +16,7 @@ package executor import ( "context" "math" + "time" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" @@ -432,7 +433,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa } recordKey := e.table.RecordKey(row.handle) - err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, recordKey) + err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, time.Now(), recordKey) if err != nil { return result, err } diff --git a/executor/executor.go b/executor/executor.go index d11969fa58e61..2654ce4abae49 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -835,7 +835,8 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, waitTime int64, keys return err } forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS() - return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, waitTime, keys...) + return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, waitTime, + se.GetSessionVars().StmtCtx.GetLockWaitStartTime(), keys...) } // LimitExec represents limit executor diff --git a/kv/kv.go b/kv/kv.go index 137064a1d4fc0..054fd0a0cc180 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -168,7 +168,8 @@ type Transaction interface { // String implements fmt.Stringer interface. String() string // LockKeys tries to lock the entries with the keys in KV store. - LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keys ...Key) error + LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, + lockWaitTime int64, waitStartTime time.Time, keys ...Key) error // SetOption sets an option with a value, when val is nil, uses the default // value of this option. SetOption(opt Option, val interface{}) diff --git a/kv/mock.go b/kv/mock.go index eb3d1d66c315e..8742ec6fa82e8 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -15,6 +15,7 @@ package kv import ( "context" + "time" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -39,7 +40,7 @@ func (t *mockTxn) String() string { return "" } -func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ ...Key) error { +func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ time.Time, _ ...Key) error { return nil } diff --git a/kv/mock_test.go b/kv/mock_test.go index e52db8fad819d..394f78b332602 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -15,6 +15,7 @@ package kv import ( "context" + "time" . "github.com/pingcap/check" ) @@ -38,7 +39,7 @@ func (s testMockSuite) TestInterface(c *C) { transaction, err := storage.Begin() c.Check(err, IsNil) - err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, Key("lock")) + err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, time.Now(), Key("lock")) c.Check(err, IsNil) transaction.SetOption(Option(23), struct{}{}) if mock, ok := transaction.(*mockTxn); ok { diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index dd8dd3c7e5725..a523736339347 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -718,3 +718,40 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) { wg.Wait() } + +func (s *testPessimisticSuite) TestInnodbLockWaitTimeoutWaitStart(c *C) { + // prepare work + tk := testkit.NewTestKitWithInit(c, s.store) + defer tk.MustExec("drop table if exists tk") + tk.MustExec("drop table if exists tk") + tk.MustExec("create table tk (c1 int primary key, c2 int)") + tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)") + tk.MustExec("set global innodb_lock_wait_timeout = 1") + + // raise pessimistic transaction in tk2 and trigger failpoint returning ErrWriteConflict + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk3 := testkit.NewTestKitWithInit(c, s.store) + tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1")) + + // tk3 gets the pessimistic lock + tk3.MustExec("begin pessimistic") + tk3.MustQuery("select * from tk where c1 = 1 for update") + + tk2.MustExec("begin pessimistic") + done := make(chan error) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/PessimisticLockErrWriteConflict", "return"), IsNil) + start := time.Now() + go func() { + var err error + defer func() { + done <- err + }() + _, err = tk2.Exec("select * from tk where c1 = 1 for update") + }() + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/PessimisticLockErrWriteConflict"), IsNil) + waitErr := <-done + c.Assert(waitErr, NotNil) + c.Check(waitErr.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) +} diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 9bff0a4df9860..e980a3e93c4c0 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -140,8 +140,9 @@ type StatementContext struct { normalized string digest string } - Tables []TableEntry - PointExec bool // for point update cached execution, Constant expression need to set "paramMarker" + Tables []TableEntry + PointExec bool // for point update cached execution, Constant expression need to set "paramMarker" + lockWaitStartTime *time.Time // LockWaitStartTime stores the pessimistic lock wait start time } // StmtHints are SessionVars related sql hints. @@ -584,6 +585,15 @@ func (sc *StatementContext) SetFlagsFromPBFlag(flags uint64) { sc.DividedByZeroAsWarning = (flags & model.FlagDividedByZeroAsWarning) > 0 } +// GetLockWaitStartTime returns the statement pessimistic lock wait start time +func (sc *StatementContext) GetLockWaitStartTime() time.Time { + if sc.lockWaitStartTime == nil { + curTime := time.Now() + sc.lockWaitStartTime = &curTime + } + return *sc.lockWaitStartTime +} + //CopTasksDetails collects some useful information of cop-tasks during execution. type CopTasksDetails struct { NumCopTasks int diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 2e74fe816f8c7..ef1c67aa866c9 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -48,8 +48,9 @@ type actionPrewrite struct{} type actionCommit struct{} type actionCleanup struct{} type actionPessimisticLock struct { - killed *uint32 - lockWaitTime int64 + killed *uint32 + lockWaitTime int64 + waitStartTime time.Time } type actionPessimisticRollback struct{} @@ -687,7 +688,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * IsFirstLock: c.isFirstLock, WaitTimeout: action.lockWaitTime, }, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) - lockWaitStartTime := time.Now() + lockWaitStartTime := action.waitStartTime for { // if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit if action.lockWaitTime > 0 { @@ -698,6 +699,10 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * req.PessimisticLock().WaitTimeout = timeLeft } } + failpoint.Inject("PessimisticLockErrWriteConflict", func() error { + time.Sleep(300 * time.Millisecond) + return kv.ErrWriteConflict + }) resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) if err != nil { return errors.Trace(err) @@ -711,7 +716,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * if err != nil { return errors.Trace(err) } - err = c.pessimisticLockKeys(bo, action.killed, action.lockWaitTime, batch.keys) + err = c.pessimisticLockKeys(bo, action.killed, action.lockWaitTime, lockWaitStartTime, batch.keys) return errors.Trace(err) } if resp.Resp == nil { @@ -1006,8 +1011,8 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { } func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, lockWaitTime int64, - keys [][]byte) error { - return c.doActionOnKeys(bo, actionPessimisticLock{killed, lockWaitTime}, keys) + waitStartTime time.Time, keys [][]byte) error { + return c.doActionOnKeys(bo, actionPessimisticLock{killed, lockWaitTime, waitStartTime}, keys) } func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error { diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index f5cb8ad103831..6c469f1e6883a 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -561,7 +561,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { c.Assert(txn.Set(key, key), IsNil) txn.DelOption(kv.PresumeKeyNotExistsError) txn.DelOption(kv.PresumeKeyNotExists) - err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key) + err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key) c.Assert(err, NotNil) c.Assert(txn.Delete(key), IsNil) key2 := kv.Key("key2") @@ -573,9 +573,9 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) { txn := s.begin(c) txn.SetOption(kv.Pessimistic, true) - err := txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def")) + err := txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, time.Now(), kv.Key("abc"), kv.Key("def")) c.Assert(err, IsNil) - err = txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def")) + err = txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, time.Now(), kv.Key("abc"), kv.Key("def")) c.Assert(err, IsNil) c.Assert(txn.lockKeys, HasLen, 2) } @@ -585,11 +585,11 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { txn := s.begin(c) txn.SetOption(kv.Pessimistic, true) time.Sleep(time.Millisecond * 100) - err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key) + err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key) c.Assert(err, IsNil) time.Sleep(time.Millisecond * 100) key2 := kv.Key("key2") - err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key2) + err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key2) c.Assert(err, IsNil) lockInfo := s.getLockInfo(c, key) msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl) @@ -627,7 +627,7 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) { txn.SetOption(kv.Pessimistic, true) time.Sleep(time.Millisecond * 100) forUpdateTS := oracle.ComposeTS(oracle.ExtractPhysical(txn.startTS)+100, 1) - err := txn.LockKeys(context.Background(), nil, forUpdateTS, kv.LockAlwaysWait, key) + err := txn.LockKeys(context.Background(), nil, forUpdateTS, kv.LockAlwaysWait, time.Now(), key) c.Assert(err, IsNil) lockInfo := s.getLockInfo(c, key) c.Assert(lockInfo.LockTtl-ManagedLockTTL, GreaterEqual, uint64(100)) diff --git a/store/tikv/ticlient_test.go b/store/tikv/ticlient_test.go index 86cccc95d6e70..d64f1a176008c 100644 --- a/store/tikv/ticlient_test.go +++ b/store/tikv/ticlient_test.go @@ -119,7 +119,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) { txn := s.beginTxn(c) err := txn.Set(encodeKey(s.prefix, "key"), []byte("value")) c.Assert(err, IsNil) - err = txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, encodeKey(s.prefix, "key")) + err = txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, time.Now(), encodeKey(s.prefix, "key")) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index eb878fd78fe54..61a7874435f54 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -367,7 +367,8 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error { } // lockWaitTime in ms, except that kv.LockAlwaysWait(0) means always wait lock, kv.LockNowait(-1) means nowait lock -func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keysInput ...kv.Key) error { +func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, + lockWaitTime int64, waitStartTime time.Time, keysInput ...kv.Key) error { // Exclude keys that are already locked. keys := make([][]byte, 0, len(keysInput)) txn.mu.Lock() @@ -406,7 +407,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS ui // If the number of keys greater than 1, it can be on different region, // concurrently execute on multiple regions may lead to deadlock. txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys) == 1 - err := txn.committer.pessimisticLockKeys(bo, killed, lockWaitTime, keys) + err := txn.committer.pessimisticLockKeys(bo, killed, lockWaitTime, waitStartTime, keys) if killed != nil { // If the kill signal is received during waiting for pessimisticLock, // pessimisticLockKeys would handle the error but it doesn't reset the flag.