diff --git a/CHANGELOG.md b/CHANGELOG.md index 48ff3de3..7a5bdd59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ # Unreleased +- Lazy partition key generation reintroduced to avoid out-of-memory issues. + This adds a new CLI argument `--token-range-slices` that defines how many slices the + partition keyspace is divided into when applying the different distribution functions. + The default is an ad-hoc value of 10000, which should give the chosen probability + distribution ample values to select from. - Fix overlapping operations on the same partition key ([#198](https://github.com/scylladb/gemini/issues/198)). - Partition keys can now be drawn from various distributions such as ___"zipf"___, ___"uniform"___ and ___"normal"___. The CLI argument `--partition-key-distribution` is used diff --git a/cmd/gemini/generators.go b/cmd/gemini/generators.go index fe842cfa..ea343a06 100644 --- a/cmd/gemini/generators.go +++ b/cmd/gemini/generators.go @@ -5,7 +5,7 @@ import ( "go.uber.org/zap" ) -func createGenerators(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, distributionFunc gemini.DistributionFunc, actors, distributionSize uint64, logger *zap.Logger) []*gemini.Generators { +func createGenerators(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, distributionFunc gemini.DistributionFunc, actors, distributionSize uint64, logger *zap.Logger) []*gemini.Generator { partitionRangeConfig := gemini.PartitionRangeConfig{ MaxBlobLength: schemaConfig.MaxBlobLength, MinBlobLength: schemaConfig.MinBlobLength, @@ -13,15 +13,14 @@ func createGenerators(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, d MinStringLength: schemaConfig.MinStringLength, } - var gs []*gemini.Generators + var gs []*gemini.Generator for _, table := range schema.Tables { - gCfg := &gemini.GeneratorsConfig{ - Partitions: partitionRangeConfig, - Size: actors, - DistributionSize: distributionSize, - DistributionFunc: distributionFunc, - Seed: seed, - PkUsedBufferSize: pkBufferReuseSize, + gCfg := &gemini.GeneratorConfig{ + PartitionsRangeConfig: partitionRangeConfig, + PartitionsCount: distributionSize, + PartitionsDistributionFunc: distributionFunc, + Seed: seed, + PkUsedBufferSize: pkBufferReuseSize, } g := gemini.NewGenerator(table, gCfg, logger.Named("generator")) gs = append(gs, g) diff --git a/cmd/gemini/jobs.go b/cmd/gemini/jobs.go index e03b7019..e93f7e4c 100644 --- a/cmd/gemini/jobs.go +++ b/cmd/gemini/jobs.go @@ -4,22 +4,25 @@ import ( "context" "encoding/json" "fmt" - "sync" "time" "github.com/scylladb/gemini" "github.com/scylladb/gemini/store" "go.uber.org/zap" "golang.org/x/exp/rand" + "gopkg.in/tomb.v2" ) // MutationJob continuously applies mutations against the database // for as long as the pump is active.
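The `--token-range-slices` / distribution-function idea above boils down to: draw a `TokenIndex` from some probability distribution, then fold it onto one of a fixed number of slices. A minimal, self-contained sketch of that mapping, not part of the patch; it uses the standard-library `math/rand` rather than the `golang.org/x/exp/rand` the patch uses, and all names are local to the example:

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
)

// TokenIndex mirrors the new gemini.TokenIndex: a position in the token ring.
type TokenIndex uint64

// DistributionFunc mirrors gemini.DistributionFunc: it draws token indexes
// according to some probability distribution.
type DistributionFunc func() TokenIndex

func main() {
	const tokenRangeSlices = 10000 // hypothetical value of --token-range-slices

	r := rand.New(rand.NewSource(1))

	// Uniform over the whole uint64 token-index space.
	uniform := DistributionFunc(func() TokenIndex {
		return TokenIndex(r.Uint64())
	})

	// Zipf, which strongly favours low token indexes.
	zipf := rand.NewZipf(r, 1.1, 1.1, math.MaxUint64)
	skewed := DistributionFunc(func() TokenIndex {
		return TokenIndex(zipf.Uint64())
	})

	// Each drawn index is folded onto one of the slices by a modulo,
	// much like the generator folds a token onto a partition.
	for i := 0; i < 3; i++ {
		fmt.Println("uniform slice:", uint64(uniform())%tokenRangeSlices)
		fmt.Println("zipf slice:   ", uint64(skewed())%tokenRangeSlices)
	}
}
```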
-func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { - defer wg.Done() +func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg logger = logger.Named("mutation_job") testStatus := Status{} + defer func() { + // Send any remaining updates back + c <- testStatus + }() var i int for hb := range pump { hb.await() @@ -27,7 +30,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, if ind%100000 == 0 { ddl(ctx, schema, schemaConfig, table, s, r, p, &testStatus, logger) } else { - mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, true, logger) + mutation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, true, logger) } if i%1000 == 0 { c <- testStatus @@ -35,7 +38,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, } if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) { c <- testStatus - break + return } i++ } @@ -43,23 +46,24 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, // ValidationJob continuously applies validations against the database // for as long as the pump is active. -func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { - defer wg.Done() +func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg logger = logger.Named("validation_job") testStatus := Status{} + defer func() { + c <- testStatus + }() var i int for hb := range pump { hb.await() - validation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, logger) + validation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, logger) if i%1000 == 0 { c <- testStatus testStatus = Status{} } if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) { - c <- testStatus - break + return } i++ } @@ -67,8 +71,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGrou // WarmupJob continuously applies mutations against the database // for as long as the pump is active or the supplied duration expires. 
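The jobs above (and the pump below) are now supervised by `gopkg.in/tomb.v2` instead of a `sync.WaitGroup` plus a done channel. A small standalone sketch of that pattern, with purely illustrative worker bodies:

```go
package main

import (
	"fmt"
	"time"

	"gopkg.in/tomb.v2"
)

func main() {
	t := &tomb.Tomb{}

	// Workers are started with t.Go instead of wg.Add/wg.Done; they watch
	// t.Dying() the way the jobs watch the pump and the generator.
	for i := 0; i < 3; i++ {
		i := i
		t.Go(func() error {
			for {
				select {
				case <-t.Dying():
					fmt.Println("worker", i, "stopping")
					return nil
				case <-time.After(10 * time.Millisecond):
					// simulate one unit of work
				}
			}
		})
	}

	// Kill asks every worker to stop; Wait blocks until they have all
	// returned, replacing the old two-phase WaitGroup handshake.
	time.AfterFunc(50*time.Millisecond, func() { t.Kill(nil) })
	if err := t.Wait(); err != nil {
		fmt.Println("workers finished with error:", err)
		return
	}
	fmt.Println("all workers finished cleanly")
}
```

The practical win is that a single `Kill`/`Wait` pair both signals shutdown and collects goroutine errors, which is why the explicit `Pump.Stop` and the worker `WaitGroup` disappear in this patch.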
-func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { - defer wg.Done() +func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg testStatus := Status{} var i int @@ -87,7 +90,7 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, s c <- testStatus return default: - mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, false, logger) + mutation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, false, logger) if i%1000 == 0 { c <- testStatus testStatus = Status{} @@ -96,16 +99,8 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, s } } -func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generators, result chan Status, logger *zap.Logger) { - defer done.Done() - var finished sync.WaitGroup - finished.Add(1) - - // Wait group for the worker goroutines. - var workers sync.WaitGroup +func job(t *tomb.Tomb, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) { workerCtx, _ := context.WithCancel(context.Background()) - workers.Add(len(schema.Tables) * int(actors)) - partitionRangeConfig := gemini.PartitionRangeConfig{ MaxBlobLength: schemaConfig.MaxBlobLength, MinBlobLength: schemaConfig.MinBlobLength, @@ -114,13 +109,15 @@ func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, } for j, table := range schema.Tables { + g := generators[j] for i := 0; i < int(actors); i++ { r := rand.New(rand.NewSource(seed)) - go f(workerCtx, pump.ch, &workers, schema, schemaConfig, table, s, r, partitionRangeConfig, generators[j].Get(i), result, mode, warmup, logger) + t.Go(func() error { + f(workerCtx, pump.ch, schema, schemaConfig, table, s, r, partitionRangeConfig, g, result, mode, warmup, logger) + return nil + }) } } - - workers.Wait() } func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, testStatus *Status, logger *zap.Logger) { @@ -168,8 +165,8 @@ func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, ta } } -func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, testStatus *Status, deletes bool, logger *zap.Logger) { - mutateStmt, err := schema.GenMutateStmt(table, source, r, p, deletes) +func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, deletes bool, logger *zap.Logger) { + mutateStmt, err := schema.GenMutateStmt(table, g, r, p, deletes) if err != nil { logger.Error("Failed! 
Mutation statement generation failed", zap.Error(err)) testStatus.WriteErrors++ @@ -186,7 +183,7 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig defer func() { v := make(gemini.Value, len(table.PartitionKeys)) copy(v, mutateValues) - source.GiveOld(gemini.ValueWithToken{Token: token, Value: v}) + g.GiveOld(gemini.ValueWithToken{Token: token, Value: v}) }() if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil { w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL())) @@ -204,8 +201,8 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig } } -func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, testStatus *Status, logger *zap.Logger) { - checkStmt := schema.GenCheckStmt(table, source, r, p) +func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, logger *zap.Logger) { + checkStmt := schema.GenCheckStmt(table, g, r, p) if checkStmt == nil { if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil { w.Write(zap.String("job", "validation")) @@ -216,7 +213,7 @@ func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConf token, checkValues := checkStmt.Values() defer func() { // Signal done with this pk... - source.GiveOld(gemini.ValueWithToken{Token: token}) + g.GiveOld(gemini.ValueWithToken{Token: token}) }() if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil { w.Write(zap.String("pretty_cql", checkStmt.PrettyCQL())) diff --git a/cmd/gemini/pump.go b/cmd/gemini/pump.go index c591cf1a..0421e8a9 100644 --- a/cmd/gemini/pump.go +++ b/cmd/gemini/pump.go @@ -7,12 +7,14 @@ import ( "time" "github.com/briandowns/spinner" + "github.com/scylladb/gemini" "go.uber.org/zap" + "gopkg.in/tomb.v2" ) type Pump struct { ch chan heartBeat - done chan struct{} + t *tomb.Tomb graceful chan os.Signal logger *zap.Logger } @@ -28,29 +30,26 @@ func (hb heartBeat) await() { } func (p *Pump) Start(d time.Duration, postFunc func()) { - go func() { + p.t.Go(func() error { defer p.cleanup(postFunc) timer := time.NewTimer(d) for { select { - case <-p.done: + case <-p.t.Dying(): p.logger.Info("Test run stopped. Exiting.") - return + return nil case <-p.graceful: p.logger.Info("Test run aborted. Exiting.") - return + p.t.Kill(nil) + return nil case <-timer.C: p.logger.Info("Test run completed. 
Exiting.") - return + p.t.Kill(nil) + return nil case p.ch <- newHeartBeat(): } } - }() -} - -func (p *Pump) Stop() { - p.logger.Debug("pump asked to stop") - p.done <- struct{}{} + }) } func (p *Pump) cleanup(postFunc func()) { @@ -61,23 +60,26 @@ func (p *Pump) cleanup(postFunc func()) { postFunc() } -func createPump(sz int, logger *zap.Logger) *Pump { +func createPump(t *tomb.Tomb, sz int, logger *zap.Logger) *Pump { logger = logger.Named("pump") var graceful = make(chan os.Signal, 1) signal.Notify(graceful, syscall.SIGTERM, syscall.SIGINT) pump := &Pump{ ch: make(chan heartBeat, sz), - done: make(chan struct{}, 1), + t: t, graceful: graceful, logger: logger, } return pump } -func createPumpCallback(result chan Status, sp *spinner.Spinner) func() { +func createPumpCallback(generators []*gemini.Generator, result chan Status, sp *spinner.Spinner) func() { return func() { if sp != nil { sp.Stop() } + for _, g := range generators { + g.Stop() + } } } diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index b0352bcc..9da0624a 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -6,11 +6,11 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math" "net/http" _ "net/http/pprof" "os" "strings" - "sync" "text/tabwriter" "time" @@ -27,6 +27,7 @@ import ( "golang.org/x/exp/rand" "golang.org/x/net/context" "gonum.org/v1/gonum/stat/distuv" + "gopkg.in/tomb.v2" ) var ( @@ -59,7 +60,7 @@ var ( maxRetriesMutate int maxRetriesMutateSleep time.Duration pkBufferReuseSize uint64 - distributionSize uint64 + partitionCount uint64 partitionKeyDistribution string normalDistMean float64 normalDistSigma float64 @@ -87,7 +88,7 @@ func interactive() bool { return !nonInteractive } -type testJob func(context.Context, <-chan heartBeat, *sync.WaitGroup, *gemini.Schema, gemini.SchemaConfig, *gemini.Table, store.Store, *rand.Rand, gemini.PartitionRangeConfig, *gemini.Source, chan Status, string, time.Duration, *zap.Logger) +type testJob func(context.Context, <-chan heartBeat, *gemini.Schema, gemini.SchemaConfig, *gemini.Table, store.Store, *rand.Rand, gemini.PartitionRangeConfig, *gemini.Generator, chan Status, string, time.Duration, *zap.Logger) func readSchema(confFile string) (*gemini.Schema, error) { byteValue, err := ioutil.ReadFile(confFile) @@ -136,7 +137,7 @@ func run(cmd *cobra.Command, args []string) error { if err := printSetup(); err != nil { return errors.Wrapf(err, "unable to print setup") } - distFunc, err := createDistributionFunc(partitionKeyDistribution, distributionSize, seed, stdDistMean, oneStdDev) + distFunc, err := createDistributionFunc(partitionKeyDistribution, math.MaxUint64, seed, stdDistMean, oneStdDev) if err != nil { return err } @@ -204,25 +205,25 @@ func run(cmd *cobra.Command, args []string) error { } } - done := &sync.WaitGroup{} - done.Add(1) + t := &tomb.Tomb{} result := make(chan Status, 10000) endResult := make(chan Status, 1) - pump := createPump(10000, logger) - generators := createGenerators(schema, schemaConfig, distFunc, concurrency, distributionSize, logger) - go func() { - defer done.Done() + pump := createPump(t, 10000, logger) + generators := createGenerators(schema, schemaConfig, distFunc, concurrency, partitionCount, logger) + t.Go(func() error { var sp *spinner.Spinner = nil if interactive() { sp = createSpinner() } - pump.Start(duration+warmup, createPumpCallback(result, sp)) - endResult <- sampleStatus(pump, result, sp, logger) - }() + pump.Start(duration+warmup, createPumpCallback(generators, result, sp)) + endResult <- sampleStatus(result, sp, logger) + return 
nil + }) launch(schema, schemaConfig, store, pump, generators, result, logger) close(result) - done.Wait() + logger.Info("result channel closed") + _ = t.Wait() res := <-endResult res.PrintResult(outFile, schema) if res.HasErrors() { @@ -243,16 +244,16 @@ func createFile(fname string, def *os.File) (*os.File, error) { } const ( - stdDistMean = 0.5 - oneStdDev = 0.341 + stdDistMean = math.MaxUint64 / 2 + oneStdDev = 0.341 * math.MaxUint64 ) func createDistributionFunc(distribution string, size, seed uint64, mu, sigma float64) (gemini.DistributionFunc, error) { switch strings.ToLower(distribution) { case "zipf": - dist := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.1, 1.1, size-1) - return func() uint64 { - return dist.Uint64() + dist := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.1, 1.1, size) + return func() gemini.TokenIndex { + return gemini.TokenIndex(dist.Uint64()) }, nil case "normal": dist := distuv.Normal{ @@ -260,43 +261,70 @@ func createDistributionFunc(distribution string, size, seed uint64, mu, sigma fl Mu: mu, Sigma: sigma, } - return func() uint64 { - return uint64(dist.Rand()) * size + return func() gemini.TokenIndex { + return gemini.TokenIndex(dist.Rand()) }, nil case "uniform": rnd := rand.New(rand.NewSource(seed)) - return func() uint64 { - return rnd.Uint64n(size) + return func() gemini.TokenIndex { + return gemini.TokenIndex(rnd.Uint64n(size)) }, nil default: return nil, errors.Errorf("unsupported distribution: %s", distribution) } } -func launch(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, store store.Store, pump *Pump, generators []*gemini.Generators, result chan Status, logger *zap.Logger) { - if warmup > 0 { - done := &sync.WaitGroup{} - done.Add(1) - job(done, WarmupJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger) - done.Wait() - logger.Info("Warmup done") +func launch(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, store store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) { + if doWarmup(schema, schemaConfig, store, pump, generators, result, logger) { + logger.Info("doWarmup terminates launch") + return } - done := &sync.WaitGroup{} - done.Add(1) + t := &tomb.Tomb{} switch mode { case writeMode: - go job(done, MutationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger) + entombJobs(t, concurrency, schema, schemaConfig, store, pump, generators, result, logger, MutationJob) case readMode: - go job(done, ValidationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger) + entombJobs(t, concurrency, schema, schemaConfig, store, pump, generators, result, logger, ValidationJob) default: - done.Add(1) - go job(done, MutationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger) - go job(done, ValidationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger) + entombJobs(t, concurrency, schema, schemaConfig, store, pump, generators, result, logger, MutationJob, ValidationJob) } - done.Wait() + _ = t.Wait() logger.Info("All jobs complete") } +func doWarmup(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) bool { + if warmup > 0 { + t := &tomb.Tomb{} + entombJobs(t, concurrency, schema, schemaConfig, s, pump, generators, result, logger, WarmupJob) + _ = t.Wait() + logger.Info("Warmup done") + select { + case <-pump.t.Dying(): + logger.Info("Warmup dying") + return 
true + default: + logger.Info("Warmup not dying") + } + } + return false +} + +func wrapJobInTombFunc(t *tomb.Tomb, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) func() error { + return func() error { + job(t, f, concurrency, schema, schemaConfig, s, pump, generators, result, logger) + return nil + } +} + +func entombJobs(t *tomb.Tomb, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger, fs ...testJob) { + t.Go(func() error { + for _, f := range fs { + t.Go(wrapJobInTombFunc(t, f, actors, schema, schemaConfig, s, pump, generators, result, logger)) + } + return nil + }) +} + func createLogger(level string) *zap.Logger { lvl := zap.NewAtomicLevel() if err := lvl.UnmarshalText([]byte(level)); err != nil { @@ -417,9 +445,9 @@ func init() { rootCmd.Flags().StringVarP(&level, "level", "", "info", "Specify the logging level, debug|info|warn|error|dpanic|panic|fatal") rootCmd.Flags().IntVarP(&maxRetriesMutate, "max-mutation-retries", "", 2, "Maximum number of attempts to apply a mutation") rootCmd.Flags().DurationVarP(&maxRetriesMutateSleep, "max-mutation-retries-backoff", "", 10*time.Millisecond, "Duration between attempts to apply a mutation for example 10ms or 1s") - rootCmd.Flags().Uint64VarP(&pkBufferReuseSize, "partition-key-buffer-reuse-size", "", 2000, "Number of reused buffered partition keys") - rootCmd.Flags().Uint64VarP(&distributionSize, "distribution-size", "", 1000000, "Number of partition keys each worker creates") - rootCmd.Flags().StringVarP(&partitionKeyDistribution, "partition-key-distribution", "", "uniform", "Specify the distribution from which to draw partition keys, supported values are currently uniform|normal|exponential") + rootCmd.Flags().Uint64VarP(&pkBufferReuseSize, "partition-key-buffer-reuse-size", "", 100, "Number of reused buffered partition keys") + rootCmd.Flags().Uint64VarP(&partitionCount, "token-range-slices", "", 10000, "Number of slices to divide the token space into") + rootCmd.Flags().StringVarP(&partitionKeyDistribution, "partition-key-distribution", "", "uniform", "Specify the distribution from which to draw partition keys, supported values are currently uniform|normal|zipf") rootCmd.Flags().Float64VarP(&normalDistMean, "normal-dist-mean", "", stdDistMean, "Mean of the normal distribution") rootCmd.Flags().Float64VarP(&normalDistSigma, "normal-dist-sigma", "", oneStdDev, "Sigma of the normal distribution, defaults to one standard deviation ~0.341") rootCmd.Flags().StringVarP(&tracingOutFile, "tracing-outfile", "", "", "Specify the file to which tracing information gets written. Two magic names are available, 'stdout' and 'stderr'. 
By default tracing is disabled.") diff --git a/cmd/gemini/status.go b/cmd/gemini/status.go index 96055ed0..3cd633b5 100644 --- a/cmd/gemini/status.go +++ b/cmd/gemini/status.go @@ -70,7 +70,7 @@ func (r Status) HasErrors() bool { return r.WriteErrors > 0 || r.ReadErrors > 0 } -func sampleStatus(p *Pump, c chan Status, sp *spinner.Spinner, logger *zap.Logger) Status { +func sampleStatus(c chan Status, sp *spinner.Spinner, logger *zap.Logger) Status { failfastDone := sync.Once{} logger = logger.Named("sample_results") var testRes Status @@ -83,8 +83,8 @@ func sampleStatus(p *Pump, c chan Status, sp *spinner.Spinner, logger *zap.Logge if failFast { failfastDone.Do(func() { logger.Warn("Errors detected. Exiting.") - p.Stop() }) + return testRes } } } diff --git a/docs/architecture.md b/docs/architecture.md index ed70d626..63c028cc 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -55,7 +55,7 @@ in use but the idea is to introduce some jitter into the execution flow. The application generates partition ids through a `Generator` that creates a steady flow of partition key components for the desired [concurrency](architecture.md#Concurrency). -Each goroutine is connected to a `source` that the generator controls. This source continuously emits +Each goroutine is connected to a `partition` that the generator controls. This partition continuously emits new partition ids in the form of a `[]interface{}`. These keys are created in the same way as the the driver does to ensure that each goroutine only processes partition keys from it's designated bucket. These partition keys These values are copied into another list that keeps the old partition ids for diff --git a/generator.go b/generator.go index eec91884..42d5fb12 100644 --- a/generator.go +++ b/generator.go @@ -1,110 +1,89 @@ package gemini import ( - "sync" - "time" - + "github.com/scylladb/gemini/inflight" "github.com/scylladb/gemini/murmur" - "github.com/scylladb/go-set/u64set" "go.uber.org/zap" "golang.org/x/exp/rand" + "gopkg.in/tomb.v2" ) -type DistributionFunc func() uint64 - -type Source struct { - values []ValueWithToken - idxFunc func() uint64 - oldValues chan ValueWithToken - inFlight syncU64set -} - -//Get returns a new value and ensures that it's corresponding token -//is not already in-flight. -func (s *Source) Get() (ValueWithToken, bool) { - for { - v := s.pick() - if s.inFlight.addIfNotPresent(v.Token) { - return v, true - } - } -} - -//GetOld returns a previously used value and token or a new if -//the old queue is empty. -func (s *Source) GetOld() (ValueWithToken, bool) { - select { - case v, ok := <-s.oldValues: - return v, ok - default: - // There are no old values so we generate a new - return s.Get() - } -} - -// GiveOld returns the supplied value for later reuse unless the value -//is empty in which case it removes the corresponding token from the -// in-flight tracking. -func (s *Source) GiveOld(v ValueWithToken) { - if len(v.Value) == 0 { - s.inFlight.delete(v.Token) - return - } - select { - case s.oldValues <- v: - default: - // Old source is full, just drop the value - } -} - -func (s *Source) pick() ValueWithToken { - return s.values[s.idxFunc()] -} - -type Generators struct { - generators []*Source - size uint64 - distributionSize uint64 +// TokenIndex represents the position of a token in the token ring. +// A token index is translated to a token by a generator. 
If the generator +// preserves the exact position, then the token index becomes the token; +// otherwise token index represents an approximation of the token. +// +// We use a token index approach, because our generators actually generate +// partition keys, and map them to tokens. The generators, therefore, do +// not populate the full token ring space. With token index, we can +// approximate different token distributions from a sparse set of tokens. +type TokenIndex uint64 + +type DistributionFunc func() TokenIndex + +type Generator struct { + partitions []*Partition + partitionCount uint64 table *Table partitionsConfig PartitionRangeConfig seed uint64 + idxFunc DistributionFunc + t *tomb.Tomb logger *zap.Logger } -type GeneratorsConfig struct { - Partitions PartitionRangeConfig - DistributionSize uint64 - DistributionFunc DistributionFunc - Size uint64 - Seed uint64 - PkUsedBufferSize uint64 +type GeneratorConfig struct { + PartitionsRangeConfig PartitionRangeConfig + PartitionsCount uint64 + PartitionsDistributionFunc DistributionFunc + Seed uint64 + PkUsedBufferSize uint64 } -func NewGenerator(table *Table, config *GeneratorsConfig, logger *zap.Logger) *Generators { - generators := make([]*Source, config.Size) - for i := uint64(0); i < config.Size; i++ { - generators[i] = &Source{ - values: make([]ValueWithToken, 0, config.DistributionSize), - idxFunc: config.DistributionFunc, +func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Generator { + t := &tomb.Tomb{} + partitions := make([]*Partition, config.PartitionsCount) + for i := 0; i < len(partitions); i++ { + partitions[i] = &Partition{ + values: make(chan ValueWithToken, config.PkUsedBufferSize), oldValues: make(chan ValueWithToken, config.PkUsedBufferSize), - inFlight: syncU64set{pks: u64set.New()}, + inFlight: inflight.New(), + t: t, } } - gs := &Generators{ - generators: generators, - size: config.Size, - distributionSize: config.DistributionSize, + gs := &Generator{ + partitions: partitions, + partitionCount: config.PartitionsCount, table: table, - partitionsConfig: config.Partitions, + partitionsConfig: config.PartitionsRangeConfig, seed: config.Seed, + idxFunc: config.PartitionsDistributionFunc, + t: t, logger: logger, } - gs.create() + gs.start() return gs } -func (gs Generators) Get(idx int) *Source { - return gs.generators[idx] +func (g Generator) Get() (ValueWithToken, bool) { + select { + case <-g.t.Dying(): + return emptyValueWithToken, false + default: + } + partition := g.partitions[uint64(g.idxFunc())%g.partitionCount] + return partition.get() +} + +// GetOld returns a previously used value and token or a new if +// the old queue is empty. 
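`Get`, `GetOld` and `GiveOld` together implement the claim/release cycle that prevents overlapping operations on the same partition key. Below is a stripped-down, self-contained mirror of that cycle for illustration only: a plain map stands in for the `inflight` set and a bare `uint64` for `ValueWithToken`.

```go
package main

import (
	"fmt"
	"sync"
)

// partition mirrors the round trip: values flow out of a bounded channel,
// their tokens are marked in-flight, and giveOld either recycles the value
// or releases the in-flight claim.
type partition struct {
	values    chan uint64
	oldValues chan uint64
	mu        sync.Mutex
	inFlight  map[uint64]struct{}
}

func (p *partition) get() uint64 {
	for {
		tok := <-p.values
		p.mu.Lock()
		_, busy := p.inFlight[tok]
		if !busy {
			p.inFlight[tok] = struct{}{}
		}
		p.mu.Unlock()
		if !busy {
			return tok // token is now claimed; no other job may touch it
		}
	}
}

func (p *partition) giveOld(tok uint64, reuse bool) {
	if !reuse {
		p.mu.Lock()
		delete(p.inFlight, tok) // release the claim
		p.mu.Unlock()
		return
	}
	select {
	case p.oldValues <- tok:
	default: // old buffer full, drop the value
	}
}

func main() {
	p := &partition{
		values:    make(chan uint64, 4),
		oldValues: make(chan uint64, 4),
		inFlight:  map[uint64]struct{}{},
	}
	p.values <- 42

	tok := p.get()
	fmt.Println("claimed token", tok)
	p.giveOld(tok, true) // keep it around for a later validation pass
}
```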
+func (g Generator) GetOld() (ValueWithToken, bool) { + select { + case <-g.t.Dying(): + return emptyValueWithToken, false + default: + } + return g.partitions[uint64(g.idxFunc())%g.partitionCount].getOld() } type ValueWithToken struct { @@ -112,38 +91,57 @@ type ValueWithToken struct { Value Value } -func (gs *Generators) create() { - gs.logger.Info("generating partition keys, this can take a while", zap.Uint64("distribution_size", gs.distributionSize)) - start := time.Now() - routingKeyCreator := &RoutingKeyCreator{} - r := rand.New(rand.NewSource(gs.seed)) - fullSources := u64set.New() - for { - values := gs.createPartitionKeyValues(r) - hash := hash(routingKeyCreator, gs.table, values) - idx := hash % gs.size - source := gs.generators[idx] - if fullSources.Has(idx) { - continue - } - if uint64(len(source.values)) < gs.distributionSize { - source.values = append(source.values, ValueWithToken{Token: hash, Value: values}) - } - if uint64(len(source.values)) == gs.distributionSize { - gs.logger.Debug("partial generation", zap.Uint64("source", idx), zap.Int("size", len(source.values))) - fullSources.Add(idx) - } - if fullSources.Size() == len(gs.generators) { - gs.logger.Info("finished generating partition ids", zap.Duration("duration", time.Since(start))) - break - } +// GiveOld returns the supplied value for later reuse unless the value +// is empty in which case it removes the corresponding token from the +// in-flight tracking. +func (g *Generator) GiveOld(v ValueWithToken) { + select { + case <-g.t.Dying(): + return + default: } + partition := g.partitions[v.Token%g.partitionCount] + partition.giveOld(v) +} + +func (g *Generator) Stop() { + g.t.Kill(nil) + _ = g.t.Wait() +} + +func (g *Generator) start() { + g.t.Go(func() error { + g.logger.Info("starting partition key generation loop") + routingKeyCreator := &RoutingKeyCreator{} + r := rand.New(rand.NewSource(g.seed)) + var ( + cntCreated uint64 + cntEmitted uint64 + ) + for { + values := g.createPartitionKeyValues(r) + hash := hash(routingKeyCreator, g.table, values) + idx := hash % g.partitionCount + partition := g.partitions[idx] + cntCreated++ + select { + case partition.values <- ValueWithToken{Token: hash, Value: values}: + cntEmitted++ + case <-g.t.Dying(): + g.logger.Info("stopping partition key generation loop", + zap.Uint64("keys_created", cntCreated), + zap.Uint64("keys_emitted", cntEmitted)) + return nil + default: + } + } + }) } -func (gs *Generators) createPartitionKeyValues(r *rand.Rand) []interface{} { +func (g *Generator) createPartitionKeyValues(r *rand.Rand) []interface{} { var values []interface{} - for _, pk := range gs.table.PartitionKeys { - values = append(values, pk.Type.GenValue(r, gs.partitionsConfig)...) + for _, pk := range g.table.PartitionKeys { + values = append(values, pk.Type.GenValue(r, g.partitionsConfig)...) 
} return values } @@ -152,25 +150,3 @@ func hash(rkc *RoutingKeyCreator, t *Table, values []interface{}) uint64 { b, _ := rkc.CreateRoutingKey(t, values) return uint64(murmur.Murmur3H1(b)) } - -type syncU64set struct { - pks *u64set.Set - mu sync.Mutex -} - -func (s *syncU64set) delete(v uint64) bool { - s.mu.Lock() - _, found := s.pks.Pop2() - s.mu.Unlock() - return found -} - -func (s *syncU64set) addIfNotPresent(v uint64) bool { - s.mu.Lock() - defer s.mu.Unlock() - if s.pks.Has(v) { - return false - } - s.pks.Add(v) - return true -} diff --git a/generator_test.go b/generator_test.go index f3bb81c3..adc5f8f5 100644 --- a/generator_test.go +++ b/generator_test.go @@ -1,7 +1,6 @@ package gemini import ( - "reflect" "sync/atomic" "testing" @@ -14,28 +13,27 @@ func TestGenerator(t *testing.T) { PartitionKeys: createPkColumns(1, "pk"), } var current uint64 - cfg := &GeneratorsConfig{ - Partitions: PartitionRangeConfig{ + cfg := &GeneratorConfig{ + PartitionsRangeConfig: PartitionRangeConfig{ MaxStringLength: 10, MinStringLength: 0, MaxBlobLength: 10, MinBlobLength: 0, }, - Size: 1, + Size: 10000, PkUsedBufferSize: 10000, - DistributionSize: 1000, - DistributionFunc: func() uint64 { - return atomic.LoadUint64(¤t) + PartitionsCount: 1000, + PartitionsDistributionFunc: func() TokenIndex { + return TokenIndex(atomic.LoadUint64(¤t)) }, } logger, _ := zap.NewDevelopment() generators := NewGenerator(table, cfg, logger) - source := generators.Get(0) - for i := uint64(0); i < cfg.DistributionSize; i++ { + for i := uint64(0); i < cfg.PartitionsCount; i++ { atomic.StoreUint64(¤t, i) - v, _ := source.Get() - n, _ := source.Get() - if !reflect.DeepEqual(v, n) { + v, _ := generators.Get() + n, _ := generators.Get() + if v.Token%generators.partitionCount != n.Token%generators.partitionCount { t.Errorf("expected %v, got %v", v, n) } } diff --git a/go.mod b/go.mod index 11c5c441..168c8e38 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect gonum.org/v1/gonum v0.0.0-20190724133715-a8659125a966 gopkg.in/inf.v0 v0.9.1 + gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) replace github.com/gocql/gocql => github.com/scylladb/gocql v1.2.0 diff --git a/go.sum b/go.sum index fbccf8ff..3b23b494 100644 --- a/go.sum +++ b/go.sum @@ -144,5 +144,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs= +gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/inflight/inflight.go b/inflight/inflight.go new file mode 100644 index 00000000..bd8f1c26 --- /dev/null +++ b/inflight/inflight.go @@ -0,0 +1,89 @@ +package inflight + +import ( + "sync" + + "github.com/scylladb/go-set/u64set" +) + +type InFlight interface { + AddIfNotPresent(uint64) bool + Delete(uint64) +} + +// New creates a instance of a simple InFlight set. +// It's internal data is protected by a simple sync.RWMutex. 
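For reference, a tiny usage sketch of the new `inflight` package as declared by this interface, assuming the import path introduced in this patch; `NewConcurrent` is the sharded variant defined further down:

```go
package main

import (
	"fmt"

	"github.com/scylladb/gemini/inflight"
)

func main() {
	// inflight.New returns the simple mutex-protected set; NewConcurrent is
	// the sharded variant for higher contention. Both satisfy inflight.InFlight.
	tokens := inflight.NewConcurrent()

	token := uint64(12345)
	if tokens.AddIfNotPresent(token) {
		fmt.Println("token claimed, safe to operate on this partition key")
	}
	if !tokens.AddIfNotPresent(token) {
		fmt.Println("second claim refused: the token is already in flight")
	}

	// Once the partition key is fully released, the token is removed and
	// can be claimed again.
	tokens.Delete(token)
	fmt.Println("claimed again after delete:", tokens.AddIfNotPresent(token))
}
```

Sharding over 256 buckets keeps each lock small, so concurrent workers hashing to different buckets do not contend on a single mutex.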
+func New() InFlight { + return newSyncU64set() +} + +func newSyncU64set() *syncU64set { + return &syncU64set{ + pks: u64set.New(), + mu: &sync.RWMutex{}, + } +} + +// NewConcurrent creates a instance of a sharded InFlight set. +// It shards the values over 256 buckets which should afford a +// decent increase in concurrency support. +func NewConcurrent() InFlight { + return newShardedSyncU64set() +} + +func newShardedSyncU64set() *shardedSyncU64set { + s := &shardedSyncU64set{} + for i := range s.shards { + s.shards[i] = newSyncU64set() + } + return s +} + +// shardedSyncU64set is a sharded InFlight implementation protected by a sync.RWLock +// which should support greater concurrency. +type shardedSyncU64set struct { + shards [256]*syncU64set +} + +func (s *shardedSyncU64set) Delete(v uint64) { + ss := s.shards[v%256] + ss.Delete(v) +} + +func (s *shardedSyncU64set) AddIfNotPresent(v uint64) bool { + ss := s.shards[v%256] + return ss.AddIfNotPresent(v) +} + +// syncU64set is an InFlight implementation protected by a sync.RWLock +type syncU64set struct { + pks *u64set.Set + mu *sync.RWMutex +} + +func (s *syncU64set) Delete(v uint64) { + s.mu.Lock() + defer s.mu.Unlock() + s.pks.Remove(v) +} + +func (s *syncU64set) AddIfNotPresent(v uint64) bool { + s.mu.RLock() + if s.pks.Has(v) { + s.mu.RUnlock() + return false + } + s.mu.RUnlock() + return s.addIfNotPresent(v) +} + +func (s *syncU64set) addIfNotPresent(v uint64) bool { + s.mu.Lock() + defer s.mu.Unlock() + if s.pks.Has(v) { + // double check + return false + } + s.pks.Add(v) + return true +} diff --git a/inflight/inflight_test.go b/inflight/inflight_test.go new file mode 100644 index 00000000..950bf237 --- /dev/null +++ b/inflight/inflight_test.go @@ -0,0 +1,100 @@ +package inflight + +import ( + "math/rand" + "reflect" + "testing" + "testing/quick" +) + +func TestAddIfNotPresent(t *testing.T) { + t.Parallel() + flight := newSyncU64set() + if !flight.AddIfNotPresent(10) { + t.Error("could not add the first value") + } + if flight.AddIfNotPresent(10) { + t.Error("value added twice") + } +} + +func TestDelete(t *testing.T) { + t.Parallel() + flight := newSyncU64set() + flight.AddIfNotPresent(10) + + flight.Delete(10) + if flight.pks.Has(10) { + t.Error("did not delete the value") + } +} + +func TestAddIfNotPresentSharded(t *testing.T) { + t.Parallel() + flight := newShardedSyncU64set() + if !flight.AddIfNotPresent(10) { + t.Error("could not add the first value") + } + if flight.AddIfNotPresent(10) { + t.Error("value added twice") + } +} + +func TestDeleteSharded(t *testing.T) { + t.Parallel() + flight := newShardedSyncU64set() + flight.AddIfNotPresent(10) + + flight.Delete(10) + if flight.shards[10%256].pks.Has(10) { + t.Error("did not delete the value") + } +} + +func TestInflight(t *testing.T) { + t.Parallel() + flight := newSyncU64set() + f := func(v uint64) interface{} { + return flight.AddIfNotPresent(v) + } + g := func(v uint64) interface{} { + flight.Delete(v) + return !flight.pks.Has(v) + } + + cfg := createQuickConfig() + if err := quick.CheckEqual(f, g, cfg); err != nil { + t.Error(err) + } +} + +func TestInflightSharded(t *testing.T) { + t.Parallel() + flight := newShardedSyncU64set() + f := func(v uint64) interface{} { + return flight.AddIfNotPresent(v) + } + g := func(v uint64) interface{} { + flight.Delete(v) + return !flight.shards[v%256].pks.Has(v) + } + + cfg := createQuickConfig() + if err := quick.CheckEqual(f, g, cfg); err != nil { + t.Error(err) + } +} + +func createQuickConfig() *quick.Config { + return 
&quick.Config{ + MaxCount: 2000000, + Values: func(vs []reflect.Value, r *rand.Rand) { + for i := 0; i < len(vs); i++ { + uv := r.Uint64() + v := reflect.New(reflect.TypeOf(uv)).Elem() + v.SetUint(uv) + vs[i] = v + } + }, + } +} diff --git a/partition.go b/partition.go new file mode 100644 index 00000000..e55f6e99 --- /dev/null +++ b/partition.go @@ -0,0 +1,58 @@ +package gemini + +import ( + "github.com/scylladb/gemini/inflight" + "gopkg.in/tomb.v2" +) + +type Partition struct { + values chan ValueWithToken + oldValues chan ValueWithToken + inFlight inflight.InFlight + t *tomb.Tomb +} + +// get returns a new value and ensures that it's corresponding token +// is not already in-flight. +func (s *Partition) get() (ValueWithToken, bool) { + for { + v := s.pick() + if s.inFlight.AddIfNotPresent(v.Token) { + return v, true + } + } +} + +var emptyValueWithToken = ValueWithToken{} + +// getOld returns a previously used value and token or a new if +// the old queue is empty. +func (s *Partition) getOld() (ValueWithToken, bool) { + select { + case <-s.t.Dying(): + return emptyValueWithToken, false + case v, ok := <-s.oldValues: + return v, ok + default: + return s.get() + } +} + +// giveOld returns the supplied value for later reuse unless the value +// is empty in which case it removes the corresponding token from the +// in-flight tracking. +func (s *Partition) giveOld(v ValueWithToken) { + if len(v.Value) == 0 { + s.inFlight.Delete(v.Token) + return + } + select { + case s.oldValues <- v: + default: + // Old partition buffer is full, just drop the value + } +} + +func (s *Partition) pick() ValueWithToken { + return <-s.values +} diff --git a/schema.go b/schema.go index 6a262427..12b569a2 100644 --- a/schema.go +++ b/schema.go @@ -536,7 +536,7 @@ func (s *Schema) GetCreateSchema() []string { return stmts } -func (s *Schema) GenInsertStmt(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenInsertStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -549,7 +549,7 @@ func (s *Schema) GenInsertStmt(t *Table, source *Source, r *rand.Rand, p Partiti typs = append(typs, pk.Type) } - valuesWithToken, ok := source.Get() + valuesWithToken, ok := g.Get() if !ok { return nil, nil } @@ -578,11 +578,11 @@ func (s *Schema) GenInsertStmt(t *Table, source *Source, r *rand.Rand, p Partiti }, nil } -func (s *Schema) GenInsertJsonStmt(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenInsertJsonStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() - vals, ok := source.Get() + vals, ok := g.Get() if !ok { return nil, nil } @@ -636,7 +636,7 @@ func (s *Schema) GenInsertJsonStmt(t *Table, source *Source, r *rand.Rand, p Par }, nil } -func (s *Schema) GenDeleteRows(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenDeleteRows(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -649,7 +649,7 @@ func (s *Schema) GenDeleteRows(t *Table, source *Source, r *rand.Rand, p Partiti typs = append(typs, pk.Type) } - vs, ok := source.Get() + vs, ok := g.Get() if !ok { return nil, nil } @@ -681,30 +681,30 @@ func (s *Schema) GenDDLStmt(t *Table, r *rand.Rand, p PartitionRangeConfig, sc * } } -func (s *Schema) GenMutateStmt(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig, 
deletes bool) (*Stmt, error) { +func (s *Schema) GenMutateStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig, deletes bool) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() if !deletes { - return s.GenInsertStmt(t, source, r, p) + return s.GenInsertStmt(t, g, r, p) } switch n := rand.Intn(1000); n { case 10, 100: - return s.GenDeleteRows(t, source, r, p) + return s.GenDeleteRows(t, g, r, p) default: switch n := rand.Intn(2); n { case 0: if t.KnownIssues[KnownIssuesJsonWithTuples] { - return s.GenInsertStmt(t, source, r, p) + return s.GenInsertStmt(t, g, r, p) } - return s.GenInsertJsonStmt(t, source, r, p) + return s.GenInsertJsonStmt(t, g, r, p) default: - return s.GenInsertStmt(t, source, r, p) + return s.GenInsertStmt(t, g, r, p) } } } -func (s *Schema) GenCheckStmt(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) GenCheckStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { var n int if len(t.Indexes) > 0 { n = r.Intn(5) @@ -713,27 +713,27 @@ func (s *Schema) GenCheckStmt(t *Table, source *Source, r *rand.Rand, p Partitio } switch n { case 0: - return s.genSinglePartitionQuery(t, source, r, p) + return s.genSinglePartitionQuery(t, g, r, p) case 1: - return s.genMultiplePartitionQuery(t, source, r, p) + return s.genMultiplePartitionQuery(t, g, r, p) case 2: - return s.genClusteringRangeQuery(t, source, r, p) + return s.genClusteringRangeQuery(t, g, r, p) case 3: - return s.genMultiplePartitionClusteringRangeQuery(t, source, r, p) + return s.genMultiplePartitionClusteringRangeQuery(t, g, r, p) case 4: // Reducing the probability to hit these since they often take a long time to run n := r.Intn(5) switch n { case 0: - return s.genSingleIndexQuery(t, source, r, p) + return s.genSingleIndexQuery(t, g, r, p) default: - return s.genSinglePartitionQuery(t, source, r, p) + return s.genSinglePartitionQuery(t, g, r, p) } } return nil } -func (s *Schema) genSinglePartitionQuery(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genSinglePartitionQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -752,7 +752,7 @@ func (s *Schema) genSinglePartitionQuery(t *Table, source *Source, r *rand.Rand, builder = builder.Where(qb.Eq(pk.Name)) typs = append(typs, pk.Type) } - values, ok := source.GetOld() + values, ok := g.GetOld() if !ok { return nil } @@ -765,7 +765,7 @@ func (s *Schema) genSinglePartitionQuery(t *Table, source *Source, r *rand.Rand, } } -func (s *Schema) genMultiplePartitionQuery(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genMultiplePartitionQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -791,7 +791,7 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, source *Source, r *rand.Ran for i, pk := range partitionKeys { builder = builder.Where(qb.InTuple(pk.Name, pkNum)) for j := 0; j < pkNum; j++ { - vs, ok := source.GetOld() + vs, ok := g.GetOld() if !ok { return nil } @@ -808,7 +808,7 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, source *Source, r *rand.Ran } } -func (s *Schema) genClusteringRangeQuery(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genClusteringRangeQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -827,7 +827,7 @@ func (s *Schema) genClusteringRangeQuery(t *Table, source 
*Source, r *rand.Rand, clusteringKeys = t.MaterializedViews[view].ClusteringKeys }*/ builder := qb.Select(s.Keyspace.Name + "." + tableName) - vs, ok := source.GetOld() + vs, ok := g.GetOld() if !ok { // Done or no values available... return nil @@ -858,7 +858,7 @@ func (s *Schema) genClusteringRangeQuery(t *Table, source *Source, r *rand.Rand, } } -func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -885,7 +885,7 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Sour for i, pk := range partitionKeys { builder = builder.Where(qb.InTuple(pk.Name, pkNum)) for j := 0; j < pkNum; j++ { - vs, ok := source.GetOld() + vs, ok := g.GetOld() if !ok { return nil } @@ -914,7 +914,7 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Sour } } -func (s *Schema) genSingleIndexQuery(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genSingleIndexQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock()
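The check-statement generators above all follow the same read-side contract: draw a previously used partition key with `GetOld`, build the query, and hand the token back with an empty `Value` so the in-flight claim is released (as `validation` in jobs.go does). A hedged sketch of a consumer written against the `Generator` API from this patch; `buildAndRunCheck` is a hypothetical placeholder, not a real gemini function:

```go
package main

import (
	"fmt"

	"github.com/scylladb/gemini"
)

// checkOnce sketches the read-side contract: draw a previously written
// partition key, use it, and return only the token so the in-flight claim
// is released rather than the value being recycled.
func checkOnce(g *gemini.Generator) error {
	vs, ok := g.GetOld()
	if !ok {
		// Generator is shutting down or has no values yet; nothing to check.
		return nil
	}
	defer g.GiveOld(gemini.ValueWithToken{Token: vs.Token}) // release only the claim

	return buildAndRunCheck(vs.Value) // hypothetical: build the SELECT and run it
}

func buildAndRunCheck(v gemini.Value) error {
	fmt.Println("would validate partition key", v)
	return nil
}

func main() {}
```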