Skip to content

Commit

Permalink
schedule: batch to report the metrics of target filter (#5561)
Browse files Browse the repository at this point in the history
close #5538

Signed-off-by: bufferflies <[email protected]>
  • Loading branch information
bufferflies authored Oct 26, 2022
1 parent 914e19c commit 8e2bd59
Show file tree
Hide file tree
Showing 16 changed files with 417 additions and 132 deletions.
2 changes: 1 addition & 1 deletion plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (s *evictLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) (
continue
}
target := filter.NewCandidates(cluster.GetFollowerStores(region)).
FilterTarget(cluster.GetOpts(), nil, &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true}).
FilterTarget(cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true}).
RandomPick()
if target == nil {
continue
Expand Down
6 changes: 3 additions & 3 deletions server/schedule/checker/replica_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e
isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true}
targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
FilterTarget(s.cluster.GetOpts(), nil, filters...).
FilterTarget(s.cluster.GetOpts(), nil, nil, filters...).
KeepTheTopStores(isolationComparer, false) // greater isolation score is better
if targetCandidate.Len() == 0 {
return 0, false
}
target := targetCandidate.FilterTarget(s.cluster.GetOpts(), nil, strictStateFilter).
target := targetCandidate.FilterTarget(s.cluster.GetOpts(), nil, nil, strictStateFilter).
PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetOpts()), true) // less region score is better
if target == nil {
return 0, true // filter by temporary states
Expand Down Expand Up @@ -123,7 +123,7 @@ func (s *ReplicaStrategy) swapStoreToFirst(stores []*core.StoreInfo, id uint64)
func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo) uint64 {
isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
source := filter.NewCandidates(coLocationStores).
FilterSource(s.cluster.GetOpts(), nil, &filter.StoreStateFilter{ActionScope: replicaCheckerName, MoveRegion: true}).
FilterSource(s.cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: replicaCheckerName, MoveRegion: true}).
KeepTheTopStores(isolationComparer, true).
PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetOpts()), false)
if source == nil {
Expand Down
8 changes: 4 additions & 4 deletions server/schedule/filter/candidates.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func NewCandidates(stores []*core.StoreInfo) *StoreCandidates {
}

// FilterSource keeps stores that can pass all source filters.
func (c *StoreCandidates) FilterSource(opt *config.PersistOptions, collector *plan.Collector, filters ...Filter) *StoreCandidates {
c.Stores = SelectSourceStores(c.Stores, filters, opt, collector)
func (c *StoreCandidates) FilterSource(opt *config.PersistOptions, collector *plan.Collector, counter *Counter, filters ...Filter) *StoreCandidates {
c.Stores = SelectSourceStores(c.Stores, filters, opt, collector, counter)
return c
}

// FilterTarget keeps stores that can pass all target filters.
func (c *StoreCandidates) FilterTarget(opt *config.PersistOptions, collector *plan.Collector, filters ...Filter) *StoreCandidates {
c.Stores = SelectTargetStores(c.Stores, filters, opt, collector)
func (c *StoreCandidates) FilterTarget(opt *config.PersistOptions, collector *plan.Collector, counter *Counter, filters ...Filter) *StoreCandidates {
c.Stores = SelectTargetStores(c.Stores, filters, opt, collector, counter)
return c
}

Expand Down
10 changes: 5 additions & 5 deletions server/schedule/filter/candidates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func idComparer2(a, b *core.StoreInfo) int {

type idFilter func(uint64) bool

func (f idFilter) Scope() string { return "idFilter" }
func (f idFilter) Type() string { return "idFilter" }
func (f idFilter) Scope() string { return "idFilter" }
func (f idFilter) Type() filterType { return filterType(0) }
func (f idFilter) Source(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status {
if f(store.GetID()) {
return statusOK
Expand All @@ -68,11 +68,11 @@ func (f idFilter) Target(opt *config.PersistOptions, store *core.StoreInfo) *pla
func TestCandidates(t *testing.T) {
re := require.New(t)
cs := newTestCandidates(1, 2, 3, 4, 5)
cs.FilterSource(nil, nil, idFilter(func(id uint64) bool { return id > 2 }))
cs.FilterSource(nil, nil, nil, idFilter(func(id uint64) bool { return id > 2 }))
check(re, cs, 3, 4, 5)
cs.FilterTarget(nil, nil, idFilter(func(id uint64) bool { return id%2 == 1 }))
cs.FilterTarget(nil, nil, nil, idFilter(func(id uint64) bool { return id%2 == 1 }))
check(re, cs, 3, 5)
cs.FilterTarget(nil, nil, idFilter(func(id uint64) bool { return id > 100 }))
cs.FilterTarget(nil, nil, nil, idFilter(func(id uint64) bool { return id > 100 }))
check(re, cs)
store := cs.PickFirst()
re.Nil(store)
Expand Down
213 changes: 213 additions & 0 deletions server/schedule/filter/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filter

import (
"strconv"
)

type action int

const (
source action = iota
target

actionLen
)

var actions = [actionLen]string{
"filter-source",
"filter-target",
}

// String implements fmt.Stringer interface.
func (a action) String() string {
if a < actionLen {
return actions[a]
}
return "unknown"
}

type scope int

const (
// BalanceLeader is the filter type for balance leader.
BalanceLeader scope = iota
// BalanceRegion is the filter type for balance region.
BalanceRegion
// BalanceHotRegion is the filter type for hot region.
BalanceHotRegion
// Label is the filter type for replica.
Label

// EvictLeader is the filter type for evict leader.
EvictLeader
// RegionScatter is the filter type for scatter region.
RegionScatter
// ReplicaChecker is the filter type for replica.
ReplicaChecker
// RuleChecker is the filter type for rule.
RuleChecker

// GrantHotLeader is the filter type for grant hot leader.
GrantHotLeader
// ShuffleHotRegion is the filter type for shuffle hot region.
ShuffleHotRegion
// ShuffleRegion is the filter type for shuffle region.
ShuffleRegion
// RandomMerge is the filter type for random merge.
RandomMerge
scopeLen
)

var scopes = [scopeLen]string{
"balance-leader-scheduler",
"balance-region-scheduler",
"balance-hot-region-scheduler",
"label-scheduler",

"evict-leader-scheduler",
"region-scatter",
"replica-checker",
"rule-checker",

"grant-hot-leader-scheduler",
"shuffle-region-scheduler",
"shuffle-region-scheduler",
"random-merge-scheduler",
}

// String implements fmt.Stringer interface.
func (s scope) String() string {
if s >= scopeLen {
return "unknown"
}
return scopes[s]
}

type filterType int

const (
excluded filterType = iota
storageThreshold
distinctScore
labelConstraint
ruleFit
ruleLeader
engine
specialUse
isolation

storeStateOK
storeStateTombstone
storeStateDown
storeStateOffline
storeStatePauseLeader
storeStateSlow
storeStateDisconnected
storeStateBusy
storeStateExceedRemoveLimit
storeStateExceedAddLimit
storeStateTooManySnapshot
storeStateTooManyPendingPeer
storeStateRejectLeader

filtersLen
)

var filters = [filtersLen]string{
"exclude-filter",
"storage-threshold-filter",
"distinct-filter",
"label-constraint-filter",
"rule-fit-filter",
"rule-fit-leader-filter",
"engine-filter",
"special-use-filter",
"isolation-filter",

"store-state-ok-filter",
"store-state-tombstone-filter",
"store-state-down-filter",
"store-state-offline-filter",
"store-state-pause-leader-filter",
"store-state-slow-filter",
"store-state-disconnect-filter",
"store-state-busy-filter",
"store-state-exceed-remove-limit-filter",
"store-state-exceed-add-limit-filter",
"store-state-too-many-snapshots-filter",
"store-state-too-many-pending-peers-filter",
"store-state-reject-leader-filter",
}

// String implements fmt.Stringer interface.
func (f filterType) String() string {
if f < filtersLen {
return filters[f]
}

return "unknown"
}

// Counter records the filter counter.
type Counter struct {
scope string
// record filter counter for each store.
// [action][type][sourceID][targetID]count
// [source-filter][rule-fit-filter]<1->2><10>
counter [][]map[uint64]map[uint64]int
}

// NewCounter creates a Counter.
func NewCounter(scope string) *Counter {
counter := make([][]map[uint64]map[uint64]int, actionLen)
for i := range counter {
counter[i] = make([]map[uint64]map[uint64]int, filtersLen)
for k := range counter[i] {
counter[i][k] = make(map[uint64]map[uint64]int)
}
}
return &Counter{counter: counter, scope: scope}
}

// Add adds the filter counter.
func (c *Counter) inc(action action, filterType filterType, sourceID uint64, targetID uint64) {
if _, ok := c.counter[action][filterType][sourceID]; !ok {
c.counter[action][filterType][sourceID] = make(map[uint64]int)
}
c.counter[action][filterType][sourceID][targetID]++
}

// Flush flushes the counter to the metrics.
func (c *Counter) Flush() {
for i, actions := range c.counter {
actionName := action(i).String()
for j, counters := range actions {
filterName := filterType(j).String()
for sourceID, count := range counters {
sourceIDStr := strconv.FormatUint(sourceID, 10)
for targetID, value := range count {
targetIDStr := strconv.FormatUint(sourceID, 10)
if value > 0 {
filterCounter.WithLabelValues(actionName, c.scope, filterName, sourceIDStr, targetIDStr).
Add(float64(value))
counters[sourceID][targetID] = 0
}
}
}
}
}
}
50 changes: 50 additions & 0 deletions server/schedule/filter/counter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filter

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestString(t *testing.T) {
re := require.New(t)
testcases := []struct {
filterType int
expected string
}{
{int(storeStateTombstone), "store-state-tombstone-filter"},
{int(filtersLen - 1), "store-state-reject-leader-filter"},
{int(filtersLen), "unknown"},
}

for _, data := range testcases {
re.Equal(data.expected, filterType(data.filterType).String())
}
re.Equal(int(filtersLen), len(filters))
}

func TestCounter(t *testing.T) {
re := require.New(t)
counter := NewCounter(BalanceLeader.String())
counter.inc(source, storeStateTombstone, 1, 2)
counter.inc(target, storeStateTombstone, 1, 2)
re.Equal(counter.counter[source][storeStateTombstone][1][2], 1)
re.Equal(counter.counter[target][storeStateTombstone][1][2], 1)
counter.Flush()
re.Equal(counter.counter[source][storeStateTombstone][1][2], 0)
re.Equal(counter.counter[target][storeStateTombstone][1][2], 0)
}
Loading

0 comments on commit 8e2bd59

Please sign in to comment.