make experimental improvements to replica selector (#1109)
* make experimental improvements to replica selector

Signed-off-by: zyguan <[email protected]>

* address the comment

Signed-off-by: zyguan <[email protected]>

* fix the test

Signed-off-by: zyguan <[email protected]>

---------

Signed-off-by: zyguan <[email protected]>
zyguan authored May 14, 2024
1 parent f0ae797 commit 18d0dab
Showing 4 changed files with 170 additions and 10 deletions.
9 changes: 8 additions & 1 deletion internal/locate/region_cache.go
@@ -576,15 +576,22 @@ func (c *RPCContext) String() string {
 type contextPatcher struct {
 	staleRead   *bool
 	replicaRead *bool
+
+	timeoutFactor float64
 }
 
-func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
+func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context, timeout time.Duration) time.Duration {
 	if patcher.staleRead != nil {
 		pbCtx.StaleRead = *patcher.staleRead
 	}
 	if patcher.replicaRead != nil {
 		pbCtx.ReplicaRead = *patcher.replicaRead
 	}
+	if patcher.timeoutFactor > 0 {
+		pbCtx.MaxExecutionDurationMs = uint64(float64(pbCtx.MaxExecutionDurationMs) * patcher.timeoutFactor)
+		timeout = time.Duration(float64(timeout) * patcher.timeoutFactor)
+	}
+	return timeout
 }
 
 type storeSelectorOp struct {
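For illustration, the scaling behaviour that the patched applyTo introduces can be sketched in isolation as below. Only the arithmetic mirrors the diff; pbContext and applyTimeoutFactor are stand-in names invented for this sketch, while the real code operates on kvrpcpb.Context and the timeout passed through SendReqCtx.

package main

import (
	"fmt"
	"time"
)

// pbContext stands in for the kvrpcpb.Context field touched by the patcher.
type pbContext struct {
	MaxExecutionDurationMs uint64
}

// applyTimeoutFactor mirrors the new branch in contextPatcher.applyTo: when a
// factor is set, both the server-side execution budget and the client-side RPC
// timeout are scaled by it.
func applyTimeoutFactor(pbCtx *pbContext, timeout time.Duration, factor float64) time.Duration {
	if factor > 0 {
		pbCtx.MaxExecutionDurationMs = uint64(float64(pbCtx.MaxExecutionDurationMs) * factor)
		timeout = time.Duration(float64(timeout) * factor)
	}
	return timeout
}

func main() {
	ctx := &pbContext{MaxExecutionDurationMs: 200}
	fmt.Println(applyTimeoutFactor(ctx, 200*time.Millisecond, 2), ctx.MaxExecutionDurationMs) // 400ms 400
}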
53 changes: 44 additions & 9 deletions internal/locate/region_request.go
@@ -47,6 +47,7 @@ import (
 	"time"
 	"unsafe"
 
+	uatomic "go.uber.org/atomic"
 	"go.uber.org/zap"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -82,6 +83,30 @@ func LoadShuttingDown() uint32 {
 	return atomic.LoadUint32(&shuttingDown)
 }
 
+// ReplicaSelectorExperimentalOptions defines experimental options of replica selector.
+type ReplicaSelectorExperimentalOptions struct {
+	StaleRead struct {
+		PreventRetryFollower     bool
+		RetryLeaderTimeoutFactor float64
+	}
+}
+
+var selectorExpOpts uatomic.Pointer[ReplicaSelectorExperimentalOptions]
+
+// SetReplicaSelectorExperimentalOptions sets experimental options of replica selector.
+func SetReplicaSelectorExperimentalOptions(opts ReplicaSelectorExperimentalOptions) {
+	selectorExpOpts.Store(&opts)
+}
+
+// GetReplicaSelectorExperimentalOptions gets experimental options of replica selector.
+func GetReplicaSelectorExperimentalOptions() (opts ReplicaSelectorExperimentalOptions) {
+	ptr := selectorExpOpts.Load()
+	if ptr == nil {
+		return ReplicaSelectorExperimentalOptions{}
+	}
+	return *ptr
+}
+
 // RegionRequestSender sends KV/Cop requests to tikv server. It handles network
 // errors and some region errors internally.
 //
@@ -265,6 +290,8 @@ type replicaSelector struct {
 	targetIdx AccessIndex
 	// replicas[proxyIdx] is the store used to redirect requests this time
 	proxyIdx AccessIndex
+
+	ReplicaSelectorExperimentalOptions
 }
 
 // selectorState is the interface of states of the replicaSelector.
@@ -627,22 +654,28 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 		leader := selector.replicas[state.leaderIdx]
 		leaderEpochStale := leader.isEpochStale()
 		leaderUnreachable := leader.store.getLivenessState() != reachable
-		leaderInvalid := leaderEpochStale || leaderUnreachable || state.IsLeaderExhausted(leader)
-		if len(state.option.labels) > 0 {
-			logutil.BgLogger().Warn("unable to find stores with given labels",
+		leaderExhausted := state.IsLeaderExhausted(leader)
+		leaderTimeout := leader.deadlineErrUsingConfTimeout
+		if len(state.option.labels) > 0 && !state.option.leaderOnly {
+			logutil.BgLogger().Warn("unable to find a store with given labels",
 				zap.Uint64("region", selector.region.GetID()),
-				zap.Bool("leader-epoch-stale", leaderEpochStale),
-				zap.Bool("leader-unreachable", leaderUnreachable),
-				zap.Bool("leader-invalid", leaderInvalid),
 				zap.Bool("stale-read", state.isStaleRead),
 				zap.Any("labels", state.option.labels))
 		}
-		if leaderInvalid || leader.deadlineErrUsingConfTimeout {
+		if leaderEpochStale || leaderUnreachable || leaderExhausted || leaderTimeout {
+			logutil.BgLogger().Warn("unable to find a valid leader",
+				zap.Uint64("region", selector.region.GetID()),
+				zap.Bool("stale-read", state.isStaleRead),
+				zap.Bool("epoch-stale", leaderEpochStale),
+				zap.Bool("unreachable", leaderUnreachable),
+				zap.Bool("exhausted", leaderExhausted),
+				zap.Bool("timeout", leaderTimeout))
 			// In stale-read, the request will fall back to the leader after the local follower fails.
 			// If the leader is also unavailable, we can fall back to a follower and use the replica-read flag again:
 			// the remote follower has not been tried yet, and the local follower can retry without the stale-read flag.
 			// If the leader was tried and hit the deadline-exceeded error, try a follower.
-			if state.isStaleRead || leader.deadlineErrUsingConfTimeout {
+			if (state.isStaleRead && !selector.StaleRead.PreventRetryFollower) ||
+				(!state.isStaleRead && leader.deadlineErrUsingConfTimeout) {
 				selector.state = &tryFollower{
 					leaderIdx: state.leaderIdx,
 					lastIdx:   state.leaderIdx,
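Condensed, the new fallback condition above behaves like the helper below. The helper is only an illustration of the boolean logic in this hunk, not code from the patch.

// shouldFallBackToFollower: a stale read may fall back to tryFollower unless
// StaleRead.PreventRetryFollower is set, while a non-stale read falls back
// only after the leader already hit the deadline-exceeded path.
func shouldFallBackToFollower(isStaleRead, leaderDeadlineErr, preventRetryFollower bool) bool {
	if isStaleRead {
		return !preventRetryFollower
	}
	return leaderDeadlineErr
}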
@@ -667,6 +700,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 	if resetStaleRead {
 		staleRead := false
 		rpcCtx.contextPatcher.staleRead = &staleRead
+		rpcCtx.contextPatcher.timeoutFactor = selector.StaleRead.RetryLeaderTimeoutFactor
 	}
 	return rpcCtx, nil
 }
@@ -772,6 +806,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
 		state,
 		-1,
 		-1,
+		GetReplicaSelectorExperimentalOptions(),
 	}, nil
 }

@@ -1210,7 +1245,7 @@ func (s *RegionRequestSender) SendReqCtx(
 		}
 	}
 
-	rpcCtx.contextPatcher.applyTo(&req.Context)
+	timeout = rpcCtx.contextPatcher.applyTo(&req.Context, timeout)
 	if req.InputRequestSource != "" && s.replicaSelector != nil {
 		s.replicaSelector.patchRequestSource(req, rpcCtx)
 	}
105 changes: 105 additions & 0 deletions internal/locate/region_request3_test.go
@@ -1519,3 +1519,108 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() {
 		}
 	}
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelectorExperimentalOptions() {
+	key := []byte("key")
+	bo := retry.NewBackoffer(context.Background(), -1)
+
+	loc, err := s.cache.LocateKey(bo, key)
+	s.Require().NoError(err)
+
+	region := s.cache.GetCachedRegionWithRLock(loc.Region)
+	leader, _, _, _ := region.WorkStorePeer(region.getStore())
+	follower, _, _, _ := region.FollowerStorePeer(region.getStore(), 0, &storeSelectorOp{})
+
+	newStaleReadReq := func() *tikvrpc.Request {
+		req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadMixed, nil)
+		req.ReadReplicaScope = oracle.GlobalTxnScope
+		req.TxnScope = oracle.GlobalTxnScope
+		req.EnableStaleRead()
+		return req
+	}
+
+	errRespTimeout := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{Reason: "mock: deadline is exceeded"}}}}
+	errRespDataNotReady := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}}
+
+	mockLeaderTimeout := func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+		if addr == leader.addr {
+			return errRespTimeout, nil
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil
+	}
+
+	mockLeaderSlow := func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+		if addr == leader.addr && timeout <= 300*time.Millisecond {
+			return errRespTimeout, nil
+		}
+		if addr == follower.addr {
+			if req.StaleRead {
+				return errRespDataNotReady, nil
+			} else {
+				return errRespTimeout, nil
+			}
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil
+	}
+
+	s.Run("PreventRetryFollowerOn", func() {
+		// 1. access local leader -> timeout
+		// 2. cannot retry leader (since timeout), cannot fallback to tryFollower -> pseudo error
+		defer SetReplicaSelectorExperimentalOptions(GetReplicaSelectorExperimentalOptions())
+		var opts ReplicaSelectorExperimentalOptions
+		opts.StaleRead.PreventRetryFollower = true
+		SetReplicaSelectorExperimentalOptions(opts)
+
+		s.regionRequestSender.client = &fnClient{fn: mockLeaderTimeout}
+		s.cache.LocateKey(bo, key)
+
+		resp, _, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(leader.labels))
+		s.Require().NoError(err)
+		regionErr, err := resp.GetRegionError()
+		s.Require().NoError(err)
+		s.Require().NotNil(regionErr)
+		s.Require().True(IsFakeRegionError(regionErr))
+	})
+
+	s.Run("PreventRetryFollowerOff", func() {
+		// 1. access local leader -> timeout
+		// 2. cannot retry leader (since timeout), fallback to tryFollower, access follower -> ok
+		s.regionRequestSender.client = &fnClient{fn: mockLeaderTimeout}
+		s.cache.LocateKey(bo, key)
+
+		resp, rpcCtx, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(leader.labels))
+		s.Require().NoError(err)
+		s.Require().Equal(rpcCtx.Addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value))
+	})
+
+	s.Run("RetryLeaderTimeoutFactorOn", func() {
+		// 1. access local follower -> data is not ready
+		// 2. retry leader with timeout*2 -> ok
+		defer SetReplicaSelectorExperimentalOptions(GetReplicaSelectorExperimentalOptions())
+		var opts ReplicaSelectorExperimentalOptions
+		opts.StaleRead.RetryLeaderTimeoutFactor = 2
+		SetReplicaSelectorExperimentalOptions(opts)
+
+		s.regionRequestSender.client = &fnClient{fn: mockLeaderSlow}
+		s.cache.LocateKey(bo, key)
+
+		resp, _, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, 200*time.Millisecond, tikvrpc.TiKV, WithMatchLabels(follower.labels))
+		s.Require().NoError(err)
+		s.Require().Equal(leader.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value))
+	})
+
+	s.Run("RetryLeaderTimeoutFactorOff", func() {
+		// 1. access local follower -> data is not ready
+		// 2. retry leader with timeout -> timeout
+		// 3. fallback to tryFollower, access local follower with replica read -> timeout
+		// 4. access the other follower -> ok
+		s.regionRequestSender.client = &fnClient{fn: mockLeaderSlow}
+		s.cache.LocateKey(bo, key)
+
+		resp, rpcCtx, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, 200*time.Millisecond, tikvrpc.TiKV, WithMatchLabels(follower.labels))
+		s.Require().NoError(err)
+		s.Require().Equal(rpcCtx.Addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value))
+		s.Require().NotEqual(leader.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value))
+		s.Require().NotEqual(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value))
+	})
+}
13 changes: 13 additions & 0 deletions tikv/region.go
@@ -79,6 +79,9 @@ type RPCCancellerCtxKey = locate.RPCCancellerCtxKey
 // simply return the error to caller.
 type RegionRequestSender = locate.RegionRequestSender
 
+// ReplicaSelectorExperimentalOptions defines experimental options of replica selector.
+type ReplicaSelectorExperimentalOptions = locate.ReplicaSelectorExperimentalOptions
+
 // StoreSelectorOption configures storeSelectorOp.
 type StoreSelectorOption = locate.StoreSelectorOption
 
@@ -152,6 +155,16 @@ func StoreShuttingDown(v uint32) {
 	locate.StoreShuttingDown(v)
 }
 
+// SetReplicaSelectorExperimentalOptions sets experimental options of replica selector.
+func SetReplicaSelectorExperimentalOptions(opts ReplicaSelectorExperimentalOptions) {
+	locate.SetReplicaSelectorExperimentalOptions(opts)
+}
+
+// GetReplicaSelectorExperimentalOptions gets experimental options of replica selector.
+func GetReplicaSelectorExperimentalOptions() (opts ReplicaSelectorExperimentalOptions) {
+	return locate.GetReplicaSelectorExperimentalOptions()
+}
+
 // WithMatchLabels indicates selecting stores with matched labels
 func WithMatchLabels(labels []*metapb.StoreLabel) StoreSelectorOption {
 	return locate.WithMatchLabels(labels)
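A minimal usage sketch of the exported wrappers above, assuming the standard client-go module path github.com/tikv/client-go/v2; callers outside the repository go through the tikv package rather than internal/locate, and the factor of 2 is only an example value.

package main

import "github.com/tikv/client-go/v2/tikv"

func main() {
	var opts tikv.ReplicaSelectorExperimentalOptions
	// When a stale read cannot use the leader, return a retryable pseudo region
	// error instead of falling back to follower replicas.
	opts.StaleRead.PreventRetryFollower = true
	// When a stale read retries the leader, scale the RPC timeout and
	// MaxExecutionDurationMs by this factor.
	opts.StaleRead.RetryLeaderTimeoutFactor = 2
	tikv.SetReplicaSelectorExperimentalOptions(opts)
}

Because the options live behind an atomic pointer, GetReplicaSelectorExperimentalOptions returns a value copy (the zero value until the first Set), and each replicaSelector snapshots the options when it is created, so a Set call only affects requests whose selectors are built afterwards.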
