From fadfb22565533046874dbabe9f6e05fcb75194ec Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 13 Oct 2020 17:35:48 +0800 Subject: [PATCH 1/3] add test Signed-off-by: lhy1024 --- pkg/mock/mockcluster/mockcluster.go | 9 ++++++ server/schedulers/hot_test.go | 47 +++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 4f073ef9df1..81d4788ad87 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -632,3 +632,12 @@ func (mc *Cluster) SetStoreLabel(storeID uint64, labels map[string]string) { newStore := store.Clone(core.SetStoreLabels(newLabels)) mc.PutStore(newStore) } + +// SetStoreLabel set the last heartbeat to the target store +func (mc *Cluster) SetStoreLastHeartbeatInterval(storeID uint64, interval time.Duration) { + store := mc.GetStore(storeID) + newStore := store.Clone( + core.SetStoreState(metapb.StoreState_Up), + core.SetLastHeartbeatTS(time.Now().Add(-interval))) + mc.PutStore(newStore) +} diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go index 5091bfabc0c..25809f6ad95 100644 --- a/server/schedulers/hot_test.go +++ b/server/schedulers/hot_test.go @@ -348,6 +348,53 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) { } } +func (s *testHotWriteRegionSchedulerSuite) TestUnhealthyStore(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + statistics.Denoising = false + opt := mockoption.NewScheduleOptions() + hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) + c.Assert(err, IsNil) + hb.(*hotScheduler).conf.SetDstToleranceRatio(1) + hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) + opt.HotRegionCacheHitsThreshold = 0 + + tc := mockcluster.NewCluster(opt) + tc.AddRegionStore(1, 20) + tc.AddRegionStore(2, 20) + tc.AddRegionStore(3, 20) + tc.AddRegionStore(4, 20) + tc.AddRegionStore(5, 20) + + tc.UpdateStorageWrittenStats(1, 10.5*MB*statistics.StoreHeartBeatReportInterval, 10.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(2, 10*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(3, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(4, 0*MB*statistics.StoreHeartBeatReportInterval, 0*MB*statistics.StoreHeartBeatReportInterval) + addRegionInfo(tc, write, []testRegionInfo{ + {1, []uint64{1, 2, 3}, 0.5 * MB, 0.5 * MB}, + {2, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB}, + {3, []uint64{3, 2, 1}, 0.5 * MB, 0.5 * MB}, + }) + + intervals := []time.Duration{ + 9 * time.Second, + 10 * time.Second, + 19 * time.Second, + 20 * time.Second, + 9 * time.Minute, + 10 * time.Minute, + 29 * time.Minute, + 30 * time.Minute, + } + + for _, interval := range intervals { + tc.SetStoreLastHeartbeatInterval(4,interval) + hb.(*hotScheduler).clearPendingInfluence() + hb.Schedule(tc) + // no panic + } +} + func (s *testHotWriteRegionSchedulerSuite) TestLeader(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From f87886f5ccb4fb0ffc28e3b9d123b3e05e3c1962 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 13 Oct 2020 19:26:41 +0800 Subject: [PATCH 2/3] update test Signed-off-by: lhy1024 --- server/schedulers/hot_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go index 25809f6ad95..ae0ed49a081 100644 --- a/server/schedulers/hot_test.go +++ b/server/schedulers/hot_test.go @@ -386,9 +386,16 @@ func (s *testHotWriteRegionSchedulerSuite) TestUnhealthyStore(c *C) { 29 * time.Minute, 30 * time.Minute, } - + // test dst + for _, interval := range intervals { + tc.SetStoreLastHeartbeatInterval(4, interval) + hb.(*hotScheduler).clearPendingInfluence() + hb.Schedule(tc) + // no panic + } + // test src for _, interval := range intervals { - tc.SetStoreLastHeartbeatInterval(4,interval) + tc.SetStoreLastHeartbeatInterval(1, interval) hb.(*hotScheduler).clearPendingInfluence() hb.Schedule(tc) // no panic From 38d6d8d48f53aa07d174f03584517e5d52fc763f Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 13 Oct 2020 20:19:52 +0800 Subject: [PATCH 3/3] replace healthy filter with Connected filter Signed-off-by: lhy1024 --- pkg/mock/mockcluster/mockcluster.go | 2 +- server/schedule/filter/filters.go | 27 +++++++++++++++++++++++++++ server/schedulers/hot_region.go | 4 ++-- server/schedulers/hot_test.go | 8 -------- 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 81d4788ad87..3bd098a5b4d 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -633,7 +633,7 @@ func (mc *Cluster) SetStoreLabel(storeID uint64, labels map[string]string) { mc.PutStore(newStore) } -// SetStoreLabel set the last heartbeat to the target store +// SetStoreLastHeartbeatInterval set the last heartbeat to the target store func (mc *Cluster) SetStoreLastHeartbeatInterval(storeID uint64, interval time.Duration) { store := mc.GetStore(storeID) newStore := store.Clone( diff --git a/server/schedule/filter/filters.go b/server/schedule/filter/filters.go index 0cea7c925fc..4e5fa4ca76a 100644 --- a/server/schedule/filter/filters.go +++ b/server/schedule/filter/filters.go @@ -198,6 +198,33 @@ func (f *healthFilter) Target(opt opt.Options, store *core.StoreInfo) bool { return f.filter(opt, store) } +type connectedFilter struct{ scope string } + +// NewConnectedFilter creates a Filter that filters all stores that are disconnected. +func NewConnectedFilter(scope string) Filter { + return &connectedFilter{scope: scope} +} + +func (f *connectedFilter) Scope() string { + return f.scope +} + +func (f *connectedFilter) Type() string { + return "connected-filter" +} + +func (f *connectedFilter) filter(opt opt.Options, store *core.StoreInfo) bool { + return !store.IsDisconnected() +} + +func (f *connectedFilter) Source(opt opt.Options, store *core.StoreInfo) bool { + return f.filter(opt, store) +} + +func (f *connectedFilter) Target(opt opt.Options, store *core.StoreInfo) bool { + return f.filter(opt, store) +} + type pendingPeerCountFilter struct{ scope string } // NewPendingPeerCountFilter creates a Filter that filters all stores that are diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 3dfd4fbb889..f8aa2ec3904 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -779,7 +779,7 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { filters = []filter.Filter{ filter.StoreStateFilter{ActionScope: bs.sche.GetName(), MoveRegion: true}, filter.NewExcludedFilter(bs.sche.GetName(), bs.cur.region.GetStoreIds(), bs.cur.region.GetStoreIds()), - filter.NewHealthFilter(bs.sche.GetName()), + filter.NewConnectedFilter(bs.sche.GetName()), filter.NewSpecialUseFilter(bs.sche.GetName(), filter.SpecialUseHotRegion), scoreGuard, } @@ -789,7 +789,7 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { case transferLeader: filters = []filter.Filter{ filter.StoreStateFilter{ActionScope: bs.sche.GetName(), TransferLeader: true}, - filter.NewHealthFilter(bs.sche.GetName()), + filter.NewConnectedFilter(bs.sche.GetName()), filter.NewSpecialUseFilter(bs.sche.GetName(), filter.SpecialUseHotRegion), } diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go index ae0ed49a081..eb28c62530a 100644 --- a/server/schedulers/hot_test.go +++ b/server/schedulers/hot_test.go @@ -364,7 +364,6 @@ func (s *testHotWriteRegionSchedulerSuite) TestUnhealthyStore(c *C) { tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) tc.AddRegionStore(4, 20) - tc.AddRegionStore(5, 20) tc.UpdateStorageWrittenStats(1, 10.5*MB*statistics.StoreHeartBeatReportInterval, 10.5*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageWrittenStats(2, 10*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval) @@ -393,13 +392,6 @@ func (s *testHotWriteRegionSchedulerSuite) TestUnhealthyStore(c *C) { hb.Schedule(tc) // no panic } - // test src - for _, interval := range intervals { - tc.SetStoreLastHeartbeatInterval(1, interval) - hb.(*hotScheduler).clearPendingInfluence() - hb.Schedule(tc) - // no panic - } } func (s *testHotWriteRegionSchedulerSuite) TestLeader(c *C) {