Merge branch 'master' into min_ts
ti-chi-bot authored Mar 17, 2022
2 parents 5d6c960 + 1a353ae commit 6ebb1ec
Showing 13 changed files with 239 additions and 50 deletions.
54 changes: 53 additions & 1 deletion server/api/scheduler_test.go
@@ -118,7 +118,59 @@ func (s *testScheduleSuite) TestAPI(c *C) {
args []arg
extraTestFunc func(name string, c *C)
}{
{name: "balance-leader-scheduler"},
{
name: "balance-leader-scheduler",
extraTestFunc: func(name string, c *C) {
resp := make(map[string]interface{})
listURL := fmt.Sprintf("%s%s%s/%s/list", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
c.Assert(readJSON(testDialClient, listURL, &resp), IsNil)
c.Assert(resp["batch"], Equals, 4.0)
dataMap := make(map[string]interface{})
dataMap["batch"] = 3
updateURL := fmt.Sprintf("%s%s%s/%s/config", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
body, err := json.Marshal(dataMap)
c.Assert(err, IsNil)
c.Assert(postJSON(testDialClient, updateURL, body), IsNil)
resp = make(map[string]interface{})
c.Assert(readJSON(testDialClient, listURL, &resp), IsNil)
c.Assert(resp["batch"], Equals, 3.0)
// update again
err = postJSON(testDialClient, updateURL, body, func(res []byte, code int) {
c.Assert(string(res), Equals, "\"no changed\"\n")
c.Assert(code, Equals, 200)
})
c.Assert(err, IsNil)
// update with an invalid batch size
dataMap = map[string]interface{}{}
dataMap["batch"] = 100
body, err = json.Marshal(dataMap)
c.Assert(err, IsNil)
err = postJSON(testDialClient, updateURL, body, func(res []byte, code int) {
c.Assert(code, Equals, 400)
})
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "\"invalid batch size which should be an integer between 1 and 10\"\n")
resp = make(map[string]interface{})
c.Assert(readJSON(testDialClient, listURL, &resp), IsNil)
c.Assert(resp["batch"], Equals, 3.0)
// empty body
err = postJSON(testDialClient, updateURL, nil, func(res []byte, code int) {
c.Assert(code, Equals, 500)
})
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "\"unexpected end of JSON input\"\n")
// config item not found
dataMap = map[string]interface{}{}
dataMap["error"] = 3
body, err = json.Marshal(dataMap)
c.Assert(err, IsNil)
err = postJSON(testDialClient, updateURL, body, func(res []byte, code int) {
c.Assert(code, Equals, 400)
})
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "\"config item not found\"\n")
},
},
{
name: "balance-hot-region-scheduler",
extraTestFunc: func(name string, c *C) {
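The test above exercises the new batch-size endpoints end to end. As a rough usage sketch (not part of the commit), a client could drive the same endpoints directly over HTTP; the PD address and the "/pd/api/v1/scheduler-config" prefix below are assumptions standing in for s.svr.GetAddr(), apiPrefix, and server.SchedulerConfigHandlerPath in the test.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed PD address and scheduler-config prefix; adjust to the real deployment.
	base := "http://127.0.0.1:2379/pd/api/v1/scheduler-config/balance-leader-scheduler"

	// Read the current scheduler config; the response now carries the "batch" field.
	resp, err := http.Get(base + "/list")
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(string(body))

	// Update the batch size; values outside [1, 10] are rejected with HTTP 400,
	// and re-posting the same value yields "no changed".
	resp, err = http.Post(base+"/config", "application/json", bytes.NewBufferString(`{"batch": 3}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.StatusCode)
	resp.Body.Close()
}
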
2 changes: 1 addition & 1 deletion server/grpc_service.go
@@ -1799,7 +1799,7 @@ func (s *GrpcServer) sendAllGlobalConfig(ctx context.Context, server pdpb.PD_Wat

// ReportBuckets receives region buckets from tikv.
func (s *GrpcServer) ReportBuckets(pdpb.PD_ReportBucketsServer) error {
panic("not implemented")
return status.Errorf(codes.Unimplemented, "not implemented")
}

// Evict the leaders when the store is damaged. Damaged regions are emergency errors
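With ReportBuckets now returning a gRPC status instead of panicking, a caller can inspect the error code directly. A minimal, hedged sketch (the helper below is illustrative only, not part of PD or TiKV):

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isUnimplemented reports whether err carries the gRPC Unimplemented code that
// ReportBuckets now returns instead of panicking.
func isUnimplemented(err error) bool {
	// status.Code returns codes.OK for nil and codes.Unknown for non-status errors.
	return status.Code(err) == codes.Unimplemented
}

func main() {
	err := status.Errorf(codes.Unimplemented, "not implemented")
	fmt.Println(isUnimplemented(err)) // true
}
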
126 changes: 113 additions & 13 deletions server/schedulers/balance_leader.go
@@ -15,9 +15,15 @@
package schedulers

import (
"bytes"
"encoding/json"
"io"
"net/http"
"sort"
"strconv"
"sync"

"github.com/gorilla/mux"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
@@ -26,6 +32,7 @@ import (
"github.com/tikv/pd/server/schedule/filter"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/storage/endpoint"
"github.com/unrolled/render"
"go.uber.org/zap"
)

@@ -36,8 +43,12 @@ const (
BalanceLeaderType = "balance-leader"
// balanceLeaderRetryLimit is the limit to retry schedule for selected source store and target store.
balanceLeaderRetryLimit = 10
// BalanceLeaderBatchSize is the default number of operators to transfer leaders by one scheduling
BalanceLeaderBatchSize = 5
// BalanceLeaderBatchSize is the default number of operators to transfer leaders by one scheduling.
// The default value is 4, which is constrained by scheduler-max-waiting-operator and leader-schedule-limit.
// To further increase the balance speed, increase those parameters as well.
BalanceLeaderBatchSize = 4
// MaxBalanceLeaderBatchSize is the maximum balance leader batch size.
MaxBalanceLeaderBatchSize = 10

transferIn = "transfer-in"
transferOut = "transfer-out"
@@ -55,14 +66,13 @@ func init() {
return err
}
conf.Ranges = ranges
conf.Name = BalanceLeaderName
conf.Batch = BalanceLeaderBatchSize
return nil
}
})

schedule.RegisterScheduler(BalanceLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &balanceLeaderSchedulerConfig{}
conf := &balanceLeaderSchedulerConfig{storage: storage}
if err := decoder(conf); err != nil {
return nil, err
}
@@ -71,15 +81,97 @@ func init() {
}

type balanceLeaderSchedulerConfig struct {
Name string `json:"name"`
Ranges []core.KeyRange `json:"ranges"`
Batch int `json:"batch"`
mu sync.RWMutex
storage endpoint.ConfigStorage
Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
}

func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, interface{}) {
conf.mu.Lock()
defer conf.mu.Unlock()

oldc, _ := json.Marshal(conf)

if err := json.Unmarshal(data, conf); err != nil {
return http.StatusInternalServerError, err.Error()
}
newc, _ := json.Marshal(conf)
if !bytes.Equal(oldc, newc) {
if !conf.validate() {
json.Unmarshal(oldc, conf)
return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10"
}
conf.persistLocked()
return http.StatusOK, "success"
}
m := make(map[string]interface{})
if err := json.Unmarshal(data, &m); err != nil {
return http.StatusInternalServerError, err.Error()
}
ok := findSameField(conf, m)
if ok {
return http.StatusOK, "no changed"
}
return http.StatusBadRequest, "config item not found"
}

func (conf *balanceLeaderSchedulerConfig) validate() bool {
return conf.Batch >= 1 && conf.Batch <= 10
}

func (conf *balanceLeaderSchedulerConfig) Clone() *balanceLeaderSchedulerConfig {
conf.mu.RLock()
defer conf.mu.RUnlock()
return &balanceLeaderSchedulerConfig{
Ranges: conf.Ranges,
Batch: conf.Batch,
}
}

func (conf *balanceLeaderSchedulerConfig) persistLocked() error {
data, err := schedule.EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveScheduleConfig(BalanceLeaderName, data)
}

type balanceLeaderHandler struct {
rd *render.Render
config *balanceLeaderSchedulerConfig
}

func newBalanceLeaderHandler(conf *balanceLeaderSchedulerConfig) http.Handler {
handler := &balanceLeaderHandler{
config: conf,
rd: render.New(render.Options{IndentJSON: true}),
}
router := mux.NewRouter()
router.HandleFunc("/config", handler.UpdateConfig).Methods("POST")
router.HandleFunc("/list", handler.ListConfig).Methods("GET")
return router
}

func (handler *balanceLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) {
data, _ := io.ReadAll(r.Body)
r.Body.Close()
httpCode, v := handler.config.Update(data)
handler.rd.JSON(w, httpCode, v)
}

func (handler *balanceLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) {
conf := handler.config.Clone()
handler.rd.JSON(w, http.StatusOK, conf)
}

type balanceLeaderScheduler struct {
*BaseScheduler
*retryQuota
name string
conf *balanceLeaderSchedulerConfig
handler http.Handler
opController *schedule.OperatorController
filters []filter.Filter
counter *prometheus.CounterVec
@@ -89,11 +181,12 @@ type balanceLeaderScheduler struct {
// each store balanced.
func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) schedule.Scheduler {
base := NewBaseScheduler(opController)

s := &balanceLeaderScheduler{
BaseScheduler: base,
retryQuota: newRetryQuota(balanceLeaderRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
name: BalanceLeaderName,
conf: conf,
handler: newBalanceLeaderHandler(conf),
opController: opController,
counter: balanceLeaderCounter,
}
@@ -107,6 +200,10 @@ func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *
return s
}

func (l *balanceLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
l.handler.ServeHTTP(w, r)
}

// BalanceLeaderCreateOption is used to create a scheduler with an option.
type BalanceLeaderCreateOption func(s *balanceLeaderScheduler)

@@ -120,12 +217,12 @@ func WithBalanceLeaderCounter(counter *prometheus.CounterVec) BalanceLeaderCreat
// WithBalanceLeaderName sets the name for the scheduler.
func WithBalanceLeaderName(name string) BalanceLeaderCreateOption {
return func(s *balanceLeaderScheduler) {
s.conf.Name = name
s.name = name
}
}

func (l *balanceLeaderScheduler) GetName() string {
return l.conf.Name
return l.name
}

func (l *balanceLeaderScheduler) GetType() string {
@@ -192,6 +289,9 @@ func (cs *candidateStores) reSort(stores ...*core.StoreInfo) {
}

func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
l.conf.mu.RLock()
batch := l.conf.Batch
l.conf.mu.RUnlock()
schedulerCounter.WithLabelValues(l.GetName(), "schedule").Inc()

leaderSchedulePolicy := cluster.GetOpts().GetLeaderSchedulePolicy()
@@ -220,14 +320,14 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, l.filters, cluster.GetOpts()), lessOption)
usedRegions := make(map[uint64]struct{})

result := make([]*operator.Operator, 0, l.conf.Batch)
result := make([]*operator.Operator, 0, batch)
for sourceCandidate.hasStore() || targetCandidate.hasStore() {
// first choose source
if sourceCandidate.hasStore() {
op := createTransferLeaderOperator(sourceCandidate, transferOut, l, plan, usedRegions)
if op != nil {
result = append(result, op)
if len(result) >= l.conf.Batch {
if len(result) >= batch {
return result
}
makeInfluence(op, plan, usedRegions, sourceCandidate, targetCandidate)
@@ -238,7 +338,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
op := createTransferLeaderOperator(targetCandidate, transferIn, l, plan, usedRegions)
if op != nil {
result = append(result, op)
if len(result) >= l.conf.Batch {
if len(result) >= batch {
return result
}
makeInfluence(op, plan, usedRegions, sourceCandidate, targetCandidate)
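The core of the new config handling is Update: it snapshots the current config as JSON, unmarshals the request over it, and persists only when the bytes actually differ; otherwise it distinguishes "no changed" from "config item not found" via findSameField. A self-contained sketch of the same pattern, using a hypothetical toyConfig type rather than the committed code:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type toyConfig struct {
	Batch int `json:"batch"`
}

func apply(conf *toyConfig, data []byte) string {
	oldc, _ := json.Marshal(conf)
	if err := json.Unmarshal(data, conf); err != nil {
		return err.Error()
	}
	newc, _ := json.Marshal(conf)
	if !bytes.Equal(oldc, newc) {
		return "success" // the real Update also validates and persists here
	}
	// Unchanged value: decide between "no changed" and "config item not found"
	// by checking whether any posted key matches a known json tag.
	var m map[string]interface{}
	if err := json.Unmarshal(data, &m); err != nil {
		return err.Error()
	}
	if _, ok := m["batch"]; ok {
		return "no changed"
	}
	return "config item not found"
}

func main() {
	conf := &toyConfig{Batch: 4}
	fmt.Println(apply(conf, []byte(`{"batch": 3}`)))   // success
	fmt.Println(apply(conf, []byte(`{"batch": 3}`)))   // no changed
	fmt.Println(apply(conf, []byte(`{"unknown": 1}`))) // config item not found
}
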
4 changes: 2 additions & 2 deletions server/schedulers/balance_test.go
@@ -604,12 +604,12 @@ func (s *testBalanceLeaderRangeSchedulerSuite) TestBatchBalance(c *C) {
}
s.tc.AddLeaderRegionWithRange(uint64(101), "101a", "101z", 5, 4, 3)
ops := lb.Schedule(s.tc)
c.Assert(ops, HasLen, 5)
c.Assert(ops, HasLen, 4)
regions := make(map[uint64]struct{})
for _, op := range ops {
regions[op.RegionID()] = struct{}{}
}
c.Assert(regions, HasLen, 5)
c.Assert(regions, HasLen, 4)
}

func (s *testBalanceLeaderRangeSchedulerSuite) TestReSortStores(c *C) {
16 changes: 4 additions & 12 deletions server/schedulers/hot_region_config.go
@@ -19,8 +19,6 @@ import (
"encoding/json"
"io"
"net/http"
"reflect"
"strings"
"sync"
"time"

@@ -327,16 +325,10 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r *
rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
t := reflect.TypeOf(conf).Elem()
for i := 0; i < t.NumField(); i++ {
jsonTag := t.Field(i).Tag.Get("json")
if i := strings.Index(jsonTag, ","); i != -1 { // trim 'foobar,string' to 'foobar'
jsonTag = jsonTag[:i]
}
if _, ok := m[jsonTag]; ok {
rd.Text(w, http.StatusOK, "no changed")
return
}
ok := findSameField(conf, m)
if ok {
rd.Text(w, http.StatusOK, "no changed")
return
}

rd.Text(w, http.StatusBadRequest, "config item not found")
16 changes: 16 additions & 0 deletions server/schedulers/utils.go
@@ -16,7 +16,9 @@ package schedulers

import (
"net/url"
"reflect"
"strconv"
"strings"
"time"

"github.com/pingcap/log"
@@ -341,3 +343,17 @@ func (q *retryQuota) GC(keepStores []*core.StoreInfo) {
}
}
}

func findSameField(v interface{}, m map[string]interface{}) bool {
t := reflect.TypeOf(v).Elem()
for i := 0; i < t.NumField(); i++ {
jsonTag := t.Field(i).Tag.Get("json")
if i := strings.Index(jsonTag, ","); i != -1 { // trim 'foobar,string' to 'foobar'
jsonTag = jsonTag[:i]
}
if _, ok := m[jsonTag]; ok {
return true
}
}
return false
}
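
findSameField generalizes the reflection loop previously inlined in hot_region_config.go: it reports whether any posted key matches a struct field's json tag, with options such as ",string" trimmed off. A brief in-package sketch, using a hypothetical exampleConf type and helper (both assumptions, not part of the commit, and assumed to sit alongside findSameField in package schedulers):

// exampleConf is a hypothetical config type; the ",string" option is trimmed so
// that the posted key "max-zombie-rounds" still matches.
type exampleConf struct {
	MaxZombieRounds int `json:"max-zombie-rounds,string"`
}

func exampleFindSameField() {
	known := findSameField(&exampleConf{}, map[string]interface{}{"max-zombie-rounds": 5}) // true
	unknown := findSameField(&exampleConf{}, map[string]interface{}{"no-such-item": 5})    // false
	_ = known
	_ = unknown
}
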
5 changes: 1 addition & 4 deletions server/storage/endpoint/config.go
@@ -36,12 +36,9 @@ var _ ConfigStorage = (*StorageEndpoint)(nil)
// LoadConfig loads config from configPath then unmarshal it to cfg.
func (se *StorageEndpoint) LoadConfig(cfg interface{}) (bool, error) {
value, err := se.Load(configPath)
if err != nil {
if err != nil || value == "" {
return false, err
}
if value == "" {
return false, nil
}
err = json.Unmarshal([]byte(value), cfg)
if err != nil {
return false, errs.ErrJSONUnmarshal.Wrap(err).GenWithStackByCause()