select replica for forward requests using RegionRequestSender (#264)
Signed-off-by: Yilin Chen <[email protected]>
sticnarf authored Aug 16, 2021
1 parent df2119f commit b440ea2
Showing 6 changed files with 583 additions and 385 deletions.
4 changes: 0 additions & 4 deletions go.sum
@@ -290,10 +290,6 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db h1:PSW6P83KZi5WopPBiecU286PWMSl2rvxCBZT94iBX+I=
github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1 h1:zl56/I6s/UMG/kH+9epaIAiwfIx2gZO5FW8hvkhDYAg=
github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4 h1:4EUpHzPFHwleKkVALyMqQbQcNziPZvU+vhUT9Wzj93E=
github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
3 changes: 0 additions & 3 deletions integration_tests/go.sum
@@ -420,10 +420,7 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db h1:PSW6P83KZi5WopPBiecU286PWMSl2rvxCBZT94iBX+I=
github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1 h1:zl56/I6s/UMG/kH+9epaIAiwfIx2gZO5FW8hvkhDYAg=
github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4 h1:4EUpHzPFHwleKkVALyMqQbQcNziPZvU+vhUT9Wzj93E=
github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
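In the internal/locate/region_cache.go hunks that follow, getProxyStore still chooses the store used to forward a leader request: it walks the region's TiKV peers, skips the leader's own index, and skips any store whose unreachable flag is non-zero. The following is a minimal standalone sketch of that selection rule only; candidate, pickForwardProxy, the random starting offset, and the addresses are illustrative stand-ins rather than client-go APIs.

package main

import (
	"fmt"
	"math/rand"
)

type candidate struct {
	addr        string
	unreachable bool // set when the store is known to be unreachable
}

// pickForwardProxy picks a peer to forward a leader request through: it
// starts from a random offset, skips the leader itself and any peer that is
// currently marked unreachable, and returns -1 if no peer qualifies.
func pickForwardProxy(peers []candidate, leaderIdx int) int {
	n := len(peers)
	if n <= 1 {
		return -1
	}
	start := rand.Intn(n)
	for i := 0; i < n; i++ {
		idx := (start + i) % n
		if idx == leaderIdx || peers[idx].unreachable {
			continue
		}
		return idx
	}
	return -1
}

func main() {
	peers := []candidate{
		{addr: "tikv-1:20160"},                    // leader: the request's real target
		{addr: "tikv-2:20160", unreachable: true}, // down, cannot serve as proxy
		{addr: "tikv-3:20160"},
	}
	if idx := pickForwardProxy(peers, 0); idx >= 0 {
		fmt.Println("forward the leader request via", peers[idx].addr)
	} else {
		fmt.Println("no usable proxy peer")
	}
}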
148 changes: 40 additions & 108 deletions internal/locate/region_cache.go
@@ -458,17 +458,16 @@ func (c *RegionCache) SetPDClient(client pd.Client) {

// RPCContext contains data that is needed to send RPC to a region.
type RPCContext struct {
Region RegionVerID
Meta *metapb.Region
Peer *metapb.Peer
AccessIdx AccessIndex
Store *Store
Addr string
AccessMode accessMode
ProxyStore *Store // nil means proxy is not used
ProxyAccessIdx AccessIndex // valid when ProxyStore is not nil
ProxyAddr string // valid when ProxyStore is not nil
TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.
Region RegionVerID
Meta *metapb.Region
Peer *metapb.Peer
AccessIdx AccessIndex
Store *Store
Addr string
AccessMode accessMode
ProxyStore *Store // nil means proxy is not used
ProxyAddr string // valid when ProxyStore is not nil
TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.

tryTimes int
}
@@ -481,7 +480,7 @@ func (c *RPCContext) String() string {
res := fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d, reqStoreType: %s, runStoreType: %s",
c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.AccessIdx, c.AccessMode, runStoreType)
if c.ProxyStore != nil {
res += fmt.Sprintf(", proxy store id: %d, proxy addr: %s, proxy idx: %d", c.ProxyStore.storeID, c.ProxyAddr, c.ProxyAccessIdx)
res += fmt.Sprintf(", proxy store id: %d, proxy addr: %s", c.ProxyStore.storeID, c.ProxyStore.addr)
}
return res
}
@@ -573,15 +572,14 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
}

var (
proxyStore *Store
proxyAddr string
proxyAccessIdx AccessIndex
proxyStore *Store
proxyAddr string
)
if c.enableForwarding && isLeaderReq {
if atomic.LoadInt32(&store.needForwarding) == 0 {
if atomic.LoadInt32(&store.unreachable) == 0 {
regionStore.unsetProxyStoreIfNeeded(cachedRegion)
} else {
proxyStore, proxyAccessIdx, _ = c.getProxyStore(cachedRegion, store, regionStore, accessIdx)
proxyStore, _, _ = c.getProxyStore(cachedRegion, store, regionStore, accessIdx)
if proxyStore != nil {
proxyAddr, err = c.getStoreAddr(bo, cachedRegion, proxyStore)
if err != nil {
@@ -592,17 +590,16 @@ }
}

return &RPCContext{
Region: id,
Meta: cachedRegion.meta,
Peer: peer,
AccessIdx: accessIdx,
Store: store,
Addr: addr,
AccessMode: tiKVOnly,
ProxyStore: proxyStore,
ProxyAccessIdx: proxyAccessIdx,
ProxyAddr: proxyAddr,
TiKVNum: regionStore.accessStoreNum(tiKVOnly),
Region: id,
Meta: cachedRegion.meta,
Peer: peer,
AccessIdx: accessIdx,
Store: store,
Addr: addr,
AccessMode: tiKVOnly,
ProxyStore: proxyStore,
ProxyAddr: proxyAddr,
TiKVNum: regionStore.accessStoreNum(tiKVOnly),
}, nil
}

@@ -859,53 +856,21 @@ func (c *RegionCache) OnSendFail(bo *retry.Backoffer, ctx *RPCContext, scheduleR
}

rs := r.getStore()
startForwarding := false
incEpochStoreIdx := -1

if err != nil {
storeIdx, s := rs.accessStore(ctx.AccessMode, ctx.AccessIdx)
leaderReq := ctx.Store.storeType == tikvrpc.TiKV && rs.workTiKVIdx == ctx.AccessIdx

// Mark the store as failure if it's not a redirection request because we
// can't know the status of the proxy store by it.
if ctx.ProxyStore == nil {
// send fail but store is reachable, keep retry current peer for replica leader request.
// but we still need switch peer for follower-read or learner-read(i.e. tiflash)
if leaderReq {
if s.requestLiveness(bo, c) == reachable {
return
} else if c.enableForwarding {
s.startHealthCheckLoopIfNeeded(c)
startForwarding = true
}
}

// invalidate regions in store.
incEpochStoreIdx = c.markRegionNeedBeRefill(s, storeIdx, rs)
}
// invalidate regions in store.
c.markRegionNeedBeRefill(s, storeIdx, rs)
}

// try next peer to found new leader.
if ctx.AccessMode == tiKVOnly {
if startForwarding || ctx.ProxyStore != nil {
var currentProxyIdx AccessIndex = -1
if ctx.ProxyStore != nil {
currentProxyIdx = ctx.ProxyAccessIdx
}
// In case the epoch of the store is increased, try to avoid reloading the current region by also
// increasing the epoch stored in `rs`.
rs.switchNextProxyStore(r, currentProxyIdx, incEpochStoreIdx)
logutil.Logger(bo.GetCtx()).Info("switch region proxy peer to next due to send request fail",
zap.Stringer("current", ctx),
zap.Bool("needReload", scheduleReload),
zap.Error(err))
} else {
rs.switchNextTiKVPeer(r, ctx.AccessIdx)
logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail",
zap.Stringer("current", ctx),
zap.Bool("needReload", scheduleReload),
zap.Error(err))
}
rs.switchNextTiKVPeer(r, ctx.AccessIdx)
logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail",
zap.Stringer("current", ctx),
zap.Bool("needReload", scheduleReload),
zap.Error(err))
} else {
rs.switchNextFlashPeer(r, ctx.AccessIdx)
logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
@@ -1457,7 +1422,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S
}

func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int) {
if !c.enableForwarding || store.storeType != tikvrpc.TiKV || atomic.LoadInt32(&store.needForwarding) == 0 {
if !c.enableForwarding || store.storeType != tikvrpc.TiKV || atomic.LoadInt32(&store.unreachable) == 0 {
return
}

@@ -1487,7 +1452,7 @@ }
}
storeIdx, store := rs.accessStore(tiKVOnly, AccessIndex(index))
// Skip unreachable stores.
if atomic.LoadInt32(&store.needForwarding) != 0 {
if atomic.LoadInt32(&store.unreachable) != 0 {
continue
}

@@ -1802,41 +1767,6 @@ func (r *regionStore) switchNextTiKVPeer(rr *Region, currentPeerIdx AccessIndex)
rr.compareAndSwapStore(r, newRegionStore)
}

// switchNextProxyStore switches the index of the peer that will forward requests to the leader to the next peer.
// If proxy is currently not used on this region, the value of `currentProxyIdx` should be -1, and a random peer will
// be select in this case.
func (r *regionStore) switchNextProxyStore(rr *Region, currentProxyIdx AccessIndex, incEpochStoreIdx int) {
if r.proxyTiKVIdx != currentProxyIdx {
return
}

tikvNum := r.accessStoreNum(tiKVOnly)
var nextIdx AccessIndex

// If the region is not using proxy before, randomly select a non-leader peer for the first try.
if currentProxyIdx == -1 {
// Randomly select an non-leader peer
// TODO: Skip unreachable peers here.
nextIdx = AccessIndex(rand.Intn(tikvNum - 1))
if nextIdx >= r.workTiKVIdx {
nextIdx++
}
} else {
nextIdx = (currentProxyIdx + 1) % AccessIndex(tikvNum)
// skips the current workTiKVIdx
if nextIdx == r.workTiKVIdx {
nextIdx = (nextIdx + 1) % AccessIndex(tikvNum)
}
}

newRegionStore := r.clone()
newRegionStore.proxyTiKVIdx = nextIdx
if incEpochStoreIdx >= 0 {
newRegionStore.storeEpochs[incEpochStoreIdx]++
}
rr.compareAndSwapStore(r, newRegionStore)
}

func (r *regionStore) setProxyStoreIdx(rr *Region, idx AccessIndex) {
if r.proxyTiKVIdx == idx {
return
@@ -1917,10 +1847,11 @@ type Store struct {
storeType tikvrpc.EndpointType // type of the store
tokenCount atomic2.Int64 // used store token count

// whether the store is disconnected due to some reason, therefore requests to the store needs to be
// whether the store is unreachable due to some reason, therefore requests to the store needs to be
// forwarded by other stores. this is also the flag that a checkUntilHealth goroutine is running for this store.
// this mechanism is currently only applicable for TiKV stores.
needForwarding int32
unreachable int32
unreachableSince time.Time
}

type resolveState uint64
@@ -2150,13 +2081,14 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache) {
}

// It may be already started by another thread.
if atomic.CompareAndSwapInt32(&s.needForwarding, 0, 1) {
if atomic.CompareAndSwapInt32(&s.unreachable, 0, 1) {
s.unreachableSince = time.Now()
go s.checkUntilHealth(c)
}
}

func (s *Store) checkUntilHealth(c *RegionCache) {
defer atomic.CompareAndSwapInt32(&s.needForwarding, 1, 0)
defer atomic.CompareAndSwapInt32(&s.unreachable, 1, 0)

ticker := time.NewTicker(time.Second)
lastCheckPDTime := time.Now()
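The Store changes above rename needForwarding to unreachable and record unreachableSince; a compare-and-swap on the flag still guarantees at most one checkUntilHealth goroutine per store, and the deferred swap clears the flag when that loop exits. Below is a minimal standalone sketch of just that flag pattern; probeStore, markUnreachable, and the probe function are simplified stand-ins, not client-go types.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type probeStore struct {
	addr             string
	unreachable      int32 // 1 while a health-check loop is running for this store
	unreachableSince time.Time
}

// markUnreachable starts at most one health-check loop per store: only the
// caller that wins the compare-and-swap launches the goroutine.
func (s *probeStore) markUnreachable(probe func(addr string) bool) {
	if atomic.CompareAndSwapInt32(&s.unreachable, 0, 1) {
		s.unreachableSince = time.Now()
		go s.checkUntilHealthy(probe)
	}
}

func (s *probeStore) checkUntilHealthy(probe func(addr string) bool) {
	// Clear the flag when the loop exits so callers stop treating the store
	// as unreachable.
	defer atomic.CompareAndSwapInt32(&s.unreachable, 1, 0)

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		if probe(s.addr) {
			fmt.Printf("%s reachable again after %v\n", s.addr, time.Since(s.unreachableSince))
			return
		}
	}
}

func main() {
	s := &probeStore{addr: "tikv-1:20160"}
	failures := int32(3) // pretend the first few probes fail
	probe := func(addr string) bool { return atomic.AddInt32(&failures, -1) < 0 }

	s.markUnreachable(probe)
	s.markUnreachable(probe) // no-op: the flag is already set, only one loop runs

	time.Sleep(time.Second)
	fmt.Println("unreachable flag:", atomic.LoadInt32(&s.unreachable))
}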
56 changes: 0 additions & 56 deletions internal/locate/region_cache_test.go
@@ -39,7 +39,6 @@ import (
"errors"
"fmt"
"math/rand"
"sync/atomic"
"testing"
"time"

@@ -709,61 +708,6 @@ func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore() {
s.Nil(err)
}

func (s *testRegionCacheSuite) TestSendFailEnableForwarding() {
s.cache.enableForwarding = true

// key range: ['' - 'm' - 'z']
region2 := s.cluster.AllocID()
newPeers := s.cluster.AllocIDs(2)
s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0])

var storeState uint32 = uint32(unreachable)
s.cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
return livenessState(atomic.LoadUint32(&storeState))
}

// Check the two regions.
loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
s.Nil(err)
s.Equal(loc1.Region.id, s.region1)

// Invoke OnSendFail so that the store will be marked as needForwarding
ctx, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
s.Nil(err)
s.NotNil(ctx)
s.cache.OnSendFail(s.bo, ctx, false, errors.New("test error"))

// ...then on next retry, proxy will be used
ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
s.Nil(err)
s.NotNil(ctx)
s.NotNil(ctx.ProxyStore)
s.Equal(ctx.ProxyStore.storeID, s.store2)

// Proxy will be also applied to other regions whose leader is on the store
loc2, err := s.cache.LocateKey(s.bo, []byte("x"))
s.Nil(err)
s.Equal(loc2.Region.id, region2)
ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc2.Region, kv.ReplicaReadLeader, 0)
s.Nil(err)
s.NotNil(ctx)
s.NotNil(ctx.ProxyStore)
s.Equal(ctx.ProxyStore.storeID, s.store2)

// Recover the store
atomic.StoreUint32(&storeState, uint32(reachable))
// The proxy should be unset after several retries
for retry := 0; retry < 15; retry++ {
ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
s.Nil(err)
if ctx.ProxyStore == nil {
break
}
time.Sleep(time.Millisecond * 200)
}
s.Nil(ctx.ProxyStore)
}

func (s *testRegionCacheSuite) TestSendFailedInMultipleNode() {
// 3 nodes and no.1 is leader.
store3 := s.cluster.AllocID()
