From 95b92898d4bddf71db8ba3e517d8742d23fdeff8 Mon Sep 17 00:00:00 2001
From: bufferflies <1045931706@qq.com>
Date: Thu, 30 Mar 2023 18:14:51 +0800
Subject: [PATCH 01/13] only transfer leader

Signed-off-by: bufferflies <1045931706@qq.com>
---
 pkg/schedule/schedulers/hot_region.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go
index 49e302cfd46..7d7e5de847f 100644
--- a/pkg/schedule/schedulers/hot_region.go
+++ b/pkg/schedule/schedulers/hot_region.go
@@ -595,7 +595,7 @@ func (bs *balanceSolver) filterUniformStoreV1() (string, bool) {
 // solve travels all the src stores, hot peers, dst stores and select each one of them to make a best scheduling solution.
 // The comparing between solutions is based on calcProgressiveRank.
 func (bs *balanceSolver) solve() []*operator.Operator {
-	if !bs.isValid() {
+	if !bs.isValid() || bs.opTy != transferLeader {
 		return nil
 	}
 
From d232dfc14abd5d6e5df11f1c5d302329c063b578 Mon Sep 17 00:00:00 2001
From: bufferflies <1045931706@qq.com>
Date: Mon, 3 Apr 2023 17:32:13 +0800
Subject: [PATCH 02/13] add history load check

Signed-off-by: bufferflies <1045931706@qq.com>
---
 pkg/mock/mockcluster/mockcluster.go | 8 +++
 .../exponential_moving_average.go | 7 +++
 pkg/movingaverage/hull_moving_average.go | 7 +++
 pkg/movingaverage/max_filter.go | 7 +++
 pkg/movingaverage/median_filter.go | 4 ++
 pkg/movingaverage/moving_average.go | 2 +
 pkg/movingaverage/time_median.go | 5 ++
 pkg/movingaverage/weight_moving_average.go | 7 +++
 pkg/schedule/schedulers/hot_region.go | 61 ++++++++++++++++---
 pkg/statistics/collector.go | 55 +++++++++++++++++
 pkg/statistics/store.go | 28 +++++++++
 pkg/statistics/store_hot_peers_infos.go | 35 +++++++++--
 pkg/statistics/store_load.go | 5 +-
 pkg/statistics/store_stat_informer.go | 1 +
 server/cluster/cluster.go | 7 +++
 server/handler.go | 11 ++++
 16 files changed, 236 insertions(+), 14 deletions(-)

diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index 6ff059d5fae..480e9cc8b33 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -45,6 +45,8 @@ const (
 	defaultRegionSize = 96 * units.MiB // 96MiB
 )
 
+var _ statistics.StoreStatInformer = &Cluster{}
+
 // Cluster is used to mock a cluster for test purpose.
 type Cluster struct {
 	*core.BasicCluster
@@ -114,6 +116,12 @@ func (mc *Cluster) GetStoresLoads() map[uint64][]float64 {
 	return mc.HotStat.GetStoresLoads()
 }
 
+// GetStoresHistoryLoads gets the history loads of all stores.
+func (mc *Cluster) GetStoresHistoryLoads() map[uint64][][]float64 {
+	mc.HotStat.FilterUnhealthyStore(mc)
+	return mc.HotStat.GetStoresHistoryLoads()
+}
+
 // GetStore gets a store with a given store ID.
 func (mc *Cluster) GetStore(storeID uint64) *core.StoreInfo {
 	return mc.Stores.GetStore(storeID)
diff --git a/pkg/movingaverage/exponential_moving_average.go b/pkg/movingaverage/exponential_moving_average.go
index 6c1904a9730..c6773784558 100644
--- a/pkg/movingaverage/exponential_moving_average.go
+++ b/pkg/movingaverage/exponential_moving_average.go
@@ -31,6 +31,8 @@ type EMA struct {
 	instantaneous float64
 }
 
+var _ MovingAvg = &EMA{}
+
 // NewEMA returns an EMA.
 func NewEMA(decays ...float64) *EMA {
 	decay := defaultDecay
@@ -70,6 +72,11 @@ func (e *EMA) Get() float64 {
 	return e.value
 }
 
+// GetAll returns all the data points.
+func (e *EMA) GetAll() []float64 {
+	return []float64{e.Get()}
+}
+
 // Reset cleans the data set.
func (e *EMA) Reset() { e.count = 0 diff --git a/pkg/movingaverage/hull_moving_average.go b/pkg/movingaverage/hull_moving_average.go index 14b550d43f5..69d22a5bd65 100644 --- a/pkg/movingaverage/hull_moving_average.go +++ b/pkg/movingaverage/hull_moving_average.go @@ -26,6 +26,8 @@ type HMA struct { wma []*WMA } +var _ MovingAvg = &HMA{} + // NewHMA returns a WMA. func NewHMA(sizes ...float64) *HMA { size := defaultHMASize @@ -54,6 +56,11 @@ func (h *HMA) Get() float64 { return h.wma[2].Get() } +// GetAll returns all the data points. +func (h *HMA) GetAll() []float64 { + return h.wma[2].GetAll() +} + // Reset cleans the data set. func (h *HMA) Reset() { h.wma[0] = NewWMA(int(h.size / 2)) diff --git a/pkg/movingaverage/max_filter.go b/pkg/movingaverage/max_filter.go index 70bd45f98de..e2d04961625 100644 --- a/pkg/movingaverage/max_filter.go +++ b/pkg/movingaverage/max_filter.go @@ -16,6 +16,8 @@ package movingaverage import "github.com/elliotchance/pie/v2" +var _ MovingAvg = &MaxFilter{} + // MaxFilter works as a maximum filter with specified window size. // There are at most `size` data points for calculating. type MaxFilter struct { @@ -50,6 +52,11 @@ func (r *MaxFilter) Get() float64 { return pie.Max(records) } +// GetAll returns all the data points. +func (r *MaxFilter) GetAll() []float64 { + return r.records +} + // Reset cleans the data set. func (r *MaxFilter) Reset() { r.count = 0 diff --git a/pkg/movingaverage/median_filter.go b/pkg/movingaverage/median_filter.go index fa499da0dda..da9f0cc3a3a 100644 --- a/pkg/movingaverage/median_filter.go +++ b/pkg/movingaverage/median_filter.go @@ -54,6 +54,10 @@ func (r *MedianFilter) Get() float64 { return r.result } +func (r *MedianFilter) GetAll() []float64 { + return r.records +} + // Reset cleans the data set. func (r *MedianFilter) Reset() { r.count = 0 diff --git a/pkg/movingaverage/moving_average.go b/pkg/movingaverage/moving_average.go index 3434f7bf0a5..8e936f7dc25 100644 --- a/pkg/movingaverage/moving_average.go +++ b/pkg/movingaverage/moving_average.go @@ -21,6 +21,8 @@ type MovingAvg interface { Add(data float64) // Get returns the moving average. Get() float64 + // GetAll returns all the data points. + GetAll() []float64 // GetInstantaneous returns the value just added. GetInstantaneous() float64 // Reset cleans the data set. diff --git a/pkg/movingaverage/time_median.go b/pkg/movingaverage/time_median.go index 88ad562a3cf..ccccd608f09 100644 --- a/pkg/movingaverage/time_median.go +++ b/pkg/movingaverage/time_median.go @@ -38,6 +38,11 @@ func (t *TimeMedian) Get() float64 { return t.mf.Get() } +// GetAll returns all the data points in the median filter. +func (t *TimeMedian) GetAll() []float64 { + return t.mf.GetAll() +} + // Add adds recent change to TimeMedian. func (t *TimeMedian) Add(delta float64, interval time.Duration) { t.aot.Add(delta, interval) diff --git a/pkg/movingaverage/weight_moving_average.go b/pkg/movingaverage/weight_moving_average.go index f8a72b493ee..cec31292daf 100644 --- a/pkg/movingaverage/weight_moving_average.go +++ b/pkg/movingaverage/weight_moving_average.go @@ -16,6 +16,8 @@ package movingaverage const defaultWMASize = 10 +var _ MovingAvg = &WMA{} + // WMA works as a weight with specified window size. // There are at most `size` data points for calculating. // References:https://en.wikipedia.org/wiki/Moving_average#Weighted_moving_average @@ -64,6 +66,11 @@ func (w *WMA) Get() float64 { return w.score / float64((w.size+1)*w.size/2) } +// GetAll returns all the data points. 
+func (w *WMA) GetAll() []float64 { + return w.records +} + // Reset cleans the data set. func (w *WMA) Reset() { w.count = 0 diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 7d7e5de847f..fa75a78baa1 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -111,12 +111,14 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sched h.summaryPendingInfluence(cluster) h.storesLoads = cluster.GetStoresLoads() isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow() + storesHistoryLoads := cluster.GetStoresHistoryLoads() prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, resource constant.ResourceKind) { ty := buildResourceType(rw, resource) h.stLoadInfos[ty] = statistics.SummaryStoresLoad( h.stInfos, h.storesLoads, + storesHistoryLoads, regionStats, isTraceRegionFlow, rw, resource) @@ -462,6 +464,7 @@ type balanceSolver struct { betterThan func(*solution) bool rankToDimString func() string checkByPriorityAndTolerance func(loads []float64, f func(int) bool) bool + checkHistoryLoadsByPriority func(loads [][]float64, f func(int) bool) bool } func (bs *balanceSolver) init() { @@ -526,10 +529,13 @@ func (bs *balanceSolver) pickCheckPolicyV1() { switch { case bs.resourceTy == writeLeader: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly + bs.checkHistoryLoadsByPriority = bs.checkHistoryByPriorityAndToleranceFirstOnly case bs.sche.conf.IsStrictPickingStoreEnabled(): bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAllOf + bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceAllOf default: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly + bs.checkHistoryLoadsByPriority = bs.checkHistoryByPriorityAndToleranceFirstOnly } } @@ -775,12 +781,17 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai continue } - if bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) { - ret[id] = detail - hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc() - } else { + if !bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) { hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc() + continue } + + if !bs.checkHistoryLoadByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, srcToleranceRatio) { + hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed", strconv.FormatUint(id, 10)).Inc() + continue + } + ret[id] = detail + hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc() } return ret } @@ -791,6 +802,14 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta }) } +func (bs *balanceSolver) checkHistoryLoadByPriorityAndTolerance(minLoad, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool { + return bs.checkHistoryLoadsByPriority(minLoad.HistoryLoads, func(i int) bool { + return slice.AllOf(minLoad.HistoryLoads[i], func(j int) bool { + return minLoad.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j] + }) + }) +} + // filterHotPeers filtered hot peers from statistics.HotPeerStat and deleted the peer if its region is in pending status. // The returned hotPeer count in controlled by `max-peer-number`. 
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) { @@ -989,12 +1008,17 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st } if filter.Target(bs.GetOpts(), store, filters) { id := store.GetID() - if bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) { - ret[id] = detail - hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(id, 10)).Inc() - } else { + if !bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) { hotSchedulerResultCounter.WithLabelValues("dst-store-failed", strconv.FormatUint(id, 10)).Inc() + continue } + + if !bs.checkDstHistoryLoadsByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) { + hotSchedulerResultCounter.WithLabelValues("dst-store-history-loads-failed", strconv.FormatUint(id, 10)).Inc() + continue + } + hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(id, 10)).Inc() + ret[id] = detail } } return ret @@ -1006,6 +1030,14 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist }) } +func (bs *balanceSolver) checkDstHistoryLoadsByPriorityAndTolerance(current, expect *statistics.StoreLoad, toleranceRatio float64) bool { + return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { + return slice.AllOf(current.HistoryLoads[i], func(j int) bool { + return current.HistoryLoads[i][j]*toleranceRatio < expect.Loads[i] + }) + }) +} + func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f func(int) bool) bool { return slice.AllOf(loads, func(i int) bool { if bs.isSelectedDim(i) { @@ -1015,6 +1047,15 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f fun }) } +func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceAllOf(loads [][]float64, f func(int) bool) bool { + return slice.AllOf(loads, func(i int) bool { + if bs.isSelectedDim(i) { + return f(i) + } + return true + }) +} + func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f func(int) bool) bool { return slice.AnyOf(loads, func(i int) bool { if bs.isSelectedDim(i) { @@ -1028,6 +1069,10 @@ func (bs *balanceSolver) checkByPriorityAndToleranceFirstOnly(loads []float64, f return f(bs.firstPriority) } +func (bs *balanceSolver) checkHistoryByPriorityAndToleranceFirstOnly(_ [][]float64, f func(int) bool) bool { + return f(bs.firstPriority) +} + func (bs *balanceSolver) enableExpectation() bool { return bs.sche.conf.GetDstToleranceRatio() > 0 && bs.sche.conf.GetSrcToleranceRatio() > 0 } diff --git a/pkg/statistics/collector.go b/pkg/statistics/collector.go index f4785103c89..7010761effa 100644 --- a/pkg/statistics/collector.go +++ b/pkg/statistics/collector.go @@ -27,8 +27,12 @@ type storeCollector interface { Filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool // GetLoads obtains available loads from storeLoads and peerLoadSum according to rwTy and kind. 
GetLoads(storeLoads, peerLoadSum []float64, rwTy RWType, kind constant.ResourceKind) (loads []float64) + GetHistoryLoads(storeLoads [][]float64, peerLoadSum []float64, rwTy RWType, kind constant.ResourceKind) (loads [][]float64) } +var _ storeCollector = tikvCollector{} +var _ storeCollector = tiflashCollector{} + type tikvCollector struct{} func newTikvCollector() storeCollector { @@ -78,6 +82,32 @@ func (c tikvCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy RWType, return } +func (c tikvCollector) GetHistoryLoads(storeLoads [][]float64, peerLoadSum []float64, rwTy RWType, kind constant.ResourceKind) (loads [][]float64) { + loads = make([][]float64, DimLen) + switch rwTy { + case Read: + loads[ByteDim] = storeLoads[StoreReadBytes] + loads[KeyDim] = storeLoads[StoreReadKeys] + loads[QueryDim] = storeLoads[StoreReadQuery] + case Write: + switch kind { + case constant.LeaderKind: + // Use sum of hot peers to estimate leader-only byte rate. + // For Write requests, Write{Bytes, Keys} is applied to all Peers at the same time, + // while the Leader and Follower are under different loads (usually the Leader consumes more CPU). + // Write{Query} does not require such processing. + loads[ByteDim] = []float64{peerLoadSum[ByteDim]} + loads[KeyDim] = []float64{peerLoadSum[KeyDim]} + loads[QueryDim] = storeLoads[StoreWriteQuery] + case constant.RegionKind: + loads[ByteDim] = storeLoads[StoreWriteBytes] + loads[KeyDim] = storeLoads[StoreWriteKeys] + // The `Write-peer` does not have `QueryDim` + } + } + return +} + type tiflashCollector struct { isTraceRegionFlow bool } @@ -124,3 +154,28 @@ func (c tiflashCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy RWTyp } return } + +func (c tiflashCollector) GetHistoryLoads(storeLoads [][]float64, peerLoadSum []float64, rwTy RWType, kind constant.ResourceKind) (loads [][]float64) { + loads = make([][]float64, DimLen) + switch rwTy { + case Read: + // TODO: Need TiFlash StoreHeartbeat support + case Write: + switch kind { + case constant.LeaderKind: + // There is no Leader on TiFlash + case constant.RegionKind: + // TiFlash is currently unable to report statistics in the same unit as Region, + // so it uses the sum of Regions. If it is not accurate enough, use sum of hot peer. + if c.isTraceRegionFlow { + loads[ByteDim] = storeLoads[StoreRegionsWriteBytes] + loads[KeyDim] = storeLoads[StoreRegionsWriteKeys] + } else { + loads[ByteDim] = []float64{peerLoadSum[ByteDim]} + loads[KeyDim] = []float64{peerLoadSum[KeyDim]} + } + // The `Write-peer` does not have `QueryDim` + } + } + return +} diff --git a/pkg/statistics/store.go b/pkg/statistics/store.go index d8288a9b9cf..278aad8793d 100644 --- a/pkg/statistics/store.go +++ b/pkg/statistics/store.go @@ -34,6 +34,8 @@ const ( RegionsStatsRollingWindowsSize = 9 ) +var _ StoreStatInformer = &StoresStats{} + // StoresStats is a cache hold hot regions. type StoresStats struct { syncutil.RWMutex @@ -114,6 +116,19 @@ func (s *StoresStats) GetStoresLoads() map[uint64][]float64 { return res } +// GetStoresHistoryLoads returns all stores loads. 
+func (s *StoresStats) GetStoresHistoryLoads() map[uint64][][]float64 { + s.RLock() + defer s.RUnlock() + res := make(map[uint64][][]float64, len(s.rollingStoresStats)) + for storeID, stats := range s.rollingStoresStats { + for i := StoreStatKind(0); i < StoreStatCount; i++ { + res[storeID] = append(res[storeID], stats.GetLoads(i)) + } + } + return res +} + // FilterUnhealthyStore filter unhealthy store func (s *StoresStats) FilterUnhealthyStore(cluster core.StoreSetInformer) { s.Lock() @@ -253,6 +268,19 @@ func (r *RollingStoreStats) GetLoad(k StoreStatKind) float64 { return 0 } +// GetLoads returns store's loads. +func (r *RollingStoreStats) GetLoads(k StoreStatKind) []float64 { + r.RLock() + defer r.RUnlock() + switch k { + case StoreReadBytes, StoreReadKeys, StoreReadQuery, StoreWriteBytes, StoreWriteKeys, StoreWriteQuery: + return r.timeMedians[k].GetAll() + case StoreCPUUsage, StoreDiskReadRate, StoreDiskWriteRate, StoreRegionsWriteBytes, StoreRegionsWriteKeys: + return r.movingAvgs[k].GetAll() + } + return []float64{0} +} + // GetInstantLoad returns store's instant load. func (r *RollingStoreStats) GetInstantLoad(k StoreStatKind) float64 { r.RLock() diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index ecbee239a69..63944d20637 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -75,12 +75,14 @@ func GetHotStatus(stores []*core.StoreInfo, storesLoads map[uint64][]float64, re stLoadInfosAsLeader := SummaryStoresLoad( stInfos, storesLoads, + nil, regionStats, isTraceRegionFlow, typ, constant.LeaderKind) stLoadInfosAsPeer := SummaryStoresLoad( stInfos, storesLoads, + nil, regionStats, isTraceRegionFlow, typ, constant.RegionKind) @@ -105,6 +107,7 @@ func GetHotStatus(stores []*core.StoreInfo, storesLoads map[uint64][]float64, re func SummaryStoresLoad( storeInfos map[uint64]*StoreSummaryInfo, storesLoads map[uint64][]float64, + storesHistoryLoads map[uint64][][]float64, storeHotPeers map[uint64][]*HotPeerStat, isTraceRegionFlow bool, rwTy RWType, @@ -116,6 +119,7 @@ func SummaryStoresLoad( tikvLoadDetail := summaryStoresLoadByEngine( storeInfos, storesLoads, + storesHistoryLoads, storeHotPeers, rwTy, kind, newTikvCollector(), @@ -123,6 +127,7 @@ func SummaryStoresLoad( tiflashLoadDetail := summaryStoresLoadByEngine( storeInfos, storesLoads, + storesHistoryLoads, storeHotPeers, rwTy, kind, newTiFlashCollector(isTraceRegionFlow), @@ -137,6 +142,7 @@ func SummaryStoresLoad( func summaryStoresLoadByEngine( storeInfos map[uint64]*StoreSummaryInfo, storesLoads map[uint64][]float64, + storesHistoryLoads map[uint64][][]float64, storeHotPeers map[uint64][]*HotPeerStat, rwTy RWType, kind constant.ResourceKind, @@ -144,6 +150,7 @@ func summaryStoresLoadByEngine( ) []*StoreLoadDetail { loadDetail := make([]*StoreLoadDetail, 0, len(storeInfos)) allStoreLoadSum := make([]float64, DimLen) + allStoreHistoryLoadSum := make([][]float64, DimLen) allStoreCount := 0 allHotPeersCount := 0 @@ -151,6 +158,7 @@ func summaryStoresLoadByEngine( store := info.StoreInfo id := store.GetID() storeLoads, ok := storesLoads[id] + //storesHistoryLoads, ok1 := storesHistoryLoads[id] if !ok || !collector.Filter(info, kind) { continue } @@ -177,6 +185,16 @@ func summaryStoresLoadByEngine( hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[QueryDim]) } + historyLoads := make([][]float64, DimLen) + if storesHistoryLoads, ok := storesHistoryLoads[id]; ok { + historyLoads = 
collector.GetHistoryLoads(storesHistoryLoads, peerLoadSum, rwTy, kind) + for i := range allStoreHistoryLoadSum { + for j := range allStoreHistoryLoadSum[i] { + allStoreHistoryLoadSum[i][j] += historyLoads[i][j] + } + } + } + loads := collector.GetLoads(storeLoads, peerLoadSum, rwTy, kind) for i := range allStoreLoadSum { allStoreLoadSum[i] += loads[i] @@ -186,8 +204,9 @@ func summaryStoresLoadByEngine( // Build store load prediction from current load and pending influence. stLoadPred := (&StoreLoad{ - Loads: loads, - Count: float64(len(hotPeers)), + Loads: loads, + Count: float64(len(hotPeers)), + HistoryLoads: historyLoads, }).ToLoadPred(rwTy, info.PendingSum) // Construct store load info. @@ -208,6 +227,13 @@ func summaryStoresLoadByEngine( expectLoads[i] = allStoreLoadSum[i] / float64(allStoreCount) } + expectHistoryLoads := make([][]float64, DimLen) + for i := range allStoreHistoryLoadSum { + for j := range allStoreHistoryLoadSum[i] { + expectHistoryLoads[i][j] = allStoreLoadSum[i] / float64(allStoreCount) + } + } + stddevLoads := make([]float64, len(allStoreLoadSum)) if allHotPeersCount != 0 { for _, detail := range loadDetail { @@ -239,8 +265,9 @@ func summaryStoresLoadByEngine( hotPeerSummary.WithLabelValues(ty, engine).Set(stddevLoads[QueryDim]) } expect := StoreLoad{ - Loads: expectLoads, - Count: expectCount, + Loads: expectLoads, + Count: expectCount, + HistoryLoads: expectHistoryLoads, } stddev := StoreLoad{ Loads: stddevLoads, diff --git a/pkg/statistics/store_load.go b/pkg/statistics/store_load.go index 13c4aa57635..356d234dacd 100644 --- a/pkg/statistics/store_load.go +++ b/pkg/statistics/store_load.go @@ -144,8 +144,9 @@ func (s *StoreSummaryInfo) SetEngineAsTiFlash() { // StoreLoad records the current load. type StoreLoad struct { - Loads []float64 - Count float64 + Loads []float64 + Count float64 + HistoryLoads [][]float64 } // ToLoadPred returns the current load and future predictive load. diff --git a/pkg/statistics/store_stat_informer.go b/pkg/statistics/store_stat_informer.go index 0c2fc22f7b6..35c00b008be 100644 --- a/pkg/statistics/store_stat_informer.go +++ b/pkg/statistics/store_stat_informer.go @@ -17,4 +17,5 @@ package statistics // StoreStatInformer provides access to a shared informer of statistics. type StoreStatInformer interface { GetStoresLoads() map[uint64][]float64 + GetStoresHistoryLoads() map[uint64][][]float64 } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c4b9db33ea5..b3a47b450d1 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -109,6 +109,8 @@ type Server interface { ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error } +var _ statistics.StoreStatInformer = &RaftCluster{} + // RaftCluster is used for cluster config management. // Raft cluster key format: // cluster 1 -> /1/raft, value is metapb.Cluster @@ -2220,6 +2222,11 @@ func (c *RaftCluster) GetStoresLoads() map[uint64][]float64 { return c.hotStat.GetStoresLoads() } +// GetStoresHistoryLoads returns load stats of all stores. +func (c *RaftCluster) GetStoresHistoryLoads() map[uint64][][]float64 { + return c.hotStat.GetStoresHistoryLoads() +} + // IsRegionHot checks if a region is in hot state. 
 func (c *RaftCluster) IsRegionHot(region *core.RegionInfo) bool {
 	return c.hotStat.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold())
diff --git a/server/handler.go b/server/handler.go
index 35cec65527f..404d06666de 100644
--- a/server/handler.go
+++ b/server/handler.go
@@ -78,6 +78,8 @@ var (
 	schedulerConfigPrefix = "pd/api/v1/scheduler-config"
 )
 
+var _ statistics.StoreStatInformer = &Handler{}
+
 // Handler is a helper to export methods to handle API/RPC requests.
 type Handler struct {
 	s *Server
@@ -214,6 +216,15 @@ func (h *Handler) GetStoresLoads() map[uint64][]float64 {
 	return rc.GetStoresLoads()
 }
 
+// GetStoresHistoryLoads gets the history load stats of all stores.
+func (h *Handler) GetStoresHistoryLoads() map[uint64][][]float64 {
+	rc := h.s.GetRaftCluster()
+	if rc == nil {
+		return nil
+	}
+	return rc.GetStoresHistoryLoads()
+}
+
 // AddScheduler adds a scheduler.
 func (h *Handler) AddScheduler(name string, args ...string) error {
 	c, err := h.GetRaftCluster()

From 8a2a04e394d9fa16d4f68625d56008c5788e78d8 Mon Sep 17 00:00:00 2001
From: bufferflies <1045931706@qq.com>
Date: Mon, 3 Apr 2023 18:08:44 +0800
Subject: [PATCH 03/13] add log

Signed-off-by: bufferflies <1045931706@qq.com>
---
 pkg/schedule/schedulers/hot_region.go   | 7 +++++--
 pkg/statistics/store_hot_peers_infos.go | 5 +++++
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go
index fa75a78baa1..8e48a8bfb3c 100644
--- a/pkg/schedule/schedulers/hot_region.go
+++ b/pkg/schedule/schedulers/hot_region.go
@@ -786,7 +786,7 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai
 			continue
 		}
 
-		if !bs.checkHistoryLoadByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, srcToleranceRatio) {
+		if !bs.checkSrcHistoryLoadByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, srcToleranceRatio) {
 			hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed", strconv.FormatUint(id, 10)).Inc()
 			continue
 		}
@@ -802,7 +802,10 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta
 	})
 }
 
-func (bs *balanceSolver) checkHistoryLoadByPriorityAndTolerance(minLoad, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
+func (bs *balanceSolver) checkSrcHistoryLoadByPriorityAndTolerance(minLoad, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
+	if minLoad == nil || len(minLoad.HistoryLoads) == 0 {
+		return true
+	}
 	return bs.checkHistoryLoadsByPriority(minLoad.HistoryLoads, func(i int) bool {
 		return slice.AllOf(minLoad.HistoryLoads[i], func(j int) bool {
 			return minLoad.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j]
diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go
index 63944d20637..68edcbe4b5a 100644
--- a/pkg/statistics/store_hot_peers_infos.go
+++ b/pkg/statistics/store_hot_peers_infos.go
@@ -16,10 +16,13 @@ package statistics
 
 import (
 	"fmt"
+	"math"
 
+	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/core/constant"
+	"go.uber.org/zap"
 )
 
 // StoreHotPeersInfos is used to get human-readable description for hot regions.
@@ -186,7 +189,9 @@ func summaryStoresLoadByEngine( } historyLoads := make([][]float64, DimLen) + if storesHistoryLoads, ok := storesHistoryLoads[id]; ok { + log.Info("history loads", zap.Uint64("id", id), zap.Any("history loads", storesHistoryLoads)) historyLoads = collector.GetHistoryLoads(storesHistoryLoads, peerLoadSum, rwTy, kind) for i := range allStoreHistoryLoadSum { for j := range allStoreHistoryLoadSum[i] { From d20ea9c8e0ac17e124c0ce33341fcbd1d36fd3d4 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Mon, 3 Apr 2023 18:27:48 +0800 Subject: [PATCH 04/13] panic Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/schedulers/hot_region.go | 9 +++++++++ pkg/schedule/schedulers/hot_region_v2.go | 2 ++ pkg/statistics/store_hot_peers_infos.go | 4 ++-- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 8e48a8bfb3c..9a20aede7bf 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -1068,6 +1068,15 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f fun }) } +func (bs *balanceSolver) checkHistoryByPriorityAndToleranceAnyOf(loads [][]float64, f func(int) bool) bool { + return slice.AnyOf(loads, func(i int) bool { + if bs.isSelectedDim(i) { + return f(i) + } + return false + }) +} + func (bs *balanceSolver) checkByPriorityAndToleranceFirstOnly(loads []float64, f func(int) bool) bool { return f(bs.firstPriority) } diff --git a/pkg/schedule/schedulers/hot_region_v2.go b/pkg/schedule/schedulers/hot_region_v2.go index 48ede517c8b..57ef4c97c62 100644 --- a/pkg/schedule/schedulers/hot_region_v2.go +++ b/pkg/schedule/schedulers/hot_region_v2.go @@ -86,8 +86,10 @@ func (bs *balanceSolver) pickCheckPolicyV2() { switch { case bs.resourceTy == writeLeader: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly + bs.checkHistoryLoadsByPriority = bs.checkHistoryByPriorityAndToleranceFirstOnly default: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAnyOf + bs.checkHistoryLoadsByPriority = bs.checkHistoryByPriorityAndToleranceAnyOf } } diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index 68edcbe4b5a..baf0d65fc76 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -16,7 +16,6 @@ package statistics import ( "fmt" - "math" "github.com/pingcap/log" @@ -191,7 +190,6 @@ func summaryStoresLoadByEngine( historyLoads := make([][]float64, DimLen) if storesHistoryLoads, ok := storesHistoryLoads[id]; ok { - log.Info("history loads", zap.Uint64("id", id), zap.Any("history loads", storesHistoryLoads)) historyLoads = collector.GetHistoryLoads(storesHistoryLoads, peerLoadSum, rwTy, kind) for i := range allStoreHistoryLoadSum { for j := range allStoreHistoryLoadSum[i] { @@ -239,6 +237,8 @@ func summaryStoresLoadByEngine( } } + log.Info("history loads", zap.Any("expect loads", expectHistoryLoads)) + stddevLoads := make([]float64, len(allStoreLoadSum)) if allHotPeersCount != 0 { for _, detail := range loadDetail { From aac24bd023d11635fb7467981f0329e6d4357677 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Mon, 3 Apr 2023 13:09:17 +0200 Subject: [PATCH 05/13] panic Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/schedulers/hot_region.go | 14 ++++++-------- pkg/statistics/store_hot_peers_infos.go | 9 ++++++--- 2 files changed, 12 insertions(+), 11 deletions(-) diff 
--git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 9a20aede7bf..3fbd25ce0aa 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -802,13 +802,11 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta }) } -func (bs *balanceSolver) checkSrcHistoryLoadByPriorityAndTolerance(minLoad, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool { - if minLoad == nil || len(minLoad.HistoryLoads) == 0 { - return true - } - return bs.checkHistoryLoadsByPriority(minLoad.HistoryLoads, func(i int) bool { - return slice.AllOf(minLoad.HistoryLoads[i], func(j int) bool { - return minLoad.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j] +func (bs *balanceSolver) checkSrcHistoryLoadByPriorityAndTolerance(current, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool { + log.Info("check src history load", zap.Any("current", current), zap.Any("expectLoad", expectLoad), zap.Any("bs", bs)) + return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { + return slice.AllOf(current.HistoryLoads[i], func(j int) bool { + return current.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j] }) }) } @@ -1036,7 +1034,7 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist func (bs *balanceSolver) checkDstHistoryLoadsByPriorityAndTolerance(current, expect *statistics.StoreLoad, toleranceRatio float64) bool { return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { return slice.AllOf(current.HistoryLoads[i], func(j int) bool { - return current.HistoryLoads[i][j]*toleranceRatio < expect.Loads[i] + return current.HistoryLoads[i][j]*toleranceRatio < expect.HistoryLoads[i][j] }) }) } diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index baf0d65fc76..cf470e9fbcd 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -188,11 +188,13 @@ func summaryStoresLoadByEngine( } historyLoads := make([][]float64, DimLen) - if storesHistoryLoads, ok := storesHistoryLoads[id]; ok { historyLoads = collector.GetHistoryLoads(storesHistoryLoads, peerLoadSum, rwTy, kind) - for i := range allStoreHistoryLoadSum { - for j := range allStoreHistoryLoadSum[i] { + for i := range historyLoads { + if allStoreHistoryLoadSum[i] == nil { + allStoreHistoryLoadSum[i] = make([]float64, len(historyLoads[i])) + } + for j := range historyLoads[i] { allStoreHistoryLoadSum[i][j] += historyLoads[i][j] } } @@ -232,6 +234,7 @@ func summaryStoresLoadByEngine( expectHistoryLoads := make([][]float64, DimLen) for i := range allStoreHistoryLoadSum { + expectHistoryLoads[i] = make([]float64, len(allStoreHistoryLoadSum[i])) for j := range allStoreHistoryLoadSum[i] { expectHistoryLoads[i][j] = allStoreLoadSum[i] / float64(allStoreCount) } From 887ad8d934483f64d07147e2ee57c239ba758252 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Tue, 4 Apr 2023 08:32:58 +0200 Subject: [PATCH 06/13] add log Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/schedulers/hot_region.go | 32 ++++++++++++++++++++------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 3fbd25ce0aa..59435282f04 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -782,16 +782,16 @@ func (bs *balanceSolver) filterSrcStores() 
map[uint64]*statistics.StoreLoadDetai } if !bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) { - hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("src-store-failed"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() continue } if !bs.checkSrcHistoryLoadByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, srcToleranceRatio) { - hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed", strconv.FormatUint(id, 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() continue } ret[id] = detail - hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("src-store-succ"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() } return ret } @@ -803,7 +803,7 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta } func (bs *balanceSolver) checkSrcHistoryLoadByPriorityAndTolerance(current, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool { - log.Info("check src history load", zap.Any("current", current), zap.Any("expectLoad", expectLoad), zap.Any("bs", bs)) + log.Info("check src history load", zap.Any("current", current), zap.Any("expect-load", expectLoad), zap.Any("bs", bs)) return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { return slice.AllOf(current.HistoryLoads[i], func(j int) bool { return current.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j] @@ -1010,15 +1010,15 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st if filter.Target(bs.GetOpts(), store, filters) { id := store.GetID() if !bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) { - hotSchedulerResultCounter.WithLabelValues("dst-store-failed", strconv.FormatUint(id, 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("dst-store-failed"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() continue } - if !bs.checkDstHistoryLoadsByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) { - hotSchedulerResultCounter.WithLabelValues("dst-store-history-loads-failed", strconv.FormatUint(id, 10)).Inc() + if !bs.checkDstHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, dstToleranceRatio) { + hotSchedulerResultCounter.WithLabelValues("dst-store-history-loads-failed"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() continue } - hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(id, 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("dst-store-succ"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() ret[id] = detail } } @@ -1032,6 +1032,7 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist } func (bs *balanceSolver) checkDstHistoryLoadsByPriorityAndTolerance(current, expect *statistics.StoreLoad, toleranceRatio float64) bool { + log.Info("check dst history load", zap.Any("current", current), zap.Any("expectLoad", expect), zap.Any("bs", bs)) return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { return slice.AllOf(current.HistoryLoads[i], func(j int) bool { return current.HistoryLoads[i][j]*toleranceRatio < expect.HistoryLoads[i][j] @@ -1591,6 
+1592,21 @@ func (ty opType) String() string { type resourceType int +func (rt resourceType) String() string { + switch rt { + case writePeer: + return "write_peer" + case writeLeader: + return "write_leader" + case readLeader: + return "read-leader" + case readPeer: + return "read-peer" + default: + return "unknown" + } +} + const ( writePeer resourceType = iota writeLeader From b1548c3a3c1bc8ef959074278b62ccd469a7fbe6 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Tue, 4 Apr 2023 09:56:47 +0200 Subject: [PATCH 07/13] add metrics Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/schedulers/hot_region.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 59435282f04..bbb00d1f133 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -782,16 +782,16 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai } if !bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) { - hotSchedulerResultCounter.WithLabelValues("src-store-failed"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("src-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() continue } if !bs.checkSrcHistoryLoadByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, srcToleranceRatio) { - hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() continue } ret[id] = detail - hotSchedulerResultCounter.WithLabelValues("src-store-succ"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("src-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() } return ret } @@ -1010,15 +1010,15 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st if filter.Target(bs.GetOpts(), store, filters) { id := store.GetID() if !bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) { - hotSchedulerResultCounter.WithLabelValues("dst-store-failed"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("dst-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() continue } if !bs.checkDstHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, dstToleranceRatio) { - hotSchedulerResultCounter.WithLabelValues("dst-store-history-loads-failed"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("dst-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() continue } - hotSchedulerResultCounter.WithLabelValues("dst-store-succ"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + hotSchedulerResultCounter.WithLabelValues("dst-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() ret[id] = detail } } @@ -1595,9 +1595,9 @@ type resourceType int func (rt resourceType) String() string { switch rt { case writePeer: - return "write_peer" + return "write-peer" case writeLeader: - return "write_leader" + return "write-leader" case readLeader: return "read-leader" case readPeer: From 
56c87907ed27d9eb5778a8170d66b474b9b7c36f Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Tue, 4 Apr 2023 10:05:30 +0200 Subject: [PATCH 08/13] hot region Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/statistics/store_hot_peers_infos.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index cf470e9fbcd..470adff9cd5 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -236,7 +236,7 @@ func summaryStoresLoadByEngine( for i := range allStoreHistoryLoadSum { expectHistoryLoads[i] = make([]float64, len(allStoreHistoryLoadSum[i])) for j := range allStoreHistoryLoadSum[i] { - expectHistoryLoads[i][j] = allStoreLoadSum[i] / float64(allStoreCount) + expectHistoryLoads[i][j] = allStoreHistoryLoadSum[i][j] / float64(allStoreCount) } } From 2602b84aef51793769d540d8f486921e2009c750 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Tue, 4 Apr 2023 10:35:29 +0200 Subject: [PATCH 09/13] enable write peer Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/schedulers/hot_region.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index bbb00d1f133..1e260cbe61f 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -601,7 +601,7 @@ func (bs *balanceSolver) filterUniformStoreV1() (string, bool) { // solve travels all the src stores, hot peers, dst stores and select each one of them to make a best scheduling solution. // The comparing between solutions is based on calcProgressiveRank. func (bs *balanceSolver) solve() []*operator.Operator { - if !bs.isValid() || bs.opTy != transferLeader { + if !bs.isValid() { return nil } From d06550159dd592fe970c9e1249bb8466ed5ea5a6 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Thu, 6 Apr 2023 16:08:58 +0800 Subject: [PATCH 10/13] remove unused log Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/schedulers/hot_region.go | 1 - pkg/statistics/store_hot_peers_infos.go | 5 ----- 2 files changed, 6 deletions(-) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 1e260cbe61f..927f03f6fab 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -1032,7 +1032,6 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist } func (bs *balanceSolver) checkDstHistoryLoadsByPriorityAndTolerance(current, expect *statistics.StoreLoad, toleranceRatio float64) bool { - log.Info("check dst history load", zap.Any("current", current), zap.Any("expectLoad", expect), zap.Any("bs", bs)) return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { return slice.AllOf(current.HistoryLoads[i], func(j int) bool { return current.HistoryLoads[i][j]*toleranceRatio < expect.HistoryLoads[i][j] diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index 470adff9cd5..57b9e9334ad 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -18,10 +18,8 @@ import ( "fmt" "math" - "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" - "go.uber.org/zap" ) // StoreHotPeersInfos is used to get human-readable description for hot regions. 
@@ -239,9 +237,6 @@ func summaryStoresLoadByEngine(
 			expectHistoryLoads[i][j] = allStoreHistoryLoadSum[i][j] / float64(allStoreCount)
 		}
 	}
-
-	log.Info("history loads", zap.Any("expect loads", expectHistoryLoads))
-
 	stddevLoads := make([]float64, len(allStoreLoadSum))
 	if allHotPeersCount != 0 {
 		for _, detail := range loadDetail {

From 34f48f26ec135f3b8adad17875d87efa5574576e Mon Sep 17 00:00:00 2001
From: bufferflies <1045931706@qq.com>
Date: Fri, 7 Apr 2023 15:51:14 +0800
Subject: [PATCH 11/13] fix GetAll when the window is not full

Signed-off-by: bufferflies <1045931706@qq.com>
---
 pkg/movingaverage/max_filter.go            | 3 +++
 pkg/movingaverage/median_filter.go         | 3 +++
 pkg/movingaverage/weight_moving_average.go | 3 +++
 pkg/schedule/schedulers/hot_region.go      | 3 ---
 pkg/schedule/schedulers/hot_region_test.go | 2 ++
 pkg/statistics/store_hot_peers_infos.go    | 1 -
 6 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/pkg/movingaverage/max_filter.go b/pkg/movingaverage/max_filter.go
index e2d04961625..21678147c57 100644
--- a/pkg/movingaverage/max_filter.go
+++ b/pkg/movingaverage/max_filter.go
@@ -54,6 +54,9 @@ func (r *MaxFilter) Get() float64 {
 
 // GetAll returns all the data points.
 func (r *MaxFilter) GetAll() []float64 {
+	if r.count < r.size {
+		return r.records[:r.count]
+	}
 	return r.records
 }
 
diff --git a/pkg/movingaverage/median_filter.go b/pkg/movingaverage/median_filter.go
index da9f0cc3a3a..77e41f45e62 100644
--- a/pkg/movingaverage/median_filter.go
+++ b/pkg/movingaverage/median_filter.go
@@ -55,6 +55,9 @@ func (r *MedianFilter) Get() float64 {
 }
 
 func (r *MedianFilter) GetAll() []float64 {
+	if r.count < r.size {
+		return r.records[:r.count]
+	}
 	return r.records
 }
 
diff --git a/pkg/movingaverage/weight_moving_average.go b/pkg/movingaverage/weight_moving_average.go
index cec31292daf..d00859b7c9e 100644
--- a/pkg/movingaverage/weight_moving_average.go
+++ b/pkg/movingaverage/weight_moving_average.go
@@ -68,6 +68,9 @@ func (w *WMA) Get() float64 {
 
 // GetAll returns all the data points.
func (w *WMA) GetAll() []float64 { + if w.count < w.size { + return w.records[:w.count] + } return w.records } diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 927f03f6fab..3e4d7427b3d 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -267,7 +267,6 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster) if h.conf.IsForbidRWType(typ) { return nil } - switch typ { case statistics.Read: return h.balanceHotReadRegions(cluster) @@ -604,7 +603,6 @@ func (bs *balanceSolver) solve() []*operator.Operator { if !bs.isValid() { return nil } - bs.cur = &solution{} tryUpdateBestSolution := func() { if label, ok := bs.filterUniformStore(); ok { @@ -803,7 +801,6 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta } func (bs *balanceSolver) checkSrcHistoryLoadByPriorityAndTolerance(current, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool { - log.Info("check src history load", zap.Any("current", current), zap.Any("expect-load", expectLoad), zap.Any("bs", bs)) return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { return slice.AllOf(current.HistoryLoads[i], func(j int) bool { return current.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j] diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 11d86af508b..f75544c8700 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -520,11 +520,13 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { tikvKeysSum += float64(storesBytes[i]/100) / 10 tikvQuerySum += float64(storesBytes[i]/100) / 10 } + for i := uint64(1); i <= storeCount; i++ { if i != downStoreID { tc.UpdateStorageWrittenBytes(i, storesBytes[i]) } } + { // Check the load expect aliveTiKVCount := float64(aliveTiKVLastID - aliveTiKVStartID + 1) allowLeaderTiKVCount := aliveTiKVCount - 1 // store 5 with evict leader diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index 57b9e9334ad..91c1c0223e4 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -158,7 +158,6 @@ func summaryStoresLoadByEngine( store := info.StoreInfo id := store.GetID() storeLoads, ok := storesLoads[id] - //storesHistoryLoads, ok1 := storesHistoryLoads[id] if !ok || !collector.Filter(info, kind) { continue } From add9492164245a491873ef4974f5d88143e28d6f Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Fri, 7 Apr 2023 15:51:14 +0800 Subject: [PATCH 12/13] add unit test for get all Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/movingaverage/avg_over_time_test.go | 1 + pkg/movingaverage/max_filter.go | 3 +++ pkg/movingaverage/max_filter_test.go | 1 + pkg/movingaverage/median_filter.go | 3 +++ pkg/movingaverage/moving_average_test.go | 1 + pkg/movingaverage/weight_moving_average.go | 3 +++ pkg/schedule/schedulers/hot_region.go | 3 --- pkg/schedule/schedulers/hot_region_test.go | 2 ++ pkg/statistics/store_hot_peers_infos.go | 1 - 9 files changed, 14 insertions(+), 4 deletions(-) diff --git a/pkg/movingaverage/avg_over_time_test.go b/pkg/movingaverage/avg_over_time_test.go index a0787f5af81..37e13bebfad 100644 --- a/pkg/movingaverage/avg_over_time_test.go +++ b/pkg/movingaverage/avg_over_time_test.go @@ -84,6 +84,7 @@ func TestMinFilled(t *testing.T) { for aotSize := 2; aotSize < 10; aotSize++ 
{ for mfSize := 2; mfSize < 10; mfSize++ { tm := NewTimeMedian(aotSize, mfSize, interval) + re.Equal([]float64{}, tm.GetAll()) for i := 0; i < aotSize; i++ { re.Equal(0.0, tm.Get()) tm.Add(rate*interval.Seconds(), interval) diff --git a/pkg/movingaverage/max_filter.go b/pkg/movingaverage/max_filter.go index e2d04961625..21678147c57 100644 --- a/pkg/movingaverage/max_filter.go +++ b/pkg/movingaverage/max_filter.go @@ -54,6 +54,9 @@ func (r *MaxFilter) Get() float64 { // GetAll returns all the data points. func (r *MaxFilter) GetAll() []float64 { + if r.count < r.size { + return r.records[:r.count] + } return r.records } diff --git a/pkg/movingaverage/max_filter_test.go b/pkg/movingaverage/max_filter_test.go index bba770cecc2..3e9f88e73e7 100644 --- a/pkg/movingaverage/max_filter_test.go +++ b/pkg/movingaverage/max_filter_test.go @@ -29,6 +29,7 @@ func TestMaxFilter(t *testing.T) { mf := NewMaxFilter(5) re.Equal(empty, mf.Get()) + re.Equal([]float64{}, mf.GetAll()) checkReset(re, mf, empty) checkAdd(re, mf, data, expected) diff --git a/pkg/movingaverage/median_filter.go b/pkg/movingaverage/median_filter.go index da9f0cc3a3a..77e41f45e62 100644 --- a/pkg/movingaverage/median_filter.go +++ b/pkg/movingaverage/median_filter.go @@ -55,6 +55,9 @@ func (r *MedianFilter) Get() float64 { } func (r *MedianFilter) GetAll() []float64 { + if r.count < r.size { + return r.records[:r.count] + } return r.records } diff --git a/pkg/movingaverage/moving_average_test.go b/pkg/movingaverage/moving_average_test.go index 49c20637c20..84c235db484 100644 --- a/pkg/movingaverage/moving_average_test.go +++ b/pkg/movingaverage/moving_average_test.go @@ -80,6 +80,7 @@ func TestMedianFilter(t *testing.T) { mf := NewMedianFilter(5) re.Equal(empty, mf.Get()) + re.Equal([]float64{}, mf.GetAll()) checkReset(re, mf, empty) checkAdd(re, mf, data, expected) diff --git a/pkg/movingaverage/weight_moving_average.go b/pkg/movingaverage/weight_moving_average.go index cec31292daf..d00859b7c9e 100644 --- a/pkg/movingaverage/weight_moving_average.go +++ b/pkg/movingaverage/weight_moving_average.go @@ -68,6 +68,9 @@ func (w *WMA) Get() float64 { // GetAll returns all the data points. 
func (w *WMA) GetAll() []float64 { + if w.count < w.size { + return w.records[:w.count] + } return w.records } diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 927f03f6fab..3e4d7427b3d 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -267,7 +267,6 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster) if h.conf.IsForbidRWType(typ) { return nil } - switch typ { case statistics.Read: return h.balanceHotReadRegions(cluster) @@ -604,7 +603,6 @@ func (bs *balanceSolver) solve() []*operator.Operator { if !bs.isValid() { return nil } - bs.cur = &solution{} tryUpdateBestSolution := func() { if label, ok := bs.filterUniformStore(); ok { @@ -803,7 +801,6 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta } func (bs *balanceSolver) checkSrcHistoryLoadByPriorityAndTolerance(current, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool { - log.Info("check src history load", zap.Any("current", current), zap.Any("expect-load", expectLoad), zap.Any("bs", bs)) return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { return slice.AllOf(current.HistoryLoads[i], func(j int) bool { return current.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j] diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 11d86af508b..f75544c8700 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -520,11 +520,13 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { tikvKeysSum += float64(storesBytes[i]/100) / 10 tikvQuerySum += float64(storesBytes[i]/100) / 10 } + for i := uint64(1); i <= storeCount; i++ { if i != downStoreID { tc.UpdateStorageWrittenBytes(i, storesBytes[i]) } } + { // Check the load expect aliveTiKVCount := float64(aliveTiKVLastID - aliveTiKVStartID + 1) allowLeaderTiKVCount := aliveTiKVCount - 1 // store 5 with evict leader diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index 57b9e9334ad..91c1c0223e4 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -158,7 +158,6 @@ func summaryStoresLoadByEngine( store := info.StoreInfo id := store.GetID() storeLoads, ok := storesLoads[id] - //storesHistoryLoads, ok1 := storesHistoryLoads[id] if !ok || !collector.Filter(info, kind) { continue } From 4551be3915564affb65641427549d060aeab35f5 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Tue, 11 Apr 2023 15:27:32 +0800 Subject: [PATCH 13/13] panic Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/statistics/store_hot_peers_infos.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index 91c1c0223e4..52a25d13789 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -187,12 +187,12 @@ func summaryStoresLoadByEngine( historyLoads := make([][]float64, DimLen) if storesHistoryLoads, ok := storesHistoryLoads[id]; ok { historyLoads = collector.GetHistoryLoads(storesHistoryLoads, peerLoadSum, rwTy, kind) - for i := range historyLoads { - if allStoreHistoryLoadSum[i] == nil { - allStoreHistoryLoadSum[i] = make([]float64, len(historyLoads[i])) + for i, loads := range historyLoads { + if allStoreHistoryLoadSum[i] == nil || 
len(allStoreHistoryLoadSum[i]) < len(loads) { + allStoreHistoryLoadSum[i] = make([]float64, len(loads)) } - for j := range historyLoads[i] { - allStoreHistoryLoadSum[i][j] += historyLoads[i][j] + for j, load := range loads { + allStoreHistoryLoadSum[i][j] += load } } }
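
A note on the behavior fixed in [PATCH 11/13]: before that commit, GetAll returned the full backing array even when the ring buffer had not wrapped yet, so callers saw zero-valued placeholder entries, and the scheduler's all-of checks over history loads would compare against spurious zeros. Below is a minimal standalone sketch of the intended window semantics (illustrative only, not PD code; simpleFilter and its fields are hypothetical stand-ins for the filters in pkg/movingaverage):

	package main

	import "fmt"

	// simpleFilter mimics the ring-buffer filters in pkg/movingaverage:
	// it keeps at most `size` data points, the newest overwriting the oldest.
	type simpleFilter struct {
		records []float64
		size    uint64
		count   uint64
	}

	func newSimpleFilter(size int) *simpleFilter {
		return &simpleFilter{records: make([]float64, size), size: uint64(size)}
	}

	// Add records one data point, overwriting the oldest once the window is full.
	func (f *simpleFilter) Add(v float64) {
		f.records[f.count%f.size] = v
		f.count++
	}

	// GetAll returns only the filled portion of the window, matching the
	// post-[PATCH 11/13] behavior: records[:count] until the buffer wraps.
	func (f *simpleFilter) GetAll() []float64 {
		if f.count < f.size {
			return f.records[:f.count]
		}
		return f.records
	}

	func main() {
		f := newSimpleFilter(5)
		f.Add(1)
		f.Add(2)
		fmt.Println(f.GetAll()) // prints [1 2], not [1 2 0 0 0]
	}

Without the count guard, checkHistoryLoadsByPriority in hot_region.go would iterate over the zero-valued tail as well, and a per-point condition such as load > ratio*expect would then fail for every store, which is consistent with the src/dst "history-loads-failed" counters this series adds.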