Skip to content

Commit

Permalink
scheduler: fix region scatter may transfer leader to removed peer
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Mar 27, 2019
1 parent 5717017 commit 1d01343
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
21 changes: 21 additions & 0 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,3 +630,24 @@ func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) map[uint64]struct

return intersection
}

// CheckOperatorValid checks if the operator is valid.
//
// It walks the operator's steps in order and records the FromStore of every
// RemovePeer step. It returns false as soon as a TransferLeader step names,
// as either its FromStore or its ToStore, a store recorded by an earlier
// RemovePeer step; otherwise it returns true.
func CheckOperatorValid(op *Operator) bool {
	// Set of store IDs whose peer has been removed by a preceding step.
	removedStores := make(map[uint64]struct{})
	for _, step := range op.steps {
		switch s := step.(type) {
		case TransferLeader:
			// Leadership must not move from or to a store that was already removed.
			if _, removed := removedStores[s.FromStore]; removed {
				return false
			}
			if _, removed := removedStores[s.ToStore]; removed {
				return false
			}
		case RemovePeer:
			removedStores[s.FromStore] = struct{}{}
		}
	}
	return true
}
6 changes: 4 additions & 2 deletions server/schedule/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {

stores := r.collectAvailableStores(region)
var kind OperatorKind
newRegion := region.Clone()
for _, peer := range region.GetPeers() {
if len(stores) == 0 {
// Reset selected stores if we have no available stores.
Expand All @@ -116,13 +117,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {
delete(stores, newPeer.GetStoreId())
r.selected.put(newPeer.GetStoreId())

op, err := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin,
op, err := CreateMovePeerOperator("scatter-peer", r.cluster, newRegion, OpAdmin,
peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
continue
}
steps = append(steps, op.steps...)
steps = append(steps, TransferLeader{ToStore: newPeer.GetStoreId()})
newRegion = newRegion.Clone(core.WithRemoveStorePeer(peer.GetStoreId()), core.WithAddPeer(newPeer))

kind |= op.Kind()
}

Expand Down
7 changes: 6 additions & 1 deletion server/schedulers/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ func (s *testScatterRegionSuite) TestFiveStores(c *C) {
s.scatter(c, 5, 5)
}

// checkOperator asserts, through c, that schedule.CheckOperatorValid reports
// true for op.
func (s *testScatterRegionSuite) checkOperator(op *schedule.Operator, c *C) {
	valid := schedule.CheckOperatorValid(op)
	c.Assert(valid, IsTrue)
}

func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
opt := schedule.NewMockSchedulerOptions()
tc := schedule.NewMockCluster(opt)
Expand All @@ -184,7 +188,7 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
}

// Add regions 1~4.
seq := newSequencer(numStores)
seq := newSequencer(3)
// Region 1 has the same distribution as Region 2, which is used to test selectPeerToReplace.
tc.AddLeaderRegion(1, 1, 2, 3)
for i := uint64(2); i <= numRegions; i++ {
Expand All @@ -196,6 +200,7 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
for i := uint64(1); i <= numRegions; i++ {
region := tc.GetRegion(i)
if op := scatterer.Scatter(region); op != nil {
s.checkOperator(op, c)
tc.ApplyOperator(op)
}
}
Expand Down

0 comments on commit 1d01343

Please sign in to comment.