Skip to content
This repository has been archived by the owner on Sep 17, 2024. It is now read-only.

Commit

Permalink
kv: add a rate limiter to control the maximum op request frequency
Browse files Browse the repository at this point in the history
Specify `--max-rate=<frequency>` in order to activate a rate limiter.

Fixes #32
  • Loading branch information
spencerkimball committed Mar 15, 2017
1 parent e657564 commit cb590ed
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions kv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import (
"syscall"
"time"

"golang.org/x/net/context"
"golang.org/x/time/rate"

mgo "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"

Expand All @@ -55,6 +58,8 @@ var splits = flag.Int("splits", 0, "Number of splits to perform before starting

var tolerateErrors = flag.Bool("tolerate-errors", false, "Keep running on error")

var maxRate = flag.Float64("max-rate", 0, "Maximum frequency of operations (reads/writes). If 0, no limit.")

// outputInterval = interval at which information is output to console.
var outputInterval = flag.Duration("output-interval", 1*time.Second, "Interval of output")

Expand Down Expand Up @@ -185,11 +190,18 @@ func newBlocker(db database, seq *sequence) *blocker {
}

// run is an infinite loop in which the blocker continuously attempts to
// write blocks of random data into a table in cockroach DB.
func (b *blocker) run(errCh chan<- error, wg *sync.WaitGroup) {
// read / write blocks of random data into a table in cockroach DB.
func (b *blocker) run(errCh chan<- error, wg *sync.WaitGroup, limiter *rate.Limiter) {
defer wg.Done()

for {
// Limit how quickly the load generator sends requests based on --max-rate.
if limiter != nil {
if err := limiter.Wait(context.Background()); err != nil {
panic(err)
}
}

start := time.Now()
var err error
if b.gen.rand.Intn(100) < *readPercent {
Expand Down Expand Up @@ -503,6 +515,13 @@ func main() {
}
}

var limiter *rate.Limiter
if *maxRate > 0 {
// Create a limiter using maxRate specified on the command line and
// with allowed burst of 1 at the maximum allowed rate.
limiter = rate.NewLimiter(rate.Limit(*maxRate), 1)
}

lastNow := time.Now()
start := lastNow
var lastOps uint64
Expand All @@ -514,7 +533,7 @@ func main() {
for i := range writers {
wg.Add(1)
writers[i] = newBlocker(db.clone(), seq)
go writers[i].run(errCh, &wg)
go writers[i].run(errCh, &wg, limiter)
}

var numErr int
Expand Down

0 comments on commit cb590ed

Please sign in to comment.