Skip to content

Commit

Permalink
operator: add additional info for operator (#2993) (#3009)
Browse files Browse the repository at this point in the history
* cherry pick #2993 to release-4.0

Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
ti-srebot authored Oct 20, 2020
1 parent 3327af6 commit a698d06
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 23 deletions.
30 changes: 22 additions & 8 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package operator

import (
"encoding/json"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -61,6 +62,7 @@ type Operator struct {
level core.PriorityLevel
Counters []prometheus.Counter
FinishedCounters []prometheus.Counter
AdditionalInfos map[string]string
}

// NewOperator creates a new operator.
Expand All @@ -70,14 +72,15 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region
level = core.HighPriority
}
return &Operator{
desc: desc,
brief: brief,
regionID: regionID,
regionEpoch: regionEpoch,
kind: kind,
steps: steps,
status: NewOpStatusTracker(),
level: level,
desc: desc,
brief: brief,
regionID: regionID,
regionEpoch: regionEpoch,
kind: kind,
steps: steps,
status: NewOpStatusTracker(),
level: level,
AdditionalInfos: make(map[string]string),
}
}

Expand Down Expand Up @@ -340,3 +343,14 @@ func (o *Operator) History() []OpHistory {
}
return histories
}

// GetAdditionalInfo returns the operator's additional info serialized as a
// JSON string. It returns the empty string when there is no additional info
// or when serialization fails (best-effort: used only to enrich log output).
func (o *Operator) GetAdditionalInfo() string {
	if len(o.AdditionalInfos) == 0 {
		return ""
	}
	data, err := json.Marshal(o.AdditionalInfos)
	if err != nil {
		return ""
	}
	return string(data)
}
6 changes: 4 additions & 2 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,8 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {

log.Info("add operator",
zap.Uint64("region-id", regionID),
zap.Reflect("operator", op))
zap.Reflect("operator", op),
zap.String("additional info", op.GetAdditionalInfo()))

// If there is an old operator, replace it. The priority should be checked
// already.
Expand Down Expand Up @@ -536,7 +537,8 @@ func (oc *OperatorController) buryOperator(op *operator.Operator, extraFileds ..
log.Info("operator finish",
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op))
zap.Reflect("operator", op),
zap.String("additional info", op.GetAdditionalInfo()))
operatorCounter.WithLabelValues(op.Desc(), "finish").Inc()
operatorDuration.WithLabelValues(op.Desc()).Observe(op.RunningTime().Seconds())
for _, counter := range op.FinishedCounters {
Expand Down
5 changes: 4 additions & 1 deletion server/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ func (l *balanceLeaderScheduler) createOperator(cluster opt.Cluster, region *cor

opInfluence := l.opController.GetOpInfluence(cluster)
kind := core.NewScheduleKind(core.LeaderKind, cluster.GetLeaderSchedulePolicy())
if !shouldBalance(cluster, source, target, region, kind, opInfluence, l.GetName()) {
shouldBalance, sourceScore, targetScore := shouldBalance(cluster, source, target, region, kind, opInfluence, l.GetName())
if !shouldBalance {
schedulerCounter.WithLabelValues(l.GetName(), "skip").Inc()
return nil
}
Expand All @@ -293,5 +294,7 @@ func (l *balanceLeaderScheduler) createOperator(cluster opt.Cluster, region *cor
l.counter.WithLabelValues("move-leader", source.GetAddress()+"-out", sourceLabel),
l.counter.WithLabelValues("move-leader", target.GetAddress()+"-in", targetLabel),
)
op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(sourceScore, 'f', 2, 64)
op.AdditionalInfos["targetScore"] = strconv.FormatFloat(targetScore, 'f', 2, 64)
return []*operator.Operator{op}
}
7 changes: 5 additions & 2 deletions server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,14 @@ func (s *balanceRegionScheduler) transferPeer(cluster opt.Cluster, region *core.

opInfluence := s.opController.GetOpInfluence(cluster)
kind := core.NewScheduleKind(core.RegionKind, core.BySize)
if !shouldBalance(cluster, source, target, region, kind, opInfluence, s.GetName()) {
shouldBalance, sourceScore, targetScore := shouldBalance(cluster, source, target, region, kind, opInfluence, s.GetName())
if !shouldBalance {
schedulerCounter.WithLabelValues(s.GetName(), "skip").Inc()
continue
}

newPeer := &metapb.Peer{StoreId: target.GetID(), IsLearner: oldPeer.IsLearner}
op, err := operator.CreateMovePeerOperator("balance-region", cluster, region, operator.OpBalance, oldPeer.GetStoreId(), newPeer)
op, err := operator.CreateMovePeerOperator(BalanceRegionType, cluster, region, operator.OpBalance, oldPeer.GetStoreId(), newPeer)
if err != nil {
schedulerCounter.WithLabelValues(s.GetName(), "create-operator-fail").Inc()
return nil
Expand All @@ -254,6 +255,8 @@ func (s *balanceRegionScheduler) transferPeer(cluster opt.Cluster, region *core.
s.counter.WithLabelValues("move-peer", source.GetAddress()+"-out", sourceLabel),
s.counter.WithLabelValues("move-peer", target.GetAddress()+"-in", targetLabel),
)
op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(sourceScore, 'f', 2, 64)
op.AdditionalInfos["targetScore"] = strconv.FormatFloat(targetScore, 'f', 2, 64)
return op
}
}
12 changes: 7 additions & 5 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ func (s *testBalanceSuite) TestShouldBalance(c *C) {
tc.PutRegion(region)
tc.LeaderSchedulePolicy = t.kind.String()
kind := core.NewScheduleKind(core.LeaderKind, t.kind)
c.Assert(shouldBalance(tc, source, target, region, kind, oc.GetOpInfluence(tc), ""), Equals, t.expectedResult)
shouldBalance, _, _ := shouldBalance(tc, source, target, region, kind, oc.GetOpInfluence(tc), "")
c.Assert(shouldBalance, Equals, t.expectedResult)
}

for _, t := range tests {
Expand All @@ -133,7 +134,8 @@ func (s *testBalanceSuite) TestShouldBalance(c *C) {
region := tc.GetRegion(1).Clone(core.SetApproximateSize(t.regionSize))
tc.PutRegion(region)
kind := core.NewScheduleKind(core.RegionKind, t.kind)
c.Assert(shouldBalance(tc, source, target, region, kind, oc.GetOpInfluence(tc), ""), Equals, t.expectedResult)
shouldBalance, _, _ := shouldBalance(tc, source, target, region, kind, oc.GetOpInfluence(tc), "")
c.Assert(shouldBalance, Equals, t.expectedResult)
}
}
}
Expand Down Expand Up @@ -749,9 +751,9 @@ func (s *testBalanceRegionSchedulerSuite) TestReplacePendingRegion(c *C) {
sb, err := schedule.CreateScheduler(BalanceRegionType, oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
c.Assert(err, IsNil)

s.checkReplacePendingRegion(c, tc, opt, sb)
s.checkReplacePendingRegion(c, tc, sb)
opt.EnablePlacementRules = true
s.checkReplacePendingRegion(c, tc, opt, sb)
s.checkReplacePendingRegion(c, tc, sb)
}

func (s *testBalanceRegionSchedulerSuite) TestOpInfluence(c *C) {
Expand Down Expand Up @@ -781,7 +783,7 @@ func (s *testBalanceRegionSchedulerSuite) TestOpInfluence(c *C) {
testutil.CheckTransferPeerWithLeaderTransfer(c, sb.Schedule(tc)[0], operator.OpBalance, 3, 1)
}

func (s *testBalanceRegionSchedulerSuite) checkReplacePendingRegion(c *C, tc *mockcluster.Cluster, opt *mockoption.ScheduleOptions, sb schedule.Scheduler) {
func (s *testBalanceRegionSchedulerSuite) checkReplacePendingRegion(c *C, tc *mockcluster.Cluster, sb schedule.Scheduler) {
// Store 1 has the largest region score, so the balance scheduler try to replace peer in store 1.
tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(2, 7, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"})
Expand Down
10 changes: 5 additions & 5 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func minDuration(a, b time.Duration) time.Duration {
return b
}

func shouldBalance(cluster opt.Cluster, source, target *core.StoreInfo, region *core.RegionInfo, kind core.ScheduleKind, opInfluence operator.OpInfluence, scheduleName string) bool {
func shouldBalance(cluster opt.Cluster, source, target *core.StoreInfo, region *core.RegionInfo, kind core.ScheduleKind, opInfluence operator.OpInfluence, scheduleName string) (shouldBalance bool, sourceScore float64, targetScore float64) {
// The reason we use max(regionSize, averageRegionSize) to check is:
// 1. prevent moving small regions between stores with close scores, leading to unnecessary balance.
// 2. prevent moving huge regions, leading to over balance.
Expand All @@ -66,15 +66,15 @@ func shouldBalance(cluster opt.Cluster, source, target *core.StoreInfo, region *
tolerantResource := getTolerantResource(cluster, region, kind)
sourceInfluence := opInfluence.GetStoreInfluence(sourceID).ResourceProperty(kind)
targetInfluence := opInfluence.GetStoreInfluence(targetID).ResourceProperty(kind)
sourceScore := source.ResourceScore(kind, cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), sourceInfluence-tolerantResource)
targetScore := target.ResourceScore(kind, cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), targetInfluence+tolerantResource)
sourceScore = source.ResourceScore(kind, cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), sourceInfluence-tolerantResource)
targetScore = target.ResourceScore(kind, cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), targetInfluence+tolerantResource)
if cluster.IsDebugMetricsEnabled() {
opInfluenceStatus.WithLabelValues(scheduleName, strconv.FormatUint(sourceID, 10), "source").Set(float64(sourceInfluence))
opInfluenceStatus.WithLabelValues(scheduleName, strconv.FormatUint(targetID, 10), "target").Set(float64(targetInfluence))
tolerantResourceStatus.WithLabelValues(scheduleName, strconv.FormatUint(sourceID, 10), strconv.FormatUint(targetID, 10)).Set(float64(tolerantResource))
}
// Make sure after move, source score is still greater than target score.
shouldBalance := sourceScore > targetScore
shouldBalance = sourceScore > targetScore

if !shouldBalance {
log.Debug("skip balance "+kind.Resource.String(),
Expand All @@ -86,7 +86,7 @@ func shouldBalance(cluster opt.Cluster, source, target *core.StoreInfo, region *
zap.Int64("average-region-size", cluster.GetAverageRegionSize()),
zap.Int64("tolerant-resource", tolerantResource))
}
return shouldBalance
return shouldBalance, sourceScore, targetScore
}

func getTolerantResource(cluster opt.Cluster, region *core.RegionInfo, kind core.ScheduleKind) int64 {
Expand Down

0 comments on commit a698d06

Please sign in to comment.