Skip to content

Commit

Permalink
workload: use different random seeds by default
Browse files Browse the repository at this point in the history
This changes most of the workloads that take a `--seed` command line
flag so that they use a different, randomly generated seed on each
run.  Previously, they would all default to `1`, making every run of
the workload produce the same data and operations. That behavior is
good for reproducing a specific pattern or behavior, but workloads
that have a random element should exploit that randomness by
default.

When a workload is invoked, a random seed is generated and
logged. Users are still able to specify their own seeds by passing the
`--seed` flag.

Resolves #88566.

Release note (cli change): workloads that take a `--seed` argument
used to default to `1`. Now, they use a randomly generated seed in
each run. Users can still pass a custom seed with the `--seed` flag.
  • Loading branch information
renatolabs committed Feb 28, 2023
1 parent d5a076f commit 14a3746
Show file tree
Hide file tree
Showing 24 changed files with 255 additions and 82 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,8 @@ func TestAvroLedger(t *testing.T) {

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
ctx := context.Background()
// assertions depend on this seed
ledger.RandomSeed.Set(1)
gen := ledger.FromFlags(`--customers=1`)
var l workloadsql.InsertsDataLoader
_, err := workloadsql.Setup(ctx, s.DB, gen, l)
Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/workloadccl/allccl/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ func TestDeterministicInitialData(t *testing.T) {
continue
}
t.Run(meta.Name, func(t *testing.T) {
// assertions depend on this seed
switch rs := meta.RandomSeed.(type) {
case *workload.Int64RandomSeed:
rs.Set(1)
case *workload.Uint64RandomSeed:
rs.Set(1)
}

if bigInitialData(meta) {
skip.UnderShort(t, fmt.Sprintf(`%s involves a lot of data`, meta.Name))
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/cmd/roachtest/tests/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"github.com/cockroachdb/errors"
)

// We need to hardcode the random seed when loading fixtures, since
// fixture loading references a static bucket in GCS.
const fixturesRandomSeed = 1

func registerCopy(r registry.Registry) {
// This test imports a fully-populated Bank table. It then creates an empty
// Bank schema. Finally, it performs a series of `INSERT ... SELECT ...`
Expand Down Expand Up @@ -60,8 +64,8 @@ func registerCopy(r registry.Registry) {

t.Status("importing Bank fixture")
c.Run(ctx, c.Node(1), fmt.Sprintf(
"./workload fixtures load bank --rows=%d --payload-bytes=%d {pgurl:1}",
rows, payload))
"./workload fixtures load bank --rows=%d --payload-bytes=%d --seed %d {pgurl:1}",
rows, payload, fixturesRandomSeed))
if _, err := db.Exec("ALTER TABLE bank.bank RENAME TO bank.bank_orig"); err != nil {
t.Fatalf("failed to rename table: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/workload/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"csv.go",
"driver.go",
"pgx_helpers.go",
"random.go",
"round_robin.go",
"sql_runner.go",
"stats.go",
Expand All @@ -22,6 +23,7 @@ go_library(
"//pkg/util/bufalloc",
"//pkg/util/encoding/csv",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/workload/histogram",
Expand Down
11 changes: 7 additions & 4 deletions pkg/workload/bank/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ const (
maxTransfer = 999
)

// RandomSeed is the bank random seed.
var RandomSeed = workload.NewUint64RandomSeed()

type bank struct {
flags workload.Flags
connFlags *workload.ConnFlags

seed uint64
rows, batchSize int
payloadBytes, ranges int
}
Expand All @@ -60,17 +62,18 @@ var bankMeta = workload.Meta{
Description: `Bank models a set of accounts with currency balances`,
Version: `1.0.0`,
PublicFacing: true,
RandomSeed: RandomSeed,
New: func() workload.Generator {
g := &bank{}
g.flags.FlagSet = pflag.NewFlagSet(`bank`, pflag.ContinueOnError)
g.flags.Meta = map[string]workload.FlagMeta{
`batch-size`: {RuntimeOnly: true},
}
g.flags.Uint64Var(&g.seed, `seed`, 1, `Key hash seed.`)
g.flags.IntVar(&g.rows, `rows`, defaultRows, `Initial number of accounts in bank table.`)
g.flags.IntVar(&g.batchSize, `batch-size`, defaultBatchSize, `Number of rows in each batch of initial data.`)
g.flags.IntVar(&g.payloadBytes, `payload-bytes`, defaultPayloadBytes, `Size of the payload field in each initial row.`)
g.flags.IntVar(&g.ranges, `ranges`, defaultRanges, `Initial number of ranges in bank table.`)
RandomSeed.AddFlag(&g.flags)
g.connFlags = workload.NewConnFlags(&g.flags)
return g
},
Expand Down Expand Up @@ -138,7 +141,7 @@ func (b *bank) Tables() []workload.Table {
InitialRows: workload.BatchedTuples{
NumBatches: numBatches,
FillBatch: func(batchIdx int, cb coldata.Batch, a *bufalloc.ByteAllocator) {
rng := rand.NewSource(b.seed + uint64(batchIdx))
rng := rand.NewSource(RandomSeed.Seed() + uint64(batchIdx))

rowBegin, rowEnd := batchIdx*b.batchSize, (batchIdx+1)*b.batchSize
if rowEnd > b.rows {
Expand Down Expand Up @@ -204,7 +207,7 @@ func (b *bank) Ops(

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
for i := 0; i < b.connFlags.Concurrency; i++ {
rng := rand.New(rand.NewSource(b.seed))
rng := rand.New(rand.NewSource(RandomSeed.Seed()))
hists := reg.GetHandle()
workerFn := func(ctx context.Context) error {
from := rng.Intn(b.rows)
Expand Down
11 changes: 7 additions & 4 deletions pkg/workload/bulkingest/bulkingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ const (
defaultPayloadBytes = 100
)

// RandomSeed is the bulkingest workload random seed.
var RandomSeed = workload.NewInt64RandomSeed()

type bulkingest struct {
flags workload.Flags
connFlags *workload.ConnFlags

seed int64
aCount, bCount, cCount, payloadBytes int

generateBsFirst bool
Expand All @@ -94,16 +96,17 @@ var bulkingestMeta = workload.Meta{
Name: `bulkingest`,
Description: `bulkingest testdata is designed to produce a skewed distribution of KVs when ingested (in initial import or during later indexing)`,
Version: `1.0.0`,
RandomSeed: RandomSeed,
New: func() workload.Generator {
g := &bulkingest{}
g.flags.FlagSet = pflag.NewFlagSet(`bulkingest`, pflag.ContinueOnError)
g.flags.Int64Var(&g.seed, `seed`, 1, `Key hash seed.`)
g.flags.IntVar(&g.aCount, `a`, 10, `number of values of A (i.e. pk prefix)`)
g.flags.IntVar(&g.bCount, `b`, 10, `number of values of B (i.e. idx prefix)`)
g.flags.IntVar(&g.cCount, `c`, 1000, `number of values of C (i.e. rows per A/B pair)`)
g.flags.BoolVar(&g.generateBsFirst, `batches-by-b`, false, `generate all B batches for given A first`)
g.flags.BoolVar(&g.indexBCA, `index-b-c-a`, true, `include an index on (B, C, A)`)
g.flags.IntVar(&g.payloadBytes, `payload-bytes`, defaultPayloadBytes, `Size of the payload field in each row.`)
RandomSeed.AddFlag(&g.flags)
g.connFlags = workload.NewConnFlags(&g.flags)
return g
},
Expand Down Expand Up @@ -154,7 +157,7 @@ func (w *bulkingest) Tables() []workload.Table {
cCol := cb.ColVec(2).Int64()
payloadCol := cb.ColVec(3).Bytes()

rng := rand.New(rand.NewSource(w.seed + int64(ab)))
rng := rand.New(rand.NewSource(RandomSeed.Seed() + int64(ab)))
var payload []byte
*alloc, payload = alloc.Alloc(w.cCount*w.payloadBytes, 0 /* extraCap */)
randutil.ReadTestdataBytes(rng, payload)
Expand Down Expand Up @@ -200,7 +203,7 @@ func (w *bulkingest) Ops(

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
for i := 0; i < w.connFlags.Concurrency; i++ {
rng := rand.New(rand.NewSource(w.seed))
rng := rand.New(rand.NewSource(RandomSeed.Seed()))
hists := reg.GetHandle()
pad := make([]byte, w.payloadBytes)
workerFn := func(ctx context.Context) error {
Expand Down
10 changes: 10 additions & 0 deletions pkg/workload/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func runInit(gen workload.Generator, urls []string, dbName string) error {
}

startPProfEndPoint(ctx)
maybeLogRandomSeed(ctx, gen)
return runInitImpl(ctx, gen, initDB, dbName)
}

Expand Down Expand Up @@ -404,6 +405,7 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
limiter = rate.NewLimiter(rate.Limit(*maxRate), 1)
}

maybeLogRandomSeed(ctx, gen)
o, ok := gen.(workload.Opser)
if !ok {
return errors.Errorf(`no operations defined for %s`, gen.Meta().Name)
Expand Down Expand Up @@ -612,3 +614,11 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
}
}
}

// maybeLogRandomSeed writes the generator's random seed message to the
// info log. Generators whose Meta carries no RandomSeed are skipped and
// nothing is logged for them.
func maybeLogRandomSeed(ctx context.Context, gen workload.Generator) {
	seed := gen.Meta().RandomSeed
	if seed == nil {
		// This workload does not use a random seed; nothing to report.
		return
	}
	log.Infof(ctx, "%s", seed.LogMessage())
}
4 changes: 4 additions & 0 deletions pkg/workload/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func TestHandleCSV(t *testing.T) {
},
}

// assertions depend on this seed
bank.RandomSeed.Set(1)
meta := bank.FromRows(0).Meta()
for _, test := range tests {
t.Run(test.params, func(t *testing.T) {
Expand Down Expand Up @@ -114,6 +116,8 @@ func BenchmarkWriteCSVRows(b *testing.B) {
func TestCSVRowsReader(t *testing.T) {
defer leaktest.AfterTest(t)()

// assertions depend on this seed
bank.RandomSeed.Set(1)
table := bank.FromRows(10).Tables()[0]
r := workload.NewCSVRowsReader(table, 1, 3)
b, err := io.ReadAll(r)
Expand Down
9 changes: 6 additions & 3 deletions pkg/workload/indexes/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ const (
payload BYTES NOT NULL`
)

// RandomSeed is the indexes workload random seed.
var RandomSeed = workload.NewInt64RandomSeed()

type indexes struct {
flags workload.Flags
connFlags *workload.ConnFlags

seed int64
idxs int
unique bool
payload int
Expand All @@ -62,15 +64,16 @@ var indexesMeta = workload.Meta{
Name: `indexes`,
Description: `Indexes writes to a table with a variable number of secondary indexes`,
Version: `1.0.0`,
RandomSeed: RandomSeed,
New: func() workload.Generator {
g := &indexes{}
g.flags.FlagSet = pflag.NewFlagSet(`indexes`, pflag.ContinueOnError)
g.flags.Int64Var(&g.seed, `seed`, 1, `Key hash seed.`)
g.flags.IntVar(&g.idxs, `secondary-indexes`, 1, `Number of indexes to add to the table.`)
g.flags.BoolVar(&g.unique, `unique-indexes`, false, `Use UNIQUE secondary indexes.`)
g.flags.IntVar(&g.payload, `payload`, 64, `Size of the unindexed payload column.`)
g.flags.Uint64Var(&g.cycleLength, `cycle-length`, math.MaxUint64,
`Number of keys repeatedly accessed by each writer through upserts.`)
RandomSeed.AddFlag(&g.flags)
g.connFlags = workload.NewConnFlags(&g.flags)
return g
},
Expand Down Expand Up @@ -170,7 +173,7 @@ func (w *indexes) Ops(
op := &indexesOp{
config: w,
hists: reg.GetHandle(),
rand: rand.New(rand.NewSource(int64((i + 1)) * w.seed)),
rand: rand.New(rand.NewSource(int64((i + 1)) * RandomSeed.Seed())),
buf: make([]byte, w.payload),
}
op.stmt = op.sr.Define(stmt)
Expand Down
13 changes: 8 additions & 5 deletions pkg/workload/insights/insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ const (

var tableNames = []string{tableNameA, tableNameB}

// RandomSeed is the insights workload random seed.
var RandomSeed = workload.NewUint64RandomSeed()

type insights struct {
flags workload.Flags
connFlags *workload.ConnFlags

seed uint64
rowCount, batchSize int
payloadBytes, ranges int
}
Expand All @@ -66,18 +68,19 @@ var insightsMeta = workload.Meta{
Name: `insights`,
Description: `This workload executes queries that will be detected by insights`,
Version: `1.0.0`,
RandomSeed: RandomSeed,
PublicFacing: false,
New: func() workload.Generator {
g := &insights{}
g.flags.FlagSet = pflag.NewFlagSet(`insights`, pflag.ContinueOnError)
g.flags.Meta = map[string]workload.FlagMeta{
`batch-size`: {RuntimeOnly: true},
}
g.flags.Uint64Var(&g.seed, `seed`, 1, `Key hash seed.`)
g.flags.IntVar(&g.rowCount, `rows`, defaultRows, `Initial number of accounts in insights table.`)
g.flags.IntVar(&g.batchSize, `batch-size`, defaultBatchSize, `Number of rows in each batch of initial data.`)
g.flags.IntVar(&g.payloadBytes, `payload-bytes`, defaultPayloadBytes, `Size of the payload field in each initial row.`)
g.flags.IntVar(&g.ranges, `ranges`, defaultRanges, `Initial number of ranges in insights table.`)
RandomSeed.AddFlag(&g.flags)
g.connFlags = workload.NewConnFlags(&g.flags)
return g
},
Expand Down Expand Up @@ -150,7 +153,7 @@ func (b *insights) Tables() []workload.Table {
InitialRows: workload.BatchedTuples{
NumBatches: numBatches,
FillBatch: func(batchIdx int, cb coldata.Batch, a *bufalloc.ByteAllocator) {
rng := rand.NewSource(b.seed + uint64(batchIdx))
rng := rand.NewSource(RandomSeed.Seed() + uint64(batchIdx))

rowBegin, rowEnd := batchIdx*b.batchSize, (batchIdx+1)*b.batchSize
if rowEnd > b.rowCount {
Expand Down Expand Up @@ -207,7 +210,7 @@ func (b *insights) Ops(
db.SetMaxIdleConns(b.connFlags.Concurrency + 1)

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
rng := rand.New(rand.NewSource(b.seed))
rng := rand.New(rand.NewSource(RandomSeed.Seed()))
for i := 0; i < b.connFlags.Concurrency; i++ {
temp := i
hists := reg.GetHandle()
Expand Down Expand Up @@ -273,7 +276,7 @@ func joinOnNonIndexColumn(ctx context.Context, db *gosql.DB) error {
func orderByOnNonIndexColumn(ctx context.Context, db *gosql.DB, rowCount int) error {
rowLimit := (rand.Uint32() % uint32(rowCount)) + 1
_, err := db.ExecContext(ctx, `
select balance
select balance
from insights_workload_table_a order by balance desc limit $1;`, rowLimit)
return err
}
Expand Down
Loading

0 comments on commit 14a3746

Please sign in to comment.