scheduler: fix policy with write-leader in hot scheduler (#5526)
ref #4949

Signed-off-by: HunDunDM <[email protected]>
Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: lhy1024 <[email protected]>
HunDunDM and lhy1024 authored Sep 17, 2022
1 parent 1218b4d commit d269010
Showing 2 changed files with 258 additions and 16 deletions.
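Before the file-by-file diff, a minimal self-contained sketch of the behavior change may help. Previously, checkByPriorityAndTolerance only switched between an all-of and an any-of check over the selected dimensions; this commit splits the policy into three named variants and makes the write-leader resource type always check the first-priority dimension only. The sketch below uses simplified, illustrative names (policyFor, selected, pass, writePeer), not the scheduler's real types.

// Illustrative sketch of the three checking policies introduced by this
// commit; the real implementations live on balanceSolver in
// server/schedulers/hot_region.go.
package main

import "fmt"

type resourceType int

const (
	writePeer resourceType = iota
	writeLeader
)

// checkPolicy mirrors the shape of checkByPriorityAndTolerance: it receives
// the per-dimension loads and a predicate for a single dimension.
type checkPolicy func(loads []float64, pass func(int) bool) bool

// policyFor picks a policy the way pickCheckPolicy does in the diff below:
// write-leader always uses the first-priority-only check; otherwise the
// strict-picking-store switch decides between all-of and any-of.
func policyFor(rs resourceType, strict bool, firstPriority int, selected func(int) bool) checkPolicy {
	switch {
	case rs == writeLeader:
		return func(loads []float64, pass func(int) bool) bool {
			return pass(firstPriority)
		}
	case strict:
		return func(loads []float64, pass func(int) bool) bool {
			for i := range loads {
				if selected(i) && !pass(i) {
					return false
				}
			}
			return true
		}
	default:
		return func(loads []float64, pass func(int) bool) bool {
			for i := range loads {
				if selected(i) && pass(i) {
					return true
				}
			}
			return false
		}
	}
}

func main() {
	loads := []float64{1.0, 2.0, 1.0}  // byte, key, query
	expect := []float64{1.0, 1.0, 1.0} // expected load per dimension
	selected := func(i int) bool { return i <= 1 }           // byte and key are the priority dims
	pass := func(i int) bool { return loads[i] > expect[i] } // src-side check: load above expectation

	fmt.Println(policyFor(writePeer, true, 1, selected)(loads, pass))   // false: byteDim does not exceed its expectation
	fmt.Println(policyFor(writePeer, false, 1, selected)(loads, pass))  // true: keyDim exceeds its expectation
	fmt.Println(policyFor(writeLeader, true, 1, selected)(loads, pass)) // true: only the first-priority (key) dimension is consulted
}

The last line is the case the commit fixes: under write-leader, a store qualifies as soon as its first-priority dimension does, regardless of the strict-picking-store setting.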
51 changes: 35 additions & 16 deletions server/schedulers/hot_region.go
@@ -420,6 +420,8 @@ type balanceSolver struct {
minorDecRatio float64
maxPeerNum int
minHotDegree int

checkByPriorityAndTolerance func(loads []float64, f func(int) bool) bool
}

func (bs *balanceSolver) init() {
@@ -460,6 +462,18 @@ func (bs *balanceSolver) init() {
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetOpts().GetHotRegionCacheHitsThreshold()
bs.pickCheckPolicy()
}

func (bs *balanceSolver) pickCheckPolicy() {
switch {
case bs.resourceTy == writeLeader:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly
case bs.sche.conf.IsStrictPickingStoreEnabled():
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAllOf
default:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAnyOf
}
}

func (bs *balanceSolver) isSelectedDim(dim int) bool {
@@ -483,14 +497,14 @@ func (bs *balanceSolver) getPriorities() []string {
}

func newBalanceSolver(sche *hotScheduler, cluster schedule.Cluster, rwTy statistics.RWType, opTy opType) *balanceSolver {
solver := &balanceSolver{
bs := &balanceSolver{
Cluster: cluster,
sche: sche,
rwTy: rwTy,
opTy: opTy,
}
solver.init()
return solver
bs.init()
return bs
}

func (bs *balanceSolver) isValid() bool {
@@ -688,7 +702,7 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetail {
continue
}

if bs.checkSrcByDimPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
if bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
@@ -698,7 +712,7 @@
return ret
}

func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
return bs.checkByPriorityAndTolerance(minLoad.Loads, func(i int) bool {
return minLoad.Loads[i] > toleranceRatio*expectLoad.Loads[i]
})
@@ -919,23 +933,28 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statistics.StoreLoad, toleranceRatio float64) bool {
})
}

func (bs *balanceSolver) checkByPriorityAndTolerance(s interface{}, p func(int) bool) bool {
if bs.sche.conf.IsStrictPickingStoreEnabled() {
return slice.AllOf(s, func(i int) bool {
if bs.isSelectedDim(i) {
return p(i)
}
return true
})
}
return slice.AnyOf(s, func(i int) bool {
func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f func(int) bool) bool {
return slice.AllOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return p(i)
return f(i)
}
return true
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f func(int) bool) bool {
return slice.AnyOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return f(i)
}
return false
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceFirstOnly(loads []float64, f func(int) bool) bool {
return f(bs.firstPriority)
}

func (bs *balanceSolver) isUniformFirstPriority(store *statistics.StoreLoadDetail) bool {
// first priority should be more uniform than second priority
return store.IsUniform(bs.firstPriority, stddevThreshold*0.5)
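For context on the src-side filter above: a dimension qualifies a source store when its minimum observed load exceeds the expected load scaled by the tolerance ratio. A rough sketch under the strict (all-of) policy follows; StoreLoad here is a stand-in struct and the byte/key priority pair is an assumption.

// Sketch of checkSrcByPriorityAndTolerance under the all-of policy: every
// selected dimension must exceed toleranceRatio * expected load.
package main

import "fmt"

type StoreLoad struct{ Loads []float64 }

func checkSrcAllOf(minLoad, expect StoreLoad, toleranceRatio float64, selected func(int) bool) bool {
	for i := range minLoad.Loads {
		if selected(i) && !(minLoad.Loads[i] > toleranceRatio*expect.Loads[i]) {
			return false
		}
	}
	return true
}

func main() {
	minLoad := StoreLoad{Loads: []float64{2.0, 2.0, 2.0}}
	expect := StoreLoad{Loads: []float64{1.0, 1.0, 1.0}}
	selected := func(i int) bool { return i <= 1 } // byte and key dims

	fmt.Println(checkSrcAllOf(minLoad, expect, 1.0, selected)) // true: every selected dim exceeds 1.0*expect
	fmt.Println(checkSrcAllOf(minLoad, expect, 2.2, selected)) // false: 2.0 does not exceed 2.2*expect
}

These two calls correspond to the first two src cases in the test table below.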
223 changes: 223 additions & 0 deletions server/schedulers/hot_region_test.go
@@ -2598,3 +2598,226 @@ func TestMaxZombieDuration(t *testing.T) {
re.Equal(time.Duration(testCase.maxZombieDur)*time.Second, bs.calcMaxZombieDur())
}
}

type expectTestCase struct {
strict bool
isSrc bool
allow bool
toleranceRatio float64
rs resourceType
load *statistics.StoreLoad
expect *statistics.StoreLoad
}

func TestExpect(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
hb, err := schedule.CreateScheduler(HotRegionType, schedule.NewOperatorController(ctx, tc, nil), storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder("hot-region", nil))
re.NoError(err)
testCases := []expectTestCase{
// test src, it will be allowed when loads are higher than expect
{
strict: true, // all of
load: &statistics.StoreLoad{ // all dims are higher than expect, allow schedule
Loads: []float64{2.0, 2.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: true,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // all dims are higher than expect, but lower than expect*toleranceRatio, not allow schedule
Loads: []float64{2.0, 2.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
toleranceRatio: 2.2,
allow: false,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // only queryDim is not higher than expect, but the dim is not selected, allow schedule
Loads: []float64{2.0, 2.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: true,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // only keyDim is not higher than expect, and the dim is selected, not allow schedule
Loads: []float64{2.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: false,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // keyDim is higher, and the dim is selected, allow schedule
Loads: []float64{1.0, 2.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: true,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // although queryDim is higher, the dim is not selected, not allow schedule
Loads: []float64{1.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: false,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // all dims are lower than expect, not allow schedule
Loads: []float64{1.0, 1.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: true,
allow: false,
},
{
strict: true,
rs: writeLeader,
load: &statistics.StoreLoad{ // only keyDim is higher, but write leader only considers the first priority
Loads: []float64{1.0, 2.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: true,
},
// test dst, it will be allowed when loads are lower than expect
{
strict: true, // all of
load: &statistics.StoreLoad{ // all dims are lower than expect, allow schedule
Loads: []float64{1.0, 1.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: true,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // all dims are lower than expect, but not lower than expect/toleranceRatio, not allow schedule
Loads: []float64{1.0, 1.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
toleranceRatio: 2.0,
allow: false,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // although queryDim is not lower than expect, the dim is not selected, allow schedule
Loads: []float64{1.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: true,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // byteDim is not lower than expect, and the dim is selected, not allow schedule
Loads: []float64{2.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: false,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // keyDim is lower, the dim is selected, allow schedule
Loads: []float64{2.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: true,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // although queryDim is lower, the dim is not selected, not allow schedule
Loads: []float64{2.0, 2.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: false,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // all dims are higher than expect, not allow schedule
Loads: []float64{2.0, 2.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: false,
allow: false,
},
{
strict: true,
rs: writeLeader,
load: &statistics.StoreLoad{ // only keyDim is lower, but write leader only considers the first priority
Loads: []float64{2.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: true,
},
}
for _, testCase := range testCases {
toleranceRatio := testCase.toleranceRatio
if toleranceRatio == 0.0 {
toleranceRatio = 1.0 // default for test case
}
bs := &balanceSolver{
sche: hb.(*hotScheduler),
firstPriority: statistics.KeyDim,
secondPriority: statistics.ByteDim,
resourceTy: testCase.rs,
}
bs.sche.conf.StrictPickingStore = testCase.strict
bs.pickCheckPolicy()
if testCase.isSrc {
re.Equal(testCase.allow, bs.checkSrcByPriorityAndTolerance(testCase.load, testCase.expect, toleranceRatio))
} else {
re.Equal(testCase.allow, bs.checkDstByPriorityAndTolerance(testCase.load, testCase.expect, toleranceRatio))
}
}
}
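To connect the table back to the fix, the write-leader src case can be checked by hand; this is a sketch under the test's own setup, where firstPriority is statistics.KeyDim and the tolerance ratio defaults to 1.0.

// Hand-checking the write-leader src case: load {1.0, 2.0, 1.0} against
// expect {1.0, 1.0, 1.0}. With the first-priority-only policy the byte and
// query dimensions are ignored entirely.
package main

import "fmt"

func main() {
	load := []float64{1.0, 2.0, 1.0} // byte, key, query; only keyDim is above expect
	expect := []float64{1.0, 1.0, 1.0}
	toleranceRatio := 1.0 // the test's default
	firstPriority := 1    // keyDim

	allow := load[firstPriority] > toleranceRatio*expect[firstPriority]
	fmt.Println(allow) // true, matching allow: true for that test case
}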
