Skip to content

Commit 58cc16e

Browse files
committed
merg
Signed-off-by: bufferflies <[email protected]>
1 parent 8e385b9 commit 58cc16e

File tree

4 files changed

+68
-9
lines changed

4 files changed

+68
-9
lines changed

server/statistics/buckets/bucket_stat_informer.go

+7
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,10 @@ func (b *BucketTreeItem) calculateHotDegree() {
207207
}
208208
}
209209
}
210+
211+
// collectBucketsMetrics collects the metrics of the hot stats.
212+
func (stats *BucketTreeItem) collectBucketsMetrics() {
213+
for _, bucket := range stats.stats {
214+
bucketsHotDegreeHist.Observe(float64(bucket.HotDegree))
215+
}
216+
}

server/statistics/buckets/hot_bucket_cache.go

+21
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package buckets
1717
import (
1818
"bytes"
1919
"context"
20+
"time"
2021

2122
"github.com/pingcap/kvproto/pkg/metapb"
2223
"github.com/pingcap/log"
@@ -52,6 +53,23 @@ type HotBucketCache struct {
5253
ctx context.Context
5354
}
5455

56+
// GetHotBucketStats returns the hot stats of the regions that great than degree.
57+
func (h *HotBucketCache) GetHotBucketStats(degree int) map[uint64][]*BucketStat {
58+
rst := make(map[uint64][]*BucketStat)
59+
for _, item := range h.bucketsOfRegion {
60+
stats := make([]*BucketStat, 0)
61+
for _, b := range item.stats {
62+
if b.HotDegree >= degree {
63+
stats = append(stats, b)
64+
}
65+
}
66+
if len(stats) > 0 {
67+
rst[item.regionID] = stats
68+
}
69+
}
70+
return rst
71+
}
72+
5573
// bucketDebrisFactory returns the debris if the key range of the item is bigger than the given key range.
5674
// start and end key: | 001------------------------200|
5775
// the split key range: |050---150|
@@ -137,7 +155,9 @@ func (h *HotBucketCache) schedule() {
137155
case <-h.ctx.Done():
138156
return
139157
case task := <-h.taskQueue:
158+
start := time.Now()
140159
task.runTask(h)
160+
bucketsTaskDuration.WithLabelValues(task.taskType().String()).Observe(time.Since(start).Seconds())
141161
}
142162
}
143163
}
@@ -156,6 +176,7 @@ func (h *HotBucketCache) checkBucketsFlow(buckets *metapb.Buckets) (newItem *Buc
156176
}
157177
newItem.inherit(overlaps)
158178
newItem.calculateHotDegree()
179+
newItem.collectBucketsMetrics()
159180
return newItem, overlaps
160181
}
161182

server/statistics/buckets/hot_bucket_task.go

+38-1
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,24 @@
1515
package buckets
1616

1717
import (
18+
"context"
19+
1820
"github.com/pingcap/kvproto/pkg/metapb"
1921
)
2022

2123
type flowItemTaskKind uint32
2224

2325
const (
2426
checkBucketsTaskType flowItemTaskKind = iota
27+
collectBucketStatsTaskType
2528
)
2629

2730
func (kind flowItemTaskKind) String() string {
28-
if kind == checkBucketsTaskType {
31+
switch kind {
32+
case checkBucketsTaskType:
2933
return "check_buckets"
34+
case collectBucketStatsTaskType:
35+
return "collect_bucket_stats"
3036
}
3137
return "unknown"
3238
}
@@ -57,3 +63,34 @@ func (t *checkBucketsTask) runTask(cache *HotBucketCache) {
5763
newItems, overlaps := cache.checkBucketsFlow(t.Buckets)
5864
cache.putItem(newItems, overlaps)
5965
}
66+
67+
type collectBucketStatsTask struct {
68+
minDegree int
69+
ret chan map[uint64][]*BucketStat // RegionID ==>Buckets
70+
}
71+
72+
// NewCollectBucketStatsTask creates task to collect bucket stats.
73+
func NewCollectBucketStatsTask(minDegree int) *collectBucketStatsTask {
74+
return &collectBucketStatsTask{
75+
minDegree: minDegree,
76+
ret: make(chan map[uint64][]*BucketStat, 1),
77+
}
78+
}
79+
80+
func (t *collectBucketStatsTask) taskType() flowItemTaskKind {
81+
return collectBucketStatsTaskType
82+
}
83+
84+
func (t *collectBucketStatsTask) runTask(cache *HotBucketCache) {
85+
t.ret <- cache.GetHotBucketStats(t.minDegree)
86+
}
87+
88+
// WaitRet returns the result of the task.
89+
func (t *collectBucketStatsTask) WaitRet(ctx context.Context) map[uint64][]*BucketStat {
90+
select {
91+
case <-ctx.Done():
92+
return nil
93+
case ret := <-t.ret:
94+
return ret
95+
}
96+
}

server/statistics/buckets/hot_bucket_task_test.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,8 @@ var _ = Suite(&testHotBucketTaskCache{})
2929
type testHotBucketTaskCache struct {
3030
}
3131

32-
func (s *testHotBucketTaskCache) SetUpSuite(_ *C) {
33-
}
34-
35-
func (s *testHotBucketTaskCache) TearDownTest(_ *C) {
36-
}
37-
3832
func getAllBucketStats(ctx context.Context, hotCache *HotBucketCache) map[uint64][]*BucketStat {
39-
task := NewCollectBucketStatsTask(-100)
33+
task := NewCollectBucketStatsTask(minHotDegree)
4034
hotCache.CheckAsync(task)
4135
return task.WaitRet(ctx)
4236
}
@@ -56,7 +50,7 @@ func (s *testHotBucketTaskCache) TestColdHot(c *C) {
5650
isHot: true,
5751
}}
5852
for _, v := range testdata {
59-
for i := 0; i < 100; i++ {
53+
for i := 0; i < 20; i++ {
6054
task := NewCheckPeerTask(v.buckets)
6155
c.Assert(hotCache.CheckAsync(task), IsTrue)
6256
hotBuckets := getAllBucketStats(ctx, hotCache)

0 commit comments

Comments
 (0)