From cb590edec216433032974bbc6616e399f1e51563 Mon Sep 17 00:00:00 2001 From: Spencer Kimball Date: Wed, 15 Mar 2017 11:00:03 -0400 Subject: [PATCH] kv: add a rate limiter to control the maximum op request frequency Specify `--max-rate=` in order to activate a rate limiter. Fixes #32 --- kv/main.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/kv/main.go b/kv/main.go index 9ba171c..370e6c5 100644 --- a/kv/main.go +++ b/kv/main.go @@ -34,6 +34,9 @@ import ( "syscall" "time" + "golang.org/x/net/context" + "golang.org/x/time/rate" + mgo "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" @@ -55,6 +58,8 @@ var splits = flag.Int("splits", 0, "Number of splits to perform before starting var tolerateErrors = flag.Bool("tolerate-errors", false, "Keep running on error") +var maxRate = flag.Float64("max-rate", 0, "Maximum frequency of operations (reads/writes). If 0, no limit.") + // outputInterval = interval at which information is output to console. var outputInterval = flag.Duration("output-interval", 1*time.Second, "Interval of output") @@ -185,11 +190,18 @@ func newBlocker(db database, seq *sequence) *blocker { } // run is an infinite loop in which the blocker continuously attempts to -// write blocks of random data into a table in cockroach DB. -func (b *blocker) run(errCh chan<- error, wg *sync.WaitGroup) { +// read / write blocks of random data into a table in cockroach DB. +func (b *blocker) run(errCh chan<- error, wg *sync.WaitGroup, limiter *rate.Limiter) { defer wg.Done() for { + // Limit how quickly the load generator sends requests based on --max-rate. + if limiter != nil { + if err := limiter.Wait(context.Background()); err != nil { + panic(err) + } + } + start := time.Now() var err error if b.gen.rand.Intn(100) < *readPercent { @@ -503,6 +515,13 @@ func main() { } } + var limiter *rate.Limiter + if *maxRate > 0 { + // Create a limiter using maxRate specified on the command line and + // with allowed burst of 1 at the maximum allowed rate. + limiter = rate.NewLimiter(rate.Limit(*maxRate), 1) + } + lastNow := time.Now() start := lastNow var lastOps uint64 @@ -514,7 +533,7 @@ func main() { for i := range writers { wg.Add(1) writers[i] = newBlocker(db.clone(), seq) - go writers[i].run(errCh, &wg) + go writers[i].run(errCh, &wg, limiter) } var numErr int