diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 6ff059d5fae..480e9cc8b33 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -45,6 +45,8 @@ const ( defaultRegionSize = 96 * units.MiB // 96MiB ) +var _ statistics.StoreStatInformer = &Cluster{} + // Cluster is used to mock a cluster for test purpose. type Cluster struct { *core.BasicCluster @@ -114,6 +116,12 @@ func (mc *Cluster) GetStoresLoads() map[uint64][]float64 { return mc.HotStat.GetStoresLoads() } +// GetStoresHistoryLoads gets the history load statistics of all stores. +func (mc *Cluster) GetStoresHistoryLoads() map[uint64][][]float64 { + mc.HotStat.FilterUnhealthyStore(mc) + return mc.HotStat.GetStoresHistoryLoads() +} + // GetStore gets a store with a given store ID. func (mc *Cluster) GetStore(storeID uint64) *core.StoreInfo { return mc.Stores.GetStore(storeID) diff --git a/pkg/movingaverage/avg_over_time_test.go b/pkg/movingaverage/avg_over_time_test.go index a0787f5af81..37e13bebfad 100644 --- a/pkg/movingaverage/avg_over_time_test.go +++ b/pkg/movingaverage/avg_over_time_test.go @@ -84,6 +84,7 @@ func TestMinFilled(t *testing.T) { for aotSize := 2; aotSize < 10; aotSize++ { for mfSize := 2; mfSize < 10; mfSize++ { tm := NewTimeMedian(aotSize, mfSize, interval) + re.Equal([]float64{}, tm.GetAll()) for i := 0; i < aotSize; i++ { re.Equal(0.0, tm.Get()) tm.Add(rate*interval.Seconds(), interval) diff --git a/pkg/movingaverage/exponential_moving_average.go b/pkg/movingaverage/exponential_moving_average.go index 6c1904a9730..ee2f22344c1 100644 --- a/pkg/movingaverage/exponential_moving_average.go +++ b/pkg/movingaverage/exponential_moving_average.go @@ -31,6 +31,8 @@ type EMA struct { instantaneous float64 } +var _ MovingAvg = &EMA{} + // NewEMA returns an EMA. func NewEMA(decays ...float64) *EMA { decay := defaultDecay @@ -70,6 +72,11 @@ func (e *EMA) Get() float64 { return e.value } +// GetAll returns all the data points. +func (e *EMA) GetAll() []float64 { + return []float64{e.Get()} +} + // Reset cleans the data set. func (e *EMA) Reset() { e.count = 0 diff --git a/pkg/movingaverage/hull_moving_average.go b/pkg/movingaverage/hull_moving_average.go index 14b550d43f5..69d22a5bd65 100644 --- a/pkg/movingaverage/hull_moving_average.go +++ b/pkg/movingaverage/hull_moving_average.go @@ -26,6 +26,8 @@ type HMA struct { wma []*WMA } +var _ MovingAvg = &HMA{} + // NewHMA returns a WMA. func NewHMA(sizes ...float64) *HMA { size := defaultHMASize @@ -54,6 +56,11 @@ func (h *HMA) Get() float64 { return h.wma[2].Get() } +// GetAll returns all the data points. +func (h *HMA) GetAll() []float64 { + return h.wma[2].GetAll() +} + // Reset cleans the data set. func (h *HMA) Reset() { h.wma[0] = NewWMA(int(h.size / 2)) diff --git a/pkg/movingaverage/max_filter.go b/pkg/movingaverage/max_filter.go index 70bd45f98de..21678147c57 100644 --- a/pkg/movingaverage/max_filter.go +++ b/pkg/movingaverage/max_filter.go @@ -16,6 +16,8 @@ package movingaverage import "github.com/elliotchance/pie/v2" +var _ MovingAvg = &MaxFilter{} + // MaxFilter works as a maximum filter with specified window size. // There are at most `size` data points for calculating. type MaxFilter struct { @@ -50,6 +52,14 @@ func (r *MaxFilter) Get() float64 { return pie.Max(records) } +// GetAll returns all the data points. +func (r *MaxFilter) GetAll() []float64 { + if r.count < r.size { + return r.records[:r.count] + } + return r.records +} + // Reset cleans the data set.
func (r *MaxFilter) Reset() { r.count = 0 diff --git a/pkg/movingaverage/max_filter_test.go b/pkg/movingaverage/max_filter_test.go index bba770cecc2..3e9f88e73e7 100644 --- a/pkg/movingaverage/max_filter_test.go +++ b/pkg/movingaverage/max_filter_test.go @@ -29,6 +29,7 @@ func TestMaxFilter(t *testing.T) { mf := NewMaxFilter(5) re.Equal(empty, mf.Get()) + re.Equal([]float64{}, mf.GetAll()) checkReset(re, mf, empty) checkAdd(re, mf, data, expected) diff --git a/pkg/movingaverage/median_filter.go b/pkg/movingaverage/median_filter.go index fa499da0dda..9b510c498d8 100644 --- a/pkg/movingaverage/median_filter.go +++ b/pkg/movingaverage/median_filter.go @@ -54,6 +54,14 @@ func (r *MedianFilter) Get() float64 { return r.result } +// GetAll returns all the data points. +func (r *MedianFilter) GetAll() []float64 { + if r.count < r.size { + return r.records[:r.count] + } + return r.records +} + // Reset cleans the data set. func (r *MedianFilter) Reset() { r.count = 0 diff --git a/pkg/movingaverage/moving_average.go b/pkg/movingaverage/moving_average.go index 3434f7bf0a5..8e936f7dc25 100644 --- a/pkg/movingaverage/moving_average.go +++ b/pkg/movingaverage/moving_average.go @@ -21,6 +21,8 @@ type MovingAvg interface { Add(data float64) // Get returns the moving average. Get() float64 + // GetAll returns all the data points. + GetAll() []float64 // GetInstantaneous returns the value just added. GetInstantaneous() float64 // Reset cleans the data set. diff --git a/pkg/movingaverage/moving_average_test.go b/pkg/movingaverage/moving_average_test.go index 49c20637c20..84c235db484 100644 --- a/pkg/movingaverage/moving_average_test.go +++ b/pkg/movingaverage/moving_average_test.go @@ -80,6 +80,7 @@ func TestMedianFilter(t *testing.T) { mf := NewMedianFilter(5) re.Equal(empty, mf.Get()) + re.Equal([]float64{}, mf.GetAll()) checkReset(re, mf, empty) checkAdd(re, mf, data, expected) diff --git a/pkg/movingaverage/time_median.go b/pkg/movingaverage/time_median.go index 88ad562a3cf..ccccd608f09 100644 --- a/pkg/movingaverage/time_median.go +++ b/pkg/movingaverage/time_median.go @@ -38,6 +38,11 @@ func (t *TimeMedian) Get() float64 { return t.mf.Get() } +// GetAll returns all the data points in the median filter. +func (t *TimeMedian) GetAll() []float64 { + return t.mf.GetAll() +} + // Add adds recent change to TimeMedian. func (t *TimeMedian) Add(delta float64, interval time.Duration) { t.aot.Add(delta, interval) diff --git a/pkg/movingaverage/weight_moving_average.go b/pkg/movingaverage/weight_moving_average.go index f8a72b493ee..d00859b7c9e 100644 --- a/pkg/movingaverage/weight_moving_average.go +++ b/pkg/movingaverage/weight_moving_average.go @@ -16,6 +16,8 @@ package movingaverage const defaultWMASize = 10 +var _ MovingAvg = &WMA{} + // WMA works as a weight with specified window size. // There are at most `size` data points for calculating. // References:https://en.wikipedia.org/wiki/Moving_average#Weighted_moving_average @@ -64,6 +66,14 @@ func (w *WMA) Get() float64 { return w.score / float64((w.size+1)*w.size/2) } +// GetAll returns all the data points. +func (w *WMA) GetAll() []float64 { + if w.count < w.size { + return w.records[:w.count] + } + return w.records +} + // Reset cleans the data set.
func (w *WMA) Reset() { w.count = 0 diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index ee50080f1a5..f8a267d0ca8 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -112,12 +112,14 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sched h.summaryPendingInfluence(cluster) h.storesLoads = cluster.GetStoresLoads() isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow() + storesHistoryLoads := cluster.GetStoresHistoryLoads() prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, resource constant.ResourceKind) { ty := buildResourceType(rw, resource) h.stLoadInfos[ty] = statistics.SummaryStoresLoad( h.stInfos, h.storesLoads, + storesHistoryLoads, regionStats, isTraceRegionFlow, rw, resource) @@ -266,7 +268,6 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster) if h.conf.IsForbidRWType(typ) { return nil } - switch typ { case statistics.Read: return h.balanceHotReadRegions(cluster) @@ -463,6 +464,7 @@ type balanceSolver struct { betterThan func(*solution) bool rankToDimString func() string checkByPriorityAndTolerance func(loads []float64, f func(int) bool) bool + checkHistoryLoadsByPriority func(loads [][]float64, f func(int) bool) bool } func (bs *balanceSolver) init() { @@ -527,10 +529,13 @@ func (bs *balanceSolver) pickCheckPolicyV1() { switch { case bs.resourceTy == writeLeader: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly + bs.checkHistoryLoadsByPriority = bs.checkHistoryByPriorityAndToleranceFirstOnly case bs.sche.conf.IsStrictPickingStoreEnabled(): bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAllOf + bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceAllOf default: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly + bs.checkHistoryLoadsByPriority = bs.checkHistoryByPriorityAndToleranceFirstOnly } } @@ -599,7 +604,6 @@ func (bs *balanceSolver) solve() []*operator.Operator { if !bs.isValid() { return nil } - bs.cur = &solution{} tryUpdateBestSolution := func() { if label, ok := bs.filterUniformStore(); ok { @@ -781,12 +785,17 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai continue } - if bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) { - ret[id] = detail - hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc() - } else { - hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc() + if !bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) { + hotSchedulerResultCounter.WithLabelValues("src-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + continue + } + + if !bs.checkSrcHistoryLoadByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, srcToleranceRatio) { + hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + continue } + ret[id] = detail + hotSchedulerResultCounter.WithLabelValues("src-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() } return ret } @@ -797,6 +806,17 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta }) } +func (bs *balanceSolver) checkSrcHistoryLoadByPriorityAndTolerance(current, expectLoad *statistics.StoreLoad, toleranceRatio 
float64) bool { + if len(current.HistoryLoads) == 0 { + return true + } + return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { + return slice.AllOf(current.HistoryLoads[i], func(j int) bool { + return current.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j] + }) + }) +} + // filterHotPeers filtered hot peers from statistics.HotPeerStat and deleted the peer if its region is in pending status. // The returned hotPeer count in controlled by `max-peer-number`. func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) { @@ -995,12 +1015,17 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st } if filter.Target(bs.GetOpts(), store, filters) { id := store.GetID() - if bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) { - ret[id] = detail - hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(id, 10)).Inc() - } else { - hotSchedulerResultCounter.WithLabelValues("dst-store-failed", strconv.FormatUint(id, 10)).Inc() + if !bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) { + hotSchedulerResultCounter.WithLabelValues("dst-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + continue + } + + if !bs.checkDstHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, dstToleranceRatio) { + hotSchedulerResultCounter.WithLabelValues("dst-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + continue } + hotSchedulerResultCounter.WithLabelValues("dst-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + ret[id] = detail } } return ret @@ -1012,6 +1037,14 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist }) } +func (bs *balanceSolver) checkDstHistoryLoadsByPriorityAndTolerance(current, expect *statistics.StoreLoad, toleranceRatio float64) bool { + return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { + return slice.AllOf(current.HistoryLoads[i], func(j int) bool { + return current.HistoryLoads[i][j]*toleranceRatio < expect.HistoryLoads[i][j] + }) + }) +} + func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f func(int) bool) bool { return slice.AllOf(loads, func(i int) bool { if bs.isSelectedDim(i) { @@ -1021,6 +1054,15 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f fun }) } +func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceAllOf(loads [][]float64, f func(int) bool) bool { + return slice.AllOf(loads, func(i int) bool { + if bs.isSelectedDim(i) { + return f(i) + } + return true + }) +} + func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f func(int) bool) bool { return slice.AnyOf(loads, func(i int) bool { if bs.isSelectedDim(i) { @@ -1030,10 +1072,23 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f fun }) } +func (bs *balanceSolver) checkHistoryByPriorityAndToleranceAnyOf(loads [][]float64, f func(int) bool) bool { + return slice.AnyOf(loads, func(i int) bool { + if bs.isSelectedDim(i) { + return f(i) + } + return false + }) +} + func (bs *balanceSolver) checkByPriorityAndToleranceFirstOnly(loads []float64, f func(int) bool) bool { return f(bs.firstPriority) } +func (bs *balanceSolver) checkHistoryByPriorityAndToleranceFirstOnly(_ [][]float64, f func(int) bool) bool { + 
return f(bs.firstPriority) +} + func (bs *balanceSolver) enableExpectation() bool { return bs.sche.conf.GetDstToleranceRatio() > 0 && bs.sche.conf.GetSrcToleranceRatio() > 0 } @@ -1542,6 +1597,21 @@ func (ty opType) String() string { type resourceType int +func (rt resourceType) String() string { + switch rt { + case writePeer: + return "write-peer" + case writeLeader: + return "write-leader" + case readLeader: + return "read-leader" + case readPeer: + return "read-peer" + default: + return "unknown" + } +} + const ( writePeer resourceType = iota writeLeader diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 71b7805f1b9..7a11b5206ca 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -520,11 +520,13 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { tikvKeysSum += float64(storesBytes[i]/100) / 10 tikvQuerySum += float64(storesBytes[i]/100) / 10 } + for i := uint64(1); i <= storeCount; i++ { if i != downStoreID { tc.UpdateStorageWrittenBytes(i, storesBytes[i]) } } + { // Check the load expect aliveTiKVCount := float64(aliveTiKVLastID - aliveTiKVStartID + 1) allowLeaderTiKVCount := aliveTiKVCount - 1 // store 5 with evict leader @@ -2451,10 +2453,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: true, // all of load: &statistics.StoreLoad{ // all dims are higher than expect, allow schedule - Loads: []float64{2.0, 2.0, 2.0}, + Loads: []float64{2.0, 2.0, 2.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {2.0, 2.0}, {2.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2463,10 +2467,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: true, // all of load: &statistics.StoreLoad{ // all dims are higher than expect, but lower than expect*toleranceRatio, not allow schedule - Loads: []float64{2.0, 2.0, 2.0}, + Loads: []float64{2.0, 2.0, 2.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {2.0, 2.0}, {2.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, toleranceRatio: 2.2, @@ -2476,10 +2482,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: true, // all of load: &statistics.StoreLoad{ // only queryDim is lower, but the dim is no selected, allow schedule - Loads: []float64{2.0, 2.0, 1.0}, + Loads: []float64{2.0, 2.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {2.0, 2.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2488,10 +2496,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: true, // all of load: &statistics.StoreLoad{ // only keyDim is lower, and the dim is selected, not allow schedule - Loads: []float64{2.0, 1.0, 2.0}, + Loads: []float64{2.0, 1.0, 2.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: false, @@ -2500,10 +2510,12 @@ func 
TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: false, // first only load: &statistics.StoreLoad{ // keyDim is higher, and the dim is selected, allow schedule - Loads: []float64{1.0, 2.0, 1.0}, + Loads: []float64{1.0, 2.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {2.0, 2.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2512,10 +2524,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: false, // first only load: &statistics.StoreLoad{ // although byteDim is higher, the dim is not first, not allow schedule - Loads: []float64{2.0, 1.0, 1.0}, + Loads: []float64{2.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: false, @@ -2524,10 +2538,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: false, // first only load: &statistics.StoreLoad{ // although queryDim is higher, the dim is no selected, not allow schedule - Loads: []float64{1.0, 1.0, 2.0}, + Loads: []float64{1.0, 1.0, 2.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {2.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: false, @@ -2536,10 +2552,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: false, // first only load: &statistics.StoreLoad{ // all dims are lower than expect, not allow schedule - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{2.0, 2.0, 2.0}, + Loads: []float64{2.0, 2.0, 2.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {2.0, 2.0}, {2.0, 2.0}}, }, isSrc: true, allow: false, @@ -2549,10 +2567,12 @@ func TestExpect(t *testing.T) { strict: true, rs: writeLeader, load: &statistics.StoreLoad{ // only keyDim is higher, but write leader only consider the first priority - Loads: []float64{1.0, 2.0, 1.0}, + Loads: []float64{1.0, 2.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {2.0, 2.0}, {2.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2562,10 +2582,27 @@ func TestExpect(t *testing.T) { strict: true, rs: writeLeader, load: &statistics.StoreLoad{ // although byteDim is higher, the dim is not first, not allow schedule - Loads: []float64{2.0, 1.0, 1.0}, + Loads: []float64{2.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {1.0, 1.0}, {2.0, 2.0}}, + }, + expect: &statistics.StoreLoad{ + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, + }, + isSrc: true, + allow: false, + }, + { + initFunc: (*balanceSolver).pickCheckPolicyV1, + strict: true, + rs: writeLeader, + load: &statistics.StoreLoad{ // history loads is not higher than the expected. 
+ Loads: []float64{2.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {1.0, 2.0}, {1.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 2.0}, {1.0, 2.0}, {1.0, 2.0}}, }, isSrc: true, allow: false, @@ -2575,10 +2612,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV2, strict: false, // any of load: &statistics.StoreLoad{ // keyDim is higher, and the dim is selected, allow schedule - Loads: []float64{1.0, 2.0, 1.0}, + Loads: []float64{1.0, 2.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {2.0, 2.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2587,10 +2626,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV2, strict: false, // any of load: &statistics.StoreLoad{ // byteDim is higher, and the dim is selected, allow schedule - Loads: []float64{2.0, 1.0, 1.0}, + Loads: []float64{2.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2599,10 +2640,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV2, strict: false, // any of load: &statistics.StoreLoad{ // although queryDim is higher, the dim is no selected, not allow schedule - Loads: []float64{1.0, 1.0, 2.0}, + Loads: []float64{1.0, 1.0, 2.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {2.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: false, @@ -2611,10 +2654,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV2, strict: false, // any of load: &statistics.StoreLoad{ // all dims are lower than expect, not allow schedule - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{2.0, 2.0, 2.0}, + Loads: []float64{2.0, 2.0, 2.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {2.0, 2.0}, {2.0, 2.0}}, }, isSrc: true, allow: false, @@ -2624,10 +2669,12 @@ func TestExpect(t *testing.T) { strict: true, rs: writeLeader, load: &statistics.StoreLoad{ // only keyDim is higher, but write leader only consider the first priority - Loads: []float64{1.0, 2.0, 1.0}, + Loads: []float64{1.0, 2.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {2.0, 2.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2637,10 +2684,12 @@ func TestExpect(t *testing.T) { strict: true, rs: writeLeader, load: &statistics.StoreLoad{ // although byteDim is higher, the dim is not first, not allow schedule - Loads: []float64{2.0, 1.0, 1.0}, + Loads: []float64{2.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: 
false, @@ -2652,8 +2701,16 @@ func TestExpect(t *testing.T) { for i, v := range src.Loads { dst[i] = 3.0 - v } + historyLoads := make([][]float64, len(src.HistoryLoads)) + for i, dim := range src.HistoryLoads { + historyLoads[i] = make([]float64, len(dim)) + for j, load := range dim { + historyLoads[i][j] = 3.0 - load + } + } return &statistics.StoreLoad{ - Loads: dst, + Loads: dst, + HistoryLoads: historyLoads, } } @@ -2672,6 +2729,8 @@ func TestExpect(t *testing.T) { testCase.initFunc(bs) re.Equal(testCase.allow, bs.checkSrcByPriorityAndTolerance(testCase.load, testCase.expect, toleranceRatio)) re.Equal(testCase.allow, bs.checkDstByPriorityAndTolerance(srcToDst(testCase.load), srcToDst(testCase.expect), toleranceRatio)) + re.Equal(testCase.allow, bs.checkSrcHistoryLoadByPriorityAndTolerance(testCase.load, testCase.expect, toleranceRatio)) + re.Equal(testCase.allow, bs.checkDstHistoryLoadsByPriorityAndTolerance(srcToDst(testCase.load), srcToDst(testCase.expect), toleranceRatio)) } } diff --git a/pkg/schedule/schedulers/hot_region_v2.go b/pkg/schedule/schedulers/hot_region_v2.go index 48ede517c8b..57ef4c97c62 100644 --- a/pkg/schedule/schedulers/hot_region_v2.go +++ b/pkg/schedule/schedulers/hot_region_v2.go @@ -86,8 +86,10 @@ func (bs *balanceSolver) pickCheckPolicyV2() { switch { case bs.resourceTy == writeLeader: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly + bs.checkHistoryLoadsByPriority = bs.checkHistoryByPriorityAndToleranceFirstOnly default: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAnyOf + bs.checkHistoryLoadsByPriority = bs.checkHistoryByPriorityAndToleranceAnyOf } } diff --git a/pkg/statistics/collector.go b/pkg/statistics/collector.go index f4785103c89..7010761effa 100644 --- a/pkg/statistics/collector.go +++ b/pkg/statistics/collector.go @@ -27,8 +27,12 @@ type storeCollector interface { Filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool // GetLoads obtains available loads from storeLoads and peerLoadSum according to rwTy and kind. GetLoads(storeLoads, peerLoadSum []float64, rwTy RWType, kind constant.ResourceKind) (loads []float64) + GetHistoryLoads(storeLoads [][]float64, peerLoadSum []float64, rwTy RWType, kind constant.ResourceKind) (loads [][]float64) } +var _ storeCollector = tikvCollector{} +var _ storeCollector = tiflashCollector{} + type tikvCollector struct{} func newTikvCollector() storeCollector { @@ -78,6 +82,32 @@ func (c tikvCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy RWType, return } +func (c tikvCollector) GetHistoryLoads(storeLoads [][]float64, peerLoadSum []float64, rwTy RWType, kind constant.ResourceKind) (loads [][]float64) { + loads = make([][]float64, DimLen) + switch rwTy { + case Read: + loads[ByteDim] = storeLoads[StoreReadBytes] + loads[KeyDim] = storeLoads[StoreReadKeys] + loads[QueryDim] = storeLoads[StoreReadQuery] + case Write: + switch kind { + case constant.LeaderKind: + // Use sum of hot peers to estimate leader-only byte rate. + // For Write requests, Write{Bytes, Keys} is applied to all Peers at the same time, + // while the Leader and Follower are under different loads (usually the Leader consumes more CPU). + // Write{Query} does not require such processing. 
+ loads[ByteDim] = []float64{peerLoadSum[ByteDim]} + loads[KeyDim] = []float64{peerLoadSum[KeyDim]} + loads[QueryDim] = storeLoads[StoreWriteQuery] + case constant.RegionKind: + loads[ByteDim] = storeLoads[StoreWriteBytes] + loads[KeyDim] = storeLoads[StoreWriteKeys] + // The `Write-peer` does not have `QueryDim` + } + } + return +} + type tiflashCollector struct { isTraceRegionFlow bool } @@ -124,3 +154,28 @@ func (c tiflashCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy RWTyp } return } + +func (c tiflashCollector) GetHistoryLoads(storeLoads [][]float64, peerLoadSum []float64, rwTy RWType, kind constant.ResourceKind) (loads [][]float64) { + loads = make([][]float64, DimLen) + switch rwTy { + case Read: + // TODO: Need TiFlash StoreHeartbeat support + case Write: + switch kind { + case constant.LeaderKind: + // There is no Leader on TiFlash + case constant.RegionKind: + // TiFlash is currently unable to report statistics in the same unit as Region, + // so it uses the sum of Regions. If it is not accurate enough, use sum of hot peer. + if c.isTraceRegionFlow { + loads[ByteDim] = storeLoads[StoreRegionsWriteBytes] + loads[KeyDim] = storeLoads[StoreRegionsWriteKeys] + } else { + loads[ByteDim] = []float64{peerLoadSum[ByteDim]} + loads[KeyDim] = []float64{peerLoadSum[KeyDim]} + } + // The `Write-peer` does not have `QueryDim` + } + } + return +} diff --git a/pkg/statistics/store.go b/pkg/statistics/store.go index d8288a9b9cf..278aad8793d 100644 --- a/pkg/statistics/store.go +++ b/pkg/statistics/store.go @@ -34,6 +34,8 @@ const ( RegionsStatsRollingWindowsSize = 9 ) +var _ StoreStatInformer = &StoresStats{} + // StoresStats is a cache hold hot regions. type StoresStats struct { syncutil.RWMutex @@ -114,6 +116,19 @@ func (s *StoresStats) GetStoresLoads() map[uint64][]float64 { return res } +// GetStoresHistoryLoads returns the history loads of all stores. +func (s *StoresStats) GetStoresHistoryLoads() map[uint64][][]float64 { + s.RLock() + defer s.RUnlock() + res := make(map[uint64][][]float64, len(s.rollingStoresStats)) + for storeID, stats := range s.rollingStoresStats { + for i := StoreStatKind(0); i < StoreStatCount; i++ { + res[storeID] = append(res[storeID], stats.GetLoads(i)) + } + } + return res +} + // FilterUnhealthyStore filter unhealthy store func (s *StoresStats) FilterUnhealthyStore(cluster core.StoreSetInformer) { s.Lock() @@ -253,6 +268,19 @@ func (r *RollingStoreStats) GetLoad(k StoreStatKind) float64 { return 0 } +// GetLoads returns the store's history loads of the given kind. +func (r *RollingStoreStats) GetLoads(k StoreStatKind) []float64 { + r.RLock() + defer r.RUnlock() + switch k { + case StoreReadBytes, StoreReadKeys, StoreReadQuery, StoreWriteBytes, StoreWriteKeys, StoreWriteQuery: + return r.timeMedians[k].GetAll() + case StoreCPUUsage, StoreDiskReadRate, StoreDiskWriteRate, StoreRegionsWriteBytes, StoreRegionsWriteKeys: + return r.movingAvgs[k].GetAll() + } + return []float64{0} +} + // GetInstantLoad returns store's instant load.
func (r *RollingStoreStats) GetInstantLoad(k StoreStatKind) float64 { r.RLock() diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index ecbee239a69..52a25d13789 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -75,12 +75,14 @@ func GetHotStatus(stores []*core.StoreInfo, storesLoads map[uint64][]float64, re stLoadInfosAsLeader := SummaryStoresLoad( stInfos, storesLoads, + nil, regionStats, isTraceRegionFlow, typ, constant.LeaderKind) stLoadInfosAsPeer := SummaryStoresLoad( stInfos, storesLoads, + nil, regionStats, isTraceRegionFlow, typ, constant.RegionKind) @@ -105,6 +107,7 @@ func GetHotStatus(stores []*core.StoreInfo, storesLoads map[uint64][]float64, re func SummaryStoresLoad( storeInfos map[uint64]*StoreSummaryInfo, storesLoads map[uint64][]float64, + storesHistoryLoads map[uint64][][]float64, storeHotPeers map[uint64][]*HotPeerStat, isTraceRegionFlow bool, rwTy RWType, @@ -116,6 +119,7 @@ func SummaryStoresLoad( tikvLoadDetail := summaryStoresLoadByEngine( storeInfos, storesLoads, + storesHistoryLoads, storeHotPeers, rwTy, kind, newTikvCollector(), @@ -123,6 +127,7 @@ func SummaryStoresLoad( tiflashLoadDetail := summaryStoresLoadByEngine( storeInfos, storesLoads, + storesHistoryLoads, storeHotPeers, rwTy, kind, newTiFlashCollector(isTraceRegionFlow), @@ -137,6 +142,7 @@ func SummaryStoresLoad( func summaryStoresLoadByEngine( storeInfos map[uint64]*StoreSummaryInfo, storesLoads map[uint64][]float64, + storesHistoryLoads map[uint64][][]float64, storeHotPeers map[uint64][]*HotPeerStat, rwTy RWType, kind constant.ResourceKind, @@ -144,6 +150,7 @@ func summaryStoresLoadByEngine( ) []*StoreLoadDetail { loadDetail := make([]*StoreLoadDetail, 0, len(storeInfos)) allStoreLoadSum := make([]float64, DimLen) + allStoreHistoryLoadSum := make([][]float64, DimLen) allStoreCount := 0 allHotPeersCount := 0 @@ -177,6 +184,19 @@ func summaryStoresLoadByEngine( hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[QueryDim]) } + historyLoads := make([][]float64, DimLen) + if storesHistoryLoads, ok := storesHistoryLoads[id]; ok { + historyLoads = collector.GetHistoryLoads(storesHistoryLoads, peerLoadSum, rwTy, kind) + for i, loads := range historyLoads { + if allStoreHistoryLoadSum[i] == nil || len(allStoreHistoryLoadSum[i]) < len(loads) { + allStoreHistoryLoadSum[i] = make([]float64, len(loads)) + } + for j, load := range loads { + allStoreHistoryLoadSum[i][j] += load + } + } + } + loads := collector.GetLoads(storeLoads, peerLoadSum, rwTy, kind) for i := range allStoreLoadSum { allStoreLoadSum[i] += loads[i] @@ -186,8 +206,9 @@ func summaryStoresLoadByEngine( // Build store load prediction from current load and pending influence. stLoadPred := (&StoreLoad{ - Loads: loads, - Count: float64(len(hotPeers)), + Loads: loads, + Count: float64(len(hotPeers)), + HistoryLoads: historyLoads, }).ToLoadPred(rwTy, info.PendingSum) // Construct store load info. 
@@ -208,6 +229,13 @@ func summaryStoresLoadByEngine( expectLoads[i] = allStoreLoadSum[i] / float64(allStoreCount) } + expectHistoryLoads := make([][]float64, DimLen) + for i := range allStoreHistoryLoadSum { + expectHistoryLoads[i] = make([]float64, len(allStoreHistoryLoadSum[i])) + for j := range allStoreHistoryLoadSum[i] { + expectHistoryLoads[i][j] = allStoreHistoryLoadSum[i][j] / float64(allStoreCount) + } + } stddevLoads := make([]float64, len(allStoreLoadSum)) if allHotPeersCount != 0 { for _, detail := range loadDetail { @@ -239,8 +267,9 @@ func summaryStoresLoadByEngine( hotPeerSummary.WithLabelValues(ty, engine).Set(stddevLoads[QueryDim]) } expect := StoreLoad{ - Loads: expectLoads, - Count: expectCount, + Loads: expectLoads, + Count: expectCount, + HistoryLoads: expectHistoryLoads, } stddev := StoreLoad{ Loads: stddevLoads, diff --git a/pkg/statistics/store_load.go b/pkg/statistics/store_load.go index 13c4aa57635..356d234dacd 100644 --- a/pkg/statistics/store_load.go +++ b/pkg/statistics/store_load.go @@ -144,8 +144,9 @@ func (s *StoreSummaryInfo) SetEngineAsTiFlash() { // StoreLoad records the current load. type StoreLoad struct { - Loads []float64 - Count float64 + Loads []float64 + Count float64 + HistoryLoads [][]float64 } // ToLoadPred returns the current load and future predictive load. diff --git a/pkg/statistics/store_stat_informer.go b/pkg/statistics/store_stat_informer.go index 0c2fc22f7b6..35c00b008be 100644 --- a/pkg/statistics/store_stat_informer.go +++ b/pkg/statistics/store_stat_informer.go @@ -17,4 +17,5 @@ package statistics // StoreStatInformer provides access to a shared informer of statistics. type StoreStatInformer interface { GetStoresLoads() map[uint64][]float64 + GetStoresHistoryLoads() map[uint64][][]float64 } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c4b9db33ea5..b3a47b450d1 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -109,6 +109,8 @@ type Server interface { ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error } +var _ statistics.StoreStatInformer = &RaftCluster{} + // RaftCluster is used for cluster config management. // Raft cluster key format: // cluster 1 -> /1/raft, value is metapb.Cluster @@ -2220,6 +2222,11 @@ func (c *RaftCluster) GetStoresLoads() map[uint64][]float64 { return c.hotStat.GetStoresLoads() } +// GetStoresHistoryLoads returns the history load stats of all stores. +func (c *RaftCluster) GetStoresHistoryLoads() map[uint64][][]float64 { + return c.hotStat.GetStoresHistoryLoads() +} + // IsRegionHot checks if a region is in hot state. func (c *RaftCluster) IsRegionHot(region *core.RegionInfo) bool { return c.hotStat.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold()) diff --git a/server/handler.go b/server/handler.go index 35cec65527f..404d06666de 100644 --- a/server/handler.go +++ b/server/handler.go @@ -78,6 +78,8 @@ var ( schedulerConfigPrefix = "pd/api/v1/scheduler-config" ) +var _ statistics.StoreStatInformer = &Handler{} + // Handler is a helper to export methods to handle API/RPC requests. type Handler struct { s *Server @@ -214,6 +216,15 @@ func (h *Handler) GetStoresLoads() map[uint64][]float64 { return rc.GetStoresLoads() } +// GetStoresHistoryLoads gets the history load statistics of all stores. +func (h *Handler) GetStoresHistoryLoads() map[uint64][][]float64 { + rc := h.s.GetRaftCluster() + if rc == nil { + return nil + } + return rc.GetStoresHistoryLoads() +} + // AddScheduler adds a scheduler.
func (h *Handler) AddScheduler(name string, args ...string) error { c, err := h.GetRaftCluster()
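Reviewer note (not part of the patch): a minimal sketch of how the new MovingAvg.GetAll contract behaves for a window-based implementation such as MedianFilter, assuming the github.com/tikv/pd module path. Until the window fills, GetAll returns only the points observed so far (the records[:count] branch above); once full, it returns the whole window. GetStoresHistoryLoads exposes the same series per store, keyed as store ID -> StoreStatKind -> history points, which is what the scheduler's checkHistoryLoadsByPriority variants iterate over.

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/movingaverage"
)

func main() {
	// Window of 5 points, as in the tests above.
	mf := movingaverage.NewMedianFilter(5)
	fmt.Println(mf.GetAll()) // [] — nothing recorded yet

	for i := 1; i <= 7; i++ {
		mf.Add(float64(i))
	}
	// Only the most recent `size` points are retained once the window is full.
	fmt.Println(len(mf.GetAll())) // 5
	fmt.Println(mf.Get())         // the filtered value over those retained points
}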