Skip to content

Commit

Permalink
mcs: move service name to utils/constant.go (#6172)
Browse files Browse the repository at this point in the history
ref #5836

Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 authored Mar 16, 2023
1 parent 7a0ce10 commit a330f0b
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, "resource_manager", s.cfg.ListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, utils.ResourceManagerServiceName, s.cfg.ListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
s.serviceRegister.Register()
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, "tso", s.cfg.ListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, utils.TSOServiceName, s.cfg.ListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
s.serviceRegister.Register()
return nil
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@ const (
// DefaultKeySpaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeySpaceGroupID = 0

// APIServiceName is the name of api server.
APIServiceName = "api"
// TSOServiceName is the name of tso server.
TSOServiceName = "tso"
// ResourceManagerServiceName is the name of resource manager server.
ResourceManagerServiceName = "resource_manager"
)
4 changes: 2 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {

streamCtx := stream.Context()
if s.IsAPIServiceMode() {
forwardedHost, ok := s.GetServicePrimaryAddr(ctx, "tso")
forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if !ok || forwardedHost == "" {
return ErrNotFoundTSOAddr
}
Expand Down Expand Up @@ -1964,7 +1964,7 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
}

func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timestamp, error) {
forwardedHost, ok := s.GetServicePrimaryAddr(ctx, "tso")
forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
}
Expand Down
7 changes: 4 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
rm_server "github.com/tikv/pd/pkg/mcs/resource_manager/server"
_ "github.com/tikv/pd/pkg/mcs/resource_manager/server/apis/v1" // init API group
_ "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" // init tso API group
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/schedule"
Expand Down Expand Up @@ -539,10 +540,10 @@ func (s *Server) startServerLoop(ctx context.Context) {
go s.etcdLeaderLoop()
go s.serverMetricsLoop()
go s.encryptionKeyManagerLoop()
if s.IsAPIServiceMode() { // disable tso service
if s.IsAPIServiceMode() { // disable tso service and resource manager service in api server
s.serverLoopWg.Add(2)
go s.watchServicePrimaryAddrLoop("tso")
go s.watchServicePrimaryAddrLoop("resource_manager")
go s.watchServicePrimaryAddrLoop(mcs.TSOServiceName)
go s.watchServicePrimaryAddrLoop(mcs.ResourceManagerServiceName)
} else { // enable tso service
s.serverLoopWg.Add(1)
go s.tsoAllocatorLoop()
Expand Down
3 changes: 2 additions & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/assertutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/testutil"
Expand Down Expand Up @@ -297,7 +298,7 @@ func TestAPIService(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockHandler := CreateMockHandler(re, "127.0.0.1")
svr, err := CreateServer(ctx, cfg, []string{"api"}, mockHandler)
svr, err := CreateServer(ctx, cfg, []string{utils.APIServiceName}, mockHandler)
re.NoError(err)
defer svr.Close()
err = svr.Run()
Expand Down
3 changes: 2 additions & 1 deletion tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/swaggerserver"
"github.com/tikv/pd/pkg/tso"
Expand Down Expand Up @@ -79,7 +80,7 @@ func NewTestServer(ctx context.Context, cfg *config.Config) (*TestServer, error)

// NewTestAPIServer creates a new TestServer.
func NewTestAPIServer(ctx context.Context, cfg *config.Config) (*TestServer, error) {
return createTestServer(ctx, cfg, []string{"api"})
return createTestServer(ctx, cfg, []string{utils.APIServiceName})
}

func createTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) {
Expand Down
12 changes: 6 additions & 6 deletions tests/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ func (suite *serverRegisterTestSuite) TearDownSuite() {
func (suite *serverRegisterTestSuite) TestServerRegister() {
// test register, primary and unregister when start tso and resource-manager with only one server
for i := 0; i < 3; i++ {
suite.checkServerRegister("tso")
suite.checkServerRegister(utils.TSOServiceName)
}
for i := 0; i < 3; i++ {
suite.checkServerRegister("resource_manager")
suite.checkServerRegister(utils.ResourceManagerServiceName)
}
}

Expand Down Expand Up @@ -105,8 +105,8 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {
}

func (suite *serverRegisterTestSuite) TestServerPrimaryChange() {
suite.checkServerPrimaryChange("tso", 3)
suite.checkServerPrimaryChange("resource_manager", 3)
suite.checkServerPrimaryChange(utils.TSOServiceName, 3)
suite.checkServerPrimaryChange(utils.ResourceManagerServiceName, 3)
}

func (suite *serverRegisterTestSuite) checkServerPrimaryChange(serviceName string, serverNum int) {
Expand Down Expand Up @@ -146,9 +146,9 @@ func (suite *serverRegisterTestSuite) checkServerPrimaryChange(serviceName strin
func (suite *serverRegisterTestSuite) addServer(serviceName string) (bs.Server, func()) {
re := suite.Require()
switch serviceName {
case "tso":
case utils.TSOServiceName:
return mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints)
case "resource_manager":
case utils.ResourceManagerServiceName:
return mcs.StartSingleResourceManagerTestServer(suite.ctx, re, suite.backendEndpoints)
default:
return nil, nil
Expand Down
10 changes: 5 additions & 5 deletions tests/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,10 @@ func (suite *APIServerForwardTestSuite) SetupTest() {

func (suite *APIServerForwardTestSuite) TearDownTest() {
etcdClient := suite.pdLeader.GetEtcdClient()
endpoints, err := discovery.Discover(etcdClient, "tso")
endpoints, err := discovery.Discover(etcdClient, utils.TSOServiceName)
suite.NoError(err)
if len(endpoints) != 0 {
endpoints, err = discovery.Discover(etcdClient, "tso")
endpoints, err = discovery.Discover(etcdClient, utils.TSOServiceName)
suite.NoError(err)
suite.Empty(endpoints)
}
Expand Down Expand Up @@ -245,11 +245,11 @@ func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() {
}

// can use the tso-related interface with new primary
oldPrimary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, "tso")
oldPrimary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName)
suite.True(exist)
serverMap[oldPrimary].Close()
time.Sleep(time.Duration(utils.DefaultLeaderLease) * time.Second) // wait for leader lease timeout
primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, "tso")
primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName)
suite.True(exist)
suite.NotEqual(oldPrimary, primary)
suite.checkAvailableTSO()
Expand All @@ -264,7 +264,7 @@ func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() {
}
}
time.Sleep(time.Duration(utils.DefaultLeaderLease) * time.Second) // wait for leader lease timeout
primary, exist = suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, "tso")
primary, exist = suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName)
suite.True(exist)
suite.Equal(oldPrimary, primary)
suite.checkAvailableTSO()
Expand Down

0 comments on commit a330f0b

Please sign in to comment.