operator: add additional info for operator (#2993) #3009

Merged 4 commits on Oct 20, 2020

30 changes: 22 additions & 8 deletions server/schedule/operator/operator.go
@@ -14,6 +14,7 @@
package operator

import (
"encoding/json"
"fmt"
"reflect"
"strings"
@@ -61,6 +62,7 @@ type Operator struct {
level core.PriorityLevel
Counters []prometheus.Counter
FinishedCounters []prometheus.Counter
AdditionalInfos map[string]string
}

// NewOperator creates a new operator.
@@ -70,14 +72,15 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region
level = core.HighPriority
}
return &Operator{
desc: desc,
brief: brief,
regionID: regionID,
regionEpoch: regionEpoch,
kind: kind,
steps: steps,
status: NewOpStatusTracker(),
level: level,
desc: desc,
brief: brief,
regionID: regionID,
regionEpoch: regionEpoch,
kind: kind,
steps: steps,
status: NewOpStatusTracker(),
level: level,
AdditionalInfos: make(map[string]string),
}
}

@@ -340,3 +343,14 @@ func (o *Operator) History() []OpHistory {
}
return histories
}

// GetAdditionalInfo returns the additional info as a JSON string, or "" when the map is empty or cannot be marshaled.
func (o *Operator) GetAdditionalInfo() string {
if len(o.AdditionalInfos) != 0 {
additionalInfo, err := json.Marshal(o.AdditionalInfos)
if err == nil {
return string(additionalInfo)
}
}
return ""
}
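
The new field plus GetAdditionalInfo amount to a lazily serialized key/value bag: schedulers attach string details to an operator, and the map is marshaled to JSON only when a log line actually asks for it. A minimal self-contained sketch of that pattern (illustrative names, not pd code):

package main

import (
	"encoding/json"
	"fmt"
)

// miniOp mirrors the relevant slice of Operator: a string-to-string map
// that callers can populate with scheduling details.
type miniOp struct {
	AdditionalInfos map[string]string
}

// getAdditionalInfo returns the map as JSON, or "" when it is empty or
// cannot be marshaled -- the same contract as Operator.GetAdditionalInfo.
func (o *miniOp) getAdditionalInfo() string {
	if len(o.AdditionalInfos) != 0 {
		if b, err := json.Marshal(o.AdditionalInfos); err == nil {
			return string(b)
		}
	}
	return ""
}

func main() {
	o := &miniOp{AdditionalInfos: make(map[string]string)}
	o.AdditionalInfos["sourceScore"] = "96.00"
	o.AdditionalInfos["targetScore"] = "64.00"
	// json.Marshal sorts map keys, so the output is deterministic:
	// {"sourceScore":"96.00","targetScore":"64.00"}
	fmt.Println(o.getAdditionalInfo())
}
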
6 changes: 4 additions & 2 deletions server/schedule/operator_controller.go
@@ -423,7 +423,8 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {

log.Info("add operator",
zap.Uint64("region-id", regionID),
zap.Reflect("operator", op))
zap.Reflect("operator", op),
zap.String("additional info", op.GetAdditionalInfo()))

// If there is an old operator, replace it. The priority should be checked
// already.
@@ -536,7 +537,8 @@ func (oc *OperatorController) buryOperator(op *operator.Operator, extraFileds ...
log.Info("operator finish",
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op))
zap.Reflect("operator", op),
zap.String("additional info", op.GetAdditionalInfo()))
operatorCounter.WithLabelValues(op.Desc(), "finish").Inc()
operatorDuration.WithLabelValues(op.Desc()).Observe(op.RunningTime().Seconds())
for _, counter := range op.FinishedCounters {
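
The JSON string then travels as an ordinary structured field on the existing log calls. A small sketch of the logging pattern, using zap.NewExample as a throwaway logger (the field values here are placeholders):

package main

import "go.uber.org/zap"

func main() {
	logger := zap.NewExample() // development JSON logger, handy for demos
	defer logger.Sync()
	// Mirrors the "add operator" call above: the additional info is
	// attached as a plain string field holding pre-marshaled JSON.
	logger.Info("add operator",
		zap.Uint64("region-id", 1),
		zap.String("additional info", `{"sourceScore":"96.00","targetScore":"64.00"}`))
}
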
5 changes: 4 additions & 1 deletion server/schedulers/balance_leader.go
@@ -273,7 +273,8 @@ func (l *balanceLeaderScheduler) createOperator(cluster opt.Cluster, region *cor

opInfluence := l.opController.GetOpInfluence(cluster)
kind := core.NewScheduleKind(core.LeaderKind, cluster.GetLeaderSchedulePolicy())
if !shouldBalance(cluster, source, target, region, kind, opInfluence, l.GetName()) {
shouldBalance, sourceScore, targetScore := shouldBalance(cluster, source, target, region, kind, opInfluence, l.GetName())
if !shouldBalance {
schedulerCounter.WithLabelValues(l.GetName(), "skip").Inc()
return nil
}
@@ -293,5 +294,7 @@ func (l *balanceLeaderScheduler) createOperator(cluster opt.Cluster, region *cor
l.counter.WithLabelValues("move-leader", source.GetAddress()+"-out", sourceLabel),
l.counter.WithLabelValues("move-leader", target.GetAddress()+"-in", targetLabel),
)
op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(sourceScore, 'f', 2, 64)
op.AdditionalInfos["targetScore"] = strconv.FormatFloat(targetScore, 'f', 2, 64)
return []*operator.Operator{op}
}
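
Because AdditionalInfos is a map[string]string, the scores are recorded as fixed-point strings via strconv.FormatFloat: the arguments are the value, 'f' for plain decimal notation, 2 digits after the point, and 64 for a float64 source. For reference:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	fmt.Println(strconv.FormatFloat(96.0, 'f', 2, 64))    // "96.00"
	fmt.Println(strconv.FormatFloat(64.4567, 'f', 2, 64)) // "64.46" (rounded)
}
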
7 changes: 5 additions & 2 deletions server/schedulers/balance_region.go
@@ -234,13 +234,14 @@ func (s *balanceRegionScheduler) transferPeer(cluster opt.Cluster, region *core.

opInfluence := s.opController.GetOpInfluence(cluster)
kind := core.NewScheduleKind(core.RegionKind, core.BySize)
if !shouldBalance(cluster, source, target, region, kind, opInfluence, s.GetName()) {
shouldBalance, sourceScore, targetScore := shouldBalance(cluster, source, target, region, kind, opInfluence, s.GetName())
if !shouldBalance {
schedulerCounter.WithLabelValues(s.GetName(), "skip").Inc()
continue
}

newPeer := &metapb.Peer{StoreId: target.GetID(), IsLearner: oldPeer.IsLearner}
op, err := operator.CreateMovePeerOperator("balance-region", cluster, region, operator.OpBalance, oldPeer.GetStoreId(), newPeer)
op, err := operator.CreateMovePeerOperator(BalanceRegionType, cluster, region, operator.OpBalance, oldPeer.GetStoreId(), newPeer)
if err != nil {
schedulerCounter.WithLabelValues(s.GetName(), "create-operator-fail").Inc()
return nil
@@ -254,6 +255,8 @@ func (s *balanceRegionScheduler) transferPeer(cluster opt.Cluster, region *core.
s.counter.WithLabelValues("move-peer", source.GetAddress()+"-out", sourceLabel),
s.counter.WithLabelValues("move-peer", target.GetAddress()+"-in", targetLabel),
)
op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(sourceScore, 'f', 2, 64)
op.AdditionalInfos["targetScore"] = strconv.FormatFloat(targetScore, 'f', 2, 64)
return op
}
}
12 changes: 7 additions & 5 deletions server/schedulers/balance_test.go
@@ -121,7 +121,8 @@ func (s *testBalanceSuite) TestShouldBalance(c *C) {
tc.PutRegion(region)
tc.LeaderSchedulePolicy = t.kind.String()
kind := core.NewScheduleKind(core.LeaderKind, t.kind)
c.Assert(shouldBalance(tc, source, target, region, kind, oc.GetOpInfluence(tc), ""), Equals, t.expectedResult)
shouldBalance, _, _ := shouldBalance(tc, source, target, region, kind, oc.GetOpInfluence(tc), "")
c.Assert(shouldBalance, Equals, t.expectedResult)
}

for _, t := range tests {
@@ -133,7 +134,8 @@ func (s *testBalanceSuite) TestShouldBalance(c *C) {
region := tc.GetRegion(1).Clone(core.SetApproximateSize(t.regionSize))
tc.PutRegion(region)
kind := core.NewScheduleKind(core.RegionKind, t.kind)
c.Assert(shouldBalance(tc, source, target, region, kind, oc.GetOpInfluence(tc), ""), Equals, t.expectedResult)
shouldBalance, _, _ := shouldBalance(tc, source, target, region, kind, oc.GetOpInfluence(tc), "")
c.Assert(shouldBalance, Equals, t.expectedResult)
}
}
}
@@ -749,9 +751,9 @@ func (s *testBalanceRegionSchedulerSuite) TestReplacePendingRegion(c *C) {
sb, err := schedule.CreateScheduler(BalanceRegionType, oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
c.Assert(err, IsNil)

s.checkReplacePendingRegion(c, tc, opt, sb)
s.checkReplacePendingRegion(c, tc, sb)
opt.EnablePlacementRules = true
s.checkReplacePendingRegion(c, tc, opt, sb)
s.checkReplacePendingRegion(c, tc, sb)
}

func (s *testBalanceRegionSchedulerSuite) TestOpInfluence(c *C) {
@@ -781,7 +783,7 @@ func (s *testBalanceRegionSchedulerSuite) TestOpInfluence(c *C) {
testutil.CheckTransferPeerWithLeaderTransfer(c, sb.Schedule(tc)[0], operator.OpBalance, 3, 1)
}

func (s *testBalanceRegionSchedulerSuite) checkReplacePendingRegion(c *C, tc *mockcluster.Cluster, opt *mockoption.ScheduleOptions, sb schedule.Scheduler) {
func (s *testBalanceRegionSchedulerSuite) checkReplacePendingRegion(c *C, tc *mockcluster.Cluster, sb schedule.Scheduler) {
// Store 1 has the largest region score, so the balance scheduler tries to replace a peer in store 1.
tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(2, 7, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"})
10 changes: 5 additions & 5 deletions server/schedulers/utils.go
@@ -57,7 +57,7 @@ func minDuration(a, b time.Duration) time.Duration {
return b
}

func shouldBalance(cluster opt.Cluster, source, target *core.StoreInfo, region *core.RegionInfo, kind core.ScheduleKind, opInfluence operator.OpInfluence, scheduleName string) bool {
func shouldBalance(cluster opt.Cluster, source, target *core.StoreInfo, region *core.RegionInfo, kind core.ScheduleKind, opInfluence operator.OpInfluence, scheduleName string) (shouldBalance bool, sourceScore float64, targetScore float64) {
// The reason we use max(regionSize, averageRegionSize) to check is:
// 1. prevent moving small regions between stores with close scores, leading to unnecessary balance.
// 2. prevent moving huge regions, leading to over balance.
@@ -66,15 +66,15 @@ func shouldBalance(cluster opt.Cluster, source, target *core.StoreInfo, region *
tolerantResource := getTolerantResource(cluster, region, kind)
sourceInfluence := opInfluence.GetStoreInfluence(sourceID).ResourceProperty(kind)
targetInfluence := opInfluence.GetStoreInfluence(targetID).ResourceProperty(kind)
sourceScore := source.ResourceScore(kind, cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), sourceInfluence-tolerantResource)
targetScore := target.ResourceScore(kind, cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), targetInfluence+tolerantResource)
sourceScore = source.ResourceScore(kind, cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), sourceInfluence-tolerantResource)
targetScore = target.ResourceScore(kind, cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), targetInfluence+tolerantResource)
if cluster.IsDebugMetricsEnabled() {
opInfluenceStatus.WithLabelValues(scheduleName, strconv.FormatUint(sourceID, 10), "source").Set(float64(sourceInfluence))
opInfluenceStatus.WithLabelValues(scheduleName, strconv.FormatUint(targetID, 10), "target").Set(float64(targetInfluence))
tolerantResourceStatus.WithLabelValues(scheduleName, strconv.FormatUint(sourceID, 10), strconv.FormatUint(targetID, 10)).Set(float64(tolerantResource))
}
// Make sure after move, source score is still greater than target score.
shouldBalance := sourceScore > targetScore
shouldBalance = sourceScore > targetScore

if !shouldBalance {
log.Debug("skip balance "+kind.Resource.String(),
@@ -86,7 +86,7 @@ func shouldBalance(cluster opt.Cluster, source, target *core.StoreInfo, region *
zap.Int64("average-region-size", cluster.GetAverageRegionSize()),
zap.Int64("tolerant-resource", tolerantResource))
}
return shouldBalance
return shouldBalance, sourceScore, targetScore
}

func getTolerantResource(cluster opt.Cluster, region *core.RegionInfo, kind core.ScheduleKind) int64 {
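
Returning the scores alongside the decision is what lets the balance schedulers record them in AdditionalInfos without recomputing ResourceScore. A minimal illustration of the named-result shape the refactor adopts (toy scoring, not the real formula):

package main

import "fmt"

// shouldMove mimics the shape of the refactored shouldBalance: named
// results are assigned where they are computed and returned together, so
// the caller gets both the decision and the scores behind it.
func shouldMove(src, dst float64) (ok bool, srcScore, dstScore float64) {
	srcScore = src * 1.5 // stand-in for the real ResourceScore math
	dstScore = dst * 1.5
	ok = srcScore > dstScore // balance only while the source still scores higher
	return ok, srcScore, dstScore
}

func main() {
	ok, s, t := shouldMove(64, 42)
	fmt.Println(ok, s, t) // true 96 63
}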