Skip to content

Commit

Permalink
support range for schedulers
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Oct 22, 2019
1 parent 752ecd7 commit a01903d
Show file tree
Hide file tree
Showing 21 changed files with 268 additions and 102 deletions.
12 changes: 6 additions & 6 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,18 +617,18 @@ func (c *RaftCluster) GetStoreRegions(storeID uint64) []*core.RegionInfo {
}

// RandLeaderRegion returns a random region that has leader on the store.
func (c *RaftCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLeaderRegion(storeID, opts...)
func (c *RaftCluster) RandLeaderRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLeaderRegion(storeID, ranges, opts...)
}

// RandFollowerRegion returns a random region that has a follower on the store.
func (c *RaftCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandFollowerRegion(storeID, opts...)
func (c *RaftCluster) RandFollowerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandFollowerRegion(storeID, ranges, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (c *RaftCluster) RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandPendingRegion(storeID, opts...)
func (c *RaftCluster) RandPendingRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandPendingRegion(storeID, ranges, opts...)
}

// RandHotRegionFromStore randomly picks a hot region in specified store.
Expand Down
10 changes: 5 additions & 5 deletions server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,10 +841,10 @@ func (s *testRegionsInfoSuite) Test(c *C) {
}

for i := uint64(0); i < n; i++ {
region := cache.RandLeaderRegion(i, core.HealthRegion())
region := cache.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion())
c.Assert(region.GetLeader().GetStoreId(), Equals, i)

region = cache.RandFollowerRegion(i, core.HealthRegion())
region = cache.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion())
c.Assert(region.GetLeader().GetStoreId(), Not(Equals), i)

c.Assert(region.GetStorePeer(i), NotNil)
Expand All @@ -860,14 +860,14 @@ func (s *testRegionsInfoSuite) Test(c *C) {
// All regions will be filtered out if they have pending peers.
for i := uint64(0); i < n; i++ {
for j := 0; j < cache.GetStoreLeaderCount(i); j++ {
region := cache.RandLeaderRegion(i, core.HealthRegion())
region := cache.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion())
newRegion := region.Clone(core.WithPendingPeers(region.GetPeers()))
cache.SetRegion(newRegion)
}
c.Assert(cache.RandLeaderRegion(i, core.HealthRegion()), IsNil)
c.Assert(cache.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion()), IsNil)
}
for i := uint64(0); i < n; i++ {
c.Assert(cache.RandFollowerRegion(i, core.HealthRegion()), IsNil)
c.Assert(cache.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion()), IsNil)
}
}

Expand Down
32 changes: 23 additions & 9 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,24 +152,24 @@ func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regio
}

// RandFollowerRegion returns a random region that has a follower on the store.
func (bc *BasicCluster) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandFollowerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
bc.RLock()
defer bc.RUnlock()
return bc.Regions.RandFollowerRegion(storeID, opts...)
return bc.Regions.RandFollowerRegion(storeID, ranges, opts...)
}

// RandLeaderRegion returns a random region that has leader on the store.
func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandLeaderRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
bc.RLock()
defer bc.RUnlock()
return bc.Regions.RandLeaderRegion(storeID, opts...)
return bc.Regions.RandLeaderRegion(storeID, ranges, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (bc *BasicCluster) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandPendingRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
bc.RLock()
defer bc.RUnlock()
return bc.Regions.RandPendingRegion(storeID, opts...)
return bc.Regions.RandPendingRegion(storeID, ranges, opts...)
}

// GetRegionCount gets the total count of RegionInfo of regionMap.
Expand Down Expand Up @@ -308,9 +308,9 @@ func (bc *BasicCluster) Length() int {

// RegionSetInformer provides access to a shared informer of regions.
type RegionSetInformer interface {
RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandFollowerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo
RandLeaderRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo
RandPendingRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo
GetAverageRegionSize() int64
GetStoreRegionCount(storeID uint64) int
GetRegion(id uint64) *RegionInfo
Expand All @@ -335,3 +335,17 @@ type StoreSetController interface {

AttachAvailableFunc(id uint64, f func() bool)
}

// KeyRange is a key range.
type KeyRange struct {
StartKey []byte `json:"start-key"`
EndKey []byte `json:"end-key"`
}

// NewKeyRange create a KeyRange with the given start key and end key.
func NewKeyRange(startKey, endKey string) KeyRange {
return KeyRange {
StartKey: []byte(startKey),
EndKey: []byte(endKey),
}
}
23 changes: 13 additions & 10 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"strings"
"math/rand"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -746,23 +747,23 @@ func (r *RegionsInfo) GetStoreLearnerCount(storeID uint64) int {
}

// RandRegion get a region by random
func (r *RegionsInfo) RandRegion(opts ...RegionOption) *RegionInfo {
return randRegion(r.tree, opts...)
func (r *RegionsInfo) RandRegion(ranges []KeyRange, opts ...RegionOption) *RegionInfo {
return randRegion(r.tree, ranges, opts...)
}

// RandPendingRegion randomly gets a store's region with a pending peer.
func (r *RegionsInfo) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.pendingPeers[storeID], opts...)
func (r *RegionsInfo) RandPendingRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
return randRegion(r.pendingPeers[storeID], ranges, opts...)
}

// RandLeaderRegion randomly gets a store's leader region.
func (r *RegionsInfo) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.leaders[storeID], opts...)
func (r *RegionsInfo) RandLeaderRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
return randRegion(r.leaders[storeID],ranges, opts...)
}

// RandFollowerRegion randomly gets a store's follower region.
func (r *RegionsInfo) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.followers[storeID], opts...)
func (r *RegionsInfo) RandFollowerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
return randRegion(r.followers[storeID], ranges, opts...)
}

// GetLeader return leader RegionInfo by storeID and regionID(now only used in test)
Expand Down Expand Up @@ -827,9 +828,11 @@ type RegionsContainer interface {
RandomRegion(startKey, endKey []byte) *RegionInfo
}

func randRegion(regions RegionsContainer, opts ...RegionOption) *RegionInfo {
func randRegion(regions RegionsContainer, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
for i := 0; i < randomRegionMaxRetry; i++ {
region := regions.RandomRegion(nil, nil)
idx:=rand.Intn(len(ranges))
r:=ranges[idx]
region := regions.RandomRegion(r.StartKey, r.EndKey)
if region == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func BenchmarkRandomRegion(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
regions.RandRegion()
regions.RandRegion([]KeyRange{NewKeyRange("","")})
}
}

Expand Down
8 changes: 4 additions & 4 deletions server/namespace_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func (c *namespaceCluster) checkRegion(region *core.RegionInfo) bool {
const randRegionMaxRetry = 10

// RandFollowerRegion returns a random region that has a follower on the store.
func (c *namespaceCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
func (c *namespaceCluster) RandFollowerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
for i := 0; i < randRegionMaxRetry; i++ {
r := c.Cluster.RandFollowerRegion(storeID, opts...)
r := c.Cluster.RandFollowerRegion(storeID, ranges, opts...)
if r == nil {
return nil
}
Expand All @@ -77,9 +77,9 @@ func (c *namespaceCluster) RandFollowerRegion(storeID uint64, opts ...core.Regio
}

// RandLeaderRegion returns a random region that has leader on the store.
func (c *namespaceCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
func (c *namespaceCluster) RandLeaderRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
for i := 0; i < randRegionMaxRetry; i++ {
r := c.Cluster.RandLeaderRegion(storeID, opts...)
r := c.Cluster.RandLeaderRegion(storeID, ranges, opts...)
if r == nil {
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions server/schedule/range_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ func (r *RangeCluster) GetTolerantSizeRatio() float64 {
}

// RandFollowerRegion returns a random region that has a follower on the store.
func (r *RangeCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return r.regions.RandFollowerRegion(storeID, opts...)
func (r *RangeCluster) RandFollowerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return r.regions.RandFollowerRegion(storeID, ranges, opts...)
}

// RandLeaderRegion returns a random region that has leader on the store.
func (r *RangeCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return r.regions.RandLeaderRegion(storeID, opts...)
func (r *RangeCluster) RandLeaderRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return r.regions.RandLeaderRegion(storeID, ranges, opts...)
}

// GetAverageRegionSize returns the average region approximate size.
Expand Down
4 changes: 3 additions & 1 deletion server/schedulers/adjacent_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func init() {
}
conf.LeaderLimit = defaultAdjacentLeaderLimit
conf.PeerLimit = defaultAdjacentPeerLimit
conf.Name=balanceAdjacentRegionName
return nil
}
})
Expand All @@ -77,6 +78,7 @@ func init() {
}

type balanceAdjacentRegionConfig struct {
Name string `json:"name"`
LeaderLimit uint64 `json:"leader-limit"`
PeerLimit uint64 `json:"peer-limit"`
}
Expand Down Expand Up @@ -128,7 +130,7 @@ func newBalanceAdjacentRegionScheduler(opController *schedule.OperatorController
}

func (l *balanceAdjacentRegionScheduler) GetName() string {
return balanceAdjacentRegionName
return l.conf.Name
}

func (l *balanceAdjacentRegionScheduler) GetType() string {
Expand Down
38 changes: 27 additions & 11 deletions server/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,44 @@ import (
"github.com/pingcap/pd/server/schedule/selector"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"github.com/pkg/errors"
)

func init() {
schedule.RegisterSliceDecoderBuilder("balance-leader", func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
conf, ok := v.(*balanceLeaderSchedulerConfig)
if !ok {
return ErrScheduleConfigNotExist
}
ranges, err := getKeyRanges(args)
if err != nil {
return errors.WithStack(err)
}
conf.Ranges = ranges
conf.Name=balanceRegionName
return nil
}
})

schedule.RegisterScheduler("balance-leader", func(opController *schedule.OperatorController, storage *core.Storage, mapper schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newBalanceLeaderScheduler(opController), nil
schedule.RegisterScheduler("balance-leader", func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &balanceLeaderSchedulerConfig{}
decoder(conf)
return newBalanceLeaderScheduler(opController, conf), nil
})
}

type balanceLeaderSchedulerConfig struct {
Name string `json:"name"`
Ranges []core.KeyRange `json:"ranges"`
}

// balanceLeaderRetryLimit is the limit to retry schedule for selected source store and target store.
const balanceLeaderRetryLimit = 10

type balanceLeaderScheduler struct {
*baseScheduler
name string
conf *balanceLeaderSchedulerConfig
selector *selector.BalanceSelector
taintStores *cache.TTLUint64
opController *schedule.OperatorController
Expand All @@ -54,12 +72,13 @@ type balanceLeaderScheduler struct {

// newBalanceLeaderScheduler creates a scheduler that tends to keep leaders on
// each store balanced.
func newBalanceLeaderScheduler(opController *schedule.OperatorController, opts ...BalanceLeaderCreateOption) schedule.Scheduler {
func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *balanceLeaderSchedulerConfig, opts ...BalanceLeaderCreateOption) schedule.Scheduler {
taintStores := newTaintCache()
base := newBaseScheduler(opController)

s := &balanceLeaderScheduler{
baseScheduler: base,
conf: conf,
taintStores: taintStores,
opController: opController,
counter: balanceLeaderCounter,
Expand Down Expand Up @@ -89,15 +108,12 @@ func WithBalanceLeaderCounter(counter *prometheus.CounterVec) BalanceLeaderCreat
// WithBalanceLeaderName sets the name for the scheduler.
func WithBalanceLeaderName(name string) BalanceLeaderCreateOption {
return func(s *balanceLeaderScheduler) {
s.name = name
s.conf.Name = name
}
}

func (l *balanceLeaderScheduler) GetName() string {
if l.name != "" {
return l.name
}
return "balance-leader-scheduler"
return l.conf.Name
}

func (l *balanceLeaderScheduler) GetType() string {
Expand Down Expand Up @@ -168,7 +184,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
// the best follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderOut(cluster opt.Cluster, source *core.StoreInfo) []*operator.Operator {
sourceID := source.GetID()
region := cluster.RandLeaderRegion(sourceID, core.HealthRegion())
region := cluster.RandLeaderRegion(sourceID, l.conf.Ranges, core.HealthRegion())
if region == nil {
log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", sourceID))
schedulerCounter.WithLabelValues(l.GetName(), "no-leader-region").Inc()
Expand All @@ -188,7 +204,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(cluster opt.Cluster, source *
// the worst follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderIn(cluster opt.Cluster, target *core.StoreInfo) []*operator.Operator {
targetID := target.GetID()
region := cluster.RandFollowerRegion(targetID, core.HealthRegion())
region := cluster.RandFollowerRegion(targetID, l.conf.Ranges, core.HealthRegion())
if region == nil {
log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", targetID))
schedulerCounter.WithLabelValues(l.GetName(), "no-follower-region").Inc()
Expand Down
Loading

0 comments on commit a01903d

Please sign in to comment.