scheduler: move init function #5934

Merged (3 commits, Feb 8, 2023)
Changes from 2 commits
4 changes: 2 additions & 2 deletions cmd/pd-server/main.go
@@ -38,8 +38,7 @@ import (
"github.com/tikv/pd/server/join"
"go.uber.org/zap"

// Register schedulers.
_ "github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedulers"

// Register Service
_ "github.com/tikv/pd/pkg/mcs/registry"
@@ -136,6 +135,7 @@ func createServerWrapper(args []string) (context.Context, context.CancelFunc, ba
if err != nil {
log.Fatal("create server failed", errs.ZapError(err))
}
+ schedulers.Register()
Review comment from @binshi-bing (Contributor), Feb 7, 2023:
Other alternatives might be the Go plugin package or a k8s sidecar as mid/long-term solutions.


return ctx, cancel, svr
}
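The `schedulers.Register()` call added above replaces registration that previously ran as an `init()` side effect of the blank `_ "github.com/tikv/pd/server/schedulers"` import. The snippet below is a self-contained illustration of that shift, not PD's actual code: the registry, type names, and scheduler strings are stand-ins for `schedule.RegisterScheduler` and the constructors visible in the removed `init()` functions further down.

```go
package main

import "fmt"

// builder constructs a scheduler from CLI-style args; it stands in for the
// constructor functions passed to schedule.RegisterScheduler in the real code.
type builder func(args []string) (string, error)

// registry maps scheduler type -> builder.
var registry = map[string]builder{}

// register panics on duplicates, which is one reason registration should run
// from a single, explicit place rather than from scattered init() functions.
func register(typ string, b builder) {
	if _, ok := registry[typ]; ok {
		panic("duplicate scheduler type: " + typ)
	}
	registry[typ] = b
}

// Register performs every registration explicitly. Before this PR the
// equivalent calls lived in per-file init() functions and ran as a side
// effect of a blank import; now the server decides when they run.
func Register() {
	register("balance-leader-scheduler", func(args []string) (string, error) { return "balance-leader", nil })
	register("balance-region-scheduler", func(args []string) (string, error) { return "balance-region", nil })
}

func main() {
	Register() // analogous to the schedulers.Register() call added above
	fmt.Println(len(registry), "schedulers registered")
}
```

The practical difference is ordering and intent: registration now happens at a visible, debuggable point during server startup instead of whenever the package happens to be linked in.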
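The review comment above also floats Go's `plugin` package as a longer-term alternative. Purely as a hedged sketch of that idea (none of this is in the PR; the plugin path and the exported `Register` symbol name are invented for illustration), a scheduler built with `-buildmode=plugin` could expose its own registration function and be loaded at startup:

```go
package main

import (
	"fmt"
	"log"
	"plugin"
)

// loadSchedulerPlugin opens a scheduler .so built with `go build -buildmode=plugin`
// and invokes its exported Register symbol. Go plugins are Linux/macOS only and
// must be built with the same toolchain and dependency versions as the host binary.
func loadSchedulerPlugin(path string) error {
	p, err := plugin.Open(path)
	if err != nil {
		return err
	}
	sym, err := p.Lookup("Register") // symbol name is an assumption for this sketch
	if err != nil {
		return err
	}
	register, ok := sym.(func())
	if !ok {
		return fmt.Errorf("unexpected type %T for Register symbol in %s", sym, path)
	}
	register()
	return nil
}

func main() {
	if err := loadSchedulerPlugin("./balance_leader.so"); err != nil {
		log.Fatal(err)
	}
}
```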
1 change: 0 additions & 1 deletion server/api/hot_status_test.go
@@ -25,7 +25,6 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
tu "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server"
_ "github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/storage"
)

1 change: 0 additions & 1 deletion server/api/scheduler_test.go
@@ -28,7 +28,6 @@ import (
tu "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
_ "github.com/tikv/pd/server/schedulers"
)

type scheduleTestSuite struct {
2 changes: 1 addition & 1 deletion server/cluster/cluster_worker_test.go
@@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockid"
_ "github.com/tikv/pd/server/schedulers"
_ "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/storage"
)

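The test files above drop the blank `server/schedulers` import, and cluster_worker_test.go swaps it for a blank import of `pkg/utils/testutil` (which hot_status_test.go and scheduler_test.go already import as `tu`). A plausible reading is that a shared test helper now performs the registration; the sketch below shows one way that could look, but the actual contents of `pkg/utils/testutil` are not part of this diff.

```go
// Speculative sketch only: how a shared test helper could keep scheduler
// registration working for tests once the blank schedulers import is removed.
package testutil

import "github.com/tikv/pd/server/schedulers"

func init() {
	// Any test binary that imports this package (including via a blank
	// import) gets all schedulers registered, mirroring the explicit
	// schedulers.Register() call that pd-server now makes.
	schedulers.Register()
}
```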
29 changes: 0 additions & 29 deletions server/schedulers/balance_leader.go
@@ -66,35 +66,6 @@
balanceLeaderNewOpCounter = schedulerCounter.WithLabelValues(BalanceLeaderName, "new-operator")
)

func init() {
schedule.RegisterSliceDecoderBuilder(BalanceLeaderType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
conf, ok := v.(*balanceLeaderSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
ranges, err := getKeyRanges(args)
if err != nil {
return err
}
conf.Ranges = ranges
conf.Batch = BalanceLeaderBatchSize
return nil
}
})

schedule.RegisterScheduler(BalanceLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &balanceLeaderSchedulerConfig{storage: storage}
if err := decoder(conf); err != nil {
return nil, err
}
if conf.Batch == 0 {
conf.Batch = BalanceLeaderBatchSize
}
return newBalanceLeaderScheduler(opController, conf), nil
})
}

type balanceLeaderSchedulerConfig struct {
mu syncutil.RWMutex
storage endpoint.ConfigStorage
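The removed `init()` above wires two hooks: `schedule.RegisterSliceDecoderBuilder`, which turns CLI-style args into a closure that fills a config struct, and `schedule.RegisterScheduler`, which runs that decoder and then backfills defaults (such as `BalanceLeaderBatchSize`) before constructing the scheduler. The self-contained sketch below mirrors that decoder-plus-factory shape with made-up types; it illustrates the pattern and is not PD's API.

```go
package main

import (
	"errors"
	"fmt"
)

// configDecoder plays the role of schedule.ConfigDecoder: it knows how to
// populate an arbitrary config value.
type configDecoder func(v interface{}) error

// leaderConfig is a stand-in for balanceLeaderSchedulerConfig.
type leaderConfig struct {
	Ranges []string // the real config holds []core.KeyRange
	Batch  int
}

const defaultBatch = 4 // stands in for BalanceLeaderBatchSize

// newDecoder corresponds to the closure returned by the slice decoder builder:
// it captures the args and applies them to the config it is handed later.
func newDecoder(args []string) configDecoder {
	return func(v interface{}) error {
		conf, ok := v.(*leaderConfig)
		if !ok {
			return errors.New("config type mismatch")
		}
		conf.Ranges = args
		conf.Batch = defaultBatch
		return nil
	}
}

// newScheduler corresponds to the factory passed to the scheduler registry:
// run the decoder, then backfill defaults that an older persisted config
// (decoded from storage rather than from args) might not carry.
func newScheduler(decoder configDecoder) (*leaderConfig, error) {
	conf := &leaderConfig{}
	if err := decoder(conf); err != nil {
		return nil, err
	}
	if conf.Batch == 0 {
		conf.Batch = defaultBatch
	}
	return conf, nil
}

func main() {
	conf, err := newScheduler(newDecoder([]string{"a", "z"}))
	if err != nil {
		panic(err)
	}
	fmt.Printf("ranges=%v batch=%d\n", conf.Ranges, conf.Batch)
}
```

Note that this section only removes the `init()` wrappers; the decoder-builder and scheduler registrations themselves are expected to be invoked from the new explicit registration path.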
27 changes: 0 additions & 27 deletions server/schedulers/balance_region.go
@@ -22,40 +22,13 @@ import (
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/filter"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/plan"
"go.uber.org/zap"
)

func init() {
schedule.RegisterSliceDecoderBuilder(BalanceRegionType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
conf, ok := v.(*balanceRegionSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
ranges, err := getKeyRanges(args)
if err != nil {
return err
}
conf.Ranges = ranges
conf.Name = BalanceRegionName
return nil
}
})
schedule.RegisterScheduler(BalanceRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &balanceRegionSchedulerConfig{}
if err := decoder(conf); err != nil {
return nil, err
}
return newBalanceRegionScheduler(opController, conf), nil
})
}

const (
// BalanceRegionName is balance region scheduler name.
BalanceRegionName = "balance-region-scheduler"
29 changes: 0 additions & 29 deletions server/schedulers/balance_witness.go
@@ -51,35 +51,6 @@ const (
MaxBalanceWitnessBatchSize = 10
)

func init() {
schedule.RegisterSliceDecoderBuilder(BalanceWitnessType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
conf, ok := v.(*balanceWitnessSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
ranges, err := getKeyRanges(args)
if err != nil {
return err
}
conf.Ranges = ranges
conf.Batch = balanceWitnessBatchSize
return nil
}
})

schedule.RegisterScheduler(BalanceWitnessType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &balanceWitnessSchedulerConfig{storage: storage}
if err := decoder(conf); err != nil {
return nil, err
}
if conf.Batch == 0 {
conf.Batch = balanceWitnessBatchSize
}
return newBalanceWitnessScheduler(opController, conf), nil
})
}

type balanceWitnessSchedulerConfig struct {
mu syncutil.RWMutex
storage endpoint.ConfigStorage
35 changes: 0 additions & 35 deletions server/schedulers/evict_leader.go
@@ -54,41 +54,6 @@ var (
evictLeaderNewOperatorCounter = schedulerCounter.WithLabelValues(EvictLeaderName, "new-operator")
)

func init() {
schedule.RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
if len(args) != 1 {
return errs.ErrSchedulerConfig.FastGenByArgs("id")
}
conf, ok := v.(*evictLeaderSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

id, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
}

ranges, err := getKeyRanges(args[1:])
if err != nil {
return err
}
conf.StoreIDWithRanges[id] = ranges
return nil
}
})

schedule.RegisterScheduler(EvictLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage}
if err := decoder(conf); err != nil {
return nil, err
}
conf.cluster = opController.GetCluster()
return newEvictLeaderScheduler(opController, conf), nil
})
}

type evictLeaderSchedulerConfig struct {
mu syncutil.RWMutex
storage endpoint.ConfigStorage
16 changes: 0 additions & 16 deletions server/schedulers/evict_slow_store.go
@@ -39,22 +39,6 @@ const (
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule")

func init() {
schedule.RegisterSliceDecoderBuilder(EvictSlowStoreType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
return nil
}
})

schedule.RegisterScheduler(EvictSlowStoreType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &evictSlowStoreSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0)}
if err := decoder(conf); err != nil {
return nil, err
}
return newEvictSlowStoreScheduler(opController, conf), nil
})
}

type evictSlowStoreSchedulerConfig struct {
storage endpoint.ConfigStorage
EvictedStores []uint64 `json:"evict-stores"`
41 changes: 0 additions & 41 deletions server/schedulers/grant_hot_region.go
@@ -51,47 +51,6 @@ var (
grantHotRegionSkipCounter = schedulerCounter.WithLabelValues(GrantHotRegionName, "skip")
)

func init() {
schedule.RegisterSliceDecoderBuilder(GrantHotRegionType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
if len(args) != 2 {
return errs.ErrSchedulerConfig.FastGenByArgs("id")
}

conf, ok := v.(*grantHotRegionSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
leaderID, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
}

storeIDs := make([]uint64, 0)
for _, id := range strings.Split(args[1], ",") {
storeID, err := strconv.ParseUint(id, 10, 64)
if err != nil {
return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
}
storeIDs = append(storeIDs, storeID)
}
if !conf.setStore(leaderID, storeIDs) {
return errs.ErrSchedulerConfig
}
return nil
}
})

schedule.RegisterScheduler(GrantHotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &grantHotRegionSchedulerConfig{StoreIDs: make([]uint64, 0), storage: storage}
conf.cluster = opController.GetCluster()
if err := decoder(conf); err != nil {
return nil, err
}
return newGrantHotRegionScheduler(opController, conf), nil
})
}

type grantHotRegionSchedulerConfig struct {
mu syncutil.RWMutex
storage endpoint.ConfigStorage
35 changes: 0 additions & 35 deletions server/schedulers/grant_leader.go
@@ -47,41 +47,6 @@ var (
grantLeaderNewOperatorCounter = schedulerCounter.WithLabelValues(GrantLeaderName, "new-operator")
)

func init() {
schedule.RegisterSliceDecoderBuilder(GrantLeaderType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
if len(args) != 1 {
return errs.ErrSchedulerConfig.FastGenByArgs("id")
}

conf, ok := v.(*grantLeaderSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

id, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
}
ranges, err := getKeyRanges(args[1:])
if err != nil {
return err
}
conf.StoreIDWithRanges[id] = ranges
return nil
}
})

schedule.RegisterScheduler(GrantLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &grantLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage}
conf.cluster = opController.GetCluster()
if err := decoder(conf); err != nil {
return nil, err
}
return newGrantLeaderScheduler(opController, conf), nil
})
}

type grantLeaderSchedulerConfig struct {
mu syncutil.RWMutex
storage endpoint.ConfigStorage
32 changes: 0 additions & 32 deletions server/schedulers/hot_region.go
@@ -29,7 +29,6 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/filter"
@@ -177,37 +176,6 @@ func (h *baseHotScheduler) randomRWType() statistics.RWType {
return h.types[h.r.Int()%len(h.types)]
}

func init() {
schedule.RegisterSliceDecoderBuilder(HotRegionType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
return nil
}
})
schedule.RegisterScheduler(HotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := initHotRegionScheduleConfig()

var data map[string]interface{}
if err := decoder(&data); err != nil {
return nil, err
}
if len(data) != 0 {
// After upgrading, use compatible config.

// For clusters with the initial version >= v5.2, it will be overwritten by the default config.
conf.applyPrioritiesConfig(compatiblePrioritiesConfig)
// For clusters with the initial version >= v6.4, it will be overwritten by the default config.
conf.SetRankFormulaVersion("")

if err := decoder(conf); err != nil {
return nil, err
}
}

conf.storage = storage
return newHotScheduler(opController, conf), nil
})
}

const (
// HotRegionName is balance hot region scheduler name.
HotRegionName = "balance-hot-region-scheduler"
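The removed hot-region `init()` above does one thing the other schedulers do not: it decodes the persisted config twice. The first decode into a generic map only checks whether any config exists at all; if it does, compatibility defaults (`applyPrioritiesConfig(compatiblePrioritiesConfig)` and `SetRankFormulaVersion("")`) are applied before the real decode, so fields missing from an older cluster's stored config keep their pre-upgrade behaviour instead of silently picking up the new defaults. The sketch below reproduces just that two-pass structure with invented field names and values; it is not the scheduler's real config.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hotConfig stands in for the hot-region scheduler config; only enough fields
// are included to show the compatibility handling.
type hotConfig struct {
	RankFormulaVersion string   `json:"rank-formula-version"`
	ReadPriorities     []string `json:"read-priorities"`
}

// loadHotConfig mirrors the removed init logic: first decode the persisted
// bytes into a generic map to learn whether any config was stored; only if
// something exists are the compatibility defaults pre-applied, and whatever
// was actually persisted then overwrites them.
func loadHotConfig(persisted []byte) (*hotConfig, error) {
	// Fresh-cluster defaults (the role of initHotRegionScheduleConfig).
	conf := &hotConfig{
		RankFormulaVersion: "v2",
		ReadPriorities:     []string{"query", "byte"},
	}

	var data map[string]interface{}
	if err := json.Unmarshal(persisted, &data); err != nil {
		return nil, err
	}
	if len(data) != 0 {
		// Upgraded cluster: start from the compatible (older) behaviour...
		conf.RankFormulaVersion = ""
		conf.ReadPriorities = []string{"byte", "key"}
		// ...then let the persisted fields override it.
		if err := json.Unmarshal(persisted, conf); err != nil {
			return nil, err
		}
	}
	return conf, nil
}

func main() {
	fresh, err := loadHotConfig([]byte(`{}`))
	if err != nil {
		panic(err)
	}
	upgraded, err := loadHotConfig([]byte(`{"read-priorities":["key","byte"]}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("fresh:    %+v\nupgraded: %+v\n", *fresh, *upgraded)
}
```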