Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
31155: workload/ycsb: add support for uniform load distribution r=m-schneider a=m-schneider

Previously we could only run YCSB with a zipfian distribution; now we'll
also be able to use a uniform distribution.

Closes cockroachdb#30996

Release note: None

Co-authored-by: Masha Schneider <[email protected]>
  • Loading branch information
craig[bot] and Masha Schneider committed Oct 15, 2018
2 parents 55cdc51 + 0542369 commit 88c8744
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 8 deletions.
67 changes: 67 additions & 0 deletions pkg/workload/ycsb/uniform_generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.

package ycsb

import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"math/rand"
)

// UniformGenerator is a random number generator that generates draws from a
// uniform distribution.
type UniformGenerator struct {
// mu bundles the mutex with the fields that the methods access while
// holding it.
mu struct {
syncutil.Mutex
// r is the random source handed to NewUniformGenerator.
r *rand.Rand
// sequence is the modulus applied to draws in Uint64. It starts at the
// minInsertRow passed to NewUniformGenerator and grows by one on every
// IncrementIMax call.
sequence uint64
}
}

// NewUniformGenerator builds a UniformGenerator that stores rng as its random
// source and starts its sequence number at minInsertRow.
//
// No validation is performed on the arguments, so the returned error is
// always nil; the error result exists so the signature lines up with the
// other generator constructors used by the workload.
func NewUniformGenerator(rng *rand.Rand, minInsertRow uint64) (*UniformGenerator, error) {
	gen := &UniformGenerator{}
	gen.mu.sequence = minInsertRow
	gen.mu.r = rng
	return gen, nil
}

// IMaxHead reports the current sequence number under the generator's mutex.
// The sequence number is left unchanged.
func (z *UniformGenerator) IMaxHead() uint64 {
	z.mu.Lock()
	defer z.mu.Unlock()
	return z.mu.sequence
}

// IncrementIMax advances the sequence number by one while holding the
// generator's mutex. It always returns nil.
func (z *UniformGenerator) IncrementIMax() error {
	z.mu.Lock()
	defer z.mu.Unlock()
	z.mu.sequence++
	return nil
}

// Uint64 returns a random value in the half-open range [0, sequence), where
// sequence is the generator's current sequence number.
//
// The draw is taken from the *rand.Rand supplied to NewUniformGenerator, so
// the output follows the seed the caller configured rather than the
// process-wide math/rand source. That source must therefore be non-nil.
//
// When the sequence number is zero the range is empty; 0 is returned instead
// of panicking on a modulo-by-zero while the mutex is held.
func (z *UniformGenerator) Uint64() uint64 {
	z.mu.Lock()
	// Deferred so the mutex is released even if the underlying source panics.
	defer z.mu.Unlock()

	if z.mu.sequence == 0 {
		return 0
	}
	return z.mu.r.Uint64() % z.mu.sequence
}
37 changes: 29 additions & 8 deletions pkg/workload/ycsb/ycsb.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type ycsb struct {
splits int

workload string
distribution string
readFreq, insertFreq, updateFreq, scanFreq float32
}

Expand All @@ -90,6 +91,8 @@ var ycsbMeta = workload.Meta{
g.flags.BoolVar(&g.json, `json`, false, `Use JSONB rather than relational data`)
g.flags.IntVar(&g.splits, `splits`, 0, `Number of splits to perform before starting normal operations`)
g.flags.StringVar(&g.workload, `workload`, `B`, `Workload type. Choose from A-F.`)
g.flags.StringVar(&g.distribution, `request-distribution`, `zipfian`, `Distribution for random number generator [zipfian, uniform].`)

// TODO(dan): g.flags.Uint64Var(&g.maxWrites, `max-writes`,
// 7*24*3600*1500, // 7 days at 5% writes and 30k ops/s
// `Maximum number of writes to perform before halting. This is required for `+
Expand Down Expand Up @@ -232,8 +235,18 @@ func (g *ycsb) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Que
}

zipfRng := rand.New(rand.NewSource(g.seed))
zipfR, err := NewZipfGenerator(
zipfRng, zipfIMin, defaultIMax, defaultTheta, false /* verbose */)
var randGen randGenerator

switch strings.ToLower(g.distribution) {
case "zipfian":
randGen, err = NewZipfGenerator(
zipfRng, zipfIMin, defaultIMax, defaultTheta, false /* verbose */)
case "uniform":
randGen, err = NewUniformGenerator(zipfRng, uint64(g.initialRows))
default:
return workload.QueryLoad{}, errors.Errorf("Unknown distribution: %s", g.distribution)
}

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
for i := 0; i < g.connFlags.Concurrency; i++ {
rng := rand.New(rand.NewSource(g.seed + int64(i)))
Expand All @@ -247,7 +260,7 @@ func (g *ycsb) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Que
readStmt: readStmt,
insertStmt: insertStmt,
updateStmts: updateStmts,
zipfR: zipfR,
randGen: randGen,
rng: rng,
hashFunc: fnv.New64(),
}
Expand All @@ -256,6 +269,12 @@ func (g *ycsb) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Que
return ql, nil
}

// randGenerator is the key-generation interface used by ycsbWorker. In Ops,
// the results of both NewZipfGenerator and NewUniformGenerator are assigned
// to a variable of this type.
type randGenerator interface {
// Uint64 returns the next random draw; nextReadKey reduces it modulo the
// configured initial row count to choose a row.
Uint64() uint64
// IMaxHead returns the row number that nextInsertKey uses to build the key
// for an insert.
IMaxHead() uint64
// IncrementIMax is invoked by insertRow when its increment argument is
// true.
IncrementIMax() error
}

type ycsbWorker struct {
config *ycsb
hists *workload.Histograms
Expand All @@ -266,8 +285,8 @@ type ycsbWorker struct {
// be parametrized. In JSON mode it's a single statement.
updateStmts []*gosql.Stmt

zipfR *ZipfGenerator // used to generate random keys
rng *rand.Rand // used to generate random strings for the values
randGen randGenerator // used to generate random keys
rng *rand.Rand // used to generate random strings for the values
hashFunc hash.Hash64
hashBuf [8]byte
}
Expand Down Expand Up @@ -333,15 +352,17 @@ func (yw *ycsbWorker) nextReadKey() string {
// for the number of rows growing over time. See the YCSB paper/code for how
// this should work. (Basically repeatedly drawing from the distribution until
// a sufficiently low value is chosen, but with some complications.)
rownum := yw.hashKey(yw.zipfR.Uint64()) % uint64(yw.config.initialRows)

// TODO(arjun): Look into why this was being hashed twice before.
rownum := yw.randGen.Uint64() % uint64(yw.config.initialRows)
return yw.buildKeyName(rownum)
}

func (yw *ycsbWorker) nextInsertKey() string {
// TODO: This logic is no longer valid now that we are using a large YCSB
// distribution and modding the samples. To properly support INSERTS, we need
// to maintain a separate rownum counter.
rownum := yw.zipfR.IMaxHead()
rownum := yw.randGen.IMaxHead()
return yw.buildKeyName(rownum)
}

Expand All @@ -367,7 +388,7 @@ func (yw *ycsbWorker) insertRow(ctx context.Context, key string, increment bool)
}

if increment {
if err := yw.zipfR.IncrementIMax(); err != nil {
if err := yw.randGen.IncrementIMax(); err != nil {
return err
}
}
Expand Down

0 comments on commit 88c8744

Please sign in to comment.