From 79ec29cee8db60538ad05196f49b8976fbc2ec4c Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Fri, 24 Mar 2023 11:34:43 +0800 Subject: [PATCH] resource_manager/client: Don't set token as `-1` in burstable mode (#6216) close tikv/pd#6209, ref tikv/pd#6209 Signed-off-by: Cabinfever_B --- .../resource_group/controller/controller.go | 39 +++++-- client/resource_group/controller/limiter.go | 28 +++-- .../resource_manager/resource_manager_test.go | 108 ++++++++++++++++++ 3 files changed, 155 insertions(+), 20 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index c1d565d93f5..2cd02fe7356 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -194,6 +194,10 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { stateUpdateTicker.Stop() stateUpdateTicker = time.NewTicker(200 * time.Millisecond) }) + failpoint.Inject("acceleratedReportingPeriod", func() { + stateUpdateTicker.Stop() + stateUpdateTicker = time.NewTicker(time.Millisecond * 100) + }) for { select { @@ -492,6 +496,8 @@ type groupCostController struct { } type tokenCounter struct { + getTokenBucketFunc func() *rmpb.TokenBucket + // avgRUPerSec is an exponentially-weighted moving average of the RU // consumption per second; used to estimate the RU requirements for the next // request. @@ -596,6 +602,9 @@ func (gc *groupCostController) initRunState() { limiter: limiter, avgRUPerSec: 0, avgLastTime: now, + getTokenBucketFunc: func() *rmpb.TokenBucket { + return getRUTokenBucketSetting(gc.ResourceGroup, typ) + }, } gc.run.requestUnitTokens[typ] = counter } @@ -609,6 +618,9 @@ func (gc *groupCostController) initRunState() { limiter: limiter, avgRUPerSec: 0, avgLastTime: now, + getTokenBucketFunc: func() *rmpb.TokenBucket { + return getRawResourceTokenBucketSetting(gc.ResourceGroup, typ) + }, } gc.run.resourceTokens[typ] = counter } @@ -723,6 +735,9 @@ func (gc *groupCostController) updateAvgRUPerSec() { func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool { deltaDuration := gc.run.now.Sub(counter.avgLastTime) + failpoint.Inject("acceleratedReportingPeriod", func() { + deltaDuration = 100 * time.Millisecond + }) delta := (new - counter.avgRUPerSecLastRU) / deltaDuration.Seconds() counter.avgRUPerSec = movingAvgFactor*counter.avgRUPerSec + (1-movingAvgFactor)*delta counter.avgLastTime = gc.run.now @@ -735,6 +750,9 @@ func (gc *groupCostController) shouldReportConsumption() bool { return true } timeSinceLastRequest := gc.run.now.Sub(gc.run.lastRequestTime) + failpoint.Inject("acceleratedReportingPeriod", func() { + timeSinceLastRequest = extendedReportingPeriodFactor * defaultTargetPeriod + }) if timeSinceLastRequest >= defaultTargetPeriod { if timeSinceLastRequest >= extendedReportingPeriodFactor*defaultTargetPeriod { return true @@ -798,9 +816,9 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() { counter.inDegradedMode = true initCounterNotify(counter) var cfg tokenBucketReconfigureArgs - fillRate := getRUTokenBucketSetting(gc.ResourceGroup, typ) - cfg.NewBurst = int64(fillRate.Settings.FillRate) - cfg.NewRate = float64(fillRate.Settings.FillRate) + fillRate := counter.getTokenBucketFunc().Settings.FillRate + cfg.NewBurst = int64(fillRate) + cfg.NewRate = float64(fillRate) failpoint.Inject("degradedModeRU", func() { cfg.NewRate = 99999999 }) @@ -810,15 +828,15 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() { } func (gc *groupCostController) applyBasicConfigForRawResourceTokenCounter() { - for typ, counter := range gc.run.resourceTokens { + for _, counter := range gc.run.resourceTokens { if !counter.limiter.IsLowTokens() { continue } initCounterNotify(counter) var cfg tokenBucketReconfigureArgs - fillRate := getRawResourceTokenBucketSetting(gc.ResourceGroup, typ) - cfg.NewBurst = int64(fillRate.Settings.FillRate) - cfg.NewRate = float64(fillRate.Settings.FillRate) + fillRate := counter.getTokenBucketFunc().Settings.FillRate + cfg.NewBurst = int64(fillRate) + cfg.NewRate = float64(fillRate) counter.limiter.Reconfigure(gc.run.now, cfg, resetLowProcess()) } } @@ -843,10 +861,9 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket cfg.NewTokens = granted cfg.NewRate = float64(bucket.GetSettings().FillRate) counter.lastDeadline = time.Time{} - cfg.NotifyThreshold = math.Min(granted+counter.limiter.AvailableTokens(gc.run.now), counter.avgRUPerSec*float64(defaultTargetPeriod)) * notifyFraction - // In the non-trickle case, clients can be allowed to accumulate more tokens. - if cfg.NewBurst >= 0 { - cfg.NewBurst = 0 + cfg.NotifyThreshold = math.Min(granted+counter.limiter.AvailableTokens(gc.run.now), counter.avgRUPerSec*defaultTargetPeriod.Seconds()) * notifyFraction + if cfg.NewBurst < 0 { + cfg.NewTokens = float64(counter.getTokenBucketFunc().Settings.FillRate) } } else { // Otherwise the granted token is delivered to the client by fill rate. diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index 36d2ff4d175..73da058a181 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -168,7 +168,7 @@ func (r *Reservation) CancelAt(now time.Time) { r.lim.mu.Lock() defer r.lim.mu.Unlock() - if r.lim.limit == Inf || r.tokens == 0 { + if r.tokens == 0 || r.lim.burst < 0 || r.lim.limit == Inf { return } // advance time to now @@ -217,7 +217,6 @@ func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now func (lim *Limiter) SetupNotificationThreshold(now time.Time, threshold float64) { lim.mu.Lock() defer lim.mu.Unlock() - lim.advance(now) lim.notifyThreshold = threshold } @@ -268,6 +267,9 @@ func (lim *Limiter) GetBurst() int64 { func (lim *Limiter) RemoveTokens(now time.Time, amount float64) { lim.mu.Lock() defer lim.mu.Unlock() + if lim.burst < 0 || lim.limit == Inf { + return + } now, _, tokens := lim.advance(now) lim.last = now lim.tokens = tokens - amount @@ -296,10 +298,15 @@ func (lim *Limiter) Reconfigure(now time.Time, ) { lim.mu.Lock() defer lim.mu.Unlock() - log.Debug("[resource group controller] before reconfigure", zap.Float64("NewTokens", lim.tokens), zap.Float64("NewRate", float64(lim.limit)), zap.Float64("NotifyThreshold", args.NotifyThreshold)) - now, _, tokens := lim.advance(now) - lim.last = now - lim.tokens = tokens + args.NewTokens + log.Debug("[resource group controller] before reconfigure", zap.Float64("NewTokens", lim.tokens), zap.Float64("NewRate", float64(lim.limit)), zap.Float64("NotifyThreshold", args.NotifyThreshold), zap.Int64("burst", lim.burst)) + if args.NewBurst < 0 { + lim.last = now + lim.tokens = args.NewTokens + } else { + now, _, tokens := lim.advance(now) + lim.last = now + lim.tokens = tokens + args.NewTokens + } lim.limit = Limit(args.NewRate) lim.burst = args.NewBurst lim.notifyThreshold = args.NotifyThreshold @@ -307,7 +314,7 @@ func (lim *Limiter) Reconfigure(now time.Time, opt(lim) } lim.maybeNotify() - log.Debug("[resource group controller] after reconfigure", zap.Float64("NewTokens", lim.tokens), zap.Float64("NewRate", float64(lim.limit)), zap.Float64("NotifyThreshold", args.NotifyThreshold)) + log.Debug("[resource group controller] after reconfigure", zap.Float64("NewTokens", lim.tokens), zap.Float64("NewRate", float64(lim.limit)), zap.Float64("NotifyThreshold", args.NotifyThreshold), zap.Int64("burst", lim.burst)) } // AvailableTokens decreases the amount of tokens currently available. @@ -325,7 +332,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur lim.mu.Lock() defer lim.mu.Unlock() - if lim.limit == Inf || lim.burst < 0 { + if lim.burst < 0 || lim.limit == Inf { return Reservation{ ok: true, lim: lim, @@ -375,12 +382,15 @@ func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, if now.Before(last) { last = now } + if lim.burst < 0 { + return now, last, lim.tokens + } // Calculate the new number of tokens, due to time that passed. elapsed := now.Sub(last) delta := lim.limit.tokensFromDuration(elapsed) tokens := lim.tokens + delta - if lim.burst != 0 { + if lim.burst > 0 { if burst := float64(lim.burst); tokens > burst { tokens = burst } diff --git a/tests/mcs/resource_manager/resource_manager_test.go b/tests/mcs/resource_manager/resource_manager_test.go index 01f92fa0913..143ea07e743 100644 --- a/tests/mcs/resource_manager/resource_manager_test.go +++ b/tests/mcs/resource_manager/resource_manager_test.go @@ -334,6 +334,114 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { controller.Stop() } +// TestSwitchBurst is used to test https://github.com/tikv/pd/issues/6209 +func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { + re := suite.Require() + cli := suite.client + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedReportingPeriod", "return(true)")) + + for _, group := range suite.initGroups { + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + } + + cfg := &controller.RequestUnitConfig{ + ReadBaseCost: 1, + ReadCostPerByte: 1, + WriteBaseCost: 1, + WriteCostPerByte: 1, + CPUMsCost: 1, + } + testCases := []struct { + resourceGroupName string + tcs []tokenConsumptionPerSecond + len int + }{ + { + resourceGroupName: suite.initGroups[0].Name, + len: 8, + tcs: []tokenConsumptionPerSecond{ + {rruTokensAtATime: 50, wruTokensAtATime: 20, times: 100, waitDuration: 0}, + {rruTokensAtATime: 50, wruTokensAtATime: 100, times: 100, waitDuration: 0}, + {rruTokensAtATime: 50, wruTokensAtATime: 100, times: 100, waitDuration: 0}, + {rruTokensAtATime: 20, wruTokensAtATime: 40, times: 250, waitDuration: 0}, + {rruTokensAtATime: 25, wruTokensAtATime: 50, times: 200, waitDuration: 0}, + {rruTokensAtATime: 30, wruTokensAtATime: 60, times: 165, waitDuration: 0}, + {rruTokensAtATime: 40, wruTokensAtATime: 80, times: 125, waitDuration: 0}, + {rruTokensAtATime: 50, wruTokensAtATime: 100, times: 100, waitDuration: 0}, + }, + }, + } + controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) + controller.Start(suite.ctx) + resourceGroupName := suite.initGroups[1].Name + tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 2, times: 100, waitDuration: 0} + for j := 0; j < tcs.times; j++ { + rreq := tcs.makeReadRequest() + wreq := tcs.makeWriteRequest() + rres := tcs.makeReadResponse() + wres := tcs.makeWriteResponse() + _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) + re.NoError(err) + _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) + re.NoError(err) + controller.OnResponse(resourceGroupName, rreq, rres) + controller.OnResponse(resourceGroupName, wreq, wres) + } + time.Sleep(2 * time.Second) + cli.ModifyResourceGroup(suite.ctx, &rmpb.ResourceGroup{ + Name: "test2", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 20000, + BurstLimit: 20000, + }, + }, + }, + }) + time.Sleep(100 * time.Millisecond) + tricker := time.NewTicker(time.Second) + defer tricker.Stop() + i := 0 + for { + v := false + <-tricker.C + for _, cas := range testCases { + if i >= cas.len { + continue + } + v = true + sum := time.Duration(0) + for j := 0; j < cas.tcs[i].times; j++ { + rreq := cas.tcs[i].makeReadRequest() + wreq := cas.tcs[i].makeWriteRequest() + rres := cas.tcs[i].makeReadResponse() + wres := cas.tcs[i].makeWriteResponse() + startTime := time.Now() + _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) + re.NoError(err) + _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) + re.NoError(err) + sum += time.Since(startTime) + controller.OnResponse(resourceGroupName, rreq, rres) + controller.OnResponse(resourceGroupName, wreq, wres) + time.Sleep(1000 * time.Microsecond) + } + re.LessOrEqual(sum, buffDuration+cas.tcs[i].waitDuration) + } + i++ + if !v { + break + } + } + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedReportingPeriod", "return(true)")) + suite.cleanupResourceGroups() + controller.Stop() +} + func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { re := suite.Require() cli := suite.client