Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tikv/pd into store-slow-trend
Browse files Browse the repository at this point in the history
  • Loading branch information
innerr committed Feb 8, 2023
2 parents 00e0a44 + 54f85a2 commit 1a6e948
Show file tree
Hide file tree
Showing 47 changed files with 814 additions and 571 deletions.
67 changes: 37 additions & 30 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/autoscaling"
basicsvr "github.com/tikv/pd/pkg/basic_server"
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/swaggerserver"
Expand All @@ -37,17 +38,48 @@ import (
"github.com/tikv/pd/server/join"
"go.uber.org/zap"

// Register schedulers.
_ "github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedulers"

// Register Service
_ "github.com/tikv/pd/pkg/mcs/registry"
_ "github.com/tikv/pd/pkg/mcs/resource_manager/server/install"
)

func main() {
ctx, cancel, svr := createServerWrapper(os.Args[1:])

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
}

<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))

svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
}

func createServerWrapper(args []string) (context.Context, context.CancelFunc, basicsvr.Server) {
cfg := config.NewConfig()
err := cfg.Parse(os.Args[1:])
err := cfg.Parse(args)

if cfg.Version {
server.PrintPDInfo()
Expand Down Expand Up @@ -103,34 +135,9 @@ func main() {
if err != nil {
log.Fatal("create server failed", errs.ZapError(err))
}
schedulers.Register()

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
}

<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))

svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
return ctx, cancel, svr
}

func exit(code int) {
Expand Down
40 changes: 40 additions & 0 deletions pkg/basic_server/basic_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package basicsvr

import (
"context"
"net/http"

"go.etcd.io/etcd/clientv3"
)

// Server defines the common basic behaviors of a server
type Server interface {
// Name returns the unique Name for this server in the cluster.
Name() string
// Context returns the context of server.
Context() context.Context

// Run runs the server.
Run() error
// Close closes the server.
Close()

// GetClient returns builtin etcd client.
GetClient() *clientv3.Client
// GetHTTPClient returns builtin etcd client.
GetHTTPClient() *http.Client
}
4 changes: 2 additions & 2 deletions pkg/mcs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package msc used to implement the core logic of the external services which rely on the PD backend provider.
package msc
// Package mcs used to implement the core logic of the external services which rely on the PD backend provider.
package mcs
7 changes: 3 additions & 4 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
Expand Down Expand Up @@ -250,12 +249,12 @@ func (m *Member) isSameLeader(leader *pdpb.Member) bool {
}

// MemberInfo initializes the member info.
func (m *Member) MemberInfo(cfg *config.Config, name string, rootPath string) {
func (m *Member) MemberInfo(advertiseClientUrls, advertisePeerUrls, name string, rootPath string) {
leader := &pdpb.Member{
Name: name,
MemberId: m.ID(),
ClientUrls: strings.Split(cfg.AdvertiseClientUrls, ","),
PeerUrls: strings.Split(cfg.AdvertisePeerUrls, ","),
ClientUrls: strings.Split(advertiseClientUrls, ","),
PeerUrls: strings.Split(advertisePeerUrls, ","),
}

data, err := leader.Marshal()
Expand Down
15 changes: 8 additions & 7 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ const (
leaderTickInterval = 50 * time.Millisecond
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
// The value should be the same as the variable defined in server's config.
defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond
)

var (
Expand Down Expand Up @@ -134,17 +132,20 @@ type AllocatorManager struct {
func NewAllocatorManager(
m *member.Member,
rootPath string,
cfg config,
enableLocalTSO bool,
saveInterval time.Duration,
updatePhysicalInterval time.Duration,
tlsConfig *grpcutil.TLSConfig,
maxResetTSGap func() time.Duration,
) *AllocatorManager {
allocatorManager := &AllocatorManager{
enableLocalTSO: cfg.IsLocalTSOEnabled(),
enableLocalTSO: enableLocalTSO,
member: m,
rootPath: rootPath,
saveInterval: cfg.GetTSOSaveInterval(),
updatePhysicalInterval: cfg.GetTSOUpdatePhysicalInterval(),
saveInterval: saveInterval,
updatePhysicalInterval: updatePhysicalInterval,
maxResetTSGap: maxResetTSGap,
securityConfig: cfg.GetTLSConfig(),
securityConfig: tlsConfig,
}
allocatorManager.mu.allocatorGroups = make(map[string]*allocatorGroup)
allocatorManager.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
Expand Down
44 changes: 38 additions & 6 deletions pkg/tso/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,46 @@
package tso

import (
"flag"
"time"

"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/typeutil"
)

type config interface {
IsLocalTSOEnabled() bool
GetTSOSaveInterval() time.Duration
GetTSOUpdatePhysicalInterval() time.Duration
GetTLSConfig() *grpcutil.TLSConfig
const (
// defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`.
defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond
)

// Config is the configuration for the TSO.
type Config struct {
flagSet *flag.FlagSet

configFile string
// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate Local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the "zone" label for the PD server
// to indicate which DC this PD belongs to.
EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"`

// TSOSaveInterval is the interval to save timestamp.
TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`

// The interval to update physical part of timestamp. Usually, this config should not be set.
// At most 1<<18 (262144) TSOs can be generated in the interval. The smaller the value, the
// more TSOs provided, and at the same time consuming more CPU time.
// This config is only valid in 1ms to 10s. If it's configured too long or too short, it will
// be automatically clamped to the range.
TSOUpdatePhysicalInterval typeutil.Duration `toml:"tso-update-physical-interval" json:"tso-update-physical-interval"`
}

// NewConfig creates a new config.
func NewConfig() *Config {
cfg := &Config{}
cfg.flagSet = flag.NewFlagSet("pd", flag.ContinueOnError)
fs := cfg.flagSet

fs.StringVar(&cfg.configFile, "config", "", "config file")

return cfg
}
1 change: 0 additions & 1 deletion server/api/hot_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
tu "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server"
_ "github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/storage"
)

Expand Down
1 change: 0 additions & 1 deletion server/api/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
tu "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
_ "github.com/tikv/pd/server/schedulers"
)

type scheduleTestSuite struct {
Expand Down
25 changes: 25 additions & 0 deletions server/apiv2/handlers/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func RegisterKeyspace(r *gin.RouterGroup) {
router.GET("/:name", LoadKeyspace)
router.PATCH("/:name/config", UpdateKeyspaceConfig)
router.PUT("/:name/state", UpdateKeyspaceState)
router.GET("/id/:id", LoadKeyspaceByID)
}

// CreateKeyspaceParams represents parameters needed when creating a new keyspace.
Expand Down Expand Up @@ -99,6 +100,30 @@ func LoadKeyspace(c *gin.Context) {
c.IndentedJSON(http.StatusOK, &KeyspaceMeta{meta})
}

// LoadKeyspaceByID returns target keyspace.
// @Tags keyspaces
// @Summary Get keyspace info.
// @Param id path string true "Keyspace id"
// @Produce json
// @Success 200 {object} KeyspaceMeta
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces/id/{id} [get]
func LoadKeyspaceByID(c *gin.Context) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil || id == 0 {
c.AbortWithStatusJSON(http.StatusInternalServerError, "invalid keyspace id")
return
}
svr := c.MustGet("server").(*server.Server)
manager := svr.GetKeyspaceManager()
meta, err := manager.LoadKeyspaceByID(uint32(id))
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, &KeyspaceMeta{meta})
}

// parseLoadAllQuery parses LoadAllKeyspaces' query parameters.
// page_token:
// The keyspace id of the scan start. If not set, scan from keyspace with id 1.
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockid"
_ "github.com/tikv/pd/server/schedulers"
_ "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/storage"
)

Expand Down
Loading

0 comments on commit 1a6e948

Please sign in to comment.