From 18d0dab321a9788273b4c151c889bbf53464532a Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 14 May 2024 14:13:45 +0800 Subject: [PATCH] make experimental improvements to replica selector (#1109) * make experimental improvements to replica selector Signed-off-by: zyguan * address the comment Signed-off-by: zyguan * fix the test Signed-off-by: zyguan --------- Signed-off-by: zyguan --- internal/locate/region_cache.go | 9 +- internal/locate/region_request.go | 53 ++++++++++-- internal/locate/region_request3_test.go | 105 ++++++++++++++++++++++++ tikv/region.go | 13 +++ 4 files changed, 170 insertions(+), 10 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index d78400233d..b10958cd38 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -576,15 +576,22 @@ func (c *RPCContext) String() string { type contextPatcher struct { staleRead *bool replicaRead *bool + + timeoutFactor float64 } -func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) { +func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context, timeout time.Duration) time.Duration { if patcher.staleRead != nil { pbCtx.StaleRead = *patcher.staleRead } if patcher.replicaRead != nil { pbCtx.ReplicaRead = *patcher.replicaRead } + if patcher.timeoutFactor > 0 { + pbCtx.MaxExecutionDurationMs = uint64(float64(pbCtx.MaxExecutionDurationMs) * patcher.timeoutFactor) + timeout = time.Duration(float64(timeout) * patcher.timeoutFactor) + } + return timeout } type storeSelectorOp struct { diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 2556f4c973..8bbce0eb54 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -47,6 +47,7 @@ import ( "time" "unsafe" + uatomic "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -82,6 +83,30 @@ func LoadShuttingDown() uint32 { return atomic.LoadUint32(&shuttingDown) } +// ReplicaSelectorExperimentalOptions defines experimental options of replica selector. +type ReplicaSelectorExperimentalOptions struct { + StaleRead struct { + PreventRetryFollower bool + RetryLeaderTimeoutFactor float64 + } +} + +var selectorExpOpts uatomic.Pointer[ReplicaSelectorExperimentalOptions] + +// SetReplicaSelectorExperimentalOptions sets experimental options of replica selector. +func SetReplicaSelectorExperimentalOptions(opts ReplicaSelectorExperimentalOptions) { + selectorExpOpts.Store(&opts) +} + +// GetReplicaSelectorExperimentalOptions gets experimental options of replica selector. +func GetReplicaSelectorExperimentalOptions() (opts ReplicaSelectorExperimentalOptions) { + ptr := selectorExpOpts.Load() + if ptr == nil { + return ReplicaSelectorExperimentalOptions{} + } + return *ptr +} + // RegionRequestSender sends KV/Cop requests to tikv server. It handles network // errors and some region errors internally. // @@ -265,6 +290,8 @@ type replicaSelector struct { targetIdx AccessIndex // replicas[proxyIdx] is the store used to redirect requests this time proxyIdx AccessIndex + + ReplicaSelectorExperimentalOptions } // selectorState is the interface of states of the replicaSelector. 
@@ -627,22 +654,28 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector leader := selector.replicas[state.leaderIdx] leaderEpochStale := leader.isEpochStale() leaderUnreachable := leader.store.getLivenessState() != reachable - leaderInvalid := leaderEpochStale || leaderUnreachable || state.IsLeaderExhausted(leader) - if len(state.option.labels) > 0 { - logutil.BgLogger().Warn("unable to find stores with given labels", + leaderExhausted := state.IsLeaderExhausted(leader) + leaderTimeout := leader.deadlineErrUsingConfTimeout + if len(state.option.labels) > 0 && !state.option.leaderOnly { + logutil.BgLogger().Warn("unable to find a store with given labels", zap.Uint64("region", selector.region.GetID()), - zap.Bool("leader-epoch-stale", leaderEpochStale), - zap.Bool("leader-unreachable", leaderUnreachable), - zap.Bool("leader-invalid", leaderInvalid), zap.Bool("stale-read", state.isStaleRead), zap.Any("labels", state.option.labels)) } - if leaderInvalid || leader.deadlineErrUsingConfTimeout { + if leaderEpochStale || leaderUnreachable || leaderExhausted || leaderTimeout { + logutil.BgLogger().Warn("unable to find a valid leader", + zap.Uint64("region", selector.region.GetID()), + zap.Bool("stale-read", state.isStaleRead), + zap.Bool("epoch-stale", leaderEpochStale), + zap.Bool("unreachable", leaderUnreachable), + zap.Bool("exhausted", leaderExhausted), + zap.Bool("timeout", leaderTimeout)) // In stale-read, the request will fallback to leader after the local follower failure. // If the leader is also unavailable, we can fallback to the follower and use replica-read flag again, // The remote follower not tried yet, and the local follower can retry without stale-read flag. // If leader tried and received deadline exceeded error, try follower. 
- if state.isStaleRead || leader.deadlineErrUsingConfTimeout { + if (state.isStaleRead && !selector.StaleRead.PreventRetryFollower) || + (!state.isStaleRead && leader.deadlineErrUsingConfTimeout) { selector.state = &tryFollower{ leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, @@ -667,6 +700,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if resetStaleRead { staleRead := false rpcCtx.contextPatcher.staleRead = &staleRead + rpcCtx.contextPatcher.timeoutFactor = selector.StaleRead.RetryLeaderTimeoutFactor } return rpcCtx, nil } @@ -772,6 +806,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik state, -1, -1, + GetReplicaSelectorExperimentalOptions(), }, nil } @@ -1210,7 +1245,7 @@ func (s *RegionRequestSender) SendReqCtx( } } - rpcCtx.contextPatcher.applyTo(&req.Context) + timeout = rpcCtx.contextPatcher.applyTo(&req.Context, timeout) if req.InputRequestSource != "" && s.replicaSelector != nil { s.replicaSelector.patchRequestSource(req, rpcCtx) } diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 4e5c0bc391..c674f1218b 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1519,3 +1519,108 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() { } } } + +func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelectorExperimentalOptions() { + key := []byte("key") + bo := retry.NewBackoffer(context.Background(), -1) + + loc, err := s.cache.LocateKey(bo, key) + s.Require().NoError(err) + + region := s.cache.GetCachedRegionWithRLock(loc.Region) + leader, _, _, _ := region.WorkStorePeer(region.getStore()) + follower, _, _, _ := region.FollowerStorePeer(region.getStore(), 0, &storeSelectorOp{}) + + newStaleReadReq := func() *tikvrpc.Request { + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadMixed, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleRead() + return req + } + + errRespTimeout := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{Reason: "mock: deadline is exceeded"}}}} + errRespDataNotReady := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}} + + mockLeaderTimeout := func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if addr == leader.addr { + return errRespTimeout, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + } + + mockLeaderSlow := func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if addr == leader.addr && timeout <= 300*time.Millisecond { + return errRespTimeout, nil + } + if addr == follower.addr { + if req.StaleRead { + return errRespDataNotReady, nil + } else { + return errRespTimeout, nil + } + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + } + + s.Run("PreventRetryFollowerOn", func() { + // 1. access local leader -> timeout + // 2. 
cannot retry leader (since timeout), cannot fallback to tryFollower -> pseudo error + defer SetReplicaSelectorExperimentalOptions(GetReplicaSelectorExperimentalOptions()) + var opts ReplicaSelectorExperimentalOptions + opts.StaleRead.PreventRetryFollower = true + SetReplicaSelectorExperimentalOptions(opts) + + s.regionRequestSender.client = &fnClient{fn: mockLeaderTimeout} + s.cache.LocateKey(bo, key) + + resp, _, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(leader.labels)) + s.Require().NoError(err) + regionErr, err := resp.GetRegionError() + s.Require().NoError(err) + s.Require().NotNil(regionErr) + s.Require().True(IsFakeRegionError(regionErr)) + }) + + s.Run("PreventRetryFollowerOff", func() { + // 1. access local leader -> timeout + // 2. cannot retry leader (since timeout), fallback to tryFollower, access follower -> ok + s.regionRequestSender.client = &fnClient{fn: mockLeaderTimeout} + s.cache.LocateKey(bo, key) + + resp, rpcCtx, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(leader.labels)) + s.Require().NoError(err) + s.Require().Equal(rpcCtx.Addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + }) + + s.Run("RetryLeaderTimeoutFactorOn", func() { + // 1. access local follower -> data is not ready + // 2. retry leader with timeout*2 -> ok + defer SetReplicaSelectorExperimentalOptions(GetReplicaSelectorExperimentalOptions()) + var opts ReplicaSelectorExperimentalOptions + opts.StaleRead.RetryLeaderTimeoutFactor = 2 + SetReplicaSelectorExperimentalOptions(opts) + + s.regionRequestSender.client = &fnClient{fn: mockLeaderSlow} + s.cache.LocateKey(bo, key) + + resp, _, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, 200*time.Millisecond, tikvrpc.TiKV, WithMatchLabels(follower.labels)) + s.Require().NoError(err) + s.Require().Equal(leader.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + }) + + s.Run("RetryLeaderTimeoutFactorOff", func() { + // 1. access local follower -> data is not ready + // 2. retry leader with timeout -> timeout + // 3. fallback to tryFollower, access local follower with replica read -> timeout + // 4. access the other follower -> ok + s.regionRequestSender.client = &fnClient{fn: mockLeaderSlow} + s.cache.LocateKey(bo, key) + + resp, rpcCtx, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, 200*time.Millisecond, tikvrpc.TiKV, WithMatchLabels(follower.labels)) + s.Require().NoError(err) + s.Require().Equal(rpcCtx.Addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + s.Require().NotEqual(leader.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + s.Require().NotEqual(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + }) +} diff --git a/tikv/region.go b/tikv/region.go index 4fd0a25642..8b9576578a 100644 --- a/tikv/region.go +++ b/tikv/region.go @@ -79,6 +79,9 @@ type RPCCancellerCtxKey = locate.RPCCancellerCtxKey // simply return the error to caller. type RegionRequestSender = locate.RegionRequestSender +// ReplicaSelectorExperimentalOptions defines experimental options of replica selector. +type ReplicaSelectorExperimentalOptions = locate.ReplicaSelectorExperimentalOptions + // StoreSelectorOption configures storeSelectorOp. type StoreSelectorOption = locate.StoreSelectorOption @@ -152,6 +155,16 @@ func StoreShuttingDown(v uint32) { locate.StoreShuttingDown(v) } +// SetReplicaSelectorExperimentalOptions sets experimental options of replica selector. 
+func SetReplicaSelectorExperimentalOptions(opts ReplicaSelectorExperimentalOptions) { + locate.SetReplicaSelectorExperimentalOptions(opts) +} + +// GetReplicaSelectorExperimentalOptions gets experimental options of replica selector. +func GetReplicaSelectorExperimentalOptions() (opts ReplicaSelectorExperimentalOptions) { + return locate.GetReplicaSelectorExperimentalOptions() +} + // WithMatchLabels indicates selecting stores with matched labels func WithMatchLabels(labels []*metapb.StoreLabel) StoreSelectorOption { return locate.WithMatchLabels(labels)
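Usage sketch (not part of the patch): the snippet below shows how a caller might turn on both experimental knobs through the tikv-level wrappers added in tikv/region.go. The import path and the standalone main function are assumptions for illustration; only ReplicaSelectorExperimentalOptions, its StaleRead fields, and SetReplicaSelectorExperimentalOptions come from this change.

package main

import "github.com/tikv/client-go/v2/tikv" // assumed module path of this repository

func main() {
	var opts tikv.ReplicaSelectorExperimentalOptions
	// If a stale read fails on the local replica and the leader also turns out
	// to be unavailable, return a pseudo region error instead of falling back
	// to follower replica read.
	opts.StaleRead.PreventRetryFollower = true
	// When a stale read is retried on the leader, scale both
	// MaxExecutionDurationMs and the client-side RPC timeout by this factor.
	opts.StaleRead.RetryLeaderTimeoutFactor = 2
	tikv.SetReplicaSelectorExperimentalOptions(opts)
}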
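Note on the behaviour of these options as implemented above: they are stored behind an atomic pointer and read once in newReplicaSelector, so a call to SetReplicaSelectorExperimentalOptions only affects requests whose replica selector is created after the call; in-flight requests keep the snapshot they started with. Both knobs apply to stale-read requests only: PreventRetryFollower suppresses the fallback from a failed leader retry to tryFollower, and RetryLeaderTimeoutFactor (when greater than zero) is applied by contextPatcher.applyTo at the point where the stale-read flag is reset for the leader retry.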