scheduler: make history-sample-interval and history-sample-duration configurable (#7878) #7893

Closed
3 changes: 2 additions & 1 deletion pkg/schedule/schedulers/grant_hot_region.go
@@ -129,7 +129,8 @@ type grantHotRegionScheduler struct {

// newGrantHotRegionScheduler creates an admin scheduler that transfers hot region peer to fixed store and hot region leader to one store.
func newGrantHotRegionScheduler(opController *operator.Controller, conf *grantHotRegionSchedulerConfig) *grantHotRegionScheduler {
base := newBaseHotScheduler(opController)
base := newBaseHotScheduler(opController,
statistics.DefaultHistorySampleDuration, statistics.DefaultHistorySampleInterval)
handler := newGrantHotRegionHandler(conf)
ret := &grantHotRegionScheduler{
baseHotScheduler: base,
52 changes: 49 additions & 3 deletions pkg/schedule/schedulers/hot_region.go
@@ -127,13 +127,13 @@
updateWriteTime time.Time
}

func newBaseHotScheduler(opController *operator.Controller) *baseHotScheduler {
func newBaseHotScheduler(opController *operator.Controller, sampleDuration time.Duration, sampleInterval time.Duration) *baseHotScheduler {
base := NewBaseScheduler(opController)
ret := &baseHotScheduler{
BaseScheduler: base,
types: []utils.RWType{utils.Write, utils.Read},
regionPendings: make(map[uint64]*pendingInfluence),
stHistoryLoads: statistics.NewStoreHistoryLoads(utils.DimLen),
stHistoryLoads: statistics.NewStoreHistoryLoads(utils.DimLen, sampleDuration, sampleInterval),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
@@ -180,6 +180,10 @@
}
}

func (h *baseHotScheduler) updateHistoryLoadConfig(sampleDuration, sampleInterval time.Duration) {
h.stHistoryLoads = h.stHistoryLoads.UpdateConfig(sampleDuration, sampleInterval)
}

// summaryPendingInfluence calculate the summary of pending Influence for each store
// and clean the region from regionInfluence if they have ended operator.
// It makes each dim rate or count become `weight` times to the origin value.
@@ -233,7 +237,8 @@
}

func newHotScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler {
base := newBaseHotScheduler(opController)
base := newBaseHotScheduler(opController,
conf.GetHistorySampleDuration(), conf.GetHistorySampleInterval())
ret := &hotScheduler{
name: HotRegionName,
baseHotScheduler: base,
@@ -257,6 +262,46 @@
return h.conf.EncodeConfig()
}

func (h *hotScheduler) ReloadConfig() error {
h.conf.Lock()
defer h.conf.Unlock()
cfgData, err := h.conf.storage.LoadSchedulerConfig(h.GetName())
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}
newCfg := &hotRegionSchedulerConfig{}
if err := DecodeConfig([]byte(cfgData), newCfg); err != nil {
return err
}
h.conf.MinHotByteRate = newCfg.MinHotByteRate
h.conf.MinHotKeyRate = newCfg.MinHotKeyRate
h.conf.MinHotQueryRate = newCfg.MinHotQueryRate
h.conf.MaxZombieRounds = newCfg.MaxZombieRounds
h.conf.MaxPeerNum = newCfg.MaxPeerNum
h.conf.ByteRateRankStepRatio = newCfg.ByteRateRankStepRatio
h.conf.KeyRateRankStepRatio = newCfg.KeyRateRankStepRatio
h.conf.QueryRateRankStepRatio = newCfg.QueryRateRankStepRatio
h.conf.CountRankStepRatio = newCfg.CountRankStepRatio
h.conf.GreatDecRatio = newCfg.GreatDecRatio
h.conf.MinorDecRatio = newCfg.MinorDecRatio
h.conf.SrcToleranceRatio = newCfg.SrcToleranceRatio
h.conf.DstToleranceRatio = newCfg.DstToleranceRatio
h.conf.WriteLeaderPriorities = newCfg.WriteLeaderPriorities
h.conf.WritePeerPriorities = newCfg.WritePeerPriorities
h.conf.ReadPriorities = newCfg.ReadPriorities
h.conf.StrictPickingStore = newCfg.StrictPickingStore
h.conf.EnableForTiFlash = newCfg.EnableForTiFlash
h.conf.RankFormulaVersion = newCfg.RankFormulaVersion
h.conf.ForbidRWType = newCfg.ForbidRWType
h.conf.SplitThresholds = newCfg.SplitThresholds
h.conf.HistorySampleDuration = newCfg.HistorySampleDuration
h.conf.HistorySampleInterval = newCfg.HistorySampleInterval
return nil
}

func (h *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.conf.ServeHTTP(w, r)
}
@@ -286,6 +331,7 @@
func (h *hotScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) []*operator.Operator {
h.Lock()
defer h.Unlock()
h.updateHistoryLoadConfig(h.conf.GetHistorySampleDuration(), h.conf.GetHistorySampleInterval())
h.prepareForBalance(typ, cluster)
// it can not move earlier to support to use api and metrics.
if h.conf.IsForbidRWType(typ) {
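For context on the updateHistoryLoadConfig hook that dispatch() now calls on every run: below is a minimal, hypothetical sketch of how StoreHistoryLoads.UpdateConfig could behave, inferred only from its usage in this diff. The struct fields and the reuse-vs-rebuild logic are assumptions; the real implementation lives in pkg/statistics and may differ.

package statistics

import "time"

// StoreHistoryLoads, simplified sketch: the real type also keeps the
// sampled per-store, per-dimension load history.
type StoreHistoryLoads struct {
	dim            int
	sampleDuration time.Duration
	sampleInterval time.Duration
}

// NewStoreHistoryLoads mirrors the constructor used by newBaseHotScheduler above.
func NewStoreHistoryLoads(dim int, sampleDuration, sampleInterval time.Duration) *StoreHistoryLoads {
	return &StoreHistoryLoads{dim: dim, sampleDuration: sampleDuration, sampleInterval: sampleInterval}
}

// UpdateConfig returns the receiver unchanged when the sampling window is the
// same, and a fresh history otherwise, so calling it from dispatch() on every
// run does not needlessly discard collected samples.
func (s *StoreHistoryLoads) UpdateConfig(sampleDuration, sampleInterval time.Duration) *StoreHistoryLoads {
	if s.sampleDuration == sampleDuration && s.sampleInterval == sampleInterval {
		return s
	}
	return NewStoreHistoryLoads(s.dim, sampleDuration, sampleInterval)
}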
33 changes: 33 additions & 0 deletions pkg/schedule/schedulers/hot_region_config.go
@@ -26,10 +26,12 @@
"github.com/tikv/pd/pkg/errs"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/reflectutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/pkg/versioninfo"
"github.com/unrolled/render"
"go.uber.org/zap"
@@ -76,6 +78,8 @@
RankFormulaVersion: "v2",
ForbidRWType: "none",
SplitThresholds: 0.2,
HistorySampleDuration: typeutil.NewDuration(statistics.DefaultHistorySampleDuration),
HistorySampleInterval: typeutil.NewDuration(statistics.DefaultHistorySampleInterval),
}
cfg.applyPrioritiesConfig(defaultPrioritiesConfig)
return cfg
@@ -104,6 +108,8 @@
RankFormulaVersion: conf.getRankFormulaVersionLocked(),
ForbidRWType: conf.getForbidRWTypeLocked(),
SplitThresholds: conf.SplitThresholds,
HistorySampleDuration: conf.HistorySampleDuration,
HistorySampleInterval: conf.HistorySampleInterval,
}
}

@@ -147,6 +153,9 @@
ForbidRWType string `json:"forbid-rw-type,omitempty"`
// SplitThresholds is the threshold to split hot region if the first priority flow of on hot region exceeds it.
SplitThresholds float64 `json:"split-thresholds"`

HistorySampleDuration typeutil.Duration `json:"history-sample-duration"`
HistorySampleInterval typeutil.Duration `json:"history-sample-interval"`
}

func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
@@ -305,6 +314,30 @@
return conf.getRankFormulaVersionLocked()
}

func (conf *hotRegionSchedulerConfig) GetHistorySampleDuration() time.Duration {
conf.RLock()
defer conf.RUnlock()
return conf.HistorySampleDuration.Duration
}

func (conf *hotRegionSchedulerConfig) GetHistorySampleInterval() time.Duration {
conf.RLock()
defer conf.RUnlock()
return conf.HistorySampleInterval.Duration
}

func (conf *hotRegionSchedulerConfig) SetHistorySampleDuration(d time.Duration) {
conf.Lock()
defer conf.Unlock()
conf.HistorySampleDuration = typeutil.NewDuration(d)
}

func (conf *hotRegionSchedulerConfig) SetHistorySampleInterval(d time.Duration) {
conf.Lock()
defer conf.Unlock()
conf.HistorySampleInterval = typeutil.NewDuration(d)
}

func (conf *hotRegionSchedulerConfig) getRankFormulaVersionLocked() string {
switch conf.RankFormulaVersion {
case "v2":
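A short usage sketch of the accessors added above, assumed to sit in the same package with "time" imported; the duration values are illustrative, not the defaults:

cfg := initHotRegionScheduleConfig()
// Override the sampling window. The values are persisted under the
// "history-sample-duration" and "history-sample-interval" JSON keys declared above.
cfg.SetHistorySampleDuration(10 * time.Minute)
cfg.SetHistorySampleInterval(30 * time.Second)
// The hot scheduler picks the change up on its next dispatch() call via
// updateHistoryLoadConfig(cfg.GetHistorySampleDuration(), cfg.GetHistorySampleInterval()).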
21 changes: 19 additions & 2 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -40,6 +40,8 @@ import (
)

func init() {
// TODO: remove this global variable in the future.
// And use a function to create hot scheduler for test.
schedulePeerPr = 1.0
RegisterScheduler(utils.Write.String(), func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
cfg := initHotRegionScheduleConfig()
@@ -199,7 +201,6 @@ func newTestRegion(id uint64) *core.RegionInfo {
func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
statistics.HistorySampleDuration = 0
statisticsInterval = 0
checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */)
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
@@ -402,6 +403,7 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
tc.SetHotRegionCacheHitsThreshold(0)

// Add stores 1, 2, 3, 4, 5, 6 with region counts 3, 2, 2, 2, 0, 0.
@@ -606,6 +608,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb := sche.(*hotScheduler)
hb.conf.SetHistorySampleDuration(0)

// Add TiKV stores 1, 2, 3, 4, 5, 6, 7 (Down) with region counts 3, 3, 2, 2, 0, 0, 0.
// Add TiFlash stores 8, 9, 10, 11 with region counts 3, 1, 1, 0.
@@ -797,6 +800,7 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) {
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.QueryPriority, utils.BytePriority}
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)

tc.SetHotRegionCacheHitsThreshold(0)
tc.AddRegionStore(1, 20)
@@ -834,6 +838,7 @@ func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) {
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)

tc.SetHotRegionCacheHitsThreshold(0)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
@@ -972,6 +977,7 @@ func TestHotWriteRegionScheduleWithLeader(t *testing.T) {
defer cancel()
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
re.NoError(err)

tc.SetHotRegionCacheHitsThreshold(0)
@@ -1040,6 +1046,7 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim
re.NoError(err)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
old := pendingAmpFactor
pendingAmpFactor = 0.0
defer func() {
@@ -1127,6 +1134,7 @@ func TestHotWriteRegionScheduleWithRuleEnabled(t *testing.T) {
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)

tc.SetHotRegionCacheHitsThreshold(0)
key, err := hex.DecodeString("")
@@ -1210,6 +1218,7 @@ func TestHotReadRegionScheduleByteRateOnly(t *testing.T) {
re.NoError(err)
hb := scheduler.(*hotScheduler)
hb.conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority}
hb.conf.SetHistorySampleDuration(0)
tc.SetHotRegionCacheHitsThreshold(0)

// Add stores 1, 2, 3, 4, 5 with region counts 3, 2, 2, 2, 0.
@@ -1334,6 +1343,7 @@ func TestHotReadRegionScheduleWithQuery(t *testing.T) {
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)

tc.SetHotRegionCacheHitsThreshold(0)
tc.AddRegionStore(1, 20)
@@ -1370,6 +1380,7 @@ func TestHotReadRegionScheduleWithKeyRate(t *testing.T) {
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority}
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)

tc.SetHotRegionCacheHitsThreshold(0)
tc.AddRegionStore(1, 20)
@@ -1434,6 +1445,7 @@ func checkHotReadRegionScheduleWithPendingInfluence(re *require.Assertions, dim
hb.(*hotScheduler).conf.MinorDecRatio = 1
hb.(*hotScheduler).conf.DstToleranceRatio = 1
hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority}
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
old := pendingAmpFactor
pendingAmpFactor = 0.0
defer func() {
@@ -2011,7 +2023,6 @@ func checkSortResult(re *require.Assertions, regions []uint64, hotPeers []*stati

func TestInfluenceByRWType(t *testing.T) {
re := require.New(t)
statistics.HistorySampleDuration = 0
originValue := schedulePeerPr
defer func() {
schedulePeerPr = originValue
@@ -2025,6 +2036,7 @@ func TestInfluenceByRWType(t *testing.T) {
re.NoError(err)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
tc.SetHotRegionCacheHitsThreshold(0)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.AddRegionStore(1, 20)
@@ -2148,6 +2160,7 @@ func TestHotScheduleWithPriority(t *testing.T) {
re.NoError(err)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1.05)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05)
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
// skip stddev check
origin := stddevThreshold
stddevThreshold = -1.0
@@ -2196,6 +2209,7 @@
addRegionInfo(tc, utils.Read, []testRegionInfo{
{1, []uint64{1, 2, 3}, 2 * units.MiB, 2 * units.MiB, 0},
})
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority}
ops, _ = hb.Schedule(tc, false)
re.Len(ops, 1)
@@ -2209,6 +2223,7 @@
hb, err = CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
re.NoError(err)

// assert loose store picking
@@ -2263,6 +2278,7 @@ func TestHotScheduleWithStddev(t *testing.T) {
tc.AddRegionStore(4, 20)
tc.AddRegionStore(5, 20)
hb.(*hotScheduler).conf.StrictPickingStore = false
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)

// skip uniform cluster
tc.UpdateStorageWrittenStats(1, 5*units.MiB*utils.StoreHeartBeatReportInterval, 5*units.MiB*utils.StoreHeartBeatReportInterval)
@@ -2314,6 +2330,7 @@ func TestHotWriteLeaderScheduleWithPriority(t *testing.T) {
re.NoError(err)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)

tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetHotRegionCacheHitsThreshold(0)
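The test changes above follow one pattern: the package-level statistics.HistorySampleDuration = 0 assignments are dropped, and each test instead zeroes the sampling duration on its own scheduler config, which presumably disables history-based smoothing so the scheduler reacts to the injected load immediately. A condensed sketch of the per-test setup (oc, re, and storage come from the surrounding test context):

hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
// Per-scheduler override replaces the old global statistics.HistorySampleDuration = 0.
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)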