From 7a8fa44f0455e73c78d08b1750d3c1c6c452127a Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 20 Jul 2023 18:46:30 +0800 Subject: [PATCH 1/5] fix unexpected slow query during GC running after stop 1 tikv-server Signed-off-by: crazycs520 --- internal/locate/region_request.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index c2177b2433..b742473457 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -554,7 +554,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector for i := 0; i < len(selector.replicas) && !state.option.leaderOnly; i++ { idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) selectReplica := selector.replicas[idx] - if state.isCandidate(idx, selectReplica) { + if state.isCandidate(idx, selectReplica) && selectReplica.store.getLivenessState() != unreachable { state.lastIdx = idx selector.targetIdx = idx break From a2e84f48ae16f0b02a5ff66b0889aeba6f62dbda Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 21 Jul 2023 17:11:07 +0800 Subject: [PATCH 2/5] add test Signed-off-by: crazycs520 --- internal/locate/region_request3_test.go | 77 +++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 4a7b7dee66..a2e374a59c 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -929,3 +929,80 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg // after region error returned, the region should be invalidated. s.False(region.isValid()) } + +func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() { + var leaderAddr string + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + // Returns error when accesses non-leader. + if leaderAddr != addr { + return nil, context.DeadlineExceeded + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ + Value: []byte("value"), + }}, nil + }} + + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ + Key: []byte("key"), + }) + req.ReplicaReadType = kv.ReplicaReadMixed + + loc, err := s.cache.LocateKey(s.bo, []byte("key")) + s.Nil(err) + region := s.cache.GetCachedRegionWithRLock(loc.Region) + s.NotNil(region) + regionStore := region.getStore() + leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr + s.NotEqual(leaderAddr, "") + for i := 0; i < 10; i++ { + bo := retry.NewBackofferWithVars(context.Background(), 100, nil) + resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + s.Nil(err) + s.NotNil(resp) + + // Since send req to follower will receive error, then all follower will be marked as unreachable and epoch stale. + allFollowerStoreEpochStale := true + for i, store := range regionStore.stores { + if i == int(regionStore.workTiKVIdx) { + continue + } + if store.epoch == regionStore.storeEpochs[i] { + allFollowerStoreEpochStale = false + break + } else { + s.Equal(store.getLivenessState(), unreachable) + } + } + if allFollowerStoreEpochStale { + break + } + } + + // mock for GC leader reload all regions. 
+ bo := retry.NewBackofferWithVars(context.Background(), 10, nil) + _, err = s.cache.BatchLoadRegionsWithKeyRange(bo, []byte(""), nil, 1) + s.Nil(err) + + loc, err = s.cache.LocateKey(s.bo, []byte("key")) + s.Nil(err) + region = s.cache.GetCachedRegionWithRLock(loc.Region) + s.NotNil(region) + regionStore = region.getStore() + for i, store := range regionStore.stores { + if i == int(regionStore.workTiKVIdx) { + continue + } + // After reload region, the region epoch will be updated, but the store liveness state is still unreachable. + s.Equal(store.epoch, regionStore.storeEpochs[i]) + s.Equal(store.getLivenessState(), unreachable) + } + + for i := 0; i < 100; i++ { + bo := retry.NewBackofferWithVars(context.Background(), 1, nil) + resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + s.Nil(err) + s.NotNil(resp) + // since all follower'store is unreachable, the request will be sent to leader, the backoff times should be 0. + s.Equal(0, bo.GetTotalBackoffTimes()) + } +} From f09ff7d705a9e6601269a71a9ed3b72de09faf40 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 24 Jul 2023 11:07:46 +0800 Subject: [PATCH 3/5] address comment Signed-off-by: crazycs520 --- internal/locate/region_request.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index b742473457..7a82e5b6af 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -385,7 +385,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) ( } targetReplica = selector.replicas[idx] // Each follower is only tried once - if !targetReplica.isExhausted(1) { + if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable{ state.lastIdx = idx selector.targetIdx = idx break From c23a5d025a517dcad3ee577c849d37085fb8ba86 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 24 Jul 2023 13:05:37 +0800 Subject: [PATCH 4/5] address comment Signed-off-by: crazycs520 --- internal/locate/region_request.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 7a82e5b6af..1edad08e88 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -385,7 +385,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) ( } targetReplica = selector.replicas[idx] // Each follower is only tried once - if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable{ + if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable { state.lastIdx = idx selector.targetIdx = idx break @@ -554,7 +554,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector for i := 0; i < len(selector.replicas) && !state.option.leaderOnly; i++ { idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) selectReplica := selector.replicas[idx] - if state.isCandidate(idx, selectReplica) && selectReplica.store.getLivenessState() != unreachable { + if state.isCandidate(idx, selectReplica) { state.lastIdx = idx selector.targetIdx = idx break @@ -604,7 +604,9 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool // The request can only be sent to the leader. ((state.option.leaderOnly && idx == state.leaderIdx) || // Choose a replica with matched labels. 
- (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels))) + (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels))) && + // Make sure the replica is not unreachable. + replica.store.getLivenessState() != unreachable } type invalidStore struct { From 704a482e4720d81be8114b1bfbe78cf040824303 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 24 Jul 2023 15:42:15 +0800 Subject: [PATCH 5/5] fix test Signed-off-by: crazycs520 --- internal/locate/region_request3_test.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index a2e374a59c..62912d1431 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -276,6 +276,12 @@ func refreshEpochs(regionStore *regionStore) { } } +func refreshLivenessStates(regionStore *regionStore) { + for _, store := range regionStore.stores { + atomic.StoreUint32(&store.livenessState, uint32(reachable)) + } +} + func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) s.Nil(err) @@ -511,6 +517,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // Test accessFollower state with kv.ReplicaReadFollower request type. req = tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil) refreshEpochs(regionStore) + refreshLivenessStates(regionStore) replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) @@ -625,10 +632,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { region, err := s.cache.LocateRegionByID(s.bo, s.regionID) s.Nil(err) s.NotNil(region) + regionStore := s.cache.GetCachedRegionWithRLock(region.Region).getStore() + s.NotNil(regionStore) reloadRegion := func() { s.regionRequestSender.replicaSelector.region.invalidate(Other) region, _ = s.cache.LocateRegionByID(s.bo, s.regionID) + regionStore = s.cache.GetCachedRegionWithRLock(region.Region).getStore() } hasFakeRegionError := func(resp *tikvrpc.Response) bool { @@ -660,6 +670,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1)) s.True(bo.GetTotalBackoffTimes() == 1) s.cluster.StartStore(s.storeIDs[0]) + atomic.StoreUint32(®ionStore.stores[0].livenessState, uint32(reachable)) // Leader is updated because of send success, so no backoff. bo = retry.NewBackoffer(context.Background(), -1) @@ -679,6 +690,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { s.True(hasFakeRegionError(resp)) s.Equal(bo.GetTotalBackoffTimes(), 1) s.cluster.StartStore(s.storeIDs[1]) + atomic.StoreUint32(®ionStore.stores[1].livenessState, uint32(reachable)) // Leader is changed. No backoff. reloadRegion() @@ -695,7 +707,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err = sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.Equal(bo.GetTotalBackoffTimes(), 2) // The unreachable leader is skipped + s.Equal(bo.GetTotalBackoffTimes(), 3) s.False(sender.replicaSelector.region.isValid()) s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])
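
The thread running through all five patches is a single rule: when choosing a replica for a read, skip any replica whose store the client already knows is unreachable, so that after one TiKV is stopped the request goes straight to a live peer (ultimately the leader) instead of timing out on the dead store and burning a backoff. The sketch below illustrates that rule over deliberately simplified types; pickCandidate, replica, and livenessState here are illustrative stand-ins, not the actual client-go definitions, and the real selector states (accessFollower, tryFollower) carry considerably more bookkeeping.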
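
package main

import "fmt"

// livenessState mirrors the idea of the store liveness the client caches:
// a store already probed as unreachable should be skipped up front.
type livenessState int

const (
	reachable livenessState = iota
	unreachable
)

// replica is a simplified stand-in for a region replica and its store state.
type replica struct {
	addr     string
	isLeader bool
	attempts int // how many times this replica has already been tried
	liveness livenessState
}

// pickCandidate walks the replicas starting from lastIdx and returns the first
// follower that is still untried AND whose store is not unreachable.
// If no follower qualifies, it falls back to the leader, so no backoff is spent
// discovering the dead stores through timed-out requests.
func pickCandidate(replicas []replica, lastIdx int) int {
	for i := 0; i < len(replicas); i++ {
		idx := (lastIdx + i) % len(replicas)
		r := replicas[idx]
		if r.isLeader {
			continue
		}
		// Each follower is only tried once, and unreachable stores are
		// filtered here instead of after a context.DeadlineExceeded.
		if r.attempts < 1 && r.liveness != unreachable {
			return idx
		}
	}
	// All followers are exhausted or unreachable: send to the leader directly.
	for idx, r := range replicas {
		if r.isLeader {
			return idx
		}
	}
	return -1
}

func main() {
	replicas := []replica{
		{addr: "store1", isLeader: true, liveness: reachable},
		{addr: "store2", liveness: unreachable}, // stopped TiKV
		{addr: "store3", liveness: unreachable}, // stopped TiKV
	}
	fmt.Println("chosen:", replicas[pickCandidate(replicas, 1)].addr) // chosen: store1
}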
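
That early filter is also why the new test in patch 2 can assert bo.GetTotalBackoffTimes() == 0 once the followers' stores are marked unreachable: without it the selector would still hand the request to a dead follower, collect context.DeadlineExceeded, back off, and only then retry elsewhere, which is the "unexpected slow query" from the subject line. Patch 5 then adjusts the existing selector tests for the same reason, explicitly resetting the cached liveness back to reachable (the refreshLivenessStates helper and the atomic.StoreUint32 calls) after a mock store is restarted, since replica selection now depends on that cached state as well.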