Skip to content

Commit

Permalink
Merge branch 'release-4.0' into release-4.0-66c26169b813
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 26, 2021
2 parents dcd6804 + d00a83a commit 879155e
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 12 deletions.
24 changes: 24 additions & 0 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -6052,6 +6052,30 @@
"metric": "pd_scheduler_event_count",
"refId": "A",
"step": 4
},
{
"expr": "sum(delta(pd_schedule_filter{action=\"filter-target\",type=\"distinct-filter\"}[1m])) by (source, target, type, scope)",
"format": "time_series",
"hide": true,
"intervalFactor": 2,
"legendFormat": "{{scope}}-{{type}}-{{source}}-{{target}}",
"refId": "B"
},
{
"expr": "sum(delta(pd_schedule_filter{action=\"filter-target\",type=\"rule-fit-filter\"}[1m])) by (source, target, type, scope)",
"format": "time_series",
"hide": true,
"intervalFactor": 2,
"legendFormat": "{{scope}}-{{type}}-{{source}}-{{target}}",
"refId": "C"
},
{
"expr": "sum(delta(pd_schedule_filter{action=\"filter-target\",type=\"rule-fit-leader-filter\"}[1m])) by (source, target, type, scope)",
"format": "time_series",
"hide": true,
"intervalFactor": 2,
"legendFormat": "{{scope}}-{{type}}-{{source}}-{{target}}",
"refId": "D"
}
],
"thresholds": [],
Expand Down
66 changes: 55 additions & 11 deletions server/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ func SelectSourceStores(stores []*core.StoreInfo, filters []Filter, opt opt.Opti
return filterStoresBy(stores, func(s *core.StoreInfo) bool {
return slice.AllOf(filters, func(i int) bool {
if !filters[i].Source(opt, s) {
filterCounter.WithLabelValues("filter-source", s.GetAddress(), fmt.Sprintf("%d", s.GetID()), filters[i].Scope(), filters[i].Type()).Inc()
sourceID := fmt.Sprintf("%d", s.GetID())
targetID := ""
filterCounter.WithLabelValues("filter-source", s.GetAddress(),
sourceID, filters[i].Scope(), filters[i].Type(), sourceID, targetID).Inc()
return false
}
return true
Expand All @@ -46,8 +49,16 @@ func SelectSourceStores(stores []*core.StoreInfo, filters []Filter, opt opt.Opti
func SelectTargetStores(stores []*core.StoreInfo, filters []Filter, opt opt.Options) []*core.StoreInfo {
return filterStoresBy(stores, func(s *core.StoreInfo) bool {
return slice.AllOf(filters, func(i int) bool {
if !filters[i].Target(opt, s) {
filterCounter.WithLabelValues("filter-target", s.GetAddress(), fmt.Sprintf("%d", s.GetID()), filters[i].Scope(), filters[i].Type()).Inc()
filter := filters[i]
if !filter.Target(opt, s) {
cfilter, ok := filter.(comparingFilter)
targetID := fmt.Sprintf("%d", s.GetID())
sourceID := ""
if ok {
sourceID = fmt.Sprintf("%d", cfilter.GetSourceStoreID())
}
filterCounter.WithLabelValues("filter-target", s.GetAddress(),
targetID, filters[i].Scope(), filters[i].Type(), sourceID, targetID).Inc()
return false
}
return true
Expand Down Expand Up @@ -75,13 +86,23 @@ type Filter interface {
Target(opt opt.Options, store *core.StoreInfo) bool
}

// comparingFilter is an interface to filter target store by comparing source and target stores
type comparingFilter interface {
Filter
// GetSourceStoreID returns the source store when comparing.
GetSourceStoreID() uint64
}

// Source checks if store can pass all Filters as source store.
func Source(opt opt.Options, store *core.StoreInfo, filters []Filter) bool {
storeAddress := store.GetAddress()
storeID := fmt.Sprintf("%d", store.GetID())
for _, filter := range filters {
if !filter.Source(opt, store) {
filterCounter.WithLabelValues("filter-source", storeAddress, storeID, filter.Scope(), filter.Type()).Inc()
sourceID := storeID
targetID := ""
filterCounter.WithLabelValues("filter-source", storeAddress,
sourceID, filter.Scope(), filter.Type(), sourceID, targetID).Inc()
return false
}
}
Expand All @@ -94,7 +115,14 @@ func Target(opt opt.Options, store *core.StoreInfo, filters []Filter) bool {
storeID := fmt.Sprintf("%d", store.GetID())
for _, filter := range filters {
if !filter.Target(opt, store) {
filterCounter.WithLabelValues("filter-target", storeAddress, storeID, filter.Scope(), filter.Type()).Inc()
cfilter, ok := filter.(comparingFilter)
targetID := storeID
sourceID := ""
if ok {
sourceID = fmt.Sprintf("%d", cfilter.GetSourceStoreID())
}
filterCounter.WithLabelValues("filter-target", storeAddress,
targetID, filter.Scope(), filter.Type(), sourceID, targetID).Inc()
return false
}
}
Expand Down Expand Up @@ -328,6 +356,7 @@ type distinctScoreFilter struct {
labels []string
stores []*core.StoreInfo
safeScore float64
srcStore uint64
}

// NewDistinctScoreFilter creates a filter that filters all stores that have
Expand All @@ -346,6 +375,7 @@ func NewDistinctScoreFilter(scope string, labels []string, stores []*core.StoreI
labels: labels,
stores: newStores,
safeScore: core.DistinctScore(labels, newStores, source),
srcStore: source.GetID(),
}
}

Expand All @@ -365,6 +395,11 @@ func (f *distinctScoreFilter) Target(opt opt.Options, store *core.StoreInfo) boo
return core.DistinctScore(f.labels, f.stores, store) >= f.safeScore
}

// GetSourceStoreID implements the ComparingFilter
func (f *distinctScoreFilter) GetSourceStoreID() uint64 {
return f.srcStore
}

// StoreStateFilter is used to determine whether a store can be selected as the
// source or target of the schedule based on the store's state.
type StoreStateFilter struct {
Expand Down Expand Up @@ -595,7 +630,7 @@ type ruleFitFilter struct {
fitter RegionFitter
region *core.RegionInfo
oldFit *placement.RegionFit
oldStore uint64
srcStore uint64
}

// NewRuleFitFilter creates a filter that ensures after replace a peer with new
Expand All @@ -607,7 +642,7 @@ func NewRuleFitFilter(scope string, fitter RegionFitter, region *core.RegionInfo
fitter: fitter,
region: region,
oldFit: fitter.FitRegion(region),
oldStore: oldStoreID,
srcStore: oldStoreID,
}
}

Expand All @@ -626,28 +661,33 @@ func (f *ruleFitFilter) Source(opt opt.Options, store *core.StoreInfo) bool {
func (f *ruleFitFilter) Target(opt opt.Options, store *core.StoreInfo) bool {
region := createRegionForRuleFit(f.region.GetStartKey(), f.region.GetEndKey(),
f.region.GetPeers(), f.region.GetLeader(),
core.WithReplacePeerStore(f.oldStore, store.GetID()))
core.WithReplacePeerStore(f.srcStore, store.GetID()))
newFit := f.fitter.FitRegion(region)
return placement.CompareRegionFit(f.oldFit, newFit) <= 0
}

// GetSourceStoreID implements the ComparingFilter
func (f *ruleFitFilter) GetSourceStoreID() uint64 {
return f.srcStore
}

type ruleLeaderFitFilter struct {
scope string
fitter RegionFitter
region *core.RegionInfo
oldFit *placement.RegionFit
oldLeaderStoreID uint64
srcLeaderStoreID uint64
}

// newRuleLeaderFitFilter creates a filter that ensures after transfer leader with new store,
// the isolation level will not decrease.
func newRuleLeaderFitFilter(scope string, fitter RegionFitter, region *core.RegionInfo, oldLeaderStoreID uint64) Filter {
func newRuleLeaderFitFilter(scope string, fitter RegionFitter, region *core.RegionInfo, srcLeaderStoreID uint64) Filter {
return &ruleLeaderFitFilter{
scope: scope,
fitter: fitter,
region: region,
oldFit: fitter.FitRegion(region),
oldLeaderStoreID: oldLeaderStoreID,
srcLeaderStoreID: srcLeaderStoreID,
}
}

Expand Down Expand Up @@ -676,6 +716,10 @@ func (f *ruleLeaderFitFilter) Target(opt opt.Options, store *core.StoreInfo) boo
return placement.CompareRegionFit(f.oldFit, newFit) <= 0
}

func (f *ruleLeaderFitFilter) GetSourceStoreID() uint64 {
return f.srcLeaderStoreID
}

// NewPlacementLeaderSafeguard creates a filter that ensures after transfer a leader with
// existed peer, the placement restriction will not become worse.
// Note that it only worked when PlacementRules enabled otherwise it will always permit the sourceStore.
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/filter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
Subsystem: "schedule",
Name: "filter",
Help: "Counter of the filter",
}, []string{"action", "address", "store", "scope", "type"})
}, []string{"action", "address", "store", "scope", "type", "source", "target"})
)

func init() {
Expand Down

0 comments on commit 879155e

Please sign in to comment.