Merge branch 'release-5.1' into cherry-pick-3808-to-release-5.1
HunDunDM authored Sep 13, 2021
2 parents 89b6ff4 + 59693ab commit cc28564
Showing 20 changed files with 294 additions and 53 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -31,7 +31,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20210712050333-b66fdbd6bfd5
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3
github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296
github.com/pingcap/tidb-dashboard v0.0.0-20210902124511-e723204205f7
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.6.0
4 changes: 2 additions & 2 deletions go.sum
@@ -325,8 +325,8 @@ github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71J
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8=
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8=
github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296 h1:kTH6Jyn8XVoFJNxT3UF4eiZMxDbyfsSXkAtSk9jLGr4=
github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-dashboard v0.0.0-20210902124511-e723204205f7 h1:uYimp8O2UlwlZm/gMlPDXvuCCTKQETRc8iFmPpxNi78=
github.com/pingcap/tidb-dashboard v0.0.0-20210902124511-e723204205f7/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
13 changes: 13 additions & 0 deletions pkg/typeutil/comparison.go
@@ -38,3 +38,16 @@ func MinDuration(a, b time.Duration) time.Duration {
}
return b
}

// StringsEqual checks if two string slices are equal. An empty slice and nil are considered equal.
func StringsEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
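
As a quick aside (not part of the commit), the nil/empty equivalence the comment describes can be seen in a minimal sketch like the following; the module path github.com/tikv/pd is assumed from the repository:

package main

import (
    "fmt"

    "github.com/tikv/pd/pkg/typeutil"
)

func main() {
    // nil and an empty slice are treated as equal.
    fmt.Println(typeutil.StringsEqual(nil, []string{})) // true
    // Comparison is element-wise and order-sensitive.
    fmt.Println(typeutil.StringsEqual([]string{"a", "b"}, []string{"b", "a"})) // false
}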
7 changes: 6 additions & 1 deletion server/api/admin.go
@@ -14,6 +14,7 @@
package api

import (
"encoding/json"
"io"
"net/http"
"strconv"
@@ -95,14 +96,18 @@ func (h *adminHandler) ResetTS(w http.ResponseWriter, r *http.Request) {
}

// Intentionally no swagger mark as it is supposed to be only used in
// server-to-server.
// server-to-server. For security reasons, it only accepts JSON-formatted data.
func (h *adminHandler) persistFile(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
if err != nil {
h.rd.Text(w, http.StatusInternalServerError, "")
return
}
defer r.Body.Close()
if !json.Valid(data) {
h.rd.Text(w, http.StatusBadRequest, "body should be json format")
return
}
err = h.svr.PersistFile(mux.Vars(r)["file_name"], data)
if err != nil {
h.rd.Text(w, http.StatusInternalServerError, err.Error())
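
Since the handler now runs the body through json.Valid before persisting it, a standalone sketch of that standard-library check (mirroring the test payloads below) looks like this:

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // A shell script is not valid JSON, so the handler replies 400 Bad Request.
    fmt.Println(json.Valid([]byte("#!/bin/sh\nrm -rf /"))) // false
    // A well-formed JSON object passes validation and gets persisted.
    fmt.Println(json.Valid([]byte(`{"foo":"bar"}`))) // true
}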
9 changes: 9 additions & 0 deletions server/api/admin_test.go
@@ -88,6 +88,15 @@ func (s *testAdminSuite) TestDropRegion(c *C) {
c.Assert(region.GetRegionEpoch().Version, Equals, uint64(50))
}

func (s *testAdminSuite) TestPersistFile(c *C) {
data := []byte("#!/bin/sh\nrm -rf /")
err := postJSON(testDialClient, s.urlPrefix+"/admin/persist-file/fun.sh", data)
c.Assert(err, NotNil)
data = []byte(`{"foo":"bar"}`)
err = postJSON(testDialClient, s.urlPrefix+"/admin/persist-file/good.json", data)
c.Assert(err, IsNil)
}

var _ = Suite(&testTSOSuite{})

type testTSOSuite struct {
5 changes: 4 additions & 1 deletion server/api/cluster_test.go
@@ -49,9 +49,12 @@ func (s *testClusterSuite) TestCluster(c *C) {
s.testGetClusterStatus(c)
s.svr.GetPersistOptions().SetPlacementRuleEnabled(true)
s.svr.GetPersistOptions().GetReplicationConfig().LocationLabels = []string{"host"}
rule := s.svr.GetRaftCluster().GetRuleManager().GetRule("pd", "default")
rm := s.svr.GetRaftCluster().GetRuleManager()
rule := rm.GetRule("pd", "default")
rule.LocationLabels = []string{"host"}
rule.Count = 1
rm.SetRule(rule)

// Test set the config
url := fmt.Sprintf("%s/cluster", s.urlPrefix)
c1 := &metapb.Cluster{}
4 changes: 3 additions & 1 deletion server/api/rule_test.go
@@ -239,7 +239,9 @@ func (s *testRuleSuite) TestSetAll(c *C) {
rule6 := placement.Rule{GroupID: "pd", ID: "default", StartKeyHex: "", EndKeyHex: "", Role: "voter", Count: 3}

s.svr.GetPersistOptions().GetReplicationConfig().LocationLabels = []string{"host"}
s.svr.GetRaftCluster().GetRuleManager().GetRule("pd", "default").LocationLabels = []string{"host"}
defaultRule := s.svr.GetRaftCluster().GetRuleManager().GetRule("pd", "default")
defaultRule.LocationLabels = []string{"host"}
s.svr.GetRaftCluster().GetRuleManager().SetRule(defaultRule)

successData, err := json.Marshal([]*placement.Rule{&rule1, &rule2})
c.Assert(err, IsNil)
5 changes: 1 addition & 4 deletions server/schedule/checker/replica_checker.go
@@ -106,13 +106,10 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return nil
}
// Only consider the state of the Store, not `stats.DownSeconds`.
if store.DownTime() < r.opts.GetMaxStoreDownTime() {
continue
}
if stats.GetDownSeconds() < uint64(r.opts.GetMaxStoreDownTime().Seconds()) {
continue
}

return r.fixPeer(region, storeID, downStatus)
}
return nil
21 changes: 17 additions & 4 deletions server/schedule/checker/rule_checker.go
@@ -269,12 +269,27 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
if len(fit.OrphanPeers) == 0 {
return nil, nil
}
// remove orphan peers only when all rules are satisfied (count+role)
// remove orphan peers only when all rules are satisfied (count+role) and none of
// the peers selected by RuleFits is pending or down.
for _, rf := range fit.RuleFits {
if !rf.IsSatisfied() {
checkerCounter.WithLabelValues("rule_checker", "skip-remove-orphan-peer").Inc()
return nil, nil
}
for _, p := range rf.Peers {
for _, pendingPeer := range region.GetPendingPeers() {
if pendingPeer.Id == p.Id {
checkerCounter.WithLabelValues("rule_checker", "skip-remove-orphan-peer").Inc()
return nil, nil
}
}
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.Id == p.Id {
checkerCounter.WithLabelValues("rule_checker", "skip-remove-orphan-peer").Inc()
return nil, nil
}
}
}
}
checkerCounter.WithLabelValues("rule_checker", "remove-orphan-peer").Inc()
peer := fit.OrphanPeers[0]
@@ -292,12 +307,10 @@ func (c *RuleChecker) isDownPeer(region *core.RegionInfo, peer *metapb.Peer) boo
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return false
}
// Only consider the state of the Store, not `stats.DownSeconds`.
if store.DownTime() < c.cluster.GetOpts().GetMaxStoreDownTime() {
continue
}
if stats.GetDownSeconds() < uint64(c.cluster.GetOpts().GetMaxStoreDownTime().Seconds()) {
continue
}
return true
}
return false
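
The nested loops added to fixOrphanPeers above can be summarized by a hypothetical predicate like the one below; it is not part of the commit, and the import paths are assumed from the repository layout:

package checker

import (
    "github.com/tikv/pd/server/core"
    "github.com/tikv/pd/server/schedule/placement"
)

// hasUnhealthyFitPeer reports whether any peer chosen by the rule fits is
// currently pending or down; in that case orphan-peer removal is skipped.
func hasUnhealthyFitPeer(region *core.RegionInfo, fit *placement.RegionFit) bool {
    for _, rf := range fit.RuleFits {
        for _, p := range rf.Peers {
            for _, pendingPeer := range region.GetPendingPeers() {
                if pendingPeer.Id == p.Id {
                    return true
                }
            }
            for _, downPeer := range region.GetDownPeers() {
                if downPeer.Peer.Id == p.Id {
                    return true
                }
            }
        }
    }
    return false
}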
42 changes: 42 additions & 0 deletions server/schedule/checker/rule_checker_test.go
@@ -600,3 +600,45 @@ func (s *testRuleCheckerSuite) TestFixOfflinePeer(c *C) {
s.ruleManager.SetRule(rule)
c.Assert(s.rc.Check(region), IsNil)
}

// Ref https://github.com/tikv/pd/issues/4045
func (s *testRuleCheckerSuite) TestSkipFixOrphanPeerIfSelectedPeerisPendingOrDown(c *C) {
s.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
s.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"})
s.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host2"})
s.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
s.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 3, 4)

// set peer3 and peer4 to pending
r1 := s.cluster.GetRegion(1)
r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetStorePeer(3), r1.GetStorePeer(4)}))
s.cluster.PutRegion(r1)

// should not remove extra peer
op := s.rc.Check(s.cluster.GetRegion(1))
c.Assert(op, IsNil)

// set peer3 to down-peer
r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetStorePeer(4)}))
r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 42,
},
}))
s.cluster.PutRegion(r1)

// should not remove extra peer
op = s.rc.Check(s.cluster.GetRegion(1))
c.Assert(op, IsNil)

// set peer3 to normal
r1 = r1.Clone(core.WithDownPeers(nil))
s.cluster.PutRegion(r1)

// should remove extra peer now
var remove operator.RemovePeer
op = s.rc.Check(s.cluster.GetRegion(1))
c.Assert(op.Step(0), FitsTypeOf, remove)
c.Assert(op.Desc(), Equals, "remove-orphan-peer")
}
9 changes: 9 additions & 0 deletions server/schedule/placement/rule.go
@@ -73,6 +73,15 @@ func (r *Rule) String() string {
return string(b)
}

// Clone returns a copy of Rule.
func (r *Rule) Clone() *Rule {
var clone Rule
json.Unmarshal([]byte(r.String()), &clone)
clone.StartKey = append(r.StartKey[:0:0], r.StartKey...)
clone.EndKey = append(r.EndKey[:0:0], r.EndKey...)
return &clone
}

// Key returns (groupID, ID) as the global unique key of a rule.
func (r *Rule) Key() [2]string {
return [2]string{r.GroupID, r.ID}
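
A short sketch of the copy semantics Clone provides (not from the commit; the placement import path is assumed): mutating the clone leaves the original rule untouched, which is what the RuleManager changes below rely on.

package main

import (
    "fmt"

    "github.com/tikv/pd/server/schedule/placement"
)

func main() {
    orig := &placement.Rule{GroupID: "pd", ID: "default", Role: "voter", Count: 3}
    clone := orig.Clone()
    clone.Count = 5 // modify the copy only
    fmt.Println(orig.Count, clone.Count) // 3 5
}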
16 changes: 12 additions & 4 deletions server/schedule/placement/rule_manager.go
@@ -216,7 +216,10 @@ func (m *RuleManager) adjustRule(r *Rule, groupID string) (err error) {
func (m *RuleManager) GetRule(group, id string) *Rule {
m.RLock()
defer m.RUnlock()
return m.ruleConfig.getRule([2]string{group, id})
if r := m.ruleConfig.getRule([2]string{group, id}); r != nil {
return r.Clone()
}
return nil
}

// SetRule inserts or updates a Rule.
@@ -261,7 +264,7 @@ func (m *RuleManager) GetAllRules() []*Rule {
defer m.RUnlock()
rules := make([]*Rule, 0, len(m.ruleConfig.rules))
for _, r := range m.ruleConfig.rules {
rules = append(rules, r)
rules = append(rules, r.Clone())
}
sortRules(rules)
return rules
@@ -274,7 +277,7 @@ func (m *RuleManager) GetRulesByGroup(group string) []*Rule {
var rules []*Rule
for _, r := range m.ruleConfig.rules {
if r.GroupID == group {
rules = append(rules, r)
rules = append(rules, r.Clone())
}
}
sortRules(rules)
@@ -285,7 +288,12 @@ func (m *RuleManager) GetRulesByKey(key []byte) []*Rule {
func (m *RuleManager) GetRulesByKey(key []byte) []*Rule {
m.RLock()
defer m.RUnlock()
return m.ruleList.getRulesByKey(key)
rules := m.ruleList.getRulesByKey(key)
ret := make([]*Rule, 0, len(rules))
for _, r := range rules {
ret = append(ret, r.Clone())
}
return ret
}

// GetRulesForApplyRegion returns the rules list that should be applied to a region.
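
Because the manager now returns clones, code that wants to change a rule has to write the modified copy back with SetRule, as the updated tests do. A minimal sketch of that pattern (import path assumed):

package example

import (
    "errors"

    "github.com/tikv/pd/server/schedule/placement"
)

// updateDefaultRule shows the get-modify-set pattern: GetRule hands back a
// copy, so changes only take effect once SetRule stores them.
func updateDefaultRule(rm *placement.RuleManager) error {
    rule := rm.GetRule("pd", "default")
    if rule == nil {
        return errors.New("default rule not found")
    }
    rule.LocationLabels = []string{"host"}
    rule.Count = 1
    return rm.SetRule(rule)
}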
21 changes: 17 additions & 4 deletions server/schedule/placement/rule_manager_test.go
@@ -107,16 +107,29 @@ func (s *testManagerSuite) TestSaveLoad(c *C) {
{GroupID: "foo", ID: "bar", Role: "learner", Count: 1},
}
for _, r := range rules {
c.Assert(s.manager.SetRule(r), IsNil)
c.Assert(s.manager.SetRule(r.Clone()), IsNil)
}

m2 := NewRuleManager(s.store, nil)
err := m2.Initialize(3, []string{"no", "labels"})
c.Assert(err, IsNil)
c.Assert(m2.GetAllRules(), HasLen, 3)
c.Assert(m2.GetRule("pd", "default"), DeepEquals, rules[0])
c.Assert(m2.GetRule("foo", "baz"), DeepEquals, rules[1])
c.Assert(m2.GetRule("foo", "bar"), DeepEquals, rules[2])
c.Assert(m2.GetRule("pd", "default").String(), Equals, rules[0].String())
c.Assert(m2.GetRule("foo", "baz").String(), Equals, rules[1].String())
c.Assert(m2.GetRule("foo", "bar").String(), Equals, rules[2].String())
}

// https://github.com/tikv/pd/issues/3886
func (s *testManagerSuite) TestSetAfterGet(c *C) {
rule := s.manager.GetRule("pd", "default")
rule.Count = 1
s.manager.SetRule(rule)

m2 := NewRuleManager(s.store, nil)
err := m2.Initialize(100, []string{})
c.Assert(err, IsNil)
rule = m2.GetRule("pd", "default")
c.Assert(rule.Count, Equals, 1)
}

func (s *testManagerSuite) checkRules(c *C, rules []*Rule, expect [][2]string) {
16 changes: 12 additions & 4 deletions server/schedulers/balance_leader.go
@@ -70,6 +70,7 @@ type balanceLeaderSchedulerConfig struct {

type balanceLeaderScheduler struct {
*BaseScheduler
*retryQuota
conf *balanceLeaderSchedulerConfig
opController *schedule.OperatorController
filters []filter.Filter
@@ -83,6 +84,7 @@ func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *

s := &balanceLeaderScheduler{
BaseScheduler: base,
retryQuota: newRetryQuota(balanceLeaderRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
conf: conf,
opController: opController,
counter: balanceLeaderCounter,
@@ -153,38 +155,44 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
})
sort.Slice(targets, func(i, j int) bool {
iOp := plan.GetOpInfluence(targets[i].GetID())
jOp := plan.GetOpInfluence(targets[i].GetID())
jOp := plan.GetOpInfluence(targets[j].GetID())
return targets[i].LeaderScore(leaderSchedulePolicy, iOp) <
targets[j].LeaderScore(leaderSchedulePolicy, jOp)
})

for i := 0; i < len(sources) || i < len(targets); i++ {
if i < len(sources) {
plan.source, plan.target = sources[i], nil
retryLimit := l.retryQuota.GetLimit(plan.source)
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("source-store", plan.SourceStoreID()))
l.counter.WithLabelValues("high-score", plan.SourceMetricLabel()).Inc()
for j := 0; j < balanceLeaderRetryLimit; j++ {
for j := 0; j < retryLimit; j++ {
if ops := l.transferLeaderOut(plan); len(ops) > 0 {
l.retryQuota.ResetLimit(plan.source)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-out", plan.SourceMetricLabel()))
return ops
}
}
l.Attenuate(plan.source)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", plan.SourceStoreID()))
}
if i < len(targets) {
plan.source, plan.target = nil, targets[i]
retryLimit := l.retryQuota.GetLimit(plan.target)
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("target-store", plan.TargetStoreID()))
l.counter.WithLabelValues("low-score", plan.TargetMetricLabel()).Inc()

for j := 0; j < balanceLeaderRetryLimit; j++ {
for j := 0; j < retryLimit; j++ {
if ops := l.transferLeaderIn(plan); len(ops) > 0 {
l.retryQuota.ResetLimit(plan.target)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-in", plan.TargetMetricLabel()))
return ops
}
}
l.Attenuate(plan.target)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("target", plan.TargetStoreID()))
}
}
l.retryQuota.GC(append(sources, targets...))
return nil
}

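
The retryQuota embedded above is introduced elsewhere in this commit and its definition is not shown in this excerpt. Purely as an illustration of the behaviour implied by the calls (GetLimit, ResetLimit, Attenuate, GC), a sketch might look like the following; every name, field, and the map keyed by store ID are assumptions, not the actual implementation:

package schedulers

import "github.com/tikv/pd/server/core"

// retryQuota (sketch) tracks a per-store retry limit. Stores that repeatedly
// fail to yield an operator get their limit attenuated down to a minimum,
// and a successful schedule resets it.
type retryQuota struct {
    initialLimit int
    minLimit     int
    attenuation  int
    limits       map[uint64]int // keyed by store ID (assumed)
}

func newRetryQuota(initialLimit, minLimit, attenuation int) *retryQuota {
    return &retryQuota{
        initialLimit: initialLimit,
        minLimit:     minLimit,
        attenuation:  attenuation,
        limits:       make(map[uint64]int),
    }
}

// GetLimit returns the current limit for a store, starting at initialLimit.
func (q *retryQuota) GetLimit(store *core.StoreInfo) int {
    if limit, ok := q.limits[store.GetID()]; ok {
        return limit
    }
    q.limits[store.GetID()] = q.initialLimit
    return q.initialLimit
}

// Attenuate divides the store's limit by the attenuation factor, bounded below by minLimit.
func (q *retryQuota) Attenuate(store *core.StoreInfo) {
    newLimit := q.GetLimit(store) / q.attenuation
    if newLimit < q.minLimit {
        newLimit = q.minLimit
    }
    q.limits[store.GetID()] = newLimit
}

// ResetLimit restores the store's limit after a successful schedule.
func (q *retryQuota) ResetLimit(store *core.StoreInfo) {
    q.limits[store.GetID()] = q.initialLimit
}

// GC drops entries for stores that are no longer scheduling candidates.
func (q *retryQuota) GC(keepStores []*core.StoreInfo) {
    keep := make(map[uint64]struct{}, len(keepStores))
    for _, s := range keepStores {
        keep[s.GetID()] = struct{}{}
    }
    for id := range q.limits {
        if _, ok := keep[id]; !ok {
            delete(q.limits, id)
        }
    }
}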
(The remaining changed files in this commit are not shown in this excerpt.)
