diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 45585a7666a..f7eb252b31b 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -55,6 +55,8 @@ type Cluster struct { ID uint64 suspectRegions map[uint64]struct{} *config.StoreConfigManager + *buckets.HotBucketCache + ctx context.Context } // NewCluster creates a new Cluster @@ -63,9 +65,11 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster { BasicCluster: core.NewBasicCluster(), IDAllocator: mockid.NewIDAllocator(), HotStat: statistics.NewHotStat(ctx), + HotBucketCache: buckets.NewBucketsCache(ctx), PersistOptions: opts, suspectRegions: map[uint64]struct{}{}, StoreConfigManager: config.NewTestStoreConfigManager(nil), + ctx: ctx, } if clus.PersistOptions.GetReplicationConfig().EnablePlacementRules { clus.initRuleManager() @@ -133,7 +137,11 @@ func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { // BucketsStats returns hot region's buckets stats. func (mc *Cluster) BucketsStats(degree int) map[uint64][]*buckets.BucketStat { - return nil + task := buckets.NewCollectBucketStatsTask(degree) + if !mc.HotBucketCache.CheckAsync(task) { + return nil + } + return task.WaitRet(mc.ctx) } // RegionWriteStats returns hot region's write stats. diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b7271101dcd..b511f217576 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1925,7 +1925,11 @@ func (c *RaftCluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { // BucketsStats returns hot region's buckets stats. func (c *RaftCluster) BucketsStats(degree int) map[uint64][]*buckets.BucketStat { - return nil + task := buckets.NewCollectBucketStatsTask(degree) + if !c.hotBuckets.CheckAsync(task) { + return nil + } + return task.WaitRet(c.ctx) } // RegionWriteStats returns hot region's write stats. 
diff --git a/server/grpc_service.go b/server/grpc_service.go index a3bb35bcd11..83726ac2592 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -779,6 +779,7 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error { bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc() continue } + bucketReportInterval.WithLabelValues(storeAddress, storeLabel).Observe(float64(buckets.GetPeriodInMs()) / 1000) bucketReportLatency.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc() } diff --git a/server/metrics.go b/server/metrics.go index aad97ee0bf5..6e6e8f8c1a2 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -100,6 +100,15 @@ var ( Buckets: prometheus.ExponentialBuckets(0.0001, 2, 29), // 0.1ms ~ 7hours }, []string{"address", "store"}) + bucketReportInterval = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "server", + Name: "bucket_report_interval_seconds", + Help: "Bucketed histogram of the report interval (s) of bucket report requests.", + Buckets: prometheus.ExponentialBuckets(0.0001, 2, 29), // 0.1ms ~ 7hours + }, []string{"address", "store"}) + regionHeartbeatHandleDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "pd", @@ -151,4 +160,5 @@ func init() { prometheus.MustRegister(bucketReportCounter) prometheus.MustRegister(bucketReportLatency) prometheus.MustRegister(serviceAuditHistogram) + prometheus.MustRegister(bucketReportInterval) } diff --git a/server/statistics/buckets/bucket_stat_informer.go b/server/statistics/buckets/bucket_stat_informer.go index c64ff947917..c08efe9ad54 100644 --- a/server/statistics/buckets/bucket_stat_informer.go +++ b/server/statistics/buckets/bucket_stat_informer.go @@ -207,3 +207,10 @@ func (b *BucketTreeItem) calculateHotDegree() { } } } + +// collectBucketsMetrics collects the metrics 
of the hot stats. +func (b *BucketTreeItem) collectBucketsMetrics() { + for _, bucket := range b.stats { + bucketsHotDegreeHist.Observe(float64(bucket.HotDegree)) + } +} diff --git a/server/statistics/buckets/hot_bucket_cache.go b/server/statistics/buckets/hot_bucket_cache.go index 5aba9ffdab7..8b1ed0b4526 100644 --- a/server/statistics/buckets/hot_bucket_cache.go +++ b/server/statistics/buckets/hot_bucket_cache.go @@ -17,6 +17,7 @@ package buckets import ( "bytes" "context" + "time" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -52,6 +53,23 @@ type HotBucketCache struct { ctx context.Context } +// GetHotBucketStats returns, keyed by region ID, the bucket stats whose hot degree is greater than or equal to degree; regions without such buckets are omitted. +func (h *HotBucketCache) GetHotBucketStats(degree int) map[uint64][]*BucketStat { + rst := make(map[uint64][]*BucketStat) + for _, item := range h.bucketsOfRegion { + stats := make([]*BucketStat, 0) + for _, b := range item.stats { + if b.HotDegree >= degree { + stats = append(stats, b) + } + } + if len(stats) > 0 { + rst[item.regionID] = stats + } + } + return rst +} + // bucketDebrisFactory returns the debris if the key range of the item is bigger than the given key range. 
// start and end key: | 001------------------------200| // the split key range: |050---150| @@ -137,7 +155,9 @@ func (h *HotBucketCache) schedule() { case <-h.ctx.Done(): return case task := <-h.taskQueue: + start := time.Now() task.runTask(h) + bucketsTaskDuration.WithLabelValues(task.taskType().String()).Observe(time.Since(start).Seconds()) } } } @@ -156,6 +176,7 @@ func (h *HotBucketCache) checkBucketsFlow(buckets *metapb.Buckets) (newItem *Buc } newItem.inherit(overlaps) newItem.calculateHotDegree() + newItem.collectBucketsMetrics() return newItem, overlaps } diff --git a/server/statistics/buckets/hot_bucket_task.go b/server/statistics/buckets/hot_bucket_task.go index d873dcf50be..4ceecdefd01 100644 --- a/server/statistics/buckets/hot_bucket_task.go +++ b/server/statistics/buckets/hot_bucket_task.go @@ -15,6 +15,8 @@ package buckets import ( + "context" + "github.com/pingcap/kvproto/pkg/metapb" ) @@ -22,11 +24,15 @@ type flowItemTaskKind uint32 const ( checkBucketsTaskType flowItemTaskKind = iota + collectBucketStatsTaskType ) func (kind flowItemTaskKind) String() string { - if kind == checkBucketsTaskType { + switch kind { + case checkBucketsTaskType: return "check_buckets" + case collectBucketStatsTaskType: + return "collect_bucket_stats" } return "unknown" } @@ -57,3 +63,34 @@ func (t *checkBucketsTask) runTask(cache *HotBucketCache) { newItems, overlaps := cache.checkBucketsFlow(t.Buckets) cache.putItem(newItems, overlaps) } + +type collectBucketStatsTask struct { + minDegree int + ret chan map[uint64][]*BucketStat // RegionID ==>Buckets +} + +// NewCollectBucketStatsTask creates task to collect bucket stats. 
+func NewCollectBucketStatsTask(minDegree int) *collectBucketStatsTask { + return &collectBucketStatsTask{ + minDegree: minDegree, + ret: make(chan map[uint64][]*BucketStat, 1), + } +} + +func (t *collectBucketStatsTask) taskType() flowItemTaskKind { + return collectBucketStatsTaskType +} + +func (t *collectBucketStatsTask) runTask(cache *HotBucketCache) { + t.ret <- cache.GetHotBucketStats(t.minDegree) +} + +// WaitRet returns the result of the task. +func (t *collectBucketStatsTask) WaitRet(ctx context.Context) map[uint64][]*BucketStat { + select { + case <-ctx.Done(): + return nil + case ret := <-t.ret: + return ret + } +} diff --git a/server/statistics/buckets/hot_bucket_task_test.go b/server/statistics/buckets/hot_bucket_task_test.go new file mode 100644 index 00000000000..a5fe0d7ad8c --- /dev/null +++ b/server/statistics/buckets/hot_bucket_task_test.go @@ -0,0 +1,127 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "context" + "math" + "strconv" + "time" + + . 
"github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" +) + +var _ = Suite(&testHotBucketTaskCache{}) + +type testHotBucketTaskCache struct { +} + +func getAllBucketStats(ctx context.Context, hotCache *HotBucketCache) map[uint64][]*BucketStat { + task := NewCollectBucketStatsTask(minHotDegree) + hotCache.CheckAsync(task) + return task.WaitRet(ctx) +} + +func (s *testHotBucketTaskCache) TestColdHot(c *C) { + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + hotCache := NewBucketsCache(ctx) + testdata := []struct { + buckets *metapb.Buckets + isHot bool + }{{ + buckets: newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20")}, 0), + isHot: false, + }, { + buckets: newTestBuckets(2, 1, [][]byte{[]byte("20"), []byte("30")}, math.MaxUint64), + isHot: true, + }} + for _, v := range testdata { + for i := 0; i < 20; i++ { + task := NewCheckPeerTask(v.buckets) + c.Assert(hotCache.CheckAsync(task), IsTrue) + hotBuckets := getAllBucketStats(ctx, hotCache) + time.Sleep(time.Millisecond * 10) + item := hotBuckets[v.buckets.RegionId] + c.Assert(item, NotNil) + if v.isHot { + c.Assert(item[0].HotDegree, Equals, i+1) + } else { + c.Assert(item[0].HotDegree, Equals, -i-1) + } + } + } +} + +func (s *testHotBucketTaskCache) TestCheckBucketsTask(c *C) { + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + hotCache := NewBucketsCache(ctx) + // case1: add bucket successfully + buckets := newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20"), []byte("30")}, 0) + task := NewCheckPeerTask(buckets) + c.Assert(hotCache.CheckAsync(task), IsTrue) + time.Sleep(time.Millisecond * 10) + + hotBuckets := getAllBucketStats(ctx, hotCache) + c.Assert(hotBuckets, HasLen, 1) + item := hotBuckets[uint64(1)] + c.Assert(item, NotNil) + c.Assert(item, HasLen, 2) + c.Assert(item[0].HotDegree, Equals, -1) + c.Assert(item[1].HotDegree, Equals, -1) + + // case2: add bucket successful and the hot degree should inherit from the old one. 
+ buckets = newTestBuckets(2, 1, [][]byte{[]byte("20"), []byte("30")}, 0) + task = NewCheckPeerTask(buckets) + c.Assert(hotCache.CheckAsync(task), IsTrue) + hotBuckets = getAllBucketStats(ctx, hotCache) + time.Sleep(time.Millisecond * 10) + item = hotBuckets[uint64(2)] + c.Assert(item, HasLen, 1) + c.Assert(item[0].HotDegree, Equals, -2) + + // case3:add bucket successful and the hot degree should inherit from the old one. + buckets = newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20")}, 0) + task = NewCheckPeerTask(buckets) + c.Assert(hotCache.CheckAsync(task), IsTrue) + hotBuckets = getAllBucketStats(ctx, hotCache) + time.Sleep(time.Millisecond * 10) + item = hotBuckets[uint64(1)] + c.Assert(item, HasLen, 1) + c.Assert(item[0].HotDegree, Equals, -2) +} + +func (s *testHotBucketTaskCache) TestCollectBucketStatsTask(c *C) { + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + hotCache := NewBucketsCache(ctx) + // case1: add bucket successfully + for i := uint64(0); i < 10; i++ { + buckets := convertToBucketTreeItem(newTestBuckets(i, 1, [][]byte{[]byte(strconv.FormatUint(i*10, 10)), + []byte(strconv.FormatUint((i+1)*10, 10))}, 0)) + hotCache.putItem(buckets, hotCache.getBucketsByKeyRange(buckets.startKey, buckets.endKey)) + } + time.Sleep(time.Millisecond * 10) + task := NewCollectBucketStatsTask(-100) + c.Assert(hotCache.CheckAsync(task), IsTrue) + stats := task.WaitRet(ctx) + c.Assert(stats, HasLen, 10) + task = NewCollectBucketStatsTask(1) + c.Assert(hotCache.CheckAsync(task), IsTrue) + stats = task.WaitRet(ctx) + c.Assert(stats, HasLen, 0) +} diff --git a/server/statistics/buckets/metric.go b/server/statistics/buckets/metric.go new file mode 100644 index 00000000000..feb65ba2817 --- /dev/null +++ b/server/statistics/buckets/metric.go @@ -0,0 +1,44 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + bucketsHotDegreeHist = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "scheduler", + Name: "buckets_hot_degree_hist", + Help: "Bucketed histogram of bucket hot degree", + Buckets: prometheus.LinearBuckets(-100, 10, 20), + }) + + bucketsTaskDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "scheduler", + Name: "bucket_task_duration", + Help: "Bucketed histogram of processing time (s) of bucket task.", + Buckets: prometheus.ExponentialBuckets(1, 1.4, 30), // 1s ~ 4.8 hours + }, []string{"type"}) +) + +func init() { + prometheus.MustRegister(bucketsHotDegreeHist) + prometheus.MustRegister(bucketsTaskDuration) +}