Skip to content
This repository has been archived by the owner on Sep 17, 2024. It is now read-only.

kv: add a rate limiter to control the maximum op request frequency #33

Merged
merged 1 commit into from
Mar 15, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions kv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import (
"syscall"
"time"

"golang.org/x/net/context"
"golang.org/x/time/rate"

mgo "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"

Expand All @@ -55,6 +58,8 @@ var splits = flag.Int("splits", 0, "Number of splits to perform before starting

var tolerateErrors = flag.Bool("tolerate-errors", false, "Keep running on error")

var maxRate = flag.Float64("max-rate", 0, "Maximum frequency of operations (reads/writes). If 0, no limit.")

// outputInterval = interval at which information is output to console.
var outputInterval = flag.Duration("output-interval", 1*time.Second, "Interval of output")

Expand Down Expand Up @@ -185,11 +190,18 @@ func newBlocker(db database, seq *sequence) *blocker {
}

// run is an infinite loop in which the blocker continuously attempts to
// write blocks of random data into a table in cockroach DB.
func (b *blocker) run(errCh chan<- error, wg *sync.WaitGroup) {
// read / write blocks of random data into a table in cockroach DB.
func (b *blocker) run(errCh chan<- error, wg *sync.WaitGroup, limiter *rate.Limiter) {
defer wg.Done()

for {
// Limit how quickly the load generator sends requests based on --max-rate.
if limiter != nil {
if err := limiter.Wait(context.Background()); err != nil {
panic(err)
}
}

start := time.Now()
var err error
if b.gen.rand.Intn(100) < *readPercent {
Expand Down Expand Up @@ -503,6 +515,13 @@ func main() {
}
}

var limiter *rate.Limiter
if *maxRate > 0 {
// Create a limiter using maxRate specified on the command line and
// with allowed burst of 1 at the maximum allowed rate.
limiter = rate.NewLimiter(rate.Limit(*maxRate), 1)
}

lastNow := time.Now()
start := lastNow
var lastOps uint64
Expand All @@ -514,7 +533,7 @@ func main() {
for i := range writers {
wg.Add(1)
writers[i] = newBlocker(db.clone(), seq)
go writers[i].run(errCh, &wg)
go writers[i].run(errCh, &wg, limiter)
}

var numErr int
Expand Down