From cb5d4945db54c1468a63009baf1fa0c23eed6370 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 13 Jul 2023 01:52:29 -0400 Subject: [PATCH] feat(gemini): make schema seed and workload seed random --- cmd/gemini/generators.go | 16 +++++++++++----- cmd/gemini/root.go | 27 +++++++++++++++++++-------- pkg/generators/generator.go | 9 +++++---- pkg/generators/partition.go | 2 +- pkg/jobs/pump.go | 13 +++++-------- pkg/status/status.go | 1 + pkg/typedef/schema.go | 15 +++++++++++++++ pkg/typedef/simple_type.go | 4 ++-- pkg/typedef/types.go | 11 ++++++++--- 9 files changed, 67 insertions(+), 31 deletions(-) diff --git a/cmd/gemini/generators.go b/cmd/gemini/generators.go index d77adcbf..d511aecd 100644 --- a/cmd/gemini/generators.go +++ b/cmd/gemini/generators.go @@ -23,23 +23,29 @@ import ( func createGenerators( schema *typedef.Schema, schemaConfig typedef.SchemaConfig, - distributionFunc generators.DistributionFunc, _, distributionSize uint64, logger *zap.Logger, -) generators.Generators { +) (generators.Generators, error) { partitionRangeConfig := schemaConfig.GetPartitionRangeConfig() var gs []*generators.Generator - for _, table := range schema.Tables { + for id := range schema.Tables { + table := schema.Tables[id] + + distFunc, err := createDistributionFunc(partitionKeyDistribution, partitionCount, seed, stdDistMean, oneStdDev) + if err != nil { + return nil, err + } + gCfg := &generators.Config{ PartitionsRangeConfig: partitionRangeConfig, PartitionsCount: distributionSize, - PartitionsDistributionFunc: distributionFunc, + PartitionsDistributionFunc: distFunc, Seed: seed, PkUsedBufferSize: pkBufferReuseSize, } g := generators.NewGenerator(table, gCfg, logger.Named("generators")) gs = append(gs, g) } - return gs + return gs, nil } diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index fcf2659f..14c1c950 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -15,6 +15,7 @@ package main import ( + "encoding/binary" "encoding/json" "fmt" "log" @@ -39,6 +40,8 @@ import ( "github.com/scylladb/gemini/pkg/status" "github.com/scylladb/gemini/pkg/stop" + crand "crypto/rand" + "github.com/gocql/gocql" "github.com/hailocab/go-hostpool" "github.com/pkg/errors" @@ -178,10 +181,6 @@ func run(_ *cobra.Command, _ []string) error { if err = printSetup(); err != nil { return errors.Wrapf(err, "unable to print setup") } - distFunc, err := createDistributionFunc(partitionKeyDistribution, math.MaxUint64, seed, stdDistMean, oneStdDev) - if err != nil { - return err - } outFile, err := createFile(outFileArg, os.Stdout) if err != nil { @@ -259,9 +258,12 @@ func run(_ *cobra.Command, _ []string) error { stopFlag := stop.NewFlag("main") warmupStopFlag := stop.NewFlag("warmup") stop.StartOsSignalsTransmitter(logger, stopFlag, warmupStopFlag) - pump := jobs.NewPump(ctx, logger) + pump := jobs.NewPump(stopFlag, logger) - gens := createGenerators(schema, schemaConfig, distFunc, concurrency, partitionCount, logger) + gens, err := createGenerators(schema, schemaConfig, concurrency, partitionCount, logger) + if err != nil { + return err + } gens.StartAll(stopFlag) if !nonInteractive { @@ -457,8 +459,8 @@ func init() { rootCmd.Flags().StringVarP(&schemaFile, "schema", "", "", "Schema JSON config file") rootCmd.Flags().StringVarP(&mode, "mode", "m", jobs.MixedMode, "Query operation mode. Mode options: write, read, mixed (default)") rootCmd.Flags().Uint64VarP(&concurrency, "concurrency", "c", 10, "Number of threads per table to run concurrently") - rootCmd.Flags().Uint64VarP(&seed, "seed", "s", 1, "Statement seed value") - rootCmd.Flags().Uint64VarP(&schemaSeed, "schema-seed", "", 1, "Schema seed value") + rootCmd.Flags().Uint64VarP(&seed, "seed", "s", RealRandom(), "Statement seed value") + rootCmd.Flags().Uint64VarP(&schemaSeed, "schema-seed", "", RealRandom(), "Schema seed value") rootCmd.Flags().BoolVarP(&dropSchema, "drop-schema", "d", false, "Drop schema before starting tests run") rootCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output during test run") rootCmd.Flags().BoolVarP(&failFast, "fail-fast", "f", false, "Stop on the first failure") @@ -540,3 +542,12 @@ func printSetup() error { tw.Flush() return nil } + +func RealRandom() uint64 { + var b [8]byte + _, err := crand.Read(b[:]) + if err != nil { + return uint64(time.Now().Nanosecond() * time.Now().Second()) + } + return binary.LittleEndian.Uint64(b[:]) +} diff --git a/pkg/generators/generator.go b/pkg/generators/generator.go index 4e9362e2..d9c8158f 100644 --- a/pkg/generators/generator.go +++ b/pkg/generators/generator.go @@ -54,7 +54,6 @@ type Generator struct { partitions Partitions partitionsConfig typedef.PartitionRangeConfig partitionCount uint64 - seed uint64 cntCreated uint64 cntEmitted uint64 @@ -87,7 +86,6 @@ func NewGenerator(table *typedef.Table, config *Config, logger *zap.Logger) *Gen partitionCount: config.PartitionsCount, table: table, partitionsConfig: config.PartitionsRangeConfig, - seed: config.Seed, idxFunc: config.PartitionsDistributionFunc, logger: logger, wakeUpSignal: wakeUpSignal, @@ -121,7 +119,7 @@ func (g *Generator) Start(stopFlag *stop.Flag) { g.logger.Info("starting partition key generation loop") defer g.partitions.CloseAll() for { - g.fillAllPartitions() + g.fillAllPartitions(stopFlag) select { case <-stopFlag.SignalChannel(): g.logger.Debug("stopping partition key generation loop", @@ -137,7 +135,7 @@ func (g *Generator) Start(stopFlag *stop.Flag) { // fillAllPartitions guarantees that each partition was tested to be full // at least once since the function started and before it ended. // In other words no partition will be starved. -func (g *Generator) fillAllPartitions() { +func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) { pFilled := make([]bool, len(g.partitions)) allFilled := func() bool { for _, filled := range pFilled { @@ -160,6 +158,9 @@ func (g *Generator) fillAllPartitions() { case partition.values <- &typedef.ValueWithToken{Token: token, Value: values}: g.cntEmitted++ default: + if stopFlag.IsHardOrSoft() { + return + } if !pFilled[idx] { pFilled[idx] = true if allFilled() { diff --git a/pkg/generators/partition.go b/pkg/generators/partition.go index 8b3d1cb8..11603976 100644 --- a/pkg/generators/partition.go +++ b/pkg/generators/partition.go @@ -35,7 +35,7 @@ type Partition struct { func (s *Partition) get() *typedef.ValueWithToken { for { v := s.pick() - if s.inFlight.AddIfNotPresent(v.Token) { + if v == nil || s.inFlight.AddIfNotPresent(v.Token) { return v } } diff --git a/pkg/jobs/pump.go b/pkg/jobs/pump.go index 9d83dc9d..c929f8ce 100644 --- a/pkg/jobs/pump.go +++ b/pkg/jobs/pump.go @@ -15,14 +15,15 @@ package jobs import ( - "context" "time" + "github.com/scylladb/gemini/pkg/stop" + "go.uber.org/zap" "golang.org/x/exp/rand" ) -func NewPump(ctx context.Context, logger *zap.Logger) chan time.Duration { +func NewPump(stopFlag *stop.Flag, logger *zap.Logger) chan time.Duration { pump := make(chan time.Duration, 10000) logger = logger.Named("Pump") go func() { @@ -31,12 +32,8 @@ func NewPump(ctx context.Context, logger *zap.Logger) chan time.Duration { close(pump) logger.Debug("pump channel closed") }() - for { - select { - case <-ctx.Done(): - break - case pump <- newHeartBeat(): - } + for !stopFlag.IsHardOrSoft() { + pump <- newHeartBeat() } }() diff --git a/pkg/status/status.go b/pkg/status/status.go index a81988b7..fc178aa5 100644 --- a/pkg/status/status.go +++ b/pkg/status/status.go @@ -60,6 +60,7 @@ func (gs *GlobalStatus) PrintResultAsJSON(w io.Writer, schema *typedef.Schema, v result := map[string]interface{}{ "result": gs, "gemini_version": version, + "schemaHash": schema.GetHash(), "schema": schema, } encoder := json.NewEncoder(w) diff --git a/pkg/typedef/schema.go b/pkg/typedef/schema.go index b9147e17..b29ab480 100644 --- a/pkg/typedef/schema.go +++ b/pkg/typedef/schema.go @@ -14,6 +14,13 @@ package typedef +import ( + "encoding/json" + "strconv" + + "github.com/scylladb/gemini/pkg/murmur" +) + type MaterializedView struct { NonPrimaryKey *ColumnDef Name string `json:"name"` @@ -28,6 +35,14 @@ type Schema struct { Config SchemaConfig `json:"-"` } +func (s *Schema) GetHash() string { + out, err := json.Marshal(s) + if err != nil { + panic(err) + } + return strconv.FormatUint(uint64(murmur.Murmur3H1(out)), 16) +} + func (m *MaterializedView) HaveNonPrimaryKey() bool { return m.NonPrimaryKey != nil } diff --git a/pkg/typedef/simple_type.go b/pkg/typedef/simple_type.go index b507cad6..1a650312 100644 --- a/pkg/typedef/simple_type.go +++ b/pkg/typedef/simple_type.go @@ -202,11 +202,11 @@ func (st SimpleType) genValue(r *rand.Rand, p *PartitionRangeConfig) interface{} case TYPE_INT: return r.Int31() case TYPE_SMALLINT: - return int16(r.Int31()) + return int16(r.Uint64n(65535)) case TYPE_TIMEUUID, TYPE_UUID: return utils.UUIDFromTime(r) case TYPE_TINYINT: - return int8(r.Int31()) + return int8(r.Uint64n(255)) case TYPE_VARINT: return big.NewInt(r.Int63()) default: diff --git a/pkg/typedef/types.go b/pkg/typedef/types.go index fb388927..478190ea 100644 --- a/pkg/typedef/types.go +++ b/pkg/typedef/types.go @@ -65,8 +65,13 @@ var ( TYPE_DURATION: {}, } TypesForIndex = SimpleTypes{TYPE_DECIMAL, TYPE_DOUBLE, TYPE_FLOAT, TYPE_INT, TYPE_SMALLINT, TYPE_TINYINT, TYPE_VARINT} - PartitionKeyTypes = SimpleTypes{TYPE_INT, TYPE_SMALLINT, TYPE_TINYINT, TYPE_VARINT} - PkTypes = SimpleTypes{ + PartitionKeyTypes = SimpleTypes{ + TYPE_ASCII, TYPE_BIGINT, TYPE_DATE, TYPE_DECIMAL, TYPE_DOUBLE, + TYPE_FLOAT, TYPE_INET, TYPE_INT, TYPE_SMALLINT, TYPE_TEXT, TYPE_TIME, TYPE_TIMESTAMP, TYPE_TIMEUUID, + TYPE_TINYINT, TYPE_UUID, TYPE_VARCHAR, TYPE_VARINT, TYPE_BOOLEAN, + } + + PkTypes = SimpleTypes{ TYPE_ASCII, TYPE_BIGINT, TYPE_BLOB, TYPE_DATE, TYPE_DECIMAL, TYPE_DOUBLE, TYPE_FLOAT, TYPE_INET, TYPE_INT, TYPE_SMALLINT, TYPE_TEXT, TYPE_TIME, TYPE_TIMESTAMP, TYPE_TIMEUUID, TYPE_TINYINT, TYPE_UUID, TYPE_VARCHAR, TYPE_VARINT, @@ -155,7 +160,7 @@ func (mt *MapType) GenJSONValue(r *rand.Rand, p *PartitionRangeConfig) interface func (mt *MapType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []interface{} { count := r.Intn(9) + 1 - vals := reflect.MakeMap(reflect.MapOf(reflect.TypeOf(mt.KeyType.GenJSONValue(r, p)), reflect.TypeOf(mt.ValueType.GenJSONValue(r, p)))) + vals := reflect.MakeMap(reflect.MapOf(reflect.TypeOf(mt.KeyType.GenValue(r, p)[0]), reflect.TypeOf(mt.ValueType.GenValue(r, p)[0]))) for i := 0; i < count; i++ { vals.SetMapIndex(reflect.ValueOf(mt.KeyType.GenValue(r, p)[0]), reflect.ValueOf(mt.ValueType.GenValue(r, p)[0])) }