From 2a7c8d4c96769fccb2b9246818c45566d9766b82 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 7 Feb 2023 03:15:57 +0800 Subject: [PATCH 01/10] config: refactor TSO config (#5902) ref tikv/pd#5839, ref tikv/pd#5901 Signed-off-by: Ryan Leung Co-authored-by: Ti Chi Robot --- pkg/member/member.go | 7 ++- pkg/tso/allocator_manager.go | 15 ++++--- pkg/tso/config.go | 44 +++++++++++++++--- server/config/config.go | 86 ++++++++++++++++++------------------ server/server.go | 4 +- 5 files changed, 94 insertions(+), 62 deletions(-) diff --git a/pkg/member/member.go b/pkg/member/member.go index 30a03922e98..0e545d9cd67 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" - "github.com/tikv/pd/server/config" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" "go.uber.org/zap" @@ -250,12 +249,12 @@ func (m *Member) isSameLeader(leader *pdpb.Member) bool { } // MemberInfo initializes the member info. -func (m *Member) MemberInfo(cfg *config.Config, name string, rootPath string) { +func (m *Member) MemberInfo(advertiseClientUrls, advertisePeerUrls, name string, rootPath string) { leader := &pdpb.Member{ Name: name, MemberId: m.ID(), - ClientUrls: strings.Split(cfg.AdvertiseClientUrls, ","), - PeerUrls: strings.Split(cfg.AdvertisePeerUrls, ","), + ClientUrls: strings.Split(advertiseClientUrls, ","), + PeerUrls: strings.Split(advertisePeerUrls, ","), } data, err := leader.Marshal() diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index d75d53a47bb..68ec3b3af84 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -50,8 +50,6 @@ const ( leaderTickInterval = 50 * time.Millisecond localTSOAllocatorEtcdPrefix = "lta" localTSOSuffixEtcdPrefix = "lts" - // The value should be the same as the variable defined in server's config. - defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond ) var ( @@ -134,17 +132,20 @@ type AllocatorManager struct { func NewAllocatorManager( m *member.Member, rootPath string, - cfg config, + enableLocalTSO bool, + saveInterval time.Duration, + updatePhysicalInterval time.Duration, + tlsConfig *grpcutil.TLSConfig, maxResetTSGap func() time.Duration, ) *AllocatorManager { allocatorManager := &AllocatorManager{ - enableLocalTSO: cfg.IsLocalTSOEnabled(), + enableLocalTSO: enableLocalTSO, member: m, rootPath: rootPath, - saveInterval: cfg.GetTSOSaveInterval(), - updatePhysicalInterval: cfg.GetTSOUpdatePhysicalInterval(), + saveInterval: saveInterval, + updatePhysicalInterval: updatePhysicalInterval, maxResetTSGap: maxResetTSGap, - securityConfig: cfg.GetTLSConfig(), + securityConfig: tlsConfig, } allocatorManager.mu.allocatorGroups = make(map[string]*allocatorGroup) allocatorManager.mu.clusterDCLocations = make(map[string]*DCLocationInfo) diff --git a/pkg/tso/config.go b/pkg/tso/config.go index d8bd2e63e06..a03c1f45e0b 100644 --- a/pkg/tso/config.go +++ b/pkg/tso/config.go @@ -15,14 +15,46 @@ package tso import ( + "flag" "time" - "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/typeutil" ) -type config interface { - IsLocalTSOEnabled() bool - GetTSOSaveInterval() time.Duration - GetTSOUpdatePhysicalInterval() time.Duration - GetTLSConfig() *grpcutil.TLSConfig +const ( + // defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`. 
+ defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond +) + +// Config is the configuration for the TSO. +type Config struct { + flagSet *flag.FlagSet + + configFile string + // EnableLocalTSO is used to enable the Local TSO Allocator feature, + // which allows the PD server to generate Local TSO for certain DC-level transactions. + // To make this feature meaningful, user has to set the "zone" label for the PD server + // to indicate which DC this PD belongs to. + EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"` + + // TSOSaveInterval is the interval to save timestamp. + TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"` + + // The interval to update physical part of timestamp. Usually, this config should not be set. + // At most 1<<18 (262144) TSOs can be generated in the interval. The smaller the value, the + // more TSOs provided, and at the same time consuming more CPU time. + // This config is only valid in 1ms to 10s. If it's configured too long or too short, it will + // be automatically clamped to the range. + TSOUpdatePhysicalInterval typeutil.Duration `toml:"tso-update-physical-interval" json:"tso-update-physical-interval"` +} + +// NewConfig creates a new config. +func NewConfig() *Config { + cfg := &Config{} + cfg.flagSet = flag.NewFlagSet("pd", flag.ContinueOnError) + fs := cfg.flagSet + + fs.StringVar(&cfg.configFile, "config", "", "config file") + + return cfg } diff --git a/server/config/config.go b/server/config/config.go index eb29ca1bf62..e2b9892bc88 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -457,49 +457,6 @@ func (c *Config) Validate() error { return nil } -// Utility to test if a configuration is defined. -type configMetaData struct { - meta *toml.MetaData - path []string -} - -func newConfigMetadata(meta *toml.MetaData) *configMetaData { - return &configMetaData{meta: meta} -} - -func (m *configMetaData) IsDefined(key string) bool { - if m.meta == nil { - return false - } - keys := append([]string(nil), m.path...) - keys = append(keys, key) - return m.meta.IsDefined(keys...) -} - -func (m *configMetaData) Child(path ...string) *configMetaData { - newPath := append([]string(nil), m.path...) - newPath = append(newPath, path...) - return &configMetaData{ - meta: m.meta, - path: newPath, - } -} - -func (m *configMetaData) CheckUndecoded() error { - if m.meta == nil { - return nil - } - undecoded := m.meta.Undecoded() - if len(undecoded) == 0 { - return nil - } - errInfo := "Config contains undefined item: " - for _, key := range undecoded { - errInfo += key.String() + ", " - } - return errors.New(errInfo[:len(errInfo)-2]) -} - // Adjust is used to adjust the PD configurations. func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { configMetaData := newConfigMetadata(meta) @@ -613,6 +570,49 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { return nil } +// Utility to test if a configuration is defined. +type configMetaData struct { + meta *toml.MetaData + path []string +} + +func newConfigMetadata(meta *toml.MetaData) *configMetaData { + return &configMetaData{meta: meta} +} + +func (m *configMetaData) IsDefined(key string) bool { + if m.meta == nil { + return false + } + keys := append([]string(nil), m.path...) + keys = append(keys, key) + return m.meta.IsDefined(keys...) +} + +func (m *configMetaData) Child(path ...string) *configMetaData { + newPath := append([]string(nil), m.path...) + newPath = append(newPath, path...) 
+ return &configMetaData{ + meta: m.meta, + path: newPath, + } +} + +func (m *configMetaData) CheckUndecoded() error { + if m.meta == nil { + return nil + } + undecoded := m.meta.Undecoded() + if len(undecoded) == 0 { + return nil + } + errInfo := "Config contains undefined item: " + for _, key := range undecoded { + errInfo += key.String() + ", " + } + return errors.New(errInfo[:len(errInfo)-2]) +} + func (c *Config) adjustLog(meta *configMetaData) { if !meta.IsDefined("disable-error-verbose") { c.Log.DisableErrorVerbose = defaultDisableErrorVerbose diff --git a/server/server.go b/server/server.go index 890c04b2c69..fa21790a6f2 100644 --- a/server/server.go +++ b/server/server.go @@ -362,7 +362,7 @@ func (s *Server) startServer(ctx context.Context) error { serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) s.rootPath = path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10)) - s.member.MemberInfo(s.cfg, s.Name(), s.rootPath) + s.member.MemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name(), s.rootPath) s.member.SetMemberDeployPath(s.member.ID()) s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion) s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash) @@ -374,7 +374,7 @@ func (s *Server) startServer(ctx context.Context) error { Member: s.member.MemberValue(), }) s.tsoAllocatorManager = tso.NewAllocatorManager( - s.member, s.rootPath, s.cfg, + s.member, s.rootPath, s.cfg.IsLocalTSOEnabled(), s.cfg.GetTSOSaveInterval(), s.cfg.GetTSOUpdatePhysicalInterval(), s.cfg.GetTLSConfig(), func() time.Duration { return s.persistOptions.GetMaxResetTSGap() }) // Set up the Global TSO Allocator here, it will be initialized once the PD campaigns leader successfully. 
s.tsoAllocatorManager.SetUpAllocator(ctx, tso.GlobalDCLocation, s.member.GetLeadership()) From 8bfb86d4c78363225dbb7afae6da9a7336b80074 Mon Sep 17 00:00:00 2001 From: Zwb Date: Tue, 7 Feb 2023 12:03:57 +0800 Subject: [PATCH 02/10] schedulers: reduce the batch size to avoid discarding (#5918) ref tikv/pd#5638, close tikv/pd#5638 - Reduce the batch size to avoid discarding if it exceeds MaxWaitingOperator - Increase the channel capacity to avoid discarding when the capacity is full Signed-off-by: Wenbo Zhang --- server/schedulers/transfer_witness_leader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/schedulers/transfer_witness_leader.go b/server/schedulers/transfer_witness_leader.go index b8895ae651d..41f3a4a157d 100644 --- a/server/schedulers/transfer_witness_leader.go +++ b/server/schedulers/transfer_witness_leader.go @@ -33,10 +33,10 @@ const ( TransferWitnessLeaderType = "transfer-witness-leader" // TransferWitnessLeaderBatchSize is the number of operators to to transfer // leaders by one scheduling - transferWitnessLeaderBatchSize = 10 + transferWitnessLeaderBatchSize = 3 // TransferWitnessLeaderRecvMaxRegionSize is the max number of region can receive // TODO: make it a reasonable value - transferWitnessLeaderRecvMaxRegionSize = 1000 + transferWitnessLeaderRecvMaxRegionSize = 10000 ) var ( From 77d7be043fc6601d4fb422cf25e15508ffdf9b25 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 7 Feb 2023 12:15:57 +0800 Subject: [PATCH 03/10] *: fix the package name (#5919) ref tikv/pd#4820 Signed-off-by: Ryan Leung Co-authored-by: Ti Chi Robot --- pkg/mcs/docs.go | 4 ++-- server/util.go | 2 +- tests/{msc => mcs}/Makefile | 2 +- tests/{msc => mcs}/go.mod | 2 +- tests/{msc => mcs}/go.sum | 0 tests/{msc => mcs}/resource_manager/resource_manager_test.go | 0 6 files changed, 5 insertions(+), 5 deletions(-) rename tests/{msc => mcs}/Makefile (99%) rename tests/{msc => mcs}/go.mod (99%) rename tests/{msc => mcs}/go.sum (100%) rename tests/{msc => mcs}/resource_manager/resource_manager_test.go (100%) diff --git a/pkg/mcs/docs.go b/pkg/mcs/docs.go index 436ca97aabc..6c71ab812c9 100644 --- a/pkg/mcs/docs.go +++ b/pkg/mcs/docs.go @@ -12,5 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package msc used to implement the core logic of the external services which rely on the PD backend provider. -package msc +// Package mcs used to implement the core logic of the external services which rely on the PD backend provider. +package mcs diff --git a/server/util.go b/server/util.go index 74e1bc52d1b..33cbea29b63 100644 --- a/server/util.go +++ b/server/util.go @@ -172,7 +172,7 @@ type ServiceRegistry interface { InstallAllRESTHandler(srv *Server, userDefineHandler map[string]http.Handler) } -// NewServiceRegistry is a hook for msc code which implements the micro service. +// NewServiceRegistry is a hook for mcs code which implements the micro service. var NewServiceRegistry = func() ServiceRegistry { return dummyServiceRegistry{} } diff --git a/tests/msc/Makefile b/tests/mcs/Makefile similarity index 99% rename from tests/msc/Makefile rename to tests/mcs/Makefile index a9dae4dcb6e..86dc5f58bd2 100644 --- a/tests/msc/Makefile +++ b/tests/mcs/Makefile @@ -31,7 +31,7 @@ test: enable-codegen $(MAKE) disable-codegen ci-test-job: enable-codegen - CGO_ENABLED=1 go test ./... -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=../../... github.com/tikv/pd/tests/msc + CGO_ENABLED=1 go test ./... 
-tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=../../... github.com/tikv/pd/tests/mcs install-tools: cd ../../ && $(MAKE) install-tools diff --git a/tests/msc/go.mod b/tests/mcs/go.mod similarity index 99% rename from tests/msc/go.mod rename to tests/mcs/go.mod index 9b680b0477b..58df57dcd6d 100644 --- a/tests/msc/go.mod +++ b/tests/mcs/go.mod @@ -1,4 +1,4 @@ -module github.com/tikv/pd/tests/msc +module github.com/tikv/pd/tests/mcs go 1.19 diff --git a/tests/msc/go.sum b/tests/mcs/go.sum similarity index 100% rename from tests/msc/go.sum rename to tests/mcs/go.sum diff --git a/tests/msc/resource_manager/resource_manager_test.go b/tests/mcs/resource_manager/resource_manager_test.go similarity index 100% rename from tests/msc/resource_manager/resource_manager_test.go rename to tests/mcs/resource_manager/resource_manager_test.go From 7497e3d8f7440229918b8681c11d948e7e39b8b9 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Tue, 7 Feb 2023 01:15:58 -0800 Subject: [PATCH 04/10] *: refactor server entry point to support multi-mode service (#5921) ref tikv/pd#5836 A service in different mode has different config struct and initialization logic, so we need to abstract an interface to create server. Signed-off-by: Bin Shi --- cmd/pd-server/main.go | 63 ++++++++++++++++++-------------- pkg/basic_server/basic_server.go | 40 ++++++++++++++++++++ 2 files changed, 75 insertions(+), 28 deletions(-) create mode 100644 pkg/basic_server/basic_server.go diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index 0f39e20243c..3292439b370 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/pkg/autoscaling" + basicsvr "github.com/tikv/pd/pkg/basic_server" "github.com/tikv/pd/pkg/dashboard" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/swaggerserver" @@ -46,8 +47,40 @@ import ( ) func main() { + ctx, cancel, svr := createServerWrapper(os.Args[1:]) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Run(); err != nil { + log.Fatal("run server failed", errs.ZapError(err)) + } + + <-ctx.Done() + log.Info("Got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func createServerWrapper(args []string) (context.Context, context.CancelFunc, basicsvr.Server) { cfg := config.NewConfig() - err := cfg.Parse(os.Args[1:]) + err := cfg.Parse(args) if cfg.Version { server.PrintPDInfo() @@ -104,33 +137,7 @@ func main() { log.Fatal("create server failed", errs.ZapError(err)) } - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - - var sig os.Signal - go func() { - sig = <-sc - cancel() - }() - - if err := svr.Run(); err != nil { - log.Fatal("run server failed", errs.ZapError(err)) - } - - <-ctx.Done() - log.Info("Got signal to exit", zap.String("signal", sig.String())) - - svr.Close() - switch sig { - case syscall.SIGTERM: - exit(0) - default: - exit(1) - } + return ctx, cancel, svr } func exit(code int) { diff --git a/pkg/basic_server/basic_server.go b/pkg/basic_server/basic_server.go new file mode 100644 index 00000000000..b5758485718 --- /dev/null +++ b/pkg/basic_server/basic_server.go @@ -0,0 +1,40 @@ +// 
Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package basicsvr + +import ( + "context" + "net/http" + + "go.etcd.io/etcd/clientv3" +) + +// Server defines the common basic behaviors of a server +type Server interface { + // Name returns the unique Name for this server in the cluster. + Name() string + // Context returns the context of server. + Context() context.Context + + // Run runs the server. + Run() error + // Close closes the server. + Close() + + // GetClient returns builtin etcd client. + GetClient() *clientv3.Client + // GetHTTPClient returns builtin etcd client. + GetHTTPClient() *http.Client +} From f5b5391c0901b0b4955123f2c680b9374ad8b382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B7=B7=E6=B2=8CDM?= Date: Tue, 7 Feb 2023 17:53:58 +0800 Subject: [PATCH 05/10] region_scatterer: fix the bug that could generate schedule with too many peers (#5920) ref tikv/pd#4570, close tikv/pd#5909 Signed-off-by: HunDunDM --- server/schedule/region_scatterer.go | 1 + server/schedule/region_scatterer_test.go | 41 ++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go index 251162c811d..b06f80ba7ec 100644 --- a/server/schedule/region_scatterer.go +++ b/server/schedule/region_scatterer.go @@ -338,6 +338,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) * // it is considered that the selected peer select itself. // This origin peer re-selects. if _, ok := peers[newPeer.GetStoreId()]; !ok || peer.GetStoreId() == newPeer.GetStoreId() { + selectedStores[peer.GetStoreId()] = struct{}{} break } } diff --git a/server/schedule/region_scatterer_test.go b/server/schedule/region_scatterer_test.go index 3657bd8e4b8..34033880abd 100644 --- a/server/schedule/region_scatterer_test.go +++ b/server/schedule/region_scatterer_test.go @@ -655,9 +655,9 @@ func TestRegionHasLearner(t *testing.T) { checkLeader(scatterer.ordinaryEngine.selectedLeader) } -// TestSelectedStores tests if the peer count has changed due to the picking strategy. +// TestSelectedStoresTooFewPeers tests if the peer count has changed due to the picking strategy. // Ref https://github.com/tikv/pd/issues/4565 -func TestSelectedStores(t *testing.T) { +func TestSelectedStoresTooFewPeers(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -692,6 +692,43 @@ func TestSelectedStores(t *testing.T) { } } +// TestSelectedStoresTooManyPeers tests if the peer count has changed due to the picking strategy. +// Ref https://github.com/tikv/pd/issues/5909 +func TestSelectedStoresTooManyPeers(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + opt := config.NewTestOptions() + tc := mockcluster.NewCluster(ctx, opt) + stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) + oc := NewOperatorController(ctx, tc, stream) + // Add 4 stores. 
+ for i := uint64(1); i <= 5; i++ { + tc.AddRegionStore(i, 0) + // prevent store from being disconnected + tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute) + } + group := "group" + scatterer := NewRegionScatterer(ctx, tc, oc) + // priority 4 > 1 > 5 > 2 == 3 + for i := 0; i < 1200; i++ { + scatterer.ordinaryEngine.selectedPeer.Put(2, group) + scatterer.ordinaryEngine.selectedPeer.Put(3, group) + } + for i := 0; i < 800; i++ { + scatterer.ordinaryEngine.selectedPeer.Put(5, group) + } + for i := 0; i < 400; i++ { + scatterer.ordinaryEngine.selectedPeer.Put(1, group) + } + // test region with peer 1 2 3 + for i := uint64(1); i < 20; i++ { + region := tc.AddLeaderRegion(i+200, i%3+1, (i+1)%3+1, (i+2)%3+1) + op := scatterer.scatterRegion(region, group) + re.False(isPeerCountChanged(op)) + } +} + func isPeerCountChanged(op *operator.Operator) bool { if op == nil { return false From b99dc1f195d1974b0412c6eb9d2baee062f0e7ca Mon Sep 17 00:00:00 2001 From: Zwb Date: Tue, 7 Feb 2023 18:03:58 +0800 Subject: [PATCH 06/10] rule_checker: promote witness when has pending voter (#5896) ref tikv/pd#5627 When there's pending voter, we should promote the witness to the voter to avoid accumulating too many logs leading to disk full. Signed-off-by: Wenbo Zhang Co-authored-by: Ti Chi Robot --- server/schedule/checker/rule_checker.go | 13 +++- server/schedule/checker/rule_checker_test.go | 71 +++++++++++++++++++- 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/server/schedule/checker/rule_checker.go b/server/schedule/checker/rule_checker.go index 52d04980e85..3ff0b862e73 100644 --- a/server/schedule/checker/rule_checker.go +++ b/server/schedule/checker/rule_checker.go @@ -208,7 +208,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region if c.isWitnessEnabled() && core.IsVoter(peer) { if witness, ok := c.hasAvailableWitness(region, peer); ok { ruleCheckerPromoteWitnessCounter.Inc() - return operator.CreateNonWitnessPeerOperator("promote-witness", c.cluster, region, witness) + return operator.CreateNonWitnessPeerOperator("promote-witness-for-down", c.cluster, region, witness) } } } @@ -216,6 +216,13 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region ruleCheckerReplaceOfflineCounter.Inc() return c.replaceUnexpectRulePeer(region, rf, fit, peer, offlineStatus) } + + if c.isWitnessEnabled() && c.isPendingVoter(region, peer) { + if witness, ok := c.hasAvailableWitness(region, peer); ok { + ruleCheckerPromoteWitnessCounter.Inc() + return operator.CreateNonWitnessPeerOperator("promote-witness-for-pending", c.cluster, region, witness) + } + } } // fix loose matched peers. 
for _, peer := range rf.PeersWithDifferentRole { @@ -499,6 +506,10 @@ func (c *RuleChecker) isOfflinePeer(peer *metapb.Peer) bool { return !store.IsPreparing() && !store.IsServing() } +func (c *RuleChecker) isPendingVoter(region *core.RegionInfo, peer *metapb.Peer) bool { + return region.GetPendingVoter(peer.Id) != nil +} + func (c *RuleChecker) hasAvailableWitness(region *core.RegionInfo, peer *metapb.Peer) (*metapb.Peer, bool) { witnesses := region.GetWitnesses() if len(witnesses) == 0 { diff --git a/server/schedule/checker/rule_checker_test.go b/server/schedule/checker/rule_checker_test.go index 485c4429035..4e6314ff392 100644 --- a/server/schedule/checker/rule_checker_test.go +++ b/server/schedule/checker/rule_checker_test.go @@ -894,7 +894,7 @@ func (suite *ruleCheckerTestSuite) TestFixDownPeerWithAvailableWitness() { op := suite.rc.Check(r) suite.NotNil(op) - suite.Equal("promote-witness", op.Desc()) + suite.Equal("promote-witness-for-down", op.Desc()) suite.Equal(uint64(3), op.Step(0).(operator.RemovePeer).FromStore) suite.Equal(uint64(3), op.Step(1).(operator.AddLearner).ToStore) suite.Equal(uint64(3), op.Step(2).(operator.BecomeNonWitness).StoreID) @@ -1062,6 +1062,75 @@ func (suite *ruleCheckerTestSuite) TestFixOfflinePeerWithAvaliableWitness() { suite.Equal("replace-rule-offline-peer", op.Desc()) } +func (suite *ruleCheckerTestSuite) TestFixPendingVoterWithAvailableWitness() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3) + + r := suite.cluster.GetRegion(1) + // set peer2 as pending voter + r = r.Clone(core.WithPendingPeers([]*metapb.Peer{r.GetPeer(2)})) + // set peer3 to witness + r = r.Clone(core.WithWitnesses([]*metapb.Peer{r.GetPeer(3)})) + + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "default", + Role: placement.Voter, + Count: 2, + }) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Role: placement.Voter, + Count: 1, + IsWitness: true, + }) + + op := suite.rc.Check(r) + + suite.NotNil(op) + suite.Equal("promote-witness-for-pending", op.Desc()) + suite.Equal(uint64(3), op.Step(0).(operator.RemovePeer).FromStore) + suite.Equal(uint64(3), op.Step(1).(operator.AddLearner).ToStore) + suite.Equal(uint64(3), op.Step(2).(operator.BecomeNonWitness).StoreID) + suite.Equal(uint64(3), op.Step(3).(operator.PromoteLearner).ToStore) +} + +func (suite *ruleCheckerTestSuite) TestFixPendingVoterWithAvailableWitness2() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3) + + r := suite.cluster.GetRegion(1) + // set peer2 as pending learner + peer := r.GetPeer(2) + peer.Role = metapb.PeerRole_Learner + r = r.Clone(core.WithPendingPeers([]*metapb.Peer{peer})) + // set peer3 to witness + r = r.Clone(core.WithWitnesses([]*metapb.Peer{r.GetPeer(3)})) + + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "default", + Role: placement.Voter, + Count: 2, + }) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Role: placement.Voter, + Count: 1, + IsWitness: true, + }) + + op := suite.rc.Check(r) + + suite.Nil(op) +} + func (suite *ruleCheckerTestSuite) TestRuleCache() { 
suite.cluster.PersistOptions.SetPlacementRulesCacheEnabled(true) suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) From 03081c6a7cf25fdb2f7e49948c1285cedd85f8be Mon Sep 17 00:00:00 2001 From: Zwb Date: Tue, 7 Feb 2023 18:15:58 +0800 Subject: [PATCH 07/10] filter: fix panic caused by nil pointer (#5922) ref tikv/pd#5638 filter: fix panic caused by nil pointer Signed-off-by: Wenbo Zhang Co-authored-by: Ti Chi Robot --- server/schedule/filter/filters.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/schedule/filter/filters.go b/server/schedule/filter/filters.go index af0aeb94e99..6a973e04452 100644 --- a/server/schedule/filter/filters.go +++ b/server/schedule/filter/filters.go @@ -733,7 +733,7 @@ func (f *ruleWitnessFitFilter) Target(options *config.PersistOptions, store *cor log.Warn("ruleWitnessFitFilter couldn't find peer on target Store", zap.Uint64("target-store", store.GetID())) return statusStoreNotMatchRule } - if targetPeer.Id == f.region.GetLeader().Id { + if targetPeer.Id == f.region.GetLeader().GetId() { return statusStoreNotMatchRule } if f.oldFit.Replace(f.srcStore, store) { From efa3e319f43afd4da560d3039f3a99c6c6ccde3a Mon Sep 17 00:00:00 2001 From: Zwb Date: Wed, 8 Feb 2023 11:09:58 +0800 Subject: [PATCH 08/10] fit: optimize the selection of witness rule candidates (#5904) ref tikv/pd#5568 Signed-off-by: Wenbo Zhang Co-authored-by: Ti Chi Robot --- server/schedule/placement/fit.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/schedule/placement/fit.go b/server/schedule/placement/fit.go index 7a4a056a176..8dc9932cebe 100644 --- a/server/schedule/placement/fit.go +++ b/server/schedule/placement/fit.go @@ -243,9 +243,10 @@ func (w *fitWorker) fitRule(index int) bool { // Only consider stores: // 1. Match label constraints // 2. Role match, or can match after transformed. - // 3. Not selected by other rules. + // 3. Don't select leader as witness. + // 4. Not selected by other rules. 
for _, p := range w.peers { - if !p.selected && MatchLabelConstraints(p.store, w.rules[index].LabelConstraints) { + if !p.selected && MatchLabelConstraints(p.store, w.rules[index].LabelConstraints) && !(p.isLeader && w.supportWitness && w.rules[index].IsWitness) { candidates = append(candidates, p) } } From 76c0c104e5fc7b99c1d94a172a65b652fa1ec90e Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 8 Feb 2023 12:51:58 +0800 Subject: [PATCH 09/10] scheduler: move init function (#5934) ref tikv/pd#5837 Signed-off-by: lhy1024 Co-authored-by: Ti Chi Robot --- cmd/pd-server/main.go | 4 +- server/api/hot_status_test.go | 1 - server/api/scheduler_test.go | 1 - server/cluster/cluster_worker_test.go | 2 +- server/schedulers/balance_leader.go | 29 -- server/schedulers/balance_region.go | 27 -- server/schedulers/balance_witness.go | 29 -- server/schedulers/evict_leader.go | 35 -- server/schedulers/evict_slow_store.go | 16 - server/schedulers/grant_hot_region.go | 41 -- server/schedulers/grant_leader.go | 35 -- server/schedulers/hot_region.go | 32 -- server/schedulers/init.go | 450 +++++++++++++++++++ server/schedulers/label.go | 27 -- server/schedulers/random_merge.go | 26 -- server/schedulers/scatter_range.go | 36 -- server/schedulers/shuffle_hot_region.go | 32 -- server/schedulers/shuffle_leader.go | 27 -- server/schedulers/shuffle_region.go | 27 -- server/schedulers/split_bucket.go | 17 - server/schedulers/transfer_witness_leader.go | 13 - server/testutil.go | 8 +- tests/dashboard/race_test.go | 3 +- tests/dashboard/service_test.go | 3 - tests/server/server_test.go | 3 - tools/pd-simulator/main.go | 3 +- 26 files changed, 460 insertions(+), 467 deletions(-) create mode 100644 server/schedulers/init.go diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index 3292439b370..484f81cf558 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -38,8 +38,7 @@ import ( "github.com/tikv/pd/server/join" "go.uber.org/zap" - // Register schedulers. 
- _ "github.com/tikv/pd/server/schedulers" + "github.com/tikv/pd/server/schedulers" // Register Service _ "github.com/tikv/pd/pkg/mcs/registry" @@ -136,6 +135,7 @@ func createServerWrapper(args []string) (context.Context, context.CancelFunc, ba if err != nil { log.Fatal("create server failed", errs.ZapError(err)) } + schedulers.Register() return ctx, cancel, svr } diff --git a/server/api/hot_status_test.go b/server/api/hot_status_test.go index 11fa3e326aa..b1318376e46 100644 --- a/server/api/hot_status_test.go +++ b/server/api/hot_status_test.go @@ -25,7 +25,6 @@ import ( "github.com/tikv/pd/pkg/storage/kv" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" - _ "github.com/tikv/pd/server/schedulers" "github.com/tikv/pd/server/storage" ) diff --git a/server/api/scheduler_test.go b/server/api/scheduler_test.go index 7d9b0146cea..45f33da1a91 100644 --- a/server/api/scheduler_test.go +++ b/server/api/scheduler_test.go @@ -28,7 +28,6 @@ import ( tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" - _ "github.com/tikv/pd/server/schedulers" ) type scheduleTestSuite struct { diff --git a/server/cluster/cluster_worker_test.go b/server/cluster/cluster_worker_test.go index 95b734d25d9..1d4fc993174 100644 --- a/server/cluster/cluster_worker_test.go +++ b/server/cluster/cluster_worker_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockid" - _ "github.com/tikv/pd/server/schedulers" + _ "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/storage" ) diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go index 7bbf2e52884..d32831d2588 100644 --- a/server/schedulers/balance_leader.go +++ b/server/schedulers/balance_leader.go @@ -66,35 +66,6 @@ var ( balanceLeaderNewOpCounter = schedulerCounter.WithLabelValues(BalanceLeaderName, "new-operator") ) -func init() { - schedule.RegisterSliceDecoderBuilder(BalanceLeaderType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - conf, ok := v.(*balanceLeaderSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - ranges, err := getKeyRanges(args) - if err != nil { - return err - } - conf.Ranges = ranges - conf.Batch = BalanceLeaderBatchSize - return nil - } - }) - - schedule.RegisterScheduler(BalanceLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &balanceLeaderSchedulerConfig{storage: storage} - if err := decoder(conf); err != nil { - return nil, err - } - if conf.Batch == 0 { - conf.Batch = BalanceLeaderBatchSize - } - return newBalanceLeaderScheduler(opController, conf), nil - }) -} - type balanceLeaderSchedulerConfig struct { mu syncutil.RWMutex storage endpoint.ConfigStorage diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go index bd330767dcd..c83c86ffe94 100644 --- a/server/schedulers/balance_region.go +++ b/server/schedulers/balance_region.go @@ -22,8 +22,6 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/filter" "github.com/tikv/pd/server/schedule/operator" @@ -31,31 +29,6 @@ import ( "go.uber.org/zap" ) -func init() { - 
schedule.RegisterSliceDecoderBuilder(BalanceRegionType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - conf, ok := v.(*balanceRegionSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - ranges, err := getKeyRanges(args) - if err != nil { - return err - } - conf.Ranges = ranges - conf.Name = BalanceRegionName - return nil - } - }) - schedule.RegisterScheduler(BalanceRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &balanceRegionSchedulerConfig{} - if err := decoder(conf); err != nil { - return nil, err - } - return newBalanceRegionScheduler(opController, conf), nil - }) -} - const ( // BalanceRegionName is balance region scheduler name. BalanceRegionName = "balance-region-scheduler" diff --git a/server/schedulers/balance_witness.go b/server/schedulers/balance_witness.go index 046d854a8b0..fc692cdf2b2 100644 --- a/server/schedulers/balance_witness.go +++ b/server/schedulers/balance_witness.go @@ -51,35 +51,6 @@ const ( MaxBalanceWitnessBatchSize = 10 ) -func init() { - schedule.RegisterSliceDecoderBuilder(BalanceWitnessType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - conf, ok := v.(*balanceWitnessSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - ranges, err := getKeyRanges(args) - if err != nil { - return err - } - conf.Ranges = ranges - conf.Batch = balanceWitnessBatchSize - return nil - } - }) - - schedule.RegisterScheduler(BalanceWitnessType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &balanceWitnessSchedulerConfig{storage: storage} - if err := decoder(conf); err != nil { - return nil, err - } - if conf.Batch == 0 { - conf.Batch = balanceWitnessBatchSize - } - return newBalanceWitnessScheduler(opController, conf), nil - }) -} - type balanceWitnessSchedulerConfig struct { mu syncutil.RWMutex storage endpoint.ConfigStorage diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go index d9a526bc68e..8273da19d61 100644 --- a/server/schedulers/evict_leader.go +++ b/server/schedulers/evict_leader.go @@ -54,41 +54,6 @@ var ( evictLeaderNewOperatorCounter = schedulerCounter.WithLabelValues(EvictLeaderName, "new-operator") ) -func init() { - schedule.RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - if len(args) != 1 { - return errs.ErrSchedulerConfig.FastGenByArgs("id") - } - conf, ok := v.(*evictLeaderSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - - id, err := strconv.ParseUint(args[0], 10, 64) - if err != nil { - return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause() - } - - ranges, err := getKeyRanges(args[1:]) - if err != nil { - return err - } - conf.StoreIDWithRanges[id] = ranges - return nil - } - }) - - schedule.RegisterScheduler(EvictLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} - if err := decoder(conf); err != nil { - return nil, err - } - conf.cluster = opController.GetCluster() - return newEvictLeaderScheduler(opController, conf), nil - }) -} - type 
evictLeaderSchedulerConfig struct { mu syncutil.RWMutex storage endpoint.ConfigStorage diff --git a/server/schedulers/evict_slow_store.go b/server/schedulers/evict_slow_store.go index 7a45f27187d..6ed783ce947 100644 --- a/server/schedulers/evict_slow_store.go +++ b/server/schedulers/evict_slow_store.go @@ -39,22 +39,6 @@ const ( // WithLabelValues is a heavy operation, define variable to avoid call it every time. var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule") -func init() { - schedule.RegisterSliceDecoderBuilder(EvictSlowStoreType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - return nil - } - }) - - schedule.RegisterScheduler(EvictSlowStoreType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &evictSlowStoreSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0)} - if err := decoder(conf); err != nil { - return nil, err - } - return newEvictSlowStoreScheduler(opController, conf), nil - }) -} - type evictSlowStoreSchedulerConfig struct { storage endpoint.ConfigStorage EvictedStores []uint64 `json:"evict-stores"` diff --git a/server/schedulers/grant_hot_region.go b/server/schedulers/grant_hot_region.go index 89c11fc39d4..5831917d833 100644 --- a/server/schedulers/grant_hot_region.go +++ b/server/schedulers/grant_hot_region.go @@ -51,47 +51,6 @@ var ( grantHotRegionSkipCounter = schedulerCounter.WithLabelValues(GrantHotRegionName, "skip") ) -func init() { - schedule.RegisterSliceDecoderBuilder(GrantHotRegionType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - if len(args) != 2 { - return errs.ErrSchedulerConfig.FastGenByArgs("id") - } - - conf, ok := v.(*grantHotRegionSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - leaderID, err := strconv.ParseUint(args[0], 10, 64) - if err != nil { - return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause() - } - - storeIDs := make([]uint64, 0) - for _, id := range strings.Split(args[1], ",") { - storeID, err := strconv.ParseUint(id, 10, 64) - if err != nil { - return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause() - } - storeIDs = append(storeIDs, storeID) - } - if !conf.setStore(leaderID, storeIDs) { - return errs.ErrSchedulerConfig - } - return nil - } - }) - - schedule.RegisterScheduler(GrantHotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &grantHotRegionSchedulerConfig{StoreIDs: make([]uint64, 0), storage: storage} - conf.cluster = opController.GetCluster() - if err := decoder(conf); err != nil { - return nil, err - } - return newGrantHotRegionScheduler(opController, conf), nil - }) -} - type grantHotRegionSchedulerConfig struct { mu syncutil.RWMutex storage endpoint.ConfigStorage diff --git a/server/schedulers/grant_leader.go b/server/schedulers/grant_leader.go index 3131f59ecb1..6fd09f4e502 100644 --- a/server/schedulers/grant_leader.go +++ b/server/schedulers/grant_leader.go @@ -47,41 +47,6 @@ var ( grantLeaderNewOperatorCounter = schedulerCounter.WithLabelValues(GrantLeaderName, "new-operator") ) -func init() { - schedule.RegisterSliceDecoderBuilder(GrantLeaderType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - if len(args) != 1 { - return errs.ErrSchedulerConfig.FastGenByArgs("id") - } - - conf, ok := 
v.(*grantLeaderSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - - id, err := strconv.ParseUint(args[0], 10, 64) - if err != nil { - return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause() - } - ranges, err := getKeyRanges(args[1:]) - if err != nil { - return err - } - conf.StoreIDWithRanges[id] = ranges - return nil - } - }) - - schedule.RegisterScheduler(GrantLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &grantLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} - conf.cluster = opController.GetCluster() - if err := decoder(conf); err != nil { - return nil, err - } - return newGrantLeaderScheduler(opController, conf), nil - }) -} - type grantLeaderSchedulerConfig struct { mu syncutil.RWMutex storage endpoint.ConfigStorage diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index acf9d87a4b1..2cb45b89be1 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -29,7 +29,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/slice" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/filter" @@ -177,37 +176,6 @@ func (h *baseHotScheduler) randomRWType() statistics.RWType { return h.types[h.r.Int()%len(h.types)] } -func init() { - schedule.RegisterSliceDecoderBuilder(HotRegionType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - return nil - } - }) - schedule.RegisterScheduler(HotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := initHotRegionScheduleConfig() - - var data map[string]interface{} - if err := decoder(&data); err != nil { - return nil, err - } - if len(data) != 0 { - // After upgrading, use compatible config. - - // For clusters with the initial version >= v5.2, it will be overwritten by the default config. - conf.applyPrioritiesConfig(compatiblePrioritiesConfig) - // For clusters with the initial version >= v6.4, it will be overwritten by the default config. - conf.SetRankFormulaVersion("") - - if err := decoder(conf); err != nil { - return nil, err - } - } - - conf.storage = storage - return newHotScheduler(opController, conf), nil - }) -} - const ( // HotRegionName is balance hot region scheduler name. HotRegionName = "balance-hot-region-scheduler" diff --git a/server/schedulers/init.go b/server/schedulers/init.go new file mode 100644 index 00000000000..d1508f5ce57 --- /dev/null +++ b/server/schedulers/init.go @@ -0,0 +1,450 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package schedulers + +import ( + "strconv" + "strings" + + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/server/schedule" +) + +// Register registers schedulers. +func Register() { + // balance leader + schedule.RegisterSliceDecoderBuilder(BalanceLeaderType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + conf, ok := v.(*balanceLeaderSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + ranges, err := getKeyRanges(args) + if err != nil { + return err + } + conf.Ranges = ranges + conf.Batch = BalanceLeaderBatchSize + return nil + } + }) + + schedule.RegisterScheduler(BalanceLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &balanceLeaderSchedulerConfig{storage: storage} + if err := decoder(conf); err != nil { + return nil, err + } + if conf.Batch == 0 { + conf.Batch = BalanceLeaderBatchSize + } + return newBalanceLeaderScheduler(opController, conf), nil + }) + + // balance region + schedule.RegisterSliceDecoderBuilder(BalanceRegionType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + conf, ok := v.(*balanceRegionSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + ranges, err := getKeyRanges(args) + if err != nil { + return err + } + conf.Ranges = ranges + conf.Name = BalanceRegionName + return nil + } + }) + + schedule.RegisterScheduler(BalanceRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &balanceRegionSchedulerConfig{} + if err := decoder(conf); err != nil { + return nil, err + } + return newBalanceRegionScheduler(opController, conf), nil + }) + + // balance witness + schedule.RegisterSliceDecoderBuilder(BalanceWitnessType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + conf, ok := v.(*balanceWitnessSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + ranges, err := getKeyRanges(args) + if err != nil { + return err + } + conf.Ranges = ranges + conf.Batch = balanceWitnessBatchSize + return nil + } + }) + + schedule.RegisterScheduler(BalanceWitnessType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &balanceWitnessSchedulerConfig{storage: storage} + if err := decoder(conf); err != nil { + return nil, err + } + if conf.Batch == 0 { + conf.Batch = balanceWitnessBatchSize + } + return newBalanceWitnessScheduler(opController, conf), nil + }) + + // evict leader + schedule.RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + if len(args) != 1 { + return errs.ErrSchedulerConfig.FastGenByArgs("id") + } + conf, ok := v.(*evictLeaderSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + + id, err := strconv.ParseUint(args[0], 10, 64) + if err != nil { + return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause() + } + + ranges, err := getKeyRanges(args[1:]) + if err != nil { + return err + } + conf.StoreIDWithRanges[id] = ranges + return nil + } + }) + + schedule.RegisterScheduler(EvictLeaderType, func(opController *schedule.OperatorController, storage 
endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} + if err := decoder(conf); err != nil { + return nil, err + } + conf.cluster = opController.GetCluster() + return newEvictLeaderScheduler(opController, conf), nil + }) + + // evict slow store + schedule.RegisterSliceDecoderBuilder(EvictSlowStoreType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + return nil + } + }) + + schedule.RegisterScheduler(EvictSlowStoreType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &evictSlowStoreSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0)} + if err := decoder(conf); err != nil { + return nil, err + } + return newEvictSlowStoreScheduler(opController, conf), nil + }) + + // todo: evict slow trend store + // grant hot region + schedule.RegisterSliceDecoderBuilder(GrantHotRegionType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + if len(args) != 2 { + return errs.ErrSchedulerConfig.FastGenByArgs("id") + } + + conf, ok := v.(*grantHotRegionSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + leaderID, err := strconv.ParseUint(args[0], 10, 64) + if err != nil { + return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause() + } + + storeIDs := make([]uint64, 0) + for _, id := range strings.Split(args[1], ",") { + storeID, err := strconv.ParseUint(id, 10, 64) + if err != nil { + return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause() + } + storeIDs = append(storeIDs, storeID) + } + if !conf.setStore(leaderID, storeIDs) { + return errs.ErrSchedulerConfig + } + return nil + } + }) + + schedule.RegisterScheduler(GrantHotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &grantHotRegionSchedulerConfig{StoreIDs: make([]uint64, 0), storage: storage} + conf.cluster = opController.GetCluster() + if err := decoder(conf); err != nil { + return nil, err + } + return newGrantHotRegionScheduler(opController, conf), nil + }) + + // hot region + schedule.RegisterSliceDecoderBuilder(HotRegionType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + return nil + } + }) + + schedule.RegisterScheduler(HotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := initHotRegionScheduleConfig() + var data map[string]interface{} + if err := decoder(&data); err != nil { + return nil, err + } + if len(data) != 0 { + // After upgrading, use compatible config. + // For clusters with the initial version >= v5.2, it will be overwritten by the default config. + conf.applyPrioritiesConfig(compatiblePrioritiesConfig) + // For clusters with the initial version >= v6.4, it will be overwritten by the default config. 
+ conf.SetRankFormulaVersion("") + if err := decoder(conf); err != nil { + return nil, err + } + } + conf.storage = storage + return newHotScheduler(opController, conf), nil + }) + + // grant leader + schedule.RegisterSliceDecoderBuilder(GrantLeaderType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + if len(args) != 1 { + return errs.ErrSchedulerConfig.FastGenByArgs("id") + } + + conf, ok := v.(*grantLeaderSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + + id, err := strconv.ParseUint(args[0], 10, 64) + if err != nil { + return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause() + } + ranges, err := getKeyRanges(args[1:]) + if err != nil { + return err + } + conf.StoreIDWithRanges[id] = ranges + return nil + } + }) + + schedule.RegisterScheduler(GrantLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &grantLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} + conf.cluster = opController.GetCluster() + if err := decoder(conf); err != nil { + return nil, err + } + return newGrantLeaderScheduler(opController, conf), nil + }) + + // label + schedule.RegisterSliceDecoderBuilder(LabelType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + conf, ok := v.(*labelSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + ranges, err := getKeyRanges(args) + if err != nil { + return err + } + conf.Ranges = ranges + conf.Name = LabelName + return nil + } + }) + + schedule.RegisterScheduler(LabelType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &labelSchedulerConfig{} + if err := decoder(conf); err != nil { + return nil, err + } + return newLabelScheduler(opController, conf), nil + }) + + // random merge + schedule.RegisterSliceDecoderBuilder(RandomMergeType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + conf, ok := v.(*randomMergeSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + ranges, err := getKeyRanges(args) + if err != nil { + return err + } + conf.Ranges = ranges + conf.Name = RandomMergeName + return nil + } + }) + + schedule.RegisterScheduler(RandomMergeType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &randomMergeSchedulerConfig{} + if err := decoder(conf); err != nil { + return nil, err + } + return newRandomMergeScheduler(opController, conf), nil + }) + + // scatter range + // args: [start-key, end-key, range-name]. 
+ schedule.RegisterSliceDecoderBuilder(ScatterRangeType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + if len(args) != 3 { + return errs.ErrSchedulerConfig.FastGenByArgs("ranges and name") + } + if len(args[2]) == 0 { + return errs.ErrSchedulerConfig.FastGenByArgs("range name") + } + conf, ok := v.(*scatterRangeSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + conf.StartKey = args[0] + conf.EndKey = args[1] + conf.RangeName = args[2] + return nil + } + }) + + schedule.RegisterScheduler(ScatterRangeType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &scatterRangeSchedulerConfig{ + storage: storage, + } + if err := decoder(conf); err != nil { + return nil, err + } + rangeName := conf.RangeName + if len(rangeName) == 0 { + return nil, errs.ErrSchedulerConfig.FastGenByArgs("range name") + } + return newScatterRangeScheduler(opController, conf), nil + }) + + // shuffle hot region + schedule.RegisterSliceDecoderBuilder(ShuffleHotRegionType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + conf, ok := v.(*shuffleHotRegionSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + conf.Limit = uint64(1) + if len(args) == 1 { + limit, err := strconv.ParseUint(args[0], 10, 64) + if err != nil { + return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause() + } + conf.Limit = limit + } + conf.Name = ShuffleHotRegionName + return nil + } + }) + + schedule.RegisterScheduler(ShuffleHotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &shuffleHotRegionSchedulerConfig{Limit: uint64(1)} + if err := decoder(conf); err != nil { + return nil, err + } + return newShuffleHotRegionScheduler(opController, conf), nil + }) + + // shuffle leader + schedule.RegisterSliceDecoderBuilder(ShuffleLeaderType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + conf, ok := v.(*shuffleLeaderSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + ranges, err := getKeyRanges(args) + if err != nil { + return err + } + conf.Ranges = ranges + conf.Name = ShuffleLeaderName + return nil + } + }) + + schedule.RegisterScheduler(ShuffleLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &shuffleLeaderSchedulerConfig{} + if err := decoder(conf); err != nil { + return nil, err + } + return newShuffleLeaderScheduler(opController, conf), nil + }) + + // shuffle region + schedule.RegisterSliceDecoderBuilder(ShuffleRegionType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + conf, ok := v.(*shuffleRegionSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + ranges, err := getKeyRanges(args) + if err != nil { + return err + } + conf.Ranges = ranges + conf.Roles = allRoles + return nil + } + }) + + schedule.RegisterScheduler(ShuffleRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &shuffleRegionSchedulerConfig{storage: storage} + if err := decoder(conf); err != nil { + return nil, err + } + return 
newShuffleRegionScheduler(opController, conf), nil + }) + + // split bucket + schedule.RegisterSliceDecoderBuilder(SplitBucketType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + return nil + } + }) + + schedule.RegisterScheduler(SplitBucketType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := initSplitBucketConfig() + if err := decoder(conf); err != nil { + return nil, err + } + conf.storage = storage + return newSplitBucketScheduler(opController, conf), nil + }) + + // transfer witness leader + schedule.RegisterSliceDecoderBuilder(TransferWitnessLeaderType, func(args []string) schedule.ConfigDecoder { + return func(v interface{}) error { + return nil + } + }) + + schedule.RegisterScheduler(TransferWitnessLeaderType, func(opController *schedule.OperatorController, _ endpoint.ConfigStorage, _ schedule.ConfigDecoder) (schedule.Scheduler, error) { + return newTransferWitnessLeaderScheduler(opController), nil + }) +} diff --git a/server/schedulers/label.go b/server/schedulers/label.go index 6e66333e5d6..2f85021b624 100644 --- a/server/schedulers/label.go +++ b/server/schedulers/label.go @@ -18,7 +18,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/filter" @@ -43,32 +42,6 @@ var ( labelNoRegionCounter = schedulerCounter.WithLabelValues(LabelName, "no-region") ) -func init() { - schedule.RegisterSliceDecoderBuilder(LabelType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - conf, ok := v.(*labelSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - ranges, err := getKeyRanges(args) - if err != nil { - return err - } - conf.Ranges = ranges - conf.Name = LabelName - return nil - } - }) - - schedule.RegisterScheduler(LabelType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &labelSchedulerConfig{} - if err := decoder(conf); err != nil { - return nil, err - } - return newLabelScheduler(opController, conf), nil - }) -} - type labelSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` diff --git a/server/schedulers/random_merge.go b/server/schedulers/random_merge.go index 553f8ce8999..84593bf820b 100644 --- a/server/schedulers/random_merge.go +++ b/server/schedulers/random_merge.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/checker" "github.com/tikv/pd/server/schedule/filter" @@ -45,31 +44,6 @@ var ( randomMergeNotAllowedCounter = schedulerCounter.WithLabelValues(RandomMergeName, "not-allowed") ) -func init() { - schedule.RegisterSliceDecoderBuilder(RandomMergeType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - conf, ok := v.(*randomMergeSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - ranges, err := getKeyRanges(args) - if err != nil { - return err - } - conf.Ranges = ranges - conf.Name = RandomMergeName - return nil - } - }) - schedule.RegisterScheduler(RandomMergeType, func(opController 
*schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &randomMergeSchedulerConfig{} - if err := decoder(conf); err != nil { - return nil, err - } - return newRandomMergeScheduler(opController, conf), nil - }) -} - type randomMergeSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` diff --git a/server/schedulers/scatter_range.go b/server/schedulers/scatter_range.go index bf9d5c9f728..a3ef02ff5fb 100644 --- a/server/schedulers/scatter_range.go +++ b/server/schedulers/scatter_range.go @@ -48,42 +48,6 @@ var ( scatterRangeNoNeedBalanceLeaderCounter = schedulerCounter.WithLabelValues(ScatterRangeName, "no-need-balance-leader") ) -func init() { - // args: [start-key, end-key, range-name]. - schedule.RegisterSliceDecoderBuilder(ScatterRangeType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - if len(args) != 3 { - return errs.ErrSchedulerConfig.FastGenByArgs("ranges and name") - } - if len(args[2]) == 0 { - return errs.ErrSchedulerConfig.FastGenByArgs("range name") - } - conf, ok := v.(*scatterRangeSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - conf.StartKey = args[0] - conf.EndKey = args[1] - conf.RangeName = args[2] - return nil - } - }) - - schedule.RegisterScheduler(ScatterRangeType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &scatterRangeSchedulerConfig{ - storage: storage, - } - if err := decoder(conf); err != nil { - return nil, err - } - rangeName := conf.RangeName - if len(rangeName) == 0 { - return nil, errs.ErrSchedulerConfig.FastGenByArgs("range name") - } - return newScatterRangeScheduler(opController, conf), nil - }) -} - type scatterRangeSchedulerConfig struct { mu syncutil.RWMutex storage endpoint.ConfigStorage diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go index 54498408cfc..729a9f02e1d 100644 --- a/server/schedulers/shuffle_hot_region.go +++ b/server/schedulers/shuffle_hot_region.go @@ -15,13 +15,10 @@ package schedulers import ( - "strconv" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/filter" "github.com/tikv/pd/server/schedule/operator" @@ -44,35 +41,6 @@ var ( shuffleHotRegionSkipCounter = schedulerCounter.WithLabelValues(ShuffleHotRegionName, "skip") ) -func init() { - schedule.RegisterSliceDecoderBuilder(ShuffleHotRegionType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - conf, ok := v.(*shuffleHotRegionSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - conf.Limit = uint64(1) - if len(args) == 1 { - limit, err := strconv.ParseUint(args[0], 10, 64) - if err != nil { - return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause() - } - conf.Limit = limit - } - conf.Name = ShuffleHotRegionName - return nil - } - }) - - schedule.RegisterScheduler(ShuffleHotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &shuffleHotRegionSchedulerConfig{Limit: uint64(1)} - if err := decoder(conf); err != nil { - return nil, err - } - return 
newShuffleHotRegionScheduler(opController, conf), nil - }) -} - type shuffleHotRegionSchedulerConfig struct { Name string `json:"name"` Limit uint64 `json:"limit"` diff --git a/server/schedulers/shuffle_leader.go b/server/schedulers/shuffle_leader.go index 8e270dedc5e..111553e12ab 100644 --- a/server/schedulers/shuffle_leader.go +++ b/server/schedulers/shuffle_leader.go @@ -18,7 +18,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/filter" "github.com/tikv/pd/server/schedule/operator" @@ -40,32 +39,6 @@ var ( shuffleLeaderNoFollowerCounter = schedulerCounter.WithLabelValues(ShuffleLeaderName, "no-follower") ) -func init() { - schedule.RegisterSliceDecoderBuilder(ShuffleLeaderType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - conf, ok := v.(*shuffleLeaderSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - ranges, err := getKeyRanges(args) - if err != nil { - return err - } - conf.Ranges = ranges - conf.Name = ShuffleLeaderName - return nil - } - }) - - schedule.RegisterScheduler(ShuffleLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &shuffleLeaderSchedulerConfig{} - if err := decoder(conf); err != nil { - return nil, err - } - return newShuffleLeaderScheduler(opController, conf), nil - }) -} - type shuffleLeaderSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` diff --git a/server/schedulers/shuffle_region.go b/server/schedulers/shuffle_region.go index 38e537b32e3..9721fd56438 100644 --- a/server/schedulers/shuffle_region.go +++ b/server/schedulers/shuffle_region.go @@ -19,8 +19,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/filter" "github.com/tikv/pd/server/schedule/operator" @@ -44,31 +42,6 @@ var ( shuffleRegionNoSourceStoreCounter = schedulerCounter.WithLabelValues(ShuffleRegionName, "no-source-store") ) -func init() { - schedule.RegisterSliceDecoderBuilder(ShuffleRegionType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - conf, ok := v.(*shuffleRegionSchedulerConfig) - if !ok { - return errs.ErrScheduleConfigNotExist.FastGenByArgs() - } - ranges, err := getKeyRanges(args) - if err != nil { - return err - } - conf.Ranges = ranges - conf.Roles = allRoles - return nil - } - }) - schedule.RegisterScheduler(ShuffleRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := &shuffleRegionSchedulerConfig{storage: storage} - if err := decoder(conf); err != nil { - return nil, err - } - return newShuffleRegionScheduler(opController, conf), nil - }) -} - type shuffleRegionScheduler struct { *BaseScheduler conf *shuffleRegionSchedulerConfig diff --git a/server/schedulers/split_bucket.go b/server/schedulers/split_bucket.go index 4dceb2e1a9e..430f156b79d 100644 --- a/server/schedulers/split_bucket.go +++ b/server/schedulers/split_bucket.go @@ -57,23 +57,6 @@ var ( splitBucketNewOperatorCounter = schedulerCounter.WithLabelValues(SplitBucketName, "new-operator") ) -func init() { - 
schedule.RegisterSliceDecoderBuilder(SplitBucketType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - return nil - } - }) - - schedule.RegisterScheduler(SplitBucketType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - conf := initSplitBucketConfig() - if err := decoder(conf); err != nil { - return nil, err - } - conf.storage = storage - return newSplitBucketScheduler(opController, conf), nil - }) -} - func initSplitBucketConfig() *splitBucketSchedulerConfig { return &splitBucketSchedulerConfig{ Degree: defaultHotDegree, diff --git a/server/schedulers/transfer_witness_leader.go b/server/schedulers/transfer_witness_leader.go index 41f3a4a157d..6d06f5503ba 100644 --- a/server/schedulers/transfer_witness_leader.go +++ b/server/schedulers/transfer_witness_leader.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/filter" "github.com/tikv/pd/server/schedule/operator" @@ -46,18 +45,6 @@ var ( transferWitnessLeaderNoTargetStoreCounter = schedulerCounter.WithLabelValues(TransferWitnessLeaderName, "no-target-store") ) -func init() { - schedule.RegisterSliceDecoderBuilder(TransferWitnessLeaderType, func(args []string) schedule.ConfigDecoder { - return func(v interface{}) error { - return nil - } - }) - - schedule.RegisterScheduler(TransferWitnessLeaderType, func(opController *schedule.OperatorController, _ endpoint.ConfigStorage, _ schedule.ConfigDecoder) (schedule.Scheduler, error) { - return newTransferWitnessLeaderScheduler(opController), nil - }) -} - type trasferWitnessLeaderScheduler struct { *BaseScheduler regions chan *core.RegionInfo diff --git a/server/testutil.go b/server/testutil.go index d96414efa65..f0fc9069904 100644 --- a/server/testutil.go +++ b/server/testutil.go @@ -29,12 +29,14 @@ import ( "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server/config" + "github.com/tikv/pd/server/schedulers" "go.etcd.io/etcd/embed" - - // Register schedulers - _ "github.com/tikv/pd/server/schedulers" ) +func init() { + schedulers.Register() +} + // CleanupFunc closes test pd server(s) and deletes any files left behind. type CleanupFunc func() diff --git a/tests/dashboard/race_test.go b/tests/dashboard/race_test.go index 4bb31f55ecc..69596531dbb 100644 --- a/tests/dashboard/race_test.go +++ b/tests/dashboard/race_test.go @@ -23,8 +23,7 @@ import ( "github.com/tikv/pd/pkg/dashboard" "github.com/tikv/pd/tests" - // Register schedulers. - _ "github.com/tikv/pd/server/schedulers" + _ "github.com/tikv/pd/pkg/utils/testutil" ) func TestCancelDuringStarting(t *testing.T) { diff --git a/tests/dashboard/service_test.go b/tests/dashboard/service_test.go index 612b54ac4eb..f75e047d8f1 100644 --- a/tests/dashboard/service_test.go +++ b/tests/dashboard/service_test.go @@ -31,9 +31,6 @@ import ( "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" - - // Register schedulers. 
- _ "github.com/tikv/pd/server/schedulers" ) func TestMain(m *testing.M) { diff --git a/tests/server/server_test.go b/tests/server/server_test.go index f07f89bc295..3b85cd3cf0d 100644 --- a/tests/server/server_test.go +++ b/tests/server/server_test.go @@ -24,9 +24,6 @@ import ( "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "go.uber.org/goleak" - - // Register schedulers. - _ "github.com/tikv/pd/server/schedulers" ) func TestMain(m *testing.M) { diff --git a/tools/pd-simulator/main.go b/tools/pd-simulator/main.go index 08348b6b519..179184e62d4 100644 --- a/tools/pd-simulator/main.go +++ b/tools/pd-simulator/main.go @@ -38,8 +38,7 @@ import ( "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" "go.uber.org/zap" - // Register schedulers. - _ "github.com/tikv/pd/server/schedulers" + _ "github.com/tikv/pd/pkg/utils/testutil" ) var ( From 54f85a28116c23584f45bed297d9ca628e75f6d3 Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 8 Feb 2023 13:27:58 +0800 Subject: [PATCH 10/10] keyspace: add load keyspace by id (#5935) ref tikv/pd#4399 Signed-off-by: disksing Co-authored-by: Ti Chi Robot --- server/apiv2/handlers/keyspace.go | 25 +++++++++++++++++++++++++ server/keyspace/keyspace.go | 22 ++++++++++++++++++++++ server/keyspace/keyspace_test.go | 5 +++++ 3 files changed, 52 insertions(+) diff --git a/server/apiv2/handlers/keyspace.go b/server/apiv2/handlers/keyspace.go index f524bc2aaef..5facbf5d465 100644 --- a/server/apiv2/handlers/keyspace.go +++ b/server/apiv2/handlers/keyspace.go @@ -39,6 +39,7 @@ func RegisterKeyspace(r *gin.RouterGroup) { router.GET("/:name", LoadKeyspace) router.PATCH("/:name/config", UpdateKeyspaceConfig) router.PUT("/:name/state", UpdateKeyspaceState) + router.GET("/id/:id", LoadKeyspaceByID) } // CreateKeyspaceParams represents parameters needed when creating a new keyspace. @@ -99,6 +100,30 @@ func LoadKeyspace(c *gin.Context) { c.IndentedJSON(http.StatusOK, &KeyspaceMeta{meta}) } +// LoadKeyspaceByID returns target keyspace. +// @Tags keyspaces +// @Summary Get keyspace info. +// @Param id path string true "Keyspace id" +// @Produce json +// @Success 200 {object} KeyspaceMeta +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /keyspaces/id/{id} [get] +func LoadKeyspaceByID(c *gin.Context) { + id, err := strconv.ParseUint(c.Param("id"), 10, 64) + if err != nil || id == 0 { + c.AbortWithStatusJSON(http.StatusInternalServerError, "invalid keyspace id") + return + } + svr := c.MustGet("server").(*server.Server) + manager := svr.GetKeyspaceManager() + meta, err := manager.LoadKeyspaceByID(uint32(id)) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, &KeyspaceMeta{meta}) +} + // parseLoadAllQuery parses LoadAllKeyspaces' query parameters. // page_token: // The keyspace id of the scan start. If not set, scan from keyspace with id 1. diff --git a/server/keyspace/keyspace.go b/server/keyspace/keyspace.go index 3c2d549bd7a..bf64f3e680b 100644 --- a/server/keyspace/keyspace.go +++ b/server/keyspace/keyspace.go @@ -240,11 +240,33 @@ func (manager *Manager) LoadKeyspace(name string) (*keyspacepb.KeyspaceMeta, err if meta == nil { return ErrKeyspaceNotFound } + meta.Id = id return nil }) return meta, err } +// LoadKeyspaceByID returns the keyspace specified by id. +// It returns error if loading or unmarshalling met error or if keyspace does not exist. 
+func (manager *Manager) LoadKeyspaceByID(spaceID uint32) (*keyspacepb.KeyspaceMeta, error) {
+	var (
+		meta *keyspacepb.KeyspaceMeta
+		err  error
+	)
+	err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
+		meta, err = manager.store.LoadKeyspaceMeta(txn, spaceID)
+		if err != nil {
+			return err
+		}
+		if meta == nil {
+			return ErrKeyspaceNotFound
+		}
+		meta.Id = spaceID
+		return nil
+	})
+	return meta, err
+}
+
 // Mutation represents a single operation to be applied on keyspace config.
 type Mutation struct {
 	Op OpType
diff --git a/server/keyspace/keyspace_test.go b/server/keyspace/keyspace_test.go
index 401e573ea4b..d478736052e 100644
--- a/server/keyspace/keyspace_test.go
+++ b/server/keyspace/keyspace_test.go
@@ -92,6 +92,11 @@ func (suite *keyspaceTestSuite) TestCreateKeyspace() {
 		re.NoError(err)
 		re.Equal(uint32(i+1), loaded.Id)
 		checkCreateRequest(re, request, loaded)
+
+		loaded, err = manager.LoadKeyspaceByID(created.Id)
+		re.NoError(err)
+		re.Equal(loaded.Name, request.Name)
+		checkCreateRequest(re, request, loaded)
 	}
 	// Create a keyspace with existing name must return error.