diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index e660b331bed..6bcb1fb4abb 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -169,19 +169,19 @@ func (s *testCoordinatorSuite) TestBasic(c *C) { op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) oc.AddWaitingOperator(op1) - c.Assert(oc.OperatorCount(op1.Kind()), Equals, uint64(1)) + c.Assert(oc.OperatorCount(operator.OpLeader), Equals, uint64(1)) c.Assert(oc.GetOperator(1).RegionID(), Equals, op1.RegionID()) // Region 1 already has an operator, cannot add another one. op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) oc.AddWaitingOperator(op2) - c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(0)) + c.Assert(oc.OperatorCount(operator.OpRegion), Equals, uint64(0)) // Remove the operator manually, then we can add a new operator. c.Assert(oc.RemoveOperator(op1), IsTrue) op3 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) oc.AddWaitingOperator(op3) - c.Assert(oc.OperatorCount(op3.Kind()), Equals, uint64(1)) + c.Assert(oc.OperatorCount(operator.OpRegion), Equals, uint64(1)) c.Assert(oc.GetOperator(1).RegionID(), Equals, op3.RegionID()) } diff --git a/server/schedule/operator/kind.go b/server/schedule/operator/kind.go index 85181552332..a03b958e4a2 100644 --- a/server/schedule/operator/kind.go +++ b/server/schedule/operator/kind.go @@ -24,16 +24,25 @@ type OpKind uint32 // Flags for operators. const ( - OpLeader OpKind = 1 << iota // Include leader transfer. - OpRegion // Include peer movement. - OpSplit // Include region split. - OpAdmin // Initiated by admin. - OpHotRegion // Initiated by hot region scheduler. - OpAdjacent // Initiated by adjacent region scheduler. - OpReplica // Initiated by replica checkers. - OpBalance // Initiated by balancers. - OpMerge // Initiated by merge checkers or merge schedulers. - OpRange // Initiated by range scheduler. 
+ // Initiated by admin. + OpAdmin OpKind = 1 << iota + // Initiated by merge checker or merge scheduler. Note that it may not include region merge. + // The declaration order ranks the operator's producer: SchedulerKind returns the lowest set flag, so an earlier flag wins. This helps decouple scheduler and checker limits. + OpMerge + // Initiated by range scheduler. + OpRange + // Initiated by replica checker. + OpReplica + // Include region split. Initiated by rule checker if `kind & OpAdmin == 0`. + OpSplit + // Initiated by hot region scheduler. + OpHotRegion + // Include peer addition or removal. This means that this operator may take a long time. + OpRegion + // Include leader transfer. + OpLeader + OpBalance // Initiated by balancers. + OpAdjacent // Initiated by adjacent region scheduler. opMax ) diff --git a/server/schedule/operator/operator.go b/server/schedule/operator/operator.go index febf54b5024..d38a4bdcb01 100644 --- a/server/schedule/operator/operator.go +++ b/server/schedule/operator/operator.go @@ -134,6 +134,16 @@ func (o *Operator) Kind() OpKind { return o.kind } +// SchedulerKind returns the lowest set flag of the operator's kind, i.e. the earliest-declared OpKind, even if the operator has many OpKinds. +// fix #3778 +func (o *Operator) SchedulerKind() OpKind { + // Isolate the lowest set bit (LowBit), ref: https://en.wikipedia.org/wiki/Find_first_set + // 6(110) ==> 2(10) + // 5(101) ==> 1(01) + // 4(100) ==> 4(100) + return o.kind & (-o.kind) +} + // Status returns operator status. func (o *Operator) Status() OpStatus { return o.status.Status() diff --git a/server/schedule/operator/operator_test.go b/server/schedule/operator/operator_test.go index b1bb9dcddc3..340da0b4f0e 100644 --- a/server/schedule/operator/operator_test.go +++ b/server/schedule/operator/operator_test.go @@ -87,7 +87,7 @@ func (s *testOperatorSuite) TestOperatorStep(c *C) { } func (s *testOperatorSuite) newTestOperator(regionID uint64, kind OpKind, steps ...OpStep) *Operator { - return NewOperator("test", "test", regionID, &metapb.RegionEpoch{}, OpAdmin|kind, steps...) + return NewOperator("test", "test", regionID, &metapb.RegionEpoch{}, kind, steps...)
} func (s *testOperatorSuite) checkSteps(c *C, op *Operator, steps []OpStep) { @@ -105,7 +105,7 @@ func (s *testOperatorSuite) TestOperator(c *C) { TransferLeader{FromStore: 3, ToStore: 1}, RemovePeer{FromStore: 3}, } - op := s.newTestOperator(1, OpLeader|OpRegion, steps...) + op := s.newTestOperator(1, OpAdmin|OpLeader|OpRegion, steps...) c.Assert(op.GetPriorityLevel(), Equals, core.HighPriority) s.checkSteps(c, op, steps) op.Start() @@ -225,7 +225,7 @@ func (s *testOperatorSuite) TestInfluence(c *C) { } func (s *testOperatorSuite) TestOperatorKind(c *C) { - c.Assert((OpLeader | OpReplica).String(), Equals, "leader,replica") + c.Assert((OpLeader | OpReplica).String(), Equals, "replica,leader") c.Assert(OpKind(0).String(), Equals, "unknown") k, err := ParseOperatorKind("balance,region,leader") c.Assert(err, IsNil) @@ -374,3 +374,40 @@ func (s *testOperatorSuite) TestCheck(c *C) { c.Assert(op.Status(), Equals, SUCCESS) } } + +func (s *testOperatorSuite) TestSchedulerKind(c *C) { + testdata := []struct { // each case pairs an operator with the single flag SchedulerKind is expected to return for its kind + op *Operator + expect OpKind + }{ + { + op: s.newTestOperator(1, OpAdmin|OpMerge|OpRegion), + expect: OpAdmin, + }, + { + op: s.newTestOperator(1, OpMerge|OpLeader|OpRegion), + expect: OpMerge, + }, { + op: s.newTestOperator(1, OpReplica|OpRegion), + expect: OpReplica, + }, { + op: s.newTestOperator(1, OpSplit|OpRegion), + expect: OpSplit, + }, { + op: s.newTestOperator(1, OpRange|OpRegion), + expect: OpRange, + }, { + op: s.newTestOperator(1, OpHotRegion|OpLeader|OpRegion), + expect: OpHotRegion, + }, { + op: s.newTestOperator(1, OpRegion|OpLeader), + expect: OpRegion, + }, { + op: s.newTestOperator(1, OpLeader), + expect: OpLeader, + }, + } + for _, v := range testdata { + c.Assert(v.op.SchedulerKind(), Equals, v.expect) + } +} diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 9886bdbc8c6..a778c52f4b0 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -775,21 +775,16
@@ func (oc *OperatorController) updateCounts(operators map[uint64]*operator.Operat delete(oc.counts, k) } for _, op := range operators { - oc.counts[op.Kind()]++ + oc.counts[op.SchedulerKind()]++ } } -// OperatorCount gets the count of operators filtered by mask. -func (oc *OperatorController) OperatorCount(mask operator.OpKind) uint64 { +// OperatorCount gets the count of operators whose SchedulerKind equals kind. +// kind must be a single OpKind flag, because updateCounts keys the counts by each operator's SchedulerKind. +func (oc *OperatorController) OperatorCount(kind operator.OpKind) uint64 { oc.RLock() defer oc.RUnlock() - var total uint64 - for k, count := range oc.counts { - if k&mask != 0 { - total += count - } - } - return total + return oc.counts[kind] } // GetOpInfluence gets OpInfluence. diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go index 0947b012391..899cefe7f6e 100644 --- a/server/schedulers/balance_region.go +++ b/server/schedulers/balance_region.go @@ -127,7 +127,7 @@ func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) { } func (s *balanceRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool { - allowed := s.opController.OperatorCount(operator.OpRegion)-s.opController.OperatorCount(operator.OpMerge) < cluster.GetRegionScheduleLimit() + allowed := s.opController.OperatorCount(operator.OpRegion) < cluster.GetRegionScheduleLimit() if !allowed { operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc() } diff --git a/server/schedulers/random_merge.go b/server/schedulers/random_merge.go index 17dad45f3b0..8e428e20e88 100644 --- a/server/schedulers/random_merge.go +++ b/server/schedulers/random_merge.go @@ -133,11 +133,14 @@ func (s *randomMergeScheduler) Schedule(cluster opt.Cluster) []*operator.Operato return nil } - ops, err := operator.CreateMergeRegionOperator(RandomMergeType, cluster, region, target, operator.OpAdmin) + ops, err := operator.CreateMergeRegionOperator(RandomMergeType, cluster, region, target, operator.OpMerge) if err != nil {
log.Debug("fail to create merge region operator", errs.ZapError(err)) return nil } + for _, op := range ops { + op.SetPriorityLevel(core.HighPriority) + } ops[0].Counters = append(ops[0].Counters, schedulerCounter.WithLabelValues(s.GetName(), "new-operator")) return ops } diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index 858d447e0bb..10796c7e606 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -63,7 +63,7 @@ func (s *testShuffleLeaderSuite) TestShuffle(c *C) { for i := 0; i < 4; i++ { op := sl.Schedule(tc) c.Assert(op, NotNil) - c.Assert(op[0].Kind(), Equals, operator.OpLeader|operator.OpAdmin) + c.Assert(op[0].Kind(), Equals, operator.OpLeader) } } @@ -388,7 +388,7 @@ func (s *testShuffleRegionSuite) TestShuffle(c *C) { for i := 0; i < 4; i++ { op := sl.Schedule(tc) c.Assert(op, NotNil) - c.Assert(op[0].Kind(), Equals, operator.OpRegion|operator.OpAdmin) + c.Assert(op[0].Kind(), Equals, operator.OpRegion) } } diff --git a/server/schedulers/shuffle_leader.go b/server/schedulers/shuffle_leader.go index f5690280381..1c6ef2f1ac2 100644 --- a/server/schedulers/shuffle_leader.go +++ b/server/schedulers/shuffle_leader.go @@ -119,7 +119,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera schedulerCounter.WithLabelValues(s.GetName(), "no-follower").Inc() return nil } - op, err := operator.CreateTransferLeaderOperator(ShuffleLeaderType, cluster, region, region.GetLeader().GetId(), targetStore.GetID(), operator.OpAdmin) + op, err := operator.CreateTransferLeaderOperator(ShuffleLeaderType, cluster, region, region.GetLeader().GetId(), targetStore.GetID(), operator.OpLeader) if err != nil { log.Debug("fail to create shuffle leader operator", errs.ZapError(err)) return nil diff --git a/server/schedulers/shuffle_region.go b/server/schedulers/shuffle_region.go index a78b45e9829..334caccefda 100644 --- a/server/schedulers/shuffle_region.go +++ 
b/server/schedulers/shuffle_region.go @@ -117,7 +117,7 @@ func (s *shuffleRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera return nil } - op, err := operator.CreateMovePeerOperator(ShuffleRegionType, cluster, region, operator.OpAdmin, oldPeer.GetStoreId(), newPeer) + op, err := operator.CreateMovePeerOperator(ShuffleRegionType, cluster, region, operator.OpRegion, oldPeer.GetStoreId(), newPeer) if err != nil { schedulerCounter.WithLabelValues(s.GetName(), "create-operator-fail").Inc() return nil