Skip to content

Commit

Permalink
*: configurable kv_read_timeout should not affect write request (#978) (
Browse files Browse the repository at this point in the history
#983)

* *: configurable kv_read_timeout should not affect write request (#978)

---------

Signed-off-by: crazycs520 <[email protected]>

* add test

Signed-off-by: crazycs520 <[email protected]>

---------

Signed-off-by: crazycs520 <[email protected]>
  • Loading branch information
crazycs520 authored Sep 18, 2023
1 parent 9126d07 commit 32c4ef5
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 13 deletions.
38 changes: 25 additions & 13 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,13 +897,25 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
s.state.onSendFailure(bo, s, err)
}

func (s *replicaSelector) onDeadlineExceeded() {
if target := s.targetReplica(); target != nil {
target.deadlineErrUsingConfTimeout = true
func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool {
if req.MaxExecutionDurationMs >= uint64(client.ReadTimeoutShort.Milliseconds()) {
// Configurable timeout should less than `ReadTimeoutShort`.
return false
}
if accessLeader, ok := s.state.(*accessKnownLeader); ok {
// If leader return deadline exceeded error, we should try to access follower next time.
s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
switch req.Type {
case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan,
tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream:
if target := s.targetReplica(); target != nil {
target.deadlineErrUsingConfTimeout = true
}
if accessLeader, ok := s.state.(*accessKnownLeader); ok {
// If leader return deadline exceeded error, we should try to access follower next time.
s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
}
return true
default:
// Only work for read requests, return false for non-read requests.
return false
}
}

Expand Down Expand Up @@ -1553,9 +1565,8 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r
return errors.WithStack(err)
} else if LoadShuttingDown() > 0 {
return errors.WithStack(tikverr.ErrTiDBShuttingDown)
} else if isCauseByDeadlineExceeded(err) && req.MaxExecutionDurationMs < uint64(client.ReadTimeoutShort.Milliseconds()) {
if s.replicaSelector != nil {
s.replicaSelector.onDeadlineExceeded()
} else if isCauseByDeadlineExceeded(err) {
if s.replicaSelector != nil && s.replicaSelector.onReadReqConfigurableTimeout(req) {
return nil
}
}
Expand Down Expand Up @@ -1776,8 +1787,9 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext

if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil {
if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") {
s.replicaSelector.onDeadlineExceeded()
return true, nil
if s.replicaSelector.onReadReqConfigurableTimeout(req) {
return true, nil
}
}
logutil.Logger(bo.GetCtx()).Warn(
"tikv reports `ServerIsBusy` retry later",
Expand Down Expand Up @@ -1900,8 +1912,8 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
return true, nil
}

if isDeadlineExceeded(regionErr) && s.replicaSelector != nil {
s.replicaSelector.onDeadlineExceeded()
if isDeadlineExceeded(regionErr) && s.replicaSelector != nil && s.replicaSelector.onReadReqConfigurableTimeout(req) {
return true, nil
}

logutil.Logger(bo.GetCtx()).Debug(
Expand Down
16 changes: 16 additions & 0 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,22 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
}
}

// Test for write request.
tf := func(s *Store, bo *retry.Backoffer) livenessState {
return reachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
resetStats()
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{})
req.ReplicaReadType = kv.ReplicaReadLeader
loc := getLocFn()
bo = retry.NewBackoffer(context.Background(), 1000)
resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
s.Nil(resp)
s.Equal(context.DeadlineExceeded, err)
backoffTimes := bo.GetBackoffTimes()
s.True(backoffTimes["tikvRPC"] > 0) // write request timeout won't do fast retry, so backoff times should be more than 0.
}

func (s *testRegionRequestToThreeStoresSuite) TestStaleReadTryFollowerAfterTimeout() {
Expand Down

0 comments on commit 32c4ef5

Please sign in to comment.