Skip to content

Commit

Permalink
feat(gemini): make schema seed and workload seed random
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry Kropachev committed Jul 13, 2023
1 parent 0b6a06b commit cb5d494
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 31 deletions.
16 changes: 11 additions & 5 deletions cmd/gemini/generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,29 @@ import (
func createGenerators(
schema *typedef.Schema,
schemaConfig typedef.SchemaConfig,
distributionFunc generators.DistributionFunc,
_, distributionSize uint64,
logger *zap.Logger,
) generators.Generators {
) (generators.Generators, error) {
partitionRangeConfig := schemaConfig.GetPartitionRangeConfig()

var gs []*generators.Generator
for _, table := range schema.Tables {
for id := range schema.Tables {
table := schema.Tables[id]

distFunc, err := createDistributionFunc(partitionKeyDistribution, partitionCount, seed, stdDistMean, oneStdDev)
if err != nil {
return nil, err
}

gCfg := &generators.Config{
PartitionsRangeConfig: partitionRangeConfig,
PartitionsCount: distributionSize,
PartitionsDistributionFunc: distributionFunc,
PartitionsDistributionFunc: distFunc,
Seed: seed,
PkUsedBufferSize: pkBufferReuseSize,
}
g := generators.NewGenerator(table, gCfg, logger.Named("generators"))
gs = append(gs, g)
}
return gs
return gs, nil
}
27 changes: 19 additions & 8 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"encoding/binary"
"encoding/json"
"fmt"
"log"
Expand All @@ -39,6 +40,8 @@ import (
"github.com/scylladb/gemini/pkg/status"
"github.com/scylladb/gemini/pkg/stop"

crand "crypto/rand"

"github.com/gocql/gocql"
"github.com/hailocab/go-hostpool"
"github.com/pkg/errors"
Expand Down Expand Up @@ -178,10 +181,6 @@ func run(_ *cobra.Command, _ []string) error {
if err = printSetup(); err != nil {
return errors.Wrapf(err, "unable to print setup")
}
distFunc, err := createDistributionFunc(partitionKeyDistribution, math.MaxUint64, seed, stdDistMean, oneStdDev)
if err != nil {
return err
}

outFile, err := createFile(outFileArg, os.Stdout)
if err != nil {
Expand Down Expand Up @@ -259,9 +258,12 @@ func run(_ *cobra.Command, _ []string) error {
stopFlag := stop.NewFlag("main")
warmupStopFlag := stop.NewFlag("warmup")
stop.StartOsSignalsTransmitter(logger, stopFlag, warmupStopFlag)
pump := jobs.NewPump(ctx, logger)
pump := jobs.NewPump(stopFlag, logger)

gens := createGenerators(schema, schemaConfig, distFunc, concurrency, partitionCount, logger)
gens, err := createGenerators(schema, schemaConfig, concurrency, partitionCount, logger)
if err != nil {
return err
}
gens.StartAll(stopFlag)

if !nonInteractive {
Expand Down Expand Up @@ -457,8 +459,8 @@ func init() {
rootCmd.Flags().StringVarP(&schemaFile, "schema", "", "", "Schema JSON config file")
rootCmd.Flags().StringVarP(&mode, "mode", "m", jobs.MixedMode, "Query operation mode. Mode options: write, read, mixed (default)")
rootCmd.Flags().Uint64VarP(&concurrency, "concurrency", "c", 10, "Number of threads per table to run concurrently")
rootCmd.Flags().Uint64VarP(&seed, "seed", "s", 1, "Statement seed value")
rootCmd.Flags().Uint64VarP(&schemaSeed, "schema-seed", "", 1, "Schema seed value")
rootCmd.Flags().Uint64VarP(&seed, "seed", "s", RealRandom(), "Statement seed value")
rootCmd.Flags().Uint64VarP(&schemaSeed, "schema-seed", "", RealRandom(), "Schema seed value")
rootCmd.Flags().BoolVarP(&dropSchema, "drop-schema", "d", false, "Drop schema before starting tests run")
rootCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output during test run")
rootCmd.Flags().BoolVarP(&failFast, "fail-fast", "f", false, "Stop on the first failure")
Expand Down Expand Up @@ -540,3 +542,12 @@ func printSetup() error {
tw.Flush()
return nil
}

func RealRandom() uint64 {
var b [8]byte
_, err := crand.Read(b[:])
if err != nil {
return uint64(time.Now().Nanosecond() * time.Now().Second())
}
return binary.LittleEndian.Uint64(b[:])
}
9 changes: 5 additions & 4 deletions pkg/generators/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type Generator struct {
partitions Partitions
partitionsConfig typedef.PartitionRangeConfig
partitionCount uint64
seed uint64

cntCreated uint64
cntEmitted uint64
Expand Down Expand Up @@ -87,7 +86,6 @@ func NewGenerator(table *typedef.Table, config *Config, logger *zap.Logger) *Gen
partitionCount: config.PartitionsCount,
table: table,
partitionsConfig: config.PartitionsRangeConfig,
seed: config.Seed,
idxFunc: config.PartitionsDistributionFunc,
logger: logger,
wakeUpSignal: wakeUpSignal,
Expand Down Expand Up @@ -121,7 +119,7 @@ func (g *Generator) Start(stopFlag *stop.Flag) {
g.logger.Info("starting partition key generation loop")
defer g.partitions.CloseAll()
for {
g.fillAllPartitions()
g.fillAllPartitions(stopFlag)
select {
case <-stopFlag.SignalChannel():
g.logger.Debug("stopping partition key generation loop",
Expand All @@ -137,7 +135,7 @@ func (g *Generator) Start(stopFlag *stop.Flag) {
// fillAllPartitions guarantees that each partition was tested to be full
// at least once since the function started and before it ended.
// In other words no partition will be starved.
func (g *Generator) fillAllPartitions() {
func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) {
pFilled := make([]bool, len(g.partitions))
allFilled := func() bool {
for _, filled := range pFilled {
Expand All @@ -160,6 +158,9 @@ func (g *Generator) fillAllPartitions() {
case partition.values <- &typedef.ValueWithToken{Token: token, Value: values}:
g.cntEmitted++
default:
if stopFlag.IsHardOrSoft() {
return
}
if !pFilled[idx] {
pFilled[idx] = true
if allFilled() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/generators/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Partition struct {
func (s *Partition) get() *typedef.ValueWithToken {
for {
v := s.pick()
if s.inFlight.AddIfNotPresent(v.Token) {
if v == nil || s.inFlight.AddIfNotPresent(v.Token) {
return v
}
}
Expand Down
13 changes: 5 additions & 8 deletions pkg/jobs/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
package jobs

import (
"context"
"time"

"github.com/scylladb/gemini/pkg/stop"

"go.uber.org/zap"
"golang.org/x/exp/rand"
)

func NewPump(ctx context.Context, logger *zap.Logger) chan time.Duration {
func NewPump(stopFlag *stop.Flag, logger *zap.Logger) chan time.Duration {
pump := make(chan time.Duration, 10000)
logger = logger.Named("Pump")
go func() {
Expand All @@ -31,12 +32,8 @@ func NewPump(ctx context.Context, logger *zap.Logger) chan time.Duration {
close(pump)
logger.Debug("pump channel closed")
}()
for {
select {
case <-ctx.Done():
break
case pump <- newHeartBeat():
}
for !stopFlag.IsHardOrSoft() {
pump <- newHeartBeat()
}
}()

Expand Down
1 change: 1 addition & 0 deletions pkg/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (gs *GlobalStatus) PrintResultAsJSON(w io.Writer, schema *typedef.Schema, v
result := map[string]interface{}{
"result": gs,
"gemini_version": version,
"schemaHash": schema.GetHash(),
"schema": schema,
}
encoder := json.NewEncoder(w)
Expand Down
15 changes: 15 additions & 0 deletions pkg/typedef/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

package typedef

import (
"encoding/json"
"strconv"

"github.com/scylladb/gemini/pkg/murmur"
)

type MaterializedView struct {
NonPrimaryKey *ColumnDef
Name string `json:"name"`
Expand All @@ -28,6 +35,14 @@ type Schema struct {
Config SchemaConfig `json:"-"`
}

func (s *Schema) GetHash() string {
out, err := json.Marshal(s)
if err != nil {
panic(err)
}
return strconv.FormatUint(uint64(murmur.Murmur3H1(out)), 16)
}

func (m *MaterializedView) HaveNonPrimaryKey() bool {
return m.NonPrimaryKey != nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/typedef/simple_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ func (st SimpleType) genValue(r *rand.Rand, p *PartitionRangeConfig) interface{}
case TYPE_INT:
return r.Int31()
case TYPE_SMALLINT:
return int16(r.Int31())
return int16(r.Uint64n(65535))
case TYPE_TIMEUUID, TYPE_UUID:
return utils.UUIDFromTime(r)
case TYPE_TINYINT:
return int8(r.Int31())
return int8(r.Uint64n(255))
case TYPE_VARINT:
return big.NewInt(r.Int63())
default:
Expand Down
11 changes: 8 additions & 3 deletions pkg/typedef/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ var (
TYPE_DURATION: {},
}
TypesForIndex = SimpleTypes{TYPE_DECIMAL, TYPE_DOUBLE, TYPE_FLOAT, TYPE_INT, TYPE_SMALLINT, TYPE_TINYINT, TYPE_VARINT}
PartitionKeyTypes = SimpleTypes{TYPE_INT, TYPE_SMALLINT, TYPE_TINYINT, TYPE_VARINT}
PkTypes = SimpleTypes{
PartitionKeyTypes = SimpleTypes{
TYPE_ASCII, TYPE_BIGINT, TYPE_DATE, TYPE_DECIMAL, TYPE_DOUBLE,
TYPE_FLOAT, TYPE_INET, TYPE_INT, TYPE_SMALLINT, TYPE_TEXT, TYPE_TIME, TYPE_TIMESTAMP, TYPE_TIMEUUID,
TYPE_TINYINT, TYPE_UUID, TYPE_VARCHAR, TYPE_VARINT, TYPE_BOOLEAN,
}

PkTypes = SimpleTypes{
TYPE_ASCII, TYPE_BIGINT, TYPE_BLOB, TYPE_DATE, TYPE_DECIMAL, TYPE_DOUBLE,
TYPE_FLOAT, TYPE_INET, TYPE_INT, TYPE_SMALLINT, TYPE_TEXT, TYPE_TIME, TYPE_TIMESTAMP, TYPE_TIMEUUID,
TYPE_TINYINT, TYPE_UUID, TYPE_VARCHAR, TYPE_VARINT,
Expand Down Expand Up @@ -155,7 +160,7 @@ func (mt *MapType) GenJSONValue(r *rand.Rand, p *PartitionRangeConfig) interface

func (mt *MapType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []interface{} {
count := r.Intn(9) + 1
vals := reflect.MakeMap(reflect.MapOf(reflect.TypeOf(mt.KeyType.GenJSONValue(r, p)), reflect.TypeOf(mt.ValueType.GenJSONValue(r, p))))
vals := reflect.MakeMap(reflect.MapOf(reflect.TypeOf(mt.KeyType.GenValue(r, p)[0]), reflect.TypeOf(mt.ValueType.GenValue(r, p)[0])))
for i := 0; i < count; i++ {
vals.SetMapIndex(reflect.ValueOf(mt.KeyType.GenValue(r, p)[0]), reflect.ValueOf(mt.ValueType.GenValue(r, p)[0]))
}
Expand Down

0 comments on commit cb5d494

Please sign in to comment.