diff --git a/tools/go.mod b/tools/go.mod index 659d05f5e58..c7a38093226 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -15,6 +15,10 @@ require ( github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/coreos/go-semver v0.3.0 github.com/docker/go-units v0.4.0 + github.com/gin-contrib/cors v1.4.0 + github.com/gin-contrib/gzip v0.0.1 + github.com/gin-contrib/pprof v1.4.0 + github.com/gin-gonic/gin v1.9.1 github.com/go-echarts/go-echarts v1.0.0 github.com/influxdata/tdigest v0.0.1 github.com/mattn/go-shellwords v1.0.12 @@ -72,11 +76,7 @@ require ( github.com/elliotchance/pie/v2 v2.1.0 // indirect github.com/fogleman/gg v1.3.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect - github.com/gin-contrib/cors v1.4.0 // indirect - github.com/gin-contrib/gzip v0.0.1 // indirect - github.com/gin-contrib/pprof v1.4.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect - github.com/gin-gonic/gin v1.9.1 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.6 // indirect diff --git a/tools/pd-heartbeat-bench/config/config.go b/tools/pd-heartbeat-bench/config/config.go index a33a058e2e2..0637d991c5e 100644 --- a/tools/pd-heartbeat-bench/config/config.go +++ b/tools/pd-heartbeat-bench/config/config.go @@ -1,6 +1,8 @@ package config import ( + "sync/atomic" + "github.com/BurntSushi/toml" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -54,8 +56,8 @@ func NewConfig() *Config { fs := cfg.flagSet fs.ParseErrorsWhitelist.UnknownFlags = true fs.StringVar(&cfg.configFile, "config", "", "config file") - fs.StringVar(&cfg.PDAddr, "pd", "http://127.0.0.1:2379", "pd address") - fs.StringVar(&cfg.StatusAddr, "status-addr", "http://127.0.0.1:20180", "status address") + fs.StringVar(&cfg.PDAddr, "pd", "127.0.0.1:2379", "pd address") + fs.StringVar(&cfg.StatusAddr, "status-addr", "127.0.0.1:20180", "status address") return cfg } @@ -131,3 +133,56 @@ func (c *Config) Adjust(meta *toml.MetaData) { c.Sample = defaultSample } } + +// Clone creates a copy of current config. +func (c *Config) Clone() *Config { + cfg := &Config{} + *cfg = *c + return cfg +} + +// Options is the option of the heartbeat-bench. +type Options struct { + LeaderUpdateRatio atomic.Value + EpochUpdateRatio atomic.Value + SpaceUpdateRatio atomic.Value + FlowUpdateRatio atomic.Value +} + +// NewOptions creates a new option. +func NewOptions(cfg *Config) *Options { + o := &Options{} + o.LeaderUpdateRatio.Store(cfg.LeaderUpdateRatio) + o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio) + o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio) + o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio) + return o +} + +// GetLeaderUpdateRatio returns the leader update ratio. +func (o *Options) GetLeaderUpdateRatio() float64 { + return o.LeaderUpdateRatio.Load().(float64) +} + +// GetEpochUpdateRatio returns the epoch update ratio. +func (o *Options) GetEpochUpdateRatio() float64 { + return o.EpochUpdateRatio.Load().(float64) +} + +// GetSpaceUpdateRatio returns the space update ratio. +func (o *Options) GetSpaceUpdateRatio() float64 { + return o.SpaceUpdateRatio.Load().(float64) +} + +// GetFlowUpdateRatio returns the flow update ratio. +func (o *Options) GetFlowUpdateRatio() float64 { + return o.FlowUpdateRatio.Load().(float64) +} + +// SetOptions sets the option. +func (o *Options) SetOptions(cfg *Config) { + o.LeaderUpdateRatio.Store(cfg.LeaderUpdateRatio) + o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio) + o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio) + o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio) +} diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go index 7a4198a4f4f..e9fe32e067d 100644 --- a/tools/pd-heartbeat-bench/main.go +++ b/tools/pd-heartbeat-bench/main.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "math/rand" + "net/http" "os" "os/signal" "strings" @@ -28,11 +29,16 @@ import ( "time" "github.com/docker/go-units" + "github.com/gin-contrib/cors" + "github.com/gin-contrib/gzip" + "github.com/gin-contrib/pprof" + "github.com/gin-gonic/gin" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/spf13/pflag" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/tools/pd-heartbeat-bench/config" "go.etcd.io/etcd/pkg/report" @@ -67,17 +73,27 @@ func newClient(cfg *config.Config) pdpb.PDClient { } func initClusterID(ctx context.Context, cli pdpb.PDClient) { - cctx, cancel := context.WithCancel(ctx) - res, err := cli.GetMembers(cctx, &pdpb.GetMembersRequest{}) - cancel() - if err != nil { - log.Fatal("failed to get members", zap.Error(err)) - } - if res.GetHeader().GetError() != nil { - log.Fatal("failed to get members", zap.String("err", res.GetHeader().GetError().String())) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + cctx, cancel := context.WithCancel(ctx) + res, err := cli.GetMembers(cctx, &pdpb.GetMembersRequest{}) + cancel() + if err != nil { + continue + } + if res.GetHeader().GetError() != nil { + continue + } + clusterID = res.GetHeader().GetClusterId() + log.Info("init cluster ID successfully", zap.Uint64("cluster-id", clusterID)) + return + } } - clusterID = res.GetHeader().GetClusterId() - log.Info("init cluster ID successfully", zap.Uint64("cluster-id", clusterID)) } func header() *pdpb.RequestHeader { @@ -180,7 +196,7 @@ type Regions struct { updateFlow []int } -func (rs *Regions) init(cfg *config.Config) { +func (rs *Regions) init(cfg *config.Config, options *config.Options) []int { rs.regions = make([]*pdpb.RegionHeartbeatRequest, 0, cfg.RegionCount) rs.updateRound = 0 @@ -227,32 +243,26 @@ func (rs *Regions) init(cfg *config.Config) { } // Generate sample index - slice := make([]int, cfg.RegionCount) - for i := range slice { - slice[i] = i + indexes := make([]int, cfg.RegionCount) + for i := range indexes { + indexes[i] = i } - rand.New(rand.NewSource(0)) // Ensure consistent behavior multiple times - pick := func(ratio float64) []int { - rand.Shuffle(cfg.RegionCount, func(i, j int) { - slice[i], slice[j] = slice[j], slice[i] - }) - return append(slice[:0:0], slice[0:int(float64(cfg.RegionCount)*ratio)]...) - } - - rs.updateLeader = pick(cfg.LeaderUpdateRatio) - rs.updateEpoch = pick(cfg.EpochUpdateRatio) - rs.updateSpace = pick(cfg.SpaceUpdateRatio) - rs.updateFlow = pick(cfg.FlowUpdateRatio) + return indexes } -func (rs *Regions) update(replica int) { +func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes []int) { rs.updateRound += 1 + rs.updateLeader = pick(indexes, cfg, options.GetLeaderUpdateRatio()) + rs.updateEpoch = pick(indexes, cfg, options.GetEpochUpdateRatio()) + rs.updateSpace = pick(indexes, cfg, options.GetSpaceUpdateRatio()) + rs.updateFlow = pick(indexes, cfg, options.GetFlowUpdateRatio()) + // update leader for _, i := range rs.updateLeader { region := rs.regions[i] - region.Leader = region.Region.Peers[rs.updateRound%replica] + region.Leader = region.Region.Peers[rs.updateRound%cfg.Replica] } // update epoch for _, i := range rs.updateEpoch { @@ -394,7 +404,15 @@ func (s *Stores) update(rs *Regions) { } } +func pick(slice []int, cfg *config.Config, ratio float64) []int { + rand.Shuffle(cfg.RegionCount, func(i, j int) { + slice[i], slice[j] = slice[j], slice[i] + }) + return append(slice[:0:0], slice[0:int(float64(cfg.RegionCount)*ratio)]...) +} + func main() { + rand.New(rand.NewSource(0)) // Ensure consistent behavior multiple times cfg := config.NewConfig() err := cfg.Parse(os.Args[1:]) defer logutil.LogPanic() @@ -415,6 +433,7 @@ func main() { log.Fatal("initialize logger error", zap.Error(err)) } + options := config.NewOptions(cfg) // let PD have enough time to start time.Sleep(5 * time.Second) ctx, cancel := context.WithCancel(context.Background()) @@ -432,8 +451,9 @@ func main() { }() cli := newClient(cfg) initClusterID(ctx, cli) + go runHTTPServer(cfg, options) regions := new(Regions) - regions.init(cfg) + indexes := regions.init(cfg, options) log.Info("finish init regions") stores := newStores(cfg.StoreCount) stores.update(regions) @@ -476,7 +496,7 @@ func main() { zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)), ) log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since))) - regions.update(cfg.Replica) + regions.update(cfg, options, indexes) go stores.update(regions) // update stores in background, unusually region heartbeat is slower than store update. case <-ctx.Done(): log.Info("got signal to exit") @@ -530,3 +550,34 @@ func (rs *Regions) result(regionCount int, sec float64) { zap.String("save-flow", fmt.Sprintf("%.4f", float64(len(rs.updateFlow))/sec)), zap.String("skip", fmt.Sprintf("%.4f", float64(inactiveCount)/sec))) } + +func runHTTPServer(cfg *config.Config, options *config.Options) { + gin.SetMode(gin.ReleaseMode) + engine := gin.New() + engine.Use(gin.Recovery()) + engine.Use(cors.Default()) + engine.Use(gzip.Gzip(gzip.DefaultCompression)) + engine.GET("metrics", utils.PromHandler()) + // profile API + pprof.Register(engine) + engine.PUT("config", func(c *gin.Context) { + newCfg := cfg.Clone() + if err := c.BindJSON(&newCfg); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + options.SetOptions(newCfg) + c.String(http.StatusOK, "Successfully updated the configuration") + }) + engine.GET("config", func(c *gin.Context) { + output := cfg.Clone() + output.FlowUpdateRatio = options.GetFlowUpdateRatio() + output.LeaderUpdateRatio = options.GetLeaderUpdateRatio() + output.EpochUpdateRatio = options.GetEpochUpdateRatio() + output.SpaceUpdateRatio = options.GetSpaceUpdateRatio() + + c.IndentedJSON(http.StatusOK, output) + }) + engine.Run(cfg.StatusAddr) +}