diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go
index 3c1129bc5f..6729821e9f 100644
--- a/internal/client/client_batch.go
+++ b/internal/client/client_batch.go
@@ -347,6 +347,12 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 }
 
 func (a *batchConn) getClientAndSend() {
+	if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil {
+		if timeout, ok := val.(int); ok && timeout > 0 {
+			time.Sleep(time.Duration(timeout * int(time.Millisecond)))
+		}
+	}
+
 	// Choose a connection by round-robbin.
 	var (
 		cli    *batchCommandsClient
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index a9134509e2..9c488a2bc0 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -240,6 +240,8 @@ type replica struct {
 	peer     *metapb.Peer
 	epoch    uint32
 	attempts int
+	// deadlineErrUsingConfTimeout indicates that the replica has already been tried, but it returned a deadline exceeded error.
+	deadlineErrUsingConfTimeout bool
 }
 
 func (r *replica) isEpochStale() bool {
@@ -337,7 +339,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
 	// a request. So, before the new leader is elected, we should not send requests
 	// to the unreachable old leader to avoid unnecessary timeout.
 	if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
-		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
+		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
 		return nil, stateChanged{}
 	}
 	selector.targetIdx = state.leaderIdx
@@ -352,7 +354,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
 		return
 	}
 	if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) {
-		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
+		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
 	}
 	if liveness != reachable {
 		selector.invalidateReplicaStore(selector.targetReplica(), cause)
@@ -360,7 +362,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
 }
 
 func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
-	selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
+	selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
}
 
 // tryFollower is the state where we cannot access the known leader
@@ -372,22 +374,24 @@ func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
 // the leader will be updated to replicas[0] and give it another chance.
 type tryFollower struct {
 	stateBase
-	// if the leader is unavailable, but it still holds the leadership, fallbackFromLeader is true and replica read is enabled.
-	fallbackFromLeader bool
-	leaderIdx          AccessIndex
-	lastIdx            AccessIndex
-	labels             []*metapb.StoreLabel
+	leaderIdx AccessIndex
+	lastIdx   AccessIndex
+	labels    []*metapb.StoreLabel
+	// fromAccessKnownLeader indicates whether the state was changed from `accessKnownLeader`.
+	fromAccessKnownLeader bool
 }
 
 func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
+	hasDeadlineExceededErr := false
 	filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) {
 		for i := 0; i < len(selector.replicas); i++ {
 			idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
+			selectReplica := selector.replicas[idx]
+			hasDeadlineExceededErr = hasDeadlineExceededErr || selectReplica.deadlineErrUsingConfTimeout
 			if idx == state.leaderIdx {
 				continue
 			}
-			selectReplica := selector.replicas[idx]
-			if fn(selectReplica) && selectReplica.store.getLivenessState() != unreachable {
+			if fn(selectReplica) && selectReplica.store.getLivenessState() != unreachable && !selectReplica.deadlineErrUsingConfTimeout {
 				return idx, selectReplica
 			}
 		}
@@ -418,6 +422,10 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 
 	// If all followers are tried and fail, backoff and retry.
 	if selector.targetIdx < 0 {
+		if hasDeadlineExceededErr {
+			// When a deadline exceeded error is met, do a fast retry without invalidating the region cache.
+			return nil, nil
+		}
 		metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
 		selector.invalidateRegion()
 		return nil, nil
@@ -426,17 +434,17 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 	if err != nil || rpcCtx == nil {
 		return rpcCtx, err
 	}
-	if state.fallbackFromLeader {
-		staleRead := false
-		rpcCtx.contextPatcher.staleRead = &staleRead
+	if !state.fromAccessKnownLeader {
 		replicaRead := true
 		rpcCtx.contextPatcher.replicaRead = &replicaRead
 	}
+	staleRead := false
+	rpcCtx.contextPatcher.staleRead = &staleRead
 	return rpcCtx, nil
 }
 
 func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
-	if !state.fallbackFromLeader {
+	if state.fromAccessKnownLeader {
 		peer := selector.targetReplica().peer
 		if !selector.region.switchWorkLeaderToPeer(peer) {
 			logutil.BgLogger().Warn("the store must exist",
@@ -623,16 +631,19 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 				zap.Bool("leader-invalid", leaderInvalid),
 				zap.Any("labels", state.option.labels))
 		}
+		// If the leader has been tried and returned a deadline exceeded error, return nil to the upper layer to retry with the default timeout.
+		if leader.deadlineErrUsingConfTimeout {
+			return nil, nil
+		}
 		if leaderInvalid {
 			// In stale-read, the request will fallback to leader after the local follower failure.
 			// If the leader is also unavailable, we can fallback to the follower and use replica-read flag again,
 			// The remote follower not tried yet, and the local follower can retry without stale-read flag.
 			if state.isStaleRead {
 				selector.state = &tryFollower{
-					fallbackFromLeader: true,
-					leaderIdx:          state.leaderIdx,
-					lastIdx:            state.leaderIdx,
-					labels:             state.option.labels,
+					leaderIdx: state.leaderIdx,
+					lastIdx:   state.leaderIdx,
+					labels:    state.option.labels,
 				}
 				if leaderEpochStale {
 					selector.regionCache.scheduleReloadRegion(selector.region)
@@ -677,7 +688,8 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
 }
 
 func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
+	// Skip the replica if its epoch is stale, its retry attempts are exhausted, its store is unreachable, or it returned a deadline exceeded error.
-	if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
+	if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout {
 		return false
 	}
 	if state.option.leaderOnly && idx == state.leaderIdx {
@@ -884,6 +896,16 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
 	s.state.onSendFailure(bo, s, err)
 }
 
+func (s *replicaSelector) onDeadlineExceeded() {
+	if target := s.targetReplica(); target != nil {
+		target.deadlineErrUsingConfTimeout = true
+	}
+	if accessLeader, ok := s.state.(*accessKnownLeader); ok {
+		// If the leader returns a deadline exceeded error, try to access a follower next time.
+		s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
+	}
+}
+
 func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
 	store := accessReplica.store
 	liveness := store.requestLiveness(bo, s.regionCache)
@@ -1484,7 +1506,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo
 			return nil, false, err
 		}
 	}
-	if e := s.onSendFail(bo, rpcCtx, err); e != nil {
+	if e := s.onSendFail(bo, rpcCtx, req, err); e != nil {
 		return nil, false, err
 	}
 	return nil, true, nil
@@ -1514,7 +1536,7 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) {
 	logutil.BgLogger().Warn("release store token failed, count equals to 0")
 }
 
-func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, err error) error {
+func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, err error) error {
 	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan("regionRequest.onSendFail", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
@@ -1525,6 +1547,11 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
 		return errors.WithStack(err)
 	} else if LoadShuttingDown() > 0 {
 		return errors.WithStack(tikverr.ErrTiDBShuttingDown)
+	} else if errors.Cause(err) == context.DeadlineExceeded && req.MaxExecutionDurationMs < uint64(client.ReadTimeoutShort.Milliseconds()) {
+		if s.replicaSelector != nil {
+			s.replicaSelector.onDeadlineExceeded()
+			return nil
+		}
 	}
 	if status.Code(errors.Cause(err)) == codes.Canceled {
 		select {
@@ -1598,6 +1625,9 @@ func regionErrorToLabel(e *errorpb.Error) string {
 	} else if e.GetEpochNotMatch() != nil {
 		return "epoch_not_match"
 	} else if e.GetServerIsBusy() != nil {
+		if strings.Contains(e.GetServerIsBusy().GetReason(), "deadline is exceeded") {
+			return "deadline_exceeded"
+		}
 		return "server_is_busy"
 	} else if e.GetStaleCommand() != nil {
 		return "stale_command"
@@ -1628,10 +1658,16 @@ func regionErrorToLabel(e *errorpb.Error) string {
 		// the `mismatch peer id` error does not has a specific error type, so we have to match the error message.
 		// TODO: add a specific error type for `mismatch peer id`.
 		return "mismatch_peer_id"
+	} else if isDeadlineExceeded(e) {
+		return "deadline_exceeded"
 	}
 	return "unknown"
 }
 
+func isDeadlineExceeded(e *errorpb.Error) bool {
+	return strings.Contains(e.GetMessage(), "Deadline is exceeded")
+}
+
 func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error) (shouldRetry bool, err error) {
 	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context()))
@@ -1725,8 +1761,13 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
 		return retry, err
 	}
 
-	if regionErr.GetServerIsBusy() != nil {
-		logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
+	if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil {
+		if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") {
+			s.replicaSelector.onDeadlineExceeded()
+			return true, nil
+		}
+		logutil.Logger(bo.GetCtx()).Warn(
+			"tikv reports `ServerIsBusy` retry later",
 			zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
 			zap.Stringer("ctx", ctx))
 		if s.replicaSelector.canFallback2Follower() {
@@ -1846,7 +1887,12 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
 		return true, nil
 	}
 
-	logutil.BgLogger().Debug("tikv reports region failed",
+	if isDeadlineExceeded(regionErr) && s.replicaSelector != nil {
+		s.replicaSelector.onDeadlineExceeded()
+	}
+
+	logutil.Logger(bo.GetCtx()).Debug(
+		"tikv reports region failed",
 		zap.Stringer("regionErr", regionErr),
 		zap.Stringer("ctx", ctx))
 
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 19b1624daa..12add6fc5a 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -45,14 +45,17 @@ import (
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/log"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/suite"
 	tikverr "github.com/tikv/client-go/v2/error"
+	"github.com/tikv/client-go/v2/internal/client"
 	"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
 	"github.com/tikv/client-go/v2/internal/retry"
 	"github.com/tikv/client-go/v2/kv"
 	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/tikvrpc"
+	"go.uber.org/zap"
 )
 
 func TestRegionRequestToThreeStores(t *testing.T) {
@@ -656,7 +659,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	// Normal
 	bo := retry.NewBackoffer(context.Background(), -1)
 	sender := s.regionRequestSender
-	resp, err := sender.SendReq(bo, req, region.Region, time.Second)
+	resp, err := sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
 	s.Nil(err)
 	s.NotNil(resp)
 	s.True(bo.GetTotalBackoffTimes() == 0)
@@ -665,7 +668,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	bo = retry.NewBackoffer(context.Background(), -1)
 	s.cluster.ChangeLeader(s.regionID, s.peerIDs[1])
 	s.cluster.StopStore(s.storeIDs[0])
-	resp, err = sender.SendReq(bo, req, region.Region, time.Second)
+	resp, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
 	s.Nil(err)
 	s.NotNil(resp)
 	s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
@@ -674,8 +677,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	atomic.StoreUint32(&regionStore.stores[0].livenessState, uint32(reachable))
 
 	// Leader is updated because of send success, so no backoff.
+	reloadRegion()
 	bo = retry.NewBackoffer(context.Background(), -1)
-	resp, err = sender.SendReq(bo, req, region.Region, time.Second)
+	resp, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
 	s.Nil(err)
 	s.NotNil(resp)
 	s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
@@ -969,7 +973,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
 	s.NotEqual(leaderAddr, "")
 	for i := 0; i < 10; i++ {
 		bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
-		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
 		s.Nil(err)
 		s.NotNil(resp)
 
@@ -1012,7 +1016,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
 
 	for i := 0; i < 100; i++ {
 		bo := retry.NewBackofferWithVars(context.Background(), 1, nil)
-		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
 		s.Nil(err)
 		s.NotNil(resp)
 		// since all follower'store is unreachable, the request will be sent to leader, the backoff times should be 0.
@@ -1190,3 +1194,84 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
 		}
 	}
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
+	leaderAddr := ""
+	reqTargetAddrs := make(map[string]struct{})
+	s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats()
+	bo := retry.NewBackoffer(context.Background(), 10000)
+	mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
+		reqTargetAddrs[addr] = struct{}{}
+		if req.Context.MaxExecutionDurationMs < 10 {
+			return nil, context.DeadlineExceeded
+		}
+		if addr != leaderAddr && !req.Context.ReplicaRead && !req.Context.StaleRead {
+			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}}}, nil
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil
+	}}
+	getLocFn := func() *KeyLocation {
+		loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a"))
+		s.Nil(err)
+		region := s.regionRequestSender.regionCache.GetCachedRegionWithRLock(loc.Region)
+		leaderStore, _, _, _ := region.WorkStorePeer(region.getStore())
+		leaderAddr, err = s.regionRequestSender.regionCache.getStoreAddr(s.bo, region, leaderStore)
+		s.Nil(err)
+		return loc
+	}
+	resetStats := func() {
+		reqTargetAddrs = make(map[string]struct{})
+		s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient)
+		s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats()
+	}
+
+	// Test different read types.
+	staleReadTypes := []bool{false, true}
+	replicaReadTypes := []kv.ReplicaReadType{kv.ReplicaReadLeader, kv.ReplicaReadFollower, kv.ReplicaReadMixed}
+	for _, staleRead := range staleReadTypes {
+		for _, tp := range replicaReadTypes {
+			log.Info("TestSendReqFirstTimeout", zap.Bool("stale-read", staleRead), zap.String("replica-read-type", tp.String()))
+			resetStats()
+			req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
+			if staleRead {
+				req.EnableStaleRead()
+			} else {
+				req.ReplicaRead = tp.IsFollowerRead()
+				req.ReplicaReadType = tp
+			}
+			loc := getLocFn()
+			resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
+			s.Nil(err)
+			regionErr, err := resp.GetRegionError()
+			s.Nil(err)
+			s.True(IsFakeRegionError(regionErr))
+			s.Equal(1, len(s.regionRequestSender.Stats))
+			if staleRead {
+				rpcNum := s.regionRequestSender.Stats[tikvrpc.CmdGet].Count
+				s.True(rpcNum == 1 || rpcNum == 2) // 1 rpc or 2 rpc
+			} else {
+				s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc
+				s.Equal(3, len(reqTargetAddrs))                                      // each rpc goes to a different store.
+			}
+			s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
+			// Note: MaxExecutionDurationMs must be reset before the retry.
+			resetStats()
+			if staleRead {
+				req.EnableStaleRead()
+			} else {
+				req.ReplicaRead = tp.IsFollowerRead()
+				req.ReplicaReadType = tp
+			}
+			req.Context.MaxExecutionDurationMs = 0
+			resp, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+			s.Nil(err)
+			regionErr, err = resp.GetRegionError()
+			s.Nil(err)
+			s.Nil(regionErr)
+			s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
+			s.Equal(1, len(s.regionRequestSender.Stats))
+			s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
+			s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
+		}
+	}
+}
diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go
index 76b5aed5a7..5d21dc4c61 100644
--- a/txnkv/txnsnapshot/snapshot.go
+++ b/txnkv/txnsnapshot/snapshot.go
@@ -118,6 +118,7 @@ type KVSnapshot struct {
 	resolvedLocks  util.TSSet
 	committedLocks util.TSSet
 	scanBatchSize  int
+	readTimeout    time.Duration
 
 	// Cache the result of BatchGet.
 	// The invariance is that calling BatchGet multiple times using the same start ts,
@@ -370,6 +371,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 	pending := batch.keys
 
 	var resolvingRecordToken *int
+	useConfigurableKVTimeout := true
 	for {
 		s.mu.RLock()
 		req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &kvrpcpb.BatchGetRequest{
@@ -395,6 +397,12 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 		if isStaleness {
 			req.EnableStaleRead()
 		}
+		timeout := client.ReadTimeoutMedium
+		if useConfigurableKVTimeout && s.readTimeout > 0 {
+			useConfigurableKVTimeout = false
+			timeout = s.readTimeout
+		}
+		req.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
 		ops := make([]locate.StoreSelectorOption, 0, 2)
 		if len(matchStoreLabels) > 0 {
 			ops = append(ops, locate.WithMatchLabels(matchStoreLabels))
@@ -406,7 +414,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 			}
 			req.ReplicaReadType = readType
 		}
-		resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, client.ReadTimeoutMedium, tikvrpc.TiKV, "", ops...)
+		resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, timeout, tikvrpc.TiKV, "", ops...)
 		if err != nil {
 			return err
 		}
@@ -616,13 +624,20 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
 	var firstLock *txnlock.Lock
 	var resolvingRecordToken *int
 
+	useConfigurableKVTimeout := true
 	for {
 		util.EvalFailpoint("beforeSendPointGet")
 		loc, err := s.store.GetRegionCache().LocateKey(bo, k)
 		if err != nil {
 			return nil, err
 		}
-		resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV, "", ops...)
+		timeout := client.ReadTimeoutShort
+		if useConfigurableKVTimeout && s.readTimeout > 0 {
+			useConfigurableKVTimeout = false
+			timeout = s.readTimeout
+		}
+		req.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
+		resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, timeout, tikvrpc.TiKV, "", ops...)
 		if err != nil {
 			return nil, err
 		}
@@ -923,6 +938,16 @@ func (s *KVSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.R
 	}
 }
 
+// SetKVReadTimeout sets the timeout for individual KV read operations under this snapshot.
+func (s *KVSnapshot) SetKVReadTimeout(readTimeout time.Duration) {
+	s.readTimeout = readTimeout
+}
+
+// GetKVReadTimeout returns the timeout for individual KV read operations under this snapshot, or 0 if it is not set.
+func (s *KVSnapshot) GetKVReadTimeout() time.Duration {
+	return s.readTimeout
+}
+
 func (s *KVSnapshot) getResolveLockDetail() *util.ResolveLockDetail {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
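
Usage note (not part of the patch): a minimal sketch of how a caller might exercise the per-snapshot read timeout introduced in txnkv/txnsnapshot/snapshot.go, assuming a txnkv.Client has already been created. Only SetKVReadTimeout comes from this diff; the transaction helpers used here are the existing public API, referenced on a best-effort basis.

package main

import (
	"context"
	"time"

	"github.com/tikv/client-go/v2/txnkv"
)

// getWithShortTimeout issues a point get whose first RPC attempt uses the
// 100ms timeout configured on the snapshot. If that attempt hits a deadline
// exceeded error, the sender marks the replica (deadlineErrUsingConfTimeout)
// and fast-retries another replica, eventually falling back to the default
// ReadTimeoutShort on a later attempt.
func getWithShortTimeout(ctx context.Context, client *txnkv.Client, key []byte) ([]byte, error) {
	txn, err := client.Begin()
	if err != nil {
		return nil, err
	}
	// Read-only use: the transaction is never committed in this sketch.
	txn.GetSnapshot().SetKVReadTimeout(100 * time.Millisecond)
	return txn.Get(ctx, key)
}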