diff --git a/go.mod b/go.mod
index 10a23a9c3e23..0287ce2a1172 100644
--- a/go.mod
+++ b/go.mod
@@ -51,3 +51,5 @@ require (
 	google.golang.org/grpc v1.26.0
 	gotest.tools/gotestsum v1.7.0
 )
+
+replace github.com/pingcap/kvproto v0.0.0-20220228094105-9bb22e5a97fc => github.com/Connor1996/kvproto v0.0.0-20220303065703-db7164505658
diff --git a/go.sum b/go.sum
index c329011d42ab..918ae2a93865 100644
--- a/go.sum
+++ b/go.sum
@@ -3,6 +3,8 @@ github.com/AlekSi/gocov-xml v1.0.0 h1:4QctJBgXEkbzeKz6PJy6bt3JSPNSN4I2mITYW+eKUo
 github.com/AlekSi/gocov-xml v1.0.0/go.mod h1:J0qYeZ6tDg4oZubW9mAAgxlqw39PDfoEkzB3HXSbEuA=
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/Connor1996/kvproto v0.0.0-20220303065703-db7164505658 h1:o25Jq01pukreC1GMQAwIUfTNiJaRe/pZstJ92dnUviY=
+github.com/Connor1996/kvproto v0.0.0-20220303065703-db7164505658/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
 github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
 github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
@@ -400,8 +402,6 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
 github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
 github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
-github.com/pingcap/kvproto v0.0.0-20211213085605-3329b3c5404c h1:jrPg+QFqQ7VyI30SPzB0ZviHCvDGyZHiASz6Bgomxi0=
-github.com/pingcap/kvproto v0.0.0-20211213085605-3329b3c5404c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
diff --git a/server/api/unsafe_operation.go b/server/api/unsafe_operation.go
index 7fd54d68fc45..4261bf3f8c0f 100644
--- a/server/api/unsafe_operation.go
+++ b/server/api/unsafe_operation.go
@@ -43,7 +43,7 @@ func newUnsafeOperationHandler(svr *server.Server, rd *render.Render) *unsafeOpe
 // @Router /admin/unsafe/remove-failed-stores [POST]
 func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *http.Request) {
 	rc := getCluster(r)
-	var stores map[uint64]string
+	var stores map[uint64]interface{}
 	if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &stores); err != nil {
 		return
 	}
diff --git a/server/cluster/unsafe_recovery_controller.go b/server/cluster/unsafe_recovery_controller.go
index c17a029a5e17..665c6cb35215 100644
--- a/server/cluster/unsafe_recovery_controller.go
+++ b/server/cluster/unsafe_recovery_controller.go
@@ -17,16 +17,15 @@ package cluster
 import (
 	"bytes"
 	"fmt"
-	"sort"
 	"strconv"
 	"sync"
 
 	"github.com/gogo/protobuf/proto"
-	"github.com/google/btree"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/btree"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/server/core"
"go.uber.org/zap" @@ -37,6 +36,7 @@ type unsafeRecoveryStage int const ( ready unsafeRecoveryStage = iota collectingClusterInfo + forceLeader recovering finished ) @@ -46,11 +46,11 @@ type unsafeRecoveryController struct { cluster *RaftCluster stage unsafeRecoveryStage - failedStores map[uint64]string + failedStores map[uint64]interface{} storeReports map[uint64]*pdpb.StoreReport // Store info proto numStoresReported int storeRecoveryPlans map[uint64]*pdpb.RecoveryPlan // StoreRecoveryPlan proto - executionResults map[uint64]bool // Execution reports for tracking purpose + executionResults map[uint64]bool // Execution results for tracking purpose executionReports map[uint64]*pdpb.StoreReport // Execution reports for tracking purpose numStoresPlanExecuted int } @@ -59,7 +59,7 @@ func newUnsafeRecoveryController(cluster *RaftCluster) *unsafeRecoveryController return &unsafeRecoveryController{ cluster: cluster, stage: ready, - failedStores: make(map[uint64]string), + failedStores: make(map[uint64]interface{}), storeReports: make(map[uint64]*pdpb.StoreReport), numStoresReported: 0, storeRecoveryPlans: make(map[uint64]*pdpb.RecoveryPlan), @@ -70,7 +70,7 @@ func newUnsafeRecoveryController(cluster *RaftCluster) *unsafeRecoveryController } // RemoveFailedStores removes failed stores from the cluster. -func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]string) error { +func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]interface{}) error { u.Lock() defer u.Unlock() if len(failedStores) == 0 { @@ -106,6 +106,23 @@ func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]st return nil } +// Collects and checks if store reports have been fully collected. +func (u *unsafeRecoveryController) collectAndCheckStoreReport(heartbeat *pdpb.StoreHeartbeatRequest, resp *pdpb.StoreHeartbeatResponse) bool { + if heartbeat.StoreReport == nil { + if _, failedStore := u.failedStores[heartbeat.Stats.StoreId]; !failedStore { + // Inform the store to send detailed report in the next heartbeat. + resp.RequireDetailedReport = true + } + } else if report, exist := u.storeReports[heartbeat.Stats.StoreId]; exist && report == nil { + u.storeReports[heartbeat.Stats.StoreId] = heartbeat.StoreReport + u.numStoresReported++ + if u.numStoresReported == len(u.storeReports) { + return true + } + } + return false +} + // HandleStoreHeartbeat handles the store heartbeat requests and checks whether the stores need to // send detailed report back. func (u *unsafeRecoveryController) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest, resp *pdpb.StoreHeartbeatResponse) { @@ -116,48 +133,52 @@ func (u *unsafeRecoveryController) HandleStoreHeartbeat(heartbeat *pdpb.StoreHea } switch u.stage { case collectingClusterInfo: - if heartbeat.StoreReport == nil { - if _, failedStore := u.failedStores[heartbeat.Stats.StoreId]; !failedStore { - // Inform the store to send detailed report in the next heartbeat. 
-				resp.RequireDetailedReport = true
-			}
-		} else if report, exist := u.storeReports[heartbeat.Stats.StoreId]; exist && report == nil {
-			u.storeReports[heartbeat.Stats.StoreId] = heartbeat.StoreReport
-			u.numStoresReported++
-			if u.numStoresReported == len(u.storeReports) {
-				log.Info("Reports have been fully collected, generating plan...")
-				go u.generateRecoveryPlan()
-			}
+		if u.collectAndCheckStoreReport(heartbeat, resp) {
+			go func() {
+				if !u.generateForceLeaderPlan() {
+					u.finishRecovery()
+				}
+			}()
+		}
+	case forceLeader:
+		if u.collectAndCheckStoreReport(heartbeat, resp) {
+			go func() {
+				if !u.generateRecoveryPlan() {
+					u.finishRecovery()
+				}
+			}()
 		}
 	case recovering:
-		if plan, tasked := u.storeRecoveryPlans[heartbeat.Stats.StoreId]; tasked {
-			if heartbeat.StoreReport == nil {
-				// Sends the recovering plan to the store for execution.
-				resp.Plan = plan
-			} else if !u.isPlanExecuted(heartbeat.Stats.StoreId, heartbeat.StoreReport) {
-				resp.Plan = plan
-				u.executionReports[heartbeat.Stats.StoreId] = heartbeat.StoreReport
-			} else {
-				u.executionResults[heartbeat.Stats.StoreId] = true
-				u.executionReports[heartbeat.Stats.StoreId] = heartbeat.StoreReport
-				u.numStoresPlanExecuted++
-				if u.numStoresPlanExecuted == len(u.storeRecoveryPlans) {
-					log.Info("Recover finished.")
-					go func() {
-						for _, history := range u.History() {
-							log.Info(history)
-						}
-					}()
-					u.stage = finished
+		if u.collectAndCheckStoreReport(heartbeat, resp) {
+			go func() {
+				if u.generateForceLeaderPlan() {
+					// there are still force-leader plans to execute
+					return
 				}
-			}
+				if u.generateRecoveryPlan() {
+					// there are still recovery plans to execute
+					return
+				}
+				u.finishRecovery()
+			}()
 		}
 	}
 }
 
+func (u *unsafeRecoveryController) finishRecovery() {
+	log.Info("Recovery finished.")
+	for _, history := range u.History() {
+		log.Info(history)
+	}
+
+	u.Lock()
+	defer u.Unlock()
+	u.stage = finished
+}
+
 func (u *unsafeRecoveryController) reset() {
 	u.stage = ready
-	u.failedStores = make(map[uint64]string)
+	u.failedStores = make(map[uint64]interface{})
 	u.storeReports = make(map[uint64]*pdpb.StoreReport)
 	u.numStoresReported = 0
 	u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
@@ -166,210 +187,305 @@ func (u *unsafeRecoveryController) reset() {
 	u.numStoresPlanExecuted = 0
 }
 
-func (u *unsafeRecoveryController) isPlanExecuted(storeID uint64, report *pdpb.StoreReport) bool {
-	targetRegions := make(map[uint64]*metapb.Region)
-	toBeRemovedRegions := make(map[uint64]bool)
-	for _, create := range u.storeRecoveryPlans[storeID].Creates {
-		targetRegions[create.Id] = create
-	}
-	for _, update := range u.storeRecoveryPlans[storeID].Updates {
-		targetRegions[update.Id] = update
-	}
-	for _, del := range u.storeRecoveryPlans[storeID].Deletes {
-		toBeRemovedRegions[del] = true
-	}
-	numFinished := 0
-	for _, peerReport := range report.PeerReports {
-		region := peerReport.RegionState.Region
-		if _, ok := toBeRemovedRegions[region.Id]; ok {
-			return false
-		} else if target, ok := targetRegions[region.Id]; ok {
-			if bytes.Equal(target.StartKey, region.StartKey) && bytes.Equal(target.EndKey, region.EndKey) && !u.containsFailedPeers(region) {
-				numFinished += 1
-			} else {
-				return false
-			}
-		}
-	}
-	return numFinished == len(targetRegions)
-}
-
-type regionItem struct {
-	region *metapb.Region
-}
-
-func (r regionItem) Less(other btree.Item) bool {
-	return bytes.Compare(r.region.StartKey, other.(regionItem).region.StartKey) < 0
-}
-
-func (u *unsafeRecoveryController) canElectLeader(region *metapb.Region) bool {
-	numFailedVoters := 0
-	numLiveVoters := 0
-	for _, peer := range region.Peers {
-		if peer.Role != metapb.PeerRole_Voter && peer.Role != metapb.PeerRole_IncomingVoter {
-			continue
-		}
-		if _, ok := u.failedStores[peer.StoreId]; ok {
-			numFailedVoters += 1
-		} else {
-			numLiveVoters += 1
-		}
-	}
-	return numFailedVoters < numLiveVoters
-}
+func (u *unsafeRecoveryController) canElectLeader(region *metapb.Region) bool {
+	hasQuorum := func(voters []*metapb.Peer) bool {
+		numFailedVoters := 0
+		numLiveVoters := 0
+
+		for _, voter := range voters {
+			if _, ok := u.failedStores[voter.StoreId]; ok {
+				numFailedVoters += 1
+			} else {
+				numLiveVoters += 1
+			}
+		}
+		return numFailedVoters < numLiveVoters
+	}
+
+	// consider joint consensus
+	var incomingVoters []*metapb.Peer
+	var outgoingVoters []*metapb.Peer
+
+	for _, peer := range region.Peers {
+		if peer.Role == metapb.PeerRole_Learner {
+			continue
+		}
+		// a plain voter belongs to both the incoming and the outgoing configuration
+		if peer.Role == metapb.PeerRole_Voter || peer.Role == metapb.PeerRole_IncomingVoter {
+			incomingVoters = append(incomingVoters, peer)
+		}
+		if peer.Role == metapb.PeerRole_Voter || peer.Role == metapb.PeerRole_DemotingVoter {
+			outgoingVoters = append(outgoingVoters, peer)
+		}
+	}
+	return hasQuorum(incomingVoters) && hasQuorum(outgoingVoters)
+}
 
-func (u *unsafeRecoveryController) containsFailedPeers(region *metapb.Region) bool {
+func (u *unsafeRecoveryController) getFailedPeers(region *metapb.Region) []*metapb.Peer {
+	var failedPeers []*metapb.Peer
 	for _, peer := range region.Peers {
 		if _, ok := u.failedStores[peer.StoreId]; ok {
-			return true
+			failedPeers = append(failedPeers, peer)
 		}
 	}
-	return false
+	return failedPeers
 }
 
-func keepOneReplica(storeID uint64, region *metapb.Region) {
-	var newPeerList []*metapb.Peer
-	for _, peer := range region.Peers {
-		if peer.StoreId == storeID {
-			if peer.Role != metapb.PeerRole_Voter {
-				peer.Role = metapb.PeerRole_Voter
-			}
-			newPeerList = append(newPeerList, peer)
-		}
-	}
-	region.Peers = newPeerList
-}
-
-type peerStorePair struct {
-	peer    *pdpb.PeerReport
-	storeID uint64
-}
+var _ btree.Item = &regionItem{}
+
+type regionItem struct {
+	report  *pdpb.PeerReport
+	storeID uint64
+}
+
+func (r *regionItem) Region() *metapb.Region {
+	return r.report.GetRegionState().GetRegion()
+}
+
+func (r *regionItem) IsEpochStale(other *regionItem) bool {
+	re := r.Region().GetRegionEpoch()
+	oe := other.Region().GetRegionEpoch()
+	return re.GetVersion() < oe.GetVersion() || re.GetConfVer() < oe.GetConfVer()
+}
+
+func (r *regionItem) IsStale(origin *regionItem) bool {
+	if r.Region().GetId() != origin.Region().GetId() {
+		panic("should compare peers of the same region")
+	}
+
+	// compare region epoch, commit index, term and last index, in that order
+	if r.IsEpochStale(origin) {
+		return true
+	}
+	re := r.Region().GetRegionEpoch()
+	oe := origin.Region().GetRegionEpoch()
+	if re.GetVersion() == oe.GetVersion() && re.GetConfVer() == oe.GetConfVer() {
+		rs := r.report.GetRaftState()
+		os := origin.report.GetRaftState()
+		if rs.GetHardState().GetCommit() < os.GetHardState().GetCommit() {
+			return true
+		} else if rs.GetHardState().GetCommit() == os.GetHardState().GetCommit() {
+			if rs.GetHardState().GetTerm() < os.GetHardState().GetTerm() {
+				return true
+			} else if rs.GetHardState().GetTerm() == os.GetHardState().GetTerm() {
+				if rs.GetLastIndex() < os.GetLastIndex() {
+					return true
+				}
+			}
+		}
+	}
+	return false
+}
 
+// Less returns true if the region start key is less than the other.
+func (r *regionItem) Less(other btree.Item) bool {
+	left := r.Region().GetStartKey()
+	right := other.(*regionItem).Region().GetStartKey()
+	return bytes.Compare(left, right) < 0
+}
+
+func (r *regionItem) Contains(key []byte) bool {
+	start, end := r.Region().GetStartKey(), r.Region().GetEndKey()
+	return bytes.Compare(key, start) >= 0 && (len(end) == 0 || bytes.Compare(key, end) < 0)
+}
 
-func getOverlapRanges(tree *btree.BTree, region *metapb.Region) []*metapb.Region {
-	var overlapRanges []*metapb.Region
-	tree.DescendLessOrEqual(regionItem{region}, func(item btree.Item) bool {
-		if bytes.Compare(item.(regionItem).region.StartKey, region.StartKey) < 0 && bytes.Compare(item.(regionItem).region.EndKey, region.StartKey) > 0 {
-			overlapRanges = append(overlapRanges, item.(regionItem).region)
-		}
-		return false
-	})
-	tree.AscendGreaterOrEqual(regionItem{region}, func(item btree.Item) bool {
-		if len(region.EndKey) != 0 && bytes.Compare(item.(regionItem).region.StartKey, region.EndKey) > 0 {
-			return false
-		}
-		overlapRanges = append(overlapRanges, item.(regionItem).region)
-		return true
-	})
-	return overlapRanges
-}
+const (
+	defaultBTreeDegree = 64
+)
+
+type regionTree struct {
+	regions map[uint64]*regionItem
+	tree    *btree.BTree
+}
+
+func newRegionTree() *regionTree {
+	return &regionTree{
+		regions: make(map[uint64]*regionItem),
+		tree:    btree.New(defaultBTreeDegree),
+	}
+}
+
+func (t *regionTree) contains(regionID uint64) bool {
+	_, ok := t.regions[regionID]
+	return ok
+}
+
+// getOverlaps gets the regions that overlap with the specified region range.
+func (t *regionTree) getOverlaps(item *regionItem) []*regionItem {
+	// note that find() gets the last item that is less than or equal to the region.
+	// in the case:   |_______a_______|_____b_____|___c___|
+	// new region is      |______d______|
+	// find() will return the regionItem of region_a,
+	// and the start keys of both region_a and region_b are less than the end key of region_d,
+	// thus they are regarded as overlapping regions.
+	result := t.find(item)
+	if result == nil {
+		result = item
+	}
+
+	end := item.Region().GetEndKey()
+	var overlaps []*regionItem
+	t.tree.AscendGreaterOrEqual(result, func(i btree.Item) bool {
+		over := i.(*regionItem)
+		if len(end) > 0 && bytes.Compare(end, over.Region().GetStartKey()) <= 0 {
+			return false
+		}
+		overlaps = append(overlaps, over)
+		return true
+	})
+	return overlaps
+}
+
+// find is a helper function to find an item that contains the region's start key.
+func (t *regionTree) find(item *regionItem) *regionItem {
+	var result *regionItem
+	t.tree.DescendLessOrEqual(item, func(i btree.Item) bool {
+		result = i.(*regionItem)
+		return false
+	})
+	if result == nil || !result.Contains(item.Region().GetStartKey()) {
+		return nil
+	}
+
+	return result
+}
+
+// update updates the tree with the region.
+// It finds and deletes all the overlapped regions first, and then
+// inserts the region.
+func (t *regionTree) update(item *regionItem) bool {
+	overlaps := t.getOverlaps(item)
+
+	foundSelf := false
+	for _, old := range overlaps {
+		if item.Region().GetId() == old.Region().GetId() {
+			foundSelf = true
+			// for peers of the same region, compare more than just the epoch
+			if item.IsStale(old) {
+				return false
+			}
+		} else if item.IsEpochStale(old) {
+			return false
+		}
+	}
+
+	if !foundSelf {
+		// the new range doesn't overlap the region's old range, so use the hash map
+		// to look up the original region info by ID
+		if origin := t.regions[item.Region().GetId()]; origin != nil {
+			if item.IsStale(origin) {
+				return false
+			}
+			if origin.IsStale(item) {
+				t.tree.Delete(origin)
+			}
+		}
+	}
+	for _, old := range overlaps {
+		t.tree.Delete(old)
+	}
+	t.regions[item.Region().GetId()] = item
+	t.tree.ReplaceOrInsert(item)
+	return true
+}
+
+// getRecoveryPlan returns the recovery plan for the store, creating it if it doesn't exist yet.
+func (u *unsafeRecoveryController) getRecoveryPlan(storeID uint64) *pdpb.RecoveryPlan {
+	storeRecoveryPlan, exists := u.storeRecoveryPlans[storeID]
+	if !exists {
+		u.storeRecoveryPlans[storeID] = &pdpb.RecoveryPlan{}
+		storeRecoveryPlan = u.storeRecoveryPlans[storeID]
+	}
+	return storeRecoveryPlan
+}
+
+func (u *unsafeRecoveryController) generateForceLeaderPlan() bool {
+	u.Lock()
+	defer u.Unlock()
+
+	newestRegionTree := newRegionTree()
+	// Go through all the peer reports to build up the newest region tree
+	for storeID, storeReport := range u.storeReports {
+		for _, peerReport := range storeReport.PeerReports {
+			newestRegionTree.update(&regionItem{report: peerReport, storeID: storeID})
+		}
+	}
+
+	hasPlan := false
+	// Check each region in the newest region tree to see whether it can still elect
+	// a leader, considering the failed stores
+	newestRegionTree.tree.Ascend(func(item btree.Item) bool {
+		region := item.(*regionItem).Region()
+		storeID := item.(*regionItem).storeID
+		if !u.canElectLeader(region) {
+			storeRecoveryPlan := u.getRecoveryPlan(storeID)
+			storeRecoveryPlan.EnterForceLeaders = append(storeRecoveryPlan.EnterForceLeaders, region.GetId())
+			hasPlan = true
+		}
+		return true
+	})
+
+	// TODO: resolve case 2
+	// it's hard to distinguish it from an unfinished region split,
+	// and it's rare, so leave it for now
+
+	u.stage = forceLeader
+	return hasPlan
+}
 
-func (u *unsafeRecoveryController) generateRecoveryPlan() {
+func (u *unsafeRecoveryController) generateRecoveryPlan() bool {
 	u.Lock()
 	defer u.Unlock()
-	newestRegionReports := make(map[uint64]*pdpb.PeerReport)
-	var allPeerReports []*peerStorePair
+
+	newestRegionTree := newRegionTree()
+	// Go through all the peer reports to build up the newest region tree
 	for storeID, storeReport := range u.storeReports {
 		for _, peerReport := range storeReport.PeerReports {
-			allPeerReports = append(allPeerReports, &peerStorePair{peerReport, storeID})
-			regionID := peerReport.RegionState.Region.Id
-			if existing, ok := newestRegionReports[regionID]; ok {
-				if existing.RegionState.Region.RegionEpoch.Version >= peerReport.RegionState.Region.RegionEpoch.Version &&
-					existing.RegionState.Region.RegionEpoch.ConfVer >= peerReport.RegionState.Region.RegionEpoch.Version &&
-					existing.RaftState.LastIndex >= peerReport.RaftState.LastIndex {
-					continue
-				}
-			}
-			newestRegionReports[regionID] = peerReport
+			newestRegionTree.update(&regionItem{report: peerReport, storeID: storeID})
 		}
 	}
-	recoveredRanges := btree.New(2)
-	healthyRegions := make(map[uint64]*pdpb.PeerReport)
-	inUseRegions := make(map[uint64]bool)
-	for _, report := range newestRegionReports {
-		region := report.RegionState.Region
-		// TODO(v01dstar): Whether the group can elect a leader should not merely rely on failed stores / peers, since it is possible that all reported peers are stale.
-		if u.canElectLeader(report.RegionState.Region) {
-			healthyRegions[region.Id] = report
-			inUseRegions[region.Id] = true
-			recoveredRanges.ReplaceOrInsert(regionItem{report.RegionState.Region})
-		}
-	}
-	sort.SliceStable(allPeerReports, func(i, j int) bool {
-		return allPeerReports[i].peer.RegionState.Region.RegionEpoch.Version > allPeerReports[j].peer.RegionState.Region.RegionEpoch.Version
-	})
-	for _, peerStorePair := range allPeerReports {
-		region := peerStorePair.peer.RegionState.Region
-		storeID := peerStorePair.storeID
-		lastEnd := region.StartKey
-		reachedTheEnd := false
-		var creates []*metapb.Region
-		var update *metapb.Region
-		for _, overlapRegion := range getOverlapRanges(recoveredRanges, region) {
-			if bytes.Compare(lastEnd, overlapRegion.StartKey) < 0 {
-				newRegion := proto.Clone(region).(*metapb.Region)
-				keepOneReplica(storeID, newRegion)
-				newRegion.StartKey = lastEnd
-				newRegion.EndKey = overlapRegion.StartKey
-				if _, inUse := inUseRegions[region.Id]; inUse {
-					newRegion.Id, _ = u.cluster.GetAllocator().Alloc()
-					creates = append(creates, newRegion)
-				} else {
-					inUseRegions[region.Id] = true
-					update = newRegion
-				}
-				recoveredRanges.ReplaceOrInsert(regionItem{newRegion})
-				if len(overlapRegion.EndKey) == 0 {
-					reachedTheEnd = true
-					break
-				}
-				lastEnd = overlapRegion.EndKey
-			} else if len(overlapRegion.EndKey) == 0 {
-				reachedTheEnd = true
-				break
-			} else if bytes.Compare(overlapRegion.EndKey, lastEnd) > 0 {
-				lastEnd = overlapRegion.EndKey
-			}
-		}
-		if !reachedTheEnd && (bytes.Compare(lastEnd, region.EndKey) < 0 || len(region.EndKey) == 0) {
-			newRegion := proto.Clone(region).(*metapb.Region)
-			keepOneReplica(storeID, newRegion)
-			newRegion.StartKey = lastEnd
-			newRegion.EndKey = region.EndKey
-			if _, inUse := inUseRegions[region.Id]; inUse {
-				newRegion.Id, _ = u.cluster.GetAllocator().Alloc()
-				creates = append(creates, newRegion)
-			} else {
-				inUseRegions[region.Id] = true
-				update = newRegion
-			}
-			recoveredRanges.ReplaceOrInsert(regionItem{newRegion})
-		}
-		if len(creates) != 0 || update != nil {
-			storeRecoveryPlan, exists := u.storeRecoveryPlans[storeID]
-			if !exists {
-				u.storeRecoveryPlans[storeID] = &pdpb.RecoveryPlan{}
-				storeRecoveryPlan = u.storeRecoveryPlans[storeID]
-			}
-			storeRecoveryPlan.Creates = append(storeRecoveryPlan.Creates, creates...)
-			if update != nil {
-				storeRecoveryPlan.Updates = append(storeRecoveryPlan.Updates, update)
-			}
-		} else if _, healthy := healthyRegions[region.Id]; !healthy {
-			// If this peer contributes nothing to the recovered ranges, and it does not belong to a healthy region, delete it.
-			storeRecoveryPlan, exists := u.storeRecoveryPlans[storeID]
-			if !exists {
-				u.storeRecoveryPlans[storeID] = &pdpb.RecoveryPlan{}
-				storeRecoveryPlan = u.storeRecoveryPlans[storeID]
-			}
-			storeRecoveryPlan.Deletes = append(storeRecoveryPlan.Deletes, region.Id)
-		}
-	}
+
+	hasPlan := false
+	// Check each region in the newest region tree to see whether it can still elect
+	// a leader, considering the failed stores
+	newestRegionTree.tree.Ascend(func(item btree.Item) bool {
+		region := item.(*regionItem).Region()
+		storeID := item.(*regionItem).storeID
+		if !u.canElectLeader(region) {
+			storeRecoveryPlan := u.getRecoveryPlan(storeID)
+			storeRecoveryPlan.Removes = append(storeRecoveryPlan.Removes,
+				&pdpb.RemovePeers{
+					RegionId: region.GetId(),
+					Peers:    u.getFailedPeers(region),
+				},
+			)
+			hasPlan = true
+		}
+		return true
+	})
+
+	// Find the stale peers that did not make it into the newest region tree
+	for storeID, storeReport := range u.storeReports {
+		for _, peerReport := range storeReport.PeerReports {
+			regionID := peerReport.GetRegionState().Region.Id
+			if !newestRegionTree.contains(regionID) {
+				if u.canElectLeader(peerReport.GetRegionState().Region) {
+					log.Warn("found a stale peer whose region can still elect a leader")
+				} else {
+					// the peer is not in any valid region, so it should be deleted directly
+					storeRecoveryPlan := u.getRecoveryPlan(storeID)
+					storeRecoveryPlan.Deletes = append(storeRecoveryPlan.Deletes, regionID)
+				}
+			}
+		}
+	}
 
-	// There may be ranges that are covered by no one. Find these empty ranges, create new regions that cover them and evenly distribute newly created regions among all stores.
+	// There may be ranges that are covered by no one. Find these empty ranges, create new
+	// regions that cover them and evenly distribute newly created regions among all stores.
 	lastEnd := []byte("")
 	var creates []*metapb.Region
-	recoveredRanges.Ascend(func(item btree.Item) bool {
-		region := item.(regionItem).region
+	newestRegionTree.tree.Ascend(func(item btree.Item) bool {
+		region := item.(*regionItem).Region()
 		if !bytes.Equal(region.StartKey, lastEnd) {
 			newRegion := &metapb.Region{}
 			newRegion.StartKey = lastEnd
@@ -395,23 +511,16 @@ func (u *unsafeRecoveryController) generateRecoveryPlan() {
 		storeID := allStores[idx%len(allStores)]
 		peerID, _ := u.cluster.GetAllocator().Alloc()
 		create.Peers = []*metapb.Peer{{Id: peerID, StoreId: storeID, Role: metapb.PeerRole_Voter}}
-		storeRecoveryPlan, exists := u.storeRecoveryPlans[storeID]
-		if !exists {
-			u.storeRecoveryPlans[storeID] = &pdpb.RecoveryPlan{}
-			storeRecoveryPlan = u.storeRecoveryPlans[storeID]
-		}
+		storeRecoveryPlan := u.getRecoveryPlan(storeID)
 		storeRecoveryPlan.Creates = append(storeRecoveryPlan.Creates, create)
 	}
+
 	log.Info("Plan generated")
-	if len(u.storeRecoveryPlans) == 0 {
-		log.Info("Nothing to do")
-		u.stage = finished
-		return
-	}
 	for store, plan := range u.storeRecoveryPlans {
 		log.Info("Store plan", zap.String("store", strconv.FormatUint(store, 10)), zap.String("plan", proto.MarshalTextString(plan)))
 	}
 	u.stage = recovering
+	return hasPlan
 }
 
 func getPeerDigest(peer *metapb.Peer) string {
@@ -464,6 +573,8 @@ func (u *unsafeRecoveryController) Show() []string {
 		status = append(status, "Stores that have reported to PD: "+reported)
 		status = append(status, "Stores that have not reported to PD: "+unreported)
 		return status
+	case forceLeader:
+
 	case recovering:
 		var status []string
 		status = append(status, fmt.Sprintf("Waiting for recover commands being applied, %d/%d", u.numStoresPlanExecuted, len(u.storeRecoveryPlans)))
@@ -473,9 +584,13 @@ func (u *unsafeRecoveryController) Show() []string {
 			for _, create := range plan.Creates {
 				planDigest += getRegionDigest(create) + ", "
 			}
-			planDigest += "; updates: "
-			for _, update := range plan.Updates {
-				planDigest += getRegionDigest(update) + ", "
+			planDigest += "; removes: "
+			for _, remove := range plan.Removes {
+				var peers string
+				for _, peer := range remove.Peers {
+					peers += "(" + getPeerDigest(peer) + "), "
+				}
+				planDigest += fmt.Sprintf("region %d {%s}", remove.RegionId, peers) + ", "
 			}
 			planDigest += "; deletes: "
 			for _, deletion := range plan.Deletes {
@@ -501,7 +616,7 @@ func (u *unsafeRecoveryController) History() []string {
 	u.RLock()
 	defer u.RUnlock()
 	if u.stage <= ready {
-		return []string{"No unasfe recover has been triggered since PD restarted."}
+		return []string{"No unsafe recovery has been triggered since PD restarted."}
 	}
 	var history []string
 	if u.stage >= collectingClusterInfo {
@@ -521,9 +636,13 @@ func (u *unsafeRecoveryController) History() []string {
 		for _, create := range plan.Creates {
 			planDigest += getRegionDigest(create) + ", "
 		}
-		planDigest += "; updates: "
-		for _, update := range plan.Updates {
-			planDigest += getRegionDigest(update) + ", "
+		planDigest += "; removes: "
+		for _, remove := range plan.Removes {
+			var peers string
+			for _, peer := range remove.Peers {
+				peers += "(" + getPeerDigest(peer) + "), "
+			}
+			planDigest += fmt.Sprintf("region %d {%s}", remove.RegionId, peers) + ", "
 		}
 		planDigest += "; deletes: "
 		for _, deletion := range plan.Deletes {