From b440ea2f58e0d9131c64b5a1d4f6a30714d3d41d Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 16 Aug 2021 19:27:02 +0800 Subject: [PATCH] select replica for forward requests using RegionRequestSender (#264) Signed-off-by: Yilin Chen --- go.sum | 4 - integration_tests/go.sum | 3 - internal/locate/region_cache.go | 148 ++----- internal/locate/region_cache_test.go | 56 --- internal/locate/region_request.go | 527 +++++++++++++++++++----- internal/locate/region_request3_test.go | 230 ++++++----- 6 files changed, 583 insertions(+), 385 deletions(-) diff --git a/go.sum b/go.sum index a25dbc3463..ab24d3ad34 100644 --- a/go.sum +++ b/go.sum @@ -290,10 +290,6 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db h1:PSW6P83KZi5WopPBiecU286PWMSl2rvxCBZT94iBX+I= -github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1 h1:zl56/I6s/UMG/kH+9epaIAiwfIx2gZO5FW8hvkhDYAg= -github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4 h1:4EUpHzPFHwleKkVALyMqQbQcNziPZvU+vhUT9Wzj93E= github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 5e6f853b19..356b436a9b 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -420,10 +420,7 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db h1:PSW6P83KZi5WopPBiecU286PWMSl2rvxCBZT94iBX+I= github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1 h1:zl56/I6s/UMG/kH+9epaIAiwfIx2gZO5FW8hvkhDYAg= -github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4 h1:4EUpHzPFHwleKkVALyMqQbQcNziPZvU+vhUT9Wzj93E= github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 633f4db824..cf926b6b71 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -458,17 +458,16 @@ func (c *RegionCache) 
SetPDClient(client pd.Client) { // RPCContext contains data that is needed to send RPC to a region. type RPCContext struct { - Region RegionVerID - Meta *metapb.Region - Peer *metapb.Peer - AccessIdx AccessIndex - Store *Store - Addr string - AccessMode accessMode - ProxyStore *Store // nil means proxy is not used - ProxyAccessIdx AccessIndex // valid when ProxyStore is not nil - ProxyAddr string // valid when ProxyStore is not nil - TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers. + Region RegionVerID + Meta *metapb.Region + Peer *metapb.Peer + AccessIdx AccessIndex + Store *Store + Addr string + AccessMode accessMode + ProxyStore *Store // nil means proxy is not used + ProxyAddr string // valid when ProxyStore is not nil + TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers. tryTimes int } @@ -481,7 +480,7 @@ func (c *RPCContext) String() string { res := fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d, reqStoreType: %s, runStoreType: %s", c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.AccessIdx, c.AccessMode, runStoreType) if c.ProxyStore != nil { - res += fmt.Sprintf(", proxy store id: %d, proxy addr: %s, proxy idx: %d", c.ProxyStore.storeID, c.ProxyAddr, c.ProxyAccessIdx) + res += fmt.Sprintf(", proxy store id: %d, proxy addr: %s", c.ProxyStore.storeID, c.ProxyStore.addr) } return res } @@ -573,15 +572,14 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep } var ( - proxyStore *Store - proxyAddr string - proxyAccessIdx AccessIndex + proxyStore *Store + proxyAddr string ) if c.enableForwarding && isLeaderReq { - if atomic.LoadInt32(&store.needForwarding) == 0 { + if atomic.LoadInt32(&store.unreachable) == 0 { regionStore.unsetProxyStoreIfNeeded(cachedRegion) } else { - proxyStore, proxyAccessIdx, _ = c.getProxyStore(cachedRegion, store, regionStore, accessIdx) + proxyStore, _, _ = c.getProxyStore(cachedRegion, store, regionStore, accessIdx) if proxyStore != nil { proxyAddr, err = c.getStoreAddr(bo, cachedRegion, proxyStore) if err != nil { @@ -592,17 +590,16 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep } return &RPCContext{ - Region: id, - Meta: cachedRegion.meta, - Peer: peer, - AccessIdx: accessIdx, - Store: store, - Addr: addr, - AccessMode: tiKVOnly, - ProxyStore: proxyStore, - ProxyAccessIdx: proxyAccessIdx, - ProxyAddr: proxyAddr, - TiKVNum: regionStore.accessStoreNum(tiKVOnly), + Region: id, + Meta: cachedRegion.meta, + Peer: peer, + AccessIdx: accessIdx, + Store: store, + Addr: addr, + AccessMode: tiKVOnly, + ProxyStore: proxyStore, + ProxyAddr: proxyAddr, + TiKVNum: regionStore.accessStoreNum(tiKVOnly), }, nil } @@ -859,53 +856,21 @@ func (c *RegionCache) OnSendFail(bo *retry.Backoffer, ctx *RPCContext, scheduleR } rs := r.getStore() - startForwarding := false - incEpochStoreIdx := -1 if err != nil { storeIdx, s := rs.accessStore(ctx.AccessMode, ctx.AccessIdx) - leaderReq := ctx.Store.storeType == tikvrpc.TiKV && rs.workTiKVIdx == ctx.AccessIdx - - // Mark the store as failure if it's not a redirection request because we - // can't know the status of the proxy store by it. - if ctx.ProxyStore == nil { - // send fail but store is reachable, keep retry current peer for replica leader request. - // but we still need switch peer for follower-read or learner-read(i.e. 
tiflash) - if leaderReq { - if s.requestLiveness(bo, c) == reachable { - return - } else if c.enableForwarding { - s.startHealthCheckLoopIfNeeded(c) - startForwarding = true - } - } - // invalidate regions in store. - incEpochStoreIdx = c.markRegionNeedBeRefill(s, storeIdx, rs) - } + // invalidate regions in store. + c.markRegionNeedBeRefill(s, storeIdx, rs) } // try next peer to found new leader. if ctx.AccessMode == tiKVOnly { - if startForwarding || ctx.ProxyStore != nil { - var currentProxyIdx AccessIndex = -1 - if ctx.ProxyStore != nil { - currentProxyIdx = ctx.ProxyAccessIdx - } - // In case the epoch of the store is increased, try to avoid reloading the current region by also - // increasing the epoch stored in `rs`. - rs.switchNextProxyStore(r, currentProxyIdx, incEpochStoreIdx) - logutil.Logger(bo.GetCtx()).Info("switch region proxy peer to next due to send request fail", - zap.Stringer("current", ctx), - zap.Bool("needReload", scheduleReload), - zap.Error(err)) - } else { - rs.switchNextTiKVPeer(r, ctx.AccessIdx) - logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail", - zap.Stringer("current", ctx), - zap.Bool("needReload", scheduleReload), - zap.Error(err)) - } + rs.switchNextTiKVPeer(r, ctx.AccessIdx) + logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail", + zap.Stringer("current", ctx), + zap.Bool("needReload", scheduleReload), + zap.Error(err)) } else { rs.switchNextFlashPeer(r, ctx.AccessIdx) logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail", @@ -1457,7 +1422,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S } func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int) { - if !c.enableForwarding || store.storeType != tikvrpc.TiKV || atomic.LoadInt32(&store.needForwarding) == 0 { + if !c.enableForwarding || store.storeType != tikvrpc.TiKV || atomic.LoadInt32(&store.unreachable) == 0 { return } @@ -1487,7 +1452,7 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStor } storeIdx, store := rs.accessStore(tiKVOnly, AccessIndex(index)) // Skip unreachable stores. - if atomic.LoadInt32(&store.needForwarding) != 0 { + if atomic.LoadInt32(&store.unreachable) != 0 { continue } @@ -1802,41 +1767,6 @@ func (r *regionStore) switchNextTiKVPeer(rr *Region, currentPeerIdx AccessIndex) rr.compareAndSwapStore(r, newRegionStore) } -// switchNextProxyStore switches the index of the peer that will forward requests to the leader to the next peer. -// If proxy is currently not used on this region, the value of `currentProxyIdx` should be -1, and a random peer will -// be select in this case. -func (r *regionStore) switchNextProxyStore(rr *Region, currentProxyIdx AccessIndex, incEpochStoreIdx int) { - if r.proxyTiKVIdx != currentProxyIdx { - return - } - - tikvNum := r.accessStoreNum(tiKVOnly) - var nextIdx AccessIndex - - // If the region is not using proxy before, randomly select a non-leader peer for the first try. - if currentProxyIdx == -1 { - // Randomly select an non-leader peer - // TODO: Skip unreachable peers here. 
- nextIdx = AccessIndex(rand.Intn(tikvNum - 1)) - if nextIdx >= r.workTiKVIdx { - nextIdx++ - } - } else { - nextIdx = (currentProxyIdx + 1) % AccessIndex(tikvNum) - // skips the current workTiKVIdx - if nextIdx == r.workTiKVIdx { - nextIdx = (nextIdx + 1) % AccessIndex(tikvNum) - } - } - - newRegionStore := r.clone() - newRegionStore.proxyTiKVIdx = nextIdx - if incEpochStoreIdx >= 0 { - newRegionStore.storeEpochs[incEpochStoreIdx]++ - } - rr.compareAndSwapStore(r, newRegionStore) -} - func (r *regionStore) setProxyStoreIdx(rr *Region, idx AccessIndex) { if r.proxyTiKVIdx == idx { return @@ -1917,10 +1847,11 @@ type Store struct { storeType tikvrpc.EndpointType // type of the store tokenCount atomic2.Int64 // used store token count - // whether the store is disconnected due to some reason, therefore requests to the store needs to be + // whether the store is unreachable due to some reason, therefore requests to the store needs to be // forwarded by other stores. this is also the flag that a checkUntilHealth goroutine is running for this store. // this mechanism is currently only applicable for TiKV stores. - needForwarding int32 + unreachable int32 + unreachableSince time.Time } type resolveState uint64 @@ -2150,13 +2081,14 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache) { } // It may be already started by another thread. - if atomic.CompareAndSwapInt32(&s.needForwarding, 0, 1) { + if atomic.CompareAndSwapInt32(&s.unreachable, 0, 1) { + s.unreachableSince = time.Now() go s.checkUntilHealth(c) } } func (s *Store) checkUntilHealth(c *RegionCache) { - defer atomic.CompareAndSwapInt32(&s.needForwarding, 1, 0) + defer atomic.CompareAndSwapInt32(&s.unreachable, 1, 0) ticker := time.NewTicker(time.Second) lastCheckPDTime := time.Now() diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index ad05e0ca01..9abfd91e91 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -39,7 +39,6 @@ import ( "errors" "fmt" "math/rand" - "sync/atomic" "testing" "time" @@ -709,61 +708,6 @@ func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore() { s.Nil(err) } -func (s *testRegionCacheSuite) TestSendFailEnableForwarding() { - s.cache.enableForwarding = true - - // key range: ['' - 'm' - 'z'] - region2 := s.cluster.AllocID() - newPeers := s.cluster.AllocIDs(2) - s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0]) - - var storeState uint32 = uint32(unreachable) - s.cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState { - return livenessState(atomic.LoadUint32(&storeState)) - } - - // Check the two regions. 
- loc1, err := s.cache.LocateKey(s.bo, []byte("a")) - s.Nil(err) - s.Equal(loc1.Region.id, s.region1) - - // Invoke OnSendFail so that the store will be marked as needForwarding - ctx, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0) - s.Nil(err) - s.NotNil(ctx) - s.cache.OnSendFail(s.bo, ctx, false, errors.New("test error")) - - // ...then on next retry, proxy will be used - ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0) - s.Nil(err) - s.NotNil(ctx) - s.NotNil(ctx.ProxyStore) - s.Equal(ctx.ProxyStore.storeID, s.store2) - - // Proxy will be also applied to other regions whose leader is on the store - loc2, err := s.cache.LocateKey(s.bo, []byte("x")) - s.Nil(err) - s.Equal(loc2.Region.id, region2) - ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc2.Region, kv.ReplicaReadLeader, 0) - s.Nil(err) - s.NotNil(ctx) - s.NotNil(ctx.ProxyStore) - s.Equal(ctx.ProxyStore.storeID, s.store2) - - // Recover the store - atomic.StoreUint32(&storeState, uint32(reachable)) - // The proxy should be unset after several retries - for retry := 0; retry < 15; retry++ { - ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0) - s.Nil(err) - if ctx.ProxyStore == nil { - break - } - time.Sleep(time.Millisecond * 200) - } - s.Nil(ctx.ProxyStore) -} - func (s *testRegionCacheSuite) TestSendFailedInMultipleNode() { // 3 nodes and no.1 is leader. store3 := s.cluster.AllocID() diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 117de1e4d9..6ccfd4557a 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -37,6 +37,7 @@ package locate import ( "context" "fmt" + "math/rand" "strconv" "strings" "sync" @@ -242,11 +243,275 @@ type replica struct { type replicaSelector struct { regionCache *RegionCache region *Region + regionStore *regionStore // replicas contains all TiKV replicas for now and the leader is at the // head of the slice. replicas []*replica - // nextReplicaIdx points to the candidate for the next attempt. - nextReplicaIdx int + state selectorState + // replicas[targetIdx] is the replica handling the request this time + targetIdx AccessIndex + // replicas[proxyIdx] is the store used to redirect requests this time + proxyIdx AccessIndex +} + +// selectorState is the interface of states of the replicaSelector. 
+// Here is the main state transition diagram: +// +// exceeding maxReplicaAttempt +// +-------------------+ || RPC failure && unreachable && no forwarding +// +-------->+ accessKnownLeader +----------------+ +// | +------+------------+ | +// | | | +// | | RPC failure v +// | | && unreachable +-----+-----+ +// | | && enable forwarding |tryFollower+------+ +// | | +-----------+ | +// | leader becomes v | all followers +// | reachable +----+-------------+ | are tried +// +-----------+accessByKnownProxy| | +// ^ +------+-----------+ | +// | | +-------+ | +// | | RPC failure |backoff+<---+ +// | leader becomes v +---+---+ +// | reachable +-----+-----+ all proxies are tried ^ +// +------------+tryNewProxy+-------------------------+ +// +-----------+ +type selectorState interface { + next(*retry.Backoffer, *replicaSelector) (*RPCContext, error) + onSendSuccess(*replicaSelector) + onSendFailure(*retry.Backoffer, *replicaSelector, error) + onNoLeader(*replicaSelector) +} + +type stateChanged struct{} + +func (c stateChanged) Error() string { + return "replicaSelector state changed" +} + +type stateBase struct{} + +func (s stateBase) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + return nil, nil +} + +func (s stateBase) onSendSuccess(selector *replicaSelector) { +} + +func (s stateBase) onSendFailure(backoffer *retry.Backoffer, selector *replicaSelector, err error) { +} + +func (s stateBase) onNoLeader(selector *replicaSelector) { +} + +// accessKnownLeader is the state where we are sending requests +// to the leader we suppose to be. +// +// After attempting maxReplicaAttempt times without success +// and without receiving new leader from the responses error, +// we should switch to tryFollower state. +type accessKnownLeader struct { + stateBase + leaderIdx AccessIndex +} + +func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + leader := selector.replicas[state.leaderIdx] + if leader.attempts >= maxReplicaAttempt { + selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} + return nil, stateChanged{} + } + selector.targetIdx = state.leaderIdx + return selector.buildRPCContext(bo) +} + +func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { + liveness := selector.checkLiveness(bo, selector.targetReplica()) + if liveness != reachable && len(selector.replicas) > 1 && selector.regionCache.enableForwarding { + selector.state = &accessByKnownProxy{leaderIdx: state.leaderIdx} + return + } + if liveness != reachable || selector.targetReplica().attempts >= maxReplicaAttempt { + selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} + } + if liveness != reachable { + selector.invalidateReplicaStore(selector.targetReplica(), cause) + } +} + +func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) { + selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} +} + +// tryFollower is the state where we cannot access the known leader +// but still try other replicas in case they have become the leader. +// +// In this state, a follower that is not tried will be used. If all +// followers are tried, we think we have exhausted the replicas. +// On sending failure in this state, if leader info is returned, +// the leader will be updated to replicas[0] and give it another chance. 
+type tryFollower struct { + stateBase + leaderIdx AccessIndex + lastIdx AccessIndex +} + +func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + var targetReplica *replica + // Search replica that is not attempted from the last accessed replica + for i := 1; i < len(selector.replicas); i++ { + idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) + if idx == state.leaderIdx { + continue + } + targetReplica = selector.replicas[idx] + // Each follower is only tried once + if targetReplica.attempts == 0 { + state.lastIdx = idx + selector.targetIdx = idx + break + } + } + // If all followers are tried and fail, backoff and retry. + if selector.targetIdx < 0 { + metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() + selector.invalidateRegion() + return nil, nil + } + return selector.buildRPCContext(bo) +} + +func (state *tryFollower) onSendSuccess(selector *replicaSelector) { + if !selector.regionCache.switchWorkLeaderToPeer(selector.region, selector.targetReplica().peer) { + panic("the store must exist") + } +} + +func (state *tryFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { + if selector.checkLiveness(bo, selector.targetReplica()) != reachable { + selector.invalidateReplicaStore(selector.targetReplica(), cause) + } +} + +// accessByKnownProxy is the state where we are sending requests through +// regionStore.proxyTiKVIdx as a proxy. +type accessByKnownProxy struct { + stateBase + leaderIdx AccessIndex +} + +func (state *accessByKnownProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + leader := selector.replicas[state.leaderIdx] + if atomic.LoadInt32(&leader.store.unreachable) == 0 { + selector.regionStore.unsetProxyStoreIfNeeded(selector.region) + selector.state = &accessKnownLeader{leaderIdx: state.leaderIdx} + return nil, stateChanged{} + } + + if selector.regionStore.proxyTiKVIdx >= 0 { + selector.targetIdx = state.leaderIdx + selector.proxyIdx = selector.regionStore.proxyTiKVIdx + return selector.buildRPCContext(bo) + } + + selector.state = &tryNewProxy{leaderIdx: state.leaderIdx} + return nil, stateChanged{} +} + +func (state *accessByKnownProxy) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { + selector.state = &tryNewProxy{leaderIdx: state.leaderIdx} + if selector.checkLiveness(bo, selector.proxyReplica()) != reachable { + selector.invalidateReplicaStore(selector.proxyReplica(), cause) + } +} + +func (state *accessByKnownProxy) onNoLeader(selector *replicaSelector) { + selector.state = &invalidLeader{} +} + +// tryNewProxy is the state where we try to find a node from followers +// as proxy. +type tryNewProxy struct { + stateBase + leaderIdx AccessIndex +} + +func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + leader := selector.replicas[state.leaderIdx] + if atomic.LoadInt32(&leader.store.unreachable) == 0 { + selector.regionStore.unsetProxyStoreIfNeeded(selector.region) + selector.state = &accessKnownLeader{leaderIdx: state.leaderIdx} + return nil, stateChanged{} + } + + candidateNum := 0 + for idx, replica := range selector.replicas { + if state.isCandidate(AccessIndex(idx), replica) { + candidateNum++ + } + } + + // If all followers are tried as a proxy and fail, mark the leader store invalid, then backoff and retry. 
+ if candidateNum == 0 { + metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() + selector.invalidateReplicaStore(leader, errors.Errorf("all followers are tried as proxy but fail")) + selector.region.scheduleReload() + return nil, nil + } + + // Skip advanceCnt valid candidates to find a proxy peer randomly + advanceCnt := rand.Intn(candidateNum) + for idx, replica := range selector.replicas { + if !state.isCandidate(AccessIndex(idx), replica) { + continue + } + if advanceCnt == 0 { + selector.targetIdx = state.leaderIdx + selector.proxyIdx = AccessIndex(idx) + break + } + advanceCnt-- + } + return selector.buildRPCContext(bo) +} + +func (state *tryNewProxy) isCandidate(idx AccessIndex, replica *replica) bool { + // Try each peer only once + return idx != state.leaderIdx && replica.attempts == 0 +} + +func (state *tryNewProxy) onSendSuccess(selector *replicaSelector) { + selector.regionStore.setProxyStoreIdx(selector.region, selector.proxyIdx) +} + +func (state *tryNewProxy) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { + if selector.checkLiveness(bo, selector.proxyReplica()) != reachable { + selector.invalidateReplicaStore(selector.proxyReplica(), cause) + } +} + +func (state *tryNewProxy) onNoLeader(selector *replicaSelector) { + selector.state = &invalidLeader{} +} + +type invalidStore struct { + stateBase +} + +func (state *invalidStore) next(_ *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) { + metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalidStore").Inc() + return nil, nil +} + +// TODO(sticnarf): If using request forwarding and the leader is unknown, try other followers +// instead of just switching to this state to backoff and retry. +type invalidLeader struct { + stateBase +} + +func (state *invalidLeader) next(_ *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) { + metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalidLeader").Inc() + return nil, nil } func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID) (*replicaSelector, error) { @@ -264,112 +529,187 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID) (*replic attempts: 0, }) } - // Move the leader to the first slot. - replicas[regionStore.workTiKVIdx], replicas[0] = replicas[0], replicas[regionStore.workTiKVIdx] + var state selectorState + if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 { + state = &accessByKnownProxy{leaderIdx: regionStore.workTiKVIdx} + } else { + state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx} + } return &replicaSelector{ regionCache, cachedRegion, + regionStore, replicas, - 0, + state, + -1, + -1, }, nil } -// isExhausted returns true if runs out of all replicas. -func (s *replicaSelector) isExhausted() bool { - return s.nextReplicaIdx >= len(s.replicas) -} - -func (s *replicaSelector) nextReplica() *replica { - if s.isExhausted() { - return nil - } - return s.replicas[s.nextReplicaIdx] -} - const maxReplicaAttempt = 10 // next creates the RPCContext of the current candidate replica. // It returns a SendError if runs out of all replicas or the cached region is invalidated. 
-func (s *replicaSelector) next(bo *retry.Backoffer) (*RPCContext, error) { +func (s *replicaSelector) next(bo *retry.Backoffer) (rpcCtx *RPCContext, err error) { + if !s.region.isValid() { + metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalid").Inc() + return nil, nil + } + + s.targetIdx = -1 + s.proxyIdx = -1 + s.refreshRegionStore() for { - if !s.region.isValid() { - metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalid").Inc() - return nil, nil - } - if s.isExhausted() { - metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() - s.invalidateRegion() - return nil, nil + rpcCtx, err = s.state.next(bo, s) + if _, isStateChanged := err.(stateChanged); !isStateChanged { + return } - replica := s.replicas[s.nextReplicaIdx] - s.nextReplicaIdx++ + } +} - // Limit the max attempts of each replica to prevent endless retry. - if replica.attempts >= maxReplicaAttempt { - continue +func (s *replicaSelector) targetReplica() *replica { + if s.targetIdx >= 0 && int(s.targetIdx) < len(s.replicas) { + return s.replicas[s.targetIdx] + } + return nil +} + +func (s *replicaSelector) proxyReplica() *replica { + if s.proxyIdx >= 0 && int(s.proxyIdx) < len(s.replicas) { + return s.replicas[s.proxyIdx] + } + return nil +} + +func (s *replicaSelector) refreshRegionStore() { + oldRegionStore := s.regionStore + newRegionStore := s.region.getStore() + if oldRegionStore == newRegionStore { + return + } + s.regionStore = newRegionStore + + // In the current implementation, if stores change, the address of it must change. + // So we just compare the address here. + // When stores change, we mark this replicaSelector as invalid to let the caller + // recreate a new replicaSelector. + if &oldRegionStore.stores != &newRegionStore.stores { + s.state = &invalidStore{} + return + } + + // If leader has changed, it means a recent request succeeds an RPC on the new + // leader. Give the leader an addition chance. + if oldRegionStore.workTiKVIdx != newRegionStore.workTiKVIdx { + newLeaderIdx := newRegionStore.workTiKVIdx + s.state = &accessKnownLeader{leaderIdx: newLeaderIdx} + if s.replicas[newLeaderIdx].attempts == maxReplicaAttempt { + s.replicas[newLeaderIdx].attempts-- } - replica.attempts++ + return + } +} - storeFailEpoch := atomic.LoadUint32(&replica.store.epoch) - if storeFailEpoch != replica.epoch { - // TODO(youjiali1995): Is it necessary to invalidate the region? 
- metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("stale_store").Inc() - s.invalidateRegion() - return nil, nil +func (s *replicaSelector) buildRPCContext(bo *retry.Backoffer) (*RPCContext, error) { + targetReplica, proxyReplica := s.targetReplica(), s.proxyReplica() + + // Backoff and retry if no replica is selected or the selected replica is stale + if targetReplica == nil || s.isReplicaStoreEpochStale(targetReplica) || + (proxyReplica != nil && s.isReplicaStoreEpochStale(proxyReplica)) { + return nil, nil + } + + rpcCtx := &RPCContext{ + Region: s.region.VerID(), + Meta: s.region.meta, + Peer: targetReplica.peer, + Store: targetReplica.store, + AccessMode: tiKVOnly, + TiKVNum: len(s.replicas), + } + + // Set leader addr + addr, err := s.regionCache.getStoreAddr(bo, s.region, targetReplica.store) + if err != nil { + return nil, err + } + if len(addr) == 0 { + return nil, nil + } + rpcCtx.Addr = addr + targetReplica.attempts++ + + // Set proxy addr + if proxyReplica != nil { + addr, err = s.regionCache.getStoreAddr(bo, s.region, proxyReplica.store) + if err != nil { + return nil, err } - addr, err := s.regionCache.getStoreAddr(bo, s.region, replica.store) - if err == nil && len(addr) != 0 { - return &RPCContext{ - Region: s.region.VerID(), - Meta: s.region.meta, - Peer: replica.peer, - Store: replica.store, - Addr: addr, - AccessMode: tiKVOnly, - TiKVNum: len(s.replicas), - }, nil + if len(addr) == 0 { + return nil, nil } + rpcCtx.ProxyStore = proxyReplica.store + rpcCtx.ProxyAddr = addr + proxyReplica.attempts++ + } + + return rpcCtx, nil +} + +func (s *replicaSelector) isReplicaStoreEpochStale(replica *replica) bool { + storeFailEpoch := atomic.LoadUint32(&replica.store.epoch) + if storeFailEpoch != replica.epoch { + // TODO(youjiali1995): Is it necessary to invalidate the region? + metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("stale_store").Inc() + s.invalidateRegion() + return true } + return false } func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) { metrics.RegionCacheCounterWithSendFail.Inc() - replica := s.replicas[s.nextReplicaIdx-1] - if replica.store.requestLiveness(bo, s.regionCache) == reachable { - s.rewind() - return + s.state.onSendFailure(bo, s, err) +} + +func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState { + store := accessReplica.store + liveness := store.requestLiveness(bo, s.regionCache) + // We only check health in loop if forwarding is enabled now. + // The restriction might be relaxed if necessary, but the implementation + // may be checked carefully again. + if liveness != reachable && s.regionCache.enableForwarding { + store.startHealthCheckLoopIfNeeded(s.regionCache) } + return liveness +} +func (s *replicaSelector) invalidateReplicaStore(replica *replica, cause error) { store := replica.store - // invalidate regions in store. if atomic.CompareAndSwapUint32(&store.epoch, replica.epoch, replica.epoch+1) { - logutil.BgLogger().Info("mark store's regions need be refill", zap.Uint64("id", store.storeID), zap.String("addr", store.addr), zap.Error(err)) + logutil.BgLogger().Info("mark store's regions need be refill", zap.Uint64("id", store.storeID), zap.String("addr", store.addr), zap.Error(cause)) metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() // schedule a store addr resolve. store.markNeedCheck(s.regionCache.notifyCheckCh) } - // TODO(youjiali1995): It's not necessary, but some tests depend on it and it's not easy to fix. 
- if s.isExhausted() { - s.region.scheduleReload() - } } -// OnSendSuccess updates the leader of the cached region since the replicaSelector -// is only used for leader request. It's called when the request is sent to the -// replica successfully. -func (s *replicaSelector) OnSendSuccess() { - // The successful replica is not at the head of replicas which means it's not the - // leader in the cached region, so update leader. - if s.nextReplicaIdx-1 != 0 { - leader := s.replicas[s.nextReplicaIdx-1].peer - if !s.regionCache.switchWorkLeaderToPeer(s.region, leader) { - panic("the store must exist") - } - } +func (s *replicaSelector) onSendSuccess() { + s.state.onSendSuccess(s) } -func (s *replicaSelector) rewind() { - s.nextReplicaIdx-- +func (s *replicaSelector) onNotLeader(bo *retry.Backoffer, ctx *RPCContext, notLeader *errorpb.NotLeader) (shouldRetry bool, err error) { + leader := notLeader.GetLeader() + if leader == nil { + // The region may be during transferring leader. + s.state.onNoLeader(s) + if err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("no leader, ctx: %v", ctx)); err != nil { + return false, errors.Trace(err) + } + } else { + s.updateLeader(notLeader.GetLeader()) + } + return true, nil } // updateLeader updates the leader of the cached region. @@ -380,16 +720,12 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) { } for i, replica := range s.replicas { if isSamePeer(replica.peer, leader) { - if i < s.nextReplicaIdx { - s.nextReplicaIdx-- - } - // Move the leader replica to the front of candidates. - s.replicas[i], s.replicas[s.nextReplicaIdx] = s.replicas[s.nextReplicaIdx], s.replicas[i] - if s.replicas[s.nextReplicaIdx].attempts == maxReplicaAttempt { - // Give the replica one more chance and because the current replica is skipped, it - // won't result in infinite retry. - s.replicas[s.nextReplicaIdx].attempts = maxReplicaAttempt - 1 + if replica.attempts == maxReplicaAttempt { + // Give the replica one more chance and because each follower is tried only once, + // it won't result in infinite retry. + replica.attempts = maxReplicaAttempt - 1 } + s.state = &accessKnownLeader{leaderIdx: AccessIndex(i)} // Update the workTiKVIdx so that following requests can be sent to the leader immediately. if !s.regionCache.switchWorkLeaderToPeer(s.region, leader) { panic("the store must exist") @@ -422,7 +758,7 @@ func (s *RegionRequestSender) getRPCContext( // Now only requests sent to the replica leader will use the replica selector to get // the RPC context. // TODO(youjiali1995): make all requests use the replica selector. - if !s.regionCache.enableForwarding && req.ReplicaReadType == kv.ReplicaReadLeader { + if req.ReplicaReadType == kv.ReplicaReadLeader { if s.leaderReplicaSelector == nil { selector, err := newReplicaSelector(s.regionCache, regionID) if selector == nil || err != nil { @@ -596,7 +932,7 @@ func (s *RegionRequestSender) SendReqCtx( } } else { if s.leaderReplicaSelector != nil { - s.leaderReplicaSelector.OnSendSuccess() + s.leaderReplicaSelector.onSendSuccess() } } return resp, rpcCtx, nil @@ -925,16 +1261,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext zap.String("ctx", ctx.String())) if s.leaderReplicaSelector != nil { - leader := notLeader.GetLeader() - if leader == nil { - // The region may be during transferring leader. 
- if err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("no leader, ctx: %v", ctx)); err != nil { - return false, errors.Trace(err) - } - } else { - s.leaderReplicaSelector.updateLeader(notLeader.GetLeader()) - } - return true, nil + return s.leaderReplicaSelector.onNotLeader(bo, ctx, notLeader) } else if notLeader.GetLeader() == nil { // The peer doesn't know who is the current leader. Generally it's because // the Raft group is in an election, but it's possible that the peer is @@ -1003,9 +1330,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext if err != nil { return false, errors.Trace(err) } - if s.leaderReplicaSelector != nil { - s.leaderReplicaSelector.rewind() - } return true, nil } @@ -1047,9 +1371,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext if err != nil { return false, errors.Trace(err) } - if s.leaderReplicaSelector != nil { - s.leaderReplicaSelector.rewind() - } return true, nil } @@ -1083,9 +1404,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext if err != nil { return false, errors.Trace(err) } - if s.leaderReplicaSelector != nil { - s.leaderReplicaSelector.rewind() - } return true, nil } @@ -1096,9 +1414,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext if err != nil { return false, errors.Trace(err) } - if s.leaderReplicaSelector != nil { - s.leaderReplicaSelector.rewind() - } return true, nil } diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index ae279e86fe..7b1f4678c2 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -212,7 +212,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() { s.Equal(ctx.Addr, leaderAddr) s.NotNil(ctx.ProxyStore) s.NotEqual(ctx.ProxyAddr, leaderAddr) - s.NotEqual(ctx.ProxyAccessIdx, ctx.AccessIdx) s.Nil(err) // Simulate recovering to normal @@ -220,7 +219,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() { atomic.StoreUint32(&storeState, uint32(reachable)) start := time.Now() for { - if atomic.LoadInt32(&leaderStore.needForwarding) == 0 { + if atomic.LoadInt32(&leaderStore.unreachable) == 0 { break } if time.Since(start) > 3*time.Second { @@ -276,7 +275,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() { s.Equal(len(s.regionRequestSender.failProxyStoreIDs), 0) region := s.regionRequestSender.regionCache.GetCachedRegionWithRLock(loc.Region) s.NotNil(region) - s.True(region.checkNeedReload()) + s.False(region.isValid()) loc, err = s.regionRequestSender.regionCache.LocateKey(bo, []byte("k")) s.Nil(err) @@ -295,6 +294,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() { s.Nil(ctx.ProxyStore) } +func refreshEpochs(regionStore *regionStore) { + for i, store := range regionStore.stores { + regionStore.storeEpochs[i] = atomic.LoadUint32(&store.epoch) + } +} + func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) s.Nil(err) @@ -330,11 +335,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.NotNil(replicaSelector) s.Nil(err) s.Equal(replicaSelector.region, region) - // Should only contains TiKV stores. + // Should only contain TiKV stores. 
s.Equal(len(replicaSelector.replicas), regionStore.accessStoreNum(tiKVOnly)) s.Equal(len(replicaSelector.replicas), len(regionStore.stores)-1) - s.True(replicaSelector.nextReplicaIdx == 0) - s.False(replicaSelector.isExhausted()) + s.IsType(&accessKnownLeader{}, replicaSelector.state) // Verify that the store matches the peer and epoch. for _, replica := range replicaSelector.replicas { @@ -348,107 +352,136 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { } } } - // Verify that the leader replica is at the head of replicas. - leaderStore, leaderPeer, _, _ := region.WorkStorePeer(regionStore) - leaderReplica := replicaSelector.replicas[0] - s.Equal(leaderReplica.store, leaderStore) - s.Equal(leaderReplica.peer, leaderPeer) - - assertRPCCtxEqual := func(rpcCtx *RPCContext, replica *replica) { - s.Equal(rpcCtx.Store, replicaSelector.replicas[replicaSelector.nextReplicaIdx-1].store) - s.Equal(rpcCtx.Peer, replicaSelector.replicas[replicaSelector.nextReplicaIdx-1].peer) - s.Equal(rpcCtx.Addr, replicaSelector.replicas[replicaSelector.nextReplicaIdx-1].store.addr) + + assertRPCCtxEqual := func(rpcCtx *RPCContext, target *replica, proxy *replica) { + s.Equal(rpcCtx.Store, target.store) + s.Equal(rpcCtx.Peer, target.peer) + s.Equal(rpcCtx.Addr, target.store.addr) s.Equal(rpcCtx.AccessMode, tiKVOnly) + if proxy != nil { + s.Equal(rpcCtx.ProxyStore, proxy.store) + s.Equal(rpcCtx.ProxyAddr, proxy.store.addr) + } } - // Verify the correctness of next() - for i := 0; i < len(replicaSelector.replicas); i++ { + // Test accessKnownLeader state + s.IsType(&accessKnownLeader{}, replicaSelector.state) + // Try the leader for maxReplicaAttempt times + for i := 1; i <= maxReplicaAttempt; i++ { rpcCtx, err := replicaSelector.next(s.bo) - s.NotNil(rpcCtx) s.Nil(err) - s.Equal(rpcCtx.Region, regionLoc.Region) - s.Equal(rpcCtx.Meta, region.meta) - replica := replicaSelector.replicas[replicaSelector.nextReplicaIdx-1] - assertRPCCtxEqual(rpcCtx, replica) - s.Equal(replica.attempts, 1) - s.Equal(replicaSelector.nextReplicaIdx, i+1) + assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) + s.IsType(&accessKnownLeader{}, replicaSelector.state) + s.Equal(replicaSelector.replicas[regionStore.workTiKVIdx].attempts, i) } - s.True(replicaSelector.isExhausted()) + + // After that it should switch to tryFollower + for i := 0; i < len(replicaSelector.replicas)-1; i++ { + rpcCtx, err := replicaSelector.next(s.bo) + s.Nil(err) + state, ok := replicaSelector.state.(*tryFollower) + s.True(ok) + s.Equal(regionStore.workTiKVIdx, state.leaderIdx) + s.NotEqual(state.lastIdx, regionStore.workTiKVIdx) + s.Equal(replicaSelector.targetIdx, state.lastIdx) + assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil) + s.Equal(replicaSelector.targetReplica().attempts, 1) + } + // In tryFollower state, if all replicas are tried, nil RPCContext should be returned rpcCtx, err := replicaSelector.next(s.bo) - s.Nil(rpcCtx) s.Nil(err) - // The region should be invalidated if runs out of all replicas. 
+ s.Nil(rpcCtx) + // The region should be invalidated s.False(replicaSelector.region.isValid()) + // Test switching to tryFollower if leader is unreachable region.lastAccess = time.Now().Unix() replicaSelector, err = newReplicaSelector(cache, regionLoc.Region) s.Nil(err) s.NotNil(replicaSelector) cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState { - return reachable + return unreachable } - for i := 0; i < maxReplicaAttempt; i++ { - rpcCtx, err := replicaSelector.next(s.bo) - s.NotNil(rpcCtx) - s.Nil(err) - nextIdx := replicaSelector.nextReplicaIdx - // Verify that retry the same store if it's reachable. - replicaSelector.onSendFailure(s.bo, nil) - s.Equal(nextIdx, replicaSelector.nextReplicaIdx+1) - s.Equal(replicaSelector.nextReplica().attempts, i+1) + s.IsType(&accessKnownLeader{}, replicaSelector.state) + _, err = replicaSelector.next(s.bo) + s.Nil(err) + replicaSelector.onSendFailure(s.bo, nil) + rpcCtx, err = replicaSelector.next(s.bo) + s.NotNil(rpcCtx) + s.Nil(err) + s.IsType(&tryFollower{}, replicaSelector.state) + s.NotEqual(replicaSelector.targetIdx, regionStore.workTiKVIdx) + assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), nil) + s.Equal(replicaSelector.targetReplica().attempts, 1) + + // Test switching to tryNewProxy if leader is unreachable and forwarding is enabled + refreshEpochs(regionStore) + cache.enableForwarding = true + replicaSelector, err = newReplicaSelector(cache, regionLoc.Region) + s.Nil(err) + s.NotNil(replicaSelector) + cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState { + return unreachable } - // Verify the maxReplicaAttempt limit for each replica. + s.IsType(&accessKnownLeader{}, replicaSelector.state) + _, err = replicaSelector.next(s.bo) + s.Nil(err) + replicaSelector.onSendFailure(s.bo, nil) rpcCtx, err = replicaSelector.next(s.bo) + s.NotNil(rpcCtx) s.Nil(err) - assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[1]) - s.Equal(replicaSelector.nextReplicaIdx, 2) - - // Verify updating leader. - replicaSelector, _ = newReplicaSelector(cache, regionLoc.Region) - replicaSelector.next(s.bo) - // The leader is the 3rd replica. After updating leader, it should be the next. - leader := replicaSelector.replicas[2] - replicaSelector.updateLeader(leader.peer) - s.Equal(replicaSelector.nextReplica(), leader) - s.Equal(replicaSelector.nextReplicaIdx, 1) - rpcCtx, _ = replicaSelector.next(s.bo) - assertRPCCtxEqual(rpcCtx, leader) - // Verify the regionStore is updated and the workTiKVIdx points to the leader. - regionStore = region.getStore() - leaderStore, leaderPeer, _, _ = region.WorkStorePeer(regionStore) - s.Equal(leaderStore, leader.store) - s.Equal(leaderPeer, leader.peer) - - replicaSelector, _ = newReplicaSelector(cache, regionLoc.Region) - replicaSelector.next(s.bo) - replicaSelector.next(s.bo) - replicaSelector.next(s.bo) - s.True(replicaSelector.isExhausted()) - // The leader is the 1st replica. After updating leader, it should be the next and - // the currnet replica is skipped. - leader = replicaSelector.replicas[0] - replicaSelector.updateLeader(leader.peer) - // The leader should be the next replica. - s.Equal(replicaSelector.nextReplica(), leader) - s.Equal(replicaSelector.nextReplicaIdx, 2) - rpcCtx, _ = replicaSelector.next(s.bo) - s.True(replicaSelector.isExhausted()) - assertRPCCtxEqual(rpcCtx, leader) - // Verify the regionStore is updated and the workTiKVIdx points to the leader. 
+ state, ok := replicaSelector.state.(*tryNewProxy) + s.True(ok) + s.Equal(regionStore.workTiKVIdx, state.leaderIdx) + s.Equal(AccessIndex(2), replicaSelector.targetIdx) + s.NotEqual(AccessIndex(2), replicaSelector.proxyIdx) + assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) + s.Equal(replicaSelector.targetReplica().attempts, 2) + s.Equal(replicaSelector.proxyReplica().attempts, 1) + + // When the current proxy node fails, it should try another one. + lastProxy := replicaSelector.proxyIdx + replicaSelector.onSendFailure(s.bo, nil) + rpcCtx, err = replicaSelector.next(s.bo) + s.NotNil(rpcCtx) + s.Nil(err) + state, ok = replicaSelector.state.(*tryNewProxy) + s.True(ok) + s.Equal(regionStore.workTiKVIdx, state.leaderIdx) + s.Equal(AccessIndex(2), replicaSelector.targetIdx) + s.NotEqual(lastProxy, replicaSelector.proxyIdx) + s.Equal(replicaSelector.targetReplica().attempts, 3) + s.Equal(replicaSelector.proxyReplica().attempts, 1) + + // Test proxy store is saves when proxy is enabled + replicaSelector.onSendSuccess() regionStore = region.getStore() - leaderStore, leaderPeer, _, _ = region.WorkStorePeer(regionStore) - s.Equal(leaderStore, leader.store) - s.Equal(leaderPeer, leader.peer) - - // Give the leader one more chance even if it exceeds the maxReplicaAttempt. - replicaSelector, _ = newReplicaSelector(cache, regionLoc.Region) - leader = replicaSelector.replicas[0] - leader.attempts = maxReplicaAttempt - replicaSelector.updateLeader(leader.peer) - s.Equal(leader.attempts, maxReplicaAttempt-1) - rpcCtx, _ = replicaSelector.next(s.bo) - assertRPCCtxEqual(rpcCtx, leader) - s.Equal(leader.attempts, maxReplicaAttempt) + s.Equal(replicaSelector.proxyIdx, regionStore.proxyTiKVIdx) + + // Test initial state is accessByKnownProxy when proxyTiKVIdx is valid + refreshEpochs(regionStore) + cache.enableForwarding = true + replicaSelector, err = newReplicaSelector(cache, regionLoc.Region) + s.Nil(err) + s.NotNil(replicaSelector) + state2, ok := replicaSelector.state.(*accessByKnownProxy) + s.True(ok) + s.Equal(regionStore.workTiKVIdx, state2.leaderIdx) + _, err = replicaSelector.next(s.bo) + s.Nil(err) + assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) + + // Switch to tryNewProxy if the current proxy is not available + replicaSelector.onSendFailure(s.bo, nil) + s.IsType(&tryNewProxy{}, replicaSelector.state) + rpcCtx, err = replicaSelector.next(s.bo) + s.Nil(err) + assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) + s.Equal(regionStore.workTiKVIdx, state2.leaderIdx) + s.Equal(AccessIndex(2), replicaSelector.targetIdx) + s.NotEqual(regionStore.proxyTiKVIdx, replicaSelector.proxyIdx) + s.Equal(replicaSelector.targetReplica().attempts, 2) + s.Equal(replicaSelector.proxyReplica().attempts, 1) // Invalidate the region if the leader is not in the region. region.lastAccess = time.Now().Unix() @@ -458,18 +491,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { rpcCtx, err = replicaSelector.next(s.bo) s.Nil(rpcCtx) s.Nil(err) - - // Verify on send success. - region.lastAccess = time.Now().Unix() - replicaSelector, _ = newReplicaSelector(cache, regionLoc.Region) - replicaSelector.next(s.bo) - rpcCtx, err = replicaSelector.next(s.bo) - s.Nil(err) - replicaSelector.OnSendSuccess() - // Verify the regionStore is updated and the workTiKVIdx points to the leader. 
- leaderStore, leaderPeer, _, _ = region.WorkStorePeer(region.getStore()) - s.Equal(leaderStore, rpcCtx.Store) - s.Equal(leaderPeer, rpcCtx.Peer) } // TODO(youjiali1995): Remove duplicated tests. This test may be duplicated with other @@ -514,7 +535,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err = sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp) - s.Equal(sender.leaderReplicaSelector.nextReplicaIdx, 2) + s.Equal(sender.leaderReplicaSelector.targetIdx, AccessIndex(1)) s.True(bo.GetTotalBackoffTimes() == 1) s.cluster.StartStore(s.storeIDs[0]) @@ -523,7 +544,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err = sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp) - s.Equal(sender.leaderReplicaSelector.nextReplicaIdx, 1) + s.Equal(sender.leaderReplicaSelector.targetIdx, AccessIndex(1)) s.True(bo.GetTotalBackoffTimes() == 0) // Switch to the next peer due to leader failure but the new leader is not elected. @@ -534,7 +555,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err = sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.False(sender.leaderReplicaSelector.isExhausted()) s.Equal(bo.GetTotalBackoffTimes(), 1) s.cluster.StartStore(s.storeIDs[1]) @@ -554,7 +574,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { s.Nil(err) s.True(hasFakeRegionError(resp)) s.Equal(bo.GetTotalBackoffTimes(), 3) - s.True(sender.leaderReplicaSelector.isExhausted()) s.False(sender.leaderReplicaSelector.region.isValid()) s.cluster.ChangeLeader(s.regionID, s.peerIDs[0]) @@ -569,7 +588,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err = sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.True(sender.leaderReplicaSelector.isExhausted()) s.False(sender.leaderReplicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2) s.cluster.StartStore(s.storeIDs[0]) @@ -604,7 +622,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err := sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.True(sender.leaderReplicaSelector.isExhausted()) s.False(sender.leaderReplicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2) }() @@ -625,7 +642,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err := sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.True(sender.leaderReplicaSelector.isExhausted()) s.False(sender.leaderReplicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), 0) }() @@ -645,7 +661,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err := sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.True(sender.leaderReplicaSelector.isExhausted()) s.False(sender.leaderReplicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), 0) }() @@ -679,7 +694,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { regionErr, _ := resp.GetRegionError() s.NotNil(regionErr) } - s.False(sender.leaderReplicaSelector.isExhausted()) s.False(sender.leaderReplicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), 0) 
}()
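
The patch above replaces the old linear "try replicas in order" logic in replicaSelector with a small state machine (accessKnownLeader, tryFollower, accessByKnownProxy, tryNewProxy, plus the terminal invalidStore/invalidLeader states). For readers who want the shape of that design without the surrounding RPC plumbing, here is a minimal, self-contained Go sketch. Every name in it is an illustrative stand-in rather than the real client-go type: the real states also build RPCContexts, check store liveness, record metrics, and handle the forwarding/proxy path, none of which is modeled here.

// Simplified sketch of the selector state machine — illustrative only,
// not the real client-go types.
package main

import (
	"errors"
	"fmt"
)

// stateChanged mirrors the sentinel error in the patch: it tells the driver
// loop that the selector switched states and next() must be re-evaluated.
type stateChanged struct{}

func (stateChanged) Error() string { return "selector state changed" }

// selectorState is a trimmed-down version of the interface added by the patch;
// the real one also has onSendSuccess/onSendFailure/onNoLeader callbacks.
type selectorState interface {
	next(s *selector) (targetIdx int, err error)
}

type selector struct {
	replicas  []string // store addresses; replicas[leaderIdx] is the supposed leader
	leaderIdx int
	attempts  []int
	state     selectorState
}

// accessKnownLeader keeps picking the supposed leader until the attempt budget
// is spent, then hands over to tryFollower (the patch additionally branches to
// the proxy states when forwarding is enabled).
type accessKnownLeader struct{ maxAttempts int }

func (st *accessKnownLeader) next(s *selector) (int, error) {
	if s.attempts[s.leaderIdx] >= st.maxAttempts {
		s.state = &tryFollower{lastIdx: s.leaderIdx}
		return -1, stateChanged{}
	}
	s.attempts[s.leaderIdx]++
	return s.leaderIdx, nil
}

// tryFollower tries each untried follower once, starting after the last
// accessed replica; when everything is exhausted the real code invalidates
// the cached region so the caller backs off and reloads it.
type tryFollower struct{ lastIdx int }

func (st *tryFollower) next(s *selector) (int, error) {
	for i := 1; i < len(s.replicas); i++ {
		idx := (st.lastIdx + i) % len(s.replicas)
		if idx != s.leaderIdx && s.attempts[idx] == 0 {
			st.lastIdx = idx
			s.attempts[idx]++
			return idx, nil
		}
	}
	return -1, errors.New("all replicas exhausted")
}

// next is the driver loop: it keeps asking the current state until it yields
// a target or a terminal error, just like replicaSelector.next in the patch
// retries while the state returns stateChanged.
func (s *selector) next() (int, error) {
	for {
		idx, err := s.state.next(s)
		if _, changed := err.(stateChanged); !changed {
			return idx, err
		}
	}
}

func main() {
	s := &selector{
		replicas:  []string{"store-1", "store-2", "store-3"},
		leaderIdx: 0,
		attempts:  make([]int, 3),
		state:     &accessKnownLeader{maxAttempts: 2},
	}
	for {
		idx, err := s.next()
		if err != nil {
			fmt.Println("give up:", err) // the real caller would reload the region here
			return
		}
		fmt.Println("send to", s.replicas[idx])
	}
}

The design point the sketch mirrors is the stateChanged sentinel: transitions happen inside a state's next(), and the driver loop simply re-invokes the current state until it yields a target or a terminal result, which is what lets replicaSelector.next in the patch remain a plain for loop.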