From 8abce6adccc6948171a7c31902748c8c06b8a981 Mon Sep 17 00:00:00 2001 From: rleungx Date: Wed, 29 Aug 2018 16:15:57 +0800 Subject: [PATCH 1/5] replace down replicas instead of removing directly --- server/coordinator_test.go | 4 +- server/schedule/replica_checker.go | 59 ++++++++++++++++++------------ server/schedulers/balance_test.go | 10 ++--- 3 files changed, 40 insertions(+), 33 deletions(-) diff --git a/server/coordinator_test.go b/server/coordinator_test.go index a9a644ed2f0..dfd00f2335b 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -364,12 +364,10 @@ func (s *testCoordinatorSuite) TestReplica(c *C) { core.WithDownPeers(append(region.GetDownPeers(), downPeer)), ) dispatchHeartbeat(c, co, region, stream) - waitRemovePeer(c, stream, region, 3) - region = region.Clone(core.WithDownPeers(nil)) - dispatchHeartbeat(c, co, region, stream) waitAddLearner(c, stream, region, 4) dispatchHeartbeat(c, co, region, stream) waitPromoteLearner(c, stream, region, 4) + region = region.Clone(core.WithDownPeers(nil)) dispatchHeartbeat(c, co, region, stream) waitNoResponse(c, stream) diff --git a/server/schedule/replica_checker.go b/server/schedule/replica_checker.go index 76c65a3908d..f6b76058121 100644 --- a/server/schedule/replica_checker.go +++ b/server/schedule/replica_checker.go @@ -14,6 +14,8 @@ package schedule import ( + "fmt" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" @@ -169,7 +171,8 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *Operator { if stats.GetDownSeconds() < uint64(r.cluster.GetMaxStoreDownTime().Seconds()) { continue } - return CreateRemovePeerOperator("removeDownReplica", r.cluster, OpReplica, region, peer.GetStoreId()) + + return r.handleReplica(region, peer, "Down") } return nil } @@ -194,29 +197,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *Operator { continue } - // Check the number of replicas first. - if len(region.GetPeers()) > r.cluster.GetMaxReplicas() { - return CreateRemovePeerOperator("removeExtraOfflineReplica", r.cluster, OpReplica, region, peer.GetStoreId()) - } - - // Consider we have 3 peers (A, B, C), we set the store that contains C to - // offline while C is pending. If we generate an operator that adds a replica - // D then removes C, D will not be successfully added util C is normal again. - // So it's better to remove C directly. 
- if region.GetPendingPeer(peer.GetId()) != nil { - return CreateRemovePeerOperator("removePendingOfflineReplica", r.cluster, OpReplica, region, peer.GetStoreId()) - } - - storeID, _ := r.SelectBestReplacementStore(region, peer, NewStorageThresholdFilter()) - if storeID == 0 { - log.Debugf("[region %d] no best store to add replica", region.GetID()) - return nil - } - newPeer, err := r.cluster.AllocPeer(storeID) - if err != nil { - return nil - } - return CreateMovePeerOperator("replaceOfflineReplica", r.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId()) + return r.handleReplica(region, peer, "Offline") } return nil @@ -250,3 +231,33 @@ func (r *ReplicaChecker) checkBestReplacement(region *core.RegionInfo) *Operator checkerCounter.WithLabelValues("replica_checker", "new_operator").Inc() return CreateMovePeerOperator("moveToBetterLocation", r.cluster, region, OpReplica, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId()) } + +func (r *ReplicaChecker) handleReplica(region *core.RegionInfo, peer *metapb.Peer, status string) *Operator { + removeExtra := fmt.Sprintf("%s%s%s", "removeExtra", status, "Replica") + // Check the number of replicas first. + if len(region.GetPeers()) > r.cluster.GetMaxReplicas() { + return CreateRemovePeerOperator(removeExtra, r.cluster, OpReplica, region, peer.GetStoreId()) + } + + removePending := fmt.Sprintf("%s%s%s", "removePending", status, "Replica") + // Consider we have 3 peers (A, B, C), we set the store that contains C to + // offline/down while C is pending. If we generate an operator that adds a replica + // D then removes C, D will not be successfully added util C is normal again. + // So it's better to remove C directly. + if region.GetPendingPeer(peer.GetId()) != nil { + return CreateRemovePeerOperator(removePending, r.cluster, OpReplica, region, peer.GetStoreId()) + } + + storeID, _ := r.SelectBestReplacementStore(region, peer, NewStorageThresholdFilter()) + if storeID == 0 { + log.Debugf("[region %d] no best store to add replica", region.GetId()) + return nil + } + newPeer, err := r.cluster.AllocPeer(storeID) + if err != nil { + return nil + } + + replace := fmt.Sprintf("%s%s%s", "replace", status, "Replica") + return CreateMovePeerOperator(replace, r.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId()) +} diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 00cd5040f9c..eb94f7db609 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -544,11 +544,9 @@ func (s *testReplicaCheckerSuite) TestBasic(c *C) { Peer: region.GetStorePeer(2), DownSeconds: 24 * 60 * 60, } - region = region.Clone( - core.WithRemoveStorePeer(1), - core.WithDownPeers(append(region.GetDownPeers(), downPeer)), - ) - testutil.CheckRemovePeer(c, rc.Check(region), 2) + + region = region.Clone(core.WithDownPeers(append(region.GetDownPeers(), downPeer))) + testutil.CheckTransferPeer(c, rc.Check(region), schedule.OpReplica, 2, 1) region = region.Clone(core.WithDownPeers(nil)) c.Assert(rc.Check(region), IsNil) @@ -796,7 +794,7 @@ func (s *testReplicaCheckerSuite) TestOpts(c *C) { })) tc.SetStoreOffline(2) // RemoveDownReplica has higher priority than replaceOfflineReplica. 
- testutil.CheckRemovePeer(c, rc.Check(region), 1)
+ testutil.CheckTransferPeer(c, rc.Check(region), schedule.OpReplica, 1, 4)
 opt.DisableRemoveDownReplica = true
 testutil.CheckTransferPeer(c, rc.Check(region), schedule.OpReplica, 2, 4)
 opt.DisableReplaceOfflineReplica = true

From 2fad2b024fd4b34c1bc55e9cd2405fa110a5804c Mon Sep 17 00:00:00 2001
From: rleungx
Date: Wed, 29 Aug 2018 17:35:47 +0800
Subject: [PATCH 2/5] add make up down replicas case

---
 pkg/faketikv/cases/cases.go | 17 ++--
 pkg/faketikv/cases/delete_nodes.go | 2 +-
 pkg/faketikv/cases/makeup_down_replica.go | 99 +++++++++++++++++++++++
 3 files changed, 109 insertions(+), 9 deletions(-)
 create mode 100644 pkg/faketikv/cases/makeup_down_replica.go

diff --git a/pkg/faketikv/cases/cases.go b/pkg/faketikv/cases/cases.go
index 87ef03f4e1c..263a9487417 100644
--- a/pkg/faketikv/cases/cases.go
+++ b/pkg/faketikv/cases/cases.go
@@ -74,14 +74,15 @@ func (a *idAllocator) nextID() uint64 {

 // ConfMap is a mapping of the cases to the their corresponding initialize functions.
 var ConfMap = map[string]func() *Conf{
- "balance-leader": newBalanceLeader,
- "add-nodes": newAddNodes,
- "add-nodes-dynamic": newAddNodesDynamic,
- "delete-nodes": newDeleteNodes,
- "region-split": newRegionSplit,
- "region-merge": newRegionMerge,
- "hot-read": newHotRead,
- "hot-write": newHotWrite,
+ "balance-leader": newBalanceLeader,
+ "add-nodes": newAddNodes,
+ "add-nodes-dynamic": newAddNodesDynamic,
+ "delete-nodes": newDeleteNodes,
+ "region-split": newRegionSplit,
+ "region-merge": newRegionMerge,
+ "hot-read": newHotRead,
+ "hot-write": newHotWrite,
+ "makeup-down-replica": newMakeupDownReplica,
 }

 // NewConf creates a config to initialize simulator cluster.
diff --git a/pkg/faketikv/cases/delete_nodes.go b/pkg/faketikv/cases/delete_nodes.go
index 2d29e812f4e..6bacc9c6a84 100644
--- a/pkg/faketikv/cases/delete_nodes.go
+++ b/pkg/faketikv/cases/delete_nodes.go
@@ -58,7 +58,7 @@ func newDeleteNodes() *Conf {
 numNodes := 8
 e := &DeleteNodesInner{}
 e.Step = func(tick int64) uint64 {
- if tick%100 == 0 && numNodes > 7 {
+ if numNodes > 7 && tick%100 == 0 {
 idx := rand.Intn(numNodes)
 numNodes--
 nodeID := ids[idx]
diff --git a/pkg/faketikv/cases/makeup_down_replica.go b/pkg/faketikv/cases/makeup_down_replica.go
new file mode 100644
index 00000000000..3ac307c1e4f
--- /dev/null
+++ b/pkg/faketikv/cases/makeup_down_replica.go
@@ -0,0 +1,99 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package cases + +import ( + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/pkg/faketikv/simutil" + "github.com/pingcap/pd/server/core" +) + +func newMakeupDownReplica() *Conf { + var conf Conf + var id idAllocator + + for i := 1; i <= 4; i++ { + conf.Stores = append(conf.Stores, &Store{ + ID: id.nextID(), + Status: metapb.StoreState_Up, + Capacity: 1 * TB, + Available: 900 * GB, + Version: "2.1.0", + }) + } + + for i := 0; i < 400; i++ { + peers := []*metapb.Peer{ + {Id: id.nextID(), StoreId: uint64(i)%4 + 1}, + {Id: id.nextID(), StoreId: uint64(i+1)%4 + 1}, + {Id: id.nextID(), StoreId: uint64(i+2)%4 + 1}, + } + conf.Regions = append(conf.Regions, Region{ + ID: id.nextID(), + Peers: peers, + Leader: peers[0], + Size: 96 * MB, + Keys: 960000, + }) + } + conf.MaxID = id.maxID + + var ids []uint64 + for _, store := range conf.Stores { + ids = append(ids, store.ID) + } + + numNodes := 4 + e := &DeleteNodesInner{} + down := false + e.Step = func(tick int64) uint64 { + if numNodes > 3 && tick%100 == 0 { + numNodes-- + nodeID := uint64(1) + return nodeID + } + if tick == 300 { + down = true + } + return 0 + } + conf.Events = []EventInner{e} + + conf.Checker = func(regions *core.RegionsInfo) bool { + sum := 0 + regionCounts := make([]int, 0, 3) + for i := 1; i <= 4; i++ { + regionCount := regions.GetStoreRegionCount(uint64(i)) + if i != 1 { + regionCounts = append(regionCounts, regionCount) + } + sum += regionCount + } + + simutil.Logger.Infof("region counts: %v", regionCounts) + if down && sum < 1200 { + simutil.Logger.Error("making up replica doesn't start immediately") + down = false + return false + } + for _, regionCount := range regionCounts { + if regionCount != 400 { + return false + } + } + + return true + + } + return &conf +} From f92835a0a931ce10cf7b04705214dae1bf4b03d8 Mon Sep 17 00:00:00 2001 From: rleungx Date: Wed, 29 Aug 2018 18:17:31 +0800 Subject: [PATCH 3/5] tiny clean up --- pkg/faketikv/cases/cases.go | 18 +++++++++--------- pkg/faketikv/cases/makeup_down_replica.go | 20 +++++++------------- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/pkg/faketikv/cases/cases.go b/pkg/faketikv/cases/cases.go index 263a9487417..aad331d2f73 100644 --- a/pkg/faketikv/cases/cases.go +++ b/pkg/faketikv/cases/cases.go @@ -74,15 +74,15 @@ func (a *idAllocator) nextID() uint64 { // ConfMap is a mapping of the cases to the their corresponding initialize functions. var ConfMap = map[string]func() *Conf{ - "balance-leader": newBalanceLeader, - "add-nodes": newAddNodes, - "add-nodes-dynamic": newAddNodesDynamic, - "delete-nodes": newDeleteNodes, - "region-split": newRegionSplit, - "region-merge": newRegionMerge, - "hot-read": newHotRead, - "hot-write": newHotWrite, - "makeup-down-replica": newMakeupDownReplica, + "balance-leader": newBalanceLeader, + "add-nodes": newAddNodes, + "add-nodes-dynamic": newAddNodesDynamic, + "delete-nodes": newDeleteNodes, + "region-split": newRegionSplit, + "region-merge": newRegionMerge, + "hot-read": newHotRead, + "hot-write": newHotWrite, + "makeup-down-replicas": newMakeupDownReplicas, } // NewConf creates a config to initialize simulator cluster. 
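The makeup-down-replicas case above only reports success once the cluster has healed: it creates 400 regions with 3 replicas each (1200 region replicas in total) on 4 stores, takes store 1 down, and then expects each of the three surviving stores to converge back to hosting all 400 regions. A simplified, standalone Go sketch of that acceptance arithmetic (not the simulator API; the per-store counts map stands in for what core.RegionsInfo.GetStoreRegionCount reports):

package main

import "fmt"

// madeUp mirrors the case's pass condition: the total number of region
// replicas is back to regions*replicas, and every surviving store again
// holds a replica of every region.
func madeUp(counts map[uint64]int, regions, replicas int, downStore uint64) bool {
	sum := 0
	for store, c := range counts {
		sum += c
		if store != downStore && c != regions {
			return false
		}
	}
	return sum == regions*replicas
}

func main() {
	// 400 regions x 3 replicas, redistributed onto stores 2, 3 and 4 after
	// store 1 has gone down.
	counts := map[uint64]int{1: 0, 2: 400, 3: 400, 4: 400}
	fmt.Println(madeUp(counts, 400, 3, 1)) // true
}

The diff that follows tidies up the same checker without changing this condition.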
diff --git a/pkg/faketikv/cases/makeup_down_replica.go b/pkg/faketikv/cases/makeup_down_replica.go index 3ac307c1e4f..3d45ca8f578 100644 --- a/pkg/faketikv/cases/makeup_down_replica.go +++ b/pkg/faketikv/cases/makeup_down_replica.go @@ -18,7 +18,7 @@ import ( "github.com/pingcap/pd/server/core" ) -func newMakeupDownReplica() *Conf { +func newMakeupDownReplicas() *Conf { var conf Conf var id idAllocator @@ -48,19 +48,13 @@ func newMakeupDownReplica() *Conf { } conf.MaxID = id.maxID - var ids []uint64 - for _, store := range conf.Stores { - ids = append(ids, store.ID) - } - numNodes := 4 - e := &DeleteNodesInner{} down := false + e := &DeleteNodesInner{} e.Step = func(tick int64) uint64 { if numNodes > 3 && tick%100 == 0 { numNodes-- - nodeID := uint64(1) - return nodeID + return uint64(1) } if tick == 300 { down = true @@ -79,21 +73,21 @@ func newMakeupDownReplica() *Conf { } sum += regionCount } - simutil.Logger.Infof("region counts: %v", regionCounts) + if down && sum < 1200 { - simutil.Logger.Error("making up replica doesn't start immediately") + // only need to print once down = false + simutil.Logger.Error("making up replicas don't start immediately") return false } + for _, regionCount := range regionCounts { if regionCount != 400 { return false } } - return true - } return &conf } From 44486c804f7b6e678c26c3e9e34934bc9173555b Mon Sep 17 00:00:00 2001 From: rleungx Date: Thu, 30 Aug 2018 11:40:36 +0800 Subject: [PATCH 4/5] address comments --- server/schedule/replica_checker.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/schedule/replica_checker.go b/server/schedule/replica_checker.go index f6b76058121..0fb427042c5 100644 --- a/server/schedule/replica_checker.go +++ b/server/schedule/replica_checker.go @@ -172,7 +172,7 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *Operator { continue } - return r.handleReplica(region, peer, "Down") + return r.fixPeer(region, peer, "Down") } return nil } @@ -197,7 +197,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *Operator { continue } - return r.handleReplica(region, peer, "Offline") + return r.fixPeer(region, peer, "Offline") } return nil @@ -232,14 +232,14 @@ func (r *ReplicaChecker) checkBestReplacement(region *core.RegionInfo) *Operator return CreateMovePeerOperator("moveToBetterLocation", r.cluster, region, OpReplica, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId()) } -func (r *ReplicaChecker) handleReplica(region *core.RegionInfo, peer *metapb.Peer, status string) *Operator { - removeExtra := fmt.Sprintf("%s%s%s", "removeExtra", status, "Replica") +func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, peer *metapb.Peer, status string) *Operator { + removeExtra := fmt.Sprintf("removeExtra%sReplica", status) // Check the number of replicas first. if len(region.GetPeers()) > r.cluster.GetMaxReplicas() { return CreateRemovePeerOperator(removeExtra, r.cluster, OpReplica, region, peer.GetStoreId()) } - removePending := fmt.Sprintf("%s%s%s", "removePending", status, "Replica") + removePending := fmt.Sprintf("removePending%sReplica", status) // Consider we have 3 peers (A, B, C), we set the store that contains C to // offline/down while C is pending. If we generate an operator that adds a replica // D then removes C, D will not be successfully added util C is normal again. 
@@ -258,6 +258,6 @@ func (r *ReplicaChecker) handleReplica(region *core.RegionInfo, peer *metapb.Pee return nil } - replace := fmt.Sprintf("%s%s%s", "replace", status, "Replica") + replace := fmt.Sprintf("replace%sReplica", status) return CreateMovePeerOperator(replace, r.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId()) } From 738127433bb0934c77dc444bf69591b8f4a6308c Mon Sep 17 00:00:00 2001 From: rleungx Date: Mon, 3 Sep 2018 20:11:02 +0800 Subject: [PATCH 5/5] tiny fix --- server/schedule/replica_checker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/schedule/replica_checker.go b/server/schedule/replica_checker.go index 0fb427042c5..ee7a0010cd3 100644 --- a/server/schedule/replica_checker.go +++ b/server/schedule/replica_checker.go @@ -250,7 +250,7 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, peer *metapb.Peer, sta storeID, _ := r.SelectBestReplacementStore(region, peer, NewStorageThresholdFilter()) if storeID == 0 { - log.Debugf("[region %d] no best store to add replica", region.GetId()) + log.Debugf("[region %d] no best store to add replica", region.GetID()) return nil } newPeer, err := r.cluster.AllocPeer(storeID)
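Taken together, the series routes both down and offline peers through the single fixPeer path: a peer is removed outright only when the region has more replicas than configured or when the peer is also pending; otherwise it is moved to the best replacement store so the region never drops below its replica count. A minimal, standalone Go sketch of that decision order, using simplified stand-in types rather than PD's real operators and store selection:

package main

import "fmt"

type peer struct {
	storeID uint64
	pending bool
}

type region struct {
	peers       []peer
	maxReplicas int
}

// fixPeerDecision mirrors the order of checks in ReplicaChecker.fixPeer:
// extra replica -> remove it, pending peer -> remove it directly (a
// replacement could not finish while the peer is pending), otherwise ->
// move it to the best replacement store, if one exists.
func fixPeerDecision(r region, bad peer, bestStore uint64, status string) string {
	if len(r.peers) > r.maxReplicas {
		return fmt.Sprintf("removeExtra%sReplica: drop peer on store %d", status, bad.storeID)
	}
	if bad.pending {
		return fmt.Sprintf("removePending%sReplica: drop peer on store %d", status, bad.storeID)
	}
	if bestStore == 0 {
		return "no operator: no suitable replacement store"
	}
	return fmt.Sprintf("replace%sReplica: move peer from store %d to store %d", status, bad.storeID, bestStore)
}

func main() {
	r := region{peers: make([]peer, 3), maxReplicas: 3}
	down := peer{storeID: 2}
	// With a healthy candidate store available, a down peer is now replaced
	// rather than removed and made up later.
	fmt.Println(fixPeerDecision(r, down, 4, "Down"))
}

With a replacement store available, a down peer thus yields a replaceDownReplica move operator instead of the old removeDownReplica remove operator, which is what the updated coordinator and balance tests in the series assert.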