Skip to content

Commit 0d566cb

Browse files
ti-srebotlhy1024
andauthored
scheduler: filter unhealthy store in summaryStoresLoad (#2737) (#2805)
* scheduler: filter unhealthy store in summaryStoresLoad (#2737) Signed-off-by: lhy1024 <[email protected]> (cherry picked from commit 3a4cae5) * statistic: fix nil in collect metrics (#2811) Signed-off-by: lhy1024 <[email protected]> (cherry picked from commit 72a7ee3) Co-authored-by: lhy1024 <[email protected]>
1 parent 0ef1e1c commit 0d566cb

File tree

5 files changed

+55
-15
lines changed

5 files changed

+55
-15
lines changed

server/cluster/cluster.go

+2
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
501501
c.storesStats.Observe(newStore.GetID(), newStore.GetStoreStats())
502502
c.storesStats.UpdateTotalBytesRate(c.core.GetStores)
503503
c.storesStats.UpdateTotalKeysRate(c.core.GetStores)
504+
c.storesStats.FilterUnhealthyStore(c)
504505

505506
// c.limiter is nil before "start" is called
506507
if c.limiter != nil && c.opt.GetStoreLimitMode() == "auto" {
@@ -821,6 +822,7 @@ func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.Region
821822
}
822823

823824
// GetStoresStats returns stores' statistics from cluster.
825+
// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat
824826
func (c *RaftCluster) GetStoresStats() *statistics.StoresStats {
825827
c.RLock()
826828
defer c.RUnlock()

server/cluster/cluster_test.go

+32
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,38 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
9090
}
9191
}
9292

93+
func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
94+
_, opt, err := newTestScheduleConfig()
95+
c.Assert(err, IsNil)
96+
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
97+
98+
stores := newTestStores(3)
99+
for _, store := range stores {
100+
storeStats := &pdpb.StoreStats{
101+
StoreId: store.GetID(),
102+
Capacity: 100,
103+
Available: 50,
104+
RegionCount: 1,
105+
}
106+
c.Assert(cluster.putStoreLocked(store), IsNil)
107+
c.Assert(cluster.HandleStoreHeartbeat(storeStats), IsNil)
108+
c.Assert(cluster.storesStats.GetRollingStoreStats(store.GetID()), NotNil)
109+
}
110+
111+
for _, store := range stores {
112+
storeStats := &pdpb.StoreStats{
113+
StoreId: store.GetID(),
114+
Capacity: 100,
115+
Available: 50,
116+
RegionCount: 1,
117+
}
118+
newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
119+
c.Assert(cluster.putStoreLocked(newStore), IsNil)
120+
c.Assert(cluster.HandleStoreHeartbeat(storeStats), IsNil)
121+
c.Assert(cluster.storesStats.GetRollingStoreStats(store.GetID()), IsNil)
122+
}
123+
}
124+
93125
func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
94126
_, opt, err := newTestScheduleConfig()
95127
c.Assert(err, IsNil)

server/schedulers/hot_region.go

+1-15
Original file line numberDiff line numberDiff line change
@@ -517,9 +517,7 @@ func (bs *balanceSolver) init() {
517517
case readLeader:
518518
bs.stLoadDetail = bs.sche.stLoadInfos[readLeader]
519519
}
520-
for _, id := range getUnhealthyStores(bs.cluster) {
521-
delete(bs.stLoadDetail, id)
522-
}
520+
// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat
523521

524522
bs.maxSrc = &storeLoad{}
525523
bs.minDst = &storeLoad{
@@ -542,18 +540,6 @@ func (bs *balanceSolver) init() {
542540
}
543541
}
544542

545-
func getUnhealthyStores(cluster opt.Cluster) []uint64 {
546-
ret := make([]uint64, 0)
547-
stores := cluster.GetStores()
548-
for _, store := range stores {
549-
if store.IsTombstone() ||
550-
store.DownTime() > cluster.GetMaxStoreDownTime() {
551-
ret = append(ret, store.GetID())
552-
}
553-
}
554-
return ret
555-
}
556-
557543
func newBalanceSolver(sche *hotScheduler, cluster opt.Cluster, rwTy rwType, opTy opType) *balanceSolver {
558544
solver := &balanceSolver{
559545
sche: sche,

server/statistics/store.go

+16
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,22 @@ func (s *StoresStats) GetStoresKeysReadStat() map[uint64]float64 {
290290
return res
291291
}
292292

293+
func (s *StoresStats) storeIsUnhealthy(cluster core.StoreSetInformer, storeID uint64) bool {
294+
store := cluster.GetStore(storeID)
295+
return store.IsTombstone() || store.IsUnhealth()
296+
}
297+
298+
// FilterUnhealthyStore filter unhealthy store
299+
func (s *StoresStats) FilterUnhealthyStore(cluster core.StoreSetInformer) {
300+
s.Lock()
301+
defer s.Unlock()
302+
for storeID := range s.rollingStoresStats {
303+
if s.storeIsUnhealthy(cluster, storeID) {
304+
delete(s.rollingStoresStats, storeID)
305+
}
306+
}
307+
}
308+
293309
// RollingStoreStats are multiple sets of recent historical records with specified windows size.
294310
type RollingStoreStats struct {
295311
sync.RWMutex

server/statistics/store_collection.go

+4
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
104104

105105
// Store flows.
106106
storeFlowStats := stats.GetRollingStoreStats(store.GetID())
107+
if storeFlowStats == nil {
108+
return
109+
}
110+
107111
storeWriteRateByte, storeReadRateByte := storeFlowStats.GetBytesRate()
108112
storeStatusGauge.WithLabelValues(storeAddress, id, "store_write_rate_bytes").Set(storeWriteRateByte)
109113
storeStatusGauge.WithLabelValues(storeAddress, id, "store_read_rate_bytes").Set(storeReadRateByte)

0 commit comments

Comments
 (0)