Skip to content

Commit

Permalink
schedule: one operator only occupy one limit (#3820) (#3858)
Browse files Browse the repository at this point in the history
* This is an automated cherry-pick of #3820

Signed-off-by: ti-chi-bot <[email protected]>

* fix conflicts

Signed-off-by: nolouch <[email protected]>

* fix test

Signed-off-by: nolouch <[email protected]>

Co-authored-by: buffer <[email protected]>
Co-authored-by: nolouch <[email protected]>
  • Loading branch information
3 people authored Jul 15, 2021
1 parent 09a7830 commit 11497a4
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 32 deletions.
6 changes: 3 additions & 3 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,19 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {

op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader)
oc.AddWaitingOperator(op1)
c.Assert(oc.OperatorCount(op1.Kind()), Equals, uint64(1))
c.Assert(oc.OperatorCount(operator.OpLeader), Equals, uint64(1))
c.Assert(oc.GetOperator(1).RegionID(), Equals, op1.RegionID())

// Region 1 already has an operator, cannot add another one.
op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion)
oc.AddWaitingOperator(op2)
c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(0))
c.Assert(oc.OperatorCount(operator.OpRegion), Equals, uint64(0))

// Remove the operator manually, then we can add a new operator.
c.Assert(oc.RemoveOperator(op1), IsTrue)
op3 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion)
oc.AddWaitingOperator(op3)
c.Assert(oc.OperatorCount(op3.Kind()), Equals, uint64(1))
c.Assert(oc.OperatorCount(operator.OpRegion), Equals, uint64(1))
c.Assert(oc.GetOperator(1).RegionID(), Equals, op3.RegionID())
}

Expand Down
29 changes: 19 additions & 10 deletions server/schedule/operator/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,25 @@ type OpKind uint32

// Flags for operators.
const (
OpLeader OpKind = 1 << iota // Include leader transfer.
OpRegion // Include peer movement.
OpSplit // Include region split.
OpAdmin // Initiated by admin.
OpHotRegion // Initiated by hot region scheduler.
OpAdjacent // Initiated by adjacent region scheduler.
OpReplica // Initiated by replica checkers.
OpBalance // Initiated by balancers.
OpMerge // Initiated by merge checkers or merge schedulers.
OpRange // Initiated by range scheduler.
// Initiated by admin.
OpAdmin OpKind = 1 << iota
// Initiated by merge checker or merge scheduler. Note that it may not include region merge.
// The order describes the operator's producer and is very helpful for decoupling the scheduler and checker limits.
OpMerge
// Initiated by range scheduler.
OpRange
// Initiated by replica checker.
OpReplica
// Include region split. Initiated by rule checker if `kind & OpAdmin == 0`.
OpSplit
// Initiated by hot region scheduler.
OpHotRegion
// Include peer addition or removal. This means that this operator may take a long time.
OpRegion
// Include leader transfer.
OpLeader
OpBalance
OpAdjacent
opMax
)

Expand Down
10 changes: 10 additions & 0 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ func (o *Operator) Kind() OpKind {
return o.kind
}

// SchedulerKind returns a single OpKind flag for the operator, even if the
// operator's kind combines several OpKind flags.
// The flag returned is the lowest set bit of o.kind, i.e. the earliest-declared
// flag among those set (for example OpAdmin wins over OpMerge, which wins over
// OpRegion). A zero kind yields zero.
// fix #3778
func (o *Operator) SchedulerKind() OpKind {
// LowBit ref: https://en.wikipedia.org/wiki/Find_first_set
// 6(110) ==> 2(10)
// 5(101) ==> 1(01)
// 4(100) ==> 4(100)
// OpKind is an unsigned integer, so the negation wraps around; ANDing it
// with the original value keeps only the lowest set bit.
return o.kind & (-o.kind)
}

// Status returns operator status.
func (o *Operator) Status() OpStatus {
return o.status.Status()
Expand Down
43 changes: 40 additions & 3 deletions server/schedule/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *testOperatorSuite) TestOperatorStep(c *C) {
}

func (s *testOperatorSuite) newTestOperator(regionID uint64, kind OpKind, steps ...OpStep) *Operator {
return NewOperator("test", "test", regionID, &metapb.RegionEpoch{}, OpAdmin|kind, steps...)
return NewOperator("test", "test", regionID, &metapb.RegionEpoch{}, kind, steps...)
}

func (s *testOperatorSuite) checkSteps(c *C, op *Operator, steps []OpStep) {
Expand All @@ -105,7 +105,7 @@ func (s *testOperatorSuite) TestOperator(c *C) {
TransferLeader{FromStore: 3, ToStore: 1},
RemovePeer{FromStore: 3},
}
op := s.newTestOperator(1, OpLeader|OpRegion, steps...)
op := s.newTestOperator(1, OpAdmin|OpLeader|OpRegion, steps...)
c.Assert(op.GetPriorityLevel(), Equals, core.HighPriority)
s.checkSteps(c, op, steps)
op.Start()
Expand Down Expand Up @@ -225,7 +225,7 @@ func (s *testOperatorSuite) TestInfluence(c *C) {
}

func (s *testOperatorSuite) TestOperatorKind(c *C) {
c.Assert((OpLeader | OpReplica).String(), Equals, "leader,replica")
c.Assert((OpLeader | OpReplica).String(), Equals, "replica,leader")
c.Assert(OpKind(0).String(), Equals, "unknown")
k, err := ParseOperatorKind("balance,region,leader")
c.Assert(err, IsNil)
Expand Down Expand Up @@ -374,3 +374,40 @@ func (s *testOperatorSuite) TestCheck(c *C) {
c.Assert(op.Status(), Equals, SUCCESS)
}
}

// TestSchedulerKind checks that SchedulerKind reports exactly one OpKind flag
// for operators whose kind is built from one or more combined flags.
func (s *testOperatorSuite) TestSchedulerKind(c *C) {
	// kindCase pairs an operator with the single flag SchedulerKind should yield.
	type kindCase struct {
		op     *Operator
		expect OpKind
	}

	cases := []kindCase{
		{op: s.newTestOperator(1, OpAdmin|OpMerge|OpRegion), expect: OpAdmin},
		{op: s.newTestOperator(1, OpMerge|OpLeader|OpRegion), expect: OpMerge},
		{op: s.newTestOperator(1, OpReplica|OpRegion), expect: OpReplica},
		{op: s.newTestOperator(1, OpSplit|OpRegion), expect: OpSplit},
		{op: s.newTestOperator(1, OpRange|OpRegion), expect: OpRange},
		{op: s.newTestOperator(1, OpHotRegion|OpLeader|OpRegion), expect: OpHotRegion},
		{op: s.newTestOperator(1, OpRegion|OpLeader), expect: OpRegion},
		{op: s.newTestOperator(1, OpLeader), expect: OpLeader},
	}

	for i := range cases {
		got := cases[i].op.SchedulerKind()
		c.Assert(got, Equals, cases[i].expect)
	}
}
15 changes: 5 additions & 10 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,21 +775,16 @@ func (oc *OperatorController) updateCounts(operators map[uint64]*operator.Operat
delete(oc.counts, k)
}
for _, op := range operators {
oc.counts[op.Kind()]++
oc.counts[op.SchedulerKind()]++
}
}

// OperatorCount gets the count of operators filtered by mask.
func (oc *OperatorController) OperatorCount(mask operator.OpKind) uint64 {
// OperatorCount gets the count of operators filtered by kind.
// kind should be a single OpKind flag, not a combination of flags.
func (oc *OperatorController) OperatorCount(kind operator.OpKind) uint64 {
oc.RLock()
defer oc.RUnlock()
var total uint64
for k, count := range oc.counts {
if k&mask != 0 {
total += count
}
}
return total
return oc.counts[kind]
}

// GetOpInfluence gets OpInfluence.
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) {
}

func (s *balanceRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
allowed := s.opController.OperatorCount(operator.OpRegion)-s.opController.OperatorCount(operator.OpMerge) < cluster.GetRegionScheduleLimit()
allowed := s.opController.OperatorCount(operator.OpRegion) < cluster.GetRegionScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc()
}
Expand Down
5 changes: 4 additions & 1 deletion server/schedulers/random_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,14 @@ func (s *randomMergeScheduler) Schedule(cluster opt.Cluster) []*operator.Operato
return nil
}

ops, err := operator.CreateMergeRegionOperator(RandomMergeType, cluster, region, target, operator.OpAdmin)
ops, err := operator.CreateMergeRegionOperator(RandomMergeType, cluster, region, target, operator.OpMerge)
if err != nil {
log.Debug("fail to create merge region operator", errs.ZapError(err))
return nil
}
for _, op := range ops {
op.SetPriorityLevel(core.HighPriority)
}
ops[0].Counters = append(ops[0].Counters, schedulerCounter.WithLabelValues(s.GetName(), "new-operator"))
return ops
}
Expand Down
4 changes: 2 additions & 2 deletions server/schedulers/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *testShuffleLeaderSuite) TestShuffle(c *C) {
for i := 0; i < 4; i++ {
op := sl.Schedule(tc)
c.Assert(op, NotNil)
c.Assert(op[0].Kind(), Equals, operator.OpLeader|operator.OpAdmin)
c.Assert(op[0].Kind(), Equals, operator.OpLeader)
}
}

Expand Down Expand Up @@ -388,7 +388,7 @@ func (s *testShuffleRegionSuite) TestShuffle(c *C) {
for i := 0; i < 4; i++ {
op := sl.Schedule(tc)
c.Assert(op, NotNil)
c.Assert(op[0].Kind(), Equals, operator.OpRegion|operator.OpAdmin)
c.Assert(op[0].Kind(), Equals, operator.OpRegion)
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/shuffle_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
schedulerCounter.WithLabelValues(s.GetName(), "no-follower").Inc()
return nil
}
op, err := operator.CreateTransferLeaderOperator(ShuffleLeaderType, cluster, region, region.GetLeader().GetId(), targetStore.GetID(), operator.OpAdmin)
op, err := operator.CreateTransferLeaderOperator(ShuffleLeaderType, cluster, region, region.GetLeader().GetId(), targetStore.GetID(), operator.OpLeader)
if err != nil {
log.Debug("fail to create shuffle leader operator", errs.ZapError(err))
return nil
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/shuffle_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *shuffleRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
return nil
}

op, err := operator.CreateMovePeerOperator(ShuffleRegionType, cluster, region, operator.OpAdmin, oldPeer.GetStoreId(), newPeer)
op, err := operator.CreateMovePeerOperator(ShuffleRegionType, cluster, region, operator.OpRegion, oldPeer.GetStoreId(), newPeer)
if err != nil {
schedulerCounter.WithLabelValues(s.GetName(), "create-operator-fail").Inc()
return nil
Expand Down

0 comments on commit 11497a4

Please sign in to comment.