From f54b182487423fe029077a327726b12d5ad663b0 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Thu, 10 Aug 2023 20:01:28 +0800
Subject: [PATCH 1/8] support tidb_kv_read_timeout as first round kv request timeout

Signed-off-by: crazycs520
---
 internal/client/client_batch.go         |   6 ++
 internal/locate/region_request.go       |  77 ++++++++++++--
 internal/locate/region_request3_test.go | 129 +++++++++++++++++++++++-
 tikvrpc/tikvrpc.go                      |   2 +-
 txnkv/txnsnapshot/snapshot.go           |  29 +++++-
 5 files changed, 226 insertions(+), 17 deletions(-)

diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go
index c8842e9c26..6a270bc6d2 100644
--- a/internal/client/client_batch.go
+++ b/internal/client/client_batch.go
@@ -347,6 +347,12 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 }
 
 func (a *batchConn) getClientAndSend() {
+	if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil {
+		if timeout, ok := val.(int); ok && timeout > 0 {
+			time.Sleep(time.Duration(timeout * int(time.Millisecond)))
+		}
+	}
+
 	// Choose a connection by round-robbin.
 	var (
 		cli *batchCommandsClient
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 9f4840588e..e071d739aa 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -245,6 +245,8 @@ type replica struct {
 	peer     *metapb.Peer
 	epoch    uint32
 	attempts int
+	// deadlineErrUsingConfTimeout indicates that the replica has already been tried, but it returned a deadline exceeded error.
+	deadlineErrUsingConfTimeout bool
 }
 
 func (r *replica) isEpochStale() bool {
@@ -377,7 +379,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
 }
 
 func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
-	selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
+	selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromOnNotLeader: true}
 }
 
 // tryFollower is the state where we cannot access the known leader
@@ -391,19 +393,23 @@ type tryFollower struct {
 	stateBase
 	leaderIdx AccessIndex
 	lastIdx   AccessIndex
+	// fromOnNotLeader indicates whether the state is changed from onNotLeader.
+	fromOnNotLeader bool
 }
 
 func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
 	var targetReplica *replica
+	hasDeadlineExceededErr := false
 	// Search replica that is not attempted from the last accessed replica
 	for i := 1; i < len(selector.replicas); i++ {
 		idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
+		targetReplica = selector.replicas[idx]
+		hasDeadlineExceededErr = hasDeadlineExceededErr || targetReplica.deadlineErrUsingConfTimeout
 		if idx == state.leaderIdx {
 			continue
 		}
-		targetReplica = selector.replicas[idx]
 		// Each follower is only tried once
-		if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable {
+		if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable && !targetReplica.deadlineErrUsingConfTimeout {
 			state.lastIdx = idx
 			selector.targetIdx = idx
 			break
@@ -411,16 +417,33 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 	}
 	// If all followers are tried and fail, backoff and retry.
 	if selector.targetIdx < 0 {
+		if hasDeadlineExceededErr {
+			// On a deadline exceeded error, do a fast retry without invalidating the region cache.
+			return nil, nil
+		}
 		metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
 		selector.invalidateRegion()
 		return nil, nil
 	}
-	return selector.buildRPCContext(bo)
+	rpcCtx, err := selector.buildRPCContext(bo)
+	if err != nil || rpcCtx == nil {
+		return nil, err
+	}
+	// If the state is changed from onNotLeader, the `replicaRead` flag should not be set as leader read would still be used.
+	if !state.fromOnNotLeader {
+		replicaRead := selector.targetIdx != state.leaderIdx
+		rpcCtx.contextPatcher.replicaRead = &replicaRead
+	}
+	disableStaleRead := false
+	rpcCtx.contextPatcher.staleRead = &disableStaleRead
+	return rpcCtx, nil
 }
 
 func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
-	if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
-		panic("the store must exist")
+	if state.fromOnNotLeader {
+		if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
+			panic("the store must exist")
+		}
 	}
 }
 
@@ -617,6 +640,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 				zap.Bool("leader-invalid", leaderInvalid),
 				zap.Any("labels", state.option.labels))
 		}
+		// If the leader has been tried and returned a deadline exceeded error, return nil to the upper layer to retry with the default timeout.
+		if leader.deadlineErrUsingConfTimeout {
+			return nil, nil
+		}
 		if leaderInvalid {
 			metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
 			selector.invalidateRegion()
@@ -665,7 +692,7 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
 
 func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
 	// the epoch is staled or retry exhausted, or the store is unreachable.
-	if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
+	if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout {
 		return false
 	}
 	// The request can only be sent to the leader.
@@ -947,6 +974,16 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
 	s.state.onSendFailure(bo, s, err)
 }
 
+func (s *replicaSelector) onDeadlineExceeded() {
+	if target := s.targetReplica(); target != nil {
+		target.deadlineErrUsingConfTimeout = true
+	}
+	if accessLeader, ok := s.state.(*accessKnownLeader); ok {
+		// If the leader returns a deadline exceeded error, we should try to access a follower next time.
+		s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
+	}
+}
+
 func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
 	store := accessReplica.store
 	liveness := store.requestLiveness(bo, s.regionCache)
@@ -1608,7 +1645,7 @@ func (s *RegionRequestSender) sendReqToRegion(
 				return nil, false, err
 			}
 		}
-		if e := s.onSendFail(bo, rpcCtx, err); e != nil {
+		if e := s.onSendFail(bo, rpcCtx, req, err); e != nil {
 			return nil, false, err
 		}
 		return nil, true, nil
@@ -1638,7 +1675,7 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) {
 	logutil.BgLogger().Warn("release store token failed, count equals to 0")
 }
 
-func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, err error) error {
+func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, err error) error {
 	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan("regionRequest.onSendFail", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
@@ -1649,6 +1686,11 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
 		return errors.WithStack(err)
 	} else if LoadShuttingDown() > 0 {
 		return errors.WithStack(tikverr.ErrTiDBShuttingDown)
+	} else if errors.Cause(err) == context.DeadlineExceeded && req.MaxExecutionDurationMs < uint64(client.ReadTimeoutShort.Milliseconds()) {
+		if s.replicaSelector != nil {
+			s.replicaSelector.onDeadlineExceeded()
+			return nil
+		}
 	}
 	if status.Code(errors.Cause(err)) == codes.Canceled {
 		select {
@@ -1740,6 +1782,9 @@ func regionErrorToLabel(e *errorpb.Error) string {
 	} else if e.GetEpochNotMatch() != nil {
 		return "epoch_not_match"
 	} else if e.GetServerIsBusy() != nil {
+		if strings.Contains(e.GetServerIsBusy().GetReason(), "deadline is exceeded") {
+			return "deadline_exceeded"
+		}
 		return "server_is_busy"
 	} else if e.GetStaleCommand() != nil {
 		return "stale_command"
@@ -1767,10 +1812,16 @@ func regionErrorToLabel(e *errorpb.Error) string {
 		return "flashback_not_prepared"
 	} else if e.GetIsWitness() != nil {
 		return "peer_is_witness"
+	} else if isDeadlineExceeded(e) {
+		return "deadline_exceeded"
 	}
 	return "unknown"
 }
 
+func isDeadlineExceeded(e *errorpb.Error) bool {
+	return strings.Contains(e.GetMessage(), "Deadline is exceeded")
+}
+
 func (s *RegionRequestSender) onRegionError(
 	bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error,
 ) (shouldRetry bool, err error) {
@@ -1918,6 +1969,10 @@ func (s *RegionRequestSender) onRegionError(
 	}
 
 	if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil {
+		if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") {
+			s.replicaSelector.onDeadlineExceeded()
+			return true, nil
+		}
 		if s.replicaSelector != nil {
 			return s.replicaSelector.onServerIsBusy(bo, ctx, req, serverIsBusy)
 		}
@@ -2046,6 +2101,10 @@ func (s *RegionRequestSender) onRegionError(
 		return true, nil
 	}
 
+	if isDeadlineExceeded(regionErr) && s.replicaSelector != nil {
+		s.replicaSelector.onDeadlineExceeded()
+	}
+
 	logutil.Logger(bo.GetCtx()).Debug(
 		"tikv reports region failed",
 		zap.Stringer("regionErr", regionErr),
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 91f7f22f19..cb6a916f9f 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -50,6 +50,7 @@ import (
 	"github.com/stretchr/testify/suite"
 	tikverr "github.com/tikv/client-go/v2/error"
 	"github.com/tikv/client-go/v2/internal/apicodec"
+	"github.com/tikv/client-go/v2/internal/client"
 	"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
 	"github.com/tikv/client-go/v2/internal/retry"
 	"github.com/tikv/client-go/v2/kv"
@@ -711,7 +712,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	// Normal
 	bo := retry.NewBackoffer(context.Background(), -1)
 	sender := s.regionRequestSender
-	resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
+	resp, _, err := sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
 	s.Nil(err)
 	s.NotNil(resp)
 	s.True(bo.GetTotalBackoffTimes() == 0)
@@ -720,7 +721,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	bo = retry.NewBackoffer(context.Background(), -1)
 	s.cluster.ChangeLeader(s.regionID, s.peerIDs[1])
 	s.cluster.StopStore(s.storeIDs[0])
-	resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
+	resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
 	s.Nil(err)
 	s.NotNil(resp)
 	s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
@@ -729,8 +730,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	atomic.StoreUint32(&regionStore.stores[0].livenessState, uint32(reachable))
 
 	// Leader is updated because of send success, so no backoff.
+	reloadRegion()
 	bo = retry.NewBackoffer(context.Background(), -1)
-	resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
+	resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
 	s.Nil(err)
 	s.NotNil(resp)
 	s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
@@ -1092,7 +1094,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
 	s.NotEqual(leaderAddr, "")
 	for i := 0; i < 10; i++ {
 		bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
-		resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+		resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
 		s.Nil(err)
 		s.NotNil(resp)
 
@@ -1135,7 +1137,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
 
 	for i := 0; i < 100; i++ {
 		bo := retry.NewBackofferWithVars(context.Background(), 1, nil)
-		resp, _, retryTimes, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+		resp, _, retryTimes, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
 		s.Nil(err)
 		s.NotNil(resp)
 		// since all follower'store is unreachable, the request will be sent to leader, the backoff times should be 0.
@@ -1199,3 +1201,120 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
 	s.True(ok)
 	s.Equal(getResp.Value, value)
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
+	leaderAddr := ""
+	reqTargetAddrs := make(map[string]struct{})
+	s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats()
+	bo := retry.NewBackoffer(context.Background(), 10000)
+	mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
+		reqTargetAddrs[addr] = struct{}{}
+		if req.Context.MaxExecutionDurationMs < 10 {
+			return nil, context.DeadlineExceeded
+		}
+		if addr != leaderAddr && !req.Context.ReplicaRead && !req.Context.StaleRead {
+			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}}}, nil
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil
+	}}
+	getLocFn := func() *KeyLocation {
+		loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a"))
+		s.Nil(err)
+		region := s.regionRequestSender.regionCache.GetCachedRegionWithRLock(loc.Region)
+		leaderStore, _, _, _ := region.WorkStorePeer(region.getStore())
+		leaderAddr, err = s.regionRequestSender.regionCache.getStoreAddr(s.bo, region, leaderStore)
+		s.Nil(err)
+		return loc
+	}
+	resetStats := func() {
+		reqTargetAddrs = make(map[string]struct{})
+		s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient)
+		s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats()
+	}
+
+	resetStats()
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
+	loc := getLocFn()
+	resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
+	s.Nil(err)
+	regionErr, err := resp.GetRegionError()
+	s.Nil(err)
+	s.True(IsFakeRegionError(regionErr))
+	s.Equal(1, len(s.regionRequestSender.Stats))
+	s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc
+	s.Equal(3, len(reqTargetAddrs))                                      // each rpc to a different store.
+	s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
+	// warn: must reset MaxExecutionDurationMs before retry.
+	resetStats()
+	req.Context.MaxExecutionDurationMs = 0
+	resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+	s.Nil(err)
+	regionErr, err = resp.GetRegionError()
+	s.Nil(err)
+	s.Nil(regionErr)
+	s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
+	s.Equal(1, len(s.regionRequestSender.Stats))
+	s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc
+	s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
+
+	// Test stale read.
+	resetStats()
+	req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
+	req.EnableStaleRead()
+	loc = getLocFn()
+	resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
+	s.Nil(err)
+	regionErr, err = resp.GetRegionError()
+	s.Nil(err)
+	s.True(IsFakeRegionError(regionErr))
+	s.Equal(1, len(s.regionRequestSender.Stats))
+	s.True(s.regionRequestSender.Stats[tikvrpc.CmdGet].Count >= 1)
+	s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
+	// warn: must reset MaxExecutionDurationMs before retry.
+	resetStats()
+	req.EnableStaleRead()
+	req.Context.MaxExecutionDurationMs = 0
+	resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+	s.Nil(err)
+	regionErr, err = resp.GetRegionError()
+	s.Nil(err)
+	s.Nil(regionErr)
+	s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
+	s.Equal(1, len(s.regionRequestSender.Stats))
+	s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc
+	s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
+
+	// Test different read type.
+	replicaReadTypes := []kv.ReplicaReadType{kv.ReplicaReadLeader, kv.ReplicaReadFollower, kv.ReplicaReadMixed}
+	for _, tp := range replicaReadTypes {
+		resetStats()
+		req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
+		req.ReplicaRead = tp.IsFollowerRead()
+		req.ReplicaReadType = tp
+		loc = getLocFn()
+		resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
+		s.Nil(err)
+		regionErr, err = resp.GetRegionError()
+		s.Nil(err)
+		s.True(IsFakeRegionError(regionErr))
+		//s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
+		s.Equal(1, len(s.regionRequestSender.Stats))
+		s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc
+		s.Equal(3, len(reqTargetAddrs))
+		s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
+		// warn: must reset MaxExecutionDurationMs before retry.
+		resetStats()
+		req.ReplicaRead = tp.IsFollowerRead()
+		req.ReplicaReadType = tp
+		req.Context.MaxExecutionDurationMs = 0
+		resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+		s.Nil(err)
+		regionErr, err = resp.GetRegionError()
+		s.Nil(err)
+		s.Nil(regionErr)
+		s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
+		s.Equal(1, len(s.regionRequestSender.Stats))
+		s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc
+		s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
+	}
+}
diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go
index 4d81e1481a..4540ccf4fe 100644
--- a/tikvrpc/tikvrpc.go
+++ b/tikvrpc/tikvrpc.go
@@ -233,7 +233,7 @@ type Request struct {
 	// the forwarded host. It's useful when network partition occurs.
 	ForwardedHost string
 	// ReplicaNumber is the number of current replicas, which is used to calculate the RU cost.
-	ReplicaNumber int64
+	ReplicaNumber int64
 }
 
 // NewRequest returns new kv rpc request.
diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go
index a4253d62e7..ab29655355 100644
--- a/txnkv/txnsnapshot/snapshot.go
+++ b/txnkv/txnsnapshot/snapshot.go
@@ -118,6 +118,7 @@ type KVSnapshot struct {
 	resolvedLocks  util.TSSet
 	committedLocks util.TSSet
 	scanBatchSize  int
+	readTimeout    time.Duration
 
 	// Cache the result of BatchGet.
 	// The invariance is that calling BatchGet multiple times using the same start ts,
@@ -387,6 +388,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 	pending := batch.keys
 
 	var resolvingRecordToken *int
+	useConfigurableKVTimeout := true
 	for {
 		s.mu.RLock()
 		req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &kvrpcpb.BatchGetRequest{
@@ -416,6 +418,12 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 		if isStaleness {
 			req.EnableStaleRead()
 		}
+		timeout := client.ReadTimeoutMedium
+		if useConfigurableKVTimeout && s.readTimeout > 0 {
+			useConfigurableKVTimeout = false
+			timeout = s.readTimeout
+		}
+		req.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
 		ops := make([]locate.StoreSelectorOption, 0, 2)
 		if len(matchStoreLabels) > 0 {
 			ops = append(ops, locate.WithMatchLabels(matchStoreLabels))
@@ -427,7 +435,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 			}
 			req.ReplicaReadType = readType
 		}
-		resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, client.ReadTimeoutMedium, tikvrpc.TiKV, "", ops...)
+		resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, timeout, tikvrpc.TiKV, "", ops...)
 		if err != nil {
 			return err
 		}
@@ -651,13 +659,20 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
 
 	var firstLock *txnlock.Lock
 	var resolvingRecordToken *int
+	useConfigurableKVTimeout := true
 	for {
 		util.EvalFailpoint("beforeSendPointGet")
 		loc, err := s.store.GetRegionCache().LocateKey(bo, k)
 		if err != nil {
 			return nil, err
 		}
-		resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV, "", ops...)
+		timeout := client.ReadTimeoutShort
+		if useConfigurableKVTimeout && s.readTimeout > 0 {
+			useConfigurableKVTimeout = false
+			timeout = s.readTimeout
+		}
+		req.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
+		resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, timeout, tikvrpc.TiKV, "", ops...)
 		if err != nil {
 			return nil, err
 		}
@@ -984,6 +999,16 @@ func (s *KVSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.R
 	}
 }
 
+// SetKVReadTimeout sets timeout for individual KV read operations under this snapshot
+func (s *KVSnapshot) SetKVReadTimeout(readTimeout time.Duration) {
+	s.readTimeout = readTimeout
+}
+
+// GetKVReadTimeout returns timeout for individual KV read operations under this snapshot or 0 if timeout is not set
+func (s *KVSnapshot) GetKVReadTimeout() time.Duration {
+	return s.readTimeout
+}
+
 func (s *KVSnapshot) getResolveLockDetail() *util.ResolveLockDetail {
 	s.mu.RLock()
 	defer s.mu.RUnlock()

From 6fcf70fa57c2528a92ae122b8515aeef7a3c28e0 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 11 Aug 2023 00:21:45 +0800
Subject: [PATCH 2/8] fix ci

Signed-off-by: crazycs520
---
 .github/workflows/test.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 50c4909737..fc9cde1dae 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -47,5 +47,5 @@ jobs:
     - name: Lint
       uses: golangci/golangci-lint-action@v3
       with:
-        version: v1.51.2
+        version: v1.54.0

From bb71f08c9c5df18a3642dc5dd38b93f8e71462bf Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 11 Aug 2023 00:32:09 +0800
Subject: [PATCH 3/8] fix ci

Signed-off-by: crazycs520
---
 .github/workflows/test.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index fc9cde1dae..5ae47e37b0 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -47,5 +47,5 @@ jobs:
    - name: Lint
      uses: golangci/golangci-lint-action@v3
      with:
-       version: v1.54.0
+       version: v1.53.3

From 70cab5c9582c9bfacc7dbc598ae990a6adbc5788 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 11 Aug 2023 00:38:46 +0800
Subject: [PATCH 4/8] fix ci

Signed-off-by: crazycs520
---
 .github/workflows/test.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 5ae47e37b0..6bfd7a0c54 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -42,10 +42,10 @@ jobs:
    - name: Set up Go
      uses: actions/setup-go@v4
      with:
-       go-version: 1.21.0
+       go-version: 1.20.2

    - name: Lint
      uses: golangci/golangci-lint-action@v3
      with:
-       version: v1.53.3
+       version: v1.51.2

From 47d8bf302273ababf510c5a2174eab521dbf5084 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 11 Aug 2023 01:13:25 +0800
Subject: [PATCH 5/8] fix ci

Signed-off-by: crazycs520
---
 .github/workflows/test.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 6bfd7a0c54..50c4909737 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -42,7 +42,7 @@ jobs:
    - name: Set up Go
      uses: actions/setup-go@v4
      with:
-       go-version: 1.20.2
+       go-version: 1.21.0

    - name: Lint
      uses: golangci/golangci-lint-action@v3

From d439bfaca582714b9b17c948f838d6f7c382e210 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 11 Aug 2023 01:30:03 +0800
Subject: [PATCH 6/8] fix ci

Signed-off-by: crazycs520
---
 examples/gcworker/gcworker.go | 1 -
 tikvrpc/tikvrpc.go            | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/examples/gcworker/gcworker.go b/examples/gcworker/gcworker.go
index 1191adcd28..1deafdd7e5 100644
--- a/examples/gcworker/gcworker.go
+++ b/examples/gcworker/gcworker.go
@@ -37,7 +37,6 @@ func main() {
 		panic(err)
 	}
 
-	sysSafepoint, err :=
 	client.GC(context.Background(), *safepoint, tikv.WithConcurrency(10))
 	if err != nil {
 		panic(err)
 	}
diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go
index 4540ccf4fe..4d81e1481a 100644
--- a/tikvrpc/tikvrpc.go
+++ b/tikvrpc/tikvrpc.go
@@ -233,7 +233,7 @@ type Request struct {
 	// the forwarded host. It's useful when network partition occurs.
 	ForwardedHost string
 	// ReplicaNumber is the number of current replicas, which is used to calculate the RU cost.
-	ReplicaNumber int64
+	ReplicaNumber int64
 }
 
 // NewRequest returns new kv rpc request.

From 120208047d20ce95ed5957accc46d104e525f6c2 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 11 Aug 2023 10:06:40 +0800
Subject: [PATCH 7/8] update comment

Signed-off-by: crazycs520
---
 internal/locate/region_request3_test.go | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index cb6a916f9f..9ad1d21002 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -1241,7 +1241,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
 	s.Nil(err)
 	s.True(IsFakeRegionError(regionErr))
 	s.Equal(1, len(s.regionRequestSender.Stats))
-	s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc
+	s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc
 	s.Equal(3, len(reqTargetAddrs))                                      // each rpc to a different store.
 	s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
 	// warn: must reset MaxExecutionDurationMs before retry.
@@ -1254,7 +1254,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
 	s.Nil(regionErr)
 	s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
 	s.Equal(1, len(s.regionRequestSender.Stats))
-	s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc
+	s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
 	s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
 
 	// Test stale read.
@@ -1281,7 +1281,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
 	s.Nil(regionErr)
 	s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
 	s.Equal(1, len(s.regionRequestSender.Stats))
-	s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc
+	s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
 	s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
 
 	// Test different read type.
@@ -1297,9 +1297,8 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
 		regionErr, err = resp.GetRegionError()
 		s.Nil(err)
 		s.True(IsFakeRegionError(regionErr))
-		//s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
 		s.Equal(1, len(s.regionRequestSender.Stats))
-		s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc
+		s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc
 		s.Equal(3, len(reqTargetAddrs))
 		s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
 		// warn: must reset MaxExecutionDurationMs before retry.
@@ -1314,7 +1313,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
 		s.Nil(regionErr)
 		s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
 		s.Equal(1, len(s.regionRequestSender.Stats))
-		s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc
+		s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
 		s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
 	}
 }

From 8bd26b0b932bbcaf2f39381b3ad816a844a41ee6 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 11 Aug 2023 10:54:04 +0800
Subject: [PATCH 8/8] refine test

Signed-off-by: crazycs520
---
 internal/locate/region_request3_test.go | 129 +++++++++---------------
 1 file changed, 48 insertions(+), 81 deletions(-)

diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 9ad1d21002..f56b4022f9 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -46,6 +46,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/log"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/suite"
 	tikverr "github.com/tikv/client-go/v2/error"
@@ -56,6 +57,7 @@ import (
 	"github.com/tikv/client-go/v2/kv"
 	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/tikvrpc"
+	"go.uber.org/zap"
 )
 
 func TestRegionRequestToThreeStores(t *testing.T) {
@@ -1232,88 +1234,53 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
 		s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats()
 	}
 
-	resetStats()
-	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
-	loc := getLocFn()
-	resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
-	s.Nil(err)
-	regionErr, err := resp.GetRegionError()
-	s.Nil(err)
-	s.True(IsFakeRegionError(regionErr))
-	s.Equal(1, len(s.regionRequestSender.Stats))
-	s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc
-	s.Equal(3, len(reqTargetAddrs))                                      // each rpc to a different store.
-	s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
-	// warn: must reset MaxExecutionDurationMs before retry.
-	resetStats()
-	req.Context.MaxExecutionDurationMs = 0
-	resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
-	s.Nil(err)
-	regionErr, err = resp.GetRegionError()
-	s.Nil(err)
-	s.Nil(regionErr)
-	s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
-	s.Equal(1, len(s.regionRequestSender.Stats))
-	s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
-	s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
-
-	// Test stale read.
-	resetStats()
-	req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
-	req.EnableStaleRead()
-	loc = getLocFn()
-	resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
-	s.Nil(err)
-	regionErr, err = resp.GetRegionError()
-	s.Nil(err)
-	s.True(IsFakeRegionError(regionErr))
-	s.Equal(1, len(s.regionRequestSender.Stats))
-	s.True(s.regionRequestSender.Stats[tikvrpc.CmdGet].Count >= 1)
-	s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
-	// warn: must reset MaxExecutionDurationMs before retry.
-	resetStats()
-	req.EnableStaleRead()
-	req.Context.MaxExecutionDurationMs = 0
-	resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
-	s.Nil(err)
-	regionErr, err = resp.GetRegionError()
-	s.Nil(err)
-	s.Nil(regionErr)
-	s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
-	s.Equal(1, len(s.regionRequestSender.Stats))
-	s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
-	s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
-
 	// Test different read type.
+	staleReadTypes := []bool{false, true}
 	replicaReadTypes := []kv.ReplicaReadType{kv.ReplicaReadLeader, kv.ReplicaReadFollower, kv.ReplicaReadMixed}
-	for _, tp := range replicaReadTypes {
-		resetStats()
-		req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
-		req.ReplicaRead = tp.IsFollowerRead()
-		req.ReplicaReadType = tp
-		loc = getLocFn()
-		resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
-		s.Nil(err)
-		regionErr, err = resp.GetRegionError()
-		s.Nil(err)
-		s.True(IsFakeRegionError(regionErr))
-		s.Equal(1, len(s.regionRequestSender.Stats))
-		s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc
-		s.Equal(3, len(reqTargetAddrs))
-		s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
-		// warn: must reset MaxExecutionDurationMs before retry.
-		resetStats()
-		req.ReplicaRead = tp.IsFollowerRead()
-		req.ReplicaReadType = tp
-		req.Context.MaxExecutionDurationMs = 0
-		resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
-		s.Nil(err)
-		regionErr, err = resp.GetRegionError()
-		s.Nil(err)
-		s.Nil(regionErr)
-		s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
-		s.Equal(1, len(s.regionRequestSender.Stats))
-		s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
-		s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
+	for _, staleRead := range staleReadTypes {
+		for _, tp := range replicaReadTypes {
+			log.Info("TestSendReqFirstTimeout", zap.Bool("stale-read", staleRead), zap.String("replica-read-type", tp.String()))
+			resetStats()
+			req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
+			if staleRead {
+				req.EnableStaleRead()
+			} else {
+				req.ReplicaRead = tp.IsFollowerRead()
+				req.ReplicaReadType = tp
+			}
+			loc := getLocFn()
+			resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
+			s.Nil(err)
+			regionErr, err := resp.GetRegionError()
+			s.Nil(err)
+			s.True(IsFakeRegionError(regionErr))
+			s.Equal(1, len(s.regionRequestSender.Stats))
+			if staleRead {
+				rpcNum := s.regionRequestSender.Stats[tikvrpc.CmdGet].Count
+				s.True(rpcNum == 1 || rpcNum == 2) // 1 rpc or 2 rpc
+			} else {
+				s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc
+				s.Equal(3, len(reqTargetAddrs))                                      // each rpc to a different store.
+			}
+			s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
+			// warn: must reset MaxExecutionDurationMs before retry.
+			resetStats()
+			if staleRead {
+				req.EnableStaleRead()
+			} else {
+				req.ReplicaRead = tp.IsFollowerRead()
+				req.ReplicaReadType = tp
+			}
+			req.Context.MaxExecutionDurationMs = 0
+			resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+			s.Nil(err)
+			regionErr, err = resp.GetRegionError()
+			s.Nil(err)
+			s.Nil(regionErr)
+			s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
+			s.Equal(1, len(s.regionRequestSender.Stats))
+			s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
+			s.Equal(0, bo.GetTotalBackoffTimes())                                // no backoff since fast retry.
+		}
 	}
 }
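
Usage sketch for the per-snapshot read timeout added in patch 1. This is illustrative only and not part of the patches: the PD endpoint, key, and timeout value are placeholders, and it assumes the standard client-go v2 txnkv surface (txnkv.NewClient, Client.Begin, KVTxn.GetSnapshot) wrapping the KVSnapshot that gained SetKVReadTimeout here.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/txnkv"
)

func main() {
	// Placeholder PD endpoint; point this at a real cluster.
	client, err := txnkv.NewClient([]string{"127.0.0.1:2379"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	txn, err := client.Begin()
	if err != nil {
		panic(err)
	}

	// Use a 300ms timeout for the first attempt of each KV read under this
	// snapshot (this is the hook TiDB wires tidb_kv_read_timeout to).
	snapshot := txn.GetSnapshot()
	snapshot.SetKVReadTimeout(300 * time.Millisecond)

	// If the first attempt hits a deadline exceeded error, the sender marks
	// the replica (deadlineErrUsingConfTimeout) and fast-retries another
	// replica with the default timeout, without backoff.
	val, err := snapshot.Get(context.Background(), []byte("key"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("value: %s\n", val)
}

Note the guard in region_request.go: the fast-retry path only engages when the request's MaxExecutionDurationMs is smaller than the default client.ReadTimeoutShort, so an unset or larger snapshot timeout leaves the existing retry behavior unchanged.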