Skip to content

Commit

Permalink
set influence according to region size
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jul 4, 2019
1 parent cb2397b commit 66e1d75
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 39 deletions.
53 changes: 37 additions & 16 deletions server/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
ops := s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []schedule.OperatorStep{
schedule.TransferLeader{FromStore: 6, ToStore: 5},
schedule.AddLightLearner{ToStore: 1, PeerID: 1},
schedule.AddLearner{ToStore: 1, PeerID: 1},
schedule.PromoteLearner{ToStore: 1, PeerID: 1},
schedule.RemovePeer{FromStore: 2},
schedule.AddLightLearner{ToStore: 4, PeerID: 2},
schedule.AddLearner{ToStore: 4, PeerID: 2},
schedule.PromoteLearner{ToStore: 4, PeerID: 2},
schedule.RemovePeer{FromStore: 6},
schedule.MergeRegion{
Expand Down Expand Up @@ -191,7 +191,7 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
s.cluster.PutRegion(s.regions[2])
ops = s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []schedule.OperatorStep{
schedule.AddLightLearner{ToStore: 4, PeerID: 3},
schedule.AddLearner{ToStore: 4, PeerID: 3},
schedule.PromoteLearner{ToStore: 4, PeerID: 3},
schedule.RemovePeer{FromStore: 6},
schedule.MergeRegion{
Expand Down Expand Up @@ -240,13 +240,13 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
s.cluster.PutRegion(s.regions[2])
ops = s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []schedule.OperatorStep{
schedule.AddLightLearner{ToStore: 1, PeerID: 4},
schedule.AddLearner{ToStore: 1, PeerID: 4},
schedule.PromoteLearner{ToStore: 1, PeerID: 4},
schedule.RemovePeer{FromStore: 3},
schedule.AddLightLearner{ToStore: 4, PeerID: 5},
schedule.AddLearner{ToStore: 4, PeerID: 5},
schedule.PromoteLearner{ToStore: 4, PeerID: 5},
schedule.RemovePeer{FromStore: 6},
schedule.AddLightLearner{ToStore: 5, PeerID: 6},
schedule.AddLearner{ToStore: 5, PeerID: 6},
schedule.PromoteLearner{ToStore: 5, PeerID: 6},
schedule.TransferLeader{FromStore: 2, ToStore: 1},
schedule.RemovePeer{FromStore: 2},
Expand All @@ -268,17 +268,38 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
oc := schedule.NewOperatorController(s.cluster, mockhbstream.NewHeartbeatStream())
s.cluster.ScheduleOptions.SplitMergeInterval = time.Hour
s.cluster.ScheduleOptions.StoreBalanceRate = 0.0
s.regions[2] = s.regions[2].Clone(core.SetPeers([]*metapb.Peer{
{Id: 109, StoreId: 2},
{Id: 110, StoreId: 3},
{Id: 111, StoreId: 6},
}), core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}))
s.cluster.ScheduleOptions.StoreBalanceRate = 60
s.regions[2] = s.regions[2].Clone(
core.SetPeers([]*metapb.Peer{
{Id: 109, StoreId: 2},
{Id: 110, StoreId: 3},
{Id: 111, StoreId: 6},
}),
core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}),
)
s.cluster.PutRegion(s.regions[2])
ops := s.mc.Check(s.regions[2])
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
c.Assert(ops, NotNil)
// The size of Region is less or equal than 1MB.
for i := 0; i < 50; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
}
}
s.regions[2] = s.regions[2].Clone(
core.SetApproximateSize(2),
core.SetApproximateKeys(2),
)
s.cluster.PutRegion(s.regions[2])
ops = s.mc.Check(s.regions[2])
c.Assert(ops, NotNil)
// The size of Region is more than 1MB but no more than 20MB.
for i := 0; i < 5; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
}
}
c.Assert(oc.AddOperator(ops...), IsTrue)
c.Assert(oc.AddOperator(ops...), IsFalse)
}
26 changes: 20 additions & 6 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ const (
RegionOperatorWaitTime = 10 * time.Minute
// RegionInfluence represents the influence of a operator step, which is used by ratelimit.
RegionInfluence int64 = 1000
// smallRegionInfluence represents the influence of a operator step, which is used by ratelimit.
smallRegionInfluence int64 = 200
// smallRegionThreshold is used to represent a region which can be regarded as a small region once the size is small than it.
smallRegionThreshold int64 = 20
)

// OperatorStep describes the basic scheduling steps that can not be subdivided.
Expand Down Expand Up @@ -98,9 +102,14 @@ func (ap AddPeer) IsFinish(region *core.RegionInfo) bool {
func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(ap.ToStore)

to.RegionSize += region.GetApproximateSize()
regionSize := region.GetApproximateSize()
to.RegionSize += regionSize
to.RegionCount++
to.StepCost += RegionInfluence
if regionSize > smallRegionThreshold {
to.StepCost += RegionInfluence
} else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize {
to.StepCost += smallRegionInfluence
}
}

// AddLearner is an OperatorStep that adds a region learner peer.
Expand Down Expand Up @@ -128,9 +137,14 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool {
func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(al.ToStore)

to.RegionSize += region.GetApproximateSize()
regionSize := region.GetApproximateSize()
to.RegionSize += regionSize
to.RegionCount++
to.StepCost += RegionInfluence
if regionSize > smallRegionThreshold {
to.StepCost += RegionInfluence
} else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize {
to.StepCost += smallRegionInfluence
}
}

// PromoteLearner is an OperatorStep that promotes a region learner peer to normal voter.
Expand Down Expand Up @@ -749,11 +763,11 @@ func matchPeerSteps(cluster Cluster, source *core.RegionInfo, target *core.Regio
}
if cluster.IsRaftLearnerEnabled() {
addSteps = append(addSteps,
AddLightLearner{ToStore: storeID, PeerID: peer.Id},
AddLearner{ToStore: storeID, PeerID: peer.Id},
PromoteLearner{ToStore: storeID, PeerID: peer.Id},
)
} else {
addSteps = append(addSteps, AddLightPeer{ToStore: storeID, PeerID: peer.Id})
addSteps = append(addSteps, AddPeer{ToStore: storeID, PeerID: peer.Id})
}
toAdds = append(toAdds, addSteps)

Expand Down
34 changes: 17 additions & 17 deletions server/schedule/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *testOperatorSuite) newTestRegion(regionID uint64, leaderPeer uint64, pe
leader = peer
}
}
regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10))
regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(50), core.SetApproximateKeys(50))
return regionInfo
}

Expand Down Expand Up @@ -124,71 +124,71 @@ func (s *testOperatorSuite) TestInfluence(c *C) {
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 0,
LeaderCount: 0,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

TransferLeader{FromStore: 1, ToStore: 2}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: 0,
RegionCount: 0,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

RemovePeer{FromStore: 1}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: -10,
RegionSize: -50,
RegionCount: -1,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

MergeRegion{IsPassive: false}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: -10,
RegionSize: -50,
RegionCount: -1,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

MergeRegion{IsPassive: true}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -2,
RegionSize: -10,
RegionSize: -50,
RegionCount: -2,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 0,
StepCost: 1000,
})
Expand Down

0 comments on commit 66e1d75

Please sign in to comment.