Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: add http server for heartbeat bench #7685

Merged
merged 3 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions tools/pd-heartbeat-bench/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package config

import (
"sync/atomic"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -54,8 +56,8 @@ func NewConfig() *Config {
fs := cfg.flagSet
fs.ParseErrorsWhitelist.UnknownFlags = true
fs.StringVar(&cfg.configFile, "config", "", "config file")
fs.StringVar(&cfg.PDAddr, "pd", "http://127.0.0.1:2379", "pd address")
fs.StringVar(&cfg.StatusAddr, "status-addr", "http://127.0.0.1:20180", "status address")
fs.StringVar(&cfg.PDAddr, "pd", "http://10.2.8.101:2379", "pd address")
fs.StringVar(&cfg.StatusAddr, "status-addr", "http://10.2.8.101:30180", "status address")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to add HTTP scheme prefix


return cfg
}
Expand Down Expand Up @@ -131,3 +133,48 @@ func (c *Config) Adjust(meta *toml.MetaData) {
c.Sample = defaultSample
}
}

// Options holds the update ratios of the heartbeat bench as atomic values.
// Each field is populated from the Config field of the same name by
// NewOptions and SetOptions, and is read back as a float64 by the matching
// Get*UpdateRatio method.
type Options struct {
// LeaderUpdateRatio stores the value of Config.LeaderUpdateRatio.
LeaderUpdateRatio atomic.Value
// EpochUpdateRatio stores the value of Config.EpochUpdateRatio.
EpochUpdateRatio atomic.Value
// SpaceUpdateRatio stores the value of Config.SpaceUpdateRatio.
SpaceUpdateRatio atomic.Value
// FlowUpdateRatio stores the value of Config.FlowUpdateRatio.
FlowUpdateRatio atomic.Value
}

// NewOptions allocates an Options and fills every ratio from cfg.
func NewOptions(cfg *Config) *Options {
	opts := new(Options)
	// SetOptions performs exactly the four stores a fresh Options needs.
	opts.SetOptions(cfg)
	return opts
}

// GetLeaderUpdateRatio reports the leader update ratio currently stored in o.
// It panics if no float64 has been stored yet.
func (o *Options) GetLeaderUpdateRatio() float64 {
	stored := o.LeaderUpdateRatio.Load()
	return stored.(float64)
}

// GetEpochUpdateRatio reports the epoch update ratio currently stored in o.
// It panics if no float64 has been stored yet.
func (o *Options) GetEpochUpdateRatio() float64 {
	stored := o.EpochUpdateRatio.Load()
	return stored.(float64)
}

// GetSpaceUpdateRatio reports the space update ratio currently stored in o.
// It panics if no float64 has been stored yet.
func (o *Options) GetSpaceUpdateRatio() float64 {
	stored := o.SpaceUpdateRatio.Load()
	return stored.(float64)
}

// GetFlowUpdateRatio reports the flow update ratio currently stored in o.
// It panics if no float64 has been stored yet.
func (o *Options) GetFlowUpdateRatio() float64 {
	stored := o.FlowUpdateRatio.Load()
	return stored.(float64)
}

// SetOptions overwrites every ratio held by o with the corresponding value
// from cfg. The stores happen in the order leader, epoch, space, flow.
func (o *Options) SetOptions(cfg *Config) {
	updates := []struct {
		target *atomic.Value
		ratio  interface{}
	}{
		{&o.LeaderUpdateRatio, cfg.LeaderUpdateRatio},
		{&o.EpochUpdateRatio, cfg.EpochUpdateRatio},
		{&o.SpaceUpdateRatio, cfg.SpaceUpdateRatio},
		{&o.FlowUpdateRatio, cfg.FlowUpdateRatio},
	}
	for _, u := range updates {
		u.target.Store(u.ratio)
	}
}
106 changes: 76 additions & 30 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package main

import (
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strings"
Expand All @@ -32,6 +35,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/tools/pd-heartbeat-bench/config"
Expand Down Expand Up @@ -67,17 +71,22 @@ func newClient(cfg *config.Config) pdpb.PDClient {
}

func initClusterID(ctx context.Context, cli pdpb.PDClient) {
cctx, cancel := context.WithCancel(ctx)
res, err := cli.GetMembers(cctx, &pdpb.GetMembersRequest{})
cancel()
if err != nil {
log.Fatal("failed to get members", zap.Error(err))
}
if res.GetHeader().GetError() != nil {
log.Fatal("failed to get members", zap.String("err", res.GetHeader().GetError().String()))
for i := 0; i < 100; i++ {
time.Sleep(time.Second)
cctx, cancel := context.WithCancel(ctx)
res, err := cli.GetMembers(cctx, &pdpb.GetMembersRequest{})
cancel()
if err != nil {
continue
}
if res.GetHeader().GetError() != nil {
continue
}
clusterID = res.GetHeader().GetClusterId()
log.Info("init cluster ID successfully", zap.Uint64("cluster-id", clusterID))
return
}
clusterID = res.GetHeader().GetClusterId()
log.Info("init cluster ID successfully", zap.Uint64("cluster-id", clusterID))
log.Fatal("init cluster ID failed")
}

func header() *pdpb.RequestHeader {
Expand Down Expand Up @@ -180,7 +189,7 @@ type Regions struct {
updateFlow []int
}

func (rs *Regions) init(cfg *config.Config) {
func (rs *Regions) init(cfg *config.Config, options *config.Options) []int {
rs.regions = make([]*pdpb.RegionHeartbeatRequest, 0, cfg.RegionCount)
rs.updateRound = 0

Expand Down Expand Up @@ -227,32 +236,26 @@ func (rs *Regions) init(cfg *config.Config) {
}

// Generate sample index
slice := make([]int, cfg.RegionCount)
for i := range slice {
slice[i] = i
}

rand.New(rand.NewSource(0)) // Ensure consistent behavior multiple times
pick := func(ratio float64) []int {
rand.Shuffle(cfg.RegionCount, func(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
})
return append(slice[:0:0], slice[0:int(float64(cfg.RegionCount)*ratio)]...)
indexes := make([]int, cfg.RegionCount)
for i := range indexes {
indexes[i] = i
}

rs.updateLeader = pick(cfg.LeaderUpdateRatio)
rs.updateEpoch = pick(cfg.EpochUpdateRatio)
rs.updateSpace = pick(cfg.SpaceUpdateRatio)
rs.updateFlow = pick(cfg.FlowUpdateRatio)
return indexes
}

func (rs *Regions) update(replica int) {
func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes []int) {
rs.updateRound += 1

rs.updateLeader = pick(indexes, cfg, options.GetLeaderUpdateRatio())
rs.updateEpoch = pick(indexes, cfg, options.GetEpochUpdateRatio())
rs.updateSpace = pick(indexes, cfg, options.GetSpaceUpdateRatio())
rs.updateFlow = pick(indexes, cfg, options.GetFlowUpdateRatio())

// update leader
for _, i := range rs.updateLeader {
region := rs.regions[i]
region.Leader = region.Region.Peers[rs.updateRound%replica]
region.Leader = region.Region.Peers[rs.updateRound%cfg.Replica]
}
// update epoch
for _, i := range rs.updateEpoch {
Expand Down Expand Up @@ -394,7 +397,15 @@ func (s *Stores) update(rs *Regions) {
}
}

// pick shuffles the first cfg.RegionCount entries of slice in place and
// returns a newly allocated copy of the leading ratio-sized fraction of it.
//
// ratio is clamped to [0, 1]: values <= 0 (and NaN) select nothing, values
// >= 1 select all cfg.RegionCount entries. The ratios can be replaced at
// runtime through the /config endpoint, so an out-of-range value must not be
// allowed to produce an out-of-bounds slice expression.
func pick(slice []int, cfg *config.Config, ratio float64) []int {
	rand.Shuffle(cfg.RegionCount, func(i, j int) {
		slice[i], slice[j] = slice[j], slice[i]
	})

	// NaN fails both comparisons below and therefore falls through to zero.
	count := 0
	switch {
	case ratio >= 1:
		count = cfg.RegionCount
	case ratio > 0:
		count = int(float64(cfg.RegionCount) * ratio)
	}

	// slice[:0:0] has zero capacity, so append always copies into new storage
	// and the result does not alias the shared index slice.
	return append(slice[:0:0], slice[:count]...)
}

func main() {
rand.New(rand.NewSource(0)) // Ensure consistent behavior multiple times
cfg := config.NewConfig()
err := cfg.Parse(os.Args[1:])
defer logutil.LogPanic()
Expand All @@ -415,6 +426,7 @@ func main() {
log.Fatal("initialize logger error", zap.Error(err))
}

options := config.NewOptions(cfg)
// let PD have enough time to start
time.Sleep(5 * time.Second)
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -432,8 +444,9 @@ func main() {
}()
cli := newClient(cfg)
initClusterID(ctx, cli)
go runHTTPServer(cfg, options)
regions := new(Regions)
regions.init(cfg)
indexes := regions.init(cfg, options)
log.Info("finish init regions")
stores := newStores(cfg.StoreCount)
stores.update(regions)
Expand Down Expand Up @@ -476,7 +489,7 @@ func main() {
zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)),
)
log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since)))
regions.update(cfg.Replica)
regions.update(cfg, options, indexes)
go stores.update(regions) // update stores in background, unusually region heartbeat is slower than store update.
case <-ctx.Done():
log.Info("got signal to exit")
Expand Down Expand Up @@ -530,3 +543,36 @@ func (rs *Regions) result(regionCount int, sec float64) {
zap.String("save-flow", fmt.Sprintf("%.4f", float64(len(rs.updateFlow))/sec)),
zap.String("skip", fmt.Sprintf("%.4f", float64(inactiveCount)/sec)))
}

// runHTTPServer registers the metrics, pprof and config handlers on the
// default serve mux and serves them on cfg.StatusAddr. It blocks while the
// server is running, so it is meant to be started in its own goroutine.
//
// The /config handler decodes a JSON body into a Config and applies its
// update ratios to options. Ratios absent from the body keep their current
// value. Read or decode failures are reported with 400 Bad Request.
func runHTTPServer(cfg *config.Config, options *config.Options) {
	http.Handle("/metrics", promhttp.Handler())
	// profile API
	http.HandleFunc("/pprof/profile", pprof.Profile)
	http.HandleFunc("/pprof/trace", pprof.Trace)
	http.HandleFunc("/pprof/symbol", pprof.Symbol)
	http.Handle("/pprof/heap", pprof.Handler("heap"))
	http.Handle("/pprof/mutex", pprof.Handler("mutex"))
	http.Handle("/pprof/allocs", pprof.Handler("allocs"))
	http.Handle("/pprof/block", pprof.Handler("block"))
	http.Handle("/pprof/goroutine", pprof.Handler("goroutine"))

	// config API
	http.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {
		data, err := io.ReadAll(r.Body)
		r.Body.Close()
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		// Start from the ratios currently in effect: a fresh Config has zero
		// ratios, so without this a partial body would reset every ratio it
		// does not mention.
		newCfg := config.NewConfig()
		newCfg.LeaderUpdateRatio = options.GetLeaderUpdateRatio()
		newCfg.EpochUpdateRatio = options.GetEpochUpdateRatio()
		newCfg.SpaceUpdateRatio = options.GetSpaceUpdateRatio()
		newCfg.FlowUpdateRatio = options.GetFlowUpdateRatio()

		if err := json.Unmarshal(data, newCfg); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		options.SetOptions(newCfg)
	})

	// ListenAndServe expects a plain "host:port" address; a URL scheme in the
	// configured status address would make listening fail.
	addr := strings.TrimPrefix(cfg.StatusAddr, "http://")
	addr = strings.TrimPrefix(addr, "https://")
	if err := http.ListenAndServe(addr, nil); err != nil {
		log.Error("status server exited", zap.String("addr", addr), zap.Error(err))
	}
}
Loading