statistics: reduce unnecessary cpu time
Signed-off-by: lhy1024 <[email protected]>
lhy1024 committed Dec 26, 2022
1 parent c5b06e2 commit a45cc0b
Showing 5 changed files with 77 additions and 67 deletions.
5 changes: 0 additions & 5 deletions server/core/peer.go
@@ -94,11 +94,6 @@ func NewPeerInfo(meta *metapb.Peer, loads []float64, interval uint64) *PeerInfo {
     }
 }
 
-// GetStoreID provides located storeID
-func (p *PeerInfo) GetStoreID() uint64 {
-    return p.GetStoreId()
-}
-
 // GetLoads provides loads
 func (p *PeerInfo) GetLoads() []float64 {
     return p.loads
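The deleted method was pure indirection: the wrapper body (return p.GetStoreId()) shows that PeerInfo already has the generated getter promoted from an embedded protobuf peer, so keeping a second spelling (GetStoreID) costs an extra call and invites confusion; checkPeerFlow below now calls GetStoreId directly. A minimal sketch of the promotion mechanics, using stand-in types rather than PD's actual definitions:

package main

import "fmt"

// Stand-in for the generated protobuf type; in PD this is metapb.Peer with a
// generated GetStoreId getter.
type Peer struct{ StoreId uint64 }

func (p *Peer) GetStoreId() uint64 {
    if p == nil {
        return 0
    }
    return p.StoreId
}

// PeerInfo embeds *Peer, so GetStoreId is promoted onto PeerInfo. A wrapper
// like GetStoreID() uint64 { return p.GetStoreId() } adds a call and a second
// spelling of the same accessor without adding behavior.
type PeerInfo struct {
    *Peer
    loads []float64
}

func main() {
    p := &PeerInfo{Peer: &Peer{StoreId: 42}}
    fmt.Println(p.GetStoreId()) // 42, via the promoted method
}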
23 changes: 9 additions & 14 deletions server/statistics/hot_cache.go
@@ -20,6 +20,11 @@ import (
     "github.com/tikv/pd/server/core"
 )
 
+var (
+    readTaskMetrics  = hotCacheFlowQueueStatusGauge.WithLabelValues(Read.String())
+    writeTaskMetrics = hotCacheFlowQueueStatusGauge.WithLabelValues(Write.String())
+)
+
 // HotCache is a cache hold hot regions.
 type HotCache struct {
     ctx context.Context
@@ -105,8 +110,8 @@ func (w *HotCache) GetHotPeerStat(kind RWType, regionID, storeID uint64) *HotPeerStat {
 
 // CollectMetrics collects the hot cache metrics.
 func (w *HotCache) CollectMetrics() {
-    writeMetricsTask := newCollectMetricsTask("write")
-    readMetricsTask := newCollectMetricsTask("read")
+    writeMetricsTask := newCollectMetricsTask(Write.String())
+    readMetricsTask := newCollectMetricsTask(Read.String())
     w.CheckWriteAsync(writeMetricsTask)
     w.CheckReadAsync(readMetricsTask)
 }
@@ -116,16 +121,6 @@ func (w *HotCache) ResetMetrics() {
     hotCacheStatusGauge.Reset()
 }
 
-func incMetrics(name string, storeID uint64, kind RWType) {
-    store := storeTag(storeID)
-    switch kind {
-    case Write:
-        hotCacheStatusGauge.WithLabelValues(name, store, "write").Inc()
-    case Read:
-        hotCacheStatusGauge.WithLabelValues(name, store, "read").Inc()
-    }
-}
-
 func (w *HotCache) updateItems(queue <-chan FlowItemTask, runTask func(task FlowItemTask)) {
     for {
         select {
@@ -141,15 +136,15 @@ func (w *HotCache) runReadTask(task FlowItemTask) {
     if task != nil {
         // TODO: do we need a run-task timeout to protect the queue won't be stuck by a task?
         task.runTask(w.readCache)
-        hotCacheFlowQueueStatusGauge.WithLabelValues(Read.String()).Set(float64(len(w.readCache.taskQueue)))
+        readTaskMetrics.Set(float64(len(w.readCache.taskQueue)))
     }
 }
 
 func (w *HotCache) runWriteTask(task FlowItemTask) {
     if task != nil {
         // TODO: do we need a run-task timeout to protect the queue won't be stuck by a task?
         task.runTask(w.writeCache)
-        hotCacheFlowQueueStatusGauge.WithLabelValues(Write.String()).Set(float64(len(w.writeCache.taskQueue)))
+        writeTaskMetrics.Set(float64(len(w.writeCache.taskQueue)))
     }
 }
 
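Both task runners execute once per queued item, and before this change each execution paid for a GaugeVec lookup: WithLabelValues hashes the label values and consults the vector's internal map under a lock before handing back the child gauge. Resolving the child once into package-level variables (readTaskMetrics/writeTaskMetrics above) does that lookup a single time, leaving roughly an atomic store per task. A micro-benchmark sketch of the difference; the metric name is invented, and this belongs in a _test.go file:

package statistics

import (
    "testing"

    "github.com/prometheus/client_golang/prometheus"
)

var queueGauge = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{Name: "flow_queue_status"}, // hypothetical metric
    []string{"type"},
)

// Resolved once, as the commit does with readTaskMetrics/writeTaskMetrics.
var readGauge = queueGauge.WithLabelValues("read")

func BenchmarkPerCallLookup(b *testing.B) {
    for i := 0; i < b.N; i++ {
        queueGauge.WithLabelValues("read").Set(float64(i)) // hash + locked map lookup each call
    }
}

func BenchmarkCachedChild(b *testing.B) {
    for i := 0; i < b.N; i++ {
        readGauge.Set(float64(i)) // atomic store only
    }
}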
27 changes: 15 additions & 12 deletions server/statistics/hot_peer.go
@@ -18,6 +18,7 @@ import (
     "math"
     "time"
 
+    "github.com/pingcap/log"
     "github.com/tikv/pd/pkg/movingaverage"
     "github.com/tikv/pd/pkg/slice"
     "github.com/tikv/pd/pkg/utils/syncutil"
@@ -126,18 +127,20 @@ func (stat *HotPeerStat) Less(dim int, than TopNItem) bool {
 }
 
 // Log is used to output some info
-func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Field)) {
-    level(str,
-        zap.Uint64("region-id", stat.RegionID),
-        zap.Bool("is-leader", stat.isLeader),
-        zap.Float64s("loads", stat.GetLoads()),
-        zap.Float64s("loads-instant", stat.Loads),
-        zap.Int("hot-degree", stat.HotDegree),
-        zap.Int("hot-anti-count", stat.AntiCount),
-        zap.Duration("sum-interval", stat.getIntervalSum()),
-        zap.Bool("allow-inherited", stat.allowInherited),
-        zap.String("action-type", stat.actionType.String()),
-        zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime))
+func (stat *HotPeerStat) Log(str string) {
+    if log.GetLevel() <= zap.DebugLevel {
+        log.Debug(str,
+            zap.Uint64("region-id", stat.RegionID),
+            zap.Bool("is-leader", stat.isLeader),
+            zap.Float64s("loads", stat.GetLoads()),
+            zap.Float64s("loads-instant", stat.Loads),
+            zap.Int("hot-degree", stat.HotDegree),
+            zap.Int("hot-anti-count", stat.AntiCount),
+            zap.Duration("sum-interval", stat.getIntervalSum()),
+            zap.Bool("allow-inherited", stat.allowInherited),
+            zap.String("action-type", stat.actionType.String()),
+            zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime))
+    }
 }
 
 // IsNeedCoolDownTransferLeader use cooldown time after transfer leader to avoid unnecessary schedule
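The old signature took the log function as an argument (callers passed log.Debug), so the ten zap fields were constructed on every call even when debug output was disabled; the rewrite gates on log.GetLevel() first, so field construction is skipped entirely on the common path. The same guard expressed against plain uber-go/zap, as a sketch rather than PD's actual logger setup:

package main

import (
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

func main() {
    logger, _ := zap.NewProduction() // info level, so debug is disabled
    defer logger.Sync()

    // Unguarded, logger.Debug(...) would still build every field before the
    // level check inside Debug discards them. The Enabled guard costs one
    // comparison and skips field construction entirely on the hot path.
    if logger.Core().Enabled(zapcore.DebugLevel) {
        logger.Debug("region heartbeat update",
            zap.Uint64("region-id", 1),
            zap.Float64s("loads", []float64{1, 2}),
        )
    }
}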
88 changes: 52 additions & 36 deletions server/statistics/hot_peer_cache.go
@@ -20,7 +20,7 @@ import (
 
     "github.com/docker/go-units"
     "github.com/pingcap/kvproto/pkg/metapb"
-    "github.com/pingcap/log"
+    "github.com/prometheus/client_golang/prometheus"
     "github.com/tikv/pd/pkg/slice"
    "github.com/tikv/pd/server/core"
 )
@@ -77,7 +77,8 @@ type hotPeerCache struct {
     regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs
     topNTTL        time.Duration
     taskQueue      chan FlowItemTask
-    thresholdsOfStore map[uint64]*thresholds // storeID -> thresholds
+    thresholdsOfStore map[uint64]*thresholds                      // storeID -> thresholds
+    metrics           map[uint64][ActionTypeLen]prometheus.Gauge // storeID -> metrics
     // TODO: consider to remove store info when store is offline.
 }
 
@@ -91,6 +92,7 @@ func NewHotPeerCache(kind RWType) *hotPeerCache {
         taskQueue:         make(chan FlowItemTask, queueCap),
         thresholdsOfStore: make(map[uint64]*thresholds),
         topNTTL:           time.Duration(3*kind.ReportInterval()) * time.Second,
+        metrics:           make(map[uint64][ActionTypeLen]prometheus.Gauge),
     }
 }
 
@@ -116,17 +118,27 @@ func (f *hotPeerCache) updateStat(item *HotPeerStat) {
     switch item.actionType {
     case Remove:
         f.removeItem(item)
-        item.Log("region heartbeat remove from cache", log.Debug)
-        incMetrics("remove_item", item.StoreID, f.kind)
-        return
-    case Add:
-        incMetrics("add_item", item.StoreID, f.kind)
-    case Update:
-        incMetrics("update_item", item.StoreID, f.kind)
+        item.Log("region heartbeat remove from cache")
+    case Add, Update:
+        f.putItem(item)
+        item.Log("region heartbeat update")
+    default:
+        return
     }
-    // for add and update
-    f.putItem(item)
-    item.Log("region heartbeat update", log.Debug)
+    f.incMetrics(item.actionType, item.StoreID)
 }
+
+func (f *hotPeerCache) incMetrics(action ActionType, storeID uint64) {
+    if _, ok := f.metrics[storeID]; !ok {
+        store := storeTag(storeID)
+        kind := f.kind.String()
+        f.metrics[storeID] = [ActionTypeLen]prometheus.Gauge{
+            Add:    hotCacheStatusGauge.WithLabelValues("add_item", store, kind),
+            Remove: hotCacheStatusGauge.WithLabelValues("remove_item", store, kind),
+            Update: hotCacheStatusGauge.WithLabelValues("update_item", store, kind),
+        }
+    }
+    f.metrics[storeID][action].Inc()
+}
 
 func (f *hotPeerCache) collectPeerMetrics(loads []float64, interval uint64) {
@@ -157,12 +169,14 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerStat {
     regionID := region.GetID()
     items := make([]*HotPeerStat, 0)
-    for _, storeID := range f.getAllStoreIDs(region) {
-        if region.GetStorePeer(storeID) == nil {
-            item := f.getOldHotPeerStat(regionID, storeID)
-            if item != nil {
-                item.actionType = Remove
-                items = append(items, item)
+    if ids, ok := f.storesOfRegion[regionID]; ok {
+        for storeID := range ids {
+            if region.GetStorePeer(storeID) == nil {
+                item := f.getOldHotPeerStat(regionID, storeID)
+                if item != nil {
+                    item.actionType = Remove
+                    items = append(items, item)
+                }
             }
         }
     }
     return items
@@ -177,7 +191,7 @@ func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
     if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose
         return nil
     }
-    storeID := peer.GetStoreID()
+    storeID := peer.GetStoreId()
     deltaLoads := peer.GetLoads()
     f.collectPeerMetrics(deltaLoads, interval) // update metrics
     regionID := region.GetID()
@@ -207,15 +221,16 @@ func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
         }
     }
 
+    peers := region.GetPeers()
     newItem := &HotPeerStat{
         StoreID:    storeID,
         RegionID:   regionID,
         Loads:      f.kind.GetLoadRatesFromPeer(peer),
         isLeader:   region.GetLeader().GetStoreId() == storeID,
         actionType: Update,
-        stores:     make([]uint64, len(region.GetPeers())),
+        stores:     make([]uint64, len(peers)),
     }
-    for _, peer := range region.GetPeers() {
+    for _, peer := range peers {
         newItem.stores = append(newItem.stores, peer.GetStoreId())
     }
 
@@ -332,10 +347,11 @@ func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 {
     }
 
     // new stores
-    for _, peer := range region.GetPeers() {
-        if _, ok := storeIDs[peer.GetStoreId()]; !ok {
-            storeIDs[peer.GetStoreId()] = struct{}{}
-            ret = append(ret, peer.GetStoreId())
+    for _, peer := range regionPeers {
+        storeID := peer.GetStoreId()
+        if _, ok := storeIDs[storeID]; !ok {
+            storeIDs[storeID] = struct{}{}
+            ret = append(ret, storeID)
         }
     }
 
@@ -351,24 +367,24 @@ func (f *hotPeerCache) isOldColdPeer(oldItem *HotPeerStat, storeID uint64) bool {
         }
         return false
     }
-    noInCache := func() bool {
-        ids, ok := f.storesOfRegion[oldItem.RegionID]
-        if ok {
-            for id := range ids {
-                if id == storeID {
-                    return false
-                }
+    isInHotCache := func() bool {
+        if ids, ok := f.storesOfRegion[oldItem.RegionID]; ok {
+            if _, ok := ids[storeID]; ok {
+                return true
             }
         }
-        return true
+        return false
     }
-    return isOldPeer() && noInCache()
+    return isOldPeer() && !isInHotCache()
 }
 
-func (f *hotPeerCache) justTransferLeader(region *core.RegionInfo) bool {
+func (f *hotPeerCache) justTransferLeader(region *core.RegionInfo, oldItem *HotPeerStat) bool {
     if region == nil {
         return false
     }
+    if oldItem.isLeader { // old item is not nil according to the function
+        return oldItem.StoreID != region.GetLeader().GetStoreId()
+    }
     ids, ok := f.storesOfRegion[region.GetID()]
     if ok {
         for storeID := range ids {
@@ -425,7 +441,7 @@ func (f *hotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldItem *HotPeerStat) *HotPeerStat {
         newItem.allowInherited = oldItem.allowInherited
     }
 
-    if f.justTransferLeader(region) {
+    if f.justTransferLeader(region, oldItem) {
         newItem.lastTransferLeaderTime = time.Now()
         // skip the first heartbeat flow statistic after transfer leader, because its statistics are calculated by the last leader in this store and are inaccurate
         // maintain anticount and hotdegree to avoid store threshold and hot peer are unstable.
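The rewrites in this file share one theme: collectExpiredItems and isOldColdPeer probe the storesOfRegion map directly instead of building and scanning slices, checkPeerFlow hoists the repeated region.GetPeers() call into a local, and justTransferLeader gains an early exit via the old item's leader flag. The biggest win is incMetrics: instead of a switch plus a WithLabelValues lookup per heartbeat, each store lazily gets a fixed-size array of resolved gauge handles indexed by ActionType, so the steady-state cost is an array index and an atomic increment. A self-contained sketch of that pattern with simplified, invented names:

package statistics

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

type ActionType int

const (
    Add ActionType = iota
    Remove
    Update
    ActionTypeLen // sentinel: count of real actions, usable as an array length
)

var statusGauge = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{Name: "hot_cache_status"}, // hypothetical metric
    []string{"name", "store", "type"},
)

type cache struct {
    kind    string                                      // "read" or "write"
    metrics map[uint64][ActionTypeLen]prometheus.Gauge // storeID -> resolved handles
}

func (c *cache) incMetrics(action ActionType, storeID uint64) {
    if _, ok := c.metrics[storeID]; !ok {
        store := fmt.Sprintf("store-%d", storeID)
        // Pay the three label lookups once per store, not once per heartbeat.
        c.metrics[storeID] = [ActionTypeLen]prometheus.Gauge{
            Add:    statusGauge.WithLabelValues("add_item", store, c.kind),
            Remove: statusGauge.WithLabelValues("remove_item", store, c.kind),
            Update: statusGauge.WithLabelValues("update_item", store, c.kind),
        }
    }
    c.metrics[storeID][action].Inc() // array index + atomic increment
}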
1 change: 1 addition & 0 deletions server/statistics/kind.go
@@ -263,6 +263,7 @@ const (
     Add ActionType = iota
     Remove
     Update
+    ActionTypeLen
 )
 
 func (t ActionType) String() string {
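The sole change here is the ActionTypeLen sentinel. Because the constants use iota, a trailing entry automatically equals the count of real values, which is what lets hot_peer_cache.go declare [ActionTypeLen]prometheus.Gauge. A compact illustration of the idiom and its one caveat:

package statistics

type ActionType int

const (
    Add ActionType = iota // 0
    Remove                // 1
    Update                // 2
    ActionTypeLen         // 3: equals the number of real values
)

// The sentinel is itself a valid ActionType, so switches over actions need a
// default branch (updateStat above returns on default), and String() should
// never be handed ActionTypeLen.
var actionNames = [ActionTypeLen]string{Add: "add", Remove: "remove", Update: "update"}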
