Skip to content

Commit

Permalink
resource_manager/client: Don't set token as -1 in burstable mode (#…
Browse files Browse the repository at this point in the history
…6216)

close #6209, ref #6209

Signed-off-by: Cabinfever_B <[email protected]>
  • Loading branch information
CabinfeverB authored Mar 24, 2023
1 parent 19f7dd9 commit 79ec29c
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 20 deletions.
39 changes: 28 additions & 11 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
stateUpdateTicker.Stop()
stateUpdateTicker = time.NewTicker(200 * time.Millisecond)
})
failpoint.Inject("acceleratedReportingPeriod", func() {
stateUpdateTicker.Stop()
stateUpdateTicker = time.NewTicker(time.Millisecond * 100)
})

for {
select {
Expand Down Expand Up @@ -492,6 +496,8 @@ type groupCostController struct {
}

type tokenCounter struct {
getTokenBucketFunc func() *rmpb.TokenBucket

// avgRUPerSec is an exponentially-weighted moving average of the RU
// consumption per second; used to estimate the RU requirements for the next
// request.
Expand Down Expand Up @@ -596,6 +602,9 @@ func (gc *groupCostController) initRunState() {
limiter: limiter,
avgRUPerSec: 0,
avgLastTime: now,
getTokenBucketFunc: func() *rmpb.TokenBucket {
return getRUTokenBucketSetting(gc.ResourceGroup, typ)
},
}
gc.run.requestUnitTokens[typ] = counter
}
Expand All @@ -609,6 +618,9 @@ func (gc *groupCostController) initRunState() {
limiter: limiter,
avgRUPerSec: 0,
avgLastTime: now,
getTokenBucketFunc: func() *rmpb.TokenBucket {
return getRawResourceTokenBucketSetting(gc.ResourceGroup, typ)
},
}
gc.run.resourceTokens[typ] = counter
}
Expand Down Expand Up @@ -723,6 +735,9 @@ func (gc *groupCostController) updateAvgRUPerSec() {

func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool {
deltaDuration := gc.run.now.Sub(counter.avgLastTime)
failpoint.Inject("acceleratedReportingPeriod", func() {
deltaDuration = 100 * time.Millisecond
})
delta := (new - counter.avgRUPerSecLastRU) / deltaDuration.Seconds()
counter.avgRUPerSec = movingAvgFactor*counter.avgRUPerSec + (1-movingAvgFactor)*delta
counter.avgLastTime = gc.run.now
Expand All @@ -735,6 +750,9 @@ func (gc *groupCostController) shouldReportConsumption() bool {
return true
}
timeSinceLastRequest := gc.run.now.Sub(gc.run.lastRequestTime)
failpoint.Inject("acceleratedReportingPeriod", func() {
timeSinceLastRequest = extendedReportingPeriodFactor * defaultTargetPeriod
})
if timeSinceLastRequest >= defaultTargetPeriod {
if timeSinceLastRequest >= extendedReportingPeriodFactor*defaultTargetPeriod {
return true
Expand Down Expand Up @@ -798,9 +816,9 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() {
counter.inDegradedMode = true
initCounterNotify(counter)
var cfg tokenBucketReconfigureArgs
fillRate := getRUTokenBucketSetting(gc.ResourceGroup, typ)
cfg.NewBurst = int64(fillRate.Settings.FillRate)
cfg.NewRate = float64(fillRate.Settings.FillRate)
fillRate := counter.getTokenBucketFunc().Settings.FillRate
cfg.NewBurst = int64(fillRate)
cfg.NewRate = float64(fillRate)
failpoint.Inject("degradedModeRU", func() {
cfg.NewRate = 99999999
})
Expand All @@ -810,15 +828,15 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() {
}

func (gc *groupCostController) applyBasicConfigForRawResourceTokenCounter() {
for typ, counter := range gc.run.resourceTokens {
for _, counter := range gc.run.resourceTokens {
if !counter.limiter.IsLowTokens() {
continue
}
initCounterNotify(counter)
var cfg tokenBucketReconfigureArgs
fillRate := getRawResourceTokenBucketSetting(gc.ResourceGroup, typ)
cfg.NewBurst = int64(fillRate.Settings.FillRate)
cfg.NewRate = float64(fillRate.Settings.FillRate)
fillRate := counter.getTokenBucketFunc().Settings.FillRate
cfg.NewBurst = int64(fillRate)
cfg.NewRate = float64(fillRate)
counter.limiter.Reconfigure(gc.run.now, cfg, resetLowProcess())
}
}
Expand All @@ -843,10 +861,9 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
cfg.NewTokens = granted
cfg.NewRate = float64(bucket.GetSettings().FillRate)
counter.lastDeadline = time.Time{}
cfg.NotifyThreshold = math.Min(granted+counter.limiter.AvailableTokens(gc.run.now), counter.avgRUPerSec*float64(defaultTargetPeriod)) * notifyFraction
// In the non-trickle case, clients can be allowed to accumulate more tokens.
if cfg.NewBurst >= 0 {
cfg.NewBurst = 0
cfg.NotifyThreshold = math.Min(granted+counter.limiter.AvailableTokens(gc.run.now), counter.avgRUPerSec*defaultTargetPeriod.Seconds()) * notifyFraction
if cfg.NewBurst < 0 {
cfg.NewTokens = float64(counter.getTokenBucketFunc().Settings.FillRate)
}
} else {
// Otherwise the granted token is delivered to the client by fill rate.
Expand Down
28 changes: 19 additions & 9 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (r *Reservation) CancelAt(now time.Time) {
r.lim.mu.Lock()
defer r.lim.mu.Unlock()

if r.lim.limit == Inf || r.tokens == 0 {
if r.tokens == 0 || r.lim.burst < 0 || r.lim.limit == Inf {
return
}
// advance time to now
Expand Down Expand Up @@ -217,7 +217,6 @@ func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now
func (lim *Limiter) SetupNotificationThreshold(now time.Time, threshold float64) {
lim.mu.Lock()
defer lim.mu.Unlock()
lim.advance(now)
lim.notifyThreshold = threshold
}

Expand Down Expand Up @@ -268,6 +267,9 @@ func (lim *Limiter) GetBurst() int64 {
func (lim *Limiter) RemoveTokens(now time.Time, amount float64) {
lim.mu.Lock()
defer lim.mu.Unlock()
if lim.burst < 0 || lim.limit == Inf {
return
}
now, _, tokens := lim.advance(now)
lim.last = now
lim.tokens = tokens - amount
Expand Down Expand Up @@ -296,18 +298,23 @@ func (lim *Limiter) Reconfigure(now time.Time,
) {
lim.mu.Lock()
defer lim.mu.Unlock()
log.Debug("[resource group controller] before reconfigure", zap.Float64("NewTokens", lim.tokens), zap.Float64("NewRate", float64(lim.limit)), zap.Float64("NotifyThreshold", args.NotifyThreshold))
now, _, tokens := lim.advance(now)
lim.last = now
lim.tokens = tokens + args.NewTokens
log.Debug("[resource group controller] before reconfigure", zap.Float64("NewTokens", lim.tokens), zap.Float64("NewRate", float64(lim.limit)), zap.Float64("NotifyThreshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
if args.NewBurst < 0 {
lim.last = now
lim.tokens = args.NewTokens
} else {
now, _, tokens := lim.advance(now)
lim.last = now
lim.tokens = tokens + args.NewTokens
}
lim.limit = Limit(args.NewRate)
lim.burst = args.NewBurst
lim.notifyThreshold = args.NotifyThreshold
for _, opt := range opts {
opt(lim)
}
lim.maybeNotify()
log.Debug("[resource group controller] after reconfigure", zap.Float64("NewTokens", lim.tokens), zap.Float64("NewRate", float64(lim.limit)), zap.Float64("NotifyThreshold", args.NotifyThreshold))
log.Debug("[resource group controller] after reconfigure", zap.Float64("NewTokens", lim.tokens), zap.Float64("NewRate", float64(lim.limit)), zap.Float64("NotifyThreshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
}

// AvailableTokens returns the amount of tokens currently available.
Expand All @@ -325,7 +332,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
lim.mu.Lock()
defer lim.mu.Unlock()

if lim.limit == Inf || lim.burst < 0 {
if lim.burst < 0 || lim.limit == Inf {
return Reservation{
ok: true,
lim: lim,
Expand Down Expand Up @@ -375,12 +382,15 @@ func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time,
if now.Before(last) {
last = now
}
if lim.burst < 0 {
return now, last, lim.tokens
}

// Calculate the new number of tokens, due to time that passed.
elapsed := now.Sub(last)
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if lim.burst != 0 {
if lim.burst > 0 {
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
Expand Down
108 changes: 108 additions & 0 deletions tests/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,114 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
controller.Stop()
}

// TestSwitchBurst is used to test https://github.com/tikv/pd/issues/6209
//
// The test runs with the acceleratedReportingPeriod failpoint enabled. It
// first sends a light warm-up workload for suite.initGroups[1], then modifies
// resource group "test2" to a FillRate/BurstLimit of 20000, and finally sends
// one batch of requests per second, asserting that the total time spent in
// OnRequestWait for each batch stays within buffDuration plus the batch's
// waitDuration.
func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
	const acceleratedReportingPeriod = "github.com/tikv/pd/client/resource_group/controller/acceleratedReportingPeriod"

	re := suite.Require()
	cli := suite.client
	re.NoError(failpoint.Enable(acceleratedReportingPeriod, "return(true)"))

	for _, group := range suite.initGroups {
		resp, err := cli.AddResourceGroup(suite.ctx, group)
		re.NoError(err)
		re.Contains(resp, "Success!")
	}

	cfg := &controller.RequestUnitConfig{
		ReadBaseCost:     1,
		ReadCostPerByte:  1,
		WriteBaseCost:    1,
		WriteCostPerByte: 1,
		CPUMsCost:        1,
	}
	testCases := []struct {
		resourceGroupName string
		tcs               []tokenConsumptionPerSecond
		len               int
	}{
		{
			resourceGroupName: suite.initGroups[0].Name,
			len:               8,
			tcs: []tokenConsumptionPerSecond{
				{rruTokensAtATime: 50, wruTokensAtATime: 20, times: 100, waitDuration: 0},
				{rruTokensAtATime: 50, wruTokensAtATime: 100, times: 100, waitDuration: 0},
				{rruTokensAtATime: 50, wruTokensAtATime: 100, times: 100, waitDuration: 0},
				{rruTokensAtATime: 20, wruTokensAtATime: 40, times: 250, waitDuration: 0},
				{rruTokensAtATime: 25, wruTokensAtATime: 50, times: 200, waitDuration: 0},
				{rruTokensAtATime: 30, wruTokensAtATime: 60, times: 165, waitDuration: 0},
				{rruTokensAtATime: 40, wruTokensAtATime: 80, times: 125, waitDuration: 0},
				{rruTokensAtATime: 50, wruTokensAtATime: 100, times: 100, waitDuration: 0},
			},
		},
	}

	// Use a distinct local name so the imported controller package is not shadowed.
	rgController, err := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace())
	re.NoError(err)
	rgController.Start(suite.ctx)

	// Every request in this test, including the batches below, is issued for
	// this group rather than for the resourceGroupName stored in testCases.
	resourceGroupName := suite.initGroups[1].Name

	// Warm-up: a small workload before the group settings are changed.
	warmUp := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 2, times: 100, waitDuration: 0}
	for j := 0; j < warmUp.times; j++ {
		rreq := warmUp.makeReadRequest()
		wreq := warmUp.makeWriteRequest()
		rres := warmUp.makeReadResponse()
		wres := warmUp.makeWriteResponse()
		_, err := rgController.OnRequestWait(suite.ctx, resourceGroupName, rreq)
		re.NoError(err)
		_, err = rgController.OnRequestWait(suite.ctx, resourceGroupName, wreq)
		re.NoError(err)
		rgController.OnResponse(resourceGroupName, rreq, rres)
		rgController.OnResponse(resourceGroupName, wreq, wres)
	}
	time.Sleep(2 * time.Second)

	// NOTE(review): "test2" is presumably suite.initGroups[1].Name — confirm
	// against the suite setup.
	_, err = cli.ModifyResourceGroup(suite.ctx, &rmpb.ResourceGroup{
		Name: "test2",
		Mode: rmpb.GroupMode_RUMode,
		RUSettings: &rmpb.GroupRequestUnitSettings{
			RU: &rmpb.TokenBucket{
				Settings: &rmpb.TokenLimitSettings{
					FillRate:   20000,
					BurstLimit: 20000,
				},
			},
		},
	})
	re.NoError(err)
	time.Sleep(100 * time.Millisecond)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	// Each tick runs step i of every test case; the loop ends once a tick
	// finds no test case with a step left to run.
	for i := 0; ; i++ {
		<-ticker.C
		ran := false
		for _, cas := range testCases {
			if i >= cas.len {
				continue
			}
			ran = true
			step := cas.tcs[i]
			sum := time.Duration(0)
			for j := 0; j < step.times; j++ {
				rreq := step.makeReadRequest()
				wreq := step.makeWriteRequest()
				rres := step.makeReadResponse()
				wres := step.makeWriteResponse()
				startTime := time.Now()
				_, err := rgController.OnRequestWait(suite.ctx, resourceGroupName, rreq)
				re.NoError(err)
				_, err = rgController.OnRequestWait(suite.ctx, resourceGroupName, wreq)
				re.NoError(err)
				// Only the time spent waiting for tokens counts toward the budget.
				sum += time.Since(startTime)
				rgController.OnResponse(resourceGroupName, rreq, rres)
				rgController.OnResponse(resourceGroupName, wreq, wres)
				time.Sleep(1000 * time.Microsecond)
			}
			re.LessOrEqual(sum, buffDuration+step.waitDuration)
		}
		if !ran {
			break
		}
	}

	// Disable (not re-enable) the failpoint so it does not leak into other tests.
	re.NoError(failpoint.Disable(acceleratedReportingPeriod))
	suite.cleanupResourceGroups()
	rgController.Stop()
}

func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
re := suite.Require()
cli := suite.client
Expand Down

0 comments on commit 79ec29c

Please sign in to comment.