Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedulers: balance region consider pending peer #1617

Merged
merged 4 commits into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,13 @@ func (c *clusterInfo) RandFollowerRegion(storeID uint64, opts ...core.RegionOpti
return c.core.RandFollowerRegion(storeID, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (c *clusterInfo) RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
c.RLock()
defer c.RUnlock()
return c.core.RandPendingRegion(storeID, opts...)
}

// GetAverageRegionSize returns the average region approximate size.
func (c *clusterInfo) GetAverageRegionSize() int64 {
c.RLock()
Expand Down
5 changes: 5 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *
return bc.Regions.RandLeaderRegion(storeID, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (bc *BasicCluster) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return bc.Regions.RandPendingRegion(storeID, opts...)
}

// GetAverageRegionSize returns the average region approximate size.
func (bc *BasicCluster) GetAverageRegionSize() int64 {
return bc.Regions.GetAverageRegionSize()
Expand Down
9 changes: 7 additions & 2 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,12 +668,17 @@ func (r *RegionsInfo) RandRegion(opts ...RegionOption) *RegionInfo {
return randRegion(r.regions, opts...)
}

// RandLeaderRegion get a store's leader region by random
// RandPendingRegion randomly gets a store's region with a pending peer.
func (r *RegionsInfo) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.pendingPeers[storeID], opts...)
}

// RandLeaderRegion randomly gets a store's leader region.
func (r *RegionsInfo) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.leaders[storeID], opts...)
}

// RandFollowerRegion get a store's follower region by random
// RandFollowerRegion randomly gets a store's follower region.
func (r *RegionsInfo) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.followers[storeID], opts...)
}
Expand Down
7 changes: 7 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ func HealthRegion() RegionOption {
}
}

// HealthRegionAllowPending checks if the region is healthy with allowing the pending peer.
func HealthRegionAllowPending() RegionOption {
return func(region *RegionInfo) bool {
return len(region.downPeers) == 0 && len(region.learners) == 0
}
}

// RegionCreateOption used to create region.
type RegionCreateOption func(region *RegionInfo)

Expand Down
14 changes: 9 additions & 5 deletions server/schedule/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,15 @@ func (f StoreStateFilter) FilterTarget(opt Options, store *core.StoreInfo) bool
return true
}

if f.MoveRegion && f.filterMoveRegion(opt, store) {
return true
if f.MoveRegion {
// only target consider the pending peers because pending more means the disk is slower.
if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
return true
}

if f.filterMoveRegion(opt, store) {
return true
}
}
return false
}
Expand All @@ -430,9 +437,6 @@ func (f StoreStateFilter) filterMoveRegion(opt Options, store *core.StoreInfo) b
return true
}

if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
return true
}
if uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount() {
Expand Down
18 changes: 12 additions & 6 deletions server/schedule/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,28 @@ import (
"go.uber.org/zap"
)

// Cluster provides an overview of a cluster's regions distribution.
type Cluster interface {
// RegionSetInformer provides access to a shared informer of regions.
// TODO: move to core package
type RegionSetInformer interface {
RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
GetAverageRegionSize() int64
GetStoreRegionCount(storeID uint64) int
GetRegion(id uint64) *core.RegionInfo
GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
ScanRegions(startKey []byte, limit int) []*core.RegionInfo
}

// Cluster provides an overview of a cluster's regions distribution.
type Cluster interface {
RegionSetInformer
GetStores() []*core.StoreInfo
GetStore(id uint64) *core.StoreInfo
GetRegion(id uint64) *core.RegionInfo

GetRegionStores(region *core.RegionInfo) []*core.StoreInfo
GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo
GetLeaderStore(region *core.RegionInfo) *core.StoreInfo
GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
ScanRegions(startKey []byte, limit int) []*core.RegionInfo

BlockStore(id uint64) error
UnblockStore(id uint64)

Expand Down
11 changes: 8 additions & 3 deletions server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,15 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.
opInfluence := s.opController.GetOpInfluence(cluster)
var hasPotentialTarget bool
for i := 0; i < balanceRegionRetryLimit; i++ {
// Priority the region that has a follower in the source store.
region := cluster.RandFollowerRegion(sourceID, core.HealthRegion())
// Priority picks the region that has a pending peer.
// Pending region may means the disk is overload, remove the pending region firstly.
region := cluster.RandPendingRegion(sourceID, core.HealthRegionAllowPending())
if region == nil {
// Then the region has the leader in the source store
// Then picks the region that has a follower in the source store.
region = cluster.RandFollowerRegion(sourceID, core.HealthRegion())
}
if region == nil {
// Last, picks the region has the leader in the source store.
region = cluster.RandLeaderRegion(sourceID, core.HealthRegion())
}
if region == nil {
Expand Down
29 changes: 29 additions & 0 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,35 @@ func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) {
testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 3)
}

func (s *testBalanceRegionSchedulerSuite) TestReplacePendingRegion(c *C) {
opt := mockoption.NewScheduleOptions()
tc := mockcluster.NewCluster(opt)
oc := schedule.NewOperatorController(nil, nil)

newTestReplication(opt, 3, "zone", "rack", "host")

sb, err := schedule.CreateScheduler("balance-region", oc)
c.Assert(err, IsNil)

// Store 1 has the largest region score, so the balancer try to replace peer in store 1.
tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(2, 7, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"})
tc.AddLabelsStore(3, 15, map[string]string{"zone": "z1", "rack": "r2", "host": "h2"})
// Store 4 has smaller region score than store 1 and more better place than store 2.
tc.AddLabelsStore(4, 10, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})

// set pending peer
tc.AddLeaderRegion(1, 1, 2, 3)
tc.AddLeaderRegion(2, 1, 2, 3)
tc.AddLeaderRegion(3, 2, 1, 3)
region := tc.GetRegion(3)
region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(1)}))
tc.PutRegion(region)

c.Assert(sb.Schedule(tc)[0].RegionID(), Equals, uint64(3))
testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 4)
}

var _ = Suite(&testReplicaCheckerSuite{})

type testReplicaCheckerSuite struct{}
Expand Down