Skip to content

Commit

Permalink
tools: support changing the percentage of heartbeat (tikv#7698)
Browse files Browse the repository at this point in the history
ref tikv#7703

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Signed-off-by: pingandb <[email protected]>
  • Loading branch information
2 people authored and pingandb committed Jan 18, 2024
1 parent b32fd5a commit eb10bb6
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 5 deletions.
1 change: 1 addition & 0 deletions tools/pd-heartbeat-bench/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ leader-update-ratio = 0.06
epoch-update-ratio = 0.04
space-update-ratio = 0.15
flow-update-ratio = 0.35
no-update-ratio = 0.0

sample = false
40 changes: 39 additions & 1 deletion tools/pd-heartbeat-bench/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"math"
"sync/atomic"

"github.com/BurntSushi/toml"
Expand All @@ -20,6 +21,7 @@ const (
defaultEpochUpdateRatio = 0.04
defaultSpaceUpdateRatio = 0.15
defaultFlowUpdateRatio = 0.35
defaultNoUpdateRatio = 0
defaultRound = 0
defaultSample = false

Expand All @@ -45,6 +47,7 @@ type Config struct {
EpochUpdateRatio float64 `toml:"epoch-update-ratio" json:"epoch-update-ratio"`
SpaceUpdateRatio float64 `toml:"space-update-ratio" json:"space-update-ratio"`
FlowUpdateRatio float64 `toml:"flow-update-ratio" json:"flow-update-ratio"`
NoUpdateRatio float64 `toml:"no-update-ratio" json:"no-update-ratio"`
Sample bool `toml:"sample" json:"sample"`
Round int `toml:"round" json:"round"`
}
Expand Down Expand Up @@ -90,7 +93,7 @@ func (c *Config) Parse(arguments []string) error {
}

c.Adjust(meta)
return nil
return c.Validate()
}

// Adjust is used to adjust configurations
Expand Down Expand Up @@ -129,11 +132,38 @@ func (c *Config) Adjust(meta *toml.MetaData) {
if !meta.IsDefined("flow-update-ratio") {
configutil.AdjustFloat64(&c.FlowUpdateRatio, defaultFlowUpdateRatio)
}
if !meta.IsDefined("no-update-ratio") {
configutil.AdjustFloat64(&c.NoUpdateRatio, defaultNoUpdateRatio)
}
if !meta.IsDefined("sample") {
c.Sample = defaultSample
}
}

// Validate is used to validate configurations
func (c *Config) Validate() error {
if c.LeaderUpdateRatio < 0 || c.LeaderUpdateRatio > 1 {
return errors.Errorf("leader-update-ratio must be in [0, 1]")
}
if c.EpochUpdateRatio < 0 || c.EpochUpdateRatio > 1 {
return errors.Errorf("epoch-update-ratio must be in [0, 1]")
}
if c.SpaceUpdateRatio < 0 || c.SpaceUpdateRatio > 1 {
return errors.Errorf("space-update-ratio must be in [0, 1]")
}
if c.FlowUpdateRatio < 0 || c.FlowUpdateRatio > 1 {
return errors.Errorf("flow-update-ratio must be in [0, 1]")
}
if c.NoUpdateRatio < 0 || c.NoUpdateRatio > 1 {
return errors.Errorf("no-update-ratio must be in [0, 1]")
}
max := math.Max(c.LeaderUpdateRatio, math.Max(c.EpochUpdateRatio, math.Max(c.SpaceUpdateRatio, c.FlowUpdateRatio)))
if max+c.NoUpdateRatio > 1 {
return errors.Errorf("sum of update-ratio must be in [0, 1]")
}
return nil
}

// Clone creates a copy of current config.
func (c *Config) Clone() *Config {
cfg := &Config{}
Expand All @@ -147,6 +177,7 @@ type Options struct {
EpochUpdateRatio atomic.Value
SpaceUpdateRatio atomic.Value
FlowUpdateRatio atomic.Value
NoUpdateRatio atomic.Value
}

// NewOptions creates a new option.
Expand All @@ -156,6 +187,7 @@ func NewOptions(cfg *Config) *Options {
o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio)
o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio)
o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio)
o.NoUpdateRatio.Store(cfg.NoUpdateRatio)
return o
}

Expand All @@ -179,10 +211,16 @@ func (o *Options) GetFlowUpdateRatio() float64 {
return o.FlowUpdateRatio.Load().(float64)
}

// GetNoUpdateRatio returns the no update ratio.
func (o *Options) GetNoUpdateRatio() float64 {
return o.NoUpdateRatio.Load().(float64)
}

// SetOptions sets the option.
func (o *Options) SetOptions(cfg *Config) {
o.LeaderUpdateRatio.Store(cfg.LeaderUpdateRatio)
o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio)
o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio)
o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio)
o.NoUpdateRatio.Store(cfg.NoUpdateRatio)
}
55 changes: 51 additions & 4 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ func newEndKey(id uint64, keyLen int) []byte {

// Regions simulates all regions to heartbeat.
type Regions struct {
regions []*pdpb.RegionHeartbeatRequest
regions []*pdpb.RegionHeartbeatRequest
awakenRegions atomic.Value

updateRound int

Expand Down Expand Up @@ -258,22 +259,27 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes [
rs.updateEpoch = pick(indexes, cfg, options.GetEpochUpdateRatio())
rs.updateSpace = pick(indexes, cfg, options.GetSpaceUpdateRatio())
rs.updateFlow = pick(indexes, cfg, options.GetFlowUpdateRatio())
updatedRegionsMap := make(map[int]*pdpb.RegionHeartbeatRequest)
var awakenRegions []*pdpb.RegionHeartbeatRequest

// update leader
for _, i := range rs.updateLeader {
region := rs.regions[i]
region.Leader = region.Region.Peers[rs.updateRound%cfg.Replica]
updatedRegionsMap[i] = region
}
// update epoch
for _, i := range rs.updateEpoch {
region := rs.regions[i]
region.Region.RegionEpoch.Version += 1
updatedRegionsMap[i] = region
}
// update space
for _, i := range rs.updateSpace {
region := rs.regions[i]
region.ApproximateSize = uint64(bytesUnit * rand.Float64())
region.ApproximateKeys = uint64(keysUint * rand.Float64())
updatedRegionsMap[i] = region
}
// update flow
for _, i := range rs.updateFlow {
Expand All @@ -286,12 +292,21 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes [
Get: uint64(queryUnit * rand.Float64()),
Put: uint64(queryUnit * rand.Float64()),
}
updatedRegionsMap[i] = region
}
// update interval
for _, region := range rs.regions {
region.Interval.StartTimestamp = region.Interval.EndTimestamp
region.Interval.EndTimestamp = region.Interval.StartTimestamp + regionReportInterval
}
for _, region := range updatedRegionsMap {
awakenRegions = append(awakenRegions, region)
}
noUpdatedRegions := pickNoUpdatedRegions(indexes, cfg, options.GetNoUpdateRatio(), updatedRegionsMap)
for _, i := range noUpdatedRegions {
awakenRegions = append(awakenRegions, rs.regions[i])
}
rs.awakenRegions.Store(awakenRegions)
}

func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_RegionHeartbeatClient {
Expand All @@ -312,8 +327,14 @@ func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_Regi

func (rs *Regions) handleRegionHeartbeat(wg *sync.WaitGroup, stream pdpb.PD_RegionHeartbeatClient, storeID uint64, rep report.Report) {
defer wg.Done()
var regions []*pdpb.RegionHeartbeatRequest
for _, region := range rs.regions {
var regions, toUpdate []*pdpb.RegionHeartbeatRequest
updatedRegions := rs.awakenRegions.Load()
if updatedRegions == nil {
toUpdate = rs.regions
} else {
toUpdate = updatedRegions.([]*pdpb.RegionHeartbeatRequest)
}
for _, region := range toUpdate {
if region.Leader.StoreId != storeID {
continue
}
Expand Down Expand Up @@ -411,6 +432,23 @@ func pick(slice []int, cfg *config.Config, ratio float64) []int {
return append(slice[:0:0], slice[0:int(float64(cfg.RegionCount)*ratio)]...)
}

func pickNoUpdatedRegions(slice []int, cfg *config.Config, ratio float64, updatedMap map[int]*pdpb.RegionHeartbeatRequest) []int {
if ratio == 0 {
return nil
}
rand.Shuffle(cfg.RegionCount, func(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
})
NoUpdatedRegionsNum := int(float64(cfg.RegionCount) * ratio)
res := make([]int, 0, NoUpdatedRegionsNum)
for i := 0; len(res) < NoUpdatedRegionsNum; i++ {
if _, ok := updatedMap[slice[i]]; !ok {
res = append(res, slice[i])
}
}
return res
}

func main() {
rand.New(rand.NewSource(0)) // Ensure consistent behavior multiple times
cfg := config.NewConfig()
Expand Down Expand Up @@ -562,11 +600,19 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
pprof.Register(engine)
engine.PUT("config", func(c *gin.Context) {
newCfg := cfg.Clone()
newCfg.FlowUpdateRatio = options.GetFlowUpdateRatio()
newCfg.LeaderUpdateRatio = options.GetLeaderUpdateRatio()
newCfg.EpochUpdateRatio = options.GetEpochUpdateRatio()
newCfg.SpaceUpdateRatio = options.GetSpaceUpdateRatio()
newCfg.NoUpdateRatio = options.GetNoUpdateRatio()
if err := c.BindJSON(&newCfg); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

if err := newCfg.Validate(); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
options.SetOptions(newCfg)
c.String(http.StatusOK, "Successfully updated the configuration")
})
Expand All @@ -576,6 +622,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
output.LeaderUpdateRatio = options.GetLeaderUpdateRatio()
output.EpochUpdateRatio = options.GetEpochUpdateRatio()
output.SpaceUpdateRatio = options.GetSpaceUpdateRatio()
output.NoUpdateRatio = options.GetNoUpdateRatio()

c.IndentedJSON(http.StatusOK, output)
})
Expand Down

0 comments on commit eb10bb6

Please sign in to comment.